Skip to content

Commit 8c28f8b

Browse files
committed
Overhaul simple_query
1 parent c7055dc commit 8c28f8b

File tree

7 files changed

+44
-57
lines changed

7 files changed

+44
-57
lines changed

postgres/src/client.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -303,22 +303,7 @@ impl Client {
303303
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
304304
/// them to this method!
305305
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
306-
self.simple_query_iter(query)?.collect()
307-
}
308-
309-
/// Like `simple_query`, except that it returns a fallible iterator over the resulting values rather than buffering
310-
/// the response in memory.
311-
///
312-
/// # Warning
313-
///
314-
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
315-
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
316-
/// them to this method!
317-
pub fn simple_query_iter<'a>(
318-
&'a mut self,
319-
query: &'a str,
320-
) -> Result<impl FallibleIterator<Item = SimpleQueryMessage, Error = Error> + 'a, Error> {
321-
Ok(Iter::new(self.0.simple_query(query)))
306+
executor::block_on(self.0.simple_query(query))
322307
}
323308

324309
/// Executes a sequence of SQL statements using the simple query protocol.

postgres/src/transaction.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -137,15 +137,7 @@ impl<'a> Transaction<'a> {
137137

138138
/// Like `Client::simple_query`.
139139
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
140-
self.simple_query_iter(query)?.collect()
141-
}
142-
143-
/// Like `Client::simple_query_iter`.
144-
pub fn simple_query_iter<'b>(
145-
&'b mut self,
146-
query: &'b str,
147-
) -> Result<impl FallibleIterator<Item = SimpleQueryMessage, Error = Error> + 'b, Error> {
148-
Ok(Iter::new(self.0.simple_query(query)))
140+
executor::block_on(self.0.simple_query(query))
149141
}
150142

151143
/// Like `Client::batch_execute`.

tokio-postgres/src/client.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::cancel_query;
33
use crate::codec::BackendMessages;
44
use crate::config::{Host, SslMode};
55
use crate::connection::{Request, RequestMessages};
6+
use crate::simple_query::SimpleQueryStream;
67
use crate::copy_out::CopyStream;
78
use crate::query::RowStream;
89
use crate::slice_iter;
@@ -20,7 +21,7 @@ use crate::{Error, Statement};
2021
use bytes::IntoBuf;
2122
use fallible_iterator::FallibleIterator;
2223
use futures::channel::mpsc;
23-
use futures::{future, Stream, TryStream, TryStreamExt};
24+
use futures::{future, TryStream, TryStreamExt};
2425
use futures::{ready, StreamExt};
2526
use parking_lot::Mutex;
2627
use postgres_protocol::message::backend::Message;
@@ -320,11 +321,15 @@ impl Client {
320321
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
321322
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
322323
/// them to this method!
323-
pub fn simple_query<'a>(
324-
&'a self,
325-
query: &'a str,
326-
) -> impl Stream<Item = Result<SimpleQueryMessage, Error>> + 'a {
327-
simple_query::simple_query(self.inner(), query)
324+
pub async fn simple_query(
325+
&self,
326+
query: &str,
327+
) -> Result<Vec<SimpleQueryMessage>, Error> {
328+
self.simple_query_raw(query).await?.try_collect().await
329+
}
330+
331+
pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
332+
simple_query::simple_query(self.inner(), query).await
328333
}
329334

330335
/// Executes a sequence of SQL statements using the simple query protocol.

tokio-postgres/src/connect.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::connect_raw::connect_raw;
44
use crate::connect_socket::connect_socket;
55
use crate::tls::{MakeTlsConnect, TlsConnect};
66
use crate::{Client, Config, Connection, Error, SimpleQueryMessage, Socket};
7-
use futures::future;
7+
use futures::{future, Future};
88
use futures::{FutureExt, Stream};
99
use pin_utils::pin_mut;
1010
use std::io;
@@ -50,7 +50,7 @@ where
5050
}
5151
}
5252

53-
return Err(error.unwrap());
53+
Err(error.unwrap())
5454
}
5555

