Replication support (#116) #696
Open
jeff-davis wants to merge 24 commits into sfackler:master from jeff-davis:replication
+1,489 −70
24 commits (all by jeff-davis):
b2c5723  Image configuration updates.
015b29e  Change from pin-project-lite to pin-project.
2d66934  Add InnerClient::unpipelined_send() method.
d641fb6  Make simple_query::encode() pub(crate).
65edbb8  Protocol
9554473  Lsn type.
e753b8a  Connection string config for replication.
cf0f1ea  Replication Support.
b59de53  fixup warnings
3ccd6b1  fixup replication
4e00b85  fix error handling for stop replication
e33773c  fixup client.rs connection.rs
7e33d0a  test improvements
72fef37  Improve docs
daee4a5  Doc improvements.
6add101  more fixups
c029d72  documentation and timeline_history types.
fbdddf7  Comments and WIP start replication tuple.
6683d1f  handle stop_replication with optional tuple
bfff8bf  remove unnecessary messages
a2c2e19  add Debug trait for Lsn
59ea058  more doc improvements and cleanup
27c9cab  fixup unexpected_message
8e256f8  rustfmt run
@@ -0,0 +1,230 @@
use postgres_protocol::message::backend::ReplicationMessage;
use tokio::stream::StreamExt;
use tokio_postgres::replication_client::ReplicationClient;
use tokio_postgres::Client;
use tokio_postgres::{connect, connect_replication, NoTls, ReplicationMode};

const LOGICAL_BEGIN_TAG: u8 = b'B';
const LOGICAL_COMMIT_TAG: u8 = b'C';
const LOGICAL_INSERT_TAG: u8 = b'I';

// Tests missing for timeline_history(). For a timeline history to be
// available, it requires a point-in-time-recovery or a standby
// promotion; neither of which is done in the current test setup.

// test for:
// - identify_system
// - show
// - slot create/drop
// - physical replication
#[tokio::test]
async fn physical_replication() {
    let (sclient, mut rclient) = setup(ReplicationMode::Physical).await;

    simple_exec(&sclient, "drop table if exists test_physical_replication").await;
    simple_exec(&sclient, "create table test_physical_replication(i int)").await;

    let identify_system = rclient.identify_system().await.unwrap();
    assert_eq!(identify_system.dbname(), None);
    let show_port = rclient.show("port").await.unwrap();
    assert_eq!(show_port, "5433");

    let slot = "test_physical_slot";
    let _ = rclient.drop_replication_slot(slot, false).await.unwrap();
    let slotdesc = rclient
        .create_physical_replication_slot(slot, false, false)
        .await
        .unwrap();
    assert_eq!(slotdesc.slot_name(), slot);
    assert_eq!(slotdesc.snapshot_name(), None);
    assert_eq!(slotdesc.output_plugin(), None);

    let mut physical_stream = rclient
        .start_physical_replication(None, identify_system.xlogpos(), None)
        .await
        .unwrap();

    let _nrows = sclient
        .execute("insert into test_physical_replication values(1)", &[])
        .await
        .unwrap();

    let mut got_xlogdata = false;
    while let Some(replication_message) = physical_stream.next().await {
        if let ReplicationMessage::XLogData(_) = replication_message.unwrap() {
            got_xlogdata = true;
            break;
        }
    }

    assert!(got_xlogdata);

    let response = physical_stream.stop_replication().await.unwrap();
    assert!(response.is_none());

    // repeat simple command after stream is ended
    let show_port = rclient.show("port").await.unwrap();
    assert_eq!(show_port, "5433");

    simple_exec(&sclient, "drop table if exists test_physical_replication").await;
}

// test for:
// - create/drop slot
// X standby_status_update
// - logical replication
#[tokio::test]
async fn logical_replication() {
    let (sclient, mut rclient) = setup(ReplicationMode::Logical).await;

    simple_exec(&sclient, "drop table if exists test_logical_replication").await;
    simple_exec(&sclient, "drop publication if exists test_logical_pub").await;
    simple_exec(&sclient, "create table test_logical_replication(i int)").await;
    simple_exec(
        &sclient,
        "create publication test_logical_pub for table test_logical_replication",
    )
    .await;

    let identify_system = rclient.identify_system().await.unwrap();
    assert_eq!(identify_system.dbname().unwrap(), "postgres");

    let slot = "test_logical_slot";
    let plugin = "pgoutput";
    let _ = rclient.drop_replication_slot(slot, false).await.unwrap();
    let slotdesc = rclient
        .create_logical_replication_slot(slot, false, plugin, None)
        .await
        .unwrap();
    assert_eq!(slotdesc.slot_name(), slot);
    assert!(slotdesc.snapshot_name().is_some());
    assert_eq!(slotdesc.output_plugin(), Some(plugin));

    let xlog_start = identify_system.xlogpos();
    let options = &vec![
        ("proto_version", "1"),
        ("publication_names", "test_logical_pub"),
    ];

    let mut logical_stream = rclient
        .start_logical_replication(slot, xlog_start, options)
        .await
        .unwrap();

    let _nrows = sclient
        .execute("insert into test_logical_replication values(1)", &[])
        .await
        .unwrap();

    let mut got_begin = false;
    let mut got_insert = false;
    let mut got_commit = false;
    while let Some(replication_message) = logical_stream.next().await {
        if let ReplicationMessage::XLogData(msg) = replication_message.unwrap() {
            match msg.data()[0] {
                LOGICAL_BEGIN_TAG => {
                    assert!(!got_begin);
                    assert!(!got_insert);
                    assert!(!got_commit);
                    got_begin = true;
                }
                LOGICAL_INSERT_TAG => {
                    assert!(got_begin);
                    assert!(!got_insert);
                    assert!(!got_commit);
                    got_insert = true;
                }
                LOGICAL_COMMIT_TAG => {
                    assert!(got_begin);
                    assert!(got_insert);
                    assert!(!got_commit);
                    got_commit = true;
                    break;
                }
                _ => (),
            }
        }
    }

    assert!(got_begin);
    assert!(got_insert);
    assert!(got_commit);

    simple_exec(&sclient, "drop table if exists test_logical_replication").await;
    simple_exec(&sclient, "drop publication if exists test_logical_pub").await;
}

// test for base backup
#[tokio::test]
async fn base_backup() {}

// Test that a dropped replication stream properly returns to normal
// command processing in the ReplicationClient.
//
// This test will fail on PostgreSQL server versions earlier than the
// following patch versions: 13.2, 12.6, 11.11, 10.16, 9.6.21,
// 9.5.25. In earlier server versions, there's a bug that prevents
// pipelining requests after the client sends a CopyDone message, but
// before the server replies with a CommandComplete.
//
// Disabled until the patch is more widely available.
// #[tokio::test]
#[allow(dead_code)]
async fn drop_replication_stream() {
    let (sclient, mut rclient) = setup(ReplicationMode::Physical).await;

    simple_exec(&sclient, "drop table if exists test_drop_stream").await;
    simple_exec(&sclient, "create table test_drop_stream(i int)").await;

    let identify_system = rclient.identify_system().await.unwrap();
    assert_eq!(identify_system.dbname(), None);

    let mut physical_stream = rclient
        .start_physical_replication(None, identify_system.xlogpos(), None)
        .await
        .unwrap();

    let mut got_xlogdata = false;
    while let Some(replication_message) = physical_stream.next().await {
        if let ReplicationMessage::XLogData(_) = replication_message.unwrap() {
            got_xlogdata = true;
            break;
        }
    }

    assert!(got_xlogdata);

    drop(physical_stream);

    // test that simple command completes after replication stream is dropped
    let show_port = rclient.show("port").await.unwrap();
    assert_eq!(show_port, "5433");

    simple_exec(&sclient, "drop table if exists test_drop_stream").await;
}

async fn setup(mode: ReplicationMode) -> (Client, ReplicationClient) {
    let conninfo = "host=127.0.0.1 port=5433 user=postgres";

    // form SQL connection
    let (sclient, sconnection) = connect(conninfo, NoTls).await.unwrap();
    tokio::spawn(async move {
        if let Err(e) = sconnection.await {
            eprintln!("connection error: {}", e);
        }
    });

    // form replication connection
    let (rclient, rconnection) = connect_replication(conninfo, NoTls, mode).await.unwrap();
    tokio::spawn(async move {
        if let Err(e) = rconnection.await {
            eprintln!("connection error: {}", e);
        }
    });

    (sclient, rclient)
}

async fn simple_exec(sclient: &Client, command: &str) {
    let _nrows = sclient.execute(command, &[]).await.unwrap();
}
@jeff-davis I've been poring over this PR for the past couple of days trying to understand everything, and there's one thing I'm not quite connecting. Forgive me, as I'm somewhat new to Rust. Isn't logical_stream.next() going to wait indefinitely until there is some copy data to consume from the server?

The reason I ask is that, if we look at pg_recvlogical, we don't want to wait indefinitely on copy data, because there may be a need to send standby status data to postgres (e.g. if we've gone a long time without receiving copy data) so that postgres doesn't kill our replication connection.
Apologies if I'm missing something obvious here.
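To make the concern concrete, here is roughly how a caller could bound each poll so it regains control even when no copy data arrives. This is only a sketch against tokio's own timeout and stream APIs, not something this PR provides; it works for any stream, including the one returned by start_logical_replication:

use std::time::Duration;
use tokio::stream::{Stream, StreamExt};
use tokio::time::{timeout, Elapsed};

// Wait for the next item, but give up after `max_wait` so the caller can do
// other work (for example, send a standby status update) before polling again.
// Ok(None) means the stream ended; Err(_) means nothing arrived in time.
async fn next_with_deadline<S>(
    stream: &mut S,
    max_wait: Duration,
) -> Result<Option<S::Item>, Elapsed>
where
    S: Stream + Unpin,
{
    timeout(max_wait, stream.next()).await
}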
I am not very familiar with the rust-postgres codebase (nor Rust in general; I have just started using it), but what I observed while integrating the branch by @petrosagg is that postgres sends out "keepalive" messages every 20 seconds or so.

Interestingly, this doc page (under "Primary keepalive message (B)") shows that the last field in that message type tells the client that postgres has detected the client hasn't communicated in a while and is nearing the point where it will close the connection.

So what my code does at the moment is simply wait for a keepalive message with that "timeout imminent" flag set, and then immediately send a response to the server saying "I am still connected". Note that postgres is actually pretty lenient; it gives another 10 or 15 seconds before it actually closes the connection, which is enough time for this "reactionary" approach to be reliable (during the several hours I've tested it, anyway).

From what I can tell, this "notify postgres we're still online" feature is not present in this pull request's code, so I guess that's something @petrosagg expects to be handled by higher-level modules/crates. (It might be worth having at least a warning about this behavior in the base rust-postgres library, though: e.g. if it detects a disconnect and can see that it was due to the client not sending any responses during the timeout period, it could log a warning about it.)
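For reference, that flag is the final byte of the keepalive payload described on the linked page: the 'k' tag (1 byte), the current WAL end (8 bytes), the server timestamp (8 bytes), and a one-byte reply-requested flag. A standalone sketch of where it sits, purely for illustration and not code from this branch (the PR presumably surfaces this through its own message types):

// Decode the "reply requested" flag from a primary keepalive payload as it
// arrives inside a CopyData message. Total length: 1 + 8 + 8 + 1 = 18 bytes.
fn keepalive_reply_requested(copydata_body: &[u8]) -> Option<bool> {
    if copydata_body.len() == 18 && copydata_body[0] == b'k' {
        Some(copydata_body[17] != 0)
    } else {
        None
    }
}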
@Venryx Unfortunately it is not enough to wait for postgres to send you a keepalive with that bit set before replying; you actually need to send status updates proactively, in a timely manner. This is exactly what we do in Materialize.
Check out this code https://github.com/MaterializeInc/materialize/blob/main/src/dataflow/src/source/postgres.rs#L303-L329
and this thread https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com
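For anyone following along, the shape of that proactive approach is a select! between the replication stream and a ticker, so a status update goes out on schedule even if no XLogData arrives in the meantime. This is a generic sketch rather than this PR's API: the two closures stand in for "handle this message" and "send a standby status update", and in real code the latter would be an async call on whatever interface the final ReplicationClient/stream exposes.

use std::time::Duration;
use tokio::stream::{Stream, StreamExt};
use tokio::time;

// Drive any replication-like stream while firing `send_status` every `every`,
// regardless of whether a message arrived in that window.
async fn run_with_keepalives<S>(
    mut stream: S,
    every: Duration,
    mut on_message: impl FnMut(S::Item),
    mut send_status: impl FnMut(),
) where
    S: Stream + Unpin,
{
    let mut ticker = time::interval(every);
    loop {
        tokio::select! {
            item = stream.next() => {
                match item {
                    Some(message) => on_message(message),
                    None => break, // replication stream ended
                }
            }
            _ = ticker.tick() => send_status(),
        }
    }
}

The tick interval just needs to be comfortably shorter than the server's wal_sender_timeout (60 seconds by default), and the status update should report the LSN the consumer has actually flushed.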
Ah, interesting; thank you for mentioning it. (At my project's current scale, I think it will be a long time before there are transactions large enough for queuing to cause problems with the keepalive responder, but it's still a good thing to fix, to avoid causing issues for other developers [or myself] down the road.)