Skip to content

Replication support (#116) #696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/sql_setup.sh
Original file line number Diff line number Diff line change
@@ -64,6 +64,7 @@ port = 5433
ssl = on
ssl_cert_file = 'server.crt'
ssl_key_file = 'server.key'
wal_level = logical
EOCONF

cat > "$PGDATA/pg_hba.conf" <<-EOCONF
@@ -82,6 +83,7 @@ host all ssl_user ::0/0 reject

# IPv4 local connections:
host all postgres 0.0.0.0/0 trust
host replication postgres 0.0.0.0/0 trust
# IPv6 local connections:
host all postgres ::0/0 trust
# Unix socket connections:
146 changes: 146 additions & 0 deletions postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use std::str;

use crate::Oid;

// top-level message tags
pub const PARSE_COMPLETE_TAG: u8 = b'1';
pub const BIND_COMPLETE_TAG: u8 = b'2';
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
@@ -22,6 +23,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
pub const ERROR_RESPONSE_TAG: u8 = b'E';
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
pub const NO_DATA_TAG: u8 = b'n';
@@ -33,6 +35,10 @@ pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
pub const READY_FOR_QUERY_TAG: u8 = b'Z';

// replication message tags
pub const XLOG_DATA_TAG: u8 = b'w';
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';

#[derive(Debug, Copy, Clone)]
pub struct Header {
tag: u8,
@@ -93,6 +99,7 @@ pub enum Message {
CopyDone,
CopyInResponse(CopyInResponseBody),
CopyOutResponse(CopyOutResponseBody),
CopyBothResponse(CopyBothResponseBody),
DataRow(DataRowBody),
EmptyQueryResponse,
ErrorResponse(ErrorResponseBody),
@@ -190,6 +197,16 @@ impl Message {
storage,
})
}
COPY_BOTH_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::CopyBothResponse(CopyBothResponseBody {
format,
len,
storage,
})
}
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
BACKEND_KEY_DATA_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
@@ -278,6 +295,57 @@ impl Message {
}
}

/// An enum representing Postgres backend replication messages.
///
/// These messages arrive inside the payload of `CopyData` frames while
/// a replication stream is active.
#[non_exhaustive]
pub enum ReplicationMessage {
    /// WAL data (tag `w`).
    XLogData(XLogDataBody),
    /// Primary keepalive message (tag `k`).
    PrimaryKeepAlive(PrimaryKeepAliveBody),
}

impl ReplicationMessage {
    /// Parses one replication message out of `bytes`.
    ///
    /// `bytes` is the payload of a `CopyData` message received during
    /// streaming replication. Returns an `InvalidInput` error if the
    /// leading tag byte is not a known replication message tag.
    pub fn parse(bytes: &Bytes) -> io::Result<ReplicationMessage> {
        // `Bytes::clone` is a cheap reference-count bump, not a data copy.
        let mut buf = Buffer {
            bytes: bytes.clone(),
            idx: 0,
        };

        // The first byte identifies the kind of replication message.
        let tag = buf.read_u8()?;

        let replication_message = match tag {
            XLOG_DATA_TAG => {
                // XLogData layout: start position (8 bytes), end position
                // (8 bytes), send timestamp (8 bytes), then the WAL payload.
                let wal_start = buf.read_u64::<BigEndian>()?;
                let wal_end = buf.read_u64::<BigEndian>()?;
                let timestamp = buf.read_i64::<BigEndian>()?;
                let storage = buf.read_all();
                ReplicationMessage::XLogData(XLogDataBody {
                    wal_start,
                    wal_end,
                    timestamp,
                    storage,
                })
            }
            PRIMARY_KEEPALIVE_TAG => {
                // Keepalive layout: WAL end position (8 bytes), send
                // timestamp (8 bytes), and a reply-requested flag byte.
                let wal_end = buf.read_u64::<BigEndian>()?;
                let timestamp = buf.read_i64::<BigEndian>()?;
                let reply = buf.read_u8()?;
                ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
                    wal_end,
                    timestamp,
                    reply,
                })
            }
            tag => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("unknown replication message tag `{}`", tag),
                ));
            }
        };

        Ok(replication_message)
    }
}

struct Buffer {
bytes: Bytes,
idx: usize,
@@ -524,6 +592,27 @@ impl CopyOutResponseBody {
}
}

