Skip to content

implement support for streaming replication WIP (for #116) #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

ruslantalpa
Copy link
Contributor

@ruslantalpa ruslantalpa commented Sep 5, 2020

so this is how far I've gotten and i wanted to check if the direction is right (before cleaning the code, writing some docs and a better test).

The questions I have are (since i don't know what i am really doing here :))):

  • is this the way you think it should be implemented from the interface point of view?
  • i know you said to implement a Sync and i know the protocol is in "copy_both" state but why is it needed, what would one send in the sync back to PG, besides stop replication?
  • right now the output is raw messages, is this ok (leave the parsing to the user since i think the format of the body is dependent on the plugin) or should there be additional parsing of the body?
  • in the test you'll see that i do 3 inserts but receive 7 events back from replication, are there some messages that should be skipped and not returned?
  • when i do a stop_replication call, pg also logs an error unexpected standby message type "S", after receiving CopyDone any idea where that comes from?

@ruslantalpa
Copy link
Contributor Author

does the PR have any hope of getting to a point of being good and merged or should I abandon it :)?

@ruslantalpa ruslantalpa force-pushed the streaming_replication branch from 8587850 to 24e540b Compare October 8, 2020 08:44
@vitaly-burovoy
Copy link

Hello, @ruslantalpa!

Thank you for the PR, it is much better than just "do it for us, please"!

Some thoughts to it…

I. The replication parameter in Config should not be a string since allowed values are only boolean (all its string variants: on, true, 1) for physical and "database" for logical replication:
https://www.postgresql.org/docs/12/protocol-replication.html
I.e. it is a triple-state parameter: none, physical and logical.

II. The LSN (Log Sequence Number) is u64 rather than i64 (see fn standby_status_update(…)) because it is "a byte position" which can not be negative.
https://www.postgresql.org/docs/current/datatype-pg-lsn.html

III. From my understanding[1] there is no "stop replication". The "START_REPLICATION" change the connection mode until disconnection.

IV.

is this the way you think it should be implemented from the interface point of view?

It is up to @sfackler, but I would like to see API which do not allow to make a mistake: it could be better if start_replication consumes an InnerClient and returns a ReplicationStream and a proxy struct which only allows to send "standby_status_update" and close the connection because nothing (except Standby status update/Hot Standby feedback message) should be sent: my experiments show the connection just hangs if I send something even with a simple_query command.

V. May the start_replication create its own "START_REPLICATION SLOT xxx YYYY ..." string where YYYY can be "LOGICAL" or "PHYSICAL" because we can know it from the Config.replication param? But it is up to @sfackler (he mentioned the crate should be as simple as it can).

VI.

right now the output is raw messages, is this ok…?

I would return an object similar to Row (from the RowStream) like XLogRow with attributes from the received message to be used in replies via the standby_status_update.

VII.

in the test you'll see that i do 3 inserts but receive 7 events back from replication

You receive data from a plugin. It can send empty messages. Each time DB detects any successful commit (in WAL) it calls a callback with changed rows (one call per row) and then "commit" callback which may not detect there was no rows for it.

Supporting logical replication is an amazing feature for me, I can publish changes on top of your commits (it is better to squash them in a single one and rebase to the current master) but I do not know how to add my commit to this PR.


[1]

when i do a stop_replication call, pg also logs an error unexpected standby message type "S", after receiving CopyDone any idea where that comes from?

It seems the protocol allows to stop copying and do something else but I do not think it is a real-world case.
Also the answer to you question is in the PG code ('X' is Terminate command):
https://git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/replication/walsender.c;h=6d959f9ecf8dc056bb4da5a318825b3a755ecc86;hb=29be9983a64c011eac0b9ee29895cce71e15ea77#l1734

/*
 * If we already received a CopyDone from the frontend, the frontend
 * should not send us anything until we've closed our end of the COPY.
 * XXX: In theory, the frontend could already send the next command
 * before receiving the CopyDone, but libpq doesn't currently allow
 * that.
 */
if (streamingDoneReceiving && firstchar != 'X')
    ereport(FATAL,
            (errcode(ERRCODE_PROTOCOL_VIOLATION),
             errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
                    firstchar)));

@ruslantalpa
Copy link
Contributor Author

i edited my original message in this PR to reflect things i "fixed/figured out".
Since then, also, i am using this code for my needs and it seems to work well

}

/// Stoppes the current replication by sending a copy_done message
pub async fn stop_replication(&self) -> Result<(), Error> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be handled on the stream type like it is for copy_in.

}

/// Notifies PostgreSQL of the last processed WAL
pub async fn standby_status_update(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sent as part of the CopyBoth stream, not at the top level: https://www.postgresql.org/docs/13/protocol-replication.html.

Copy link
Contributor

@jeff-davis jeff-davis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had just started hacking on this problem today, and then I found your PR. Thanks for working on this!

}

impl Stream for ReplicationStream {
type Item = Result<Bytes, Error>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest an enum here that describes the messages coming back at least as far as the protocol defines them, e.g. XLogData and PrimaryKeepAlive.

At least for now, we don't need to parse the WAL data itself, but we can represent all of the information that the protocol defines.

Ok(())
}

pub async fn standby_status_update(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider also adding support for:

  • identify_system
  • base_backup
  • hot standby feedback
  • creating/dropping slots
  • timeline history
  • show

(I'm not the crate maintainer so don't take these as blockers. It might be fine to do those in later PRs after the basic support is in.)

@@ -3,6 +3,7 @@ use crate::config::{Host, SslMode};
use crate::connection::{Request, RequestMessages};
use crate::copy_out::CopyOutStream;
use crate::query::RowStream;
use crate::replication::ReplicationStream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have separate LogicalReplicationStream and PhysicalReplicationStream?

@@ -548,6 +564,7 @@ impl fmt::Debug for Config {
.field("keepalives_idle", &self.keepalives_idle)
.field("target_session_attrs", &self.target_session_attrs)
.field("channel_binding", &self.channel_binding)
.field("replication", &self.replication)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of making this an ad-hoc parameter, should we make this a different connection type? That would add better compile-time checking that you decide what you are going to use the connection for (normal, physical replication, or logical replication).

@jeff-davis
Copy link
Contributor

Just to put some code behind my ideas, I created #696 . It's not meant to replace this PR, but you can have a look.

@ruslantalpa
Copy link
Contributor Author

Closing in favor of #778

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants