Skip to content

Add text protocol based query method #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
- run: docker compose up -d
- uses: sfackler/actions/rustup@master
with:
version: 1.62.0
version: 1.65.0
- run: echo "::set-output name=version::$(rustc --version)"
id: rust-version
- uses: actions/cache@v1
Expand Down
4 changes: 2 additions & 2 deletions postgres-derive-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ where
T: PartialEq + FromSqlOwned + ToSql + Sync,
S: fmt::Display,
{
for &(ref val, ref repr) in checks.iter() {
for (val, repr) in checks.iter() {
let stmt = conn
.prepare(&format!("SELECT {}::{}", *repr, sql_type))
.unwrap();
Expand All @@ -38,7 +38,7 @@ pub fn test_type_asymmetric<T, F, S, C>(
S: fmt::Display,
C: Fn(&T, &F) -> bool,
{
for &(ref val, ref repr) in checks.iter() {
for (val, repr) in checks.iter() {
let stmt = conn
.prepare(&format!("SELECT {}::{}", *repr, sql_type))
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion postgres-protocol/src/authentication/sasl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl<'a> Parser<'a> {
}

fn posit_number(&mut self) -> io::Result<u32> {
let n = self.take_while(|c| matches!(c, '0'..='9'))?;
let n = self.take_while(|c| c.is_ascii_digit())?;
n.parse()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
Expand Down
2 changes: 1 addition & 1 deletion postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ impl<'a> FallibleIterator for DataRowRanges<'a> {
));
}
let base = self.len - self.buf.len();
self.buf = &self.buf[len as usize..];
self.buf = &self.buf[len..];
Ok(Some(Some(base..base + len)))
}
}
Expand Down
18 changes: 17 additions & 1 deletion postgres-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,22 @@ impl WrongType {
}
}

/// An error indicating that a as_text conversion was attempted on a binary
/// result.
#[derive(Debug)]
pub struct WrongFormat {}

impl Error for WrongFormat {}

impl fmt::Display for WrongFormat {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
fmt,
"cannot read column as text while it is in binary format"
)
}
}

/// A trait for types that can be created from a Postgres value.
///
/// # Types
Expand Down Expand Up @@ -846,7 +862,7 @@ pub trait ToSql: fmt::Debug {
/// Supported Postgres message format types
///
/// Using Text format in a message assumes a Postgres `SERVER_ENCODING` of `UTF8`
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Format {
/// Text format (UTF-8)
Text,
Expand Down
85 changes: 84 additions & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use crate::copy_both::CopyBothDuplex;
use crate::copy_out::CopyOutStream;
#[cfg(feature = "runtime")]
use crate::keepalive::KeepaliveConfig;
use crate::prepare::get_type;
use crate::query::RowStream;
use crate::simple_query::SimpleQueryStream;
use crate::statement::Column;
#[cfg(feature = "runtime")]
use crate::tls::MakeTlsConnect;
use crate::tls::TlsConnect;
Expand All @@ -20,7 +22,7 @@ use crate::{
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
TransactionBuilder,
};
use bytes::{Buf, BytesMut};
use bytes::{Buf, BufMut, BytesMut};
use fallible_iterator::FallibleIterator;
use futures_channel::mpsc;
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -374,6 +376,87 @@ impl Client {
query::query(&self.inner, statement, params).await
}

/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
/// to save a roundtrip
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
where
S: AsRef<str>,
I: IntoIterator<Item = S>,
I::IntoIter: ExactSizeIterator,
{
let params = params.into_iter();
let params_len = params.len();

let buf = self.inner.with_buf(|buf| {
// Parse, anonymous portal
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
// Bind, pass params as text, retrieve as binary
match frontend::bind(
"", // empty string selects the unnamed portal
"", // empty string selects the unnamed prepared statement
std::iter::empty(), // all parameters use the default format (text)
params,
|param, buf| {
buf.put_slice(param.as_ref().as_bytes());
Ok(postgres_protocol::IsNull::No)
},
Some(0), // all text
buf,
) {
Ok(()) => Ok(()),
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
}?;

// Describe portal to typecast results
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
// Execute
frontend::execute("", 0, buf).map_err(Error::encode)?;
// Sync
frontend::sync(buf);

Ok(buf.split().freeze())
})?;

let mut responses = self
.inner
.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

// now read the responses

match responses.next().await? {
Message::ParseComplete => {}
_ => return Err(Error::unexpected_message()),
}
match responses.next().await? {
Message::BindComplete => {}
_ => return Err(Error::unexpected_message()),
}
let row_description = match responses.next().await? {
Message::RowDescription(body) => Some(body),
Message::NoData => None,
_ => return Err(Error::unexpected_message()),
};

// construct statement object

let parameters = vec![Type::UNKNOWN; params_len];

let mut columns = vec![];
if let Some(row_description) = row_description {
let mut it = row_description.fields();
while let Some(field) = it.next().map_err(Error::parse)? {
let type_ = get_type(&self.inner, field.type_oid()).await?;
let column = Column::new(field.name().to_string(), type_);
columns.push(column);
}
}

let statement = Statement::new_text(&self.inner, "".to_owned(), parameters, columns);

Ok(RowStream::new(statement, responses))
}

/// Executes a statement, returning the number of rows modified.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
Expand Down
13 changes: 12 additions & 1 deletion tokio-postgres/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ impl FallibleIterator for BackendMessages {
}
}

pub struct PostgresCodec;
pub struct PostgresCodec {
pub max_message_size: Option<usize>,
}

impl Encoder<FrontendMessage> for PostgresCodec {
type Error = io::Error;
Expand Down Expand Up @@ -64,6 +66,15 @@ impl Decoder for PostgresCodec {
break;
}

if let Some(max) = self.max_message_size {
if len > max {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"message too large",
));
}
}

match header.tag() {
backend::NOTICE_RESPONSE_TAG
| backend::NOTIFICATION_RESPONSE_TAG
Expand Down
21 changes: 21 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub struct Config {
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
pub(crate) replication_mode: Option<ReplicationMode>,
pub(crate) max_backend_message_size: Option<usize>,
}

impl Default for Config {
Expand Down Expand Up @@ -217,6 +218,7 @@ impl Config {
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
replication_mode: None,
max_backend_message_size: None,
}
}

Expand Down Expand Up @@ -472,6 +474,17 @@ impl Config {
self.replication_mode
}

/// Set limit for backend messages size.
pub fn max_backend_message_size(&mut self, max_backend_message_size: usize) -> &mut Config {
self.max_backend_message_size = Some(max_backend_message_size);
self
}

/// Get limit for backend messages size.
pub fn get_max_backend_message_size(&self) -> Option<usize> {
self.max_backend_message_size
}

fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key {
"user" => {
Expand Down Expand Up @@ -586,6 +599,14 @@ impl Config {
self.replication_mode(mode);
}
}
"max_backend_message_size" => {
let limit = value.parse::<usize>().map_err(|_| {
Error::config_parse(Box::new(InvalidValue("max_backend_message_size")))
})?;
if limit > 0 {
self.max_backend_message_size(limit);
}
}
key => {
return Err(Error::config_parse(Box::new(UnknownOption(
key.to_string(),
Expand Down
7 changes: 6 additions & 1 deletion tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ where
let stream = connect_tls(stream, config.ssl_mode, tls).await?;

let mut stream = StartupStream {
inner: Framed::new(stream, PostgresCodec),
inner: Framed::new(
stream,
PostgresCodec {
max_message_size: config.max_backend_message_size,
},
),
buf: BackendMessages::empty(),
delayed: VecDeque::new(),
};
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Resu
})
}

async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error> {
pub async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error> {
if let Some(type_) = Type::from_oid(oid) {
return Ok(type_);
}
Expand Down
33 changes: 30 additions & 3 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ where
Ok(RowStream {
statement,
responses,
command_tag: None,
_p: PhantomPinned,
})
}
Expand All @@ -72,6 +73,7 @@ pub async fn query_portal(
Ok(RowStream {
statement: portal.statement().clone(),
responses,
command_tag: None,
_p: PhantomPinned,
})
}
Expand Down Expand Up @@ -202,11 +204,24 @@ pin_project! {
pub struct RowStream {
statement: Statement,
responses: Responses,
command_tag: Option<String>,
#[pin]
_p: PhantomPinned,
}
}

impl RowStream {
/// Creates a new `RowStream`.
pub fn new(statement: Statement, responses: Responses) -> Self {
RowStream {
statement,
responses,
command_tag: None,
_p: PhantomPinned,
}
}
}

impl Stream for RowStream {
type Item = Result<Row, Error>;

Expand All @@ -217,12 +232,24 @@ impl Stream for RowStream {
Message::DataRow(body) => {
return Poll::Ready(Some(Ok(Row::new(this.statement.clone(), body)?)))
}
Message::EmptyQueryResponse
| Message::CommandComplete(_)
| Message::PortalSuspended => {}
Message::EmptyQueryResponse | Message::PortalSuspended => {}
Message::CommandComplete(body) => {
if let Ok(tag) = body.tag() {
*this.command_tag = Some(tag.to_string());
}
}
Message::ReadyForQuery(_) => return Poll::Ready(None),
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
}
}
}
}

impl RowStream {
/// Returns the command tag of this query.
///
/// This is only available after the stream has been exhausted.
pub fn command_tag(&self) -> Option<String> {
self.command_tag.clone()
}
}
22 changes: 22 additions & 0 deletions tokio-postgres/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::types::{FromSql, Type, WrongType};
use crate::{Error, Statement};
use fallible_iterator::FallibleIterator;
use postgres_protocol::message::backend::DataRowBody;
use postgres_types::{Format, WrongFormat};
use std::fmt;
use std::ops::Range;
use std::str;
Expand Down Expand Up @@ -187,6 +188,27 @@ impl Row {
let range = self.ranges[idx].to_owned()?;
Some(&self.body.buffer()[range])
}

/// Interpret the column at the given index as text
///
/// Useful when using query_raw_txt() which sets text transfer mode
pub fn as_text(&self, idx: usize) -> Result<Option<&str>, Error> {
if self.statement.output_format() == Format::Text {
match self.col_buffer(idx) {
Some(raw) => {
FromSql::from_sql(&Type::TEXT, raw).map_err(|e| Error::from_sql(e, idx))
}
None => Ok(None),
}
} else {
Err(Error::from_sql(Box::new(WrongFormat {}), idx))
}
}

/// Row byte size
pub fn body_len(&self) -> usize {
self.body.buffer().len()
}
}

impl AsName for SimpleColumn {
Expand Down
Loading