Description
I'm working with the OpenAI Assistants API in streaming mode. The assistant streams responses correctly until a requires_action event (THREAD_RUN_REQUIRES_ACTION, i.e. "thread.run.requires_action") is encountered. At that point I submit the tool outputs for the function call via submitToolOutputsToRunStream, and the submission succeeds.
However, after the tool outputs are submitted, the stream does not continue: no further data is emitted, even though inspecting the assistant's messages afterward confirms that it does generate a response after the function call.
Upon reviewing the implementation, I noticed that the issue stems from this line in the SDK's submitToolOutputsToRunStream method:
.setStream(false); // <-- This disables streaming
Here’s the problematic snippet:
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<StreamUpdate> submitToolOutputsToRunStream(String threadId, String runId,
        List<ToolOutput> toolOutputs) {
    RequestOptions requestOptions = new RequestOptions();
    SubmitToolOutputsToRunRequest requestObj = new SubmitToolOutputsToRunRequest(toolOutputs)
        .setStream(false); // <-- Streaming disabled here
    Flux<ByteBuffer> responseStream =
        submitToolOutputsToRunWithResponse(threadId, runId, BinaryData.fromObject(requestObj), requestOptions)
            .flatMapMany(response -> response.getValue().toFluxByteBuffer());
    OpenAIServerSentEvents openAIServerSentEvents = new OpenAIServerSentEvents(responseStream);
    return openAIServerSentEvents.getEvents();
}
Issue:
Because setStream(false) is hard-coded, the request asks the service for a non-streaming response, so the assistant's continuation after the tool outputs are submitted is never delivered as server-sent events.
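To illustrate why the flag matters: with stream=true the service answers with SSE frames, while with stream=false it answers with a single plain JSON run object. An SSE parser only emits payloads of "data:" lines, so a JSON body produces no events and the stream simply appears to end. A minimal, hand-rolled sketch of that idea (illustrative payloads; this is not the SDK's actual OpenAIServerSentEvents implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SseSketch {
    // Minimal SSE event extractor: collect payloads of "data:" lines,
    // stopping at the "[DONE]" sentinel.
    public static List<String> extractEvents(String body) {
        List<String> events = new ArrayList<>();
        for (String line : body.split("\n")) {
            if (line.startsWith("data:")) {
                String payload = line.substring(5).trim();
                if (payload.equals("[DONE]")) break;
                events.add(payload);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // With stream=true the service answers with SSE frames:
        String sseBody = "event: thread.message.delta\ndata: {\"delta\":\"Hi\"}\n\ndata: [DONE]\n";
        // With stream=false it answers with one plain JSON run object:
        String jsonBody = "{\"id\":\"run_abc\",\"status\":\"queued\"}";
        System.out.println(extractEvents(sseBody));  // one event
        System.out.println(extractEvents(jsonBody)); // empty: no "data:" lines, stream seems to stop
    }
}
```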
Question:
How can we correctly re-enable streaming after submitting tool outputs? Is there an official way or a workaround to ensure the assistant continues to stream its reply post-submission?
We tested the following code for end-to-end streaming with automatic tool output submission using Kotlin and WebFlux:
fun createRunStream(client: AssistantsAsyncClient, threadId: String, runOptions: CreateRunOptions): Flux<StreamUpdate> {
    return client.createRunStream(threadId, runOptions)
        .expand { update ->
            when (update) {
                is StreamRequiredAction -> {
                    val runId = update.message.id
                    val tId = update.message.threadId
                    val requiredAction = update.message.requiredAction as? SubmitToolOutputsAction
                    val toolCalls = requiredAction?.submitToolOutputs?.toolCalls.orEmpty()
                    val outputs = toolCalls.map { call ->
                        val date = Date().toString()
                        ToolOutput().apply {
                            toolCallId = call.id
                            output = """{"date": "Today Date $date"}"""
                        }
                    }
                    println("Submitting tool outputs for runId=$runId, threadId=$tId")
                    // Corrected submission to enable streaming
                    submitToolOutputsToRunStream(client, tId, runId, outputs)
                }
                // Non-action updates must not expand further; returning
                // Flux.just(update) here would make expand re-emit each
                // update indefinitely.
                else -> Flux.empty()
            }
        }
        .doOnNext { update ->
            val msg = when (update) {
                is StreamMessageCreation -> update.message.content
                    .filterIsInstance<MessageTextContent>().joinToString("") { it.text.value }
                is StreamMessageUpdate -> update.message.delta.content
                    .filterIsInstance<MessageDeltaTextContentObject>().joinToString("") { it.text.value }
                else -> null
            }
            println("Received Update: kind=${update.kind} -> ${update.javaClass.simpleName}, msg=$msg")
        }
        .doOnError { err -> println("Error during streaming: ${err.message}") }
        .doOnComplete { println("Stream complete.") }
}
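The expand operator effectively splices the continuation stream in after the element that produced it, and leaves other elements alone. A stdlib-only toy model of that recursion (synchronous and depth-first for simplicity, whereas Reactor's expand is asynchronous and breadth-first; event names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class ExpandSketch {
    // Toy model of Flux.expand: each element may expand into a follow-up
    // sequence, which is itself expanded recursively. Elements that expand
    // to an empty list terminate the recursion.
    public static <T> List<T> expand(List<T> seed, Function<T, List<T>> expander) {
        List<T> out = new ArrayList<>();
        for (T item : seed) {
            out.add(item);
            out.addAll(expand(expander.apply(item), expander));
        }
        return out;
    }

    public static void main(String[] args) {
        // "requires_action" expands into the continuation stream obtained by
        // re-submitting tool outputs with stream=true; everything else is terminal.
        List<String> run = List.of("message.delta", "requires_action");
        List<String> all = expand(run, e ->
            e.equals("requires_action") ? List.of("message.delta2", "run.completed") : List.of());
        System.out.println(all);
        // [message.delta, requires_action, message.delta2, run.completed]
    }
}
```

This also makes the earlier bug visible: if the "everything else" branch expanded an element into itself, the recursion would never terminate.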
✅ Workaround
The issue is resolved by ensuring setStream(true) is explicitly set when submitting tool outputs:
fun submitToolOutputsToRunStream(client: AssistantsAsyncClient, threadId: String, runId: String, toolOutputs: List<ToolOutput>): Flux<StreamUpdate> {
    val requestOptions = RequestOptions()
    val requestObj = SubmitToolOutputsToRunRequest(toolOutputs).setStream(true)
    val responseStream: Flux<ByteBuffer> =
        client.submitToolOutputsToRunWithResponse(threadId, runId, BinaryData.fromObject(requestObj), requestOptions)
            .flatMapMany { response -> response.value.toFluxByteBuffer() }
    return OpenAIServerSentEvents(responseStream).events
}
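On the wire, the only difference the fix makes is the stream field in the request body. A hand-built sketch of the submit-tool-outputs payload (field names follow the Assistants REST API; this is illustrative string building, not the SDK's serializer):

```java
public class BodySketch {
    // Hypothetical minimal JSON body for the submit-tool-outputs call.
    public static String body(String toolCallId, String output, boolean stream) {
        return "{\"tool_outputs\":[{\"tool_call_id\":\"" + toolCallId
             + "\",\"output\":\"" + output + "\"}],\"stream\":" + stream + "}";
    }

    public static void main(String[] args) {
        // stream=true asks the service to answer with SSE frames;
        // stream=false yields one plain JSON run object instead.
        System.out.println(body("call_1", "42", true));
    }
}
```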
If anyone has official documentation or insights on whether setStream(true) is the recommended approach, or whether there is a more robust one, feedback is welcome.