Skip to content

Commit c125179

Browse files
committed
add copy_both_simple method
Signed-off-by: Petros Angelatos <[email protected]>
1 parent e7ecfce commit c125179

File tree

7 files changed

+576
-3
lines changed

7 files changed

+576
-3
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2222
pub const ERROR_RESPONSE_TAG: u8 = b'E';
2323
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
2424
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
25+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
2526
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
2627
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
2728
pub const NO_DATA_TAG: u8 = b'n';
@@ -93,6 +94,7 @@ pub enum Message {
9394
CopyDone,
9495
CopyInResponse(CopyInResponseBody),
9596
CopyOutResponse(CopyOutResponseBody),
97+
CopyBothResponse(CopyBothResponseBody),
9698
DataRow(DataRowBody),
9799
EmptyQueryResponse,
98100
ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
190192
storage,
191193
})
192194
}
195+
COPY_BOTH_RESPONSE_TAG => {
196+
let format = buf.read_u8()?;
197+
let len = buf.read_u16::<BigEndian>()?;
198+
let storage = buf.read_all();
199+
Message::CopyBothResponse(CopyBothResponseBody {
200+
format,
201+
len,
202+
storage,
203+
})
204+
}
193205
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194206
BACKEND_KEY_DATA_TAG => {
195207
let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,28 @@ impl CopyOutResponseBody {
524536
}
525537
}
526538

539+
#[derive(Debug, Clone)]
540+
pub struct CopyBothResponseBody {
541+
format: u8,
542+
len: u16,
543+
storage: Bytes,
544+
}
545+
546+
impl CopyBothResponseBody {
547+
#[inline]
548+
pub fn format(&self) -> u8 {
549+
self.format
550+
}
551+
552+
#[inline]
553+
pub fn column_formats(&self) -> ColumnFormats<'_> {
554+
ColumnFormats {
555+
remaining: self.len,
556+
buf: &self.storage,
557+
}
558+
}
559+
}
560+
527561
#[derive(Debug, Clone)]
528562
pub struct DataRowBody {
529563
storage: Bytes,

tokio-postgres/src/client.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
use crate::codec::BackendMessages;
1+
use crate::codec::{BackendMessages, FrontendMessage};
22
use crate::config::SslMode;
33
use crate::connection::{Request, RequestMessages};
4+
use crate::copy_both::{CopyBothDuplex, CopyBothReceiver};
45
use crate::copy_out::CopyOutStream;
56
#[cfg(feature = "runtime")]
67
use crate::keepalive::KeepaliveConfig;
@@ -13,8 +14,9 @@ use crate::types::{Oid, ToSql, Type};
1314
#[cfg(feature = "runtime")]
1415
use crate::Socket;
1516
use crate::{
16-
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
17-
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
17+
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
18+
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
19+
TransactionBuilder,
1820
};
1921
use bytes::{Buf, BytesMut};
2022
use fallible_iterator::FallibleIterator;
@@ -41,6 +43,11 @@ pub struct Responses {
4143
cur: BackendMessages,
4244
}
4345

46+
pub struct CopyBothHandles {
47+
pub(crate) stream_receiver: mpsc::Receiver<Result<Message, Error>>,
48+
pub(crate) sink_sender: mpsc::Sender<FrontendMessage>,
49+
}
50+
4451
impl Responses {
4552
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
4653
loop {
@@ -115,6 +122,32 @@ impl InnerClient {
115122
})
116123
}
117124

125+
pub fn start_copy_both(&self) -> Result<CopyBothHandles, Error> {
126+
let (sender, receiver) = mpsc::channel(1);
127+
let (stream_sender, stream_receiver) = mpsc::channel(0);
128+
let (sink_sender, sink_receiver) = mpsc::channel(0);
129+
130+
let responses = Responses {
131+
receiver,
132+
cur: BackendMessages::empty(),
133+
};
134+
let messages = RequestMessages::CopyBoth(CopyBothReceiver::new(
135+
responses,
136+
sink_receiver,
137+
stream_sender,
138+
));
139+
140+
let request = Request { messages, sender };
141+
self.sender
142+
.unbounded_send(request)
143+
.map_err(|_| Error::closed())?;
144+
145+
Ok(CopyBothHandles {
146+
stream_receiver,
147+
sink_sender,
148+
})
149+
}
150+
118151
pub fn typeinfo(&self) -> Option<Statement> {
119152
self.cached_typeinfo.lock().typeinfo.clone()
120153
}
@@ -505,6 +538,15 @@ impl Client {
505538
copy_out::copy_out(self.inner(), statement).await
506539
}
507540

541+
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
542+
/// data.
543+
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
544+
where
545+
T: Buf + 'static + Send,
546+
{
547+
copy_both::copy_both_simple(self.inner(), query).await
548+
}
549+
508550
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
509551
///
510552
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

tokio-postgres/src/connection.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2+
use crate::copy_both::CopyBothReceiver;
23
use crate::copy_in::CopyInReceiver;
34
use crate::error::DbError;
45
use crate::maybe_tls_stream::MaybeTlsStream;
@@ -20,6 +21,7 @@ use tokio_util::codec::Framed;
2021
pub enum RequestMessages {
2122
Single(FrontendMessage),
2223
CopyIn(CopyInReceiver),
24+
CopyBoth(CopyBothReceiver),
2325
}
2426

2527
pub struct Request {
@@ -258,6 +260,24 @@ where
258260
.map_err(Error::io)?;
259261
self.pending_request = Some(RequestMessages::CopyIn(receiver));
260262
}
263+
RequestMessages::CopyBoth(mut receiver) => {
264+
let message = match receiver.poll_next_unpin(cx) {
265+
Poll::Ready(Some(message)) => message,
266+
Poll::Ready(None) => {
267+
trace!("poll_write: finished copy_both request");
268+
continue;
269+
}
270+
Poll::Pending => {
271+
trace!("poll_write: waiting on copy_both stream");
272+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
273+
return Ok(true);
274+
}
275+
};
276+
Pin::new(&mut self.stream)
277+
.start_send(message)
278+
.map_err(Error::io)?;
279+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
280+
}
261281
}
262282
}
263283
}

0 commit comments

Comments
 (0)