Skip to content

Commit ba06fd6

Browse files
committed
add replication test
1 parent 7087ada commit ba06fd6

File tree

5 files changed

+85
-10
lines changed

5 files changed

+85
-10
lines changed

docker-compose.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@ services:
33
postgres:
44
image: "sfackler/rust-postgres-test:6"
55
ports:
6-
- 5433:5433
6+
- 5433:5433
7+
volumes:
8+
- "./docker/sql_setup.sh:/docker-entrypoint-initdb.d/sql_setup.sh"

docker/sql_setup.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ port = 5433
6464
ssl = on
6565
ssl_cert_file = 'server.crt'
6666
ssl_key_file = 'server.key'
67+
wal_level = logical
6768
EOCONF
6869

6970
cat > "$PGDATA/pg_hba.conf" <<-EOCONF
@@ -82,6 +83,7 @@ host all ssl_user ::0/0 reject
8283
8384
# IPv4 local connections:
8485
host all postgres 0.0.0.0/0 trust
86+
host replication postgres 0.0.0.0/0 trust
8587
# IPv6 local connections:
8688
host all postgres ::0/0 trust
8789
# Unix socket connections:

tokio-postgres/src/replication_client.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub struct IdentifySystem {
2121
systemid: String,
2222
timeline: u32,
2323
xlogpos: Lsn,
24-
dbname: String,
24+
dbname: Option<String>,
2525
}
2626

2727
impl IdentifySystem {
@@ -37,8 +37,8 @@ impl IdentifySystem {
3737
self.xlogpos
3838
}
3939

40-
pub fn dbname(&self) -> &str {
41-
&self.dbname
40+
pub fn dbname(&self) -> Option<&str> {
41+
self.dbname.as_deref()
4242
}
4343
}
4444

@@ -89,16 +89,20 @@ impl ReplicationClient {
8989
)));
9090
};
9191

92-
let str_values = ranges
92+
let values: Vec<Option<&str>> = ranges
9393
.iter()
94-
.map(|r| from_utf8(&datarow.buffer()[r.to_owned().unwrap()]).unwrap())
94+
.map(|range| {
95+
range
96+
.to_owned()
97+
.map(|r| from_utf8(&datarow.buffer()[r]).unwrap())
98+
})
9599
.collect::<Vec<_>>();
96100

97101
Ok(IdentifySystem {
98-
systemid: String::from(str_values[0]),
99-
timeline: str_values[1].parse::<u32>().unwrap(),
100-
xlogpos: Lsn::from(str_values[2]),
101-
dbname: String::from(str_values[3]),
102+
systemid: String::from(values[0].unwrap()),
103+
timeline: values[1].unwrap().parse::<u32>().unwrap(),
104+
xlogpos: Lsn::from(values[2].unwrap()),
105+
dbname: values[3].map(String::from),
102106
})
103107
}
104108

tokio-postgres/tests/test/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio_postgres::{
1818

1919
mod binary_copy;
2020
mod parse;
21+
mod replication;
2122
#[cfg(feature = "runtime")]
2223
mod runtime;
2324
mod types;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use postgres_protocol::message::backend::ReplicationMessage;
2+
use tokio::stream::StreamExt;
3+
use tokio_postgres::{connect, connect_replication, NoTls, ReplicationMode};
4+
5+
// test for:
6+
// - identify_system
7+
// - show
8+
// X timeline history
9+
// - physical replication
10+
#[tokio::test]
11+
async fn physical_replication() {
12+
let conninfo = "host=127.0.0.1 port=5433 user=postgres";
13+
14+
// form SQL connection
15+
let (sclient, sconnection) = connect(conninfo, NoTls).await.unwrap();
16+
tokio::spawn(async move {
17+
if let Err(e) = sconnection.await {
18+
eprintln!("connection error: {}", e);
19+
}
20+
});
21+
22+
// form replication connection
23+
let (rclient, rconnection) = connect_replication(conninfo, NoTls, ReplicationMode::Physical)
24+
.await
25+
.unwrap();
26+
tokio::spawn(async move {
27+
if let Err(e) = rconnection.await {
28+
eprintln!("connection error: {}", e);
29+
}
30+
});
31+
32+
let identify_system = rclient.identify_system().await.unwrap();
33+
assert_eq!(identify_system.dbname(), None);
34+
let show_port = rclient.show("port").await.unwrap();
35+
assert_eq!(show_port, "5433");
36+
37+
let physical_stream = rclient
38+
.start_physical_replication(None, identify_system.xlogpos(), None)
39+
.await
40+
.unwrap();
41+
42+
let _nrows = sclient
43+
.execute("drop table test_physical_replication", &[])
44+
.await
45+
.unwrap();
46+
let _nrows = sclient
47+
.execute("create table test_physical_replication(i int)", &[])
48+
.await
49+
.unwrap();
50+
let _nrows = sclient
51+
.execute("insert into test_physical_replication values(1)", &[])
52+
.await
53+
.unwrap();
54+
55+
tokio::pin!(physical_stream);
56+
57+
let mut got_xlogdata = false;
58+
while let Some(replication_message) = physical_stream.next().await {
59+
if let ReplicationMessage::XLogData(_) = replication_message.unwrap() {
60+
got_xlogdata = true;
61+
break;
62+
}
63+
}
64+
65+
assert!(got_xlogdata);
66+
}

0 commit comments

Comments
 (0)