Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a1bdd0b

Browse files
authoredJul 14, 2024··
Merge pull request #1160 from sfackler/query-typed-cleanup
query_typed tweaks
2 parents 257bcfd + 71c836b commit a1bdd0b

File tree

7 files changed

+243
-71
lines changed

7 files changed

+243
-71
lines changed
 

‎postgres/src/client.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,71 @@ impl Client {
257257
Ok(RowIter::new(self.connection.as_ref(), stream))
258258
}
259259

260+
/// Like `query`, but requires the types of query parameters to be explicitly specified.
261+
///
262+
/// Compared to `query`, this method allows performing queries without three round trips (for
263+
/// prepare, execute, and close) by requiring the caller to specify parameter values along with
264+
/// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
265+
/// supported (such as Cloudflare Workers with Hyperdrive).
266+
///
267+
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the
268+
/// parameter of the list provided, 1-indexed.
269+
pub fn query_typed(
270+
&mut self,
271+
query: &str,
272+
params: &[(&(dyn ToSql + Sync), Type)],
273+
) -> Result<Vec<Row>, Error> {
274+
self.connection
275+
.block_on(self.client.query_typed(query, params))
276+
}
277+
278+
/// The maximally flexible version of [`query_typed`].
279+
///
280+
/// Compared to `query`, this method allows performing queries without three round trips (for
281+
/// prepare, execute, and close) by requiring the caller to specify parameter values along with
282+
/// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
283+
/// supported (such as Cloudflare Workers with Hyperdrive).
284+
///
285+
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the
286+
/// parameter of the list provided, 1-indexed.
287+
///
288+
/// [`query_typed`]: #method.query_typed
289+
///
290+
/// # Examples
291+
/// ```no_run
292+
/// # use postgres::{Client, NoTls};
293+
/// use postgres::types::{ToSql, Type};
294+
/// use fallible_iterator::FallibleIterator;
295+
/// # fn main() -> Result<(), postgres::Error> {
296+
/// # let mut client = Client::connect("host=localhost user=postgres", NoTls)?;
297+
///
298+
/// let params: Vec<(String, Type)> = vec![
299+
/// ("first param".into(), Type::TEXT),
300+
/// ("second param".into(), Type::TEXT),
301+
/// ];
302+
/// let mut it = client.query_typed_raw(
303+
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
304+
/// params,
305+
/// )?;
306+
///
307+
/// while let Some(row) = it.next()? {
308+
/// let foo: i32 = row.get("foo");
309+
/// println!("foo: {}", foo);
310+
/// }
311+
/// # Ok(())
312+
/// # }
313+
/// ```
314+
pub fn query_typed_raw<P, I>(&mut self, query: &str, params: I) -> Result<RowIter<'_>, Error>
315+
where
316+
P: BorrowToSql,
317+
I: IntoIterator<Item = (P, Type)>,
318+
{
319+
let stream = self
320+
.connection
321+
.block_on(self.client.query_typed_raw(query, params))?;
322+
Ok(RowIter::new(self.connection.as_ref(), stream))
323+
}
324+
260325
/// Creates a new prepared statement.
261326
///
262327
/// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),

‎postgres/src/generic_client.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,19 @@ pub trait GenericClient: private::Sealed {
4444
I: IntoIterator<Item = P>,
4545
I::IntoIter: ExactSizeIterator;
4646

47+
/// Like [`Client::query_typed`]
48+
fn query_typed(
49+
&mut self,
50+
statement: &str,
51+
params: &[(&(dyn ToSql + Sync), Type)],
52+
) -> Result<Vec<Row>, Error>;
53+
54+
/// Like [`Client::query_typed_raw`]
55+
fn query_typed_raw<P, I>(&mut self, statement: &str, params: I) -> Result<RowIter<'_>, Error>
56+
where
57+
P: BorrowToSql,
58+
I: IntoIterator<Item = (P, Type)> + Sync + Send;
59+
4760
/// Like `Client::prepare`.
4861
fn prepare(&mut self, query: &str) -> Result<Statement, Error>;
4962

@@ -115,6 +128,22 @@ impl GenericClient for Client {
115128
self.query_raw(query, params)
116129
}
117130

131+
fn query_typed(
132+
&mut self,
133+
statement: &str,
134+
params: &[(&(dyn ToSql + Sync), Type)],
135+
) -> Result<Vec<Row>, Error> {
136+
self.query_typed(statement, params)
137+
}
138+
139+
fn query_typed_raw<P, I>(&mut self, statement: &str, params: I) -> Result<RowIter<'_>, Error>
140+
where
141+
P: BorrowToSql,
142+
I: IntoIterator<Item = (P, Type)> + Sync + Send,
143+
{
144+
self.query_typed_raw(statement, params)
145+
}
146+
118147
fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
119148
self.prepare(query)
120149
}
@@ -195,6 +224,22 @@ impl GenericClient for Transaction<'_> {
195224
self.query_raw(query, params)
196225
}
197226

