diff --git a/docker-compose.yml b/docker-compose.yml
index d44fbe866..4fd5b2744 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,4 +3,9 @@ services:
   postgres:
     image: "sfackler/rust-postgres-test:6"
     ports:
-    - 5433:5433
+      - 5433:5433
+    environment:
+      - POSTGRES_PASSWORD=pass
+    volumes:
+      - "./docker/sql_setup.sh:/docker-entrypoint-initdb.d/sql_setup.sh"
+
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 1dd7f3db6..9d45694dd 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -1,3 +1,4 @@
 FROM postgres:12
 
 COPY sql_setup.sh /docker-entrypoint-initdb.d/
+RUN apt-get update && apt-get install postgresql-12-wal2json
diff --git a/docker/sql_setup.sh b/docker/sql_setup.sh
index 422dcbda9..687ab92ea 100755
--- a/docker/sql_setup.sh
+++ b/docker/sql_setup.sh
@@ -64,6 +64,8 @@ port = 5433
 ssl = on
 ssl_cert_file = 'server.crt'
 ssl_key_file = 'server.key'
+wal_level = logical
+log_statement = 'all'
 EOCONF
 
 cat > "$PGDATA/pg_hba.conf" <<-EOCONF
diff --git a/postgres-protocol/src/message/backend.rs b/postgres-protocol/src/message/backend.rs
index 68b5aa6e5..b13401155 100644
--- a/postgres-protocol/src/message/backend.rs
+++ b/postgres-protocol/src/message/backend.rs
@@ -32,6 +32,7 @@ pub const PARAMETER_STATUS_TAG: u8 = b'S';
 pub const PARAMETER_DESCRIPTION_TAG: u8 = b't';
 pub const ROW_DESCRIPTION_TAG: u8 = b'T';
 pub const READY_FOR_QUERY_TAG: u8 = b'Z';
+pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
 
 #[derive(Debug, Copy, Clone)]
 pub struct Header {
@@ -93,6 +94,7 @@ pub enum Message {
     CopyDone,
     CopyInResponse(CopyInResponseBody),
     CopyOutResponse(CopyOutResponseBody),
+    CopyBothResponse(CopyBothResponseBody),
     DataRow(DataRowBody),
     EmptyQueryResponse,
     ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
                     storage,
                 })
             }
+            COPY_BOTH_RESPONSE_TAG => {
+                let format = buf.read_u8()?;
+                let len = buf.read_u16::<BigEndian>()?;
+                let storage = buf.read_all();
+                Message::CopyBothResponse(CopyBothResponseBody {
+                    format,
+                    len,
+                    storage,
+                })
+            }
             EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
             BACKEND_KEY_DATA_TAG => {
                 let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,27 @@ impl CopyOutResponseBody {
     }
 }
 
+pub struct CopyBothResponseBody {
+    storage: Bytes,
+    len: u16,
+    format: u8,
+}
+
+impl CopyBothResponseBody {
+    #[inline]
+    pub fn format(&self) -> u8 {
+        self.format
+    }
+
+    #[inline]
+    pub fn column_formats(&self) -> ColumnFormats<'_> {
+        ColumnFormats {
+            remaining: self.len,
+            buf: &self.storage,
+        }
+    }
+}
+
 pub struct DataRowBody {
     storage: Bytes,
     len: u16,
diff --git a/postgres-protocol/src/message/frontend.rs b/postgres-protocol/src/message/frontend.rs
index 8587cd080..ed7329984 100644
--- a/postgres-protocol/src/message/frontend.rs
+++ b/postgres-protocol/src/message/frontend.rs
@@ -132,6 +132,26 @@ pub fn close(variant: u8, name: &str, buf: &mut BytesMut) -> io::Result<()> {
     })
 }
 
