Skip to content

Commit 9eb3092

Browse files
committed
Replication support.
1 parent 83ffcf8 commit 9eb3092

File tree

8 files changed

+421
-3
lines changed

8 files changed

+421
-3
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::str;
1111

1212
use crate::Oid;
1313

14+
// top-level message tags
1415
pub const PARSE_COMPLETE_TAG: u8 = b'1';
1516
pub const BIND_COMPLETE_TAG: u8 = b'2';
1617
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
@@ -22,6 +23,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2223
pub const ERROR_RESPONSE_TAG: u8 = b'E';
2324
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
2425
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
26+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
2527
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
2628
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
2729
pub const NO_DATA_TAG: u8 = b'n';
@@ -33,6 +35,10 @@ pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
3335
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
3436
pub const READY_FOR_QUERY_TAG: u8 = b'Z';
3537

38+
// replication message tags
39+
pub const XLOG_DATA_TAG: u8 = b'w';
40+
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
41+
3642
#[derive(Debug, Copy, Clone)]
3743
pub struct Header {
3844
tag: u8,
@@ -93,6 +99,7 @@ pub enum Message {
9399
CopyDone,
94100
CopyInResponse(CopyInResponseBody),
95101
CopyOutResponse(CopyOutResponseBody),
102+
CopyBothResponse(CopyBothResponseBody),
96103
DataRow(DataRowBody),
97104
EmptyQueryResponse,
98105
ErrorResponse(ErrorResponseBody),
@@ -190,6 +197,16 @@ impl Message {
190197
storage,
191198
})
192199
}
200+
COPY_BOTH_RESPONSE_TAG => {
201+
let format = buf.read_u8()?;
202+
let len = buf.read_u16::<BigEndian>()?;
203+
let storage = buf.read_all();
204+
Message::CopyBothResponse(CopyBothResponseBody {
205+
format,
206+
len,
207+
storage,
208+
})
209+
}
193210
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194211
BACKEND_KEY_DATA_TAG => {
195212
let process_id = buf.read_i32::<BigEndian>()?;
@@ -278,6 +295,58 @@ impl Message {
278295
}
279296
}
280297

298+
/// An enum representing Postgres backend replication messages.
299+
#[non_exhaustive]
300+
#[derive(Debug)]
301+
pub enum ReplicationMessage {
302+
XLogData(XLogDataBody),
303+
PrimaryKeepAlive(PrimaryKeepAliveBody),
304+
}
305+
306+
impl ReplicationMessage {
307+
pub fn parse(bytes: &Bytes) -> io::Result<ReplicationMessage> {
308+
let mut buf = Buffer {
309+
bytes: bytes.clone(),
310+
idx: 0,
311+
};
312+
313+
let tag = buf.read_u8()?;
314+
315+
let replication_message = match tag {
316+
XLOG_DATA_TAG => {
317+
let wal_start = buf.read_u64::<BigEndian>()?;
318+
let wal_end = buf.read_u64::<BigEndian>()?;
319+
let timestamp = buf.read_i64::<BigEndian>()?;
320+
let storage = buf.read_all();
321+
ReplicationMessage::XLogData(XLogDataBody {
322+
wal_start,
323+
wal_end,
324+
timestamp,
325+
storage,
326+
})
327+
}
328+
PRIMARY_KEEPALIVE_TAG => {
329+
let wal_end = buf.read_u64::<BigEndian>()?;
330+
let timestamp = buf.read_i64::<BigEndian>()?;
331+
let reply = buf.read_u8()?;
332+
ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
333+
wal_end,
334+
timestamp,
335+
reply,
336+
})
337+
}
338+
tag => {
339+
return Err(io::Error::new(
340+
io::ErrorKind::InvalidInput,
341+
format!("unknown replication message tag `{}`", tag),
342+
));
343+
}
344+
};
345+
346+
Ok(replication_message)
347+
}
348+
}
349+
281350
struct Buffer {
282351
bytes: Bytes,
283352
idx: usize,
@@ -524,6 +593,27 @@ impl CopyOutResponseBody {
524593
}
525594
}
526595

596+
pub struct CopyBothResponseBody {
597+
storage: Bytes,
598+
len: u16,
599+
format: u8,
600+
}
601+
602+
impl CopyBothResponseBody {
603+
#[inline]
604+
pub fn format(&self) -> u8 {
605+
self.format
606+
}
607+
608+
#[inline]
609+
pub fn column_formats(&self) -> ColumnFormats<'_> {
610+
ColumnFormats {
611+
remaining: self.len,
612+
buf: &self.storage,
613+
}
614+
}
615+
}
616+
527617
pub struct DataRowBody {
528618
storage: Bytes,
529619
len: u16,
@@ -776,6 +866,65 @@ impl RowDescriptionBody {
776866
}
777867
}
778868

869+
#[derive(Debug)]
870+
pub struct XLogDataBody {
871+
wal_start: u64,
872+
wal_end: u64,
873+
timestamp: i64,
874+
storage: Bytes,
875+
}
876+
877+
impl XLogDataBody {
878+
#[inline]
879+
pub fn wal_start(&self) -> u64 {
880+
self.wal_start
881+
}
882+
883+
#[inline]
884+
pub fn wal_end(&self) -> u64 {
885+
self.wal_end
886+
}
887+
888+
#[inline]
889+
pub fn timestamp(&self) -> i64 {
890+
self.timestamp
891+
}
892+
893+
#[inline]
894+
pub fn data(&self) -> &[u8] {
895+
&self.storage
896+
}
897+
898+
#[inline]
899+
pub fn into_bytes(self) -> Bytes {
900+
self.storage
901+
}
902+
}
903+
904+
#[derive(Debug)]
905+
pub struct PrimaryKeepAliveBody {
906+
wal_end: u64,
907+
timestamp: i64,
908+
reply: u8,
909+
}
910+
911+
impl PrimaryKeepAliveBody {
912+
#[inline]
913+
pub fn wal_end(&self) -> u64 {
914+
self.wal_end
915+
}
916+
917+
#[inline]
918+
pub fn timestamp(&self) -> i64 {
919+
self.timestamp
920+
}
921+
922+
#[inline]
923+
pub fn reply(&self) -> u8 {
924+
self.reply
925+
}
926+
}
927+
779928
pub struct Fields<'a> {
780929
buf: &'a [u8],
781930
remaining: u16,

