Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion registry/agent/claude/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"scripts": {
"build": "tsc && node ./scripts/build-patched-cli.mjs",
"check-types": "tsc --noEmit",
"test": "pnpm build && node --test tests/*.test.mjs"
"test": "pnpm build && node --test --test-force-exit tests/*.test.mjs"
},
"dependencies": {
"@agentclientprotocol/sdk": "^0.16.1",
Expand Down
73 changes: 47 additions & 26 deletions registry/agent/claude/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ type PendingTurn = {
sawToolCall: boolean;
};

class ClaudeQuerySession {
// Exported for unit tests (the constructor takes an injectable `queryFactory`,
// so the SDK can be faked). Not part of the package's public API.
export class ClaudeQuerySession {
private promptQueue = new AsyncQueue<SDKUserMessage>();
private query: Query;
private readyPromise: Promise<void>;
Expand All @@ -338,6 +340,11 @@ class ClaudeQuerySession {
string,
{ toolName: string; rawInput?: Record<string, unknown> }
>();
// Maps a streaming content-block `index` -> toolUseId for the current turn.
// `content_block_delta` (input_json_delta) events reference the tool call by
// its content-block index, NOT by tool-call ordinal — any text/thinking block
// before a tool_use shifts that index. Cleared on each turn terminus.
private toolUseBlockIndex = new Map<number, string>();
private reader: Promise<void>;
private closed = false;
private cancelled = false;
Expand Down Expand Up @@ -688,9 +695,15 @@ class ClaudeQuerySession {
}
} finally {
traceAdapter(`consume_finally session=${this.sessionId}`);
// The reader loop has exited — the SDK query stream is done (cleanly or
// via error), so this session can never produce another result. Mark it
// closed so a subsequent prompt() fails fast via the guard in prompt()
// instead of queueing onto a dead reader and hanging to the ACP method
// timeout (a zombie session).
this.closed = true;
if (this.pendingTurn) {
this.pendingTurn.reject(
new Error("Claude query ended before producing a result"),
new Error("Claude session ended before producing a result"),
);
this.pendingTurn = null;
}
Expand Down Expand Up @@ -793,6 +806,12 @@ class ClaudeQuerySession {
const toolName = String(block.name ?? "tool");
const rawInput = isRecord(block.input) ? block.input : undefined;
this.activeToolCalls.set(toolUseId, { toolName, rawInput });
// Remember this tool_use block's content-block index so subsequent
// input_json_delta events (which reference the block by index) are
// attributed to the right tool call.
if (typeof event.index === "number") {
this.toolUseBlockIndex.set(Number(event.index), toolUseId);
}
if (this.pendingTurn) this.pendingTurn.sawToolCall = true;

await this.emit({
Expand Down Expand Up @@ -892,6 +911,7 @@ class ClaudeQuerySession {
});
}
this.activeToolCalls.clear();
this.toolUseBlockIndex.clear();

await this.lastEmit;

Expand Down Expand Up @@ -921,7 +941,16 @@ class ClaudeQuerySession {
update,
}),
)
.catch(() => {});
// The catch is load-bearing: lastEmit is awaited at turn end and a
// rejected chain would halt all later updates and surface as a spurious
// prompt failure. But never swallow silently — a dropped session/update
// (host disconnect / broken pipe) must be host-visible, so write it to
// stderr (the onAgentStderr channel).
.catch((error) => {
process.stderr.write(
`[claude-acp] failed to deliver session/update: ${formatError(error)}\n`,
);
});
return this.lastEmit;
}

Expand Down Expand Up @@ -960,26 +989,13 @@ class ClaudeQuerySession {
toolCallId: options.toolUseID,
},
};
const permissionFallbackMs = Number.parseInt(
process.env.CLAUDE_CODE_PERMISSION_FALLBACK_MS ?? "2000",
10,
);
const response = await Promise.race([
this.conn.requestPermission(request),
new Promise<RequestPermissionResponse>((resolve) => {
setTimeout(() => {
traceAdapter(
`permission_request_fallback session=${this.sessionId} tool=${toolName} toolUseId=${options.toolUseID}`,
);
resolve({
outcome: {
outcome: "selected",
optionId: "allow_once",
},
});
}, Number.isFinite(permissionFallbackMs) ? permissionFallbackMs : 2000);
}),
]);
// The host's permission handler is authoritative: never auto-resolve
// the request on a timer. A timer-based fallback here would either
// fail OPEN (auto-allow — the untrusted guest's tool runs without host
// consent) or fail closed early (deny a legitimate slow approval). If
// the host never answers, the request fails via the bounded ACP method
// timeout, which surfaces to the host rather than silently granting.
const response = await this.conn.requestPermission(request);
traceAdapter(
`permission_request_done session=${this.sessionId} tool=${toolName} toolUseId=${options.toolUseID}`,
);
Expand All @@ -997,9 +1013,14 @@ class ClaudeQuerySession {
toolName: string;
rawInput?: Record<string, unknown>;
} | null {
const entry = [...this.activeToolCalls.entries()][index];
if (!entry) return null;
const [toolUseId, value] = entry;
// `index` is the streaming content-block index, not a tool-call ordinal —
// resolve it through the per-turn block-index map so the partial input is
// attributed to the correct tool call even when text/thinking blocks
// precede the tool_use block.
const toolUseId = this.toolUseBlockIndex.get(index);
if (!toolUseId) return null;
const value = this.activeToolCalls.get(toolUseId);
if (!value) return null;
return { toolUseId, toolName: value.toolName, rawInput: value.rawInput };
}
}
Expand Down
203 changes: 203 additions & 0 deletions registry/agent/claude/tests/adapter.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import test, { after } from "node:test";
import assert from "node:assert/strict";
import { resolve as resolvePath } from "node:path";

