Skip to content

Commit 5c1454d

Browse files
authored
Handle "Unexpected response type for remote procedure call: Close" on query stream opening (#759)
## Usage and product changes Fix a rare `InternalError` returned by mistake when a client sends a query request while the transaction is being closed. Now, an expected "The transaction is closed and no further operation is allowed." error is returned instead. Additionally, wait for specific transaction responses in `rollback`, `commit`, and `query` to solidify the protocol and ensure that the server acts as expected. ## Implementation We [faced](https://discord.com/channels/665254494820368395/1301180015173435454/1379414371402256454) this error when running a huge number of queries against a server that eventually went to sleep. Presumably, this state led to the GRPC stream interruption, and it "gracefully" stopped without errors. However, the driver's code did not expect that it could happen right after the stream opening request, while it should be considered a normal situation. The source code where `Close` responses are generated: ``` async fn listen_loop( mut grpc_source: Streaming<transaction::Server>, collector: ResponseCollector, shutdown_sink: UnboundedSender<()>, ) { loop { match grpc_source.next().await { Some(Ok(message)) => collector.collect(message).await, Some(Err(status)) => break collector.close_with_error(status.into()).await, None => break collector.close().await, // <---- here, the stream has ended } } shutdown_sink.send(()).ok(); } async fn close(self) { self.is_open.store(false); let mut listeners = std::mem::take(&mut *self.callbacks.write().unwrap()); for (_, listener) in listeners.drain() { listener.finish(Ok(TransactionResponse::Close)); // <--- here, the only `Close` response is sent } // .... } ``` Additionally, we explicitly wait for *request's response type* or *stream closed marker* with their correct processing for each other transaction request in `transaction_stream`. For this, the commit behavior of the server has been made more explicit: typedb/typedb#7484
1 parent b9b8467 commit 5c1454d

File tree

3 files changed

+32
-15
lines changed

3 files changed

+32
-15
lines changed

dependencies/typedb/artifacts.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def typedb_artifact():
2525
artifact_name = "typedb-all-{platform}-{version}.{ext}",
2626
tag_source = deployment["artifact"]["release"]["download"],
2727
commit_source = deployment["artifact"]["snapshot"]["download"],
28-
commit = "16eca5f4238a772532a6824966e2fc78437467dc"
28+
commit = "d83f3b2b40c673c30d00b377ce327ac0ff233056"
2929
)
3030

3131
#def typedb_cloud_artifact():

rust/src/connection/transaction_stream.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,30 @@ use crate::{
3535
promisify, resolve, Error, QueryOptions, TransactionOptions, TransactionType,
3636
};
3737

38+
macro_rules! require_transaction_response {
39+
($response:expr, $variant:ident(_)) => {
40+
match $response {
41+
Ok(TransactionResponse::$variant(inner)) => Ok(inner),
42+
other => handle_unexpected_response(other),
43+
}
44+
};
45+
46+
($response:expr, $variant:ident) => {
47+
match $response {
48+
Ok(TransactionResponse::$variant) => Ok(()),
49+
other => handle_unexpected_response(other),
50+
}
51+
};
52+
}
53+
54+
fn handle_unexpected_response<T>(response: Result<TransactionResponse>) -> Result<T> {
55+
match response {
56+
Ok(TransactionResponse::Close) => Err(ConnectionError::TransactionIsClosed.into()),
57+
Ok(other) => Err(InternalError::UnexpectedResponseType { response_type: format!("{other:?}") }.into()),
58+
Err(err) => Err(err),
59+
}
60+
}
61+
3862
pub(crate) struct TransactionStream {
3963
type_: TransactionType,
4064
options: TransactionOptions,
@@ -73,18 +97,14 @@ impl TransactionStream {
7397
pub(crate) fn commit(self: Pin<Box<Self>>) -> impl Promise<'static, Result> {
7498
let promise = self.transaction_transmitter.single(TransactionRequest::Commit);
7599
promisify! {
76-
let _this = self; // move into the promise so the stream isn't dropped until the promise is resolved
77-
resolve!(promise).map(|_| {
78-
()
79-
}).map_err(|err| {
80-
err
81-
})
100+
let _this = self; // move into the promise so the stream isn't dropped until the promise is resolved
101+
require_transaction_response!(resolve!(promise), Commit)
82102
}
83103
}
84104

85105
pub(crate) fn rollback(&self) -> impl Promise<'_, Result> {
86106
let promise = self.single(TransactionRequest::Rollback);
87-
promisify! { resolve!(promise).map(|_| ()) }
107+
promisify! { require_transaction_response!(resolve!(promise), Rollback) }
88108
}
89109

90110
pub(crate) fn query(&self, query: &str, options: QueryOptions) -> impl Promise<'static, Result<QueryAnswer>> {
@@ -164,11 +184,9 @@ impl TransactionStream {
164184
}
165185

166186
fn query_stream(&self, req: QueryRequest) -> Result<impl Stream<Item = Result<QueryResponse>>> {
167-
Ok(self.stream(TransactionRequest::Query(req))?.map(|response| match response {
168-
Ok(TransactionResponse::Query(res)) => Ok(res),
169-
Ok(other) => Err(InternalError::UnexpectedResponseType { response_type: format!("{other:?}") }.into()),
170-
Err(err) => Err(err),
171-
}))
187+
Ok(self
188+
.stream(TransactionRequest::Query(req))?
189+
.map(|response| require_transaction_response!(response, Query(_))))
172190
}
173191
}
174192

tool/test/start-community-server.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ set -e
2121
rm -rf typedb-all
2222

2323
bazel run //tool/test:typedb-extractor -- typedb-all
24-
BAZEL_JAVA_HOME=$(bazel run //tool/test:echo-java-home)
25-
./typedb-all/typedb server --development-mode.enabled --server.authentication.token_ttl_seconds 15 &
24+
./typedb-all/typedb server --development-mode.enabled true --server.authentication.token_ttl_seconds 15 &
2625

2726
set +e
2827
POLL_INTERVAL_SECS=0.5

0 commit comments

Comments
 (0)