-
-
Notifications
You must be signed in to change notification settings - Fork 486
implement support for streaming replication WIP (for #116) #652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
does the PR have any hope of getting to a point of being good and merged or should I abandon it :)? |
8587850
to
24e540b
Compare
Hello, @ruslantalpa! Thank you for the PR, it is much better than just "do it for us, please"! Some thoughts to it… I. The II. The LSN (Log Sequence Number) is u64 rather than i64 (see III. From my understanding[1] there is no "stop replication". The "START_REPLICATION" change the connection mode until disconnection. IV.
It is up to @sfackler, but I would like to see API which do not allow to make a mistake: it could be better if V. May the VI.
I would return an object similar to VII.
You receive data from a plugin. It can send empty messages. Each time DB detects any successful commit (in WAL) it calls a callback with changed rows (one call per row) and then "commit" callback which may not detect there was no rows for it. — Supporting logical replication is an amazing feature for me, I can publish changes on top of your commits (it is better to squash them in a single one and rebase to the current master) but I do not know how to add my commit to this PR. —
It seems the protocol allows to stop copying and do something else but I do not think it is a real-world case. /*
* If we already received a CopyDone from the frontend, the frontend
* should not send us anything until we've closed our end of the COPY.
* XXX: In theory, the frontend could already send the next command
* before receiving the CopyDone, but libpq doesn't currently allow
* that.
*/
if (streamingDoneReceiving && firstchar != 'X')
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
firstchar))); |
i edited my original message in this PR to reflect things i "fixed/figured out". |
} | ||
|
||
/// Stoppes the current replication by sending a copy_done message | ||
pub async fn stop_replication(&self) -> Result<(), Error> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be handled on the stream type like it is for copy_in.
} | ||
|
||
/// Notifies PostgreSQL of the last processed WAL | ||
pub async fn standby_status_update( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is sent as part of the CopyBoth stream, not at the top level: https://www.postgresql.org/docs/13/protocol-replication.html.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had just started hacking on this problem today, and then I found your PR. Thanks for working on this!
} | ||
|
||
impl Stream for ReplicationStream { | ||
type Item = Result<Bytes, Error>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest an enum here that describes the messages coming back at least as far as the protocol defines them, e.g. XLogData
and PrimaryKeepAlive
.
At least for now, we don't need to parse the WAL data itself, but we can represent all of the information that the protocol defines.
Ok(()) | ||
} | ||
|
||
pub async fn standby_status_update( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider also adding support for:
- identify_system
- base_backup
- hot standby feedback
- creating/dropping slots
- timeline history
- show
(I'm not the crate maintainer so don't take these as blockers. It might be fine to do those in later PRs after the basic support is in.)
@@ -3,6 +3,7 @@ use crate::config::{Host, SslMode}; | |||
use crate::connection::{Request, RequestMessages}; | |||
use crate::copy_out::CopyOutStream; | |||
use crate::query::RowStream; | |||
use crate::replication::ReplicationStream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have separate LogicalReplicationStream
and PhysicalReplicationStream
?
@@ -548,6 +564,7 @@ impl fmt::Debug for Config { | |||
.field("keepalives_idle", &self.keepalives_idle) | |||
.field("target_session_attrs", &self.target_session_attrs) | |||
.field("channel_binding", &self.channel_binding) | |||
.field("replication", &self.replication) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of making this an ad-hoc parameter, should we make this a different connection type? That would add better compile-time checking that you decide what you are going to use the connection for (normal, physical replication, or logical replication).
Just to put some code behind my ideas, I created #696 . It's not meant to replace this PR, but you can have a look. |
Closing in favor of #778 |
so this is how far I've gotten and i wanted to check if the direction is right (before cleaning the code, writing some docs and a better test).
The questions I have are (since i don't know what i am really doing here :))):
in the test you'll see that i do 3 inserts but receive 7 events back from replication, are there some messages that should be skipped and not returned?when i do astop_replication
call, pg also logs an errorunexpected standby message type "S", after receiving CopyDone
any idea where that comes from?