/// The body of a `CopyBothResponse` message, which begins a
/// bidirectional copy (used to start streaming replication).
pub struct CopyBothResponseBody {
    // Raw buffer holding the per-column format codes.
    storage: Bytes,
    // Number of column format codes in `storage`.
    len: u16,
    // Overall copy format code for the stream.
    format: u8,
}

impl CopyBothResponseBody {
    /// Returns the overall format code of the copy stream.
    #[inline]
    pub fn format(&self) -> u8 {
        self.format
    }

    /// Returns an iterator over the per-column format codes.
    #[inline]
    pub fn column_formats(&self) -> ColumnFormats<'_> {
        ColumnFormats {
            remaining: self.len,
            buf: &self.storage,
        }
    }
}

pub struct DataRowBody {
storage: Bytes,
len: u16,
@@ -776,6 +865,63 @@ impl RowDescriptionBody {
}
}

/// The body of an `XLogData` replication message, carrying one chunk
/// of write-ahead log data.
pub struct XLogDataBody {
    wal_start: u64,
    wal_end: u64,
    timestamp: i64,
    storage: Bytes,
}

impl XLogDataBody {
    /// Returns the WAL position at which this chunk of data begins.
    #[inline]
    pub fn wal_start(&self) -> u64 {
        self.wal_start
    }

    /// Returns the WAL end position reported by the server.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// Returns the server's timestamp for this message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Returns the WAL data payload, borrowed.
    #[inline]
    pub fn data(&self) -> &[u8] {
        &self.storage
    }

    /// Consumes the body, returning the WAL data payload.
    #[inline]
    pub fn into_bytes(self) -> Bytes {
        self.storage
    }
}

/// The body of a primary keepalive replication message.
pub struct PrimaryKeepAliveBody {
    wal_end: u64,
    timestamp: i64,
    reply: u8,
}

impl PrimaryKeepAliveBody {
    /// Returns the server's current WAL end position.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// Returns the server's timestamp for this message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Returns the raw reply-requested flag byte. Per the streaming
    /// replication protocol, a nonzero value asks the client to send a
    /// status update promptly.
    #[inline]
    pub fn reply(&self) -> u8 {
        self.reply
    }
}

pub struct Fields<'a> {
buf: &'a [u8],
remaining: u16,
42 changes: 42 additions & 0 deletions postgres-protocol/src/message/frontend.rs
Original file line number Diff line number Diff line change
@@ -132,6 +132,48 @@ pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
})
}

/// Encodes a standby status update message into `buf`.
///
/// The message is a `CopyData` frame (tag `d`) wrapping an inner `r`
/// message that reports the standby's current write, flush, and apply
/// WAL positions, a client timestamp, and a flag which (when nonzero)
/// asks the server to reply promptly.
#[inline]
pub fn standby_status_update(
    write_lsn: u64,
    flush_lsn: u64,
    apply_lsn: u64,
    timestamp: i64,
    reply: u8,
    buf: &mut BytesMut,
) -> io::Result<()> {
    // Replication feedback travels inside CopyData frames.
    buf.put_u8(b'd');
    write_body(buf, |buf| {
        buf.put_u8(b'r');
        buf.put_u64(write_lsn);
        buf.put_u64(flush_lsn);
        buf.put_u64(apply_lsn);
        buf.put_i64(timestamp);
        buf.put_u8(reply);
        Ok(())
    })
}

/// Encodes a hot-standby feedback message into `buf`.
///
/// The message is a `CopyData` frame (tag `d`) wrapping an inner `h`
/// message that reports the standby's current xmin horizons and their
/// epochs, per the streaming replication protocol.
#[inline]
pub fn hot_standby_feedback(
    timestamp: i64,
    global_xmin: u32,
    global_xmin_epoch: u32,
    catalog_xmin: u32,
    catalog_xmin_epoch: u32,
    buf: &mut BytesMut,
) -> io::Result<()> {
    // Replication feedback travels inside CopyData frames.
    buf.put_u8(b'd');
    write_body(buf, |buf| {
        buf.put_u8(b'h');
        buf.put_i64(timestamp);
        buf.put_u32(global_xmin);
        buf.put_u32(global_xmin_epoch);
        buf.put_u32(catalog_xmin);
        buf.put_u32(catalog_xmin_epoch);
        Ok(())
    })
}

