From f772b716114af449a7be17729b21e1459f100aec Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 27 May 2026 16:15:26 -0700 Subject: [PATCH 1/2] fix(celery): improve task dispatch + cancellation to prevent stuck jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses three reinforcing causes of run_job head-of-line blocking on the jobs queue (RolnickLab/antenna#1323) plus an orthogonal cancel bug exposed by the same investigation. - Enable fair scheduling (CELERY_WORKER_POOL_OPTIMIZATION = "fair") so the master process holds prefetched messages in a shared buffer instead of pre-assigning them to specific prefork children. Long heterogeneous tasks (notably run_job inside filter_processed_images) no longer block newer messages stuck behind them on the same child. - Add acks_late=True + reject_on_worker_lost=True to run_job so a worker SIGKILL/OOM mid-task triggers broker redelivery instead of silently dropping the job. Pairs with an early-guard at the top of run_job that returns cleanly if the Job is already in a terminal state or being cancelled, so redelivery never re-runs side effects. - Fix Job.cancel for ASYNC_API: skip terminate=True on the (likely-done) run_job task — the actual work runs on remote ADC workers via NATS, and cleanup_async_job_if_needed is what stops it. Terminating the local bootstrap was a no-op at best and SIGTERM'd a still-bootstrapping child at worst. INTERNAL / SYNC_API keep terminate=True since their celery task body owns the entire job lifecycle. - Document the optional CELERY_WORKER_CONCURRENCY=4 override on the celeryworker_jobs container (commented out for now) so operators can opt in once -O fair is observed in production. Co-Authored-By: Claude --- ami/jobs/models.py | 38 ++++++++++----- ami/jobs/tasks.py | 53 +++++++++++++++------ ami/jobs/tests/test_jobs.py | 90 ++++++++++++++++++++++++++++++++++-- ami/jobs/tests/test_tasks.py | 82 ++++++++++++++++++++++++++++++++ config/settings/base.py | 12 +++++ docker-compose.worker.yml | 10 ++++ 6 files changed, 254 insertions(+), 31 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 3669b65c4..0e3c8be5e 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -1067,25 +1067,39 @@ def retry(self, async_task=True): def cancel(self): """ - Cancel a job. For async_api jobs, clean up NATS/Redis resources - and transition through CANCELING → REVOKED. For other jobs, - revoke the Celery task. + Cancel a job. + + For ASYNC_API jobs the long-running work is on remote ADC workers via + NATS, not in the local ``run_job`` celery task — by the time the user + clicks cancel, ``run_job`` has usually already finished + ``queue_images_to_nats`` and returned. Tearing down the NATS stream + + Redis state (``cleanup_async_job_if_needed``) is what actually stops + further work: ADC stops being delivered tasks, and any in-flight + result handlers see no Redis state and fast-fail. Calling + ``revoke(terminate=True)`` on the (likely-done) run_job would SIGTERM + the worker child if it happens to still be inside the bootstrap (e.g. + a slow ``filter_processed_images`` for a huge collection), which + prior to ``acks_late`` was an unrecoverable message loss. We revoke + without terminate so a not-yet-started copy is dropped without + killing in-flight bootstrap; the in-flight copy then notices + ``status == CANCELING`` via the early-guard in ``run_job`` next time + it's invoked (e.g. on redelivery) and bails out cleanly. + + For INTERNAL / SYNC_API jobs the celery task body owns the entire + job lifecycle, so terminating it remains the only way to stop + active work. """ self.status = JobState.CANCELING self.save() + is_async_api = self.dispatch_mode == JobDispatchMode.ASYNC_API if self.task_id: task = run_job.AsyncResult(self.task_id) if task: - task.revoke(terminate=True) - if self.dispatch_mode == JobDispatchMode.ASYNC_API: - # For async jobs we need to set the status to revoked here since the task already - # finished (it only queues the images). - self.status = JobState.REVOKED - self.save() - else: - self.status = JobState.REVOKED - self.save() + task.revoke(terminate=not is_async_api) + + self.status = JobState.REVOKED + self.save() cleanup_async_job_if_needed(self) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index f3c996f86..9d838979b 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -136,30 +136,53 @@ def update_async_services_seen_for_project(project_id: int) -> None: ) -@celery_app.task(bind=True, soft_time_limit=default_soft_time_limit, time_limit=default_time_limit) +# acks_late + reject_on_worker_lost so a worker SIGKILL/OOM mid-task does not +# silently drop the job: the broker holds the message until the task body +# either completes successfully or raises, and redelivers if the worker dies. +# Pairs with the early-guard below — a redelivered run_job that finds the job +# already in a terminal state (or mid-cancellation) returns cleanly instead of +# re-running side effects. See RolnickLab/antenna#1323. +@celery_app.task( + bind=True, + soft_time_limit=default_soft_time_limit, + time_limit=default_time_limit, + acks_late=True, + reject_on_worker_lost=True, +) def run_job(self, job_id: int) -> None: - from ami.jobs.models import Job + from ami.jobs.models import Job, JobState try: job = Job.objects.get(pk=job_id) except Job.DoesNotExist as e: raise e # self.retry(exc=e, countdown=1, max_retries=1) + + # Early-guard: under acks_late, the broker may redeliver this message after a + # worker SIGKILL/OOM, and Job.cancel() may also flip status to CANCELING / + # REVOKED while the message sits in the prefetch buffer. Don't re-run a job + # that's already settled or being torn down. + if job.status in JobState.final_states() or job.status == JobState.CANCELING: + job.logger.info( + f"Skipping run_job for job {job.pk}: already in status {job.status} " + f"(redelivery or cancellation in flight)" + ) + return + + job.logger.info(f"Running job {job}") + try: + job.run() + except Exception as e: + job.logger.error(f'Job #{job.pk} "{job.name}" failed: {e}') + raise else: - job.logger.info(f"Running job {job}") - try: - job.run() - except Exception as e: - job.logger.error(f'Job #{job.pk} "{job.name}" failed: {e}') - raise - else: - from ami.jobs.models import JobDispatchMode + from ami.jobs.models import JobDispatchMode - job.refresh_from_db() - if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete(): - _log_worker_availability(job) - else: - job.logger.info(f"Finished job {job}") + job.refresh_from_db() + if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete(): + _log_worker_availability(job) + else: + job.logger.info(f"Finished job {job}") def _log_worker_availability(job) -> None: diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index 783180ed7..4304ac3b6 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -375,10 +375,92 @@ def test_run_job_unauthenticated(self): # Accept either 401 (TokenAuthentication) or 403 (SessionAuthentication with AnonymousUser) self.assertIn(resp.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]) - def test_cancel_job(self): - # This cannot be tested until we have a way to cancel jobs - # and a way to run async tasks in tests. - pass + def test_cancel_async_api_job_does_not_terminate_celery_task(self): + """ASYNC_API cancel must revoke without terminate=True. + + The remote ADC worker is doing the actual work via NATS — terminating + the (likely-done) local ``run_job`` bootstrap doesn't stop them, and + SIGTERM'ing a still-bootstrapping child loses the message under the + broker's early-ack default. Cleanup of NATS/Redis state is what + actually stops further work. + """ + from unittest.mock import MagicMock, patch + + job = Job.objects.create( + project=self.project, + name="Cancel async_api", + task_id="fake-async-task-id", + status=JobState.STARTED, + dispatch_mode=JobDispatchMode.ASYNC_API, + ) + + with patch("ami.jobs.models.run_job") as mock_run_job, patch( + "ami.jobs.models.cleanup_async_job_if_needed" + ) as mock_cleanup: + mock_task = MagicMock() + mock_run_job.AsyncResult.return_value = mock_task + + job.cancel() + + mock_run_job.AsyncResult.assert_called_once_with("fake-async-task-id") + mock_task.revoke.assert_called_once_with(terminate=False) + mock_cleanup.assert_called_once_with(job) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED) + + def test_cancel_sync_api_job_terminates_celery_task(self): + """SYNC_API / INTERNAL cancel must keep terminate=True. + + Their celery task body owns the entire job lifecycle, so terminating + the task is the only way to stop active work. + """ + from unittest.mock import MagicMock, patch + + job = Job.objects.create( + project=self.project, + name="Cancel sync_api", + task_id="fake-sync-task-id", + status=JobState.STARTED, + dispatch_mode=JobDispatchMode.SYNC_API, + ) + + with patch("ami.jobs.models.run_job") as mock_run_job, patch( + "ami.jobs.models.cleanup_async_job_if_needed" + ) as mock_cleanup: + mock_task = MagicMock() + mock_run_job.AsyncResult.return_value = mock_task + + job.cancel() + + mock_task.revoke.assert_called_once_with(terminate=True) + mock_cleanup.assert_called_once_with(job) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED) + + def test_cancel_job_without_task_id_still_revokes(self): + """A job that never made it to enqueue (no task_id) still transitions + to REVOKED and triggers async-cleanup (a no-op for non-ASYNC_API).""" + from unittest.mock import patch + + job = Job.objects.create( + project=self.project, + name="Cancel never-enqueued", + task_id="", + status=JobState.PENDING, + dispatch_mode=JobDispatchMode.INTERNAL, + ) + + with patch("ami.jobs.models.run_job") as mock_run_job, patch( + "ami.jobs.models.cleanup_async_job_if_needed" + ) as mock_cleanup: + job.cancel() + mock_run_job.AsyncResult.assert_not_called() + mock_cleanup.assert_called_once_with(job) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED) def test_list_jobs_with_ids_only(self): """Test the ids_only parameter returns only job IDs.""" diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 89587f3f1..813d5e2b9 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -626,6 +626,88 @@ def test_task_failure_marks_sync_api_job_failure_and_cleans_up(self, mock_cleanu mock_cleanup.assert_called_once() +class TestRunJobEarlyGuard(TransactionTestCase): + """ + run_job early-guard regression tests. + + With ``acks_late=True`` and ``reject_on_worker_lost=True`` on the task, + the broker will redeliver a run_job message if a worker dies mid-task + (SIGKILL, OOM, deploy roll). The early-guard at the top of ``run_job`` + short-circuits when the Job is already in a terminal state or being + cancelled, so a redelivery — or a cancel-and-retry race — does not + re-run side effects. See RolnickLab/antenna#1323. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="run_job guard project") + self.pipeline = Pipeline.objects.create(name="run_job guard pipeline", slug="run-job-guard-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="run_job guard collection", project=self.project) + + def tearDown(self): + cache.clear() + + def _make_job(self, status: JobState) -> Job: + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name=f"run_job guard {status}", + pipeline=self.pipeline, + source_image_collection=self.collection, + ) + job.status = status + job.save() + return job + + def test_skips_when_job_already_revoked(self): + """Redelivery after the user cancelled and the job settled to REVOKED.""" + from ami.jobs.tasks import run_job + + job = self._make_job(JobState.REVOKED) + + with patch.object(Job, "run") as mock_run: + result = run_job.apply(args=[job.pk]) + + self.assertTrue(result.successful(), msg=f"task should succeed, got {result.state}: {result.traceback}") + mock_run.assert_not_called() + + def test_skips_when_job_canceling(self): + """Cancel arrived after the message was already in the prefetch buffer.""" + from ami.jobs.tasks import run_job + + job = self._make_job(JobState.CANCELING) + + with patch.object(Job, "run") as mock_run: + result = run_job.apply(args=[job.pk]) + + self.assertTrue(result.successful(), msg=f"task should succeed, got {result.state}: {result.traceback}") + mock_run.assert_not_called() + + def test_skips_when_job_already_success(self): + """Redelivery after the job actually completed (e.g. ack lost in transit).""" + from ami.jobs.tasks import run_job + + job = self._make_job(JobState.SUCCESS) + + with patch.object(Job, "run") as mock_run: + result = run_job.apply(args=[job.pk]) + + self.assertTrue(result.successful(), msg=f"task should succeed, got {result.state}: {result.traceback}") + mock_run.assert_not_called() + + def test_runs_when_job_pending(self): + """Contract pair: a healthy first-delivery still calls Job.run().""" + from ami.jobs.tasks import run_job + + job = self._make_job(JobState.PENDING) + + with patch.object(Job, "run") as mock_run: + run_job.apply(args=[job.pk]) + + mock_run.assert_called_once() + + class TestResultEndpointWithError(APITestCase): """Integration test for the result API endpoint with error results.""" diff --git a/config/settings/base.py b/config/settings/base.py index eabb593d7..59fc7c4ea 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -429,6 +429,18 @@ def _celery_result_backend_url(redis_url): CELERY_WORKER_PREFETCH_MULTIPLIER = 1 CELERY_WORKER_ENABLE_PREFETCH_COUNT_REDUCTION = True +# Fair scheduling for the prefork pool. +# https://docs.celeryq.dev/en/stable/userguide/configuration.html#worker-pool-optimization +# Under the default scheduler the master process pre-assigns prefetched +# messages to specific prefork children at delivery time, so a long task +# pinned to one child causes head-of-line blocking even when sibling +# children are idle. Fair mode holds prefetched messages in a shared buffer +# and hands one to a child only when that child is genuinely idle, which +# matters for queues that mix heterogeneous-duration tasks (notably ``jobs``, +# where ``run_job`` can sit inside ``filter_processed_images`` for minutes). +# See RolnickLab/antenna#1323. +CELERY_WORKER_POOL_OPTIMIZATION = "fair" + # Split Celery work across three queues so one class of task can't starve # another. Staging/production/worker compose files each run a dedicated # worker service per queue; local/CI use a single worker consuming all queues. diff --git a/docker-compose.worker.yml b/docker-compose.worker.yml index e7f8c83e5..dd94399bd 100644 --- a/docker-compose.worker.yml +++ b/docker-compose.worker.yml @@ -51,6 +51,16 @@ services: command: /start-celeryworker environment: CELERY_QUEUES: "jobs" + # TODO(#1323): consider overriding CELERY_WORKER_CONCURRENCY to ~4 here. + # Counter-intuitive but: per-container prefetch = concurrency × prefetch_multiplier(=1), + # so a 16-wide jobs container reserves up to 16 messages on a single broker channel. + # When one run_job sits idle inside filter_processed_images for minutes, the other 15 + # reserved slots block sibling jobs from being offered to an idle peer container. + # Lowering to 4 shrinks the reservation window so the broker spills aggressively to + # peers. Acceptable for jobs queue because run_job spends most of its time waiting on + # NATS results rather than CPU. Other queues (ml_results, antenna) want the larger + # pool — they're DB/Redis-bound and benefit from oversubscription. + # CELERY_WORKER_CONCURRENCY: "4" restart: always celeryworker_ml: From c8e6c8a00eddc9050ea619582d00eb9543d59388 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 27 May 2026 18:14:57 -0700 Subject: [PATCH 2/2] fix(jobs): close prerun-clobber + orphan-NATS-dispatch races MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review on #1324 surfaced two races that left the early-guard non-functional in production: 1. ``task_prerun`` (``pre_update_job_status``) wrote PENDING to the row before the ``run_job`` body inspected status. A canceled or redelivered message therefore had its REVOKED/CANCELING overwritten with PENDING, and the early-guard added in the parent commit never tripped. The existing tests passed only because they invoked ``run_job.apply(args=[…])`` while production uses ``kwargs={"job_id": …}`` — under args, the prerun handler raised ``KeyError`` and exited silently. Switching the tests to ``kwargs=`` reproduces the production code path; the prerun handler now short-circuits when ``Job.is_settled()`` is true, preserving the status the early-guard reads next. 2. For ASYNC_API jobs ``Job.cancel()`` revokes without ``terminate=True``, marks the row REVOKED, and tears down the NATS stream + Redis state. ``MLJob.run`` running in a worker that's still inside ``collect_images`` (slow for large collections) would then proceed to ``queue_images_to_nats`` and recreate the stream the cancel just deleted, dispatching real GPU work to ADC for a revoked job; the results came back to no Redis state and ``_fail_job`` silently overwrote REVOKED with FAILURE. The bootstrap now checks ``Job.status`` (via a values-only read so the in-memory ``progress`` mutations don't clobber the cancel's REVOKED) right after the collect stage and bails out before any dispatch. Adds ``Job.is_settled()`` to centralize the "terminal or being torn down" predicate that ``run_job``'s early-guard, the prerun handler, ``_fail_job``, and the bootstrap guard all needed. Adds two regression tests: one for the prerun-then-guard chain, one for the cancel-during-bootstrap race. Co-Authored-By: Claude --- ami/jobs/models.py | 37 ++++++++++++++++++++++++++++++ ami/jobs/tasks.py | 39 ++++++++++++++++++++++++++++---- ami/jobs/tests/test_jobs.py | 44 ++++++++++++++++++++++++++++++++++++ ami/jobs/tests/test_tasks.py | 27 ++++++++++++++++++---- 4 files changed, 138 insertions(+), 9 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 0e3c8be5e..bf869a3ed 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -529,6 +529,28 @@ def run(cls, job: "Job"): progress=1, ) + # Mid-bootstrap cancel guard. ``collect_images`` above can run for many + # minutes on large collections (S3 list + DB joins), and the user may + # cancel during that window. ``Job.cancel()`` for ASYNC_API does + # ``revoke(terminate=False)`` to avoid SIGKILL'ing this worker, then + # writes REVOKED + tears down the NATS stream / Redis state. Without + # this check we would (a) clobber the cancel's REVOKED via the next + # full ``job.save()`` and (b) proceed to ``queue_images_to_nats``, + # recreating the stream the cancel just deleted and dispatching real + # GPU work to ADC for a revoked job. Refresh is read-only against the + # ``status`` column; the in-memory ``progress`` mutations from the + # collect stage are intentionally dropped on the bail path because the + # job is settled — no further progress writes make sense. Covers + # ASYNC_API (NATS dispatch) and SYNC paths (Celery sub-tasks in + # ``process_images``); INTERNAL jobs benefit too. See + # RolnickLab/antenna#1323. + db_status = Job.objects.values_list("status", flat=True).get(pk=job.pk) + if db_status in JobState.final_states() or db_status == JobState.CANCELING: + job.logger.info( + f"Job {job.pk} settled to {db_status} during bootstrap; " f"skipping dispatch of {len(images)} images" + ) + return + # End image collection stage job.save() @@ -1041,6 +1063,21 @@ def setup(self, save=True): if save: self.save() + def is_settled(self) -> bool: + """Return True when the job is in a terminal state or being cancelled. + + Used by every code path that must not start (or continue) work for a + job whose lifecycle has effectively ended: the ``run_job`` early-guard + (after acks_late redelivery), the ``MLJob.run`` mid-bootstrap cancel + check (before ``queue_images_to_nats`` dispatches GPU work to ADC), + the prerun signal handler (so a redelivered or canceled message does + not get its status reset to PENDING), and ``_fail_job``. Centralized + so the predicate stays in one place — adding a new "do not resume" + status only requires touching :meth:`JobState.final_states` or this + method. + """ + return self.status in JobState.final_states() or self.status == JobState.CANCELING + def run(self): """ Run the job. diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 9d838979b..094e4ab7d 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -150,7 +150,7 @@ def update_async_services_seen_for_project(project_id: int) -> None: reject_on_worker_lost=True, ) def run_job(self, job_id: int) -> None: - from ami.jobs.models import Job, JobState + from ami.jobs.models import Job try: job = Job.objects.get(pk=job_id) @@ -161,8 +161,10 @@ def run_job(self, job_id: int) -> None: # Early-guard: under acks_late, the broker may redeliver this message after a # worker SIGKILL/OOM, and Job.cancel() may also flip status to CANCELING / # REVOKED while the message sits in the prefetch buffer. Don't re-run a job - # that's already settled or being torn down. - if job.status in JobState.final_states() or job.status == JobState.CANCELING: + # that's already settled or being torn down. The companion guard in + # pre_update_job_status above prevents the task_prerun signal from + # overwriting that status with PENDING before we get here. + if job.is_settled(): job.logger.info( f"Skipping run_job for job {job.pk}: already in status {job.status} " f"(redelivery or cancellation in flight)" @@ -444,7 +446,7 @@ def _fail_job(job_id: int, reason: str) -> None: try: with transaction.atomic(): job = Job.objects.select_for_update().get(pk=job_id) - if job.status in (JobState.CANCELING, *JobState.final_states()): + if job.is_settled(): return job.update_status(JobState.FAILURE, save=False) job.finished_at = datetime.datetime.now() @@ -1327,7 +1329,34 @@ def cleanup_async_job_if_needed(job) -> None: @task_prerun.connect(sender=run_job) def pre_update_job_status(sender, task_id, task, **kwargs): - # in the prerun signal, set the job status to PENDING + """Bump the job to PENDING when a worker picks the message up. + + Skipped when the job is already settled (terminal state) or being + cancelled. Without that guard, a broker redelivery (acks_late + worker + crash) or a cancel that arrived while the message was still in the + prefetch buffer would have its REVOKED/CANCELING status silently + overwritten with PENDING here, and the ``run_job`` early-guard + (which reads ``Job.status`` after this signal fires) would then fail + to short-circuit and re-run the job. See RolnickLab/antenna#1323. + """ + from ami.jobs.models import Job + + job_id = task.request.kwargs.get("job_id") if task.request.kwargs else None + if job_id is None and task.request.args: + job_id = task.request.args[0] + if job_id is not None: + try: + job = Job.objects.only("status").get(pk=job_id) + except Job.DoesNotExist: + pass + else: + if job.is_settled(): + logger.info( + "task_prerun: skipping PENDING write for job %s in status %s " "(redelivery or cancel in flight)", + job_id, + job.status, + ) + return update_job_status(sender, task_id, task, "PENDING", **kwargs) diff --git a/ami/jobs/tests/test_jobs.py b/ami/jobs/tests/test_jobs.py index 4304ac3b6..ac65e32f9 100644 --- a/ami/jobs/tests/test_jobs.py +++ b/ami/jobs/tests/test_jobs.py @@ -439,6 +439,50 @@ def test_cancel_sync_api_job_terminates_celery_task(self): job.refresh_from_db() self.assertEqual(job.status, JobState.REVOKED) + def test_mljob_run_bails_when_cancelled_during_bootstrap(self): + """Regression for the cancel-during-bootstrap race in ASYNC_API jobs. + + ``Job.cancel()`` revokes without terminate=True (so the local worker is + not SIGTERM'd), marks the row REVOKED, and tears down NATS/Redis state. + If the worker is still inside ``MLJob.run`` (typically blocked in the + slow ``collect_images`` step), it must refresh the row and bail BEFORE + calling ``queue_images_to_nats`` — otherwise it would recreate the + stream and dispatch real GPU work to ADC for a revoked job. + """ + from unittest.mock import patch + + from ami.jobs.models import MLJob + + pipeline = Pipeline.objects.create(name="Cancel-race pipeline", slug="cancel-race-pipeline") + pipeline.projects.add(self.project) + collection = SourceImageCollection.objects.create(name="Cancel-race collection", project=self.project) + job = Job.objects.create( + project=self.project, + name="Cancel-race", + pipeline=pipeline, + source_image_collection=collection, + status=JobState.STARTED, + dispatch_mode=JobDispatchMode.ASYNC_API, + ) + job.setup() + + def cancel_mid_collect(*_args, **_kwargs): + # Simulate the user clicking cancel while collect_images is still + # running: rewrite the DB row out from under this in-flight task. + Job.objects.filter(pk=job.pk).update(status=JobState.REVOKED) + return [] + + with patch.object( + pipeline, + "collect_images", + side_effect=cancel_mid_collect, + ), patch("ami.ml.orchestration.jobs.queue_images_to_nats") as mock_queue: + MLJob.run(job) + + mock_queue.assert_not_called() + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED) + def test_cancel_job_without_task_id_still_revokes(self): """A job that never made it to enqueue (no task_id) still transitions to REVOKED and triggers async-cleanup (a no-op for non-ASYNC_API).""" diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 813d5e2b9..84e766a7a 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -667,7 +667,7 @@ def test_skips_when_job_already_revoked(self): job = self._make_job(JobState.REVOKED) with patch.object(Job, "run") as mock_run: - result = run_job.apply(args=[job.pk]) + result = run_job.apply(kwargs={"job_id": job.pk}) self.assertTrue(result.successful(), msg=f"task should succeed, got {result.state}: {result.traceback}") mock_run.assert_not_called() @@ -679,7 +679,7 @@ def test_skips_when_job_canceling(self): job = self._make_job(JobState.CANCELING) with patch.object(Job, "run") as mock_run: - result = run_job.apply(args=[job.pk]) + result = run_job.apply(kwargs={"job_id": job.pk}) self.assertTrue(result.successful(), msg=f"task should succeed, got {result.state}: {result.traceback}") mock_run.assert_not_called() @@ -691,7 +691,7 @@ def test_skips_when_job_already_success(self): job = self._make_job(JobState.SUCCESS) with patch.object(Job, "run") as mock_run: - result = run_job.apply(args=[job.pk]) + result = run_job.apply(kwargs={"job_id": job.pk}) self.assertTrue(result.successful(), msg=f"task should succeed, got {result.state}: {result.traceback}") mock_run.assert_not_called() @@ -703,10 +703,29 @@ def test_runs_when_job_pending(self): job = self._make_job(JobState.PENDING) with patch.object(Job, "run") as mock_run: - run_job.apply(args=[job.pk]) + run_job.apply(kwargs={"job_id": job.pk}) mock_run.assert_called_once() + def test_prerun_signal_does_not_clobber_revoked_status(self): + """ + Regression: the ``task_prerun`` signal would otherwise call + ``update_job_status(state="PENDING")`` and overwrite a REVOKED/CANCELING + status before the ``run_job`` early-guard reads it. With the prerun + guard in place, the status survives the signal, the early-guard fires, + and ``Job.run()`` is not called. + """ + from ami.jobs.tasks import run_job + + job = self._make_job(JobState.REVOKED) + + with patch.object(Job, "run") as mock_run: + run_job.apply(kwargs={"job_id": job.pk}) + + job.refresh_from_db() + self.assertEqual(job.status, JobState.REVOKED) + mock_run.assert_not_called() + class TestResultEndpointWithError(APITestCase): """Integration test for the result API endpoint with error results."""