// Stop fns for every "open" fake query, drained after the run so the dangling
// consume() loops (and the process) can exit.
const cleanups = [];
after(() => {
for (const stop of cleanups) stop();
});

// Unit tests for the ClaudeQuerySession adapter fixes. The class is exported
// for testing and its constructor takes an injectable `queryFactory`, so these
// drive the translation/permission/teardown logic with a fake Claude SDK query
// and a mock ACP connection — no real SDK, no VM.
const packageDir = resolvePath(import.meta.dirname, "..");
const { ClaudeQuerySession } = await import(
resolvePath(packageDir, "dist", "adapter.js")
);

function makeConn(overrides = {}) {
let closeConn;
const closed = new Promise((r) => {
closeConn = r;
});
const updates = [];
return {
updates,
closeConn: () => closeConn(),
sessionUpdate: async (u) => {
updates.push(u);
},
requestPermission: async () => ({
outcome: { outcome: "selected", optionId: "allow_once" },
}),
closed,
...overrides,
};
}

function makeQuery({ endImmediately = false } = {}) {
let stop;
const stopped = new Promise((r) => {
stop = r;
});
if (!endImmediately) cleanups.push(stop);
return {
setMcpServers: async () => {},
interrupt: async () => {},
setPermissionMode: async () => {},
async *[Symbol.asyncIterator]() {
if (endImmediately) return;
await stopped; // ends consume() when drained in `after`
},
};
}

function makeSession({ endImmediately = false, conn } = {}) {
const c = conn ?? makeConn();
let capturedOptions;
const queryFactory = (arg) => {
capturedOptions = arg.options;
return makeQuery({ endImmediately });
};
const sess = new ClaudeQuerySession(
c,
"sess-1",
"/workspace",
"default",
{ cwd: "/workspace", mcpServers: undefined },
"/usr/bin/claude",
queryFactory,
);
return { sess, conn: c, getOptions: () => capturedOptions };
}

// ── Fix #5: input_json_delta maps to the correct tool by content-block index ──
test("claude #5: partial tool input is attributed by content-block index, not insertion order", async () => {
const { sess, conn } = makeSession();
sess.pendingTurn = {
sawAssistantText: false,
sawToolCall: false,
resolve() {},
reject() {},
};
// A text block occupies content-block index 0; the tool_use block is index 1.
await sess.handleStreamEvent({
event: { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "thinking..." } },
});
await sess.handleStreamEvent({
event: {
type: "content_block_start",
index: 1,
content_block: { type: "tool_use", id: "tool-A", name: "Bash", input: {} },
},
});
await sess.handleStreamEvent({
event: { type: "content_block_delta", index: 1, delta: { type: "input_json_delta", partial_json: '{"command":"ls"}' } },
});
await sess.lastEmit;

const toolUpdates = conn.updates
.map((u) => u.update)
.filter((u) => u?.sessionUpdate === "tool_call_update");
// With the old insertion-order lookup, findToolCallByIndex(1) on a 1-entry
// map returns null and the partial input is silently dropped (no update).
assert.equal(toolUpdates.length, 1, "expected exactly one tool_call_update for the partial input");
assert.equal(toolUpdates[0].toolCallId, "tool-A", "partial input must target the tool at block index 1");
assert.equal(toolUpdates[0].rawInput.partial_json, '{"command":"ls"}');
});

