
Commit 3d4e08c

Authored by thomashebrard, lchoquel, and claude
feature/temporal-renaming-finish (#772)
* feature/temporal-renaming-finish
* add changelog
* add config
* update config
* update Pipelex.make
* add .md temporal
* Stabilize Temporal integration: worker CLI, workflow utils, and docs
  - Add worker CLI commands and temporal workflow utilities
  - Update temporal hub, manager, and task manager for stability
  - Add pipe routing and execution documentation
  - Add temporal-diagnose skill for debugging worker issues
  - Update Makefile with temporal-related targets
  - Clean up removed temporal-problm.md (moved to skill references)
* Problems, solutions, plans
* Improve Temporal dev workflow: portable paths, env deps, and docs fixes
  Remove hardcoded absolute paths in temporal-diagnose skill, add env dependency
  to Makefile temporal targets, fix workflow/activity names in docs, and add
  pipe routing page to mkdocs nav.
* move wip docs
* Refactor documentation references and update LibraryContext structure

Co-authored-by: Louis Choquel <lchoquel@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent fa9e2bd commit 3d4e08c

69 files changed

Lines changed: 2215 additions & 202 deletions

Lines changed: 287 additions & 0 deletions
---
name: temporal-diagnose
description: >
  Two modes for working on the Temporal worker library loading bug.
  DIAGNOSE mode: run the 3-terminal Temporal dev setup (server + worker + job),
  observe the failure, interpret errors. Use when the user says "test temporal",
  "run temporal", "diagnose temporal", "temporal dev", "reproduce the temporal bug",
  "check if temporal works", or pastes Temporal worker/submitter output to interpret.
  FIX mode: discuss architecture, design the solution, plan implementation, make
  code changes. Use when the user says "fix temporal", "let's discuss a fix",
  "design the temporal fix", "implement the temporal fix", "plan the temporal
  solution", or wants to iterate on the worker library loading solution.
  Always use this skill when the conversation touches the Temporal worker library
  problem, get_required_pipe failures on the worker, or mthds_contents not reaching
  the worker.
---

# Temporal Worker Library — Diagnose & Fix

This skill has two modes. Determine which one from the user's prompt:

- **"diagnose"**, **"test"**, **"run"**, **"reproduce"**, **"check"** → DIAGNOSE mode
- **"fix"**, **"discuss"**, **"design"**, **"implement"**, **"plan"**, **"solution"** → FIX mode

If ambiguous, ask the user: "Do you want to diagnose (run the setup and observe) or discuss the fix?"
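The dispatch rule above can be made explicit in a short sketch. This is only an illustration of the keyword logic, not part of the skill's actual machinery; `pick_mode` and the word sets are assumptions for this example:

```python
from typing import Optional

# Keyword sets from the mode table above.
DIAGNOSE_WORDS = {"diagnose", "test", "run", "reproduce", "check"}
FIX_WORDS = {"fix", "discuss", "design", "implement", "plan", "solution"}

def pick_mode(prompt: str) -> Optional[str]:
    # Tokenize crudely; a real dispatcher would be fuzzier.
    words = set(prompt.lower().replace(",", " ").split())
    diagnose = bool(words & DIAGNOSE_WORDS)
    fix = bool(words & FIX_WORDS)
    if diagnose and not fix:
        return "DIAGNOSE"
    if fix and not diagnose:
        return "FIX"
    return None  # ambiguous: ask the user which they want
```

A prompt like "run the fix" matches both sets and returns `None`, which is exactly the case where the skill should ask the clarifying question.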
Read `references/temporal-worker-problem.md` before proceeding — it explains the
root cause, code paths, and expected error patterns.

## How Claude Code runs everything

Claude Code handles all three processes. Do NOT ask the user to open terminals
or run commands — do it yourself.

Use **tmux** to manage the long-running processes (server and worker) in named
sessions. This lets you start them, run the job submitter, and then capture
output from all three to diagnose.

| Process | tmux session | Raw command | Lifecycle |
|---------|-------------|-------------|-----------|
| Temporal server | `temporal-server` | `temporal server start-dev` | Long-running, stays up across iterations |
| Temporal worker | `temporal-worker` | `PIPELEXPATH=<bundle_dir> .venv/bin/python -m pipelex.temporal.worker_cli --is-not-sandboxed` | Long-running, restart after code changes |
| Job submitter | (inline Bash) | `make trund` / `make trun` | Runs and exits |

**Important**: The server and worker are **long-running processes that never exit**.
They block the shell they run in. That is why they run inside tmux sessions, not
inline. The submitter (`make trund` / `make trun`) is the only process that runs to
completion and exits — run it directly via Bash, not in tmux.

**Why raw commands in tmux**: tmux sessions run in a bare shell without the
Makefile's variable resolution (`$(VENV_PYTHON)`, `$(call PRINT_TITLE,...)`).
Using `make ts` or `make tw` inside tmux will fail. Always use the raw commands
shown above for tmux sessions. The `make` targets are only for the job submitter,
which runs in Claude Code's own shell.
### tmux cheatsheet

**Start a session:**
```bash
tmux new-session -d -s temporal-server 'temporal server start-dev'
```

**Check if running:**
```bash
tmux has-session -t temporal-server 2>/dev/null && echo "running" || echo "not running"
```

**Read output** (last N lines):
```bash
tmux capture-pane -t temporal-worker -p -S -100
```

**Kill and restart** (e.g., to pick up code changes):
```bash
tmux kill-session -t temporal-worker
tmux new-session -d -c "$PWD" -s temporal-worker 'PIPELEXPATH=tests/integration/pipelex/pipes/controller/pipe_sequence .venv/bin/python -m pipelex.temporal.worker_cli --is-not-sandboxed'
```

If tmux is not installed, fall back to asking the user to run the server and
worker in separate terminals.

---
## DIAGNOSE Mode

Run the 3-process Temporal development setup and interpret results.

### Prerequisites

Verify these yourself (via Bash):
1. `tmux` installed: `which tmux`
2. `temporal` CLI installed: `which temporal`

### Step 1: Start the Temporal server

First check if a server is already running (possibly outside tmux from a previous
session or another terminal):
```bash
curl -s http://localhost:8233 > /dev/null && echo "running" || echo "not running"
```

If **running**: skip to step 2. The server is already up — no need to start it again.

If **not running**: start it in a tmux session:
```bash
tmux new-session -d -s temporal-server 'temporal server start-dev'
```
Sleep **3 seconds**, then verify:
```bash
sleep 3 && curl -s http://localhost:8233 > /dev/null && echo "running" || echo "not running"
```

Do NOT try to start the server if port 7233 is already in use — it will fail with
a bind error, the tmux session will exit immediately, and subsequent `capture-pane`
calls will fail.
### Step 2: Start the worker

```bash
tmux has-session -t temporal-worker 2>/dev/null || \
  tmux new-session -d -c "$PWD" -s temporal-worker \
    'PIPELEXPATH=tests/integration/pipelex/pipes/controller/pipe_sequence .venv/bin/python -m pipelex.temporal.worker_cli --is-not-sandboxed'
```

Use `-c "$PWD"` to pin the session's working directory to the repo: a
`cd $PWD` inside the single-quoted command string would be expanded by the
shell tmux spawns, whose working directory is not guaranteed to be the repo.

The worker is also long-running and never exits. Sleep **4 seconds** (no more),
then capture the pane:
```bash
sleep 4 && tmux capture-pane -t temporal-worker -p -S -30
```
Look for `Temporal Worker started for 'temporal_task_queue'`.
### Step 3: Submit a job

Run the job submitter. It connects to Temporal, submits the workflow, and **waits
for the result**. If the worker fails to process the job (e.g., deserialization
error), the submitter may hang for a long time waiting for a response that never
comes. Run it in the background so you can check worker output while it's waiting.

Dry run (no real LLM calls):
```bash
TEMPORAL_BUNDLE="tests/integration/pipelex/pipes/controller/pipe_sequence/pipe_sequence_1.mthds"
tmux has-session -t temporal-submitter 2>/dev/null || \
  tmux new-session -d -s temporal-submitter \
    "cd $PWD && .venv/bin/pipelex run bundle $TEMPORAL_BUNDLE --temporal --dry-run --mock-inputs --no-logo"
```

Or for real LLM execution:
```bash
TEMPORAL_BUNDLE="tests/integration/pipelex/pipes/controller/pipe_sequence/pipe_sequence_1.mthds"
tmux has-session -t temporal-submitter 2>/dev/null || \
  tmux new-session -d -s temporal-submitter \
    "cd $PWD && .venv/bin/pipelex run bundle $TEMPORAL_BUNDLE --temporal --mock-inputs --no-logo"
```

Both default to `pipe_sequence_1.mthds`. To target a specific pipe, add `--pipe <pipe_code>`.
Override the bundle by changing `TEMPORAL_BUNDLE`.
### Step 4: Diagnose the output

Read the submitter output (from step 3) AND the worker output:
```bash
tmux capture-pane -t temporal-worker -p -S -200
```

**Expected failure (bug not yet fixed):**

There are two failure layers, both caused by the missing library on the worker.
See `references/temporal-worker-problem.md` for details.

**Layer 1 — Deserialization failure** (hits first):
1. The PipeJob's WorkingMemory contains Stuff objects with dynamically-generated
   concept content classes (e.g., `RawText` inheriting from `TextContent`)
2. These classes are generated during library loading by `ConceptFactory` /
   `StructureGenerator` and registered with Kajson's class registry
3. On the worker, the library was never loaded → these classes don't exist →
   Kajson fails with `KajsonDecoderError: Class 'RawText' not found in module 'builtins'`
4. Temporal wraps this as `RuntimeError: Failed decoding arguments`
5. The submitter may hang waiting for a result that never comes

**Layer 2 — Library resolution failure** (would hit after Layer 1 is fixed):
1. `WfPipeRouter.run()` receives the PipeJob with the top-level PipeSequence
2. `PipeSequence.run_pipe()` calls `get_required_pipe("clean_text")`
3. The `library_manager` singleton is empty on the worker → error
4. Propagates as `TemporalError` / `ActivityError` to the submitter

The submitter output will show a Temporal workflow failure (or hang indefinitely
for Layer 1 failures).

**After fix is applied (success looks like):**
- Submitter: successful pipeline result printed to stdout
- Worker (`tmux capture-pane`): logs showing pipe execution steps
- Temporal UI (http://localhost:8233): completed workflow with result
### Step 5: Iterate

1. Make code changes
2. Kill and restart the worker (to pick up the changes):
   ```bash
   tmux kill-session -t temporal-worker
   tmux new-session -d -c "$PWD" -s temporal-worker 'PIPELEXPATH=tests/integration/pipelex/pipes/controller/pipe_sequence .venv/bin/python -m pipelex.temporal.worker_cli --is-not-sandboxed'
   sleep 5
   ```
3. Run `make trund` again and read the result
4. Capture worker output: `tmux capture-pane -t temporal-worker -p -S -200`
5. Repeat

The server session (`temporal-server`) stays running across iterations.
### Cleanup

When done with the entire session:
```bash
tmux kill-session -t temporal-worker 2>/dev/null
tmux kill-session -t temporal-server 2>/dev/null
```

### Test bundles for different pipe controllers

| Controller | Bundle path |
|------------|-------------|
| PipeSequence | `tests/integration/pipelex/pipes/controller/pipe_sequence/pipe_sequence_1.mthds` |
| PipeCondition | `tests/integration/pipelex/pipes/controller/pipe_condition/pipe_condition_1.mthds` |
| PipeBatch | `tests/integration/pipelex/pipes/controller/pipe_batch/uppercase_transformer.mthds` |
| PipeParallel | `tests/integration/pipelex/pipes/controller/pipe_parallel/pipe_parallel_1.mthds` |

---
## FIX Mode

Discuss architecture, design choices, and implementation for solving the worker
library loading problem. Stay in discussion/planning territory — do NOT jump to
code changes unless the user explicitly says to implement.

### What you must understand first

Read `references/temporal-worker-problem.md` thoroughly. The core tension:
- `pipeline_run_setup()` loads the library into `library_manager` — but only in the API process
- PipeJob carries the serialized top-level pipe, but child pipes are resolved by code at runtime
- On the worker, `get_required_pipe()` finds an empty library

### Design dimensions to discuss with the user

1. **Where does the library load on the worker?**
   - At worker startup (base library from PIPELEXPATH)?
   - Per-workflow in an Activity (custom bundles from mthds_contents)?
   - Both (two-tier cache)?

2. **What travels with the workflow input?**
   - Today: a pre-resolved `PipeJob` with the top-level pipe object
   - Option A: send `mthds_contents` + `pipe_code` instead, resolve on worker
   - Option B: send `PipeJob` but also include `mthds_contents` for the worker to load

3. **Replay safety**
   - Library loading is I/O — it belongs in Activities, not workflow code
   - Side-effect state (loading into a singleton) is lost on replay
   - Activities re-execute cleanly on replay

4. **Caching strategy**
   - Tier 1: base library at worker startup (same for all executions)
   - Tier 2: per-request overlay cached by content hash of mthds_contents
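To ground the discussion of dimension 2, Option B's workflow input can be sketched as a plain data shape. The type name and fields here (`TemporalPipeInput`, `pipe_job_json`) are illustrative assumptions, not the real Pipelex types:

```python
from dataclasses import dataclass, field

# Hypothetical sketch of Option B's workflow input: keep the pre-resolved
# PipeJob, but also carry the bundle sources so the worker can load the
# library BEFORE deserializing the job and resolving child pipes.
@dataclass
class TemporalPipeInput:
    pipe_job_json: str  # Kajson-serialized PipeJob (opaque until library is loaded)
    mthds_contents: dict[str, str] = field(default_factory=dict)  # bundle name -> MTHDS source
    pipe_code: str = ""  # would let an Option A worker resolve the pipe by code instead

# The submitter would build this on the API side and hand it to Temporal.
inp = TemporalPipeInput(pipe_job_json="{}", mthds_contents={"main.mthds": "..."})
```

The design choice this makes concrete: the expensive part (library loading) keys off `mthds_contents`, while the cheap part (deserializing `pipe_job_json`) is deferred until that load has happened.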
### Key files to read and discuss

| What | Where |
|------|-------|
| Library loading (API-side) | `pipelex/pipeline/pipeline_run_setup.py` |
| Hub singleton + get_required_pipe | `pipelex/hub.py` |
| Workflow definition | `pipelex/temporal/tprl_pipe/wf_pipe_router.py` |
| Router (Temporal) | `pipelex/temporal/tprl_pipe/pipe_router_top.py` |
| Router (local, works fine) | `pipelex/pipe_run/pipe_router.py` |
| Worker startup | `pipelex/temporal/worker_cli.py` |
| All controllers that break | `pipelex/pipe_controllers/` (sequence, condition, batch, parallel, sub_pipe) |
| Library manager | `pipelex/libraries/library_manager.py` |

### When the user says "implement"

Only then shift to making code changes. Use the diagnose loop to verify each change:
1. Make code changes yourself
2. Restart the worker:
   ```bash
   tmux kill-session -t temporal-worker
   tmux new-session -d -c "$PWD" -s temporal-worker 'PIPELEXPATH=tests/integration/pipelex/pipes/controller/pipe_sequence .venv/bin/python -m pipelex.temporal.worker_cli --is-not-sandboxed'
   sleep 5
   ```
3. Run `make trund` via Bash and read the output
4. Capture worker output: `tmux capture-pane -t temporal-worker -p -S -200`
5. Repeat
Lines changed: 103 additions & 0 deletions
# Temporal Worker Library Problem

## The Bug

When pipelex runs pipe controllers (PipeSequence, PipeCondition, PipeBatch, PipeParallel, SubPipe) via Temporal, they fail because the library is not loaded on the worker process. The missing library causes two cascading failures.

## Root Cause

```
API Process                                 Temporal Worker
─────────────────────────────────────────   ──────────────────────────────
PipelexRunner.execute_pipeline()
├─ pipeline_run_setup()
│  ├─ Loads library (library_manager)       (empty here)
│  ├─ Generates dynamic concept classes     (classes don't exist here)
│  ├─ Registers them with Kajson            (Kajson registry incomplete here)
│  ├─ Resolves pipe by code                 (can't resolve here)
│  └─ Creates PipeJob (top-level pipe)
└─ PipeRouterTop sends PipeJob ───────────► WfPipeRouter.run(pipe_job)
   to Temporal                              ├─ Kajson deserializes PipeJob
                                            │  └─ FAILS (Layer 1): unknown class
                                            └─ pipe.run_pipe()
                                               └─ get_required_pipe() FAILS (Layer 2)
```

1. `pipeline_run_setup()` loads the library into an in-memory `library_manager` singleton — **only in the API process**.
2. During library loading, `ConceptFactory` dynamically generates Python classes for concepts defined in `.mthds` bundles (e.g., `RawText = "Raw input text..."` generates a `RawText` class inheriting from `TextContent`) and registers them with Kajson's class registry.
3. The `PipeJob` is serialized via Kajson, which embeds `__class__` / `__module__` metadata for all Pydantic objects — including these dynamically-generated concept classes.
4. On the worker, the library was never loaded → these dynamic classes don't exist → **Kajson deserialization fails** before the workflow even starts (Layer 1).
5. Even if deserialization succeeded, child pipes are referenced **by code** via `get_required_pipe()`, which queries the empty `library_manager` singleton (Layer 2).
6. Temporal can replay workflows on different workers, so any side-effect library state is lost.
## Key Code Paths

| What | Where |
|------|-------|
| Library loading | `pipelex/pipeline/pipeline_run_setup.py` → `library_manager` |
| Dynamic concept class generation | `pipelex/core/concepts/concept_factory.py` → `_handle_basic_blueprint()` |
| Structure generator (creates the classes) | `pipelex/core/concepts/structure_generation/generator.py` |
| Kajson class registration | `pipelex/pipelex.py:353` (CoreRegistryModels) + `concept_factory.py:359` (dynamic) |
| Kajson data converter (Temporal serde) | `pipelex/temporal/temporal_data_converter.py` |
| `get_required_pipe()` | `pipelex/hub.py:511` |
| Callers that break (Layer 2) | `pipelex/pipe_controllers/sequence/pipe_sequence.py`, `condition/pipe_condition.py`, `batch/pipe_batch.py`, `parallel/pipe_parallel.py`, `sub_pipe.py` |
| Workflow definition | `pipelex/temporal/tprl_pipe/wf_pipe_router.py` |
| Router (Temporal) | `pipelex/temporal/tprl_pipe/pipe_router_top.py` |
| Router (local) | `pipelex/pipe_run/pipe_router.py` |
| Worker CLI | `pipelex/temporal/worker_cli.py` |
## What the Library Contains

- **Base libraries** — shared pipe/concept definitions from `PIPELEXPATH` directories. Same for all executions.
- **Custom bundles** — per-request `mthds_contents` (MTHDS bundle strings). Each API call can bring its own definitions.

## What Library Loading Does (beyond populating pipes)

Loading a library also **generates dynamic Python classes** for concepts. When a `.mthds` file
declares a simple concept like `RawText = "Raw input text..."`, `ConceptFactory._handle_basic_blueprint()`
calls `StructureGenerator.generate_from_structure_blueprint()` to create a new Python class named
`RawText` inheriting from `TextContent`, then registers it with Kajson's class registry. These
dynamically-generated classes are used as the `content` type of `Stuff` objects in the `WorkingMemory`.

When the PipeJob is serialized via Kajson for Temporal transport, these objects carry
`__class__: "RawText"` and `__module__: "builtins"` metadata. The worker must have these classes
registered before it can deserialize the PipeJob.
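The failure mode can be reproduced in miniature with no Pipelex or Kajson code at all. This is only an analogy — the real `ConceptFactory`/Kajson APIs differ, and `registry`, `load_library`, and `decode` are stand-in names:

```python
class TextContent:
    """Stand-in for the TextContent base class."""

registry: dict[str, type] = {}  # stand-in for Kajson's class registry

def load_library() -> None:
    # What library loading does, in miniature: create the concept class at
    # runtime and register it so decoding can find it by name later.
    raw_text = type("RawText", (TextContent,), {})
    registry[raw_text.__name__] = raw_text

def decode(class_name: str) -> type:
    # What deserialization needs: resolve a class name back to a class.
    if class_name not in registry:
        # Analogous to: KajsonDecoderError: Class 'RawText' not found
        raise LookupError(f"Class '{class_name}' not found")
    return registry[class_name]

# "API process": library loaded, the name resolves.
load_library()
assert decode("RawText").__name__ == "RawText"

# "Worker process": the registry was never populated, so decoding fails.
registry.clear()
try:
    decode("RawText")
except LookupError as exc:
    print(exc)  # Class 'RawText' not found
```

The point of the analogy: registration is a side effect of running the loading code, so any process that skips that step — like the worker — cannot resolve the names embedded in the serialized PipeJob.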
## Why Tests Don't Catch It

- Integration tests use local `PipeRouter` (in-process), not Temporal. Library is shared.
- Temporal tests only test leaf workflows (text gen, jinja2) that don't call `get_required_pipe()`.
- No test sends a pipe controller through `WfPipeRouter`.

## Expected Error Patterns

The bug manifests in two layers. Layer 1 hits first and prevents Layer 2 from being reached.

### Layer 1: Kajson deserialization failure (hits first)

On the **worker** stderr:
- `KajsonDecoderError: Class '<ConceptCode>' not found in module 'builtins'`
  (e.g., `Class 'RawText' not found in module 'builtins'`)
- Wrapped as `RuntimeError: Failed decoding arguments` by Temporal's workflow instance
- The concept code (e.g., `RawText`) is a dynamically-generated Python class created by
  `ConceptFactory._handle_basic_blueprint()` during library loading — which never ran on the worker
- The submitter may **hang indefinitely** waiting for a workflow result that will never arrive

### Layer 2: Library resolution failure (would hit after Layer 1 is fixed)

On the **worker** stderr:
- Errors from `get_required_pipe()` — pipe code not found in library
- The error originates inside `run_pipe()` of a controller (PipeSequence, PipeCondition, etc.)
- The API/submitter side will see a `TemporalError` or `ActivityError` wrapping it
## Proposed Fix Direction

Loading the library on the worker fixes both layers: it generates and registers the dynamic
concept classes (fixing Layer 1 deserialization) and populates `library_manager` with pipe
definitions (fixing Layer 2 resolution).

1. Workers load base library at startup from PIPELEXPATH (Tier 1 cache) — this generates
   dynamic concept classes and registers them with Kajson, enabling deserialization.
2. `mthds_contents` travels with workflow input (not consumed on API side).
3. Library loading happens in Activities (replay-safe), not workflow code.
4. Per-request overlay cached by content hash (Tier 2 cache).
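The Tier 2 idea in point 4 can be sketched as a content-addressed cache. The function names and the dict-of-strings shape are assumptions for illustration only; in the real fix the loading would happen inside a Temporal Activity:

```python
import hashlib

_overlay_cache: dict[str, dict] = {}  # content hash -> loaded overlay (placeholder)

def content_hash(mthds_contents: dict[str, str]) -> str:
    # Order-independent digest over bundle names and sources, so the cache
    # key depends only on content, not on dict insertion order.
    digest = hashlib.sha256()
    for name in sorted(mthds_contents):
        digest.update(name.encode())
        digest.update(b"\x00")
        digest.update(mthds_contents[name].encode())
        digest.update(b"\x00")
    return digest.hexdigest()

def get_or_load_overlay(mthds_contents: dict[str, str]) -> dict:
    # Identical per-request bundles hit the cache; any content change
    # produces a new key and triggers a fresh load.
    key = content_hash(mthds_contents)
    if key not in _overlay_cache:
        # Real fix: parse the bundles and build a library overlay here
        # (inside an Activity). A placeholder stands in for that work.
        _overlay_cache[key] = {"key": key}
    return _overlay_cache[key]
```

Hashing name and source with a separator byte avoids collisions between, say, `{"a": "bc"}` and `{"ab": "c"}`, which a naive concatenation would conflate.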
