-
Notifications
You must be signed in to change notification settings - Fork 4k
Description
Checked other resources
- This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
- I added a clear and detailed title that summarizes the issue.
- I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
- I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.
Example Code
#!/usr/bin/env python3
"""
Minimal test script to debug parallel interrupts with ToolNode.
This tests whether LangGraph's ToolNode properly collects all interrupts
when multiple tools are called in parallel, each with an interrupt().
"""
import asyncio
from typing import Annotated, List
from uuid import uuid4
from langchain_core.messages import AnyMessage, HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt
from typing_extensions import TypedDict
# ============================================================================
# Tools with interrupts
# ============================================================================
@tool
def create_circle(shape_id: str, color: str) -> str:
    """Create a circle on the canvas."""
    # Announce the pending action, pause the graph via interrupt(), and
    # report whatever value the run was resumed with.
    payload = {"tool": "create_circle", "shapeId": shape_id, "color": color}
    print(f" [create_circle] About to interrupt with: {payload}")
    reply = interrupt(payload)
    print(f" [create_circle] Resumed with: {reply}")
    return f"Created circle {shape_id} with color {color}. Response: {reply}"
@tool
def create_square(shape_id: str, color: str) -> str:
    """Create a square on the canvas."""
    # Announce the pending action, pause the graph via interrupt(), and
    # report whatever value the run was resumed with.
    payload = {"tool": "create_square", "shapeId": shape_id, "color": color}
    print(f" [create_square] About to interrupt with: {payload}")
    reply = interrupt(payload)
    print(f" [create_square] Resumed with: {reply}")
    return f"Created square {shape_id} with color {color}. Response: {reply}"
@tool
def create_triangle(shape_id: str, color: str) -> str:
    """Create a triangle on the canvas."""
    # Announce the pending action, pause the graph via interrupt(), and
    # report whatever value the run was resumed with.
    payload = {"tool": "create_triangle", "shapeId": shape_id, "color": color}
    print(f" [create_triangle] About to interrupt with: {payload}")
    reply = interrupt(payload)
    print(f" [create_triangle] Resumed with: {reply}")
    return f"Created triangle {shape_id} with color {color}. Response: {reply}"
# ============================================================================
# Graph definition
# ============================================================================
class State(TypedDict):
    """Graph state: a message list merged with LangGraph's add_messages reducer."""
    messages: Annotated[List[AnyMessage], add_messages]
tools = [create_circle, create_square, create_triangle]
async def agent_node(state: State):
    """Simulates an LLM that returns multiple tool calls."""
    print("\n[agent_node] Entering agent node")
    # Simulate the LLM returning 3 parallel tool calls
    from langchain_core.messages import AIMessage

    # (call id, tool name, tool arguments) for each simulated call.
    call_specs = [
        ("call_1", "create_circle", {"shape_id": "c1", "color": "red"}),
        ("call_2", "create_square", {"shape_id": "s1", "color": "blue"}),
        ("call_3", "create_triangle", {"shape_id": "t1", "color": "green"}),
    ]
    ai_message = AIMessage(
        content="I'll create three shapes for you.",
        tool_calls=[
            {"id": call_id, "name": tool_name, "args": tool_args}
            for call_id, tool_name, tool_args in call_specs
        ],
    )
    print(f"[agent_node] Returning {len(ai_message.tool_calls)} tool calls")
    return {"messages": [ai_message]}
def router(state: State) -> str:
    """Route to the tool node when the latest message carries tool calls."""
    latest = state["messages"][-1]
    # Non-AI messages may lack the attribute entirely; getattr's default
    # covers both the missing-attribute and the empty-list cases.
    if getattr(latest, "tool_calls", None):
        return "tools"
    return END
def _collect_interrupts(state) -> list:
    """Flatten every Interrupt recorded on the checkpoint's pending tasks."""
    collected = []
    for task in state.tasks:
        if task.interrupts:
            collected.extend(task.interrupts)
    return collected