test("claude #5: two tool_use blocks at different indices each get their own partial input", async () => {
const { sess, conn } = makeSession();
sess.pendingTurn = { sawAssistantText: false, sawToolCall: false, resolve() {}, reject() {} };
// index 0 = text, index 1 = toolA, index 2 = toolB
await sess.handleStreamEvent({ event: { type: "content_block_delta", index: 0, delta: { type: "text_delta", text: "x" } } });
await sess.handleStreamEvent({ event: { type: "content_block_start", index: 1, content_block: { type: "tool_use", id: "tool-A", name: "Bash", input: {} } } });
await sess.handleStreamEvent({ event: { type: "content_block_start", index: 2, content_block: { type: "tool_use", id: "tool-B", name: "Read", input: {} } } });
await sess.handleStreamEvent({ event: { type: "content_block_delta", index: 2, delta: { type: "input_json_delta", partial_json: '{"path":"/a"}' } } });
await sess.handleStreamEvent({ event: { type: "content_block_delta", index: 1, delta: { type: "input_json_delta", partial_json: '{"command":"echo"}' } } });
await sess.lastEmit;

const updates = conn.updates.map((u) => u.update).filter((u) => u?.sessionUpdate === "tool_call_update");
const byTool = Object.fromEntries(updates.map((u) => [u.toolCallId, u.rawInput.partial_json]));
assert.equal(byTool["tool-B"], '{"path":"/a"}', "block index 2 → tool-B");
assert.equal(byTool["tool-A"], '{"command":"echo"}', "block index 1 → tool-A");
});

// ── Fix #1: permission handler is host-authoritative; no timer auto-resolve ──
test("claude #1: permission handler returns the host's deny decision", async () => {
const conn = makeConn({
requestPermission: async () => ({ outcome: { outcome: "selected", optionId: "reject_once" } }),
});
const { getOptions } = makeSession({ conn });
const canUseTool = getOptions().canUseTool;
const result = await canUseTool("Bash", { command: "rm -rf /" }, { toolUseID: "t1", title: "Bash", suggestions: [] });
assert.equal(result.behavior, "deny", "host reject must produce a deny");
});

test("claude #1: permission handler does NOT auto-resolve on a timer when the host is silent", async () => {
let pendingResolve;
const conn = makeConn({
// Never settles — simulates a host that hasn't answered yet.
requestPermission: () => new Promise((r) => {
pendingResolve = r;
}),
});
const { getOptions } = makeSession({ conn });
const canUseTool = getOptions().canUseTool;
const handlerPromise = canUseTool("Bash", {}, { toolUseID: "t1", title: "Bash", suggestions: [] });
const timer = new Promise((r) => setTimeout(() => r("TIMER_WON"), 300));
const winner = await Promise.race([handlerPromise.then(() => "HANDLER_WON"), timer]);
assert.equal(winner, "TIMER_WON", "handler must not auto-resolve before the host answers (no fail-open timer)");
// settle the pending request so the handler promise doesn't dangle
pendingResolve({ outcome: { outcome: "selected", optionId: "reject_once" } });
await handlerPromise;
});

// ── Fix #2: emit logs delivery failures (host-visible) and keeps the chain alive ──
test("claude #2: a failed sessionUpdate is logged to stderr and the emit chain survives", async () => {
let failNext = true;
const delivered = [];
const conn = makeConn({
sessionUpdate: async (u) => {
if (failNext) {
failNext = false;
throw new Error("broken pipe");
}
delivered.push(u);
},
});
const { sess } = makeSession({ conn });

const writes = [];
const orig = process.stderr.write;
process.stderr.write = (s) => {
writes.push(String(s));
return true;
};
try {
await sess.emit({ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "a" } });
await sess.emit({ sessionUpdate: "agent_message_chunk", content: { type: "text", text: "b" } });
await sess.lastEmit;
} finally {
process.stderr.write = orig;
}
assert.ok(
writes.some((w) => w.includes("failed to deliver session/update")),
"a delivery failure must be written to stderr, not swallowed",
);
assert.equal(delivered.length, 1, "the chain must survive the failure and deliver the next update");
});

// ── Fix #4: a dead reader marks the session closed so prompt() fails fast ──
test("claude #4: once the query stream ends, prompt() fails fast instead of hanging", async () => {
const { sess } = makeSession({ endImmediately: true });
await sess.reader; // wait for consume() to finish on the now-ended query
await assert.rejects(
sess.prompt({ prompt: [{ type: "text", text: "hi" }] }),
/Session is closed/,
"a prompt on a dead session must reject promptly, not hang",
);
});
3 changes: 2 additions & 1 deletion registry/agent/pi/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"scripts": {
"build": "tsc && node scripts/build-snapshot-bundle.mjs",
"build:snapshot": "node scripts/build-snapshot-bundle.mjs",
"check-types": "tsc --noEmit"
"check-types": "tsc --noEmit",
"test": "pnpm build && node --test --test-force-exit tests/*.test.mjs"
},
"dependencies": {
"@agentclientprotocol/sdk": "^0.16.1",
Expand Down
Loading
Loading