227+
fn query_typed(
228+
&mut self,
229+
statement: &str,
230+
params: &[(&(dyn ToSql + Sync), Type)],
231+
) -> Result<Vec<Row>, Error> {
232+
self.query_typed(statement, params)
233+
}
234+
235+
fn query_typed_raw<P, I>(&mut self, statement: &str, params: I) -> Result<RowIter<'_>, Error>
236+
where
237+
P: BorrowToSql,
238+
I: IntoIterator<Item = (P, Type)> + Sync + Send,
239+
{
240+
self.query_typed_raw(statement, params)
241+
}
242+
198243
fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
199244
self.prepare(query)
200245
}

‎postgres/src/transaction.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,35 @@ impl<'a> Transaction<'a> {
115115
Ok(RowIter::new(self.connection.as_ref(), stream))
116116
}
117117

118+
/// Like `Client::query_typed`.
119+
pub fn query_typed(
120+
&mut self,
121+
statement: &str,
122+
params: &[(&(dyn ToSql + Sync), Type)],
123+
) -> Result<Vec<Row>, Error> {
124+
self.connection.block_on(
125+
self.transaction
126+
.as_ref()
127+
.unwrap()
128+
.query_typed(statement, params),
129+
)
130+
}
131+
132+
/// Like `Client::query_typed_raw`.
133+
pub fn query_typed_raw<P, I>(&mut self, query: &str, params: I) -> Result<RowIter<'_>, Error>
134+
where
135+
P: BorrowToSql,
136+
I: IntoIterator<Item = (P, Type)>,
137+
{
138+
let stream = self.connection.block_on(
139+
self.transaction
140+
.as_ref()
141+
.unwrap()
142+
.query_typed_raw(query, params),
143+
)?;
144+
Ok(RowIter::new(self.connection.as_ref(), stream))
145+
}
146+
118147
/// Binds parameters to a statement, creating a "portal".
119148
///
120149
/// Portals can be used with the `query_portal` method to page through the results of a query without being forced

