Skip to content

Commit cfc25fd

Browse files
committed
add copy_both_simple method
Signed-off-by: Petros Angelatos <[email protected]>
1 parent e7ecfce commit cfc25fd

File tree

7 files changed

+549
-2
lines changed

7 files changed

+549
-2
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2222
pub const ERROR_RESPONSE_TAG: u8 = b'E';
2323
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
2424
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
25+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
2526
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
2627
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
2728
pub const NO_DATA_TAG: u8 = b'n';
@@ -93,6 +94,7 @@ pub enum Message {
9394
CopyDone,
9495
CopyInResponse(CopyInResponseBody),
9596
CopyOutResponse(CopyOutResponseBody),
97+
CopyBothResponse(CopyBothResponseBody),
9698
DataRow(DataRowBody),
9799
EmptyQueryResponse,
98100
ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
190192
storage,
191193
})
192194
}
195+
COPY_BOTH_RESPONSE_TAG => {
196+
let format = buf.read_u8()?;
197+
let len = buf.read_u16::<BigEndian>()?;
198+
let storage = buf.read_all();
199+
Message::CopyBothResponse(CopyBothResponseBody {
200+
format,
201+
len,
202+
storage,
203+
})
204+
}
193205
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194206
BACKEND_KEY_DATA_TAG => {
195207
let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,28 @@ impl CopyOutResponseBody {
524536
}
525537
}
526538

539+
#[derive(Debug, Clone)]
540+
pub struct CopyBothResponseBody {
541+
format: u8,
542+
len: u16,
543+
storage: Bytes,
544+
}
545+
546+
impl CopyBothResponseBody {
547+
#[inline]
548+
pub fn format(&self) -> u8 {
549+
self.format
550+
}
551+
552+
#[inline]
553+
pub fn column_formats(&self) -> ColumnFormats<'_> {
554+
ColumnFormats {
555+
remaining: self.len,
556+
buf: &self.storage,
557+
}
558+
}
559+
}
560+
527561
#[derive(Debug, Clone)]
528562
pub struct DataRowBody {
529563
storage: Bytes,

tokio-postgres/src/client.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::codec::BackendMessages;
22
use crate::config::SslMode;
33
use crate::connection::{Request, RequestMessages};
4+
use crate::copy_both::CopyBothDuplex;
45
use crate::copy_out::CopyOutStream;
56
#[cfg(feature = "runtime")]
67
use crate::keepalive::KeepaliveConfig;
@@ -13,8 +14,9 @@ use crate::types::{Oid, ToSql, Type};
1314
#[cfg(feature = "runtime")]
1415
use crate::Socket;
1516
use crate::{
16-
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
17-
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
17+
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
18+
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
19+
TransactionBuilder,
1820
};
1921
use bytes::{Buf, BytesMut};
2022
use fallible_iterator::FallibleIterator;
@@ -505,6 +507,15 @@ impl Client {
505507
copy_out::copy_out(self.inner(), statement).await
506508
}
507509

510+
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
511+
/// data.
512+
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
513+
where
514+
T: Buf + 'static + Send,
515+
{
516+
copy_both::copy_both_simple(self.inner(), query).await
517+
}
518+
508519
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
509520
///
510521
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

tokio-postgres/src/connection.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2+
use crate::copy_both::CopyBothReceiver;
23
use crate::copy_in::CopyInReceiver;
34
use crate::error::DbError;
45
use crate::maybe_tls_stream::MaybeTlsStream;
@@ -20,6 +21,7 @@ use tokio_util::codec::Framed;
2021
pub enum RequestMessages {
2122
Single(FrontendMessage),
2223
CopyIn(CopyInReceiver),
24+
CopyBoth(CopyBothReceiver),
2325
}
2426

2527
pub struct Request {
@@ -258,6 +260,24 @@ where
258260
.map_err(Error::io)?;
259261
self.pending_request = Some(RequestMessages::CopyIn(receiver));
260262
}
263+
RequestMessages::CopyBoth(mut receiver) => {
264+
let message = match receiver.poll_next_unpin(cx) {
265+
Poll::Ready(Some(message)) => message,
266+
Poll::Ready(None) => {
267+
trace!("poll_write: finished copy_both request");
268+
continue;
269+
}
270+
Poll::Pending => {
271+
trace!("poll_write: waiting on copy_both stream");
272+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
273+
return Ok(true);
274+
}
275+
};
276+
Pin::new(&mut self.stream)
277+
.start_send(message)
278+
.map_err(Error::io)?;
279+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
280+
}
261281
}
262282
}
263283
}

0 commit comments

Comments
 (0)