Skip to content

Commit 22778f4

Browse files
authored
fix(listen): prevent panic on websocket keep-alive after close_stream (#144)
Closes #143
1 parent b8c43fe commit 22778f4

File tree

5 files changed

+153
-2
lines changed

5 files changed

+153
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
/target
2+
.env
23
your_output_file.wav
34
.DS_Store

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ name = "microphone_stream"
9999
path = "examples/transcription/websocket/microphone_stream.rs"
100100
required-features = ["listen"]
101101

102+
[[example]]
103+
name = "16_keepalive_close_stream"
104+
path = "examples/transcription/websocket/16_keepalive_close_stream.rs"
105+
required-features = ["listen"]
106+
102107
[[example]]
103108
name = "simple_flux"
104109
path = "examples/transcription/flux/simple_flux.rs"
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/// Example: WebSocket Keep-Alive with Close Stream
2+
///
3+
/// Demonstrates that enabling keep_alive() and then calling close_stream()
4+
/// works correctly without panicking — even when the keep-alive timer fires
5+
/// after the channel has been closed.
6+
///
7+
/// This validates the fix for the race condition where close_stream() closes
8+
/// the internal channel, and a subsequent keep-alive send would previously
9+
/// panic on .expect().
10+
///
11+
/// Usage:
12+
/// DEEPGRAM_API_KEY=your-key cargo run --example 16_keepalive_close_stream
13+
use std::env;
14+
use std::time::Duration;
15+
16+
use deepgram::{
17+
common::options::{Encoding, Endpointing, Options},
18+
Deepgram, DeepgramError,
19+
};
20+
21+
#[tokio::main]
22+
async fn main() -> Result<(), DeepgramError> {
23+
let deepgram_api_key =
24+
env::var("DEEPGRAM_API_KEY").expect("DEEPGRAM_API_KEY environment variable");
25+
26+
let dg_client = Deepgram::new(&deepgram_api_key)?;
27+
28+
let options = Options::builder()
29+
.query_params([("mip_opt_out".to_string(), "true".to_string())])
30+
.build();
31+
32+
println!("Connecting to Deepgram with keep-alive enabled...");
33+
34+
let mut handle = dg_client
35+
.transcription()
36+
.stream_request_with_options(options)
37+
.encoding(Encoding::Linear16)
38+
.endpointing(Endpointing::Disabled)
39+
.keep_alive()
40+
.handle()
41+
.await?;
42+
43+
println!("Connected. Request ID: {}", handle.request_id());
44+
45+
// Brief pause — no audio is sent, so the worker only has the keep-alive
46+
// timer ticking. This simulates a connection that is idle before being
47+
// torn down.
48+
tokio::time::sleep(Duration::from_millis(100)).await;
49+
50+
println!("Closing stream...");
51+
handle.close_stream().await?;
52+
println!("Stream closed.");
53+
54+
// Wait longer than the 3s keep-alive interval so the keep-alive send
55+
// path runs after close. Before the fix this would panic; now the send
56+
// error is silently ignored and the worker exits cleanly.
57+
println!("Waiting for keep-alive timer to fire (>3s)...");
58+
tokio::time::sleep(Duration::from_secs(4)).await;
59+
60+
println!("No panic — keep-alive + close_stream works correctly.");
61+
62+
Ok(())
63+
}

src/listen/websocket.rs

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,12 @@ async fn run_worker(
481481
_ = sleep.fuse() => {
482482
// eprintln!("<worker> sleep");
483483
if keep_alive && is_open {
484-
message_tx.send(WsMessage::ControlMessage(ControlMessage::KeepAlive)).await.expect("we hold the receiver, so we know it hasn't been dropped");
484+
// Ignore send errors: the channel may have been closed by
485+
// close_stream() (via close_channel()) before the worker
486+
// processes the pending CloseStream message. In that case
487+
// the next iteration will handle CloseStream, stop sending new
488+
// messages, and proceed toward shutdown.
489+
let _ = message_tx.send(WsMessage::ControlMessage(ControlMessage::KeepAlive)).await;
485490
last_sent_message = tokio::time::Instant::now();
486491
} else {
487492
pending::<()>().await;
@@ -872,8 +877,10 @@ mod file_chunker {
872877

873878
#[cfg(test)]
874879
mod tests {
880+
use std::time::Duration;
881+
875882
use super::ControlMessage;
876-
use crate::common::options::Options;
883+
use crate::common::options::{Encoding, Endpointing, Options};
877884

878885
#[test]
879886
fn test_stream_url() {
@@ -910,4 +917,78 @@ mod tests {
910917
r#"{"type":"CloseStream"}"#
911918
);
912919
}
920+
921+
/// Reproduces the worker panic from issue #143: close_stream() calls
922+
/// close_channel(), so when the worker's keep-alive sleep fires it sends
923+
/// into a closed channel. Before the fix, .expect() would panic.
924+
#[tokio::test]
925+
#[ignore = "requires DEEPGRAM_API_KEY and network; run manually"]
926+
async fn keepalive_then_close_stream_panic_repro() {
927+
let Ok(api_key) = std::env::var("DEEPGRAM_API_KEY") else {
928+
eprintln!("skipping: DEEPGRAM_API_KEY not set");
929+
return;
930+
};
931+
932+
let dg = crate::Deepgram::new(&api_key).expect("Deepgram::new");
933+
let transcription = dg.transcription();
934+
935+
let options = Options::builder()
936+
.query_params([("mip_opt_out".to_string(), "true".to_string())])
937+
.build();
938+
let mut handle = transcription
939+
.stream_request_with_options(options)
940+
.encoding(Encoding::Linear16)
941+
.endpointing(Endpointing::Disabled)
942+
.keep_alive()
943+
.handle()
944+
.await
945+
.expect("handle");
946+
947+
// No audio sent: worker only has the keep-alive timer (3s interval).
948+
// Close the channel before the keep-alive fires so the
949+
// send(KeepAlive) hits a closed channel.
950+
tokio::time::sleep(Duration::from_millis(100)).await;
951+
let _ = handle.close_stream().await;
952+
953+
// Wait longer than the 3s keep-alive interval so the keep-alive
954+
// send path runs after close. Before the fix this would panic.
955+
tokio::time::sleep(Duration::from_secs(4)).await;
956+
}
957+
958+
/// Runs the close_stream race in a loop to increase the probability of
959+
/// hitting the timing window (scheduling variance).
960+
#[tokio::test]
961+
#[ignore = "requires DEEPGRAM_API_KEY and network; run manually"]
962+
async fn keepalive_close_stream_panic_repro_loop() {
963+
let Ok(api_key) = std::env::var("DEEPGRAM_API_KEY") else {
964+
eprintln!("skipping: DEEPGRAM_API_KEY not set");
965+
return;
966+
};
967+
968+
const ITERATIONS: u32 = 3;
969+
970+
let options = Options::builder()
971+
.query_params([("mip_opt_out".to_string(), "true".to_string())])
972+
.build();
973+
let dg = crate::Deepgram::new(&api_key).expect("Deepgram::new");
974+
let transcription = dg.transcription();
975+
976+
for _iteration in 0..ITERATIONS {
977+
let mut handle = transcription
978+
.stream_request_with_options(options.clone())
979+
.encoding(Encoding::Linear16)
980+
.endpointing(Endpointing::Disabled)
981+
.keep_alive()
982+
.handle()
983+
.await
984+
.expect("handle");
985+
986+
tokio::time::sleep(Duration::from_millis(100)).await;
987+
let _ = handle.close_stream().await;
988+
989+
// Wait longer than the 3s keep-alive interval so the keep-alive
990+
// send path runs after close. Before the fix this would panic.
991+
tokio::time::sleep(Duration::from_secs(4)).await;
992+
}
993+
}
913994
}

tests/flux_unknown_messages.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod mock {
2525
tokio::spawn(async move {
2626
let (stream, _) = listener.accept().await.unwrap();
2727

28+
#[allow(clippy::result_large_err)]
2829
let callback =
2930
|_req: &tungstenite::handshake::server::Request,
3031
mut resp: tungstenite::handshake::server::Response| {

0 commit comments

Comments
 (0)