
feat: Add async streaming support in HuggingFaceLocalChatGenerator #9405


Open

wants to merge 5 commits into main

Conversation

mdrazak2001
Contributor

Related Issues

Proposed Changes:

  • Added an async version of the Hugging Face local streaming handler (see the usage sketch after this list).
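For illustration, usage of the async streaming path might look like the sketch below. This is a hypothetical example, not code from the PR: the model name is an arbitrary choice, and run_async plus the async streaming_callback signature are assumed from the scope of this change.

import asyncio
from haystack.components.generators.chat import HuggingFaceLocalChatGenerator
from haystack.dataclasses import ChatMessage, StreamingChunk

async def print_chunk(chunk: StreamingChunk) -> None:
    # Async streaming callback: receives tokens as they are generated.
    print(chunk.content, end="", flush=True)

async def main() -> None:
    generator = HuggingFaceLocalChatGenerator(
        model="Qwen/Qwen2.5-0.5B-Instruct",  # any local HF chat model; arbitrary pick for this sketch
        streaming_callback=print_chunk,
    )
    generator.warm_up()
    await generator.run_async(messages=[ChatMessage.from_user("Briefly explain asyncio.")])

asyncio.run(main())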

How did you test it?

  • Added a unit test.

Notes for the reviewer

Checklist

  • I have read the contributors guidelines and the code of conduct
  • I have updated the related issue with new insights and changes
  • I added unit tests and updated the docstrings
  • I've used one of the conventional commit types for my PR title: fix:, feat:, build:, chore:, ci:, docs:, style:, refactor:, perf:, test: and added ! in case the PR includes breaking changes.
  • I documented my code
  • I ran pre-commit hooks and fixed any issue

@mdrazak2001 mdrazak2001 requested review from a team as code owners May 18, 2025 16:59
@mdrazak2001 mdrazak2001 requested review from dfokina and vblagoje and removed request for a team May 18, 2025 16:59
@github-actions github-actions bot added the topic:tests and type:documentation (Improvements on the docs) labels May 18, 2025
@mdrazak2001 mdrazak2001 changed the title from "feat: Add async streaming support in hugging face generator" to "feat: Add async streaming support in HuggingFaceLocalChatGenerator" May 18, 2025
@coveralls
Collaborator

coveralls commented May 18, 2025

Pull Request Test Coverage Report for Build 15217439421

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 324 unchanged lines in 29 files lost coverage.
  • Overall coverage decreased (-0.2%) to 90.258%

Files with Coverage Reduction | New Missed Lines | %
components/agents/__init__.py | 1 | 75.0%
components/builders/answer_builder.py | 1 | 98.53%
components/evaluators/sas_evaluator.py | 1 | 62.3%
components/embedders/sentence_transformers_document_embedder.py | 2 | 96.88%
components/embedders/sentence_transformers_text_embedder.py | 2 | 96.43%
components/rankers/__init__.py | 2 | 50.0%
tools/parameters_schema_utils.py | 2 | 94.87%
utils/__init__.py | 2 | 33.33%
components/converters/multi_file_converter.py | 4 | 92.73%
components/generators/hugging_face_local.py | 4 | 91.76%
Totals
Change from base Build 15067128089: -0.2%
Covered Lines: 11387
Relevant Lines: 12616

💛 - Coveralls

@@ -566,7 +568,7 @@ async def _run_streaming_async(  # pylint: disable=too-many-positional-arguments
        )

        # Set up streaming handler
-       generation_kwargs["streamer"] = HFTokenStreamingHandler(tokenizer, streaming_callback, stop_words)
+       generation_kwargs["streamer"] = AsyncHFTokenStreamingHandler(tokenizer, streaming_callback, stop_words)
Contributor

To make mypy happy, could we add an assert here asserting that streaming_callback is of type AsyncStreamingCallbackT?

Or update AsyncHFTokenStreamingHandler such that the type hint for stream_handler is StreamingCallbackT.
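For illustration only, the first option might look roughly like the sketch below. This is hypothetical, not the change that was merged: the inspect check and the cast are assumptions, while streaming_callback, tokenizer, stop_words, and generation_kwargs come from the diff above, and AsyncStreamingCallbackT is assumed to be the async callback type alias already imported in this module.

import inspect
from typing import cast

# Runtime check that the callback is actually async; the cast then narrows the type for mypy.
assert streaming_callback is not None and inspect.iscoroutinefunction(streaming_callback), (
    "_run_streaming_async requires an async streaming_callback"
)
generation_kwargs["streamer"] = AsyncHFTokenStreamingHandler(
    tokenizer, cast(AsyncStreamingCallbackT, streaming_callback), stop_words
)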

@sjrl sjrl removed the request for review from vblagoje May 20, 2025 07:51
@sjrl sjrl requested a review from mpangrazzi May 22, 2025 15:44
Contributor

@mpangrazzi mpangrazzi left a comment

I've left a comment below!

self.token_handler = stream_handler
self.stop_words = stop_words or []

def on_finalized_text(self, word: str, stream_end: bool = False):
Contributor

This method returns a coroutine, but if I look at this, it seems that TextStreamer will call it synchronously (and so the coroutine will not be awaited).

In the test below I see you're working around this:

loop.call_soon_threadsafe(
    lambda: asyncio.create_task(streamer.on_finalized_text("Hello", stream_end=False))
)

but TextStreamer will call it like:

streamer.on_finalized_text("Hello", stream_end=False) 

I might be wrong, but have you tried testing it manually? You might need to do something like:

def on_finalized_text(self, word: str, stream_end: bool = False):
    word_to_send = word + "\n" if stream_end else word
    if word_to_send.strip() not in self.stop_words:
        # Schedule the async callback without returning the coroutine
        asyncio.create_task(self.token_handler(StreamingChunk(content=word_to_send)))
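
A related point, sketched here only as a hedged aside: if the model's generate() call runs in an executor thread, as async wrappers around blocking generation often do, there is no running event loop in the thread that calls on_finalized_text, so asyncio.create_task would fail there. A thread-safe variant could capture the loop at construction time, as in the hypothetical sketch below (assumes the handler is built while the event loop is running; not necessarily what this PR ended up doing):

import asyncio
from transformers import TextStreamer
from haystack.dataclasses import StreamingChunk

class AsyncHFTokenStreamingHandler(TextStreamer):
    """Hypothetical sketch: stream finalized tokens to an async callback from a worker thread."""

    def __init__(self, tokenizer, stream_handler, stop_words=None):
        super().__init__(tokenizer=tokenizer, skip_prompt=True)
        self.token_handler = stream_handler  # async callable taking a StreamingChunk
        self.stop_words = stop_words or []
        # Capture the loop that owns the callback; assumes construction happens inside the running loop.
        self.loop = asyncio.get_running_loop()

    def on_finalized_text(self, word: str, stream_end: bool = False):
        word_to_send = word + "\n" if stream_end else word
        if word_to_send.strip() not in self.stop_words:
            # Safe to call from the generation thread: schedules the coroutine on the captured loop.
            asyncio.run_coroutine_threadsafe(
                self.token_handler(StreamingChunk(content=word_to_send)), self.loop
            )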

Contributor Author

@mpangrazzi thank you for noticing and pointing this out; I had really missed this crucial implementation step!
I have made the required change as well.

@mdrazak2001 mdrazak2001 requested a review from mpangrazzi May 23, 2025 19:09
Labels
topic:tests, type:documentation (Improvements on the docs)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

In HuggingFaceLocalChatGenerator add an async version of HFTokenStreamHandler and update type signature for async streaming callback
4 participants