pub struct CopyData<T> {
buf: T,
len: i32,
2 changes: 1 addition & 1 deletion tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ futures = "0.3"
log = "0.4"
parking_lot = "0.11"
percent-encoding = "2.0"
pin-project-lite = "0.2"
pin-project = "1.0"
phf = "0.8"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-types = { version = "0.1.2", path = "../postgres-types" }
36 changes: 17 additions & 19 deletions tokio-postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use crate::{slice_iter, CopyInSink, CopyOutStream, Error};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{ready, SinkExt, Stream};
use pin_project_lite::pin_project;
use pin_project::pin_project;
use postgres_types::BorrowToSql;
use std::convert::TryFrom;
use std::io;
@@ -18,16 +18,15 @@ use std::task::{Context, Poll};
const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
const HEADER_LEN: usize = MAGIC.len() + 4 + 4;

pin_project! {
/// A type which serializes rows into the PostgreSQL binary copy format.
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
pub struct BinaryCopyInWriter {
#[pin]
sink: CopyInSink<Bytes>,
types: Vec<Type>,
buf: BytesMut,
}
/// A type which serializes rows into the PostgreSQL binary copy format.
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
#[pin_project]
pub struct BinaryCopyInWriter {
#[pin]
sink: CopyInSink<Bytes>,
types: Vec<Type>,
buf: BytesMut,
}

impl BinaryCopyInWriter {
@@ -115,14 +114,13 @@ struct Header {
has_oids: bool,
}

pin_project! {
/// A stream of rows deserialized from the PostgreSQL binary copy format.
pub struct BinaryCopyOutStream {
#[pin]
stream: CopyOutStream,
types: Arc<Vec<Type>>,
header: Option<Header>,
}
/// A stream of rows deserialized from the PostgreSQL binary copy format.
#[pin_project]
pub struct BinaryCopyOutStream {
#[pin]
stream: CopyOutStream,
types: Arc<Vec<Type>>,
header: Option<Header>,
}

impl BinaryCopyOutStream {
20 changes: 19 additions & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
@@ -70,7 +70,10 @@ pub struct InnerClient {
impl InnerClient {
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request { messages, sender };
let request = Request {
messages: messages,
sender: Some(sender),
};
self.sender
.unbounded_send(request)
.map_err(|_| Error::closed())?;
@@ -81,6 +84,21 @@ impl InnerClient {
})
}

// Send a message for the existing entry in the pipeline; don't
// create a new entry in the pipeline. This is needed for CopyBoth
// mode (i.e. streaming replication), where the client may send a
// new message that is part of the existing request.
pub fn unpipelined_send(&self, messages: RequestMessages) -> Result<(), Error> {
    // `sender: None` tells the connection task not to queue a new
    // response slot; any replies are routed to the response entry
    // created by the original pipelined request.
    let request = Request {
        messages,
        sender: None,
    };
    self.sender
        .unbounded_send(request)
        .map_err(|_| Error::closed())?;
    Ok(())
}

pub fn typeinfo(&self) -> Option<Statement> {
self.state.lock().typeinfo.clone()
}
30 changes: 30 additions & 0 deletions tokio-postgres/src/config.rs
Original file line number Diff line number Diff line change
@@ -56,6 +56,16 @@ pub enum ChannelBinding {
Require,
}

/// Replication mode configuration.
///
/// Controls the `replication` startup parameter sent to the server:
/// `Physical` maps to `replication=true` and `Logical` to
/// `replication=database`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReplicationMode {
    /// Physical replication.
    Physical,
    /// Logical replication.
    Logical,
}

/// A host specification.
#[derive(Debug, Clone, PartialEq)]
pub enum Host {
@@ -159,6 +169,7 @@ pub struct Config {
pub(crate) keepalives_idle: Duration,
pub(crate) target_session_attrs: TargetSessionAttrs,
pub(crate) channel_binding: ChannelBinding,
pub(crate) replication_mode: Option<ReplicationMode>,
}

impl Default for Config {
@@ -184,6 +195,7 @@ impl Config {
keepalives_idle: Duration::from_secs(2 * 60 * 60),
target_session_attrs: TargetSessionAttrs::Any,
channel_binding: ChannelBinding::Prefer,
replication_mode: None,
}
}

@@ -387,6 +399,17 @@ impl Config {
self.channel_binding
}

/// Set replication mode.
///
/// Defaults to `None` (a regular, non-replication connection).
/// Returns `&mut Config` so calls can be chained builder-style.
pub fn replication_mode(&mut self, replication_mode: ReplicationMode) -> &mut Config {
    self.replication_mode = Some(replication_mode);
    self
}

/// Get replication mode.
///
/// Returns `None` unless a mode was set via `replication_mode`.
pub fn get_replication_mode(&self) -> Option<ReplicationMode> {
    self.replication_mode
}

fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
match key {
"user" => {
@@ -534,6 +557,12 @@ impl fmt::Debug for Config {
}
}

let replication_mode_str = match self.replication_mode {
None => "false",
Some(ReplicationMode::Physical) => "true",
Some(ReplicationMode::Logical) => "database",
};

f.debug_struct("Config")
.field("user", &self.user)
.field("password", &self.password.as_ref().map(|_| Redaction {}))
@@ -548,6 +577,7 @@ impl fmt::Debug for Config {
.field("keepalives_idle", &self.keepalives_idle)
.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding)
.field("replication", &replication_mode_str.to_string())
.finish()
}
}
8 changes: 7 additions & 1 deletion tokio-postgres/src/connect_raw.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
use crate::config::{self, Config};
use crate::config::{self, Config, ReplicationMode};
use crate::connect_tls::connect_tls;
use crate::maybe_tls_stream::MaybeTlsStream;
use crate::tls::{TlsConnect, TlsStream};
@@ -124,6 +124,12 @@ where
if let Some(application_name) = &config.application_name {
params.push(("application_name", &**application_name));
}
if let Some(replication_mode) = &config.replication_mode {
match replication_mode {
ReplicationMode::Physical => params.push(("replication", "true")),
ReplicationMode::Logical => params.push(("replication", "database")),
}
}

let mut buf = BytesMut::new();
frontend::startup_message(params, &mut buf).map_err(Error::encode)?;
8 changes: 4 additions & 4 deletions tokio-postgres/src/connection.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ pub enum RequestMessages {

pub struct Request {
pub messages: RequestMessages,
pub sender: mpsc::Sender<BackendMessages>,
pub sender: Option<mpsc::Sender<BackendMessages>>,
}

pub struct Response {
@@ -183,9 +183,9 @@ where
match self.receiver.poll_next_unpin(cx) {
Poll::Ready(Some(request)) => {
trace!("polled new request");
self.responses.push_back(Response {
sender: request.sender,
});
if let Some(sender) = request.sender {
self.responses.push_back(Response { sender: sender });
}
Poll::Ready(Some(request.messages))
}
Poll::Ready(None) => Poll::Ready(None),
31 changes: 15 additions & 16 deletions tokio-postgres/src/copy_in.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use futures::channel::mpsc;
use futures::future;
use futures::{ready, Sink, SinkExt, Stream, StreamExt};
use log::debug;
use pin_project_lite::pin_project;
use pin_project::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use postgres_protocol::message::frontend::CopyData;
@@ -69,21 +69,20 @@ enum SinkState {
Reading,
}

pin_project! {
/// A sink for `COPY ... FROM STDIN` query data.
///
/// The copy *must* be explicitly completed via the `Sink::close` or `finish` methods. If it is
/// not, the copy will be aborted.
pub struct CopyInSink<T> {
#[pin]
sender: mpsc::Sender<CopyInMessage>,
responses: Responses,
buf: BytesMut,
state: SinkState,
#[pin]
_p: PhantomPinned,
_p2: PhantomData<T>,
}
/// A sink for `COPY ... FROM STDIN` query data.
///
/// The copy *must* be explicitly completed via the `Sink::close` or `finish` methods. If it is
/// not, the copy will be aborted.
#[pin_project]
pub struct CopyInSink<T> {
#[pin]
sender: mpsc::Sender<CopyInMessage>,
responses: Responses,
buf: BytesMut,
state: SinkState,
#[pin]
_p: PhantomPinned,
_p2: PhantomData<T>,
}

impl<T> CopyInSink<T>
15 changes: 7 additions & 8 deletions tokio-postgres/src/copy_out.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use crate::{query, slice_iter, Error, Statement};
use bytes::Bytes;
use futures::{ready, Stream};
use log::debug;
use pin_project_lite::pin_project;
use pin_project::pin_project;
use postgres_protocol::message::backend::Message;
use std::marker::PhantomPinned;
use std::pin::Pin;
@@ -38,13 +38,12 @@ async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
Ok(responses)
}

pin_project! {
/// A stream of `COPY ... TO STDOUT` query data.
pub struct CopyOutStream {
responses: Responses,
#[pin]
_p: PhantomPinned,
}
/// A stream of `COPY ... TO STDOUT` query data.
#[pin_project]
pub struct CopyOutStream {
responses: Responses,
#[pin]
_p: PhantomPinned,
}

impl Stream for CopyOutStream {
28 changes: 27 additions & 1 deletion tokio-postgres/src/lib.rs
Original file line number Diff line number Diff line change
@@ -117,7 +117,7 @@

pub use crate::cancel_token::CancelToken;
pub use crate::client::Client;
pub use crate::config::Config;
pub use crate::config::{Config, ReplicationMode};
pub use crate::connection::Connection;
pub use crate::copy_in::CopyInSink;
pub use crate::copy_out::CopyOutStream;
@@ -126,6 +126,7 @@ pub use crate::error::Error;
pub use crate::generic_client::GenericClient;
pub use crate::portal::Portal;
pub use crate::query::RowStream;
use crate::replication_client::ReplicationClient;
pub use crate::row::{Row, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
#[cfg(feature = "runtime")]
@@ -163,6 +164,7 @@ mod maybe_tls_stream;
mod portal;
mod prepare;
mod query;
pub mod replication_client;
pub mod row;
mod simple_query;
#[cfg(feature = "runtime")]
@@ -193,6 +195,30 @@ where
config.connect(tls).await
}

/// A convenience function which parses a connection string and connects to the database in replication mode. Normal queries are not permitted in replication mode.
///
/// See the documentation for [`Config`] for details on the connection string format.
///
/// Requires the `runtime` Cargo feature (enabled by default).
///
/// [`Config`]: config/struct.Config.html
#[cfg(feature = "runtime")]
pub async fn connect_replication<T>(
    config: &str,
    tls: T,
    mode: ReplicationMode,
) -> Result<(ReplicationClient, Connection<Socket, T::Stream>), Error>
where
    T: MakeTlsConnect<Socket>,
{
    // Parse the connection string, flag the requested replication mode,
    // then wrap the resulting client in the replication-only client type.
    let mut config = config.parse::<Config>()?;
    config.replication_mode(mode);
    let (client, connection) = config.connect(tls).await?;
    Ok((ReplicationClient::new(client), connection))
}

/// An asynchronous notification.
#[derive(Clone, Debug)]
pub struct Notification {
17 changes: 8 additions & 9 deletions tokio-postgres/src/query.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use crate::{Error, Portal, Row, Statement};
use bytes::{Bytes, BytesMut};
use futures::{ready, Stream};
use log::{debug, log_enabled, Level};
use pin_project_lite::pin_project;
use pin_project::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::fmt;
@@ -188,14 +188,13 @@ where
}
}

pin_project! {
/// A stream of table rows.
pub struct RowStream {
statement: Statement,
responses: Responses,
#[pin]
_p: PhantomPinned,
}
/// A stream of table rows.
#[pin_project]
pub struct RowStream {
statement: Statement,
responses: Responses,
#[pin]
_p: PhantomPinned,
}

impl Stream for RowStream {
882 changes: 882 additions & 0 deletions tokio-postgres/src/replication_client.rs

Large diffs are not rendered by default.

19 changes: 9 additions & 10 deletions tokio-postgres/src/simple_query.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures::{ready, Stream};
use log::debug;
use pin_project_lite::pin_project;
use pin_project::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::marker::PhantomPinned;
@@ -45,21 +45,20 @@ pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Erro
}
}

fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
// Encodes a simple `Query` frontend message for `query` into a frozen
// buffer, reusing the client's scratch buffer.
// NOTE(review): visibility widened to pub(crate) in this change,
// presumably so the new replication client can reuse the simple-query
// encoding — confirm against replication_client.rs.
pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
    client.with_buf(|buf| {
        frontend::query(query, buf).map_err(Error::encode)?;
        Ok(buf.split().freeze())
    })
}

pin_project! {
/// A stream of simple query results.
pub struct SimpleQueryStream {
responses: Responses,
columns: Option<Arc<[String]>>,
#[pin]
_p: PhantomPinned,
}
/// A stream of simple query results.
#[pin_project]
pub struct SimpleQueryStream {
responses: Responses,
columns: Option<Arc<[String]>>,
#[pin]
_p: PhantomPinned,
}

impl Stream for SimpleQueryStream {
42 changes: 42 additions & 0 deletions tokio-postgres/src/types.rs
Original file line number Diff line number Diff line change
@@ -4,3 +4,45 @@
#[doc(inline)]
pub use postgres_types::*;

use std::fmt;

/// Log Sequence Number for PostgreSQL Write-Ahead Log (transaction log).
///
/// Stored internally as a 64-bit integer; converts to and from the
/// textual `XXX/XXX` form (high and low 32 bits in hexadecimal,
/// separated by a slash).
#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
pub struct Lsn(u64);

impl From<&str> for Lsn {
    /// Parses an LSN from its textual `XXX/XXX` form.
    ///
    /// # Panics
    ///
    /// Panics if the string is not two `/`-separated hexadecimal numbers.
    fn from(lsn_str: &str) -> Self {
        let split: Vec<&str> = lsn_str.split('/').collect();
        // Panic with actionable messages instead of a bare assert/unwrap.
        assert_eq!(
            split.len(),
            2,
            "LSN must have the form `XXX/XXX`, got `{}`",
            lsn_str
        );
        let hi = u64::from_str_radix(split[0], 16)
            .expect("invalid hexadecimal high half of LSN");
        let lo = u64::from_str_radix(split[1], 16)
            .expect("invalid hexadecimal low half of LSN");
        Lsn((hi << 32) | lo)
    }
}

impl From<u64> for Lsn {
    /// Creates an LSN directly from its 64-bit integer representation.
    fn from(lsn_u64: u64) -> Self {
        Lsn(lsn_u64)
    }
}

impl From<Lsn> for u64 {
    /// Returns the 64-bit integer representation of the LSN.
    fn from(lsn: Lsn) -> u64 {
        lsn.0
    }
}

impl From<Lsn> for String {
    /// Formats the LSN in its textual `XXX/XXX` form.
    fn from(lsn: Lsn) -> String {
        format!("{:X}/{:X}", lsn.0 >> 32, lsn.0 & 0x00000000ffffffff)
    }
}

impl fmt::Debug for Lsn {
    // Debug shows the human-readable `XXX/XXX` form rather than the raw u64.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_tuple("Lsn").field(&String::from(*self)).finish()
    }
}
1 change: 1 addition & 0 deletions tokio-postgres/tests/test/main.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ use tokio_postgres::{

mod binary_copy;
mod parse;
mod replication;
#[cfg(feature = "runtime")]
mod runtime;
mod types;
230 changes: 230 additions & 0 deletions tokio-postgres/tests/test/replication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use postgres_protocol::message::backend::ReplicationMessage;
use tokio::stream::StreamExt;
use tokio_postgres::replication_client::ReplicationClient;
use tokio_postgres::Client;
use tokio_postgres::{connect, connect_replication, NoTls, ReplicationMode};

const LOGICAL_BEGIN_TAG: u8 = b'B';
const LOGICAL_COMMIT_TAG: u8 = b'C';
const LOGICAL_INSERT_TAG: u8 = b'I';

// Tests missing for timeline_history(). For a timeline history to be
// available, it requires a point-in-time-recovery or a standby
// promotion; neither of which is done in the current test setup.

// test for:
// - identify_system
// - show
// - slot create/drop
// - physical replication
#[tokio::test]
async fn physical_replication() {
    let (sclient, mut rclient) = setup(ReplicationMode::Physical).await;

    simple_exec(&sclient, "drop table if exists test_physical_replication").await;
    simple_exec(&sclient, "create table test_physical_replication(i int)").await;

    // A physical replication connection is not bound to a database.
    let identify_system = rclient.identify_system().await.unwrap();
    assert_eq!(identify_system.dbname(), None);
    let show_port = rclient.show("port").await.unwrap();
    assert_eq!(show_port, "5433");

    // Recreate the slot from scratch so the test is re-runnable.
    let slot = "test_physical_slot";
    rclient.drop_replication_slot(slot, false).await.unwrap();
    let slotdesc = rclient
        .create_physical_replication_slot(slot, false, false)
        .await
        .unwrap();
    assert_eq!(slotdesc.slot_name(), slot);
    assert_eq!(slotdesc.snapshot_name(), None);
    assert_eq!(slotdesc.output_plugin(), None);

    let mut physical_stream = rclient
        .start_physical_replication(None, identify_system.xlogpos(), None)
        .await
        .unwrap();

    let _nrows = sclient
        .execute("insert into test_physical_replication values(1)", &[])
        .await
        .unwrap();

    // The insert above must eventually show up as WAL data on the stream.
    let mut got_xlogdata = false;
    while let Some(replication_message) = physical_stream.next().await {
        if let ReplicationMessage::XLogData(_) = replication_message.unwrap() {
            got_xlogdata = true;
            break;
        }
    }

    assert!(got_xlogdata);

    let response = physical_stream.stop_replication().await.unwrap();
    assert!(response.is_none());

    // repeat simple command after stream is ended
    let show_port = rclient.show("port").await.unwrap();
    assert_eq!(show_port, "5433");

    simple_exec(&sclient, "drop table if exists test_physical_replication").await;
}

// test for:
// - create/drop slot
// X standby_status_update
// - logical replication
#[tokio::test]
async fn logical_replication() {
let (sclient, mut rclient) = setup(ReplicationMode::Logical).await;

simple_exec(&sclient, "drop table if exists test_logical_replication").await;
simple_exec(&sclient, "drop publication if exists test_logical_pub").await;
simple_exec(&sclient, "create table test_logical_replication(i int)").await;
simple_exec(
&sclient,
"create publication test_logical_pub for table test_logical_replication",
)
.await;

let identify_system = rclient.identify_system().await.unwrap();
assert_eq!(identify_system.dbname().unwrap(), "postgres");

let slot = "test_logical_slot";
let plugin = "pgoutput";
let _ = rclient.drop_replication_slot(slot, false).await.unwrap();
let slotdesc = rclient
.create_logical_replication_slot(slot, false, plugin, None)
.await
.unwrap();
assert_eq!(slotdesc.slot_name(), slot);
assert!(slotdesc.snapshot_name().is_some());
assert_eq!(slotdesc.output_plugin(), Some(plugin));

let xlog_start = identify_system.xlogpos();
let options = &vec![
("proto_version", "1"),
("publication_names", "test_logical_pub"),
];

let mut logical_stream = rclient
.start_logical_replication(slot, xlog_start, options)
.await
.unwrap();

let _nrows = sclient
.execute("insert into test_logical_replication values(1)", &[])
.await
.unwrap();

let mut got_begin = false;
let mut got_insert = false;
let mut got_commit = false;
while let Some(replication_message) = logical_stream.next().await {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeff-davis I've been poring over this PR the past couple of days trying to understand everything and there's one thing I'm not quite connecting. Forgive me, as I'm somewhat new to Rust. Isn't logical_stream.next() going to wait indefinitely until there is some copy data to consume from the server?

The reason I ask is because, if we look at pg_recvlogical, we don't want to wait indefinitely on copy data because there may be a need to send standby data to postgres (e.g. if we've waited some long amount of time in between receiving copy data) so that postgres doesn't kill our replication connection.

Apologies if I'm missing something obvious here.

Copy link

@Venryx Venryx Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very familiar with the rust-postgres codebase (nor Rust in general -- have just started using it), but from what I observed while I was integrating the branch by @petrosagg, is that postgres sends out "keepalive" messages every 20 seconds or so.

Interestingly, on this doc page (under "Primary keepalive message (B)"), it shows that the last bit in that message-type's structure tells the client if postgres detects that the client hasn't communicated in a while, and is nearing the point where it will close the connection.

So what I have my code do at the moment, is just wait for a keepalive message with that "timeout imminent" flag, and then immediately send a response to the server saying "I am still connected". Note that postgres is actually pretty lenient; it gives another 10 or 15 seconds before it actually closes the connection, which is enough time for this "reactionary" approach to be reliable (during the several hours I've tested it anyway).

From what I can tell, this "notify postgres we're still online" feature is not present in this pull-request's code -- so I guess that's something @petrosagg is thinking can be handled by higher level modules/crates. (it might be worth having at least a warning about this behavior in the base rust-postgres library however, eg. if it detects a disconnect, and it can see that it was due to the client not sending any responses during the timeout period, then it could log a warning about it)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Venryx unfortunately it is not enough to wait for postgres to send you a keepalive with that bit set in order to send a keepalive; you actually need to proactively send them in a timely manner. This is exactly what we do in materialize.

Check out this code https://github.com/MaterializeInc/materialize/blob/main/src/dataflow/src/source/postgres.rs#L303-L329

and this thread https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Venryx unfortunately it is not enough to wait for postgres to send you a keepalive with that bit set in order to send a keepalive; you actually need to proactively send them in a timely manner. This is exactly what we do in materialize.

Ah, interesting; thank you for mentioning. (At my project's current scale, I think it will be a long time before there are transactions of a large enough size for queuing to cause problems with the keepalive responder; but it's still a good thing to fix, to not cause issues for other developers [or myself] down the road.)

if let ReplicationMessage::XLogData(msg) = replication_message.unwrap() {
match msg.data()[0] {
LOGICAL_BEGIN_TAG => {
assert!(!got_begin);
assert!(!got_insert);
assert!(!got_commit);
got_begin = true;
}
LOGICAL_INSERT_TAG => {
assert!(got_begin);
assert!(!got_insert);
assert!(!got_commit);
got_insert = true;
}
LOGICAL_COMMIT_TAG => {
assert!(got_begin);
assert!(got_insert);
assert!(!got_commit);
got_commit = true;
break;
}
_ => (),
}
}
}

assert!(got_begin);
assert!(got_insert);
assert!(got_commit);

simple_exec(&sclient, "drop table if exists test_logical_replication").await;
simple_exec(&sclient, "drop publication if exists test_logical_pub").await;
}

// test for base backup
// NOTE(review): intentionally empty — BASE_BACKUP is not yet exercised;
// this placeholder documents the missing coverage.
#[tokio::test]
async fn base_backup() {}

// Test that a dropped replication stream properly returns to normal
// command processing in the ReplicationClient.
//
// This test will fail on PostgreSQL server versions earlier than the
// following patch versions: 13.2, 12.6, 11.11, 10.16, 9.6.21,
// 9.5.25. In earlier server versions, there's a bug that prevents
// pipelining requests after the client sends a CopyDone message, but
// before the server replies with a CommandComplete.
//
// Disabled until the patch is more widely available.
// #[tokio::test]
#[allow(dead_code)]
async fn drop_replication_stream() {
    let (sclient, mut rclient) = setup(ReplicationMode::Physical).await;

    simple_exec(&sclient, "drop table if exists test_drop_stream").await;
    simple_exec(&sclient, "create table test_drop_stream(i int)").await;

    let identify_system = rclient.identify_system().await.unwrap();
    assert_eq!(identify_system.dbname(), None);

    let mut stream = rclient
        .start_physical_replication(None, identify_system.xlogpos(), None)
        .await
        .unwrap();

    // Wait until at least one XLogData message arrives, then abandon
    // the stream without stopping it cleanly.
    let mut saw_wal_data = false;
    while let Some(message) = stream.next().await {
        if matches!(message.unwrap(), ReplicationMessage::XLogData(_)) {
            saw_wal_data = true;
            break;
        }
    }
    assert!(saw_wal_data);

    drop(stream);

    // test that simple command completes after replication stream is dropped
    let show_port = rclient.show("port").await.unwrap();
    assert_eq!(show_port, "5433");

    simple_exec(&sclient, "drop table if exists test_drop_stream").await;
}

// Open one regular SQL connection and one replication connection to the
// test server, driving both connection tasks in the background.
async fn setup(mode: ReplicationMode) -> (Client, ReplicationClient) {
    let conninfo = "host=127.0.0.1 port=5433 user=postgres";

    // Regular connection, used for issuing SQL setup commands.
    let (sql_client, sql_connection) = connect(conninfo, NoTls).await.unwrap();
    tokio::spawn(async move {
        if let Err(e) = sql_connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // Replication connection under test.
    let (repl_client, repl_connection) = connect_replication(conninfo, NoTls, mode).await.unwrap();
    tokio::spawn(async move {
        if let Err(e) = repl_connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    (sql_client, repl_client)
}

// Run a single SQL command on the regular connection, panicking on error.
async fn simple_exec(sclient: &Client, command: &str) {
    sclient.execute(command, &[]).await.unwrap();
}