Skip to content

[PM-20615] Only process incoming messages once #264

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 28 commits into
base: main
Choose a base branch
from

Conversation

coroiu
Copy link
Contributor

@coroiu coroiu commented May 6, 2025

🎟️ Tracking

📔 Objective

  • Create a new thread/task and delegate message processing to it
  • Make sure CryptoProvider only sees each incoming message once
  • Make everything multi-thread safe
    • Enables a single thread to take ownership of message processing
    • but will also allow us to use the IPC framework with BitwardenClient

⏰ Reminders before review

  • Contributor guidelines followed
  • All formatters and local linters executed and passed
  • Written new unit and / or integration tests where applicable
  • Protected functional changes with optionality (feature flags)
  • Used internationalization (i18n) for all UI strings
  • CI builds passed
  • Communicated to DevOps any deployment requirements
  • Updated any necessary documentation (Confluence, contributing docs) or informed the documentation
    team

🦮 Reviewer guidelines

  • 👍 (:+1:) or similar for great changes
  • 📝 (:memo:) or ℹ️ (:information_source:) for notes or general info
  • ❓ (:question:) for questions
  • 🤔 (:thinking:) or 💭 (:thought_balloon:) for more open inquiry that's not quite a confirmed
    issue and could potentially benefit from discussion
  • 🎨 (:art:) for suggestions / improvements
  • ❌ (:x:) or ⚠️ (:warning:) for more significant problems or concerns needing attention
  • 🌱 (:seedling:) or ♻️ (:recycle:) for future improvements or indications of technical debt
  • ⛏ (:pick:) for minor or nitpick changes

@coroiu
Copy link
Contributor Author

coroiu commented May 6, 2025

Sorry for assigning this to you too @dani-garcia, but I don't think anyone else has experience with Send + Sync

Copy link
Contributor

github-actions bot commented May 6, 2025

Logo
Checkmarx One – Scan Summary & Detailsc1eb3ffd-7c6e-4dcd-a4f9-4727d5ef13ec

Great job, no security vulnerabilities found in this Pull Request

@coroiu
Copy link
Contributor Author

coroiu commented May 6, 2025

I'm debugging the tests now but apparently they only fail when you run cargo test and not cargo test -p bitwarden-ipc --lib

Edit: Fixed

Copy link

codecov bot commented May 6, 2025

Codecov Report

Attention: Patch coverage is 78.04878% with 54 lines in your changes missing coverage. Please review.

Project coverage is 70.08%. Comparing base (046d1e5) to head (76f902c).
Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
crates/bitwarden-ipc/src/ipc_client.rs 91.08% 18 Missing ⚠️
crates/bitwarden-ipc/src/wasm/ipc_client.rs 0.00% 18 Missing ⚠️
...es/bitwarden-ipc/src/wasm/communication_backend.rs 0.00% 16 Missing ⚠️
...tes/bitwarden-ipc/src/traits/session_repository.rs 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #264      +/-   ##
==========================================
+ Coverage   69.94%   70.08%   +0.13%     
==========================================
  Files         214      212       -2     
  Lines       16935    17047     +112     
==========================================
+ Hits        11846    11947     +101     
- Misses       5089     5100      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

{
crypto: Crypto,
communication: Com,
sessions: Ses,

incoming: RwLock<Option<tokio::sync::broadcast::Receiver<IncomingMessage>>>,
cancellation_handle: RwLock<Option<tokio::sync::watch::Sender<bool>>>,
Copy link
Member

@dani-garcia dani-garcia May 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way there's a CancellationToken in tokio-util, which more or less does something similar: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I didn't want to include yet another crate, but I'm having issues with tokio::time which I'll have to replace with a cancellation handle like this so I'll probably just have to add it

Comment on lines 128 to 140
let future = async move {
let com_receiver = client.communication.subscribe().await;
let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY);

let mut client_incoming = client.incoming.write().await;
*client_incoming = Some(client_rx);
drop(client_incoming);

await_init_tx
.send(())
.expect("Sending init signal should not fail");

loop {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is await_init_tx used to wait until the subscription is done? if so we could just subscribe outside the task.

Suggested change
let future = async move {
let com_receiver = client.communication.subscribe().await;
let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY);
let mut client_incoming = client.incoming.write().await;
*client_incoming = Some(client_rx);
drop(client_incoming);
await_init_tx
.send(())
.expect("Sending init signal should not fail");
loop {
let com_receiver = client.communication.subscribe().await;
let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY);
let mut client_incoming = client.incoming.write().await;
*client_incoming = Some(client_rx);
drop(client_incoming);
let future = async move {
loop {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure I follow, which task do you mean?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that currently in the code we create a future that we spawn in the background, but then we use a oneshot channel to synchronize and wait for initialization. If we instead do the initialization outside the task, we can avoid having to do that synchronization. With some pseudocode, this is what we currently have:

let (await_init_tx, await_init_rx) = tokio::sync::oneshot::channel();

// We create a future that gets spawned in the background
let future = async move {
    // Do some initialization
    some_init_stuff().await;

    // Notify that we're initialized
    await_init_tx.send(());

    // Process messages
    loop { ... }
};

// Spawn the task and...
spawn_future(future);

///  wait for the initialization to complete before continuing
await_init_rx.await;

if we pull the initialization code out of the future, then we can avoid the explicit syncrhonization:

// Do some initialization before the future is created, now we don't need to wait as it's already done
some_init_stuff().await;

// We create a future that gets spawned in the background
let future = async move {
    // Process messages
    loop { ... }
};

// Spawn the task and...
spawn_future(future);

@coroiu coroiu requested review from dani-garcia and removed request for dani-garcia May 19, 2025 13:19
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently I had commited this change to the wrong branch, I'm adding it here instead

/// ```ignore
/// let runner = ThreadBoundRunner::new(my_state);
/// ```
/// # tokio_test::block_on(tokio::task::LocalSet::new().run_until(async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This crate is not a dev dependency so the build is failing

pub trait SessionRepository<Session>: Send + Sync + 'static {
type GetError: Send + Sync + 'static;
type SaveError: Send + Sync + 'static;
type RemoveError: Send + Sync + 'static;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these errors not also require Debug (or Error) to keep it consistent with the others?

})
.await;

Ok(result.map_err(|e| e.to_string())??)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clippy lint here about unnecessary ?

.await
.as_ref()
.ok_or(SubscribeError::NotStarted)?
.resubscribe(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking thought: Some of the code for IpcClientTypedSubscription is fairly similar to the code in IpcClientSubscription, I wonder if we could define it like IpcClientTypedSubscription<T>(IpcClientSubscription, PhantomData<T>) or something like that to avoid repetition in their subscribe and receive functions.

Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants