- Sponsor
-
Notifications
You must be signed in to change notification settings - Fork 487
Support CopyBoth queries and replication mode in config #778
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
petrosagg
wants to merge
5
commits into
sfackler:master
Choose a base branch
from
petrosagg:copy-both
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+653
−6
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
95a3c98
Make simple_query::encode() pub(crate).
jeff-davis bd96437
Connection string config for replication.
jeff-davis 92899e8
implement Stream for Responses
petrosagg bed87a9
add copy_both_simple method
petrosagg 88edd68
ci: enable logical replication in the test image
petrosagg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,358 @@ | ||
use crate::client::{InnerClient, Responses}; | ||
use crate::codec::FrontendMessage; | ||
use crate::{simple_query, Error}; | ||
use bytes::{Buf, BufMut, Bytes, BytesMut}; | ||
use futures_channel::mpsc; | ||
use futures_util::{ready, Sink, SinkExt, Stream, StreamExt}; | ||
use log::debug; | ||
use pin_project_lite::pin_project; | ||
use postgres_protocol::message::backend::Message; | ||
use postgres_protocol::message::frontend; | ||
use postgres_protocol::message::frontend::CopyData; | ||
use std::marker::{PhantomData, PhantomPinned}; | ||
use std::pin::Pin; | ||
use std::task::{Context, Poll}; | ||
|
||
/// The state machine of CopyBothReceiver | ||
/// | ||
/// ```ignore | ||
/// CopyBoth | ||
/// / \ | ||
/// v v | ||
/// CopyOut CopyIn | ||
/// \ / | ||
/// v v | ||
/// CopyNone | ||
/// | | ||
/// v | ||
/// CopyComplete | ||
/// | | ||
/// v | ||
/// CommandComplete | ||
/// ``` | ||
#[derive(Debug, Clone, Copy, PartialEq, Eq)] | ||
enum CopyBothState { | ||
/// The state before having entered the CopyBoth mode. | ||
Setup, | ||
/// Initial state where CopyData messages can go in both directions | ||
CopyBoth, | ||
/// The server->client stream is closed and we're in CopyIn mode | ||
CopyIn, | ||
/// The client->server stream is closed and we're in CopyOut mode | ||
CopyOut, | ||
/// Both directions are closed, we waiting for CommandComplete messages | ||
CopyNone, | ||
/// We have received the first CommandComplete message for the copy | ||
CopyComplete, | ||
/// We have received the final CommandComplete message for the statement | ||
CommandComplete, | ||
} | ||
|
||
/// A CopyBothReceiver is responsible for handling the CopyBoth subprotocol. It ensures that no | ||
/// matter what the users do with their CopyBothDuplex handle we're always going to send the | ||
/// correct messages to the backend in order to restore the connection into a usable state. | ||
/// | ||
/// ```ignore | ||
/// | | ||
/// <tokio_postgres owned> | <userland owned> | ||
/// | | ||
/// pg -> Connection -> CopyBothReceiver ---+---> CopyBothDuplex | ||
/// | ^ \ | ||
/// | / v | ||
/// | Sink Stream | ||
/// ``` | ||
pub struct CopyBothReceiver { | ||
/// Receiver of backend messages from the underlying [Connection](crate::Connection) | ||
responses: Responses, | ||
/// Receiver of frontend messages sent by the user using <CopyBothDuplex as Sink> | ||
sink_receiver: mpsc::Receiver<FrontendMessage>, | ||
/// Sender of CopyData contents to be consumed by the user using <CopyBothDuplex as Stream> | ||
stream_sender: mpsc::Sender<Result<Message, Error>>, | ||
/// The current state of the subprotocol | ||
state: CopyBothState, | ||
/// Holds a buffered message until we are ready to send it to the user's stream | ||
buffered_message: Option<Result<Message, Error>>, | ||
} | ||
|
||
impl CopyBothReceiver { | ||
pub(crate) fn new( | ||
responses: Responses, | ||
sink_receiver: mpsc::Receiver<FrontendMessage>, | ||
stream_sender: mpsc::Sender<Result<Message, Error>>, | ||
) -> CopyBothReceiver { | ||
CopyBothReceiver { | ||
responses, | ||
sink_receiver, | ||
stream_sender, | ||
state: CopyBothState::Setup, | ||
buffered_message: None, | ||
} | ||
} | ||
|
||
/// Convenience method to set the subprotocol into an unexpected message state | ||
fn unexpected_message(&mut self) { | ||
self.sink_receiver.close(); | ||
self.buffered_message = Some(Err(Error::unexpected_message())); | ||
self.state = CopyBothState::CommandComplete; | ||
} | ||
|
||
/// Processes messages from the backend, it will resolve once all backend messages have been | ||
/// processed | ||
fn poll_backend(&mut self, cx: &mut Context<'_>) -> Poll<()> { | ||
use CopyBothState::*; | ||
|
||
loop { | ||
// Deliver the buffered message (if any) to the user to ensure we can potentially | ||
// buffer a new one in response to a server message | ||
if let Some(message) = self.buffered_message.take() { | ||
match self.stream_sender.poll_ready(cx) { | ||
Poll::Ready(_) => { | ||
// If the receiver has hung up we'll just drop the message | ||
let _ = self.stream_sender.start_send(message); | ||
} | ||
Poll::Pending => { | ||
// Stash the message and try again later | ||
self.buffered_message = Some(message); | ||
return Poll::Pending; | ||
} | ||
} | ||
} | ||
|
||
match ready!(self.responses.poll_next_unpin(cx)) { | ||
Some(Ok(Message::CopyBothResponse(body))) => match self.state { | ||
Setup => { | ||
self.buffered_message = Some(Ok(Message::CopyBothResponse(body))); | ||
self.state = CopyBoth; | ||
} | ||
_ => self.unexpected_message(), | ||
}, | ||
Some(Ok(Message::CopyData(body))) => match self.state { | ||
CopyBoth | CopyOut => { | ||
self.buffered_message = Some(Ok(Message::CopyData(body))); | ||
} | ||
_ => self.unexpected_message(), | ||
}, | ||
// The server->client stream is done | ||
Some(Ok(Message::CopyDone)) => { | ||
match self.state { | ||
CopyBoth => self.state = CopyIn, | ||
CopyOut => self.state = CopyNone, | ||
_ => self.unexpected_message(), | ||
}; | ||
} | ||
Some(Ok(Message::CommandComplete(_))) => { | ||
match self.state { | ||
CopyNone => self.state = CopyComplete, | ||
CopyComplete => { | ||
self.stream_sender.close_channel(); | ||
self.sink_receiver.close(); | ||
self.state = CommandComplete; | ||
} | ||
_ => self.unexpected_message(), | ||
}; | ||
} | ||
// The server indicated an error, terminate our side if we haven't already | ||
Some(Err(err)) => { | ||
match self.state { | ||
Setup | CopyBoth | CopyOut | CopyIn => { | ||
self.sink_receiver.close(); | ||
self.buffered_message = Some(Err(err)); | ||
self.state = CommandComplete; | ||
petrosagg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
_ => self.unexpected_message(), | ||
}; | ||
} | ||
Some(Ok(Message::ReadyForQuery(_))) => match self.state { | ||
CommandComplete => { | ||
self.sink_receiver.close(); | ||
self.stream_sender.close_channel(); | ||
} | ||
_ => self.unexpected_message(), | ||
}, | ||
Some(Ok(_)) => self.unexpected_message(), | ||
None => return Poll::Ready(()), | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// The [Connection](crate::Connection) will keep polling this stream until it is exhausted. This | ||
/// is the mechanism that drives the CopyBoth subprotocol forward | ||
impl Stream for CopyBothReceiver { | ||
type Item = FrontendMessage; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<FrontendMessage>> { | ||
use CopyBothState::*; | ||
|
||
match self.poll_backend(cx) { | ||
Poll::Ready(()) => Poll::Ready(None), | ||
Poll::Pending => match self.state { | ||
Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_next_unpin(cx)) { | ||
Some(msg) => Poll::Ready(Some(msg)), | ||
None => { | ||
self.state = match self.state { | ||
CopyBoth => CopyOut, | ||
CopyIn => CopyNone, | ||
_ => unreachable!(), | ||
}; | ||
|
||
let mut buf = BytesMut::new(); | ||
frontend::copy_done(&mut buf); | ||
Poll::Ready(Some(FrontendMessage::Raw(buf.freeze()))) | ||
} | ||
}, | ||
_ => Poll::Pending, | ||
}, | ||
} | ||
} | ||
} | ||
|
||
pin_project! { | ||
/// A duplex stream for consuming streaming replication data. | ||
/// | ||
/// Users should ensure that CopyBothDuplex is dropped before attempting to await on a new | ||
/// query. This will ensure that the connection returns into normal processing mode. | ||
/// | ||
/// ```no_run | ||
/// use tokio_postgres::Client; | ||
/// | ||
/// async fn foo(client: &Client) { | ||
/// let duplex_stream = client.copy_both_simple::<&[u8]>("..").await; | ||
/// | ||
/// // ⚠️ INCORRECT ⚠️ | ||
/// client.query("SELECT 1", &[]).await; // hangs forever | ||
/// | ||
/// // duplex_stream drop-ed here | ||
/// } | ||
/// ``` | ||
/// | ||
/// ```no_run | ||
/// use tokio_postgres::Client; | ||
/// | ||
/// async fn foo(client: &Client) { | ||
/// let duplex_stream = client.copy_both_simple::<&[u8]>("..").await; | ||
/// | ||
/// // ✅ CORRECT ✅ | ||
/// drop(duplex_stream); | ||
/// | ||
/// client.query("SELECT 1", &[]).await; | ||
/// } | ||
/// ``` | ||
pub struct CopyBothDuplex<T> { | ||
#[pin] | ||
sink_sender: mpsc::Sender<FrontendMessage>, | ||
#[pin] | ||
stream_receiver: mpsc::Receiver<Result<Message, Error>>, | ||
buf: BytesMut, | ||
#[pin] | ||
_p: PhantomPinned, | ||
_p2: PhantomData<T>, | ||
} | ||
} | ||
|
||
impl<T> Stream for CopyBothDuplex<T> { | ||
type Item = Result<Bytes, Error>; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
Poll::Ready(match ready!(self.project().stream_receiver.poll_next(cx)) { | ||
Some(Ok(Message::CopyData(body))) => Some(Ok(body.into_bytes())), | ||
Some(Ok(_)) => Some(Err(Error::unexpected_message())), | ||
Some(Err(err)) => Some(Err(err)), | ||
None => None, | ||
}) | ||
} | ||
} | ||
|
||
impl<T> Sink<T> for CopyBothDuplex<T> | ||
where | ||
T: Buf + 'static + Send, | ||
{ | ||
type Error = Error; | ||
|
||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { | ||
self.project() | ||
.sink_sender | ||
.poll_ready(cx) | ||
.map_err(|_| Error::closed()) | ||
} | ||
|
||
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Error> { | ||
let this = self.project(); | ||
|
||
let data: Box<dyn Buf + Send> = if item.remaining() > 4096 { | ||
if this.buf.is_empty() { | ||
Box::new(item) | ||
} else { | ||
Box::new(this.buf.split().freeze().chain(item)) | ||
} | ||
} else { | ||
this.buf.put(item); | ||
if this.buf.len() > 4096 { | ||
Box::new(this.buf.split().freeze()) | ||
} else { | ||
return Ok(()); | ||
} | ||
}; | ||
|
||
let data = CopyData::new(data).map_err(Error::encode)?; | ||
this.sink_sender | ||
.start_send(FrontendMessage::CopyData(data)) | ||
.map_err(|_| Error::closed()) | ||
} | ||
|
||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { | ||
let mut this = self.project(); | ||
|
||
if !this.buf.is_empty() { | ||
ready!(this.sink_sender.as_mut().poll_ready(cx)).map_err(|_| Error::closed())?; | ||
let data: Box<dyn Buf + Send> = Box::new(this.buf.split().freeze()); | ||
let data = CopyData::new(data).map_err(Error::encode)?; | ||
this.sink_sender | ||
.as_mut() | ||
.start_send(FrontendMessage::CopyData(data)) | ||
.map_err(|_| Error::closed())?; | ||
} | ||
|
||
this.sink_sender.poll_flush(cx).map_err(|_| Error::closed()) | ||
} | ||
|
||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { | ||
ready!(self.as_mut().poll_flush(cx))?; | ||
let mut this = self.as_mut().project(); | ||
this.sink_sender.disconnect(); | ||
Poll::Ready(Ok(())) | ||
} | ||
} | ||
|
||
pub async fn copy_both_simple<T>( | ||
client: &InnerClient, | ||
query: &str, | ||
) -> Result<CopyBothDuplex<T>, Error> | ||
where | ||
T: Buf + 'static + Send, | ||
{ | ||
debug!("executing copy both query {}", query); | ||
|
||
let buf = simple_query::encode(client, query)?; | ||
|
||
let mut handles = client.start_copy_both()?; | ||
|
||
handles | ||
.sink_sender | ||
.send(FrontendMessage::Raw(buf)) | ||
.await | ||
.map_err(|_| Error::closed())?; | ||
|
||
match handles.stream_receiver.next().await.transpose()? { | ||
Some(Message::CopyBothResponse(_)) => {} | ||
_ => return Err(Error::unexpected_message()), | ||
petrosagg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
Ok(CopyBothDuplex { | ||
stream_receiver: handles.stream_receiver, | ||
sink_sender: handles.sink_sender, | ||
buf: BytesMut::new(), | ||
_p: PhantomPinned, | ||
_p2: PhantomData, | ||
}) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
use futures_util::{future, StreamExt, TryStreamExt}; | ||
use tokio_postgres::{error::SqlState, Client, SimpleQueryMessage, SimpleQueryRow}; | ||
|
||
async fn q(client: &Client, query: &str) -> Vec<SimpleQueryRow> { | ||
let msgs = client.simple_query(query).await.unwrap(); | ||
|
||
msgs.into_iter() | ||
.filter_map(|msg| match msg { | ||
SimpleQueryMessage::Row(row) => Some(row), | ||
_ => None, | ||
}) | ||
.collect() | ||
} | ||
|
||
#[tokio::test] | ||
async fn copy_both_error() { | ||
let client = crate::connect("user=postgres replication=database").await; | ||
|
||
let err = client | ||
.copy_both_simple::<bytes::Bytes>("START_REPLICATION SLOT undefined LOGICAL 0000/0000") | ||
.await | ||
.err() | ||
.unwrap(); | ||
|
||
assert_eq!(err.code(), Some(&SqlState::UNDEFINED_OBJECT)); | ||
|
||
// Ensure we can continue issuing queries | ||
assert_eq!(q(&client, "SELECT 1").await[0].get(0), Some("1")); | ||
} | ||
|
||
#[tokio::test] | ||
async fn copy_both_stream_error() { | ||
let client = crate::connect("user=postgres replication=true").await; | ||
|
||
q(&client, "CREATE_REPLICATION_SLOT err2 PHYSICAL").await; | ||
|
||
// This will immediately error out after entering CopyBoth mode | ||
let duplex_stream = client | ||
.copy_both_simple::<bytes::Bytes>("START_REPLICATION SLOT err2 PHYSICAL FFFF/FFFF") | ||
petrosagg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.await | ||
.unwrap(); | ||
|
||
let mut msgs: Vec<_> = duplex_stream.collect().await; | ||
let result = msgs.pop().unwrap(); | ||
assert_eq!(msgs.len(), 0); | ||
assert!(result.unwrap_err().as_db_error().is_some()); | ||
|
||
// Ensure we can continue issuing queries | ||
assert_eq!(q(&client, "DROP_REPLICATION_SLOT err2").await.len(), 0); | ||
} | ||
|
||
#[tokio::test] | ||
async fn copy_both_stream_error_sync() { | ||
let client = crate::connect("user=postgres replication=database").await; | ||
|
||
q(&client, "CREATE_REPLICATION_SLOT err1 TEMPORARY PHYSICAL").await; | ||
|
||
// This will immediately error out after entering CopyBoth mode | ||
let duplex_stream = client | ||
.copy_both_simple::<bytes::Bytes>("START_REPLICATION SLOT err1 PHYSICAL FFFF/FFFF") | ||
.await | ||
.unwrap(); | ||
|
||
// Immediately close our sink to send a CopyDone before receiving the ErrorResponse | ||
drop(duplex_stream); | ||
|
||
// Ensure we can continue issuing queries | ||
assert_eq!(q(&client, "SELECT 1").await[0].get(0), Some("1")); | ||
} | ||
|
||
#[tokio::test] | ||
async fn copy_both() { | ||
let client = crate::connect("user=postgres replication=database").await; | ||
|
||
q(&client, "DROP TABLE IF EXISTS replication").await; | ||
q(&client, "CREATE TABLE replication (i text)").await; | ||
|
||
let slot_query = "CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL \"test_decoding\""; | ||
let lsn = q(&client, slot_query).await[0] | ||
.get("consistent_point") | ||
.unwrap() | ||
.to_owned(); | ||
|
||
// We will attempt to read this from the other end | ||
q(&client, "BEGIN").await; | ||
let xid = q(&client, "SELECT txid_current()").await[0] | ||
.get("txid_current") | ||
.unwrap() | ||
.to_owned(); | ||
q(&client, "INSERT INTO replication VALUES ('processed')").await; | ||
q(&client, "COMMIT").await; | ||
|
||
// Insert a second row to generate unprocessed messages in the stream | ||
q(&client, "INSERT INTO replication VALUES ('ignored')").await; | ||
|
||
let query = format!("START_REPLICATION SLOT slot LOGICAL {}", lsn); | ||
petrosagg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let duplex_stream = client | ||
.copy_both_simple::<bytes::Bytes>(&query) | ||
.await | ||
.unwrap(); | ||
|
||
let expected = vec![ | ||
format!("BEGIN {}", xid), | ||
"table public.replication: INSERT: i[text]:'processed'".to_string(), | ||
format!("COMMIT {}", xid), | ||
]; | ||
|
||
let actual: Vec<_> = duplex_stream | ||
// Process only XLogData messages | ||
.try_filter(|buf| future::ready(buf[0] == b'w')) | ||
petrosagg marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Playback the stream until the first expected message | ||
.try_skip_while(|buf| future::ready(Ok(!buf.ends_with(expected[0].as_ref())))) | ||
// Take only the expected number of messsage, the rest will be discarded by tokio_postgres | ||
.take(expected.len()) | ||
.try_collect() | ||
.await | ||
.unwrap(); | ||
|
||
for (msg, ending) in actual.into_iter().zip(expected.into_iter()) { | ||
assert!(msg.ends_with(ending.as_ref())); | ||
} | ||
|
||
// Ensure we can continue issuing queries | ||
assert_eq!(q(&client, "SELECT 1").await[0].get(0), Some("1")); | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After the replication stream, if the timeline is historical, Postgres will send a tuple as a response. So we actually need a function that returns something like
Result<(CopyBothDuplex<T>, Option<SimpleQueryMessage>), Error>
(or maybeResult<(CopyBothDuplex<T>, Option<Vec<SimpleQueryMessage>>), Error>
in case other commands are added in the future which useCopyBoth
and return a set).It's actually very specific to
START_REPLICATION
(and even more specifically, to physical replication), so it might make sense to have a more specific name or at least clarify what it's expecting the command to do. Maybe something likecopy_both_simple_with_result()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point, I'll take a look on how we can expose this to users, ideally in a generic way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case you missed my other comment, there's a similar issue for
BASE_BACKUP
, except withCopyOut
instead. That can be a separate PR, though.