Skip to content

Commit 7087ada

Browse files
committed
standby status update
1 parent cdb33c1 commit 7087ada

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

postgres-protocol/src/message/frontend.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,19 +134,19 @@ pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
134134

135135
#[inline]
136136
pub fn standby_status_update(
137-
write_lsn: i64,
138-
flush_lsn: i64,
139-
apply_lsn: i64,
137+
write_lsn: u64,
138+
flush_lsn: u64,
139+
apply_lsn: u64,
140140
timestamp: i64,
141141
reply: u8,
142142
buf: &mut BytesMut,
143143
) -> io::Result<()> {
144144
buf.put_u8(b'd');
145145
write_body(buf, |buf| {
146146
buf.put_u8(b'r');
147-
buf.put_i64(write_lsn + 1);
148-
buf.put_i64(flush_lsn + 1);
149-
buf.put_i64(apply_lsn + 1);
147+
buf.put_u64(write_lsn);
148+
buf.put_u64(flush_lsn);
149+
buf.put_u64(apply_lsn);
150150
buf.put_i64(timestamp);
151151
buf.put_u8(reply);
152152
Ok(())

tokio-postgres/src/replication_client.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@ use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
44
use crate::types::{Lsn, Type};
55
use crate::{simple_query, Client, Error};
6-
use bytes::Bytes;
6+
use bytes::{Bytes, BytesMut};
77
use fallible_iterator::FallibleIterator;
88
use futures::{ready, Stream};
99
use pin_project_lite::pin_project;
1010
use postgres_protocol::escape::{escape_identifier, escape_literal};
1111
use postgres_protocol::message::backend::{Message, ReplicationMessage};
12+
use postgres_protocol::message::frontend;
1213
use std::io;
1314
use std::marker::PhantomPinned;
1415
use std::pin::Pin;
@@ -213,6 +214,29 @@ impl ReplicationClient {
213214
_p: PhantomPinned,
214215
})
215216
}
217+
218+
/// Send update to server.
219+
pub async fn standby_status_update(
220+
&self,
221+
write_lsn: Lsn,
222+
flush_lsn: Lsn,
223+
apply_lsn: Lsn,
224+
ts: i64,
225+
reply: u8,
226+
) -> Result<(), Error> {
227+
let iclient = self.0.inner();
228+
let mut buf = BytesMut::new();
229+
let _ = frontend::standby_status_update(
230+
write_lsn.into(),
231+
flush_lsn.into(),
232+
apply_lsn.into(),
233+
ts as i64,
234+
reply,
235+
&mut buf,
236+
);
237+
let _ = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?;
238+
Ok(())
239+
}
216240
}
217241

218242
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {

0 commit comments

Comments
 (0)