Skip to content

Commit e899d18

Browse files
author
Vlad Krasnov
committed
Small changes to expose WAL
1 parent 0c064a9 commit e899d18

File tree

6 files changed

+55
-7
lines changed

6 files changed

+55
-7
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub const PARAMETER_STATUS_TAG: u8 = b'S';
3232
pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
3333
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
3434
pub const READY_FOR_QUERY_TAG: u8 = b'Z';
35+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
3536

3637
#[derive(Debug, Copy, Clone)]
3738
pub struct Header {
@@ -93,6 +94,7 @@ pub enum Message {
9394
CopyDone,
9495
CopyInResponse(CopyInResponseBody),
9596
CopyOutResponse(CopyOutResponseBody),
97+
CopyBothResponse(CopyBothResponseBody),
9698
DataRow(DataRowBody),
9799
EmptyQueryResponse,
98100
ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
190192
storage,
191193
})
192194
}
195+
COPY_BOTH_RESPONSE_TAG => {
196+
let format = buf.read_u8()?;
197+
let len = buf.read_u16::<BigEndian>()?;
198+
let storage = buf.read_all();
199+
Message::CopyBothResponse(CopyBothResponseBody {
200+
format,
201+
len,
202+
storage,
203+
})
204+
}
193205
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194206
BACKEND_KEY_DATA_TAG => {
195207
let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,27 @@ impl CopyOutResponseBody {
524536
}
525537
}
526538

539+
pub struct CopyBothResponseBody {
540+
storage: Bytes,
541+
len: u16,
542+
format: u8,
543+
}
544+
545+
impl CopyBothResponseBody {
546+
#[inline]
547+
pub fn format(&self) -> u8 {
548+
self.format
549+
}
550+
551+
#[inline]
552+
pub fn column_formats(&self) -> ColumnFormats<'_> {
553+
ColumnFormats {
554+
remaining: self.len,
555+
buf: &self.storage,
556+
}
557+
}
558+
}
559+
527560
pub struct DataRowBody {
528561
storage: Bytes,
529562
len: u16,

tokio-postgres/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl Client {
188188
}
189189
}
190190

191-
pub(crate) fn inner(&self) -> &Arc<InnerClient> {
191+
pub fn inner(&self) -> &Arc<InnerClient> {
192192
&self.inner
193193
}
194194

tokio-postgres/src/config.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ pub struct Config {
159159
pub(crate) keepalives_idle: Duration,
160160
pub(crate) target_session_attrs: TargetSessionAttrs,
161161
pub(crate) channel_binding: ChannelBinding,
162+
pub(crate) replication: Option<String>,
162163
}
163164

164165
impl Default for Config {
@@ -184,6 +185,7 @@ impl Config {
184185
keepalives_idle: Duration::from_secs(2 * 60 * 60),
185186
target_session_attrs: TargetSessionAttrs::Any,
186187
channel_binding: ChannelBinding::Prefer,
188+
replication: None,
187189
}
188190
}
189191

@@ -224,6 +226,12 @@ impl Config {
224226
self
225227
}
226228

229+
/// Sets the kind of replication.
230+
pub fn set_replication_database(&mut self) -> &mut Config {
231+
self.replication = Some("database".to_string());
232+
self
233+
}
234+
227235
/// Gets the name of the database to connect to, if one has been configured
228236
/// with the `dbname` method.
229237
pub fn get_dbname(&self) -> Option<&str> {
@@ -476,6 +484,9 @@ impl Config {
476484
};
477485
self.channel_binding(channel_binding);
478486
}
487+
"replication" => {
488+
self.replication = Some(value.to_string());
489+
}
479490
key => {
480491
return Err(Error::config_parse(Box::new(UnknownOption(
481492
key.to_string(),

tokio-postgres/src/connect_raw.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ where
124124
if let Some(application_name) = &config.application_name {
125125
params.push(("application_name", &**application_name));
126126
}
127+
if let Some(replication) = &config.replication {
128+
params.push(("replication", &**replication));
129+
}
127130

128131
let mut buf = BytesMut::new();
129132
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;

tokio-postgres/src/lib.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@
113113
//! | `with-uuid-0_8` | Enable support for the `uuid` crate. | [uuid](https://crates.io/crates/uuid) 0.8 | no |
114114
//! | `with-time-0_2` | Enable support for the `time` crate. | [time](https://crates.io/crates/time) 0.2 | no |
115115
#![doc(html_root_url = "https://docs.rs/tokio-postgres/0.7")]
116-
#![warn(rust_2018_idioms, clippy::all, missing_docs)]
116+
#![warn(rust_2018_idioms, clippy::all)]
117117

118118
pub use crate::cancel_token::CancelToken;
119119
pub use crate::client::Client;
@@ -138,23 +138,24 @@ pub use crate::to_statement::ToStatement;
138138
pub use crate::transaction::Transaction;
139139
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
140140
use crate::types::ToSql;
141+
pub use postgres_protocol::message::backend::Message;
141142

142143
pub mod binary_copy;
143144
mod bind;
144145
#[cfg(feature = "runtime")]
145146
mod cancel_query;
146147
mod cancel_query_raw;
147148
mod cancel_token;
148-
mod client;
149-
mod codec;
149+
pub mod client;
150+
pub mod codec;
150151
pub mod config;
151152
#[cfg(feature = "runtime")]
152153
mod connect;
153154
mod connect_raw;
154155
#[cfg(feature = "runtime")]
155156
mod connect_socket;
156157
mod connect_tls;
157-
mod connection;
158+
pub mod connection;
158159
mod copy_in;
159160
mod copy_out;
160161
pub mod error;
@@ -164,7 +165,7 @@ mod portal;
164165
mod prepare;
165166
mod query;
166167
pub mod row;
167-
mod simple_query;
168+
pub mod simple_query;
168169
#[cfg(feature = "runtime")]
169170
mod socket;
170171
mod statement;

tokio-postgres/src/simple_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Erro
6161
}
6262
}
6363

64-
fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
64+
pub fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
6565
client.with_buf(|buf| {
6666
frontend::query(query, buf).map_err(Error::encode)?;
6767
Ok(buf.split().freeze())

0 commit comments

Comments
 (0)