5656
async fn connect_once<T>(
@@ -73,7 +73,16 @@ where
7373
let (mut client, mut connection) = connect_raw(socket, tls, config).await?;
7474

7575
if let TargetSessionAttrs::ReadWrite = config.target_session_attrs {
76-
let rows = client.simple_query("SHOW transaction_read_only");
76+
let rows = client.simple_query_raw("SHOW transaction_read_only");
77+
pin_mut!(rows);
78+
79+
let rows = future::poll_fn(|cx| {
80+
if connection.poll_unpin(cx)?.is_ready() {
81+
return Poll::Ready(Err(Error::closed()));
82+
}
83+
84+
rows.as_mut().poll(cx)
85+
}).await?;
7786
pin_mut!(rows);
7887

7988
loop {

tokio-postgres/src/simple_query.rs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,24 @@ use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
55
use fallible_iterator::FallibleIterator;
6-
use futures::{ready, Stream, TryFutureExt};
6+
use futures::{ready, Stream};
77
use postgres_protocol::message::backend::Message;
88
use postgres_protocol::message::frontend;
99
use std::pin::Pin;
1010
use std::sync::Arc;
1111
use std::task::{Context, Poll};
1212

13-
pub fn simple_query<'a>(
14-
client: &'a InnerClient,
15-
query: &'a str,
16-
) -> impl Stream<Item = Result<SimpleQueryMessage, Error>> + 'a {
17-
let f = async move {
18-
let buf = encode(query)?;
19-
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
13+
pub async fn simple_query(
14+
client: &InnerClient,
15+
query: &str,
16+
) -> Result<SimpleQueryStream, Error> {
17+
let buf = encode(query)?;
18+
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
2019

21-
Ok(SimpleQuery {
22-
responses,
23-
columns: None,
24-
})
25-
};
26-
f.try_flatten_stream()
20+
Ok(SimpleQueryStream {
21+
responses,
22+
columns: None,
23+
})
2724
}
2825

2926
pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
@@ -48,12 +45,12 @@ fn encode(query: &str) -> Result<Vec<u8>, Error> {
4845
Ok(buf)
4946
}
5047

51-
struct SimpleQuery {
48+
pub struct SimpleQueryStream {
5249
responses: Responses,
5350
columns: Option<Arc<[String]>>,
5451
}
5552

56-
impl Stream for SimpleQuery {
53+
impl Stream for SimpleQueryStream {
5754
type Item = Result<SimpleQueryMessage, Error>;
5855

5956
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

tokio-postgres/src/transaction.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{
1212
bind, query, slice_iter, Client, Error, Portal, Row, SimpleQueryMessage, Statement, ToStatement,
1313
};
1414
use bytes::IntoBuf;
15-
use futures::{Stream, TryStream, TryStreamExt};
15+
use futures::{TryStream, TryStreamExt};
1616
use postgres_protocol::message::frontend;
1717
use std::error;
1818
use tokio::io::{AsyncRead, AsyncWrite};
@@ -224,11 +224,11 @@ impl<'a> Transaction<'a> {
224224
}
225225

226226
/// Like `Client::simple_query`.
227-
pub fn simple_query<'b>(
228-
&'b self,
229-
query: &'b str,
230-
) -> impl Stream<Item = Result<SimpleQueryMessage, Error>> + 'b {
231-
self.client.simple_query(query)
227+
pub async fn simple_query(
228+
&self,
229+
query: &str,
230+
) -> Result<Vec<SimpleQueryMessage>, Error> {
231+
self.client.simple_query(query).await
232232
}
233233

234234
/// Like `Client::batch_execute`.

tokio-postgres/tests/test/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,6 @@ async fn simple_query() {
264264
INSERT INTO foo (name) VALUES ('steven'), ('joe');
265265
SELECT * FROM foo ORDER BY id;",
266266
)
267-
.try_collect::<Vec<_>>()
268267
.await
269268
.unwrap();
270269

0 commit comments

Comments
 (0)