Skip to content

Commit b71a1e0

Browse files
committed
support timeline_history
1 parent aed4975 commit b71a1e0

File tree

2 files changed

+74
-2
lines changed

2 files changed

+74
-2
lines changed

tokio-postgres/src/replication_client.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,22 @@ impl IdentifySystem {
4141
}
4242
}
4343

44+
#[derive(Debug)]
45+
pub struct TimelineHistory {
46+
filename: Vec<u8>,
47+
content: Vec<u8>,
48+
}
49+
50+
impl TimelineHistory {
51+
pub fn filename(&self) -> &[u8] {
52+
&self.content
53+
}
54+
55+
pub fn content(&self) -> &[u8] {
56+
&self.content
57+
}
58+
}
59+
4460
/// Replication client connection.
4561
///
4662
/// A replication client is used to issue replication commands, begin
@@ -77,9 +93,13 @@ impl ReplicationClient {
7793

7894
assert_eq!(fields.len(), 4);
7995
assert_eq!(fields[0].type_oid(), Type::TEXT.oid());
96+
assert_eq!(fields[0].format(), 0);
8097
assert_eq!(fields[1].type_oid(), Type::INT4.oid());
98+
assert_eq!(fields[1].format(), 0);
8199
assert_eq!(fields[2].type_oid(), Type::TEXT.oid());
100+
assert_eq!(fields[2].format(), 0);
82101
assert_eq!(fields[3].type_oid(), Type::TEXT.oid());
102+
assert_eq!(fields[3].format(), 0);
83103
assert_eq!(ranges.len(), 4);
84104

85105
let values: Vec<Option<&str>> = ranges
@@ -92,7 +112,7 @@ impl ReplicationClient {
92112
.collect::<Vec<_>>();
93113

94114
Ok(IdentifySystem {
95-
systemid: String::from(values[0].unwrap()),
115+
systemid: values[0].unwrap().to_string(),
96116
timeline: values[1].unwrap().parse::<u32>().unwrap(),
97117
xlogpos: Lsn::from(values[2].unwrap()),
98118
dbname: values[3].map(String::from),
@@ -135,6 +155,55 @@ impl ReplicationClient {
135155
Ok(String::from(val))
136156
}
137157

158+
/// show the value of the given setting
159+
pub async fn timeline_history(&self, timeline_id: u32) -> Result<TimelineHistory, Error> {
160+
let iclient = self.0.inner();
161+
let command = format!("TIMELINE_HISTORY {}", timeline_id);
162+
let buf = simple_query::encode(iclient, &command)?;
163+
let mut responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
164+
165+
let rowdesc = match responses.next().await? {
166+
Message::RowDescription(m) => m,
167+
_ => return Err(Error::unexpected_message()),
168+
};
169+
let datarow = match responses.next().await? {
170+
Message::DataRow(m) => m,
171+
_ => return Err(Error::unexpected_message()),
172+
};
173+
match responses.next().await? {
174+
Message::CommandComplete(_) => (),
175+
_ => return Err(Error::unexpected_message()),
176+
};
177+
match responses.next().await? {
178+
Message::ReadyForQuery(_) => (),
179+
_ => return Err(Error::unexpected_message()),
180+
};
181+
182+
let fields = rowdesc.fields().collect::<Vec<_>>().map_err(Error::parse)?;
183+
let ranges = datarow.ranges().collect::<Vec<_>>().map_err(Error::parse)?;
184+
185+
assert_eq!(fields.len(), 2);
186+
187+
// Both fields should actually be labeled BYTEA with format
188+
// code 1 (for binary), because they return raw
189+
// bytes. Unfortunately, postgres returns a misleading
190+
// descriptor.
191+
//assert_eq!(fields[0].type_oid(), Type::BYTEA.oid());
192+
//assert_eq!(fields[0].format(), 1);
193+
//assert_eq!(fields[1].type_oid(), Type::BYTEA.oid());
194+
//assert_eq!(fields[1].format(), 1);
195+
196+
assert_eq!(ranges.len(), 2);
197+
198+
let filename = &datarow.buffer()[ranges[0].to_owned().unwrap()];
199+
let content = &datarow.buffer()[ranges[1].to_owned().unwrap()];
200+
201+
Ok(TimelineHistory {
202+
filename: filename.to_owned(),
203+
content: content.to_owned(),
204+
})
205+
}
206+
138207
/// Begin physical replication, consuming the replication client and producing a replication stream.
139208
///
140209
/// Replication begins starting with the given Log Sequence Number

tokio-postgres/tests/test/replication.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@ const LOGICAL_BEGIN_TAG: u8 = b'B';
88
const LOGICAL_COMMIT_TAG: u8 = b'C';
99
const LOGICAL_INSERT_TAG: u8 = b'I';
1010

11+
// Tests missing for timeline_history(). For a timeline history to be
12+
// available, it requires a point-in-time-recovery or a standby
13+
// promotion; neither of which is done in the current test setup.
14+
1115
// test for:
1216
// - identify_system
1317
// - show
14-
// X timeline history
1518
// - physical replication
1619
#[tokio::test]
1720
async fn physical_replication() {

0 commit comments

Comments
 (0)