Skip to content

Replication support (#116) #696

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docker/sql_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ port = 5433
ssl = on
ssl_cert_file = 'server.crt'
ssl_key_file = 'server.key'
wal_level = logical
EOCONF

cat > "$PGDATA/pg_hba.conf" <<-EOCONF
Expand All @@ -82,6 +83,7 @@ host all ssl_user ::0/0 reject

# IPv4 local connections:
host all postgres 0.0.0.0/0 trust
host replication postgres 0.0.0.0/0 trust
# IPv6 local connections:
host all postgres ::0/0 trust
# Unix socket connections:
Expand Down
146 changes: 146 additions & 0 deletions postgres-protocol/src/message/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::str;

use crate::Oid;

// top-level message tags
pub const PARSE_COMPLETE_TAG: u8 = b'1';
pub const BIND_COMPLETE_TAG: u8 = b'2';
pub const CLOSE_COMPLETE_TAG: u8 = b'3';
Expand All @@ -22,6 +23,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
pub const ERROR_RESPONSE_TAG: u8 = b'E';
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
pub const NO_DATA_TAG: u8 = b'n';
Expand All @@ -33,6 +35,10 @@ pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
pub const ROW_DESCRIPTION_TAG: u8 = b'T';
pub const READY_FOR_QUERY_TAG: u8 = b'Z';

// replication message tags
pub const XLOG_DATA_TAG: u8 = b'w';
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';

#[derive(Debug, Copy, Clone)]
pub struct Header {
tag: u8,
Expand Down Expand Up @@ -93,6 +99,7 @@ pub enum Message {
CopyDone,
CopyInResponse(CopyInResponseBody),
CopyOutResponse(CopyOutResponseBody),
CopyBothResponse(CopyBothResponseBody),
DataRow(DataRowBody),
EmptyQueryResponse,
ErrorResponse(ErrorResponseBody),
Expand Down Expand Up @@ -190,6 +197,16 @@ impl Message {
storage,
})
}
COPY_BOTH_RESPONSE_TAG => {
let format = buf.read_u8()?;
let len = buf.read_u16::<BigEndian>()?;
let storage = buf.read_all();
Message::CopyBothResponse(CopyBothResponseBody {
format,
len,
storage,
})
}
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
BACKEND_KEY_DATA_TAG => {
let process_id = buf.read_i32::<BigEndian>()?;
Expand Down Expand Up @@ -278,6 +295,57 @@ impl Message {
}
}

/// An enum representing Postgres backend replication messages.
#[non_exhaustive]
pub enum ReplicationMessage {
XLogData(XLogDataBody),
PrimaryKeepAlive(PrimaryKeepAliveBody),
}

impl ReplicationMessage {
pub fn parse(bytes: &Bytes) -> io::Result<ReplicationMessage> {
let mut buf = Buffer {
bytes: bytes.clone(),
idx: 0,
};

let tag = buf.read_u8()?;

let replication_message = match tag {
XLOG_DATA_TAG => {
let wal_start = buf.read_u64::<BigEndian>()?;
let wal_end = buf.read_u64::<BigEndian>()?;
let timestamp = buf.read_i64::<BigEndian>()?;
let storage = buf.read_all();
ReplicationMessage::XLogData(XLogDataBody {
wal_start,
wal_end,
timestamp,
storage,
})
}
PRIMARY_KEEPALIVE_TAG => {
let wal_end = buf.read_u64::<BigEndian>()?;
let timestamp = buf.read_i64::<BigEndian>()?;
let reply = buf.read_u8()?;
ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
wal_end,
timestamp,
reply,
})
}
tag => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("unknown replication message tag `{}`", tag),
));
}
};

Ok(replication_message)
}
}

struct Buffer {
bytes: Bytes,
idx: usize,
Expand Down Expand Up @@ -524,6 +592,27 @@ impl CopyOutResponseBody {
}
}

pub struct CopyBothResponseBody {
storage: Bytes,
len: u16,
format: u8,
}

impl CopyBothResponseBody {
#[inline]
pub fn format(&self) -> u8 {
self.format
}

#[inline]
pub fn column_formats(&self) -> ColumnFormats<'_> {
ColumnFormats {
remaining: self.len,
buf: &self.storage,
}
}
}

pub struct DataRowBody {
storage: Bytes,
len: u16,
Expand Down Expand Up @@ -776,6 +865,63 @@ impl RowDescriptionBody {
}
}

pub struct XLogDataBody {
wal_start: u64,
wal_end: u64,
timestamp: i64,
storage: Bytes,
}

impl XLogDataBody {
#[inline]
pub fn wal_start(&self) -> u64 {
self.wal_start
}

#[inline]
pub fn wal_end(&self) -> u64 {
self.wal_end
}

#[inline]
pub fn timestamp(&self) -> i64 {
self.timestamp
}

#[inline]
pub fn data(&self) -> &[u8] {
&self.storage
}

#[inline]
pub fn into_bytes(self) -> Bytes {
self.storage
}
}

pub struct PrimaryKeepAliveBody {
wal_end: u64,
timestamp: i64,
reply: u8,
}

impl PrimaryKeepAliveBody {
#[inline]
pub fn wal_end(&self) -> u64 {
self.wal_end
}

#[inline]
pub fn timestamp(&self) -> i64 {
self.timestamp
}

#[inline]
pub fn reply(&self) -> u8 {
self.reply
}
}

pub struct Fields<'a> {
buf: &'a [u8],
remaining: u16,
Expand Down
42 changes: 42 additions & 0 deletions postgres-protocol/src/message/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,48 @@ pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
})
}

#[inline]
pub fn standby_status_update(
write_lsn: u64,
flush_lsn: u64,
apply_lsn: u64,
timestamp: i64,
reply: u8,
buf: &mut BytesMut,
) -> io::Result<()> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'r');
buf.put_u64(write_lsn);
buf.put_u64(flush_lsn);
buf.put_u64(apply_lsn);
buf.put_i64(timestamp);
buf.put_u8(reply);
Ok(())
})
}

#[inline]
pub fn hot_standby_feedback(
timestamp: i64,
global_xmin: u32,
global_xmin_epoch: u32,
catalog_xmin: u32,
catalog_xmin_epoch: u32,
buf: &mut BytesMut,
) -> io::Result<()> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'h');
buf.put_i64(timestamp);
buf.put_u32(global_xmin);
buf.put_u32(global_xmin_epoch);
buf.put_u32(catalog_xmin);
buf.put_u32(catalog_xmin_epoch);
Ok(())
})
}

pub struct CopyData<T> {
buf: T,
len: i32,
Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ futures = "0.3"
log = "0.4"
parking_lot = "0.11"
percent-encoding = "2.0"
pin-project-lite = "0.2"
pin-project = "1.0"
phf = "0.8"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-types = { version = "0.1.2", path = "../postgres-types" }
Expand Down
36 changes: 17 additions & 19 deletions tokio-postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{slice_iter, CopyInSink, CopyOutStream, Error};
use byteorder::{BigEndian, ByteOrder};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::{ready, SinkExt, Stream};
use pin_project_lite::pin_project;
use pin_project::pin_project;
use postgres_types::BorrowToSql;
use std::convert::TryFrom;
use std::io;
Expand All @@ -18,16 +18,15 @@ use std::task::{Context, Poll};
const MAGIC: &[u8] = b"PGCOPY\n\xff\r\n\0";
const HEADER_LEN: usize = MAGIC.len() + 4 + 4;

pin_project! {
/// A type which serializes rows into the PostgreSQL binary copy format.
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
pub struct BinaryCopyInWriter {
#[pin]
sink: CopyInSink<Bytes>,
types: Vec<Type>,
buf: BytesMut,
}
/// A type which serializes rows into the PostgreSQL binary copy format.
///
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
#[pin_project]
pub struct BinaryCopyInWriter {
#[pin]
sink: CopyInSink<Bytes>,
types: Vec<Type>,
buf: BytesMut,
}

impl BinaryCopyInWriter {
Expand Down Expand Up @@ -115,14 +114,13 @@ struct Header {
has_oids: bool,
}

pin_project! {
/// A stream of rows deserialized from the PostgreSQL binary copy format.
pub struct BinaryCopyOutStream {
#[pin]
stream: CopyOutStream,
types: Arc<Vec<Type>>,
header: Option<Header>,
}
/// A stream of rows deserialized from the PostgreSQL binary copy format.
#[pin_project]
pub struct BinaryCopyOutStream {
#[pin]
stream: CopyOutStream,
types: Arc<Vec<Type>>,
header: Option<Header>,
}

impl BinaryCopyOutStream {
Expand Down
20 changes: 19 additions & 1 deletion tokio-postgres/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ pub struct InnerClient {
impl InnerClient {
pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request { messages, sender };
let request = Request {
messages: messages,
sender: Some(sender),
};
self.sender
.unbounded_send(request)
.map_err(|_| Error::closed())?;
Expand All @@ -81,6 +84,21 @@ impl InnerClient {
})
}

// Send a message for the existing entry in the pipeline; don't
// create a new entry in the pipeline. This is needed for CopyBoth
// mode (i.e. streaming replication), where the client may send a
// new message that is part of the existing request.
pub fn unpipelined_send(&self, messages: RequestMessages) -> Result<(), Error> {
let request = Request {
messages: messages,
sender: None,
};
self.sender
.unbounded_send(request)
.map_err(|_| Error::closed())?;
Ok(())
}

pub fn typeinfo(&self) -> Option<Statement> {
self.state.lock().typeinfo.clone()
}
Expand Down
Loading