-
-
Notifications
You must be signed in to change notification settings - Fork 486
implement support for streaming replication WIP (for #116) #652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ae8b799
2780b2e
e016a04
24e540b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
FROM postgres:12 | ||
|
||
COPY sql_setup.sh /docker-entrypoint-initdb.d/ | ||
RUN apt-get update && apt-get install postgresql-12-wal2json |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ use crate::config::{Host, SslMode}; | |
use crate::connection::{Request, RequestMessages}; | ||
use crate::copy_out::CopyOutStream; | ||
use crate::query::RowStream; | ||
use crate::replication::ReplicationStream; | ||
use crate::simple_query::SimpleQueryStream; | ||
#[cfg(feature = "runtime")] | ||
use crate::tls::MakeTlsConnect; | ||
|
@@ -11,8 +12,9 @@ use crate::types::{Oid, ToSql, Type}; | |
#[cfg(feature = "runtime")] | ||
use crate::Socket; | ||
use crate::{ | ||
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error, | ||
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder, | ||
copy_in, copy_out, prepare, query, replication, simple_query, slice_iter, CancelToken, | ||
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction, | ||
TransactionBuilder, | ||
}; | ||
use bytes::{Buf, BytesMut}; | ||
use fallible_iterator::FallibleIterator; | ||
|
@@ -433,6 +435,26 @@ impl Client { | |
copy_out::copy_out(self.inner(), statement).await | ||
} | ||
|
||
/// Executes a 'START_REPLICATION SLOT ...', returning a stream of raw replication events | ||
pub async fn start_replication(&self, query: &str) -> Result<ReplicationStream, Error> { | ||
replication::start_replication(self.inner(), query).await | ||
} | ||
|
||
/// Stoppes the current replication by sending a copy_done message | ||
pub async fn stop_replication(&self) -> Result<(), Error> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be handled on the stream type like it is for copy_in. |
||
replication::stop_replication(self.inner()).await | ||
} | ||
|
||
/// Notifies PostgreSQL of the last processed WAL | ||
pub async fn standby_status_update( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is sent as part of the CopyBoth stream, not at the top level: https://www.postgresql.org/docs/13/protocol-replication.html. |
||
&self, | ||
write_lsn: i64, | ||
flush_lsn: i64, | ||
apply_lsn: i64, | ||
) -> Result<(), Error> { | ||
replication::standby_status_update(self.inner(), write_lsn, flush_lsn, apply_lsn).await | ||
} | ||
|
||
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows. | ||
/// | ||
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,6 +159,7 @@ pub struct Config { | |
pub(crate) keepalives_idle: Duration, | ||
pub(crate) target_session_attrs: TargetSessionAttrs, | ||
pub(crate) channel_binding: ChannelBinding, | ||
pub(crate) replication: Option<String>, | ||
} | ||
|
||
impl Default for Config { | ||
|
@@ -184,6 +185,7 @@ impl Config { | |
keepalives_idle: Duration::from_secs(2 * 60 * 60), | ||
target_session_attrs: TargetSessionAttrs::Any, | ||
channel_binding: ChannelBinding::Prefer, | ||
replication: None, | ||
} | ||
} | ||
|
||
|
@@ -387,6 +389,17 @@ impl Config { | |
self.channel_binding | ||
} | ||
|
||
/// TODO! | ||
pub fn replication(&mut self, replication: &str) -> &mut Config { | ||
self.replication = Some(replication.to_string()); | ||
self | ||
} | ||
|
||
/// TODO! | ||
pub fn get_replication(&self) -> Option<&str> { | ||
self.replication.as_deref() | ||
} | ||
|
||
fn param(&mut self, key: &str, value: &str) -> Result<(), Error> { | ||
match key { | ||
"user" => { | ||
|
@@ -476,6 +489,9 @@ impl Config { | |
}; | ||
self.channel_binding(channel_binding); | ||
} | ||
"replication" => { | ||
self.replication(&value); | ||
} | ||
key => { | ||
return Err(Error::config_parse(Box::new(UnknownOption( | ||
key.to_string(), | ||
|
@@ -548,6 +564,7 @@ impl fmt::Debug for Config { | |
.field("keepalives_idle", &self.keepalives_idle) | ||
.field("target_session_attrs", &self.target_session_attrs) | ||
.field("channel_binding", &self.channel_binding) | ||
.field("replication", &self.replication) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of making this an ad-hoc parameter, should we make this a different connection type? That would add better compile-time checking that you decide what you are going to use the connection for (normal, physical replication, or logical replication). |
||
.finish() | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
use crate::client::{InnerClient, Responses}; | ||
use crate::codec::FrontendMessage; | ||
use crate::connection::RequestMessages; | ||
use crate::{simple_query, Error}; | ||
use bytes::{Bytes, BytesMut}; | ||
use futures::{ready, Stream}; | ||
use log::trace; | ||
use pin_project_lite::pin_project; | ||
use postgres_protocol::message::backend::Message; | ||
use postgres_protocol::message::frontend; | ||
use std::marker::PhantomPinned; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
use std::time::{SystemTime, UNIX_EPOCH}; | ||
const J2000_EPOCH_GAP: u128 = 946_684_800_000_000; | ||
pub async fn start_replication( | ||
client: &InnerClient, | ||
query: &str, | ||
) -> Result<ReplicationStream, Error> { | ||
trace!("executing start replication query {}", query); | ||
|
||
let buf = simple_query::encode(client, query)?; | ||
let responses = start(client, buf).await?; | ||
Ok(ReplicationStream { | ||
responses, | ||
_p: PhantomPinned, | ||
}) | ||
} | ||
|
||
pub async fn stop_replication(client: &InnerClient) -> Result<(), Error> { | ||
trace!("executing stop replication"); | ||
let mut buf = BytesMut::new(); | ||
frontend::copy_done(&mut buf); | ||
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?; | ||
Ok(()) | ||
} | ||
|
||
pub async fn standby_status_update( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you consider also adding support for:
(I'm not the crate maintainer so don't take these as blockers. It might be fine to do those in later PRs after the basic support is in.) |
||
client: &InnerClient, | ||
write_lsn: i64, | ||
flush_lsn: i64, | ||
apply_lsn: i64, | ||
) -> Result<(), Error> { | ||
trace!("executing standby_status_update"); | ||
let now = SystemTime::now() | ||
.duration_since(UNIX_EPOCH) | ||
.unwrap() | ||
.as_micros() | ||
- J2000_EPOCH_GAP; | ||
let mut buf = BytesMut::new(); | ||
let _ = frontend::standby_status_update(write_lsn, flush_lsn, apply_lsn, now as i64, &mut buf); | ||
let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?; | ||
Ok(()) | ||
} | ||
|
||
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> { | ||
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?; | ||
trace!("start in repication"); | ||
|
||
match responses.next().await? { | ||
Message::CopyBothResponse(_) => {} | ||
_ => return Err(Error::unexpected_message()), | ||
} | ||
|
||
Ok(responses) | ||
} | ||
|
||
pin_project! { | ||
/// A stream of `START_REPLICATION` query data. | ||
pub struct ReplicationStream { | ||
responses: Responses, | ||
#[pin] | ||
_p: PhantomPinned, | ||
} | ||
} | ||
|
||
impl Stream for ReplicationStream { | ||
type Item = Result<Bytes, Error>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd suggest an enum here that describes the messages coming back at least as far as the protocol defines them, e.g. At least for now, we don't need to parse the WAL data itself, but we can represent all of the information that the protocol defines. |
||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
let this = self.project(); | ||
|
||
match ready!(this.responses.poll_next(cx)?) { | ||
Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))), | ||
Message::CopyDone => Poll::Ready(None), | ||
_ => Poll::Ready(Some(Err(Error::unexpected_message()))), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have separate
LogicalReplicationStream
andPhysicalReplicationStream
?