feat: Add async streaming support in HuggingFaceLocalChatGenerator
#9405
base: main
Pull Request Test Coverage Report for Build 15217439421
Warning: This coverage report may be inaccurate.
This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.
💛 - Coveralls
@@ -566,7 +568,7 @@ async def _run_streaming_async(  # pylint: disable=too-many-positional-arguments
         )

         # Set up streaming handler
-        generation_kwargs["streamer"] = HFTokenStreamingHandler(tokenizer, streaming_callback, stop_words)
+        generation_kwargs["streamer"] = AsyncHFTokenStreamingHandler(tokenizer, streaming_callback, stop_words)
To make mypy happy, could we add an assert here asserting that streaming_callback is of type AsyncStreamingCallbackT?
Or update AsyncHFTokenStreamingHandler such that the type hint for stream_handler is StreamingCallbackT.
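As a rough illustration of the first option, here is a minimal sketch of a runtime check followed by a type narrowing. The aliases `SyncCallback`, `AsyncCallback`, and `AnyCallback` and the helper `require_async` are hypothetical stand-ins for this example; they are not the project's `StreamingCallbackT` or `AsyncStreamingCallbackT` definitions.

```python
import inspect
from typing import Awaitable, Callable, Union, cast

# Hypothetical aliases standing in for the project's callback types.
SyncCallback = Callable[[str], None]
AsyncCallback = Callable[[str], Awaitable[None]]
AnyCallback = Union[SyncCallback, AsyncCallback]


def require_async(callback: AnyCallback) -> AsyncCallback:
    """Check at runtime that the callback is a coroutine function, then narrow its type."""
    # The assert catches a sync callback early; cast() tells the type checker
    # which member of the union we are now dealing with.
    assert inspect.iscoroutinefunction(callback), "expected an async streaming callback"
    return cast(AsyncCallback, callback)
```

The assert is only a runtime guard (and is skipped under `python -O`); the `cast` is what satisfies the type checker in this sketch.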
I've left a comment below!
        self.token_handler = stream_handler
        self.stop_words = stop_words or []

    def on_finalized_text(self, word: str, stream_end: bool = False):
This method returns a coroutine, but if I look at this, it seems that TextStreamer will call it synchronously (and so the coroutine will never be awaited).
In the test below I see you're working around this:
    loop.call_soon_threadsafe(
        lambda: asyncio.create_task(streamer.on_finalized_text("Hello", stream_end=False))
    )
but TextStreamer will call it like:
    streamer.on_finalized_text("Hello", stream_end=False)
I might be wrong, but have you tried testing it manually? You might need to do something like:
    def on_finalized_text(self, word: str, stream_end: bool = False):
        word_to_send = word + "\n" if stream_end else word
        if word_to_send.strip() not in self.stop_words:
            # Schedule the async callback without returning the coroutine
            asyncio.create_task(self.token_handler(StreamingChunk(content=word_to_send)))
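For readers following the thread, here is a self-contained sketch of the pattern under discussion; it is not the PR's code. `asyncio.create_task` needs a running event loop in the calling thread, so this sketch assumes the synchronous caller runs in a worker thread and uses `asyncio.run_coroutine_threadsafe` to hand the coroutine to the loop instead. `SketchAsyncHandler`, `fake_generate`, and `demo` are hypothetical names, and plain strings stand in for `StreamingChunk`.

```python
import asyncio
from concurrent.futures import Future
from typing import Awaitable, Callable, List, Optional


class SketchAsyncHandler:
    """Hypothetical stand-in for an async token handler; not the Haystack class."""

    def __init__(
        self,
        callback: Callable[[str], Awaitable[None]],
        loop: asyncio.AbstractEventLoop,
        stop_words: Optional[List[str]] = None,
    ) -> None:
        self.callback = callback
        self.loop = loop
        self.stop_words = stop_words or []
        self.pending: List[Future] = []

    def on_finalized_text(self, word: str, stream_end: bool = False) -> None:
        # Called synchronously, like a streamer hook; it must not return a coroutine.
        word_to_send = word + "\n" if stream_end else word
        if word_to_send.strip() not in self.stop_words:
            # Schedule the async callback on the loop from this (non-loop) thread.
            future = asyncio.run_coroutine_threadsafe(self.callback(word_to_send), self.loop)
            self.pending.append(future)


async def demo() -> List[str]:
    received: List[str] = []

    async def callback(chunk: str) -> None:
        received.append(chunk)

    loop = asyncio.get_running_loop()
    handler = SketchAsyncHandler(callback, loop, stop_words=["STOP"])

    def fake_generate() -> None:
        # Simulates a generation loop calling the hook synchronously.
        for word in ["Hello", "STOP", "world"]:
            handler.on_finalized_text(word, stream_end=(word == "world"))

    # Run the synchronous producer in a worker thread so the loop stays free.
    await loop.run_in_executor(None, fake_generate)
    # Wait until every scheduled callback has actually run.
    await asyncio.gather(*(asyncio.wrap_future(f) for f in handler.pending))
    return received
```

Calling `asyncio.run(demo())` delivers the two non-stop-word chunks to the callback. Keeping the returned futures lets the caller wait for outstanding callbacks before finishing, which a fire-and-forget task would not guarantee.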
@mpangrazzi thank you for noticing and pointing this out, I had really missed this crucial implementation step!
I have made the required change as well!
Related Issues
HuggingFaceLocalChatGenerator add an async version of HFTokenStreamHandler and update type signature for async streaming callback #9391
Proposed Changes:
How did you test it?
Notes for the reviewer
Checklist
fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.