diff --git a/Cargo.lock b/Cargo.lock index 8fe912b2c..895980c07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,6 +192,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.88" @@ -505,7 +527,9 @@ name = "bitwarden-ipc" version = "1.0.0" dependencies = [ "bitwarden-error", + "bitwarden-threading", "js-sys", + "log", "serde", "serde_json", "thiserror 2.0.12", @@ -584,6 +608,7 @@ dependencies = [ "serde", "thiserror 2.0.12", "tokio", + "tokio-test", "tokio-util", "tsify-next", "wasm-bindgen", @@ -4059,6 +4084,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.15" diff --git a/crates/bitwarden-ipc/Cargo.toml b/crates/bitwarden-ipc/Cargo.toml index ac7a105d0..054ceae46 100644 --- a/crates/bitwarden-ipc/Cargo.toml +++ b/crates/bitwarden-ipc/Cargo.toml @@ -15,22 +15,22 @@ wasm = [ "dep:wasm-bindgen", "dep:wasm-bindgen-futures", "dep:js-sys", - "bitwarden-error/wasm" + "bitwarden-error/wasm", + "bitwarden-threading/wasm" ] # WASM support [dependencies] bitwarden-error = { workspace = true } +bitwarden-threading = { workspace = true } js-sys = { workspace = true, optional = true } +log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } -tokio = { features = ["sync", "time"], workspace = true } +tokio = { features = ["sync", "time", "rt"], workspace = true } tsify-next = { workspace = true, optional = true } wasm-bindgen = { workspace = true, optional = true } wasm-bindgen-futures = { workspace = true, optional = true } -[dev-dependencies] -tokio = { workspace = true, features = ["rt"] } - [lints] workspace = true diff --git a/crates/bitwarden-ipc/src/constants.rs b/crates/bitwarden-ipc/src/constants.rs new file mode 100644 index 000000000..d97d32f97 --- /dev/null +++ b/crates/bitwarden-ipc/src/constants.rs @@ -0,0 +1 @@ +pub(super) const CHANNEL_BUFFER_CAPACITY: usize = 50; diff --git a/crates/bitwarden-ipc/src/error.rs b/crates/bitwarden-ipc/src/error.rs deleted file mode 100644 index 16575ee6a..000000000 --- a/crates/bitwarden-ipc/src/error.rs +++ /dev/null @@ -1,49 +0,0 @@ -use thiserror::Error; - -#[derive(Clone, Debug, Error, PartialEq, Eq)] -pub enum SendError { - #[error("Crypto error: {0}")] - Crypto(Crypto), - - #[error("Communication error: {0}")] - Communication(Com), -} - -#[derive(Clone, Debug, Error, PartialEq, Eq)] -pub enum ReceiveError { - #[error("The receive operation timed out")] - Timeout, - - #[error("Crypto error: {0}")] - Crypto(Crypto), - - #[error("Communication error: {0}")] - Communication(Com), -} - -#[derive(Clone, Debug, Error, PartialEq, Eq)] -pub enum TypedReceiveError { - #[error("Typing error: {0}")] - Typing(Typing), - - #[error("The receive operation timed out")] - Timeout, - - #[error("Crypto error: {0}")] - Crypto(Crypto), - - #[error("Communication error: {0}")] - Communication(Com), -} - -impl From> - for TypedReceiveError -{ - fn from(value: ReceiveError) -> Self { - match value { - ReceiveError::Timeout => TypedReceiveError::Timeout, - ReceiveError::Crypto(crypto) => TypedReceiveError::Crypto(crypto), - ReceiveError::Communication(com) => TypedReceiveError::Communication(com), - } - } -} diff --git a/crates/bitwarden-ipc/src/ipc_client.rs b/crates/bitwarden-ipc/src/ipc_client.rs index 469f51652..28a4b6708 100644 --- a/crates/bitwarden-ipc/src/ipc_client.rs +++ b/crates/bitwarden-ipc/src/ipc_client.rs @@ -1,37 +1,39 @@ -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; + +use bitwarden_error::bitwarden_error; +use bitwarden_threading::cancellation_token::CancellationToken; +use thiserror::Error; +use tokio::{select, sync::RwLock}; use crate::{ - error::{ReceiveError, SendError, TypedReceiveError}, + constants::CHANNEL_BUFFER_CAPACITY, message::{IncomingMessage, OutgoingMessage, PayloadTypeName, TypedIncomingMessage}, - traits::{ - CommunicationBackend, CommunicationBackendReceiver, CryptoProvider, SessionRepository, - }, + traits::{CommunicationBackend, CryptoProvider, SessionRepository}, }; -#[allow(missing_docs)] +/// An IPC client that handles communication between different components and clients. +/// It uses a crypto provider to encrypt and decrypt messages, a communication backend to send and +/// receive messages, and a session repository to persist sessions. pub struct IpcClient where Crypto: CryptoProvider, Com: CommunicationBackend, - Ses: SessionRepository, + Ses: SessionRepository, { crypto: Crypto, communication: Com, sessions: Ses, + + incoming: RwLock>>, + cancellation_token: RwLock>, } /// A subscription to receive messages over IPC. /// The subcription will start buffering messages after its creation and return them /// when receive() is called. Messages received before the subscription was created will not be /// returned. -pub struct IpcClientSubscription -where - Crypto: CryptoProvider, - Com: CommunicationBackend, - Ses: SessionRepository, -{ - receiver: Com::Receiver, - client: Arc>, +pub struct IpcClientSubscription { + receiver: tokio::sync::broadcast::Receiver, topic: Option, } @@ -39,41 +41,164 @@ where /// The subcription will start buffering messages after its creation and return them /// when receive() is called. Messages received before the subscription was created will not be /// returned. -pub struct IpcClientTypedSubscription -where - Crypto: CryptoProvider, - Com: CommunicationBackend, - Ses: SessionRepository, - Payload: TryFrom> + PayloadTypeName, -{ - receiver: Com::Receiver, - client: Arc>, - _payload: std::marker::PhantomData, +pub struct IpcClientTypedSubscription> + PayloadTypeName>( + IpcClientSubscription, + std::marker::PhantomData, +); + +#[derive(Debug, Error, Clone, PartialEq, Eq)] +#[bitwarden_error(flat)] +#[allow(missing_docs)] +pub enum SubscribeError { + #[error("The IPC processing thread is not running")] + NotStarted, +} + +#[derive(Debug, Error, PartialEq, Eq)] +#[bitwarden_error(flat)] +#[allow(missing_docs)] +pub enum ReceiveError { + #[error("Failed to subscribe to the IPC channel: {0}")] + Channel(#[from] tokio::sync::broadcast::error::RecvError), + + #[error("Timed out while waiting for a message: {0}")] + Timeout(#[from] tokio::time::error::Elapsed), + + #[error("Cancelled while waiting for a message")] + Cancelled, +} + +#[derive(Debug, Error, PartialEq, Eq)] +#[bitwarden_error(flat)] +#[allow(missing_docs)] +pub enum TypedReceiveError { + #[error("Failed to subscribe to the IPC channel: {0}")] + Channel(#[from] tokio::sync::broadcast::error::RecvError), + + #[error("Timed out while waiting for a message: {0}")] + Timeout(#[from] tokio::time::error::Elapsed), + + #[error("Cancelled while waiting for a message")] + Cancelled, + + #[error("Typing error: {0}")] + Typing(String), +} + +impl From for TypedReceiveError { + fn from(value: ReceiveError) -> Self { + match value { + ReceiveError::Channel(e) => TypedReceiveError::Channel(e), + ReceiveError::Timeout(e) => TypedReceiveError::Timeout(e), + ReceiveError::Cancelled => TypedReceiveError::Cancelled, + } + } } impl IpcClient where Crypto: CryptoProvider, Com: CommunicationBackend, - Ses: SessionRepository, + Ses: SessionRepository, { - #[allow(missing_docs)] + /// Create a new IPC client with the provided crypto provider, communication backend, and + /// session repository. pub fn new(crypto: Crypto, communication: Com, sessions: Ses) -> Arc { Arc::new(Self { crypto, communication, sessions, + + incoming: RwLock::new(None), + cancellation_token: RwLock::new(None), }) } + /// Start the IPC client, which will begin listening for incoming messages and processing them. + /// This will be done in a separate task, allowing the client to receive messages + /// asynchronously. The client will stop automatically if an error occurs during message + /// processing or if the cancellation token is triggered. + /// + /// Note: The client can and will send messages in the background while it is running, even if + /// no active subscriptions are present. + pub async fn start(self: &Arc) { + let cancellation_token = CancellationToken::new(); + self.cancellation_token + .write() + .await + .replace(cancellation_token.clone()); + + let com_receiver = self.communication.subscribe().await; + let (client_tx, client_rx) = tokio::sync::broadcast::channel(CHANNEL_BUFFER_CAPACITY); + + self.incoming.write().await.replace(client_rx); + + let client = self.clone(); + let future = async move { + loop { + select! { + _ = cancellation_token.cancelled() => { + log::debug!("Cancellation signal received, stopping IPC client"); + break; + } + received = client.crypto.receive(&com_receiver, &client.communication, &client.sessions) => { + match received { + Ok(message) => { + if client_tx.send(message).is_err() { + log::error!("Failed to save incoming message"); + break; + }; + } + Err(e) => { + log::error!("Error receiving message: {:?}", e); + break; + } + } + } + } + } + log::debug!("IPC client shutting down"); + client.stop().await; + }; + + #[cfg(not(target_arch = "wasm32"))] + tokio::spawn(future); + + #[cfg(target_arch = "wasm32")] + wasm_bindgen_futures::spawn_local(future); + } + + /// Check if the IPC client task is currently running. + pub async fn is_running(self: &Arc) -> bool { + let has_incoming = self.incoming.read().await.is_some(); + let has_cancellation_token = self.cancellation_token.read().await.is_some(); + has_incoming && has_cancellation_token + } + + /// Stop the IPC client task. This will stop listening for incoming messages. + pub async fn stop(self: &Arc) { + let mut incoming = self.incoming.write().await; + let _ = incoming.take(); + + let mut cancellation_token = self.cancellation_token.write().await; + if let Some(cancellation_token) = cancellation_token.take() { + cancellation_token.cancel(); + } + } + /// Send a message - pub async fn send( - self: &Arc, - message: OutgoingMessage, - ) -> Result<(), SendError> { - self.crypto + pub async fn send(self: &Arc, message: OutgoingMessage) -> Result<(), Crypto::SendError> { + let result = self + .crypto .send(&self.communication, &self.sessions, message) - .await + .await; + + if result.is_err() { + log::error!("Error sending message: {:?}", result); + self.stop().await; + } + + result } /// Create a subscription to receive messages, optionally filtered by topic. @@ -81,155 +206,97 @@ where pub async fn subscribe( self: &Arc, topic: Option, - ) -> IpcClientSubscription { - IpcClientSubscription { - receiver: self.communication.subscribe().await, - client: self.clone(), + ) -> Result { + Ok(IpcClientSubscription { + receiver: self + .incoming + .read() + .await + .as_ref() + .ok_or(SubscribeError::NotStarted)? + .resubscribe(), topic, - } + }) } /// Create a subscription to receive messages that can be deserialized into the provided payload /// type. pub async fn subscribe_typed( self: &Arc, - ) -> IpcClientTypedSubscription + ) -> Result, SubscribeError> where Payload: TryFrom> + PayloadTypeName, { - IpcClientTypedSubscription { - receiver: self.communication.subscribe().await, - client: self.clone(), - _payload: std::marker::PhantomData, - } + Ok(IpcClientTypedSubscription( + self.subscribe(Some(Payload::name())).await?, + std::marker::PhantomData, + )) } +} +impl IpcClientSubscription { /// Receive a message, optionally filtering by topic. - /// Setting the topic to `None` will receive all messages. - /// Setting the timeout to `None` will wait indefinitely. - async fn receive( - &self, - receiver: &Com::Receiver, - topic: &Option, - timeout: Option, - ) -> Result< - IncomingMessage, - ReceiveError< - Crypto::ReceiveError, - ::ReceiveError, - >, - > { - let receive_loop = async { - loop { - let received = self - .crypto - .receive(receiver, &self.communication, &self.sessions) - .await?; - if topic.is_none() || &received.topic == topic { - return Ok(received); + /// Setting the cancellation_token to `None` will wait indefinitely. + pub async fn receive( + &mut self, + cancellation_token: Option, + ) -> Result { + let cancellation_token = cancellation_token.unwrap_or_default(); + + loop { + select! { + _ = cancellation_token.cancelled() => { + return Err(ReceiveError::Cancelled) + } + result = self.receiver.recv() => { + let received = result?; + if self.topic.is_none() || received.topic == self.topic { + return Ok::(received); + } } } - }; - - if let Some(timeout) = timeout { - tokio::time::timeout(timeout, receive_loop) - .await - .map_err(|_| ReceiveError::Timeout)? - } else { - receive_loop.await } } - - /// Receive a message, skipping any messages that cannot be deserialized into the expected - /// payload type. - async fn receive_typed( - &self, - receiver: &Com::Receiver, - timeout: Option, - ) -> Result< - TypedIncomingMessage, - TypedReceiveError< - >>::Error, - Crypto::ReceiveError, - ::ReceiveError, - >, - > - where - Payload: TryFrom> + PayloadTypeName, - { - let topic = Some(Payload::name()); - let received = self.receive(receiver, &topic, timeout).await?; - received.try_into().map_err(TypedReceiveError::Typing) - } } -impl IpcClientSubscription +impl IpcClientTypedSubscription where - Crypto: CryptoProvider, - Com: CommunicationBackend, - Ses: SessionRepository, -{ - /// Receive a message, optionally filtering by topic. - /// Setting the timeout to `None` will wait indefinitely. - pub async fn receive( - &self, - timeout: Option, - ) -> Result< - IncomingMessage, - ReceiveError< - Crypto::ReceiveError, - ::ReceiveError, - >, - > { - self.client - .receive(&self.receiver, &self.topic, timeout) - .await - } -} - -impl IpcClientTypedSubscription -where - Crypto: CryptoProvider, - Com: CommunicationBackend, - Ses: SessionRepository, - Payload: TryFrom> + PayloadTypeName, + Payload: TryFrom, Error = TryFromError> + PayloadTypeName, + TryFromError: std::fmt::Display, { /// Receive a message. - /// Setting the timeout to `None` will wait indefinitely. + /// Setting the cancellation_token to `None` will wait indefinitely. pub async fn receive( - &self, - timeout: Option, - ) -> Result< - TypedIncomingMessage, - TypedReceiveError< - >>::Error, - Crypto::ReceiveError, - ::ReceiveError, - >, - > { - self.client.receive_typed(&self.receiver, timeout).await + &mut self, + cancellation_token: Option, + ) -> Result, TypedReceiveError> { + let received = self.0.receive(cancellation_token).await?; + received + .try_into() + .map_err(|e: TryFromError| TypedReceiveError::Typing(e.to_string())) } } #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::{collections::HashMap, time::Duration}; + use bitwarden_threading::time::sleep; use serde::{Deserialize, Serialize}; use super::*; use crate::{ endpoint::Endpoint, traits::{ - tests::{TestCommunicationBackend, TestCommunicationBackendReceiveError}, - InMemorySessionRepository, NoEncryptionCryptoProvider, + tests::TestCommunicationBackend, InMemorySessionRepository, NoEncryptionCryptoProvider, }, }; struct TestCryptoProvider { - send_result: Result<(), SendError>, - receive_result: - Result>, + /// Simulate a send result. Set to `None` wait indefinitely + send_result: Option>, + /// Simulate a receive result. Set to `None` wait indefinitely + receive_result: Option>, } type TestSessionRepository = InMemorySessionRepository; @@ -243,9 +310,15 @@ mod tests { _receiver: &::Receiver, _communication: &TestCommunicationBackend, _sessions: &TestSessionRepository, - ) -> Result> - { - self.receive_result.clone() + ) -> Result { + match &self.receive_result { + Some(result) => result.clone(), + None => { + // Simulate waiting for a message but never returning + sleep(Duration::from_secs(600)).await; + Err("Simulated timeout".to_string()) + } + } } async fn send( @@ -253,14 +326,15 @@ mod tests { _communication: &TestCommunicationBackend, _sessions: &TestSessionRepository, _message: OutgoingMessage, - ) -> Result< - (), - SendError< - Self::SendError, - ::SendError, - >, - > { - self.send_result.clone() + ) -> Result<(), Self::SendError> { + match &self.send_result { + Some(result) => result.clone(), + None => { + // Simulate waiting for a message to be send but never returning + sleep(Duration::from_secs(600)).await; + Err("Simulated timeout".to_string()) + } + } } } @@ -272,34 +346,17 @@ mod tests { topic: None, }; let crypto_provider = TestCryptoProvider { - send_result: Err(SendError::Crypto("Crypto error".to_string())), - receive_result: Err(ReceiveError::Crypto( - "Should not have be called".to_string(), - )), + send_result: Some(Err("Crypto error".to_string())), + receive_result: Some(Err("Should not have be called".to_string())), }; let communication_provider = TestCommunicationBackend::new(); let session_map = TestSessionRepository::new(HashMap::new()); let client = IpcClient::new(crypto_provider, communication_provider, session_map); + client.start().await; let error = client.send(message).await.unwrap_err(); - assert_eq!(error, SendError::Crypto("Crypto error".to_string())); - } - - #[tokio::test] - async fn returns_receive_error_when_crypto_provider_returns_error() { - let crypto_provider = TestCryptoProvider { - send_result: Ok(()), - receive_result: Err(ReceiveError::Crypto("Crypto error".to_string())), - }; - let communication_provider = TestCommunicationBackend::new(); - let session_map = TestSessionRepository::new(HashMap::new()); - let client = IpcClient::new(crypto_provider, communication_provider, session_map); - - let subscription = client.subscribe(None).await; - let error = subscription.receive(None).await.unwrap_err(); - - assert_eq!(error, ReceiveError::Crypto("Crypto error".to_string())); + assert_eq!(error, "Crypto error".to_string()); } #[tokio::test] @@ -313,6 +370,7 @@ mod tests { let communication_provider = TestCommunicationBackend::new(); let session_map = InMemorySessionRepository::new(HashMap::new()); let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map); + client.start().await; client.send(message.clone()).await.unwrap(); @@ -332,8 +390,12 @@ mod tests { let communication_provider = TestCommunicationBackend::new(); let session_map = InMemorySessionRepository::new(HashMap::new()); let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map); + client.start().await; - let subscription = &client.subscribe(None).await; + let mut subscription = client + .subscribe(None) + .await + .expect("Subscribing should not fail"); communication_provider.push_incoming(message.clone()); let received_message = subscription.receive(None).await.unwrap(); @@ -359,7 +421,11 @@ mod tests { let communication_provider = TestCommunicationBackend::new(); let session_map = InMemorySessionRepository::new(HashMap::new()); let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map); - let subscription = client.subscribe(Some("matching_topic".to_owned())).await; + client.start().await; + let mut subscription = client + .subscribe(Some("matching_topic".to_owned())) + .await + .expect("Subscribing should not fail"); communication_provider.push_incoming(non_matching_message.clone()); communication_provider.push_incoming(non_matching_message.clone()); communication_provider.push_incoming(matching_message.clone()); @@ -416,10 +482,19 @@ mod tests { let communication_provider = TestCommunicationBackend::new(); let session_map = InMemorySessionRepository::new(HashMap::new()); let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map); - let subscription = client.subscribe_typed::().await; + client.start().await; + let mut subscription = client + .subscribe_typed::() + .await + .expect("Subscribing should not fail"); communication_provider.push_incoming(unrelated.clone()); communication_provider.push_incoming(unrelated.clone()); - communication_provider.push_incoming(typed_message.clone().try_into().unwrap()); + communication_provider.push_incoming( + typed_message + .clone() + .try_into() + .expect("Serialization should not fail"), + ); let received_message = subscription.receive(None).await.unwrap(); @@ -466,14 +541,73 @@ mod tests { let communication_provider = TestCommunicationBackend::new(); let session_map = InMemorySessionRepository::new(HashMap::new()); let client = IpcClient::new(crypto_provider, communication_provider.clone(), session_map); - let subscription = client.subscribe_typed::().await; + client.start().await; + let mut subscription = client + .subscribe_typed::() + .await + .expect("Subscribing should not fail"); communication_provider.push_incoming(non_deserializable_message.clone()); let result = subscription.receive(None).await; + assert!(matches!(result, Err(TypedReceiveError::Typing(_)))); + } + + #[tokio::test] + async fn ipc_client_stops_if_crypto_returns_send_error() { + let message = OutgoingMessage { + payload: vec![], + destination: Endpoint::BrowserBackground, + topic: None, + }; + let crypto_provider = TestCryptoProvider { + send_result: Some(Err("Crypto error".to_string())), + receive_result: None, + }; + let communication_provider = TestCommunicationBackend::new(); + let session_map = TestSessionRepository::new(HashMap::new()); + let client = IpcClient::new(crypto_provider, communication_provider, session_map); + client.start().await; + + let error = client.send(message).await.unwrap_err(); + let is_running = client.is_running().await; + + assert_eq!(error, "Crypto error".to_string()); + assert!(!is_running); + } + + #[tokio::test] + async fn ipc_client_stops_if_crypto_returns_receive_error() { + let crypto_provider = TestCryptoProvider { + send_result: None, + receive_result: Some(Err("Crypto error".to_string())), + }; + let communication_provider = TestCommunicationBackend::new(); + let session_map = TestSessionRepository::new(HashMap::new()); + let client = IpcClient::new(crypto_provider, communication_provider, session_map); + client.start().await; + + // Give the client some time to process the error + tokio::time::sleep(Duration::from_millis(100)).await; + let is_running = client.is_running().await; + + assert!(!is_running); + } + + #[tokio::test] + async fn ipc_client_is_running_if_no_errors_are_encountered() { + let crypto_provider = TestCryptoProvider { + send_result: None, + receive_result: None, + }; + let communication_provider = TestCommunicationBackend::new(); + let session_map = TestSessionRepository::new(HashMap::new()); + let client = IpcClient::new(crypto_provider, communication_provider, session_map); + client.start().await; + + // Give the client some time to process + tokio::time::sleep(Duration::from_millis(100)).await; + let is_running = client.is_running().await; - assert!(matches!( - result, - Err(TypedReceiveError::Typing(serde_json::Error { .. })) - )); + assert!(is_running); } } diff --git a/crates/bitwarden-ipc/src/lib.rs b/crates/bitwarden-ipc/src/lib.rs index d5173d645..b34f9a027 100644 --- a/crates/bitwarden-ipc/src/lib.rs +++ b/crates/bitwarden-ipc/src/lib.rs @@ -1,7 +1,7 @@ #![doc = include_str!("../README.md")] +mod constants; mod endpoint; -mod error; mod ipc_client; mod message; mod traits; @@ -10,4 +10,7 @@ mod traits; #[cfg(feature = "wasm")] pub mod wasm; -pub use ipc_client::IpcClient; +pub use ipc_client::{ + IpcClient, IpcClientSubscription, IpcClientTypedSubscription, ReceiveError, SubscribeError, + TypedReceiveError, +}; diff --git a/crates/bitwarden-ipc/src/traits/communication_backend.rs b/crates/bitwarden-ipc/src/traits/communication_backend.rs index 5bb108060..7f11da47e 100644 --- a/crates/bitwarden-ipc/src/traits/communication_backend.rs +++ b/crates/bitwarden-ipc/src/traits/communication_backend.rs @@ -1,14 +1,22 @@ +use std::fmt::Debug; + use crate::message::{IncomingMessage, OutgoingMessage}; /// This trait defines the interface that will be used to send and receive messages over IPC. /// It is up to the platform to implement this trait and any necessary thread synchronization and /// broadcasting. -pub trait CommunicationBackend { - type SendError; +pub trait CommunicationBackend: Send + Sync + 'static { + type SendError: Debug + Send + Sync + 'static; type Receiver: CommunicationBackendReceiver; /// Send a message to the destination specified in the message. This function may be called - /// from any thread at any time. The implementation will handle any necessary synchronization. + /// from any thread at any time. + /// + /// An error should only be returned for fatal and unrecoverable errors. + /// Returning an error will cause the IPC client to stop processing messages. + /// + /// The implementation of this trait needs to guarantee that: + /// - Multiple concurrent receivers and senders can coexist. fn send( &self, message: OutgoingMessage, @@ -20,7 +28,8 @@ pub trait CommunicationBackend { /// The implementation of this trait needs to guarantee that: /// - Multiple concurrent receivers may be created. /// - All concurrent receivers will receive the same messages. - fn subscribe(&self) -> impl std::future::Future; + /// - Multiple concurrent receivers and senders can coexist. + fn subscribe(&self) -> impl std::future::Future + Send + Sync; } /// This trait defines the interface for receiving messages from the communication backend. @@ -29,23 +38,25 @@ pub trait CommunicationBackend { /// - The receiver buffers messages from the creation of the receiver until the first call to /// receive(). /// - The receiver buffers messages between calls to receive(). -pub trait CommunicationBackendReceiver { - type ReceiveError; +pub trait CommunicationBackendReceiver: Send + Sync + 'static { + type ReceiveError: Debug + Send + Sync + 'static; /// Receive a message. This function will block asynchronously until a message is received. /// + /// An error should only be returned for fatal and unrecoverable errors. + /// Returning an error will cause the IPC client to stop processing messages. + /// /// Do not call this function from multiple threads at the same time. Use the subscribe function /// to create one receiver per thread. fn receive( &self, - ) -> impl std::future::Future>; + ) -> impl std::future::Future> + Send + Sync; } #[cfg(test)] pub mod tests { use std::sync::Arc; - use thiserror::Error; use tokio::sync::{ broadcast::{self, Receiver, Sender}, RwLock, @@ -103,12 +114,6 @@ pub mod tests { } } - #[derive(Debug, Clone, Error, PartialEq)] - pub enum TestCommunicationBackendReceiveError { - #[error("Could not receive mock message since no messages were queued")] - NoQueuedMessages, - } - impl CommunicationBackend for TestCommunicationBackend { type SendError = (); type Receiver = TestCommunicationBackendReceiver; @@ -124,16 +129,13 @@ pub mod tests { } impl CommunicationBackendReceiver for TestCommunicationBackendReceiver { - type ReceiveError = TestCommunicationBackendReceiveError; + type ReceiveError = (); async fn receive(&self) -> Result { - let mut receiver = self.0.write().await; - - if receiver.is_empty() { - return Err(TestCommunicationBackendReceiveError::NoQueuedMessages); - } - - Ok(receiver + Ok(self + .0 + .write() + .await .recv() .await .expect("Failed to receive incoming message")) diff --git a/crates/bitwarden-ipc/src/traits/crypto_provider.rs b/crates/bitwarden-ipc/src/traits/crypto_provider.rs index 2ee9b91f6..60316789a 100644 --- a/crates/bitwarden-ipc/src/traits/crypto_provider.rs +++ b/crates/bitwarden-ipc/src/traits/crypto_provider.rs @@ -1,39 +1,51 @@ +use std::fmt::Debug; + use super::{CommunicationBackend, CommunicationBackendReceiver, SessionRepository}; -use crate::{ - error::{ReceiveError, SendError}, - message::{IncomingMessage, OutgoingMessage}, -}; +use crate::message::{IncomingMessage, OutgoingMessage}; -pub trait CryptoProvider +pub trait CryptoProvider: Send + Sync + 'static where Com: CommunicationBackend, - Ses: SessionRepository, + Ses: SessionRepository, { - type Session; - type SendError; - type ReceiveError; + type Session: Send + Sync + 'static; + type SendError: Debug + Send + Sync + 'static; + type ReceiveError: Debug + Send + Sync + 'static; + /// Send a message. + /// + /// Calling this function may result in multiple messages being sent, depending on the + /// implementation of the trait. For example, if the destination does not have a + /// session, the function may first send a message to establish a session and then send the + /// original message. The implementation of this function should handle this logic. + /// + /// An error should only be returned for fatal and unrecoverable errors e.g. if the session + /// storage is full or cannot be accessed. Returning an error will cause the IPC client to + /// stop processing messages. fn send( &self, communication: &Com, sessions: &Ses, message: OutgoingMessage, - ) -> impl std::future::Future>>; + ) -> impl std::future::Future>; + /// Receive a message. + /// + /// Calling this function may also result in messages being sent, depending on the trait + /// implementation. For example, if an encrypted message is received from a destination that + /// does not have a session. The function may then try to establish a session and then + /// re-request the original message. The implementation of this function should handle this + /// logic. + /// + /// An error should only be returned for fatal and unrecoverable errors e.g. if the session + /// storage is full or cannot be accessed. Returning an error will cause the IPC client to + /// stop processing messages. fn receive( &self, receiver: &Com::Receiver, communication: &Com, sessions: &Ses, - ) -> impl std::future::Future< - Output = Result< - IncomingMessage, - ReceiveError< - Self::ReceiveError, - ::ReceiveError, - >, - >, - >; + ) -> impl std::future::Future> + Send + Sync; } pub struct NoEncryptionCryptoProvider; @@ -41,7 +53,7 @@ pub struct NoEncryptionCryptoProvider; impl CryptoProvider for NoEncryptionCryptoProvider where Com: CommunicationBackend, - Ses: SessionRepository, + Ses: SessionRepository<()>, { type Session = (); type SendError = Com::SendError; @@ -52,11 +64,8 @@ where communication: &Com, _sessions: &Ses, message: OutgoingMessage, - ) -> Result<(), SendError> { - communication - .send(message) - .await - .map_err(SendError::Communication) + ) -> Result<(), Self::SendError> { + communication.send(message).await } async fn receive( @@ -64,16 +73,7 @@ where receiver: &Com::Receiver, _communication: &Com, _sessions: &Ses, - ) -> Result< - IncomingMessage, - ReceiveError< - Self::ReceiveError, - ::ReceiveError, - >, - > { - receiver - .receive() - .await - .map_err(ReceiveError::Communication) + ) -> Result { + receiver.receive().await } } diff --git a/crates/bitwarden-ipc/src/traits/session_repository.rs b/crates/bitwarden-ipc/src/traits/session_repository.rs index 685d84416..222a6bf7f 100644 --- a/crates/bitwarden-ipc/src/traits/session_repository.rs +++ b/crates/bitwarden-ipc/src/traits/session_repository.rs @@ -1,23 +1,22 @@ -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug}; use tokio::sync::RwLock; use crate::endpoint::Endpoint; -pub trait SessionRepository { - type Session; - type GetError; - type SaveError; - type RemoveError; +pub trait SessionRepository: Send + Sync + 'static { + type GetError: Debug + Send + Sync + 'static; + type SaveError: Debug + Send + Sync + 'static; + type RemoveError: Debug + Send + Sync + 'static; fn get( &self, destination: Endpoint, - ) -> impl std::future::Future, Self::GetError>>; + ) -> impl std::future::Future, Self::GetError>>; fn save( &self, destination: Endpoint, - session: Self::Session, + session: Session, ) -> impl std::future::Future>; fn remove( &self, @@ -26,20 +25,19 @@ pub trait SessionRepository { } pub type InMemorySessionRepository = RwLock>; -impl SessionRepository for InMemorySessionRepository +impl SessionRepository for InMemorySessionRepository where - Session: Clone, + Session: Clone + Send + Sync + 'static, { - type Session = Session; type GetError = (); type SaveError = (); type RemoveError = (); - async fn get(&self, destination: Endpoint) -> Result, ()> { + async fn get(&self, destination: Endpoint) -> Result, ()> { Ok(self.read().await.get(&destination).cloned()) } - async fn save(&self, destination: Endpoint, session: Self::Session) -> Result<(), ()> { + async fn save(&self, destination: Endpoint, session: Session) -> Result<(), ()> { self.write().await.insert(destination, session); Ok(()) } diff --git a/crates/bitwarden-ipc/src/wasm/communication_backend.rs b/crates/bitwarden-ipc/src/wasm/communication_backend.rs index aaa3047f5..90722bfcd 100644 --- a/crates/bitwarden-ipc/src/wasm/communication_backend.rs +++ b/crates/bitwarden-ipc/src/wasm/communication_backend.rs @@ -1,4 +1,7 @@ +use std::sync::Arc; + use bitwarden_error::bitwarden_error; +use bitwarden_threading::ThreadBoundRunner; use thiserror::Error; use tokio::sync::RwLock; use wasm_bindgen::prelude::*; @@ -29,45 +32,55 @@ export interface IpcCommunicationBackendSender { #[wasm_bindgen] extern "C" { - #[allow(missing_docs)] + /// JavaScript interface for handling outgoing messages from the IPC framework. #[wasm_bindgen(js_name = IpcCommunicationBackendSender, typescript_type = "IpcCommunicationBackendSender")] pub type JsCommunicationBackendSender; - #[allow(missing_docs)] + /// Used by the IPC framework to send an outgoing message. #[wasm_bindgen(catch, method, structural)] pub async fn send( this: &JsCommunicationBackendSender, message: OutgoingMessage, ) -> Result<(), JsValue>; - #[allow(missing_docs)] + /// Used by JavaScript to provide an incoming message to the IPC framework. #[wasm_bindgen(catch, method, structural)] pub async fn receive(this: &JsCommunicationBackendSender) -> Result; } -#[allow(missing_docs)] +/// JavaScript implementation of the `CommunicationBackend` trait for IPC communication. #[wasm_bindgen(js_name = IpcCommunicationBackend)] pub struct JsCommunicationBackend { - sender: JsCommunicationBackendSender, + sender: Arc>, receive_rx: tokio::sync::broadcast::Receiver, receive_tx: tokio::sync::broadcast::Sender, } +impl Clone for JsCommunicationBackend { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + receive_rx: self.receive_rx.resubscribe(), + receive_tx: self.receive_tx.clone(), + } + } +} + #[wasm_bindgen(js_class = IpcCommunicationBackend)] impl JsCommunicationBackend { - #[allow(missing_docs)] + /// Creates a new instance of the JavaScript communication backend. #[wasm_bindgen(constructor)] pub fn new(sender: JsCommunicationBackendSender) -> Self { let (receive_tx, receive_rx) = tokio::sync::broadcast::channel(20); Self { - sender, + sender: Arc::new(ThreadBoundRunner::new(sender)), receive_rx, receive_tx, } } - /// JavaScript function to provide a received message to the backend/IPC framework. - pub fn deliver_message(&self, message: IncomingMessage) -> Result<(), JsValue> { + /// Used by JavaScript to provide an incoming message to the IPC framework. + pub fn receive(&self, message: IncomingMessage) -> Result<(), JsValue> { self.receive_tx .send(message) .map_err(|e| ChannelError(e.to_string()))?; @@ -76,11 +89,18 @@ impl JsCommunicationBackend { } impl CommunicationBackend for JsCommunicationBackend { - type SendError = JsValue; + type SendError = String; type Receiver = RwLock>; async fn send(&self, message: OutgoingMessage) -> Result<(), Self::SendError> { - self.sender.send(message).await + let result = self + .sender + .run_in_thread(|sender| async move { + sender.send(message).await.map_err(|e| format!("{:?}", e)) + }) + .await; + + result.map_err(|e| e.to_string())? } async fn subscribe(&self) -> Self::Receiver { @@ -89,14 +109,9 @@ impl CommunicationBackend for JsCommunicationBackend { } impl CommunicationBackendReceiver for RwLock> { - type ReceiveError = JsValue; + type ReceiveError = String; async fn receive(&self) -> Result { - Ok(self - .write() - .await - .recv() - .await - .map_err(|e| ChannelError(e.to_string()))?) + self.write().await.recv().await.map_err(|e| e.to_string()) } } diff --git a/crates/bitwarden-ipc/src/wasm/error.rs b/crates/bitwarden-ipc/src/wasm/error.rs deleted file mode 100644 index a53ce205f..000000000 --- a/crates/bitwarden-ipc/src/wasm/error.rs +++ /dev/null @@ -1,62 +0,0 @@ -use wasm_bindgen::prelude::*; - -use crate::error::{ReceiveError, SendError}; - -// We're not using bitwarden_error here because we want to return the raw JsValue error -// (bitwarden_error would try to serialize it with tsify and serde) - -#[allow(missing_docs)] -#[wasm_bindgen(js_name = SendError)] -pub struct JsSendError { - #[wasm_bindgen(getter_with_clone)] - pub crypto: JsValue, - #[wasm_bindgen(getter_with_clone)] - pub communication: JsValue, -} - -#[allow(missing_docs)] -#[wasm_bindgen(js_name = ReceiveError)] -pub struct JsReceiveError { - pub timeout: bool, - #[wasm_bindgen(getter_with_clone)] - pub crypto: JsValue, - #[wasm_bindgen(getter_with_clone)] - pub communication: JsValue, -} - -impl From> for JsSendError { - fn from(error: SendError) -> Self { - match error { - SendError::Crypto(e) => JsSendError { - crypto: e, - communication: JsValue::UNDEFINED, - }, - SendError::Communication(e) => JsSendError { - crypto: JsValue::UNDEFINED, - communication: e, - }, - } - } -} - -impl From> for JsReceiveError { - fn from(error: ReceiveError) -> Self { - match error { - ReceiveError::Timeout => JsReceiveError { - timeout: true, - crypto: JsValue::UNDEFINED, - communication: JsValue::UNDEFINED, - }, - ReceiveError::Crypto(e) => JsReceiveError { - timeout: false, - crypto: e, - communication: JsValue::UNDEFINED, - }, - ReceiveError::Communication(e) => JsReceiveError { - timeout: false, - crypto: JsValue::UNDEFINED, - communication: e, - }, - } - } -} diff --git a/crates/bitwarden-ipc/src/wasm/ipc_client.rs b/crates/bitwarden-ipc/src/wasm/ipc_client.rs index 358a4c789..01e6c69c6 100644 --- a/crates/bitwarden-ipc/src/wasm/ipc_client.rs +++ b/crates/bitwarden-ipc/src/wasm/ipc_client.rs @@ -1,19 +1,18 @@ use std::{collections::HashMap, sync::Arc}; +use bitwarden_threading::cancellation_token::wasm::{AbortSignal, AbortSignalExt}; use wasm_bindgen::prelude::*; -use super::{ - communication_backend::JsCommunicationBackend, - error::{JsReceiveError, JsSendError}, -}; +use super::communication_backend::JsCommunicationBackend; use crate::{ - ipc_client::IpcClientSubscription, + ipc_client::{IpcClientSubscription, ReceiveError, SubscribeError}, message::{IncomingMessage, OutgoingMessage}, traits::{InMemorySessionRepository, NoEncryptionCryptoProvider}, IpcClient, }; -#[allow(missing_docs)] +/// JavaScript wrapper around the IPC client. For more information, see the +/// [IpcClient] documentation. #[wasm_bindgen(js_name = IpcClient)] pub struct JsIpcClient { // TODO: Change session provider to a JS-implemented one @@ -26,21 +25,22 @@ pub struct JsIpcClient { >, } -#[allow(missing_docs)] +/// JavaScript wrapper around the IPC client subscription. For more information, see the +/// [IpcClientSubscription](crate::IpcClientSubscription) documentation. #[wasm_bindgen(js_name = IpcClientSubscription)] pub struct JsIpcClientSubscription { - subscription: IpcClientSubscription< - NoEncryptionCryptoProvider, - JsCommunicationBackend, - InMemorySessionRepository<()>, - >, + subscription: IpcClientSubscription, } -#[allow(missing_docs)] #[wasm_bindgen(js_class = IpcClientSubscription)] impl JsIpcClientSubscription { - pub async fn receive(&self) -> Result { - self.subscription.receive(None).await.map_err(|e| e.into()) + #[allow(missing_docs)] + pub async fn receive( + &mut self, + abort_signal: Option, + ) -> Result { + let cancellation_token = abort_signal.map(|signal| signal.to_cancellation_token()); + self.subscription.receive(cancellation_token).await } } @@ -48,24 +48,38 @@ impl JsIpcClientSubscription { impl JsIpcClient { #[allow(missing_docs)] #[wasm_bindgen(constructor)] - pub fn new(communication_provider: JsCommunicationBackend) -> JsIpcClient { + pub fn new(communication_provider: &JsCommunicationBackend) -> JsIpcClient { JsIpcClient { client: IpcClient::new( NoEncryptionCryptoProvider, - communication_provider, + communication_provider.clone(), InMemorySessionRepository::new(HashMap::new()), ), } } #[allow(missing_docs)] - pub async fn send(&self, message: OutgoingMessage) -> Result<(), JsSendError> { - self.client.send(message).await.map_err(|e| e.into()) + pub async fn start(&self) { + self.client.start().await + } + + #[wasm_bindgen(js_name = isRunning)] + #[allow(missing_docs)] + pub async fn is_running(&self) -> bool { + self.client.is_running().await + } + + #[allow(missing_docs)] + pub async fn send(&self, message: OutgoingMessage) -> Result<(), JsError> { + self.client + .send(message) + .await + .map_err(|e| JsError::new(&e)) } #[allow(missing_docs)] - pub async fn subscribe(&self) -> JsIpcClientSubscription { - let subscription = self.client.subscribe(None).await; - JsIpcClientSubscription { subscription } + pub async fn subscribe(&self) -> Result { + let subscription = self.client.subscribe(None).await?; + Ok(JsIpcClientSubscription { subscription }) } } diff --git a/crates/bitwarden-ipc/src/wasm/mod.rs b/crates/bitwarden-ipc/src/wasm/mod.rs index 6d423ddf5..275156442 100644 --- a/crates/bitwarden-ipc/src/wasm/mod.rs +++ b/crates/bitwarden-ipc/src/wasm/mod.rs @@ -1,9 +1,7 @@ mod communication_backend; -mod error; mod ipc_client; mod message; // Re-export types to make sure wasm_bindgen picks them up pub use communication_backend::*; -pub use error::*; pub use ipc_client::*; diff --git a/crates/bitwarden-threading/Cargo.toml b/crates/bitwarden-threading/Cargo.toml index a0a29b008..36e39458e 100644 --- a/crates/bitwarden-threading/Cargo.toml +++ b/crates/bitwarden-threading/Cargo.toml @@ -9,25 +9,36 @@ repository.workspace = true license-file.workspace = true keywords.workspace = true +[package.metadata.cargo-udeps.ignore] +development = ["tokio-test"] # only used in doc-tests + +[features] +wasm = [ + "dep:wasm-bindgen", + "dep:wasm-bindgen-futures", + "dep:js-sys", + "dep:gloo-timers" +] + [dependencies] bitwarden-error = { workspace = true } +js-sys = { workspace = true, optional = true } log = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } tokio = { features = ["sync", "time", "rt"], workspace = true } tokio-util = { version = "0.7.15" } +wasm-bindgen = { workspace = true, optional = true } [target.'cfg(target_arch="wasm32")'.dependencies] -gloo-timers = { version = "0.3.0", features = ["futures"] } -js-sys = { workspace = true } -tsify-next = { workspace = true } -wasm-bindgen = { workspace = true } -wasm-bindgen-futures = { workspace = true } +gloo-timers = { version = "0.3.0", features = ["futures"], optional = true } +wasm-bindgen-futures = { workspace = true, optional = true } [dev-dependencies] async-trait = "0.1.88" console_error_panic_hook = "0.1.7" js-sys = { workspace = true } +tokio-test = "0.4.4" tsify-next = { workspace = true } wasm-bindgen = { workspace = true } wasm-bindgen-futures = { workspace = true } diff --git a/crates/bitwarden-threading/src/cancellation_token.rs b/crates/bitwarden-threading/src/cancellation_token.rs index bdb414488..7de7bf78f 100644 --- a/crates/bitwarden-threading/src/cancellation_token.rs +++ b/crates/bitwarden-threading/src/cancellation_token.rs @@ -1,6 +1,6 @@ pub use tokio_util::sync::CancellationToken; -#[cfg(target_arch = "wasm32")] +#[cfg(feature = "wasm")] pub mod wasm { use wasm_bindgen::prelude::*; diff --git a/crates/bitwarden-threading/src/lib.rs b/crates/bitwarden-threading/src/lib.rs index a5947f939..b86b47214 100644 --- a/crates/bitwarden-threading/src/lib.rs +++ b/crates/bitwarden-threading/src/lib.rs @@ -1,5 +1,10 @@ #![doc = include_str!("../README.md")] +#[cfg(all(target_arch = "wasm32", not(feature = "wasm")))] +compile_error!( + "The `wasm` feature must be enabled to use the `bitwarden-ipc` crate in a WebAssembly environment." +); + #[allow(missing_docs)] pub mod cancellation_token; mod thread_bound_runner; diff --git a/crates/bitwarden-threading/src/thread_bound_runner.rs b/crates/bitwarden-threading/src/thread_bound_runner.rs index 8b23a6148..ff3154f89 100644 --- a/crates/bitwarden-threading/src/thread_bound_runner.rs +++ b/crates/bitwarden-threading/src/thread_bound_runner.rs @@ -7,7 +7,7 @@ use bitwarden_error::bitwarden_error; use thiserror::Error; #[cfg(not(target_arch = "wasm32"))] use tokio::task::spawn_local; -#[cfg(target_arch = "wasm32")] +#[cfg(all(target_arch = "wasm32", feature = "wasm"))] use wasm_bindgen_futures::spawn_local; type CallFunction = @@ -25,22 +25,56 @@ struct CallRequest { #[bitwarden_error(basic)] pub struct CallError(String); -/// A runner that takes a non-`Send`, non-`Sync` state and makes it `Send + Sync` compatible. +/// A runner that takes a non-`Send` state and makes it `Send` compatible. /// -/// `ThreadBoundRunner` is designed to safely encapsulate a `!Send + !Sync` state object by -/// pinning it to a single thread using `spawn_local`. It provides a `Send + Sync` API that +/// `ThreadBoundRunner` is designed to safely encapsulate a `!Send` state object by +/// pinning it to a single thread using `spawn_local`. It provides a `Send` API that /// allows other threads to submit tasks (function pointers or closures) that operate on the /// thread-bound state. /// /// Tasks are queued via an internal channel and are executed sequentially on the owning thread. /// /// # Example -/// ```ignore -/// let runner = ThreadBoundRunner::new(my_state); +/// ``` +/// # tokio_test::block_on(tokio::task::LocalSet::new().run_until(async { +/// use bitwarden_threading::ThreadBoundRunner; +/// +/// struct State; +/// +/// impl State { +/// pub async fn do_something(&self, some_input: i32) -> i32 { +/// return some_input; +/// } +/// } +/// +/// let runner = ThreadBoundRunner::new(State); +/// let input = 42; +/// +/// let output = runner.run_in_thread(move |state| async move { +/// return state.do_something(input).await; +/// }).await; +/// +/// assert_eq!(output.unwrap(), 42); +/// # })); +/// ``` +/// +/// If you need mutable access to the state, you can wrap the `ThreadState` in a `Mutex` or +/// `RwLock` and use the `run_in_thread` method to lock it before accessing it. +/// +/// # Example +/// ``` +/// # tokio_test::block_on(tokio::task::LocalSet::new().run_until(async { +/// use bitwarden_threading::ThreadBoundRunner; +/// use tokio::sync::Mutex; +/// +/// struct State(i32); +/// +/// let runner = ThreadBoundRunner::new(Mutex::new(State(0))); /// /// runner.run_in_thread(|state| async move { -/// // do something with `state` -/// }); +/// state.lock().await.0 += 1; +/// }).await; +/// # })); /// ``` /// /// This pattern is useful for interacting with APIs or data structures that must remain @@ -73,7 +107,7 @@ where /// Submit a task to be executed on the thread-bound state. /// /// The provided function is executed on the thread that owns the internal `ThreadState`, - /// ensuring safe access to `!Send + !Sync` data. Tasks are dispatched in the order they are + /// ensuring safe access to `!Send` data. Tasks are dispatched in the order they are /// received, but because they are asynchronous, multiple tasks may be in-flight and running /// concurrently if their futures yield. ///