Skip to content

Commit 8587850

Browse files
committed
cleanup_status_update and cleanup replication test
1 parent e016a04 commit 8587850

File tree

6 files changed

+100
-43
lines changed

6 files changed

+100
-43
lines changed

docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,6 @@ services:
66
- 5433:5433
77
environment:
88
- POSTGRES_PASSWORD=pass
9+
volumes:
10+
- "./docker/sql_setup.sh:/docker-entrypoint-initdb.d/sql_setup.sh"
911

docker/sql_setup.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ ssl_cert_file = 'server.crt'
6666
ssl_key_file = 'server.key'
6767
wal_level = logical
6868
log_statement = 'all'
69-
shared_preload_libraries = 'wal2json'
7069
EOCONF
7170

7271
cat > "$PGDATA/pg_hba.conf" <<-EOCONF

postgres-protocol/src/message/frontend.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,26 @@ pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
132132
})
133133
}
134134

135+
#[inline]
136+
pub fn standby_status_update(
137+
write_lsn: i64,
138+
flush_lsn: i64,
139+
apply_lsn: i64,
140+
timestamp: i64,
141+
buf: &mut BytesMut,
142+
) -> io::Result<()> {
143+
buf.put_u8(b'd');
144+
write_body(buf, |buf| {
145+
buf.put_u8(b'r');
146+
buf.put_i64(write_lsn + 1);
147+
buf.put_i64(flush_lsn + 1);
148+
buf.put_i64(apply_lsn + 1);
149+
buf.put_i64(timestamp);
150+
buf.put_u8(0);
151+
Ok(())
152+
})
153+
}
154+
135155
pub struct CopyData<T> {
136156
buf: T,
137157
len: i32,

tokio-postgres/src/client.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,16 +435,26 @@ impl Client {
435435
copy_out::copy_out(self.inner(), statement).await
436436
}
437437

438-
/// TODO!
438+
/// Executes a 'START_REPLICATION SLOT ...', returning a stream of raw replication events
439439
pub async fn start_replication(&self, query: &str) -> Result<ReplicationStream, Error> {
440440
replication::start_replication(self.inner(), query).await
441441
}
442442

443-
/// TODO!
443+
/// Stoppes the current replication by sending a copy_done message
444444
pub async fn stop_replication(&self) -> Result<(), Error> {
445445
replication::stop_replication(self.inner()).await
446446
}
447447

448+
/// Notifies PostgreSQL of the last processed WAL
449+
pub async fn standby_status_update(
450+
&self,
451+
write_lsn: i64,
452+
flush_lsn: i64,
453+
apply_lsn: i64,
454+
) -> Result<(), Error> {
455+
replication::standby_status_update(self.inner(), write_lsn, flush_lsn, apply_lsn).await
456+
}
457+
448458
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
449459
///
450460
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

tokio-postgres/src/replication.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use postgres_protocol::message::frontend;
1111
use std::marker::PhantomPinned;
1212
use std::pin::Pin;
1313
use std::task::{Context, Poll};
14-
14+
use std::time::{SystemTime, UNIX_EPOCH};
15+
const J2000_EPOCH_GAP: u128 = 946_684_800_000_000;
1516
pub async fn start_replication(
1617
client: &InnerClient,
1718
query: &str,
@@ -34,6 +35,24 @@ pub async fn stop_replication(client: &InnerClient) -> Result<(), Error> {
3435
Ok(())
3536
}
3637

38+
pub async fn standby_status_update(
39+
client: &InnerClient,
40+
write_lsn: i64,
41+
flush_lsn: i64,
42+
apply_lsn: i64,
43+
) -> Result<(), Error> {
44+
trace!("executing standby_status_update");
45+
let now = SystemTime::now()
46+
.duration_since(UNIX_EPOCH)
47+
.unwrap()
48+
.as_micros()
49+
- J2000_EPOCH_GAP;
50+
let mut buf = BytesMut::new();
51+
let _ = frontend::standby_status_update(write_lsn, flush_lsn, apply_lsn, now as i64, &mut buf);
52+
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?;
53+
Ok(())
54+
}
55+
3756
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
3857
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
3958
trace!("start in repication");

tokio-postgres/tests/test/main.rs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use futures::channel::mpsc;
55
use futures::{
66
future, join, pin_mut, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt,
77
};
8+
use std::convert::TryInto;
89
use std::fmt::Write;
910
use std::time::Duration;
1011
use tokio::net::TcpStream;
@@ -21,7 +22,6 @@ mod parse;
2122
#[cfg(feature = "runtime")]
2223
mod runtime;
2324
mod types;
24-
use std::sync::{Arc, Mutex};
2525
use std::thread;
2626
async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> {
2727
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
@@ -808,70 +808,77 @@ async fn replication_start() {
808808

809809
client
810810
.batch_execute(
811-
"CREATE TABLE IF NOT EXISTS foo (
811+
"CREATE TABLE IF NOT EXISTS replication (
812812
id SERIAL,
813813
name TEXT
814814
);
815-
DROP PUBLICATION IF EXISTS rust;
816-
CREATE PUBLICATION rust FOR ALL TABLES;
817815
",
818816
)
819817
.await
820818
.unwrap();
821819
let _ = r_client
822-
.simple_query("DROP_REPLICATION_SLOT rust_slot")
823-
.await;
824-
// let _ = r_client.simple_query("CREATE_REPLICATION_SLOT rust_slot LOGICAL pgoutput NOEXPORT_SNAPSHOT").await;
825-
let _ = r_client
826-
.simple_query("CREATE_REPLICATION_SLOT rust_slot LOGICAL wal2json NOEXPORT_SNAPSHOT")
820+
.simple_query(
821+
"CREATE_REPLICATION_SLOT rust_slot TEMPORARY LOGICAL test_decoding NOEXPORT_SNAPSHOT",
822+
)
827823
.await;
828824

829825
let stream = r_client
830-
//.start_replication("START_REPLICATION SLOT rust_slot LOGICAL 0/0 (proto_version '1', publication_names 'rust')")
831-
.start_replication("START_REPLICATION SLOT rust_slot LOGICAL 0/0 (\"pretty-print\" '1', \"format-version\" '2')")
826+
.start_replication(
827+
"START_REPLICATION SLOT rust_slot LOGICAL 0/0 (\"skip-empty-xacts\" '1')",
828+
)
832829
.await
833830
.unwrap();
834831

835-
let events = Arc::new(Mutex::new(vec![]));
836-
let total_events = Arc::new(Mutex::new(0));
837-
let total_events_1 = Arc::clone(&total_events);
838-
let events_1 = Arc::clone(&events);
839-
let t = tokio::spawn(async move {
840-
let events = stream.try_collect::<Vec<_>>().await.unwrap();
841-
let mut num = total_events_1.lock().unwrap();
842-
*num = events.len();
843-
let mut ev = events_1.lock().unwrap();
844-
*ev = events;
845-
});
846-
847832
client
848-
.query("INSERT INTO foo (name) VALUES ('ann')", &[])
833+
.query("INSERT INTO replication (name) VALUES ('ann')", &[])
849834
.await
850835
.unwrap();
851-
client
852-
.query("INSERT INTO foo (name) VALUES ('jim'), ('joe')", &[])
853-
.await
854-
.unwrap();
855-
client.query("DROP TABLE foo", &[]).await.unwrap();
856836

857837
thread::sleep(time::Duration::from_secs(1)); //give a chance to pg to send the events
858838
let _ = r_client.stop_replication().await;
859-
let _ = t.await;
860-
println!("events {:?}", *events.lock().unwrap());
861-
//assert_eq!(*total_events.lock().unwrap(), 2);
862-
for e in &*events.lock().unwrap() {
839+
let events = stream.try_collect::<Vec<_>>().await.unwrap();
840+
client.query("DROP TABLE replication", &[]).await.unwrap();
841+
842+
let mut total_events = 0;
843+
for e in &events {
863844
match e[0].into() {
864845
'k' => {
865-
// let message_type = "keepalive";
866-
// println!("type: ({}), {}", message_type, e.len());
846+
//keepalive message
847+
let current_wal_end =
848+
i64::from_be_bytes(e.slice(1..9).as_ref().try_into().unwrap());
849+
let timestamp = i64::from_be_bytes(e.slice(9..17).as_ref().try_into().unwrap());
850+
let reply: char = e[17].into();
851+
println!(
852+
"keepalive tiemstamp: {} current_wal_end: {:X}/{:X} reply: {}",
853+
timestamp,
854+
(current_wal_end >> 32) as i32,
855+
current_wal_end as i32,
856+
reply as i8
857+
);
867858
}
868859
'w' => {
869-
let message_type = "dataframe";
870-
let slice = e.slice(25..);
871-
let data = std::str::from_utf8(slice.as_ref()).unwrap();
872-
println!("type: ({}), {}, {}", message_type, e.len(), data);
860+
// WAL message
861+
let current_wal = i64::from_be_bytes(e.slice(1..9).as_ref().try_into().unwrap());
862+
let current_wal_end =
863+
i64::from_be_bytes(e.slice(9..17).as_ref().try_into().unwrap());
864+
let timestamp = i64::from_be_bytes(e.slice(17..25).as_ref().try_into().unwrap());
865+
let _data = e.slice(25..); //the format of these bytes depends on the logical decoder
866+
println!(
867+
"WAL timestamp: {} current_wal: {:X}/{:X} current_wal_end: {:X}/{:X} {:?}",
868+
timestamp,
869+
(current_wal >> 32) as i32,
870+
current_wal as i32,
871+
(current_wal_end >> 32) as i32,
872+
current_wal_end as i32,
873+
_data
874+
);
875+
total_events += 1;
876+
//while in replication state, one needs to send updates from time to time like this
877+
//let _ = r_pg_client.standby_status_update(current_wal, current_wal, current_wal).await.unwrap();
873878
}
874879
_ => {}
875880
};
876881
}
882+
// in this case we receive 3 events (BEGIN,INSERT,COMMIT)
883+
assert_eq!(total_events, 3);
877884
}

0 commit comments

Comments
 (0)