@@ -108,16 +108,15 @@ where
108
108
109
109
pub async fn start ( self : & Arc < Self > ) {
110
110
let cancellation_token = CancellationToken :: new ( ) ;
111
- let mut self_cancellation_token = self . cancellation_token . write ( ) . await ;
112
-
113
- * self_cancellation_token = Some ( cancellation_token. clone ( ) ) ;
111
+ self . cancellation_token
112
+ . write ( )
113
+ . await
114
+ . replace ( cancellation_token. clone ( ) ) ;
114
115
115
116
let com_receiver = self . communication . subscribe ( ) . await ;
116
117
let ( client_tx, client_rx) = tokio:: sync:: broadcast:: channel ( CHANNEL_BUFFER_CAPACITY ) ;
117
118
118
- let mut client_incoming = self . incoming . write ( ) . await ;
119
- * client_incoming = Some ( client_rx) ;
120
- let _ = client_incoming;
119
+ self . incoming . write ( ) . await . replace ( client_rx) ;
121
120
122
121
let client = self . clone ( ) ;
123
122
let future = async move {
@@ -155,9 +154,9 @@ where
155
154
}
156
155
157
156
pub async fn is_running ( self : & Arc < Self > ) -> bool {
158
- let incoming = self . incoming . read ( ) . await ;
159
- let cancellation_token = self . cancellation_token . read ( ) . await ;
160
- incoming . is_some ( ) && cancellation_token . is_some ( )
157
+ let has_incoming = self . incoming . read ( ) . await . is_some ( ) ;
158
+ let has_cancellation_token = self . cancellation_token . read ( ) . await . is_some ( ) ;
159
+ has_incoming && has_cancellation_token
161
160
}
162
161
163
162
pub async fn stop ( self : & Arc < Self > ) {
@@ -225,22 +224,20 @@ impl IpcClientSubscription {
225
224
& mut self ,
226
225
cancellation_token : Option < CancellationToken > ,
227
226
) -> Result < IncomingMessage , ReceiveError > {
228
- let receive_loop = async {
229
- loop {
230
- let received = self . receiver . recv ( ) . await ?;
231
- if self . topic . is_none ( ) || received. topic == self . topic {
232
- return Ok :: < IncomingMessage , ReceiveError > ( received) ;
233
- }
234
- }
235
- } ;
236
-
237
227
let cancellation_token = cancellation_token. unwrap_or_default ( ) ;
238
228
239
- select ! {
240
- _ = cancellation_token. cancelled( ) => {
241
- Err ( ReceiveError :: Cancelled )
229
+ loop {
230
+ select ! {
231
+ _ = cancellation_token. cancelled( ) => {
232
+ return Err ( ReceiveError :: Cancelled )
233
+ }
234
+ result = self . receiver. recv( ) => {
235
+ let received = result?;
236
+ if self . topic. is_none( ) || received. topic == self . topic {
237
+ return Ok :: <IncomingMessage , ReceiveError >( received) ;
238
+ }
239
+ }
242
240
}
243
- result = receive_loop => result,
244
241
}
245
242
}
246
243
}
@@ -488,7 +485,6 @@ mod tests {
488
485
}
489
486
490
487
#[ tokio:: test]
491
- // async fn skips_message_if_it_was_not_deserializable() {
492
488
async fn returns_error_if_related_message_was_not_deserializable ( ) {
493
489
#[ derive( Debug , Clone , PartialEq , Deserialize , Serialize ) ]
494
490
struct TestPayload {
0 commit comments