async def main():
    """Run the graph once, inspect pending interrupts, then attempt a resume.

    Demonstrates the reported bug: three parallel tools each call
    interrupt(), but only one interrupt is recorded on the checkpoint, and
    resuming delivers the value to the wrong tool.
    """
    print("=" * 60)
    print("Testing Parallel Interrupts with ToolNode")
    print("=" * 60)
    # Build the graph: agent -> (conditional) tools -> agent. A checkpointer
    # is required so the interrupted run can be resumed on the same thread.
    graph = StateGraph(State)
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(tools=tools))
    graph.add_edge(START, "agent")
    graph.add_conditional_edges("agent", router, ["tools", END])
    graph.add_edge("tools", "agent")
    checkpointer = MemorySaver()
    workflow = graph.compile(checkpointer=checkpointer)
    config = {"configurable": {"thread_id": str(uuid4())}}

    # First invocation - should trigger interrupts
    print("\n--- First Invocation ---")
    initial_input = {"messages": [HumanMessage(content="Create 3 shapes")]}
    try:
        result = await workflow.ainvoke(initial_input, config=config)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Exception during invoke: {type(e).__name__}: {e}")

    # Check state for interrupts
    print("\n--- Checking State for Interrupts ---")
    state = await workflow.aget_state(config)
    print(f"Tasks: {len(state.tasks)}")
    for i, task in enumerate(state.tasks):
        print(f" Task {i}: {task.id} - {task.name}")
        print(f" Interrupts: {len(task.interrupts) if task.interrupts else 0}")
        if task.interrupts:
            # Named `intr`, not `interrupt`, so the imported
            # langgraph.types.interrupt function is not shadowed.
            for j, intr in enumerate(task.interrupts):
                print(
                    f" Interrupt {j}: id={intr.id}, value={intr.value}"
                )

    # Collect all interrupts
    all_interrupts = _collect_interrupts(state)
    print(f"\nTotal interrupts collected: {len(all_interrupts)}")
    if not all_interrupts:
        print("\n!!! NO INTERRUPTS FOUND - This is the bug !!!")
        return

    # Try to resume with all interrupts
    print("\n--- Attempting Resume ---")
    from langgraph.types import Command

    # Build resume dict with all interrupt IDs
    resume_data = {
        intr.id: {"message": f"done with {intr.value}"} for intr in all_interrupts
    }
    print(f"Resuming with: {resume_data}")
    try:
        result = await workflow.ainvoke(Command(resume=resume_data), config=config)
        print(f"Resume result: {result}")
    except Exception as e:
        print(f"Exception during resume: {type(e).__name__}: {e}")

    # Check state again
    print("\n--- Final State Check ---")
    state = await workflow.aget_state(config)
    all_interrupts = _collect_interrupts(state)
    print(f"Remaining interrupts: {len(all_interrupts)}")
if __name__ == "__main__":
    asyncio.run(main())

