Skip to content

Commit e016a04

Browse files
committed
fmt code and use wal2json in tests
1 parent 2780b2e commit e016a04

File tree

7 files changed

+73
-47
lines changed

7 files changed

+73
-47
lines changed

docker/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
FROM postgres:12
22

33
COPY sql_setup.sh /docker-entrypoint-initdb.d/
4+
RUN apt-get update && apt-get install postgresql-12-wal2json

docker/sql_setup.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ ssl_cert_file = 'server.crt'
6666
ssl_key_file = 'server.key'
6767
wal_level = logical
6868
log_statement = 'all'
69+
shared_preload_libraries = 'wal2json'
6970
EOCONF
7071

7172
cat > "$PGDATA/pg_hba.conf" <<-EOCONF

tokio-postgres/src/client.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::codec::BackendMessages;
22
use crate::config::{Host, SslMode};
33
use crate::connection::{Request, RequestMessages};
44
use crate::copy_out::CopyOutStream;
5-
use crate::replication::ReplicationStream;
65
use crate::query::RowStream;
6+
use crate::replication::ReplicationStream;
77
use crate::simple_query::SimpleQueryStream;
88
#[cfg(feature = "runtime")]
99
use crate::tls::MakeTlsConnect;
@@ -12,8 +12,9 @@ use crate::types::{Oid, ToSql, Type};
1212
#[cfg(feature = "runtime")]
1313
use crate::Socket;
1414
use crate::{
15-
copy_in, copy_out, replication, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
16-
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
15+
copy_in, copy_out, prepare, query, replication, simple_query, slice_iter, CancelToken,
16+
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
17+
TransactionBuilder,
1718
};
1819
use bytes::{Buf, BytesMut};
1920
use fallible_iterator::FallibleIterator;
@@ -435,14 +436,12 @@ impl Client {
435436
}
436437

437438
/// TODO!
438-
pub async fn start_replication(&self, query: &str) -> Result<ReplicationStream, Error>
439-
{
439+
pub async fn start_replication(&self, query: &str) -> Result<ReplicationStream, Error> {
440440
replication::start_replication(self.inner(), query).await
441441
}
442442

443443
/// TODO!
444-
pub async fn stop_replication(&self) -> Result<(), Error>
445-
{
444+
pub async fn stop_replication(&self) -> Result<(), Error> {
446445
replication::stop_replication(self.inner()).await
447446
}
448447

tokio-postgres/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl Config {
185185
keepalives_idle: Duration::from_secs(2 * 60 * 60),
186186
target_session_attrs: TargetSessionAttrs::Any,
187187
channel_binding: ChannelBinding::Prefer,
188-
replication: None
188+
replication: None,
189189
}
190190
}
191191

tokio-postgres/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ pub use crate::config::Config;
121121
pub use crate::connection::Connection;
122122
pub use crate::copy_in::CopyInSink;
123123
pub use crate::copy_out::CopyOutStream;
124-
pub use crate::replication::ReplicationStream;
125124
use crate::error::DbError;
126125
pub use crate::error::Error;
127126
pub use crate::generic_client::GenericClient;
128127
pub use crate::portal::Portal;
129128
pub use crate::query::RowStream;
129+
pub use crate::replication::ReplicationStream;
130130
pub use crate::row::{Row, SimpleQueryRow};
131131
pub use crate::simple_query::SimpleQueryStream;
132132
#[cfg(feature = "runtime")]
@@ -158,13 +158,13 @@ mod connect_tls;
158158
mod connection;
159159
mod copy_in;
160160
mod copy_out;
161-
mod replication;
162161
pub mod error;
163162
mod generic_client;
164163
mod maybe_tls_stream;
165164
mod portal;
166165
mod prepare;
167166
mod query;
167+
mod replication;
168168
pub mod row;
169169
mod simple_query;
170170
#[cfg(feature = "runtime")]

tokio-postgres/src/replication.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,37 @@ use crate::connection::RequestMessages;
44
use crate::{simple_query, Error};
55
use bytes::{Bytes, BytesMut};
66
use futures::{ready, Stream};
7+
use log::trace;
78
use pin_project_lite::pin_project;
89
use postgres_protocol::message::backend::Message;
9-
//use std::marker::PhantomPinned;
10+
use postgres_protocol::message::frontend;
11+
use std::marker::PhantomPinned;
1012
use std::pin::Pin;
1113
use std::task::{Context, Poll};
12-
use log::{trace};
13-
use postgres_protocol::message::frontend;
1414

15-
pub async fn start_replication(client: &InnerClient, query: &str) -> Result<ReplicationStream, Error> {
15+
pub async fn start_replication(
16+
client: &InnerClient,
17+
query: &str,
18+
) -> Result<ReplicationStream, Error> {
1619
trace!("executing start replication query {}", query);
1720

1821
let buf = simple_query::encode(client, query)?;
1922
let responses = start(client, buf).await?;
2023
Ok(ReplicationStream {
2124
responses,
22-
// _p: PhantomPinned,
25+
_p: PhantomPinned,
2326
})
2427
}
2528

2629
pub async fn stop_replication(client: &InnerClient) -> Result<(), Error> {
2730
trace!("executing stop replication");
2831
let mut buf = BytesMut::new();
29-
frontend::copy_done(&mut buf);
30-
frontend::sync(&mut buf);
32+
frontend::copy_done(&mut buf);
3133
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?;
3234
Ok(())
3335
}
3436

3537
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
36-
3738
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
3839
trace!("start in repication");
3940

@@ -49,8 +50,8 @@ pin_project! {
4950
/// A stream of `START_REPLICATION` query data.
5051
pub struct ReplicationStream {
5152
responses: Responses,
52-
// #[pin]
53-
// _p: PhantomPinned,
53+
#[pin]
54+
_p: PhantomPinned,
5455
}
5556
}
5657

@@ -64,10 +65,6 @@ impl Stream for ReplicationStream {
6465
Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
6566
Message::CopyDone => Poll::Ready(None),
6667
_ => Poll::Ready(Some(Err(Error::unexpected_message()))),
67-
// m => {
68-
// debug!("msg: {:?}", m);
69-
// Poll::Ready(Some(Ok(Bytes::from("Hello world"))))
70-
// }
7168
}
7269
}
7370
}

