Skip to content

Commit 7c5b43f

Browse files
committed
add postgres replication integration test
Signed-off-by: Petros Angelatos <[email protected]>
1 parent f192b59 commit 7c5b43f

File tree

2 files changed

+150
-0
lines changed

2 files changed

+150
-0
lines changed

tokio-postgres/tests/test/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use tokio_postgres::{
1818

1919
mod binary_copy;
2020
mod parse;
21+
mod replication;
2122
#[cfg(feature = "runtime")]
2223
mod runtime;
2324
mod types;
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use futures::StreamExt;
2+
use std::time::{Duration, UNIX_EPOCH};
3+
4+
use postgres_protocol::message::backend::LogicalReplicationMessage::{Begin, Commit, Insert};
5+
use postgres_protocol::message::backend::ReplicationMessage::*;
6+
use postgres_protocol::message::backend::TupleData;
7+
use postgres_types::PgLsn;
8+
use tokio_postgres::replication::LogicalReplicationStream;
9+
use tokio_postgres::NoTls;
10+
use tokio_postgres::SimpleQueryMessage::Row;
11+
12+
#[tokio::test]
13+
async fn test_replication() {
14+
// form SQL connection
15+
let conninfo = "host=127.0.0.1 port=5433 user=postgres replication=database";
16+
let (client, connection) = tokio_postgres::connect(conninfo, NoTls).await.unwrap();
17+
tokio::spawn(async move {
18+
if let Err(e) = connection.await {
19+
eprintln!("connection error: {}", e);
20+
}
21+
});
22+
23+
client
24+
.simple_query("DROP TABLE IF EXISTS test_logical_replication")
25+
.await
26+
.unwrap();
27+
client
28+
.simple_query("CREATE TABLE test_logical_replication(i int)")
29+
.await
30+
.unwrap();
31+
let res = client
32+
.simple_query("SELECT 'test_logical_replication'::regclass::oid")
33+
.await
34+
.unwrap();
35+
let rel_id: u32 = if let Row(row) = &res[0] {
36+
row.get("oid").unwrap().parse().unwrap()
37+
} else {
38+
panic!("unexpeced query message");
39+
};
40+
41+
client
42+
.simple_query("DROP PUBLICATION IF EXISTS test_pub")
43+
.await
44+
.unwrap();
45+
client
46+
.simple_query("CREATE PUBLICATION test_pub FOR ALL TABLES")
47+
.await
48+
.unwrap();
49+
50+
let slot = "test_logical_slot";
51+
52+
let query = format!(
53+
r#"CREATE_REPLICATION_SLOT {:?} TEMPORARY LOGICAL "pgoutput""#,
54+
slot
55+
);
56+
let slot_query = client.simple_query(&query).await.unwrap();
57+
let lsn = if let Row(row) = &slot_query[0] {
58+
row.get("consistent_point").unwrap()
59+
} else {
60+
panic!("unexpeced query message");
61+
};
62+
63+
// issue a query that will appear in the slot's stream since it happened after its creation
64+
client
65+
.simple_query("INSERT INTO test_logical_replication VALUES (42)")
66+
.await
67+
.unwrap();
68+
69+
let options = r#"("proto_version" '1', "publication_names" 'test_pub')"#;
70+
let query = format!(
71+
r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#,
72+
slot, lsn, options
73+
);
74+
let copy_stream = client
75+
.copy_both_simple::<bytes::Bytes>(&query)
76+
.await
77+
.unwrap();
78+
79+
let stream = LogicalReplicationStream::new(copy_stream);
80+
tokio::pin!(stream);
81+
82+
// verify that we can observe the transaction in the replication stream
83+
let begin = loop {
84+
match stream.next().await {
85+
Some(Ok(XLogData(body))) => {
86+
if let Begin(begin) = body.into_data() {
87+
break begin;
88+
}
89+
}
90+
Some(Ok(_)) => (),
91+
Some(Err(_)) => panic!("unexpected replication stream error"),
92+
None => panic!("unexpected replication stream end"),
93+
}
94+
};
95+
96+
let insert = loop {
97+
match stream.next().await {
98+
Some(Ok(XLogData(body))) => {
99+
if let Insert(insert) = body.into_data() {
100+
break insert;
101+
}
102+
}
103+
Some(Ok(_)) => (),
104+
Some(Err(_)) => panic!("unexpected replication stream error"),
105+
None => panic!("unexpected replication stream end"),
106+
}
107+
};
108+
109+
let commit = loop {
110+
match stream.next().await {
111+
Some(Ok(XLogData(body))) => {
112+
if let Commit(commit) = body.into_data() {
113+
break commit;
114+
}
115+
}
116+
Some(Ok(_)) => (),
117+
Some(Err(_)) => panic!("unexpected replication stream error"),
118+
None => panic!("unexpected replication stream end"),
119+
}
120+
};
121+
122+
assert_eq!(begin.final_lsn(), commit.commit_lsn());
123+
assert_eq!(insert.rel_id(), rel_id);
124+
125+
let tuple_data = insert.tuple().tuple_data();
126+
assert_eq!(tuple_data.len(), 1);
127+
assert!(matches!(tuple_data[0], TupleData::Text(_)));
128+
if let TupleData::Text(data) = &tuple_data[0] {
129+
assert_eq!(data, &b"42"[..]);
130+
}
131+
132+
// Send a standby status update and require a keep alive response
133+
let lsn: PgLsn = lsn.parse().unwrap();
134+
let epoch = UNIX_EPOCH + Duration::from_secs(946_684_800);
135+
let ts = epoch.elapsed().unwrap().as_micros() as i64;
136+
stream
137+
.as_mut()
138+
.standby_status_update(lsn, lsn, lsn, ts, 1)
139+
.await
140+
.unwrap();
141+
loop {
142+
match stream.next().await {
143+
Some(Ok(PrimaryKeepAlive(_))) => break,
144+
Some(Ok(_)) => (),
145+
Some(Err(e)) => panic!("unexpected replication stream error: {}", e),
146+
None => panic!("unexpected replication stream end"),
147+
}
148+
}
149+
}

0 commit comments

Comments
 (0)