Skip to content

Commit 0b0a5eb

Browse files
committed
Comments and WIP start replication tuple.
1 parent 3c0c317 commit 0b0a5eb

File tree

2 files changed

+91
-6
lines changed

2 files changed

+91
-6
lines changed

tokio-postgres/src/replication_client.rs

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,26 @@ pub struct IdentifySystem {
147147
}
148148

149149
impl IdentifySystem {
150+
/// The unique system identifier identifying the cluster. This can
151+
/// be used to check that the base backup used to initialize the
152+
/// standby came from the same cluster.
150153
pub fn systemid(&self) -> &str {
151154
&self.systemid
152155
}
153156

157+
/// Current timeline ID. Also useful to check that the standby is
158+
/// consistent with the master.
154159
pub fn timeline(&self) -> u32 {
155160
self.timeline
156161
}
157162

163+
/// Current WAL flush location. Useful to get a known location in
164+
/// the write-ahead log where streaming can start.
158165
pub fn xlogpos(&self) -> Lsn {
159166
self.xlogpos
160167
}
161168

169+
/// Database connected to or None.
162170
pub fn dbname(&self) -> Option<&str> {
163171
self.dbname.as_deref()
164172
}
@@ -172,10 +180,13 @@ pub struct TimelineHistory {
172180
}
173181

174182
impl TimelineHistory {
183+
/// File name of the timeline history file, e.g.,
184+
/// 00000002.history.
175185
pub fn filename(&self) -> &Path {
176186
self.filename.as_path()
177187
}
178188

189+
/// Contents of the timeline history file.
179190
pub fn content(&self) -> &[u8] {
180191
self.content.as_slice()
181192
}
@@ -185,8 +196,16 @@ impl TimelineHistory {
185196
/// [create_logical_replication_slot()](ReplicationClient::create_logical_replication_slot).
186197
#[derive(Debug)]
187198
pub enum SnapshotMode {
199+
/// Export the snapshot for use in other sessions. This option
200+
/// can't be used inside a transaction.
188201
ExportSnapshot,
202+
/// Use the snapshot for logical decoding as normal but won't do
203+
/// anything else with it.
189204
NoExportSnapshot,
205+
/// Use the snapshot for the current transaction executing the
206+
/// command. This option must be used in a transaction, and
207+
/// CREATE_REPLICATION_SLOT must be the first command run in that
208+
/// transaction.
190209
UseSnapshot,
191210
}
192211

@@ -203,23 +222,53 @@ pub struct CreateReplicationSlotResponse {
203222
}
204223

205224
impl CreateReplicationSlotResponse {
225+
/// The name of the newly-created replication slot.
206226
pub fn slot_name(&self) -> &str {
207227
&self.slot_name
208228
}
209229

230+
/// The WAL location at which the slot became consistent. This is
231+
/// the earliest location from which streaming can start on this
232+
/// replication slot.
210233
pub fn consistent_point(&self) -> Lsn {
211234
self.consistent_point
212235
}
213236

237+
/// The identifier of the snapshot exported by the command. The
238+
/// snapshot is valid until a new command is executed on this
239+
/// connection or the replication connection is closed. Null if
240+
/// the created slot is physical.
214241
pub fn snapshot_name(&self) -> Option<&str> {
215242
self.snapshot_name.as_deref()
216243
}
217244

245+
/// The name of the output plugin used by the newly-created
246+
/// replication slot. Null if the created slot is physical.
218247
pub fn output_plugin(&self) -> Option<&str> {
219248
self.output_plugin.as_deref()
220249
}
221250
}
222251

252+
/// Response sent after streaming from a timeline that is not the
253+
/// current timeline.
254+
#[derive(Debug)]
255+
pub struct ReplicationResponse {
256+
next_tli: u64,
257+
next_tli_startpos: Lsn,
258+
}
259+
260+
impl ReplicationResponse {
261+
/// next timeline's ID
262+
pub fn next_tli(&self) -> u64 {
263+
self.next_tli
264+
}
265+
266+
/// WAL location where the switch happened
267+
pub fn next_tli_startpos(&self) -> Lsn {
268+
self.next_tli_startpos
269+
}
270+
}
271+
223272
/// Represents a client connected in replication mode.
224273
pub struct ReplicationClient {
225274
client: Client,
@@ -404,6 +453,15 @@ impl ReplicationClient {
404453
let ranges = datarow.ranges().collect::<Vec<_>>().map_err(Error::parse)?;
405454

406455
assert_eq!(fields.len(), 4);
456+
assert_eq!(fields[0].type_oid(), Type::TEXT.oid());
457+
assert_eq!(fields[0].format(), 0);
458+
assert_eq!(fields[1].type_oid(), Type::TEXT.oid());
459+
assert_eq!(fields[1].format(), 0);
460+
assert_eq!(fields[2].type_oid(), Type::TEXT.oid());
461+
assert_eq!(fields[2].format(), 0);
462+
assert_eq!(fields[3].type_oid(), Type::TEXT.oid());
463+
assert_eq!(fields[3].format(), 0);
464+
assert_eq!(ranges.len(), 4);
407465

408466
let values: Vec<Option<&str>> = ranges
409467
.iter()
@@ -642,7 +700,7 @@ pub struct ReplicationStream<'a> {
642700

643701
impl ReplicationStream<'_> {
644702
/// Stop replication stream and return the replication client object.
645-
pub async fn stop_replication(mut self: Pin<Box<Self>>) -> Result<(), Error> {
703+
pub async fn stop_replication(mut self: Pin<Box<Self>>) -> Result<Option<ReplicationResponse>, Error> {
646704
let this = self.as_mut().project();
647705

648706
this.rclient.send_copydone()?;
@@ -657,10 +715,36 @@ impl ReplicationStream<'_> {
657715
}
658716
}
659717

660-
match responses.next().await? {
661-
Message::CommandComplete(_) => (),
718+
let next_message = responses.next().await?;
719+
720+
let response = match next_message {
721+
Message::RowDescription(rowdesc) => {
722+
let datarow = match responses.next().await? {
723+
Message::DataRow(m) => m,
724+
m => return Err(Error::unexpected_message(m)),
725+
};
726+
727+
let fields = rowdesc.fields().collect::<Vec<_>>().map_err(Error::parse)?;
728+
let ranges = datarow.ranges().collect::<Vec<_>>().map_err(Error::parse)?;
729+
730+
assert_eq!(fields.len(), 2);
731+
assert_eq!(fields[0].type_oid(), Type::INT8.oid());
732+
assert_eq!(fields[0].format(), 0);
733+
assert_eq!(fields[1].type_oid(), Type::TEXT.oid());
734+
assert_eq!(fields[1].format(), 0);
735+
assert_eq!(ranges.len(), 2);
736+
737+
let timeline = &datarow.buffer()[ranges[0].to_owned().unwrap()];
738+
let switch = &datarow.buffer()[ranges[1].to_owned().unwrap()];
739+
Some(ReplicationResponse {
740+
next_tli: from_utf8(timeline).unwrap().parse::<u64>().unwrap(),
741+
next_tli_startpos: Lsn::from(from_utf8(switch).unwrap()),
742+
})
743+
}
744+
Message::CommandComplete(_) => None,
662745
m => return Err(Error::unexpected_message(m)),
663746
};
747+
664748
match responses.next().await? {
665749
Message::CommandComplete(_) => (),
666750
m => return Err(Error::unexpected_message(m)),
@@ -670,7 +754,7 @@ impl ReplicationStream<'_> {
670754
m => return Err(Error::unexpected_message(m)),
671755
};
672756

673-
Ok(())
757+
Ok(response)
674758
}
675759
}
676760

tokio-postgres/tests/test/replication.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use postgres_protocol::message::backend::ReplicationMessage;
22
use tokio::stream::StreamExt;
33
use tokio_postgres::Client;
4-
use tokio_postgres::ReplicationClient;
4+
use tokio_postgres::replication_client::ReplicationClient;
55
use tokio_postgres::{connect, connect_replication, NoTls, ReplicationMode};
66

77
const LOGICAL_BEGIN_TAG: u8 = b'B';
@@ -59,7 +59,8 @@ async fn physical_replication() {
5959

6060
assert!(got_xlogdata);
6161

62-
let _ = physical_stream.stop_replication().await.unwrap();
62+
let response = physical_stream.stop_replication().await.unwrap();
63+
assert!(response.is_none());
6364

6465
// repeat simple command after stream is ended
6566
let show_port = rclient.show("port").await.unwrap();

0 commit comments

Comments
 (0)