Skip to content

Commit 1a0a907

Browse files
committed
replication: use SystemTime for timestamps at API boundary
Signed-off-by: Petros Angelatos <[email protected]>
1 parent 39a35f1 commit 1a0a907

File tree

5 files changed

+45
-19
lines changed

5 files changed

+45
-19
lines changed

postgres-protocol/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ byteorder = "1.0"
1414
bytes = "1.0"
1515
fallible-iterator = "0.2"
1616
hmac = "0.10"
17+
lazy_static = "1.4"
1718
md-5 = "0.9"
1819
memchr = "2.0"
1920
rand = "0.8"

postgres-protocol/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
use byteorder::{BigEndian, ByteOrder};
1616
use bytes::{BufMut, BytesMut};
17+
use lazy_static::lazy_static;
1718
use std::io;
19+
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1820

1921
pub mod authentication;
2022
pub mod escape;
@@ -28,6 +30,11 @@ pub type Oid = u32;
2830
/// A Postgres Log Sequence Number (LSN).
2931
pub type Lsn = u64;
3032

33+
lazy_static! {
34+
/// Postgres epoch is 2000-01-01T00:00:00Z
35+
pub static ref PG_EPOCH: SystemTime = UNIX_EPOCH + Duration::from_secs(946_684_800);
36+
}
37+
3138
/// An enum indicating if a value is `NULL` or not.
3239
pub enum IsNull {
3340
/// The value is `NULL`.

postgres-protocol/src/message/backend.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ use std::cmp;
88
use std::io::{self, Read};
99
use std::ops::Range;
1010
use std::str;
11+
use std::time::{Duration, SystemTime};
1112

12-
use crate::{Lsn, Oid};
13+
use crate::{Lsn, Oid, PG_EPOCH};
1314

1415
// top-level message tags
1516
pub const PARSE_COMPLETE_TAG: u8 = b'1';
@@ -340,7 +341,12 @@ impl ReplicationMessage<Bytes> {
340341
XLOG_DATA_TAG => {
341342
let wal_start = buf.read_u64::<BigEndian>()?;
342343
let wal_end = buf.read_u64::<BigEndian>()?;
343-
let timestamp = buf.read_i64::<BigEndian>()?;
344+
let ts = buf.read_i64::<BigEndian>()?;
345+
let timestamp = if ts > 0 {
346+
*PG_EPOCH + Duration::from_micros(ts as u64)
347+
} else {
348+
*PG_EPOCH - Duration::from_micros(-ts as u64)
349+
};
344350
let data = buf.read_all();
345351
ReplicationMessage::XLogData(XLogDataBody {
346352
wal_start,
@@ -351,7 +357,12 @@ impl ReplicationMessage<Bytes> {
351357
}
352358
PRIMARY_KEEPALIVE_TAG => {
353359
let wal_end = buf.read_u64::<BigEndian>()?;
354-
let timestamp = buf.read_i64::<BigEndian>()?;
360+
let ts = buf.read_i64::<BigEndian>()?;
361+
let timestamp = if ts > 0 {
362+
*PG_EPOCH + Duration::from_micros(ts as u64)
363+
} else {
364+
*PG_EPOCH - Duration::from_micros(-ts as u64)
365+
};
355366
let reply = buf.read_u8()?;
356367
ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
357368
wal_end,
@@ -894,7 +905,7 @@ impl RowDescriptionBody {
894905
pub struct XLogDataBody<D> {
895906
wal_start: u64,
896907
wal_end: u64,
897-
timestamp: i64,
908+
timestamp: SystemTime,
898909
data: D,
899910
}
900911

@@ -910,7 +921,7 @@ impl<D> XLogDataBody<D> {
910921
}
911922

912923
#[inline]
913-
pub fn timestamp(&self) -> i64 {
924+
pub fn timestamp(&self) -> SystemTime {
914925
self.timestamp
915926
}
916927

@@ -941,7 +952,7 @@ impl<D> XLogDataBody<D> {
941952
#[derive(Debug)]
942953
pub struct PrimaryKeepAliveBody {
943954
wal_end: u64,
944-
timestamp: i64,
955+
timestamp: SystemTime,
945956
reply: u8,
946957
}
947958

@@ -952,7 +963,7 @@ impl PrimaryKeepAliveBody {
952963
}
953964

954965
#[inline]
955-
pub fn timestamp(&self) -> i64 {
966+
pub fn timestamp(&self) -> SystemTime {
956967
self.timestamp
957968
}
958969

tokio-postgres/src/replication.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use postgres_protocol::message::backend::ReplicationMessage;
1010
use postgres_types::PgLsn;
1111
use std::pin::Pin;
1212
use std::task::{Context, Poll};
13+
use std::time::SystemTime;
1314

1415
const STANDBY_STATUS_UPDATE_TAG: u8 = b'r';
1516
const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h';
@@ -38,17 +39,22 @@ impl ReplicationStream {
3839
write_lsn: PgLsn,
3940
flush_lsn: PgLsn,
4041
apply_lsn: PgLsn,
41-
ts: i64,
42+
timestamp: SystemTime,
4243
reply: u8,
4344
) -> Result<(), Error> {
4445
let mut this = self.project();
4546

47+
let timestamp = match timestamp.duration_since(*PG_EPOCH) {
48+
Ok(d) => d.as_micros() as i64,
49+
Err(e) => -(e.duration().as_micros() as i64),
50+
};
51+
4652
let mut buf = BytesMut::new();
4753
buf.put_u8(STANDBY_STATUS_UPDATE_TAG);
4854
buf.put_u64(write_lsn.into());
4955
buf.put_u64(flush_lsn.into());
5056
buf.put_u64(apply_lsn.into());
51-
buf.put_i64(ts);
57+
buf.put_i64(timestamp);
5258
buf.put_u8(reply);
5359

5460
this.stream.send(buf.freeze()).await
@@ -57,14 +63,19 @@ impl ReplicationStream {
5763
/// Send hot standby feedback message to server.
5864
pub async fn hot_standby_feedback(
5965
self: Pin<&mut Self>,
60-
timestamp: i64,
66+
timestamp: SystemTime,
6167
global_xmin: u32,
6268
global_xmin_epoch: u32,
6369
catalog_xmin: u32,
6470
catalog_xmin_epoch: u32,
6571
) -> Result<(), Error> {
6672
let mut this = self.project();
6773

74+
let timestamp = match timestamp.duration_since(*PG_EPOCH) {
75+
Ok(d) => d.as_micros() as i64,
76+
Err(e) => -(e.duration().as_micros() as i64),
77+
};
78+
6879
let mut buf = BytesMut::new();
6980
buf.put_u8(HOT_STANDBY_FEEDBACK_TAG);
7081
buf.put_i64(timestamp);
@@ -118,19 +129,19 @@ impl LogicalReplicationStream {
118129
write_lsn: PgLsn,
119130
flush_lsn: PgLsn,
120131
apply_lsn: PgLsn,
121-
ts: i64,
132+
timestamp: SystemTime,
122133
reply: u8,
123134
) -> Result<(), Error> {
124135
let this = self.project();
125136
this.stream
126-
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply)
137+
.standby_status_update(write_lsn, flush_lsn, apply_lsn, timestamp, reply)
127138
.await
128139
}
129140

130141
/// Send hot standby feedback message to server.
131142
pub async fn hot_standby_feedback(
132143
self: Pin<&mut Self>,
133-
timestamp: i64,
144+
timestamp: SystemTime,
134145
global_xmin: u32,
135146
global_xmin_epoch: u32,
136147
catalog_xmin: u32,

tokio-postgres/tests/test/replication.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use futures::StreamExt;
2-
use std::time::{Duration, UNIX_EPOCH};
2+
use std::time::SystemTime;
33

44
use postgres_protocol::message::backend::LogicalReplicationMessage::{Begin, Commit, Insert};
55
use postgres_protocol::message::backend::ReplicationMessage::*;
@@ -130,13 +130,9 @@ async fn test_replication() {
130130

131131
// Send a standby status update and require a keep alive response
132132
let lsn: PgLsn = lsn.parse().unwrap();
133-
134-
// Postgres epoch is 2000-01-01T00:00:00Z
135-
let pg_epoch = UNIX_EPOCH + Duration::from_secs(946_684_800);
136-
let ts = pg_epoch.elapsed().unwrap().as_micros() as i64;
137133
stream
138134
.as_mut()
139-
.standby_status_update(lsn, lsn, lsn, ts, 1)
135+
.standby_status_update(lsn, lsn, lsn, SystemTime::now(), 1)
140136
.await
141137
.unwrap();
142138
loop {

0 commit comments

Comments
 (0)