[MCP Integration] Phase 6: Job management service#2239
aniruddh-alt merged 10 commits into ani/mcp-integration-05-docs-svc
src/oumi/mcp/job_service.py (outdated)

```python
except Exception as exc:
    rt.error_message = str(exc)
    logger.exception("Failed to launch cloud job %s", record.job_id)
    return ""
```
src/oumi/mcp/job_service.py (outdated)

```python
        return removed

    def _save(self) -> None:
```
Could we have the MCP server store this somewhere less temporary? I'm thinking about what would happen if the MCP server restarts: it'd be nice not to lose track of which jobs were submitted.
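A minimal sketch of what this could look like, assuming the registry is serialized as JSON: the file lives at a caller-supplied durable path and is reloaded on construction, so a restart picks up earlier submissions. `PersistentJobRegistry`, its methods other than `_save`, and the record shape are hypothetical names for illustration, not the PR's actual API.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


class PersistentJobRegistry:
    """Hypothetical registry that survives a server restart."""

    def __init__(self, path: Path) -> None:
        self._path = Path(path)
        # Reload whatever a previous server process wrote.
        self._jobs: dict = self._load()

    def _load(self) -> dict:
        try:
            return json.loads(self._path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return {}

    def _save(self) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file in the same directory, then rename over the
        # target so readers never observe a half-written file.
        fd, tmp = tempfile.mkstemp(dir=self._path.parent, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(self._jobs, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, self._path)
        except BaseException:
            try:
                os.unlink(tmp)
            except FileNotFoundError:
                pass
            raise

    def add(self, job_id: str, record: dict) -> None:
        self._jobs[job_id] = record
        self._save()

    def get(self, job_id: str) -> Optional[dict]:
        return self._jobs.get(job_id)
```

The path could be something like a per-user state directory (for example `~/.local/state/oumi-mcp/jobs.json`, an illustrative location only) rather than a temp directory that the OS may clear.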
```python
"""Job management service for Oumi MCP execution tools.

Provides job submission, status polling, cancellation, and log streaming
```
This file is getting a bit overwhelming in implementation details. Could we move job handling and logs to separate .py files and have this service take them in as dependencies in its constructor? That would remove a lot of the private methods and logic being pulled in here; ideally this file stays clean, since it acts as the interface.
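Roughly the shape being suggested, as a sketch only: the service holds two collaborators passed in through its constructor and delegates to them. The names `JobRunner`, `LogReader`, `JobService`, and the in-memory stand-ins are all hypothetical; the real split would follow whatever the existing private methods group into.

```python
from typing import Protocol


class JobRunner(Protocol):
    """Hypothetical interface for launching and cancelling jobs."""

    def submit(self, config_path: str) -> str: ...
    def cancel(self, job_id: str) -> bool: ...


class LogReader(Protocol):
    """Hypothetical interface for reading job logs."""

    def tail(self, job_id: str, lines: int) -> list: ...


class JobService:
    """Thin interface layer: no launch or log logic lives here."""

    def __init__(self, runner: JobRunner, logs: LogReader) -> None:
        self._runner = runner
        self._logs = logs

    def submit(self, config_path: str) -> str:
        return self._runner.submit(config_path)

    def cancel(self, job_id: str) -> bool:
        return self._runner.cancel(job_id)

    def tail_logs(self, job_id: str, lines: int = 100) -> list:
        return self._logs.tail(job_id, lines)


class InMemoryRunner:
    """Stand-in runner, useful for tests."""

    def __init__(self) -> None:
        self.active = set()

    def submit(self, config_path: str) -> str:
        job_id = f"job-{len(self.active) + 1}"
        self.active.add(job_id)
        return job_id

    def cancel(self, job_id: str) -> bool:
        if job_id in self.active:
            self.active.remove(job_id)
            return True
        return False


class InMemoryLogs:
    """Stand-in log reader backed by a dict of job_id -> lines."""

    def __init__(self, data: dict) -> None:
        self._data = data

    def tail(self, job_id: str, lines: int) -> list:
        return self._data.get(job_id, [])[-lines:]
```

A side benefit of this layout is that the service can be unit-tested with stand-ins like the two above, without spawning subprocesses or touching a cloud.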
```python
except TimeoutError:
    return {
        "success": False,
        "error": (
            f"Cancel timed out after 30s "
            f"(cloud={cloud}, cluster={cluster_name}, id={job_id}). "
            "The cancellation may still be in progress. "
            "Check cloud console or retry."
        ),
```
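For context, a self-contained sketch of the timeout pattern this excerpt appears to implement, assuming the cancel call is awaited through `asyncio.wait_for` (the excerpt does not show that part). `cancel_with_timeout` and `cancel_fn` are hypothetical names. One detail worth noting: `asyncio.TimeoutError` became an alias of the builtin `TimeoutError` only in Python 3.11, so catching `asyncio.TimeoutError` behaves the same on older and newer interpreters.

```python
import asyncio
from typing import Any, Awaitable, Callable

CANCEL_TIMEOUT_S = 30.0  # mirrors the 30s in the excerpt


async def cancel_with_timeout(
    cancel_fn: Callable[[], Awaitable[Any]],
    *,
    cloud: str,
    cluster_name: str,
    job_id: str,
    timeout_s: float = CANCEL_TIMEOUT_S,
) -> dict:
    """Await a cancel call, reporting a structured error on timeout."""
    try:
        await asyncio.wait_for(cancel_fn(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {
            "success": False,
            "error": (
                f"Cancel timed out after {timeout_s:g}s "
                f"(cloud={cloud}, cluster={cluster_name}, id={job_id}). "
                "The cancellation may still be in progress. "
                "Check cloud console or retry."
            ),
        }
    return {"success": True}
```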
```python
except asyncio.TimeoutError:
    raw = "".join(chunks)
```
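The excerpt suggests that log chunks are accumulated and whatever arrived before the deadline is returned. A minimal sketch of that idea, assuming the log source is an async iterator of strings; `collect_until_timeout` is a hypothetical helper, not a function from the PR.

```python
import asyncio
from typing import AsyncIterator


async def collect_until_timeout(stream: AsyncIterator[str], timeout_s: float) -> str:
    """Drain `stream` for at most `timeout_s` seconds and return what arrived."""
    chunks = []

    async def _drain() -> None:
        async for chunk in stream:
            chunks.append(chunk)

    try:
        await asyncio.wait_for(_drain(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Deadline hit: fall through and return the partial output.
        pass
    return "".join(chunks)
```

Because `chunks` lives outside the cancelled task, the text collected so far is still available after the timeout.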
Add job_service.py for job submission (local subprocess + cloud via oumi.launcher), status polling, cancellation, and log streaming. Includes JobRegistry for tracking active jobs and registry tests. Part of the MCP integration PR chain (Phase 6 of 10). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Use asyncio.Lock for _runtimes dict concurrency protection
- Ensure subprocesses are killed and reaped on cancellation
- Use atomic write pattern in JobRegistry._save()
- Cap unbounded lines parameter in log streaming
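Three of these fixes can be illustrated together. This is a sketch under assumptions, not the code from the commit: `RuntimeTable`, `stop_and_reap`, `clamp_lines`, the 5-second grace period, and the `MAX_LOG_LINES` value are all hypothetical.

```python
import asyncio
from typing import Any, Optional

MAX_LOG_LINES = 10_000  # hypothetical cap; the commit's actual limit is not shown


def clamp_lines(requested: int, upper: int = MAX_LOG_LINES) -> int:
    """Bound a caller-supplied line count to the range [1, upper]."""
    return max(1, min(requested, upper))


class RuntimeTable:
    """Dict of job runtimes guarded by an asyncio.Lock."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._runtimes: dict = {}

    async def put(self, job_id: str, runtime: Any) -> None:
        async with self._lock:
            self._runtimes[job_id] = runtime

    async def pop(self, job_id: str) -> Optional[Any]:
        async with self._lock:
            return self._runtimes.pop(job_id, None)


async def stop_and_reap(proc: asyncio.subprocess.Process, grace_s: float = 5.0) -> int:
    """Terminate a subprocess, escalate to kill, and always wait on it."""
    if proc.returncode is None:
        try:
            proc.terminate()
        except ProcessLookupError:
            pass  # the process exited between the check and the signal
        try:
            await asyncio.wait_for(proc.wait(), timeout=grace_s)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()  # reap so no zombie process is left behind
    return proc.returncode
```

Waiting on the process after signalling it is what "reaped" refers to: without the `wait()`, a terminated child can linger as a zombie until the parent exits.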
Pydantic requires typing_extensions.TypedDict (not typing.TypedDict) on Python < 3.12 when NotRequired fields are present. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
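A common way to express this constraint is a version-gated import, sketched below. `JobRecordDict` and its fields are hypothetical, and the block assumes `typing_extensions` is installed on interpreters older than 3.12.

```python
import sys

# Pydantic rejects typing.TypedDict on Python < 3.12, so take both names
# from typing_extensions there and from the standard library otherwise.
if sys.version_info >= (3, 12):
    from typing import NotRequired, TypedDict
else:
    from typing_extensions import NotRequired, TypedDict


class JobRecordDict(TypedDict):
    """Hypothetical record shape with one optional key."""

    job_id: str
    status: str
    cluster_name: NotRequired[str]  # may be absent, e.g. for local jobs
```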
Co-authored-by: Aniruddhan Ramesh <aniruddhanramesh@Aniruddhans-MacBook-Pro.local>
Description
Part of the MCP Integration PR chain (Phase 6 of 10) - Stage: service
What changed: Added `job_service.py` for job submission (local subprocess + cloud via `oumi.launcher`), status polling, cancellation, and async log streaming. Includes `JobRegistry` for tracking active jobs and registry tests.

Why: Job management is the core execution capability of the MCP server: it enables launching, monitoring, and controlling ML training jobs.