‎tokio-postgres/src/client.rs

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ impl Client {
333333
///
334334
/// ```no_run
335335
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
336-
/// use tokio_postgres::types::ToSql;
337336
/// use futures_util::{pin_mut, TryStreamExt};
338337
///
339338
/// let params: Vec<String> = vec![
@@ -373,43 +372,59 @@ impl Client {
373372
///
374373
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the
375374
/// parameter of the list provided, 1-indexed.
375+
pub async fn query_typed(
376+
&self,
377+
query: &str,
378+
params: &[(&(dyn ToSql + Sync), Type)],
379+
) -> Result<Vec<Row>, Error> {
380+
self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
381+
.await?
382+
.try_collect()
383+
.await
384+
}
385+
386+
/// The maximally flexible version of [`query_typed`].
387+
///
388+
/// Compared to `query`, this method allows performing queries without three round trips (for
389+
/// prepare, execute, and close) by requiring the caller to specify parameter values along with
390+
/// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
391+
/// supported (such as Cloudflare Workers with Hyperdrive).
392+
///
393+
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the
394+
/// parameter of the list provided, 1-indexed.
395+
///
396+
/// [`query_typed`]: #method.query_typed
376397
///
377398
/// # Examples
378399
///
379400
/// ```no_run
380401
/// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
381-
/// use tokio_postgres::types::ToSql;
382-
/// use tokio_postgres::types::Type;
383402
/// use futures_util::{pin_mut, TryStreamExt};
403+
/// use tokio_postgres::types::Type;
384404
///
385-
/// let rows = client.query_typed(
405+
/// let params: Vec<(String, Type)> = vec![
406+
/// ("first param".into(), Type::TEXT),
407+
/// ("second param".into(), Type::TEXT),
408+
/// ];
409+
/// let mut it = client.query_typed_raw(
386410
/// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
387-
/// &[(&"first param", Type::TEXT), (&2i32, Type::INT4)],
411+
/// params,
388412
/// ).await?;
389413
///
390-
/// for row in rows {
391-
/// let foo: i32 = row.get("foo");
392-
/// println!("foo: {}", foo);
414+
/// pin_mut!(it);
415+
/// while let Some(row) = it.try_next().await? {
416+
/// let foo: i32 = row.get("foo");
417+
/// println!("foo: {}", foo);
393418
/// }
394419
/// # Ok(())
395420
/// # }
396421
/// ```
397-
pub async fn query_typed(
398-
&self,
399-
statement: &str,
400-
params: &[(&(dyn ToSql + Sync), Type)],
401-
) -> Result<Vec<Row>, Error> {
402-
fn slice_iter<'a>(
403-
s: &'a [(&'a (dyn ToSql + Sync), Type)],
404-
) -> impl ExactSizeIterator<Item = (&'a dyn ToSql, Type)> + 'a {
405-
s.iter()
406-
.map(|(param, param_type)| (*param as _, param_type.clone()))
407-
}
408-
409-
query::query_typed(&self.inner, statement, slice_iter(params))
410-
.await?
411-
.try_collect()
412-
.await
422+
pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
423+
where
424+
P: BorrowToSql,
425+
I: IntoIterator<Item = (P, Type)>,
426+
{
427+
query::query_typed(&self.inner, query, params).await
413428
}
414429

415430
/// Executes a statement, returning the number of rows modified.

‎tokio-postgres/src/generic_client.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ pub trait GenericClient: private::Sealed {
6363
params: &[(&(dyn ToSql + Sync), Type)],
6464
) -> Result<Vec<Row>, Error>;
6565

66+
/// Like [`Client::query_typed_raw`]
67+
async fn query_typed_raw<P, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
68+
where
69+
P: BorrowToSql,
70+
I: IntoIterator<Item = (P, Type)> + Sync + Send;
71+
6672
/// Like [`Client::prepare`].
6773
async fn prepare(&self, query: &str) -> Result<Statement, Error>;
6874

@@ -154,6 +160,14 @@ impl GenericClient for Client {
154160
self.query_typed(statement, params).await
155161
}
156162

163+
async fn query_typed_raw<P, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
164+
where
165+
P: BorrowToSql,
166+
I: IntoIterator<Item = (P, Type)> + Sync + Send,
167+
{
168+
self.query_typed_raw(statement, params).await
169+
}
170+
157171
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
158172
self.prepare(query).await
159173
}
@@ -252,6 +266,14 @@ impl GenericClient for Transaction<'_> {
252266
self.query_typed(statement, params).await
253267
}
254268

269+
async fn query_typed_raw<P, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
270+
where
271+
P: BorrowToSql,
272+
I: IntoIterator<Item = (P, Type)> + Sync + Send,
273+
{
274+
self.query_typed_raw(statement, params).await
275+
}
276+
255277
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
256278
self.prepare(query).await
257279
}

‎tokio-postgres/src/query.rs

