Skip to content

Commit 47b4172

Browse files
committed
create/drop replication slot
1 parent b71a1e0 commit 47b4172

File tree

2 files changed

+71
-10
lines changed

2 files changed

+71
-10
lines changed

tokio-postgres/src/replication_client.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ impl TimelineHistory {
5757
}
5858
}
5959

60+
#[derive(Debug)]
61+
pub enum SnapshotMode {
62+
ExportSnapshot,
63+
NoExportSnapshot,
64+
UseSnapshot,
65+
}
66+
6067
/// Replication client connection.
6168
///
6269
/// A replication client is used to issue replication commands, begin
@@ -204,6 +211,62 @@ impl ReplicationClient {
204211
})
205212
}
206213

214+
/// Create physical replication slot
215+
pub async fn create_physical_replication_slot(
216+
&self,
217+
slot_name: &str,
218+
temporary: bool,
219+
reserve_wal: bool
220+
) -> Result<(), Error> {
221+
let iclient = self.0.inner();
222+
let temporary_str = if temporary { " TEMPORARY" } else { "" };
223+
let reserve_wal_str = if reserve_wal { " RESERVE_WAL" } else { "" };
224+
let command = format!("CREATE_REPLICATION_SLOT {}{} PHYSICAL{}",
225+
escape_identifier(slot_name),
226+
temporary_str,
227+
reserve_wal_str);
228+
let buf = simple_query::encode(iclient, &command)?;
229+
let _responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
230+
Ok(())
231+
}
232+
233+
/// Create logical replication slot.
234+
pub async fn create_logical_replication_slot(
235+
&self,
236+
slot_name: &str,
237+
temporary: bool,
238+
plugin_name: &str,
239+
snapshot_mode: Option<SnapshotMode>,
240+
) -> Result<(), Error> {
241+
let iclient = self.0.inner();
242+
let temporary_str = if temporary { " TEMPORARY" } else { "" };
243+
let snapshot_str = snapshot_mode.map_or("", |mode| {
244+
match mode {
245+
SnapshotMode::ExportSnapshot => " EXPORT_SNAPSHOT",
246+
SnapshotMode::NoExportSnapshot => " NOEXPORT_SNAPSHOT",
247+
SnapshotMode::UseSnapshot => " USE_SNAPSHOT",
248+
}
249+
});
250+
let command = format!("CREATE_REPLICATION_SLOT {}{} LOGICAL {}{}",
251+
escape_identifier(slot_name),
252+
temporary_str,
253+
escape_identifier(plugin_name),
254+
snapshot_str);
255+
let buf = simple_query::encode(iclient, &command)?;
256+
let _responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
257+
Ok(())
258+
}
259+
260+
/// Drop replication slot
261+
pub async fn drop_replication_slot(&self, slot_name: &str, wait: bool) -> Result<(), Error> {
262+
let iclient = self.0.inner();
263+
let wait_str = if wait { " WAIT" } else { "" };
264+
let command = format!("DROP_REPLICATION_SLOT {}{}", escape_identifier(slot_name), wait_str);
265+
let buf = simple_query::encode(iclient, &command)?;
266+
let _responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
267+
Ok(())
268+
}
269+
207270
/// Begin physical replication, consuming the replication client and producing a replication stream.
208271
///
209272
/// Replication begins starting with the given Log Sequence Number

tokio-postgres/tests/test/replication.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const LOGICAL_INSERT_TAG: u8 = b'I';
1515
// test for:
1616
// - identify_system
1717
// - show
18+
// - slot create/drop
1819
// - physical replication
1920
#[tokio::test]
2021
async fn physical_replication() {
@@ -25,6 +26,10 @@ async fn physical_replication() {
2526
let show_port = rclient.show("port").await.unwrap();
2627
assert_eq!(show_port, "5433");
2728

29+
let slot = "test_physical_slot";
30+
let _ = rclient.drop_replication_slot(slot, false);
31+
let _ = rclient.create_physical_replication_slot(slot, false, false);
32+
2833
let physical_stream = rclient
2934
.start_physical_replication(None, identify_system.xlogpos(), None)
3035
.await
@@ -51,7 +56,7 @@ async fn physical_replication() {
5156
}
5257

5358
// test for:
54-
// X create/drop slot
59+
// - create/drop slot
5560
// X standby_status_update
5661
// - logical replication
5762
#[tokio::test]
@@ -63,15 +68,8 @@ async fn logical_replication() {
6368

6469
let slot = "test_logical_slot";
6570
let plugin = "pgoutput";
66-
let _nrows = sclient
67-
.execute("select pg_drop_replication_slot($1)", &[&slot.to_string()])
68-
.await
69-
.unwrap();
70-
71-
let _nrows = sclient
72-
.execute("select pg_create_logical_replication_slot($1, $2)", &[&slot.to_string(), &plugin.to_string()])
73-
.await
74-
.unwrap();
71+
let _ = rclient.drop_replication_slot(slot, false);
72+
let _ = rclient.create_logical_replication_slot(slot, false, plugin, None);
7573

7674
let xlog_start = identify_system.xlogpos();
7775
let options = &vec![("proto_version","1"), ("publication_names", "test_logical_pub")];

0 commit comments

Comments
 (0)