ToolNode doesn't collect all interrupts from parallel tool execution #6624

@jamesheavey


Checked other resources

  • This is a bug, not a usage question. For questions, please use the LangChain Forum (https://forum.langchain.com/).
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

#!/usr/bin/env python3
"""
Minimal test script to debug parallel interrupts with ToolNode.

This tests whether LangGraph's ToolNode properly collects all interrupts
when multiple tools are called in parallel, each with an interrupt().
"""

import asyncio
from typing import Annotated, List
from uuid import uuid4

from langchain_core.messages import AnyMessage, HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt
from typing_extensions import TypedDict


# ============================================================================
# Tools with interrupts
# ============================================================================


@tool
def create_circle(shape_id: str, color: str) -> str:
    """Create a circle on the canvas."""
    action = {"tool": "create_circle", "shapeId": shape_id, "color": color}
    print(f"  [create_circle] About to interrupt with: {action}")
    response = interrupt(action)
    print(f"  [create_circle] Resumed with: {response}")
    return f"Created circle {shape_id} with color {color}. Response: {response}"


@tool
def create_square(shape_id: str, color: str) -> str:
    """Create a square on the canvas."""
    action = {"tool": "create_square", "shapeId": shape_id, "color": color}
    print(f"  [create_square] About to interrupt with: {action}")
    response = interrupt(action)
    print(f"  [create_square] Resumed with: {response}")
    return f"Created square {shape_id} with color {color}. Response: {response}"


@tool
def create_triangle(shape_id: str, color: str) -> str:
    """Create a triangle on the canvas."""
    action = {"tool": "create_triangle", "shapeId": shape_id, "color": color}
    print(f"  [create_triangle] About to interrupt with: {action}")
    response = interrupt(action)
    print(f"  [create_triangle] Resumed with: {response}")
    return f"Created triangle {shape_id} with color {color}. Response: {response}"


# ============================================================================
# Graph definition
# ============================================================================


class State(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]


tools = [create_circle, create_square, create_triangle]


async def agent_node(state: State):
    """Simulates an LLM that returns multiple tool calls."""
    print("\n[agent_node] Entering agent node")

    # Simulate the LLM returning 3 parallel tool calls
    from langchain_core.messages import AIMessage

    ai_message = AIMessage(
        content="I'll create three shapes for you.",
        tool_calls=[
            {
                "id": "call_1",
                "name": "create_circle",
                "args": {"shape_id": "c1", "color": "red"},
            },
            {
                "id": "call_2",
                "name": "create_square",
                "args": {"shape_id": "s1", "color": "blue"},
            },
            {
                "id": "call_3",
                "name": "create_triangle",
                "args": {"shape_id": "t1", "color": "green"},
            },
        ],
    )
    print(f"[agent_node] Returning {len(ai_message.tool_calls)} tool calls")
    return {"messages": [ai_message]}


def router(state: State) -> str:
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return END


async def main():
    print("=" * 60)
    print("Testing Parallel Interrupts with ToolNode")
    print("=" * 60)

    # Build the graph
    graph = StateGraph(State)
    graph.add_node("agent", agent_node)
    graph.add_node("tools", ToolNode(tools=tools))

    graph.add_edge(START, "agent")
    graph.add_conditional_edges("agent", router, ["tools", END])
    graph.add_edge("tools", "agent")

    checkpointer = MemorySaver()
    workflow = graph.compile(checkpointer=checkpointer)

    config = {"configurable": {"thread_id": str(uuid4())}}

    # First invocation - should trigger interrupts
    print("\n--- First Invocation ---")
    initial_input = {"messages": [HumanMessage(content="Create 3 shapes")]}

    try:
        result = await workflow.ainvoke(initial_input, config=config)
        print(f"Result: {result}")
    except Exception as e:
        print(f"Exception during invoke: {type(e).__name__}: {e}")

    # Check state for interrupts
    print("\n--- Checking State for Interrupts ---")
    state = await workflow.aget_state(config)

    print(f"Tasks: {len(state.tasks)}")
    for i, task in enumerate(state.tasks):
        print(f"  Task {i}: {task.id} - {task.name}")
        print(f"    Interrupts: {len(task.interrupts) if task.interrupts else 0}")
        if task.interrupts:
            for j, intr in enumerate(task.interrupts):  # "intr" avoids shadowing langgraph.types.interrupt
                print(
                    f"      Interrupt {j}: id={intr.id}, value={intr.value}"
                )

    # Collect all interrupts
    all_interrupts = []
    for task in state.tasks:
        if task.interrupts:
            all_interrupts.extend(task.interrupts)

    print(f"\nTotal interrupts collected: {len(all_interrupts)}")

    if not all_interrupts:
        print("\n!!! NO INTERRUPTS FOUND - This is the bug !!!")
        return

    # Try to resume with all interrupts
    print("\n--- Attempting Resume ---")
    from langgraph.types import Command

    # Build resume dict with all interrupt IDs
    resume_data = {
        intr.id: {"message": f"done with {intr.value}"} for intr in all_interrupts
    }
    print(f"Resuming with: {resume_data}")

    try:
        result = await workflow.ainvoke(Command(resume=resume_data), config=config)
        print(f"Resume result: {result}")
    except Exception as e:
        print(f"Exception during resume: {type(e).__name__}: {e}")

    # Check state again
    print("\n--- Final State Check ---")
    state = await workflow.aget_state(config)
    all_interrupts = []
    for task in state.tasks:
        if task.interrupts:
            all_interrupts.extend(task.interrupts)
    print(f"Remaining interrupts: {len(all_interrupts)}")


if __name__ == "__main__":
    asyncio.run(main())

Error Message and Stack Trace (if applicable)

============================================================
Testing Parallel Interrupts with ToolNode
============================================================

--- First Invocation ---

[agent_node] Entering agent node
[agent_node] Returning 3 tool calls
  [create_square] About to interrupt with: {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}
  [create_circle] About to interrupt with: {'tool': 'create_circle', 'shapeId': 'c1', 'color': 'red'}
  [create_triangle] About to interrupt with: {'tool': 'create_triangle', 'shapeId': 't1', 'color': 'green'}
Result: {'messages': [HumanMessage(content='Create 3 shapes', additional_kwargs={}, response_metadata={}, id='496c3dd8-bc87-4add-a1e7-e8ffff14ce1b'), AIMessage(content="I'll create three shapes for you.", additional_kwargs={}, response_metadata={}, id='4ae56873-4bd1-4e55-84a7-373b0b082b78', tool_calls=[{'name': 'create_circle', 'args': {'shape_id': 'c1', 'color': 'red'}, 'id': 'call_1', 'type': 'tool_call'}, {'name': 'create_square', 'args': {'shape_id': 's1', 'color': 'blue'}, 'id': 'call_2', 'type': 'tool_call'}, {'name': 'create_triangle', 'args': {'shape_id': 't1', 'color': 'green'}, 'id': 'call_3', 'type': 'tool_call'}])], '__interrupt__': [Interrupt(value={'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}, id='d21596a148c0eb88b24f7bedd9b747b5')]}

--- Checking State for Interrupts ---
Tasks: 1
  Task 0: 0eede955-6fe6-f4a0-7b74-494a62c11f92 - tools
    Interrupts: 1
      Interrupt 0: id=d21596a148c0eb88b24f7bedd9b747b5, value={'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}

Total interrupts collected: 1

--- Attempting Resume ---
Resuming with: {'d21596a148c0eb88b24f7bedd9b747b5': {'message': "done with {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}"}}
  [create_circle] About to interrupt with: {'tool': 'create_circle', 'shapeId': 'c1', 'color': 'red'}
  [create_square] About to interrupt with: {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}
  [create_circle] Resumed with: {'message': "done with {'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}"}
  [create_triangle] About to interrupt with: {'tool': 'create_triangle', 'shapeId': 't1', 'color': 'green'}
Resume result: {'messages': [HumanMessage(content='Create 3 shapes', additional_kwargs={}, response_metadata={}, id='496c3dd8-bc87-4add-a1e7-e8ffff14ce1b'), AIMessage(content="I'll create three shapes for you.", additional_kwargs={}, response_metadata={}, id='4ae56873-4bd1-4e55-84a7-373b0b082b78', tool_calls=[{'name': 'create_circle', 'args': {'shape_id': 'c1', 'color': 'red'}, 'id': 'call_1', 'type': 'tool_call'}, {'name': 'create_square', 'args': {'shape_id': 's1', 'color': 'blue'}, 'id': 'call_2', 'type': 'tool_call'}, {'name': 'create_triangle', 'args': {'shape_id': 't1', 'color': 'green'}, 'id': 'call_3', 'type': 'tool_call'}])], '__interrupt__': [Interrupt(value={'tool': 'create_square', 'shapeId': 's1', 'color': 'blue'}, id='d21596a148c0eb88b24f7bedd9b747b5')]}

--- Final State Check ---
Remaining interrupts: 1

Description

I am trying to execute tools in parallel, each of which calls interrupt(). The goal is to run all of them, gather their interrupts together, send the whole batch to the frontend, complete them all there, and then resume each tool with its respective result. Am I doing this incorrectly?

Expected Behavior

When tools are called in parallel and each calls interrupt():

  1. All tools should start executing (✅ this works - all 3 print "About to interrupt")
  2. All interrupt values should be captured in state.tasks[0].interrupts (❌ only 1 is captured)
  3. On resume with Command(resume={id1: result1, id2: result2, id3: result3}), each tool should receive its corresponding result (❌ wrong tool receives the value)
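For concreteness, this is the resume payload I expect to be able to build once all three interrupts are surfaced. The interrupt IDs here are illustrative placeholders, not real LangGraph IDs:

```python
# Hypothetical captured interrupts; the IDs are placeholders for whatever
# state.tasks[0].interrupts would actually contain after the first invoke.
interrupts = [
    {"id": "id_circle", "value": {"tool": "create_circle"}},
    {"id": "id_square", "value": {"tool": "create_square"}},
    {"id": "id_triangle", "value": {"tool": "create_triangle"}},
]

# One resume entry per interrupt, keyed by interrupt ID, so that each
# tool gets back exactly its own result on resume.
resume_data = {
    i["id"]: {"message": f"done with {i['value']['tool']}"} for i in interrupts
}

print(len(resume_data))          # 3
print(resume_data["id_square"])  # {'message': 'done with create_square'}
```

Passing a dict like this to Command(resume=...) is what step 3 describes; the bug is that only one interrupt ID is ever available to key it by.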

Additional Context

Resume Behavior is Also Broken

When resuming with the single captured interrupt, all 3 tools re-execute, but only one receives the resume value - and it's often the wrong tool:

--- Attempting Resume ---
Resuming with: {'d21596a148c0eb88b24f7bedd9b747b5': {'message': 'done with create_square'}}
  [create_circle] About to interrupt with: {'tool': 'create_circle', ...}
  [create_square] About to interrupt with: {'tool': 'create_square', ...}
  [create_circle] Resumed with: {'message': 'done with create_square'}  # <-- WRONG! Circle got Square's value
  [create_triangle] About to interrupt with: {'tool': 'create_triangle', ...}

Root Cause Hypothesis

When ToolNode executes tools in parallel using asyncio.gather, the first GraphInterrupt exception that propagates causes:

  1. Other concurrent coroutines to be cancelled or their interrupts to be lost
  2. Only one interrupt to be recorded in the checkpoint
  3. The interrupt ID to not be properly associated with its originating tool
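Point 1 can be reproduced with plain asyncio, outside LangGraph entirely. The sketch below illustrates the hypothesised mechanism only; it is not ToolNode's actual implementation, and FakeInterrupt is a made-up stand-in for GraphInterrupt:

```python
import asyncio


class FakeInterrupt(Exception):
    """Stand-in for GraphInterrupt; not the real LangGraph class."""

    def __init__(self, tool: str):
        super().__init__(tool)
        self.tool = tool


async def fake_tool(name: str) -> None:
    # Each parallel "tool" interrupts immediately.
    raise FakeInterrupt(name)


async def default_gather() -> list:
    # Default asyncio.gather: the first exception propagates to the
    # caller, and the sibling interrupts are never seen.
    try:
        await asyncio.gather(
            fake_tool("circle"), fake_tool("square"), fake_tool("triangle")
        )
    except FakeInterrupt as exc:
        return [exc.tool]
    return []


async def collecting_gather() -> list:
    # With return_exceptions=True, every exception is returned in
    # argument order, so all three interrupts can be batched together.
    results = await asyncio.gather(
        fake_tool("circle"),
        fake_tool("square"),
        fake_tool("triangle"),
        return_exceptions=True,
    )
    return [r.tool for r in results if isinstance(r, FakeInterrupt)]


print(asyncio.run(default_gather()))     # one interrupt only
print(asyncio.run(collecting_gather()))  # ['circle', 'square', 'triangle']
```

If the parallel executor behaves like default_gather here, only one GraphInterrupt would ever reach the checkpoint, which matches the observed output above.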

System Info

System Information

OS: Darwin
OS Version: Darwin Kernel Version 25.1.0: Mon Oct 20 19:34:05 PDT 2025; root:xnu-12377.41.6~2/RELEASE_ARM64_T6041
Python Version: 3.14.0 (main, Oct 7 2025, 09:34:52) [Clang 17.0.0 (clang-1700.0.13.3)]

Package Information

langchain_core: 1.2.2
langchain: 1.2.0
langchain_community: 0.4.1
langsmith: 0.5.0
langchain_anthropic: 1.3.0
langchain_classic: 1.0.0
langchain_huggingface: 1.2.0
langchain_openai: 1.1.6
langchain_text_splitters: 1.1.0
langgraph_sdk: 0.3.0

Optional packages not installed

langserve

Other Dependencies

aiohttp: 3.13.2
anthropic: 0.75.0
dataclasses-json: 0.6.7
httpx: 0.28.1
httpx-sse: 0.4.3
huggingface-hub: 0.36.0
jsonpatch: 1.33
langgraph: 1.0.5
numpy: 2.3.5
openai: 2.13.0
orjson: 3.11.5
packaging: 25.0
pydantic: 2.12.5
pydantic-settings: 2.12.0
PyYAML: 6.0.3
requests: 2.32.5
requests-toolbelt: 1.0.0
rich: 13.9.4
sentence-transformers: 5.2.0
sqlalchemy: 2.0.45
tenacity: 9.1.2
tiktoken: 0.12.0
tokenizers: 0.22.1
transformers: 4.57.3
typing-extensions: 4.15.0
uuid-utils: 0.12.0
zstandard: 0.25.0

Labels

bug (Something isn't working), pending (awaiting review/confirmation by maintainer)