tokio-postgres/tests/test/main.rs

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ mod parse;
2121
#[cfg(feature = "runtime")]
2222
mod runtime;
2323
mod types;
24-
use std::thread;
2524
use std::sync::{Arc, Mutex};
25+
use std::thread;
2626
async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> {
2727
let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
2828
let config = s.parse::<Config>().unwrap();
@@ -806,44 +806,72 @@ async fn replication_start() {
806806
let client = connect("user=postgres").await;
807807
let r_client = connect("user=postgres replication=database").await;
808808

809-
client.batch_execute(
810-
"CREATE TABLE foo (
809+
client
810+
.batch_execute(
811+
"CREATE TABLE IF NOT EXISTS foo (
811812
id SERIAL,
812813
name TEXT
813814
);
814815
DROP PUBLICATION IF EXISTS rust;
815816
CREATE PUBLICATION rust FOR ALL TABLES;
816-
"
817-
).await.unwrap();
818-
let _ = r_client.simple_query("DROP_REPLICATION_SLOT rust_slot").await;
819-
let _ = r_client.simple_query("CREATE_REPLICATION_SLOT rust_slot LOGICAL pgoutput NOEXPORT_SNAPSHOT").await;
817+
",
818+
)
819+
.await
820+
.unwrap();
821+
let _ = r_client
822+
.simple_query("DROP_REPLICATION_SLOT rust_slot")
823+
.await;
824+
// let _ = r_client.simple_query("CREATE_REPLICATION_SLOT rust_slot LOGICAL pgoutput NOEXPORT_SNAPSHOT").await;
825+
let _ = r_client
826+
.simple_query("CREATE_REPLICATION_SLOT rust_slot LOGICAL wal2json NOEXPORT_SNAPSHOT")
827+
.await;
820828

821829
let stream = r_client
822-
.start_replication("START_REPLICATION SLOT rust_slot LOGICAL 0/0 (proto_version '1', publication_names 'rust')")
830+
//.start_replication("START_REPLICATION SLOT rust_slot LOGICAL 0/0 (proto_version '1', publication_names 'rust')")
831+
.start_replication("START_REPLICATION SLOT rust_slot LOGICAL 0/0 (\"pretty-print\" '1', \"format-version\" '2')")
823832
.await
824833
.unwrap();
825-
834+
835+
let events = Arc::new(Mutex::new(vec![]));
826836
let total_events = Arc::new(Mutex::new(0));
827837
let total_events_1 = Arc::clone(&total_events);
838+
let events_1 = Arc::clone(&events);
828839
let t = tokio::spawn(async move {
829-
let events = stream
830-
.try_collect::<Vec<_>>()
831-
.await
832-
.unwrap();
840+
let events = stream.try_collect::<Vec<_>>().await.unwrap();
833841
let mut num = total_events_1.lock().unwrap();
834842
*num = events.len();
843+
let mut ev = events_1.lock().unwrap();
844+
*ev = events;
835845
});
836-
837-
client.batch_execute(
838-
"
839-
INSERT INTO foo (name) VALUES ('jim'), ('joe');
840-
INSERT INTO foo (name) VALUES ('ann');
841-
DROP TABLE foo;
842-
"
843-
).await.unwrap();
846+
847+
client
848+
.query("INSERT INTO foo (name) VALUES ('ann')", &[])
849+
.await
850+
.unwrap();
851+
client
852+
.query("INSERT INTO foo (name) VALUES ('jim'), ('joe')", &[])
853+
.await
854+
.unwrap();
855+
client.query("DROP TABLE foo", &[]).await.unwrap();
856+
844857
thread::sleep(time::Duration::from_secs(1)); //give a chance to pg to send the events
845858
let _ = r_client.stop_replication().await;
846859
let _ = t.await;
847-
assert_eq!(*total_events.lock().unwrap(), 7);
860+
println!("events {:?}", *events.lock().unwrap());
861+
//assert_eq!(*total_events.lock().unwrap(), 2);
862+
for e in &*events.lock().unwrap() {
863+
match e[0].into() {
864+
'k' => {
865+
// let message_type = "keepalive";
866+
// println!("type: ({}), {}", message_type, e.len());
867+
}
868+
'w' => {
869+
let message_type = "dataframe";
870+
let slice = e.slice(25..);
871+
let data = std::str::from_utf8(slice.as_ref()).unwrap();
872+
println!("type: ({}), {}, {}", message_type, e.len(), data);
873+
}
874+
_ => {}
875+
};
876+
}
848877
}
849-

0 commit comments

Comments
 (0)