postgres-protocol/src/message/frontend.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,48 @@ pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
132132
})
133133
}
134134

135+
#[inline]
136+
pub fn standby_status_update(
137+
write_lsn: i64,
138+
flush_lsn: i64,
139+
apply_lsn: i64,
140+
timestamp: i64,
141+
reply: u8,
142+
buf: &mut BytesMut,
143+
) -> io::Result<()> {
144+
buf.put_u8(b'd');
145+
write_body(buf, |buf| {
146+
buf.put_u8(b'r');
147+
buf.put_i64(write_lsn + 1);
148+
buf.put_i64(flush_lsn + 1);
149+
buf.put_i64(apply_lsn + 1);
150+
buf.put_i64(timestamp);
151+
buf.put_u8(reply);
152+
Ok(())
153+
})
154+
}
155+
156+
#[inline]
157+
pub fn hot_standby_feedback(
158+
timestamp: i64,
159+
global_xmin: u32,
160+
global_xmin_epoch: u32,
161+
catalog_xmin: u32,
162+
catalog_xmin_epoch: u32,
163+
buf: &mut BytesMut,
164+
) -> io::Result<()> {
165+
buf.put_u8(b'd');
166+
write_body(buf, |buf| {
167+
buf.put_u8(b'h');
168+
buf.put_i64(timestamp);
169+
buf.put_u32(global_xmin);
170+
buf.put_u32(global_xmin_epoch);
171+
buf.put_u32(catalog_xmin);
172+
buf.put_u32(catalog_xmin_epoch);
173+
Ok(())
174+
})
175+
}
176+
135177
pub struct CopyData<T> {
136178
buf: T,
137179
len: i32,

tokio-postgres/src/config.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ pub enum ChannelBinding {
5656
Require,
5757
}
5858

59+
/// Replication mode configuration.
60+
#[derive(Debug, Copy, Clone, PartialEq)]
61+
#[non_exhaustive]
62+
pub enum ReplicationMode {
63+
/// Physical replication.
64+
Physical,
65+
/// Logical replication.
66+
Logical,
67+
}
68+
5969
/// A host specification.
6070
#[derive(Debug, Clone, PartialEq)]
6171
pub enum Host {
@@ -159,6 +169,7 @@ pub struct Config {
159169
pub(crate) keepalives_idle: Duration,
160170
pub(crate) target_session_attrs: TargetSessionAttrs,
161171
pub(crate) channel_binding: ChannelBinding,
172+
pub(crate) replication_mode: Option<ReplicationMode>,
162173
}
163174

164175
impl Default for Config {
@@ -184,6 +195,7 @@ impl Config {
184195
keepalives_idle: Duration::from_secs(2 * 60 * 60),
185196
target_session_attrs: TargetSessionAttrs::Any,
186197
channel_binding: ChannelBinding::Prefer,
198+
replication_mode: None,
187199
}
188200
}
189201

@@ -387,6 +399,17 @@ impl Config {
387399
self.channel_binding
388400
}
389401

402+
/// Set replication mode.
403+
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
404+
self.replication_mode = Some(replication_mode);
405+
self
406+
}
407+
408+
/// Get replication mode.
409+
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
410+
self.replication_mode
411+
}
412+
390413
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
391414
match key {
392415
"user" => {
@@ -534,6 +557,12 @@ impl fmt::Debug for Config {
534557
}
535558
}
536559

560+
let replication_mode_str = match self.replication_mode {
561+
None => "false",
562+
Some(ReplicationMode::Physical) => "true",
563+
Some(ReplicationMode::Logical) => "database",
564+
};
565+
537566
f.debug_struct("Config")
538567
.field("user", &self.user)
539568
.field("password", &self.password.as_ref().map(|_| Redaction {}))
@@ -548,6 +577,7 @@ impl fmt::Debug for Config {
548577
.field("keepalives_idle", &self.keepalives_idle)
549578
.field("target_session_attrs", &self.target_session_attrs)
550579
.field("channel_binding", &self.channel_binding)
580+
.field("replication", &replication_mode_str.to_string())
551581
.finish()
552582
}
553583
}

tokio-postgres/src/connect_raw.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2-
use crate::config::{self, Config};
2+
use crate::config::{self, Config, ReplicationMode};
33
use crate::connect_tls::connect_tls;
44
use crate::maybe_tls_stream::MaybeTlsStream;
55
use crate::tls::{TlsConnect, TlsStream};
@@ -124,6 +124,12 @@ where
124124
if let Some(application_name) = &config.application_name {
125125
params.push(("application_name", &**application_name));
126126
}
127+
if let Some(replication_mode) = &config.replication_mode {
128+
match replication_mode {
129+
ReplicationMode::Physical => params.push(("replication", "true")),
130+
ReplicationMode::Logical => params.push(("replication", "database")),
131+
}
132+
}
127133

128134
let mut buf = BytesMut::new();
129135
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;

0 commit comments

Comments
 (0)