+#[inline]
+pub fn standby_status_update(
+    write_lsn: i64,
+    flush_lsn: i64,
+    apply_lsn: i64,
+    timestamp: i64,
+    buf: &mut BytesMut,
+) -> io::Result<()> {
+    buf.put_u8(b'd');
+    write_body(buf, |buf| {
+        buf.put_u8(b'r');
+        buf.put_i64(write_lsn + 1);
+        buf.put_i64(flush_lsn + 1);
+        buf.put_i64(apply_lsn + 1);
+        buf.put_i64(timestamp);
+        buf.put_u8(0);
+        Ok(())
+    })
+}
+
 pub struct CopyData<T> {
     buf: T,
     len: i32,
diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs
index e19caae83..f9dbe0e76 100644
--- a/tokio-postgres/src/client.rs
+++ b/tokio-postgres/src/client.rs
@@ -3,6 +3,7 @@ use crate::config::{Host, SslMode};
 use crate::connection::{Request, RequestMessages};
 use crate::copy_out::CopyOutStream;
 use crate::query::RowStream;
+use crate::replication::ReplicationStream;
 use crate::simple_query::SimpleQueryStream;
 #[cfg(feature = "runtime")]
 use crate::tls::MakeTlsConnect;
@@ -11,8 +12,9 @@ use crate::types::{Oid, ToSql, Type};
 #[cfg(feature = "runtime")]
 use crate::Socket;
 use crate::{
-    copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
-    Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
+    copy_in, copy_out, prepare, query, replication, simple_query, slice_iter, CancelToken,
+    CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
+    TransactionBuilder,
 };
 use bytes::{Buf, BytesMut};
 use fallible_iterator::FallibleIterator;
@@ -433,6 +435,26 @@ impl Client {
         copy_out::copy_out(self.inner(), statement).await
     }
 
+    /// Executes a 'START_REPLICATION SLOT ...', returning a stream of raw replication events
+    pub async fn start_replication(&self, query: &str) -> Result<ReplicationStream, Error> {
+        replication::start_replication(self.inner(), query).await
+    }
+
+    /// Stoppes the current replication by sending a copy_done message
+    pub async fn stop_replication(&self) -> Result<(), Error> {
+        replication::stop_replication(self.inner()).await
+    }
+
+    /// Notifies PostgreSQL of the last processed WAL
+    pub async fn standby_status_update(
+        &self,
+        write_lsn: i64,
+        flush_lsn: i64,
+        apply_lsn: i64,
+    ) -> Result<(), Error> {
+        replication::standby_status_update(self.inner(), write_lsn, flush_lsn, apply_lsn).await
+    }
+
     /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
     ///
     /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
diff --git a/tokio-postgres/src/codec.rs b/tokio-postgres/src/codec.rs
index 9d078044b..732bc23b7 100644
--- a/tokio-postgres/src/codec.rs
+++ b/tokio-postgres/src/codec.rs
@@ -4,12 +4,22 @@ use postgres_protocol::message::backend;
 use postgres_protocol::message::frontend::CopyData;
 use std::io;
 use tokio_util::codec::{Decoder, Encoder};
+//use std::fmt;
 
 pub enum FrontendMessage {
     Raw(Bytes),
     CopyData(CopyData<Box<dyn Buf + Send>>),
 }
 
+// impl fmt::Debug for FrontendMessage {
+//     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+//         match self {
+//             FrontendMessage::Raw(b) => write!(f, "FrontendMessage::Raw({:?})", b),
+//             FrontendMessage::CopyData(b) => write!(f, "FrontendMessage::CopyData({:?})", "***"),
+//         }
+//     }
+// }
+
 pub enum BackendMessage {
     Normal {
         messages: BackendMessages,
diff --git a/tokio-postgres/src/config.rs b/tokio-postgres/src/config.rs
index da171cc79..da6aaa91e 100644
--- a/tokio-postgres/src/config.rs
+++ b/tokio-postgres/src/config.rs
@@ -159,6 +159,7 @@ pub struct Config {
     pub(crate) keepalives_idle: Duration,
     pub(crate) target_session_attrs: TargetSessionAttrs,
     pub(crate) channel_binding: ChannelBinding,
+    pub(crate) replication: Option<String>,
 }
 
 impl Default for Config {
@@ -184,6 +185,7 @@ impl Config {
             keepalives_idle: Duration::from_secs(2 * 60 * 60),
             target_session_attrs: TargetSessionAttrs::Any,
             channel_binding: ChannelBinding::Prefer,
+            replication: None,
         }
     }
 
@@ -387,6 +389,17 @@ impl Config {
         self.channel_binding
     }
 
+    /// TODO!
+    pub fn replication(&mut self, replication: &str) -> &mut Config {
+        self.replication = Some(replication.to_string());
+        self
+    }
+
+    /// TODO!
+    pub fn get_replication(&self) -> Option<&str> {
+        self.replication.as_deref()
+    }
+
     fn param(&mut self, key: &str, value: &str) -> Result<(), Error> {
         match key {
             "user" => {
@@ -476,6 +489,9 @@ impl Config {
                 };
                 self.channel_binding(channel_binding);
             }
+            "replication" => {
+                self.replication(&value);
+            }
             key => {
                 return Err(Error::config_parse(Box::new(UnknownOption(
                     key.to_string(),
@@ -548,6 +564,7 @@ impl fmt::Debug for Config {
             .field("keepalives_idle", &self.keepalives_idle)
             .field("target_session_attrs", &self.target_session_attrs)
             .field("channel_binding", &self.channel_binding)
+            .field("replication", &self.replication)
             .finish()
     }
 }
diff --git a/tokio-postgres/src/connect_raw.rs b/tokio-postgres/src/connect_raw.rs
index d07d5a2df..e7bd44ce1 100644
--- a/tokio-postgres/src/connect_raw.rs
+++ b/tokio-postgres/src/connect_raw.rs
@@ -124,6 +124,9 @@ where
     if let Some(application_name) = &config.application_name {
         params.push(("application_name", &**application_name));
     }
+    if let Some(replication) = &config.replication {
+        params.push(("replication", &**replication));
+    }
 
     let mut buf = BytesMut::new();
     frontend::startup_message(params, &mut buf).map_err(Error::encode)?;
diff --git a/tokio-postgres/src/lib.rs b/tokio-postgres/src/lib.rs
index c69fff793..bfa5e7a42 100644
--- a/tokio-postgres/src/lib.rs
+++ b/tokio-postgres/src/lib.rs
@@ -126,6 +126,7 @@ pub use crate::error::Error;
 pub use crate::generic_client::GenericClient;
 pub use crate::portal::Portal;
 pub use crate::query::RowStream;
+pub use crate::replication::ReplicationStream;
 pub use crate::row::{Row, SimpleQueryRow};
 pub use crate::simple_query::SimpleQueryStream;
 #[cfg(feature = "runtime")]
@@ -163,6 +164,7 @@ mod maybe_tls_stream;
 mod portal;
 mod prepare;
 mod query;
+mod replication;
 pub mod row;
 mod simple_query;
 #[cfg(feature = "runtime")]
diff --git a/tokio-postgres/src/replication.rs b/tokio-postgres/src/replication.rs
new file mode 100644
index 000000000..bb860edc6
--- /dev/null
+++ b/tokio-postgres/src/replication.rs
@@ -0,0 +1,89 @@
+use crate::client::{InnerClient, Responses};
+use crate::codec::FrontendMessage;
+use crate::connection::RequestMessages;
+use crate::{simple_query, Error};
+use bytes::{Bytes, BytesMut};
+use futures::{ready, Stream};
+use log::trace;
+use pin_project_lite::pin_project;
+use postgres_protocol::message::backend::Message;
+use postgres_protocol::message::frontend;
+use std::marker::PhantomPinned;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::{SystemTime, UNIX_EPOCH};
+const J2000_EPOCH_GAP: u128 = 946_684_800_000_000;
+pub async fn start_replication(
+    client: &InnerClient,
+    query: &str,
+) -> Result<ReplicationStream, Error> {
+    trace!("executing start replication query {}", query);
+
+    let buf = simple_query::encode(client, query)?;
+    let responses = start(client, buf).await?;
+    Ok(ReplicationStream {
+        responses,
+        _p: PhantomPinned,
+    })
+}
+
+pub async fn stop_replication(client: &InnerClient) -> Result<(), Error> {
+    trace!("executing stop replication");
+    let mut buf = BytesMut::new();
+    frontend::copy_done(&mut buf);
+    let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?;
+    Ok(())
+}
+
+pub async fn standby_status_update(
+    client: &InnerClient,
+    write_lsn: i64,
+    flush_lsn: i64,
+    apply_lsn: i64,
+) -> Result<(), Error> {
+    trace!("executing standby_status_update");
+    let now = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_micros()
+        - J2000_EPOCH_GAP;
+    let mut buf = BytesMut::new();
+    let _ = frontend::standby_status_update(write_lsn, flush_lsn, apply_lsn, now as i64, &mut buf);
+    let _ = client.send(RequestMessages::Single(FrontendMessage::Raw(buf.freeze())))?;
+    Ok(())
+}
+
+async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
+    let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
+    trace!("start in repication");
+
+    match responses.next().await? {
+        Message::CopyBothResponse(_) => {}
+        _ => return Err(Error::unexpected_message()),
+    }
+
+    Ok(responses)
+}
+
+pin_project! {
+    /// A stream of `START_REPLICATION` query data.
+    pub struct ReplicationStream {
+        responses: Responses,
+        #[pin]
+        _p: PhantomPinned,
+    }
+}
+
+impl Stream for ReplicationStream {
+    type Item = Result<Bytes, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+
+        match ready!(this.responses.poll_next(cx)?) {
+            Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
+            Message::CopyDone => Poll::Ready(None),
+            _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
+        }
+    }
+}
diff --git a/tokio-postgres/src/simple_query.rs b/tokio-postgres/src/simple_query.rs
index 82ac35664..ae0d5cef3 100644
--- a/tokio-postgres/src/simple_query.rs
+++ b/tokio-postgres/src/simple_query.rs
@@ -45,7 +45,7 @@ pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Erro
     }
 }
 
-fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
+pub fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
     client.with_buf(|buf| {
         frontend::query(query, buf).map_err(Error::encode)?;
         Ok(buf.split().freeze())
diff --git a/tokio-postgres/tests/test/main.rs b/tokio-postgres/tests/test/main.rs
index b01037edc..d67382c71 100644
--- a/tokio-postgres/tests/test/main.rs
+++ b/tokio-postgres/tests/test/main.rs
@@ -5,6 +5,7 @@ use futures::channel::mpsc;
 use futures::{
     future, join, pin_mut, stream, try_join, FutureExt, SinkExt, StreamExt, TryStreamExt,
 };
+use std::convert::TryInto;
 use std::fmt::Write;
 use std::time::Duration;
 use tokio::net::TcpStream;
@@ -21,7 +22,7 @@ mod parse;
 #[cfg(feature = "runtime")]
 mod runtime;
 mod types;
-
+use std::thread;
 async fn connect_raw(s: &str) -> Result<(Client, Connection<TcpStream, NoTlsStream>), Error> {
     let socket = TcpStream::connect("127.0.0.1:5433").await.unwrap();
     let config = s.parse::<Config>().unwrap();
@@ -799,3 +800,85 @@ async fn query_opt() {
         .err()
         .unwrap();
 }
+
+#[tokio::test]
+async fn replication_start() {
+    let client = connect("user=postgres").await;
+    let r_client = connect("user=postgres replication=database").await;
+
+    client
+        .batch_execute(
+            "CREATE TABLE IF NOT EXISTS replication (
+            id SERIAL,
+            name TEXT
+        );
+        ",
+        )
+        .await
+        .unwrap();
+    let _ = r_client
+        .simple_query(
+            "CREATE_REPLICATION_SLOT rust_slot TEMPORARY LOGICAL test_decoding NOEXPORT_SNAPSHOT",
+        )
+        .await;
+
+    let stream = r_client
+        .start_replication(
+            "START_REPLICATION SLOT rust_slot  LOGICAL 0/0 (\"skip-empty-xacts\" '1')",
+        )
+        .await
+        .unwrap();
+
+    client
+        .query("INSERT INTO replication (name) VALUES ('ann')", &[])
+        .await
+        .unwrap();
+
+    thread::sleep(time::Duration::from_secs(1)); //give a chance to pg to send the events
+    let _ = r_client.stop_replication().await;
+    let events = stream.try_collect::<Vec<_>>().await.unwrap();
+    client.query("DROP TABLE replication", &[]).await.unwrap();
+
+    let mut total_events = 0;
+    for e in &events {
+        match e[0].into() {
+            'k' => {
+                //keepalive message
+                let current_wal_end =
+                    i64::from_be_bytes(e.slice(1..9).as_ref().try_into().unwrap());
+                let timestamp = i64::from_be_bytes(e.slice(9..17).as_ref().try_into().unwrap());
+                let reply: char = e[17].into();
+                println!(
+                    "keepalive tiemstamp: {} current_wal_end: {:X}/{:X} reply: {}",
+                    timestamp,
+                    (current_wal_end >> 32) as i32,
+                    current_wal_end as i32,
+                    reply as i8
+                );
+            }
+            'w' => {
+                // WAL message
+                let current_wal = i64::from_be_bytes(e.slice(1..9).as_ref().try_into().unwrap());
+                let current_wal_end =
+                    i64::from_be_bytes(e.slice(9..17).as_ref().try_into().unwrap());
+                let timestamp = i64::from_be_bytes(e.slice(17..25).as_ref().try_into().unwrap());
+                let _data = e.slice(25..); //the format of these bytes depends on the logical decoder
+                println!(
+                    "WAL timestamp: {} current_wal: {:X}/{:X} current_wal_end: {:X}/{:X} {:?}",
+                    timestamp,
+                    (current_wal >> 32) as i32,
+                    current_wal as i32,
+                    (current_wal_end >> 32) as i32,
+                    current_wal_end as i32,
+                    _data
+                );
+                total_events += 1;
+                //while in replication state, one needs to send updates from time to time like this
+                //let _ = r_pg_client.standby_status_update(current_wal, current_wal, current_wal).await.unwrap();
+            }
+            _ => {}
+        };
+    }
+    // in this case we receive 3 events (BEGIN,INSERT,COMMIT)
+    assert_eq!(total_events, 3);
+}