Skip to content

Commit 64eb4a3

Browse files
committed
implement SHOW command
1 parent d9166ed commit 64eb4a3

File tree

1 file changed

+59
-5
lines changed

1 file changed

+59
-5
lines changed

tokio-postgres/src/replication_client.rs

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::types::Lsn;
4+
use crate::types::{Lsn, Type};
55
use crate::{simple_query, Client, Error};
66
use bytes::Bytes;
7+
use fallible_iterator::FallibleIterator;
78
use futures::{ready, Stream};
89
use pin_project_lite::pin_project;
9-
use postgres_protocol::message::backend::Message;
10-
use postgres_protocol::message::backend::ReplicationMessage;
10+
use postgres_protocol::message::backend::{Field, Message, ReplicationMessage};
11+
use std::io;
1112
use std::marker::PhantomPinned;
13+
use std::ops::Range;
1214
use std::pin::Pin;
1315
use std::task::{Context, Poll};
1416

@@ -19,6 +21,58 @@ use std::task::{Context, Poll};
1921
pub struct ReplicationClient(pub(crate) Client);
2022

2123
impl ReplicationClient {
24+
/// show the value of the given setting
25+
pub async fn show(&self, name: &str) -> Result<Vec<u8>, Error> {
26+
let iclient = self.0.inner();
27+
let command = format!("SHOW \"{}\"", name);
28+
let buf = simple_query::encode(iclient, &command)?;
29+
let mut responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
30+
match responses.next().await? {
31+
Message::RowDescription(rowdesc) => {
32+
let fields: Vec<Field<'_>> = rowdesc.fields().collect().map_err(Error::parse)?;
33+
if fields.len() != 1 {
34+
return Err(Error::parse(io::Error::new(
35+
io::ErrorKind::InvalidInput,
36+
"expected single column in response",
37+
)))
38+
}
39+
if fields[0].type_oid() != Type::TEXT.oid() {
40+
return Err(Error::parse(io::Error::new(
41+
io::ErrorKind::InvalidInput,
42+
"expected single text column in response",
43+
)))
44+
}
45+
}
46+
_ => return Err(Error::unexpected_message()),
47+
};
48+
49+
match responses.next().await? {
50+
Message::DataRow(d) => {
51+
let ranges: Vec<Option<Range<usize>>> = d.ranges().collect().map_err(Error::parse)?;
52+
if ranges.len() != 1 {
53+
return Err(Error::parse(io::Error::new(
54+
io::ErrorKind::InvalidInput,
55+
"expected single column in response",
56+
)))
57+
}
58+
// fetch only column
59+
match &ranges[0] {
60+
Some(r) => Ok(Vec::from(&d.buffer()[r.to_owned()])),
61+
None => {
62+
Err(Error::parse(io::Error::new(
63+
io::ErrorKind::InvalidInput,
64+
"unexpected NULL setting in response",
65+
)))
66+
}
67+
}
68+
}
69+
m => {
70+
dbg!(m);
71+
Err(Error::unexpected_message())
72+
}
73+
}
74+
}
75+
2276
/// Begin physical replication, consuming the replication client and producing a replication stream.
2377
///
2478
/// Replication begins starting with the given Log Sequence Number
@@ -31,7 +85,7 @@ impl ReplicationClient {
3185
) -> Result<ReplicationStream, Error> {
3286
let iclient = self.0.inner();
3387
let slot = match slot_name {
34-
Some(name) => format!(" SLOT {}", name),
88+
Some(name) => format!(" SLOT \"{}\"", name),
3589
None => String::from(""),
3690
};
3791
let timeline = match timeline_id {
@@ -70,7 +124,7 @@ impl ReplicationClient {
70124
" ({})",
71125
options
72126
.iter()
73-
.map(|pair| format!("{} '{}'", pair.0, pair.1))
127+
.map(|pair| format!("\"{}\" '{}'", pair.0, pair.1))
74128
.collect::<Vec<String>>()
75129
.as_slice()
76130
.join(", ")

0 commit comments

Comments
 (0)