Lines changed: 25 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -69,29 +69,21 @@ pub async fn query_typed<'a, P, I>(
6969
where
7070
P: BorrowToSql,
7171
I: IntoIterator<Item = (P, Type)>,
72-
I::IntoIter: ExactSizeIterator,
7372
{
74-
let (params, param_types): (Vec<_>, Vec<_>) = params.into_iter().unzip();
75-
76-
let params = params.into_iter();
77-
78-
let param_oids = param_types.iter().map(|t| t.oid()).collect::<Vec<_>>();
79-
80-
let params = params.into_iter();
81-
82-
let buf = client.with_buf(|buf| {
83-
frontend::parse("", query, param_oids.into_iter(), buf).map_err(Error::parse)?;
84-
85-
encode_bind_with_statement_name_and_param_types("", &param_types, params, "", buf)?;
86-
87-
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
88-
89-
frontend::execute("", 0, buf).map_err(Error::encode)?;
73+
let buf = {
74+
let params = params.into_iter().collect::<Vec<_>>();
75+
let param_oids = params.iter().map(|(_, t)| t.oid()).collect::<Vec<_>>();
9076

91-
frontend::sync(buf);
77+
client.with_buf(|buf| {
78+
frontend::parse("", query, param_oids.into_iter(), buf).map_err(Error::parse)?;
79+
encode_bind_raw("", params, "", buf)?;
80+
frontend::describe(b'S', "", buf).map_err(Error::encode)?;
81+
frontend::execute("", 0, buf).map_err(Error::encode)?;
82+
frontend::sync(buf);
9283

93-
Ok(buf.split().freeze())
94-
})?;
84+
Ok(buf.split().freeze())
85+
})?
86+
};
9587

9688
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
9789

@@ -233,47 +225,42 @@ where
233225
I: IntoIterator<Item = P>,
234226
I::IntoIter: ExactSizeIterator,
235227
{
236-
encode_bind_with_statement_name_and_param_types(
228+
let params = params.into_iter();
229+
if params.len() != statement.params().len() {
230+
return Err(Error::parameters(params.len(), statement.params().len()));
231+
}
232+
233+
encode_bind_raw(
237234
statement.name(),
238-
statement.params(),
239-
params,
235+
params.zip(statement.params().iter().cloned()),
240236
portal,
241237
buf,
242238
)
243239
}
244240

245-
fn encode_bind_with_statement_name_and_param_types<P, I>(
241+
fn encode_bind_raw<P, I>(
246242
statement_name: &str,
247-
param_types: &[Type],
248243
params: I,
249244
portal: &str,
250245
buf: &mut BytesMut,
251246
) -> Result<(), Error>
252247
where
253248
P: BorrowToSql,
254-
I: IntoIterator<Item = P>,
249+
I: IntoIterator<Item = (P, Type)>,
255250
I::IntoIter: ExactSizeIterator,
256251
{
257-
let params = params.into_iter();
258-
259-
if param_types.len() != params.len() {
260-
return Err(Error::parameters(params.len(), param_types.len()));
261-
}
262-
263252
let (param_formats, params): (Vec<_>, Vec<_>) = params
264-
.zip(param_types.iter())
265-
.map(|(p, ty)| (p.borrow_to_sql().encode_format(ty) as i16, p))
253+
.into_iter()
254+
.map(|(p, ty)| (p.borrow_to_sql().encode_format(&ty) as i16, (p, ty)))
266255
.unzip();
267256

268-
let params = params.into_iter();
269-
270257
let mut error_idx = 0;
271258
let r = frontend::bind(
272259
portal,
273260
statement_name,
274261
param_formats,
275-
params.zip(param_types).enumerate(),
276-
|(idx, (param, ty)), buf| match param.borrow_to_sql().to_sql_checked(ty, buf) {
262+
params.into_iter().enumerate(),
263+
|(idx, (param, ty)), buf| match param.borrow_to_sql().to_sql_checked(&ty, buf) {
277264
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
278265
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
279266
Err(e) => {

‎tokio-postgres/src/transaction.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,24 @@ impl<'a> Transaction<'a> {
149149
self.client.query_raw(statement, params).await
150150
}
151151

152+
/// Like `Client::query_typed`.
153+
pub async fn query_typed(
154+
&self,
155+
statement: &str,
156+
params: &[(&(dyn ToSql + Sync), Type)],
157+
) -> Result<Vec<Row>, Error> {
158+
self.client.query_typed(statement, params).await
159+
}
160+
161+
/// Like `Client::query_typed_raw`.
162+
pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
163+
where
164+
P: BorrowToSql,
165+
I: IntoIterator<Item = (P, Type)>,
166+
{
167+
self.client.query_typed_raw(query, params).await
168+
}
169+
152170
/// Like `Client::execute`.
153171
pub async fn execute<T>(
154172
&self,
@@ -227,15 +245,6 @@ impl<'a> Transaction<'a> {
227245
query::query_portal(self.client.inner(), portal, max_rows).await
228246
}
229247

230-
/// Like `Client::query_typed`.
231-
pub async fn query_typed(
232-
&self,
233-
statement: &str,
234-
params: &[(&(dyn ToSql + Sync), Type)],
235-
) -> Result<Vec<Row>, Error> {
236-
self.client.query_typed(statement, params).await
237-
}
238-
239248
/// Like `Client::copy_in`.
240249
pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
241250
where

0 commit comments

Comments
 (0)
Please sign in to comment.