Skip to content

Commit aefa11c

Browse files
jeff-davispetrosagg
andcommitted
Connection string config for replication.
Co-authored-by: Petros Angelatos <[email protected]>
1 parent 73b62cd commit aefa11c

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

tokio-postgres/src/config.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ pub enum ChannelBinding {
5656
Require,
5757
}
5858

59+
/// Replication mode configuration.
60+
#[derive(Debug, Copy, Clone, PartialEq)]
61+
#[non_exhaustive]
62+
pub enum ReplicationMode {
63+
/// Physical replication.
64+
Physical,
65+
/// Logical replication.
66+
Logical,
67+
}
68+
5969
/// A host specification.
6070
#[derive(Debug, Clone, PartialEq)]
6171
pub enum Host {
@@ -159,6 +169,7 @@ pub struct Config {
159169
pub(crate) keepalives_idle: Duration,
160170
pub(crate) target_session_attrs: TargetSessionAttrs,
161171
pub(crate) channel_binding: ChannelBinding,
172+
pub(crate) replication_mode: Option<ReplicationMode>,
162173
}
163174

164175
impl Default for Config {
@@ -184,6 +195,7 @@ impl Config {
184195
keepalives_idle: Duration::from_secs(2 * 60 * 60),
185196
target_session_attrs: TargetSessionAttrs::Any,
186197
channel_binding: ChannelBinding::Prefer,
198+
replication_mode: None,
187199
}
188200
}
189201

@@ -387,6 +399,17 @@ impl Config {
387399
self.channel_binding
388400
}
389401

402+
/// Set replication mode.
403+
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
404+
self.replication_mode = Some(replication_mode);
405+
self
406+
}
407+
408+
/// Get replication mode.
409+
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
410+
self.replication_mode
411+
}
412+
390413
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
391414
match key {
392415
"user" => {
@@ -476,6 +499,17 @@ impl Config {
476499
};
477500
self.channel_binding(channel_binding);
478501
}
502+
"replication" => {
503+
let mode = match value {
504+
"off" => None,
505+
"true" => Some(ReplicationMode::Physical),
506+
"database" => Some(ReplicationMode::Logical),
507+
_ => return Err(Error::config_parse(Box::new(InvalidValue("replication")))),
508+
};
509+
if let Some(mode) = mode {
510+
self.replication_mode(mode);
511+
}
512+
}
479513
key => {
480514
return Err(Error::config_parse(Box::new(UnknownOption(
481515
key.to_string(),
@@ -548,6 +582,7 @@ impl fmt::Debug for Config {
548582
.field("keepalives_idle", &self.keepalives_idle)
549583
.field("target_session_attrs", &self.target_session_attrs)
550584
.field("channel_binding", &self.channel_binding)
585+
.field("replication", &self.replication_mode)
551586
.finish()
552587
}
553588
}

tokio-postgres/src/connect_raw.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2-
use crate::config::{self, Config};
2+
use crate::config::{self, Config, ReplicationMode};
33
use crate::connect_tls::connect_tls;
44
use crate::maybe_tls_stream::MaybeTlsStream;
55
use crate::tls::{TlsConnect, TlsStream};
@@ -124,6 +124,12 @@ where
124124
if let Some(application_name) = &config.application_name {
125125
params.push(("application_name", &**application_name));
126126
}
127+
if let Some(replication_mode) = &config.replication_mode {
128+
match replication_mode {
129+
ReplicationMode::Physical => params.push(("replication", "true")),
130+
ReplicationMode::Logical => params.push(("replication", "database")),
131+
}
132+
}
127133

128134
let mut buf = BytesMut::new();
129135
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;

0 commit comments

Comments
 (0)