Skip to content

Commit 34278d2

Browse files
committed
add copy_both_simple method
Signed-off-by: Petros Angelatos <[email protected]>
1 parent b7a3cfa commit 34278d2

File tree

7 files changed

+549
-2
lines changed

7 files changed

+549
-2
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2222
pub const ERROR_RESPONSE_TAG: u8 = b'E';
2323
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
2424
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
25+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
2526
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
2627
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
2728
pub const NO_DATA_TAG: u8 = b'n';
@@ -93,6 +94,7 @@ pub enum Message {
9394
CopyDone,
9495
CopyInResponse(CopyInResponseBody),
9596
CopyOutResponse(CopyOutResponseBody),
97+
CopyBothResponse(CopyBothResponseBody),
9698
DataRow(DataRowBody),
9799
EmptyQueryResponse,
98100
ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
190192
storage,
191193
})
192194
}
195+
COPY_BOTH_RESPONSE_TAG => {
196+
let format = buf.read_u8()?;
197+
let len = buf.read_u16::<BigEndian>()?;
198+
let storage = buf.read_all();
199+
Message::CopyBothResponse(CopyBothResponseBody {
200+
format,
201+
len,
202+
storage,
203+
})
204+
}
193205
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194206
BACKEND_KEY_DATA_TAG => {
195207
let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,28 @@ impl CopyOutResponseBody {
524536
}
525537
}
526538

539+
#[derive(Debug)]
540+
pub struct CopyBothResponseBody {
541+
format: u8,
542+
len: u16,
543+
storage: Bytes,
544+
}
545+
546+
impl CopyBothResponseBody {
547+
#[inline]
548+
pub fn format(&self) -> u8 {
549+
self.format
550+
}
551+
552+
#[inline]
553+
pub fn column_formats(&self) -> ColumnFormats<'_> {
554+
ColumnFormats {
555+
remaining: self.len,
556+
buf: &self.storage,
557+
}
558+
}
559+
}
560+
527561
#[derive(Debug)]
528562
pub struct DataRowBody {
529563
storage: Bytes,

tokio-postgres/src/client.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::codec::{BackendMessages, FrontendMessage};
33
use crate::config::Host;
44
use crate::config::SslMode;
55
use crate::connection::{Request, RequestMessages};
6+
use crate::copy_both::CopyBothDuplex;
67
use crate::copy_out::CopyOutStream;
78
#[cfg(feature = "runtime")]
89
use crate::keepalive::KeepaliveConfig;
@@ -15,8 +16,9 @@ use crate::types::{Oid, ToSql, Type};
1516
#[cfg(feature = "runtime")]
1617
use crate::Socket;
1718
use crate::{
18-
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
19-
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
19+
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
20+
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
21+
TransactionBuilder,
2022
};
2123
use bytes::{Buf, BytesMut};
2224
use fallible_iterator::FallibleIterator;
@@ -466,6 +468,15 @@ impl Client {
466468
copy_out::copy_out(self.inner(), statement).await
467469
}
468470

471+
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
472+
/// data.
473+
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
474+
where
475+
T: Buf + 'static + Send,
476+
{
477+
copy_both::copy_both_simple(self.inner(), query).await
478+
}
479+
469480
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
470481
///
471482
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

tokio-postgres/src/connection.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2+
use crate::copy_both::CopyBothReceiver;
23
use crate::copy_in::CopyInReceiver;
34
use crate::error::DbError;
45
use crate::maybe_tls_stream::MaybeTlsStream;
@@ -20,6 +21,7 @@ use tokio_util::codec::Framed;
2021
pub enum RequestMessages {
2122
Single(FrontendMessage),
2223
CopyIn(CopyInReceiver),
24+
CopyBoth(CopyBothReceiver),
2325
}
2426

2527
pub struct Request {
@@ -258,6 +260,24 @@ where
258260
.map_err(Error::io)?;
259261
self.pending_request = Some(RequestMessages::CopyIn(receiver));
260262
}
263+
RequestMessages::CopyBoth(mut receiver) => {
264+
let message = match receiver.poll_next_unpin(cx) {
265+
Poll::Ready(Some(message)) => message,
266+
Poll::Ready(None) => {
267+
trace!("poll_write: finished copy_both request");
268+
continue;
269+
}
270+
Poll::Pending => {
271+
trace!("poll_write: waiting on copy_both stream");
272+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
273+
return Ok(true);
274+
}
275+
};
276+
Pin::new(&mut self.stream)
277+
.start_send(message)
278+
.map_err(Error::io)?;
279+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
280+
}
261281
}
262282
}
263283
}

0 commit comments

Comments
 (0)