Error Message and Stack Trace (if applicable)
============================================================
Testing Parallel Interrupts with ToolNode
============================================================
--- First Invocation ---
[agent_node] Entering agent node
[agent_node] Returning 3 tool calls
[create_square] About to interrupt with: {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}
[create_circle] About to interrupt with: {'tool': 'create_circle', 'shapeId': 'c1', 'color': 'red'}
[create_triangle] About to interrupt with: {'tool': 'create_triangle', 'shapeId': 't1', 'color': 'green'}
Result: {'messages': [HumanMessage(content='Create 3 shapes', additional_kwargs={}, response_metadata={}, id='496c3dd8-bc87-4add-a1e7-e8ffff14ce1b'), AIMessage(content="I'll create three shapes for you.", additional_kwargs={}, response_metadata={}, id='4ae56873-4bd1-4e55-84a7-373b0b082b78', tool_calls=[{'name': 'create_circle', 'args': {'shape_id': 'c1', 'color': 'red'}, 'id': 'call_1', 'type': 'tool_call'}, {'name': 'create_square', 'args': {'shape_id': 's1', 'color': 'blue'}, 'id': 'call_2', 'type': 'tool_call'}, {'name': 'create_triangle', 'args': {'shape_id': 't1', 'color': 'green'}, 'id': 'call_3', 'type': 'tool_call'}])], '__interrupt__': [Interrupt(value={'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}, id='d21596a148c0eb88b24f7bedd9b747b5')]}
--- Checking State for Interrupts ---
Tasks: 1
Task 0: 0eede955-6fe6-f4a0-7b74-494a62c11f92 - tools
Interrupts: 1
Interrupt 0: id=d21596a148c0eb88b24f7bedd9b747b5, value={'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}
Total interrupts collected: 1
--- Attempting Resume ---
Resuming with: {'d21596a148c0eb88b24f7bedd9b747b5': {'message': "done with {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}"}}
[create_circle] About to interrupt with: {'tool': 'create_circle', 'shapeId': 'c1', 'color': 'red'}
[create_square] About to interrupt with: {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}
[create_circle] Resumed with: {'message': "done with {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}"}
[create_triangle] About to interrupt with: {'tool': 'create_triangle', 'shapeId': 't1', 'color': 'green'}
Resume result: {'messages': [HumanMessage(content='Create 3 shapes', additional_kwargs={}, response_metadata={}, id='496c3dd8-bc87-4add-a1e7-e8ffff14ce1b'), AIMessage(content="I'll create three shapes for you.", additional_kwargs={}, response_metadata={}, id='4ae56873-4bd1-4e55-84a7-373b0b082b78', tool_calls=[{'name': 'create_circle', 'args': {'shape_id': 'c1', 'color': 'red'}, 'id': 'call_1', 'type': 'tool_call'}, {'name': 'create_square', 'args': {'shape_id': 's1', 'color': 'blue'}, 'id': 'call_2', 'type': 'tool_call'}, {'name': 'create_triangle', 'args': {'shape_id': 't1', 'color': 'green'}, 'id': 'call_3', 'type': 'tool_call'}])], '__interrupt__': [Interrupt(value={'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}, id='d21596a148c0eb88b24f7bedd9b747b5')]}
--- Final State Check ---
Remaining interrupts: 1

Description
I am trying to execute tools in parallel that each contain an interrupt. I basically want to execute all of them, gather the interrupts together and then send all the interrupts to the frontend, complete them all, then resume all the tools with their respective results. Am I doing this incorrectly?
Expected Behavior
When tools are called in parallel and each calls interrupt():
- All tools should start executing (✅ this works - all 3 print "About to interrupt")
- All interrupt values should be captured in `state.tasks[0].interrupts` (❌ only 1 is captured)
- On resume with `Command(resume={id1: result1, id2: result2, id3: result3})`, each tool should receive its corresponding result (❌ the wrong tool receives the value)
Additional Context
Resume Behavior is Also Broken
When resuming with the single captured interrupt, all 3 tools re-execute, but only one receives the resume value - and it's often the wrong tool:
--- Attempting Resume ---
Resuming with: {'d21596a148c0eb88b24f7bedd9b747b5': {'message': 'done with create_square'}}
[create_circle] About to interrupt with: {'tool': 'create_circle', ...}
[create_square] About to interrupt with: {'tool': 'create_square', ...}
[create_circle] Resumed with: {'message': 'done with create_square'} # <-- WRONG! Circle got Square's value
[create_triangle] About to interrupt with: {'tool': 'create_triangle', ...}
Root Cause Hypothesis
When ToolNode executes tools in parallel using asyncio.gather, the first GraphInterrupt exception that propagates causes:
- Other concurrent coroutines to be cancelled or their interrupts to be lost
- Only one interrupt to be recorded in the checkpoint
- The interrupt ID to not be properly associated with its originating tool
System Info
System Information
OS: Darwin
OS Version: Darwin Kernel Version 25.1.0: Mon Oct 20 19:34:05 PDT 2025; root:xnu-12377.41.6~2/RELEASE_ARM64_T6041
Python Version: 3.14.0 (main, Oct 7 2025, 09:34:52) [Clang 17.0.0 (clang-1700.0.13.3)]
Package Information
langchain_core: 1.2.2
langchain: 1.2.0
langchain_community: 0.4.1
langsmith: 0.5.0
langchain_anthropic: 1.3.0
langchain_classic: 1.0.0
langchain_huggingface: 1.2.0
langchain_openai: 1.1.6
langchain_text_splitters: 1.1.0
langgraph_sdk: 0.3.0
Optional packages not installed
langserve
Other Dependencies
aiohttp: 3.13.2
anthropic: 0.75.0
dataclasses-json: 0.6.7
httpx: 0.28.1
httpx-sse: 0.4.3
huggingface-hub: 0.36.0
jsonpatch: 1.33
langgraph: 1.0.5
numpy: 2.3.5
openai: 2.13.0
orjson: 3.11.5
packaging: 25.0
pydantic: 2.12.5
pydantic-settings: 2.12.0
PyYAML: 6.0.3
pyyaml: 6.0.3
requests: 2.32.5
requests-toolbelt: 1.0.0
rich: 13.9.4
sentence-transformers: 5.2.0
sqlalchemy: 2.0.45
SQLAlchemy: 2.0.45
tenacity: 9.1.2
tiktoken: 0.12.0
tokenizers: 0.22.1
transformers: 4.57.3
typing-extensions: 4.15.0
uuid-utils: 0.12.0
zstandard: 0.25.0