-
Notifications
You must be signed in to change notification settings - Fork 10
[PM-20615] Only process incoming messages once #264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Sorry for assigning this to you too @dani-garcia, but I don't think anyone else has experience with Send + Sync |
Great job, no security vulnerabilities found in this Pull Request |
I'm debugging the tests now but apparently they only fail when you run Edit: Fixed |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #264 +/- ##
==========================================
+ Coverage 69.94% 70.08% +0.13%
==========================================
Files 214 212 -2
Lines 16935 17047 +112
==========================================
+ Hits 11846 11947 +101
- Misses 5089 5100 +11 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
{ | ||
crypto: Crypto, | ||
communication: Com, | ||
sessions: Ses, | ||
|
||
incoming: RwLock<Option<tokio::sync::broadcast::Receiver<IncomingMessage>>>, | ||
cancellation_handle: RwLock<Option<tokio::sync::watch::Sender<bool>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By the way there's a CancellationToken
in tokio-util, which more or less does something similar: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I didn't want to include yet another crate, but I'm having issues with tokio::time
which I'll have to replace with a cancellation handle like this so I'll probably just have to add it
let future = async move { | ||
let com_receiver = client.communication.subscribe().await; | ||
let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY); | ||
|
||
let mut client_incoming = client.incoming.write().await; | ||
*client_incoming = Some(client_rx); | ||
drop(client_incoming); | ||
|
||
await_init_tx | ||
.send(()) | ||
.expect("Sending init signal should not fail"); | ||
|
||
loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is await_init_tx used to wait until the subscription is done? if so we could just subscribe outside the task.
let future = async move { | |
let com_receiver = client.communication.subscribe().await; | |
let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY); | |
let mut client_incoming = client.incoming.write().await; | |
*client_incoming = Some(client_rx); | |
drop(client_incoming); | |
await_init_tx | |
.send(()) | |
.expect("Sending init signal should not fail"); | |
loop { | |
let com_receiver = client.communication.subscribe().await; | |
let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY); | |
let mut client_incoming = client.incoming.write().await; | |
*client_incoming = Some(client_rx); | |
drop(client_incoming); | |
let future = async move { | |
loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I'm not sure I follow, which task do you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean that currently in the code we create a future that we spawn in the background, but then we use a oneshot channel to synchronize and wait for initialization. If we instead do the initialization outside the task, we can avoid having to do that synchronization. With some pseudocode, this is what we currently have:
let (await_init_tx, await_init_rx) = tokio::sync::oneshot::channel();
// We create a future that gets spawned in the background
let future = async move {
// Do some initialization
some_init_stuff().await;
// Notify that we're initialized
await_init_tx.send(());
// Process messages
loop { ... }
};
// Spawn the task and...
spawn_future(future);
/// wait for the initialization to complete before continuing
await_init_rx.await;
if we pull the initialization code out of the future, then we can avoid the explicit syncrhonization:
// Do some initialization before the future is created, now we don't need to wait as it's already done
some_init_stuff().await;
// We create a future that gets spawned in the background
let future = async move {
// Process messages
loop { ... }
};
// Spawn the task and...
spawn_future(future);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently I had commited this change to the wrong branch, I'm adding it here instead
/// ```ignore | ||
/// let runner = ThreadBoundRunner::new(my_state); | ||
/// ``` | ||
/// # tokio_test::block_on(tokio::task::LocalSet::new().run_until(async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This crate is not a dev dependency so the build is failing
pub trait SessionRepository<Session>: Send + Sync + 'static { | ||
type GetError: Send + Sync + 'static; | ||
type SaveError: Send + Sync + 'static; | ||
type RemoveError: Send + Sync + 'static; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these errors not also require Debug (or Error) to keep it consistent with the others?
}) | ||
.await; | ||
|
||
Ok(result.map_err(|e| e.to_string())??) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clippy lint here about unnecessary ?
.await | ||
.as_ref() | ||
.ok_or(SubscribeError::NotStarted)? | ||
.resubscribe(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-blocking thought: Some of the code for IpcClientTypedSubscription
is fairly similar to the code in IpcClientSubscription
, I wonder if we could define it like IpcClientTypedSubscription<T>(IpcClientSubscription, PhantomData<T>)
or something like that to avoid repetition in their subscribe and receive functions.
(cherry picked from commit 875c3a9)
|
🎟️ Tracking
📔 Objective
BitwardenClient
⏰ Reminders before review
team
🦮 Reviewer guidelines
:+1:
) or similar for great changes:memo:
) or ℹ️ (:information_source:
) for notes or general info:question:
) for questions:thinking:
) or 💭 (:thought_balloon:
) for more open inquiry that's not quite a confirmedissue and could potentially benefit from discussion
:art:
) for suggestions / improvements:x:
) or:warning:
) for more significant problems or concerns needing attention:seedling:
) or ♻️ (:recycle:
) for future improvements or indications of technical debt:pick:
) for minor or nitpick changes