1212from camel .societies .workforce .utils import TaskAssignResult
1313from camel .societies .workforce .workforce_metrics import WorkforceMetrics
1414from camel .societies .workforce .events import WorkerCreatedEvent
15+ from camel .societies .workforce .prompts import TASK_DECOMPOSE_PROMPT
1516from camel .tasks .task import Task , TaskState , validate_task_content
1617from app .component import code
1718from app .exception .exception import UserException
@@ -65,14 +66,22 @@ def __init__(
6566 )
6667 logger .info (f"[WF-LIFECYCLE] ✅ Workforce.__init__ COMPLETED, id={ id (self )} " )
6768
68- def eigent_make_sub_tasks (self , task : Task , coordinator_context : str = "" ):
69+ def eigent_make_sub_tasks (
70+ self ,
71+ task : Task ,
72+ coordinator_context : str = "" ,
73+ on_stream_batch = None ,
74+ on_stream_text = None ,
75+ ):
6976 """
7077 Split process_task method to eigent_make_sub_tasks and eigent_start method.
7178
7279 Args:
7380 task: The main task to decompose
7481 coordinator_context: Optional context ONLY for coordinator agent during decomposition.
7582 This context will NOT be passed to subtasks or worker agents.
83+ on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
84+ on_stream_text: Optional callback for raw streaming text chunks
7685 """
7786 logger .info ("=" * 80 )
7887 logger .info ("🧩 [DECOMPOSE] eigent_make_sub_tasks CALLED" , extra = {
@@ -103,7 +112,15 @@ def eigent_make_sub_tasks(self, task: Task, coordinator_context: str = ""):
103112 logger .info (f"[DECOMPOSE] Workforce reset complete, state: { self ._state .name } " )
104113
105114 logger .info (f"[DECOMPOSE] Calling handle_decompose_append_task" )
106- subtasks = asyncio .run (self .handle_decompose_append_task (task , reset = False , coordinator_context = coordinator_context ))
115+ subtasks = asyncio .run (
116+ self .handle_decompose_append_task (
117+ task ,
118+ reset = False ,
119+ coordinator_context = coordinator_context ,
120+ on_stream_batch = on_stream_batch ,
121+ on_stream_text = on_stream_text
122+ )
123+ )
107124 logger .info ("=" * 80 )
108125 logger .info (f"✅ [DECOMPOSE] Task decomposition COMPLETED" , extra = {
109126 "api_task_id" : self .api_task_id ,
@@ -142,8 +159,45 @@ async def eigent_start(self, subtasks: list[Task]):
142159 self ._state = WorkforceState .IDLE
143160 logger .info (f"[WF-LIFECYCLE] Workforce state set to IDLE" )
144161
162+ def _decompose_task (self , task : Task , stream_callback = None ):
163+ """Decompose task with optional streaming text callback."""
164+
165+ decompose_prompt = str (
166+ TASK_DECOMPOSE_PROMPT .format (
167+ content = task .content ,
168+ child_nodes_info = self ._get_child_nodes_info (),
169+ additional_info = task .additional_info ,
170+ )
171+ )
172+ self .task_agent .reset ()
173+ result = task .decompose (
174+ self .task_agent , decompose_prompt , stream_callback = stream_callback
175+ )
176+
177+ if isinstance (result , Generator ):
178+ def streaming_with_dependencies ():
179+ all_subtasks = []
180+ for new_tasks in result :
181+ all_subtasks .extend (new_tasks )
182+ if new_tasks :
183+ self ._update_dependencies_for_decomposition (
184+ task , all_subtasks
185+ )
186+ yield new_tasks
187+ return streaming_with_dependencies ()
188+ else :
189+ subtasks = result
190+ if subtasks :
191+ self ._update_dependencies_for_decomposition (task , subtasks )
192+ return subtasks
193+
145194 async def handle_decompose_append_task (
146- self , task : Task , reset : bool = True , coordinator_context : str = ""
195+ self ,
196+ task : Task ,
197+ reset : bool = True ,
198+ coordinator_context : str = "" ,
199+ on_stream_batch = None ,
200+ on_stream_text = None ,
147201 ) -> List [Task ]:
148202 """
149203 Override to support coordinator_context parameter.
@@ -153,6 +207,8 @@ async def handle_decompose_append_task(
153207 task: The task to be processed
154208 reset: Should trigger workforce reset (Workforce must not be running)
155209 coordinator_context: Optional context ONLY for coordinator during decomposition
210+ on_stream_batch: Optional callback for streaming batches signature (List[Task], bool)
211+ on_stream_text: Optional callback for raw streaming text chunks
156212
157213 Returns:
158214 List[Task]: The decomposed subtasks or the original task
@@ -186,18 +242,23 @@ async def handle_decompose_append_task(
186242 task .content = task_with_context
187243
188244 logger .info (f"[DECOMPOSE] Calling _decompose_task with context" )
189- subtasks_result = self ._decompose_task (task )
245+ subtasks_result = self ._decompose_task (task , stream_callback = on_stream_text )
190246
191247 task .content = original_content
192248 else :
193249 logger .info (f"[DECOMPOSE] Calling _decompose_task without context" )
194- subtasks_result = self ._decompose_task (task )
250+ subtasks_result = self ._decompose_task (task , stream_callback = on_stream_text )
195251
196252 logger .info (f"[DECOMPOSE] _decompose_task returned, processing results" )
197253 if isinstance (subtasks_result , Generator ):
198254 subtasks = []
199255 for new_tasks in subtasks_result :
200256 subtasks .extend (new_tasks )
257+ if on_stream_batch :
258+ try :
259+ on_stream_batch (new_tasks , False )
260+ except Exception as e :
261+ logger .warning (f"Streaming callback failed: { e } " )
201262 logger .info (f"[DECOMPOSE] Collected { len (subtasks )} subtasks from generator" )
202263 else :
203264 subtasks = subtasks_result
@@ -218,6 +279,12 @@ async def handle_decompose_append_task(
218279 subtasks = [fallback_task ]
219280 logger .info (f"[DECOMPOSE] Created fallback task: { fallback_task .id } " )
220281
282+ if on_stream_batch :
283+ try :
284+ on_stream_batch (subtasks , True )
285+ except Exception as e :
286+ logger .warning (f"Final streaming callback failed: { e } " )
287+
221288 return subtasks
222289
223290 async def _find_assignee (self , tasks : List [Task ]) -> TaskAssignResult :
0 commit comments