Skip to content

Commit 13203e4

Browse files
mihowclaude
andauthored
Allow admins to customize the time gap between sessions (#1292)
* feat(projects): wire session_time_gap_seconds into event grouping The session_time_gap_seconds project setting was added in #918 but never used. group_images_into_events() still hardcoded a 120-minute gap. This wires the field through and exposes it in the Processing settings UI so each project can override the session boundary. Closes #906 Co-Authored-By: Claude <noreply@anthropic.com> * feat(api): add regroup-sessions action to deployment viewset POST /api/v2/deployments/<pk>/regroup-sessions/ queues the existing ami.tasks.regroup_events Celery task for the deployment. The task calls group_images_into_events(), which now reads the project's session_time_gap_seconds setting. User-facing terminology is "sessions"; "Event" is retained as the internal model name for backwards compatibility. Returns 202 with task_id, deployment_id, project_id. Refs #906 Co-Authored-By: Claude <noreply@anthropic.com> * test(grouping): cover cross-midnight bursts and project-setting fallback Three new TestImageGrouping cases that exercise the new session_time_gap_seconds wiring: - test_cross_midnight_bursts_split_by_short_gap: documents the user- reported pain (2 bursts with off-window > default gap, crossing midnight, split into 2 Events on different dates) and proves that bumping max_time_gap past the off-window collapses them to 1 Event. - test_same_date_bursts_merge_regardless_of_gap: documents the inverse behavior — bursts on the same calendar date collide on group_by regardless of gap setting. Acts as a regression target for any future rework of the date-keyed Event reuse path (per the #904 caveat at models.py:1516). - test_session_time_gap_seconds_is_used_when_no_explicit_gap: verifies group_images_into_events falls back to the project setting when no max_time_gap is passed, including refreshing the cached relation after the setting changes. Refs #906 Co-Authored-By: Claude <noreply@anthropic.com> * feat(perms,tasks): regroup_sessions_deployment perm + idempotent regroup task Addresses takeaway-review feedback on PR #1292: - Add `regroup_sessions_deployment` custom Project permission so non-superusers with the ProjectManager role can hit `POST /deployments/<pk>/regroup-sessions/` (mirrors `sync_deployment`). Without this, `BaseModel.check_custom_permission` computes the codename `regroup_sessions_deployment` and the perm doesn't exist, so the action 403s for everyone except superusers. - Wrap `ami.tasks.regroup_events` in a per-deployment cache lock. Concurrent enqueues (double-clicks, sync_captures auto-regroup racing with manual trigger) collapse to a single run instead of overlapping on the same rows. Lower-level than the view so all callers (Deployment.save, admin bulk action, new API action) are protected. - Defensive clamp on `session_time_gap_seconds` — fall back to the historical 120-minute default if the value is 0 / negative, with a warning log. Stops pathological "every gap = new event" behavior when admins enter bad values. - Migration 0085 adds the new perm to `Project.Meta.permissions` and grants it to existing `ProjectManager` role groups via a data migration (mirrors how 0084 revoked `delete_job`). - Tests: - test_invalid_session_time_gap_falls_back_to_default exercises 0 / -1 / -7200 - test_regroup_events_task_is_idempotent_under_concurrent_calls pre-takes the cache lock, asserts a second call skips group_images_into_events Co-Authored-By: Claude <noreply@anthropic.com> * refactor(regroup): wrap regroup in Job framework, fix lock, unify call sites Replaces the direct-Celery API/admin entry points with a new RegroupEventsJob so admins can see regroup progress and failures in the Jobs UI like every other long-running task. The existing DataStorageSyncJob now runs grouping as an explicit second stage instead of relying on Deployment.save() autoregroup, so a post-sync regroup failure flips the Job to FAILURE rather than silently succeeding (closes #1157, #1158). Other changes in this commit: * Move the per-deployment idempotency lock from ami.tasks.regroup_events into group_images_into_events() itself, so all four call sites (RegroupEventsJob, sync-stage regroup, Deployment.save autoregroup, the bare Celery task) collapse to a single in-flight run rather than only the Celery-task entry point being protected. * Use a uuid token on the lock value and only delete the key if it still matches our token. Fixes the lock-release-clobbers-newer-owner race CodeRabbit flagged. * Add run_regroup_events_job permission and grant it to MLDataManager (which ProjectManager inherits from). regroup_sessions_deployment stays on ProjectManager. Both wired through the renamed migration 0085_add_regroup_permissions. * Strip the UI form field for session_time_gap_seconds; field is now admin-only until we decide whether it belongs on Project or Deployment. * Swap the admin "regroup" and "sync" bulk actions to create Jobs so the admin path matches the API path. Tests: - TestImageGrouping: replaced the old task-level idempotency test with one that exercises the new in-function lock; added a token-release test that verifies an expired-then-reacquired lock isn't clobbered. - TestRegroupEventsJob (new): end-to-end Job run, stats populated, and a failure-propagates assertion. - TestDataStorageSyncJobIncludesRegroupStage (new): sync Job exposes the regroup stage and a regroup failure surfaces as Job failure. Co-Authored-By: Claude <noreply@anthropic.com> * fix(migrations): rebase regroup perms migration onto main's 0088 Main has moved on to migration 0088. Rename 0085_add_regroup_permissions to 0089 and depend on 0088_detection_det_srcimg_created_idx to resolve the multiple-leaf-nodes conflict CI surfaced. Co-Authored-By: Claude <noreply@anthropic.com> * fix(regroup): correct events_created accounting + duplicate-timestamp log; add e2e command The regroup stage param `events_created` was incrementing for every event visited rather than only for events newly created by `get_or_create`. The bool returned from `get_or_create` was being discarded. Track `events_created_count` separately and use it in the stage-param write. Drive-by: `logger.warning(f"Found {len(values)} ...")` was reporting the character length of the newline-joined duplicate-timestamp report rather than the duplicate count. Pre-existing (since 0954c38, 2024-09-12) but sits inside the very block whose stats this PR now surfaces. Use `duplicate_timestamp_count`, and cap the in-log sample at 20 timestamps so very dirty deployments don't flood logs. Add `manage.py test_regroup_job_e2e {regroup|sync|concurrent}` to exercise the new Job framework path against a real stack: `regroup` runs a `RegroupEventsJob` and dumps stage params; `sync` runs a `DataStorageSyncJob` and asserts the two-stage chain; `concurrent` fires two regroups back-to-back and verifies the lock semantics. Mirrors `test_ml_job_e2e`. Used to validate PR #1292 against deployment 74 of the Antenna dev stack. Co-Authored-By: Claude <noreply@anthropic.com> * fix(admin): speed up deployment list view (~12× faster) The DeploymentAdmin list view ran four live per-row aggregates for each deployment displayed: - `start_date` → `SourceImage.objects.filter(event__deployment=obj).aggregate(Min)` (joins SourceImage→Event, scans the full per-deployment timeline) - `end_date` → `SourceImage.objects.filter(deployment=obj).aggregate(Max)` - `events_count` → `obj.events.count()` - `captures_count` → re-read of `data_source_total_files` (cached, but wrapped in a method that suggested it was computed) With 251 deployments on the page and ~5 expensive queries per row, the list view became unusable on stacks holding deployments with >100k captures (e.g. one deployment with 638k captures dominated end_date's wall time). Deployment already denormalizes `captures_count`, `events_count`, `data_source_total_size`, `first_capture_timestamp` and `last_capture_timestamp` via `update_calculated_fields()`. Switch the admin list to read those instead and drop the custom aggregates. Add `@admin.display(ordering=...)` so the columns remain sortable against the underlying cached fields. Wall time on the dev box (251 deployments): ~0.6s warm, ~0.76s cold. Co-Authored-By: Claude <noreply@anthropic.com> * feat(jobs): human-readable regroup stage param names The regroup stage params were stored with machine-style names (`captures_grouped`, `events_created`, ...), which is what the Jobs UI displays to admins. Other JobType stages already use title-cased, space-separated names (`Total files`, `Captures added`, etc.); this commit brings the regroup stage in line. Param **names** (UI labels): Captures grouped, Events created, Events touched, Empty events deleted, Duplicate timestamps, Ungrouped captures, Captures missing timestamp. Param **keys** (for retrieval, slugify(name) with `-` → `_`): captures_grouped, events_created, events_touched, empty_events_deleted (was `events_deleted_empty`), duplicate_timestamps, ungrouped_captures, captures_missing_timestamp (was `no_timestamp_captures`). Five keys are stable; two changed shape. Existing Job rows keep their historical name/key on disk — only new runs use the new labels. Centralise the list as `REGROUP_STAGE_PARAM_NAMES` in ami.jobs.models so RegroupEventsJob and DataStorageSyncJob's regroup stage share one source of truth. In `_group_images_into_events_locked`, swap the kwargs-style `update_stage(...)` call (which couldn't carry names with spaces) for an explicit `add_or_update_stage_param(stage_key, name, value)` loop driven by a label→value dict. Update tests to look up by the new keys and assert the human-readable names exist; update `test_regroup_job_e2e` to print `name [key]: value` so the stable retrieval key is visible alongside the UI label. Co-Authored-By: Claude <noreply@anthropic.com> * fix(regroup): replace assert with validation; restore update_children on sync path Two CodeRabbit findings on the latest revision. 1. `ami/main/api/views.py:376` used `assert deployment.project` to guard the API action. `assert` is stripped under `python -O`, so a deployment whose project FK is null (the schema allows it — `Project` FK is nullable on `Deployment`) would 500 with a confusing AttributeError later in the view instead of returning a clean 400. Replace with an explicit `api_exceptions.ValidationError` that returns 400 with a structured detail. 2. `Deployment.sync_captures()`'s new `regroup_after=False` branch (added in this PR so `DataStorageSyncJob` can drive the regroup as a separate tracked stage) bypassed `Deployment.save()`'s `update_calculated_fields= True` block, which is where `self.update_children()` realigns child `Event`/`Occurrence`/`SourceImage` `project` pointers. Result: a sync Job whose deployment had moved between projects would leave child rows with stale project FKs, breaking project-scoped queries (default filters, visibility checks, etc.). The user-triggered `RegroupEventsJob` path is unaffected because it doesn't run sync_captures — the regression is specific to the new sync→regroup chain. Add an explicit `self.update_children()` call after the `update_calculated_fields(save=True)` refresh on the regroup_after=False branch, gated on `project_id` like the corresponding block in `Deployment.save()`. Existing regroup tests still pass (4/4). Co-Authored-By: Claude <noreply@anthropic.com> * fix(jobs): warn on sync→regroup lock-miss instead of re-enqueuing CodeRabbit raised a real concern about the sync→regroup chain: if a concurrent regroup holds the lock when DataStorageSyncJob's regroup stage runs, the lock-miss branch in `group_images_into_events` returns `[]` and the just-synced captures sit ungrouped until the next save/sync. A previous draft of this fix added a post-lock re-enqueue (`regroup_events.delay()` after the `with _regroup_lock:` block) that called `deployment_events_need_update` and chained a follow-up task. A fresh review found that approach risks cascading enqueues on busy deployments — the lock serialises work but not enqueueing, so concurrent `Deployment.save` autoregroups during a slow run can pile redundant tasks into the queue (no infinite loop, but tens of redundant runs plausible). Drop the re-enqueue. Instead, surface a clear warning on the Job itself when the sync stage's regroup is skipped: we know `total_files` from the preceding sync stage, so we can say "sync added N files but regroup was skipped" directly on the Job that an admin is looking at. The captures will be picked up by the next Deployment.save autoregroup (models.py:1110-1113), which is the existing safety net for the general case. Trade-off vs the previous draft: a sync that races with a concurrent regroup defers its new captures to the next save/sync (small window, no data loss) but does NOT silently appear to have succeeded — the warning lands in the Job log where it belongs. No new tests; existing 4 regroup/lock tests + 12 grouping tests pass (16/16). Co-Authored-By: Claude <noreply@anthropic.com> * docs(project): expand session_time_gap_seconds help_text to flag manual-regroup requirement The previous help text just said "Time gap in seconds to consider a new session" — it didn't tell the user that changing the value doesn't regroup existing captures, which is the most common point of confusion when admins edit the field. Now points users to the Deployments admin "Regroup captures into sessions" bulk action so they can apply the change. Surfaces via Django admin form, DRF browsable API, and OPTIONS schema. Co-Authored-By: Claude <noreply@anthropic.com> * fix(regroup): apply takeaway-review findings Four fixes surfaced by an independent takeaway review: - Count actual SourceImage rows assigned to an event for the "Captures grouped" stat instead of the distinct-timestamp count. Two captures can share a timestamp, so the old count understated the work done. - Lower the regroup lock TTL and the matching Celery time limits to 10 min soft / 11 min hard. group_images_into_events is mostly batch SQL and finishes in seconds, so a tight limit bounds the worst-case stuck-lock window when a worker dies hard (OOM, SIGKILL, eviction) and the finally block never runs to release the lock. - Document that RegroupEventsJob and the bare regroup_events task only group; propagating project_id to children stays on Deployment.save(). - Log a clear "regroup skipped" message when the per-deployment lock is already held, instead of a misleading "now has 0 events". Co-Authored-By: Claude <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent 0e329fb commit 13203e4

12 files changed

Lines changed: 1119 additions & 77 deletions

File tree

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
"""End-to-end test harness for ``RegroupEventsJob`` and the regroup stage of
2+
``DataStorageSyncJob``.
3+
4+
Mirrors ``test_ml_job_e2e`` for the session-regrouping path (PR #1292). Used to
5+
validate:
6+
7+
* Mode ``regroup``: ``RegroupEventsJob`` runs to SUCCESS against a real
8+
deployment, stage params are populated, Event count delta is reasonable.
9+
* Mode ``sync``: ``DataStorageSyncJob`` exposes a two-stage progress
10+
(sync_captures + regroup_sessions), both reach SUCCESS, regroup stage params
11+
are populated.
12+
* Mode ``concurrent``: Two ``RegroupEventsJob`` enqueues for the same
13+
deployment within the lock TTL — exactly one stage produces non-zero stats,
14+
the other short-circuits with a lock warning, and Event count does not
15+
diverge from the single-run baseline.
16+
"""
17+
18+
import time
19+
20+
from django.core.management.base import BaseCommand, CommandError
21+
22+
from ami.jobs.models import DataStorageSyncJob, Job, JobState, RegroupEventsJob
23+
from ami.main.models import Deployment, Event
24+
25+
26+
class Command(BaseCommand):
27+
help = (
28+
"Run end-to-end tests for the regroup-events Job path.\n\n"
29+
"Modes:\n"
30+
" regroup — RegroupEventsJob on a deployment\n"
31+
" sync — DataStorageSyncJob (covers sync→regroup chain)\n"
32+
" concurrent — two RegroupEventsJobs back-to-back, asserts lock semantics\n"
33+
)
34+
35+
def add_arguments(self, parser):
36+
parser.add_argument(
37+
"mode",
38+
choices=["regroup", "sync", "concurrent"],
39+
help="Which scenario to exercise",
40+
)
41+
parser.add_argument("--deployment", type=int, required=True, help="Deployment ID")
42+
parser.add_argument(
43+
"--poll-interval", type=float, default=2.0, help="Seconds between Job state polls (default 2.0)"
44+
)
45+
parser.add_argument(
46+
"--timeout", type=float, default=600.0, help="Max seconds to wait for each Job (default 600)"
47+
)
48+
49+
def handle(self, *args, **options):
50+
deployment = self._resolve_deployment(options["deployment"])
51+
mode = options["mode"]
52+
53+
if mode == "regroup":
54+
self._run_regroup(deployment, options)
55+
elif mode == "sync":
56+
self._run_sync(deployment, options)
57+
elif mode == "concurrent":
58+
self._run_concurrent(deployment, options)
59+
60+
def _resolve_deployment(self, deployment_id: int) -> Deployment:
61+
try:
62+
deployment = Deployment.objects.get(pk=deployment_id)
63+
except Deployment.DoesNotExist:
64+
raise CommandError(f"Deployment {deployment_id} not found")
65+
self.stdout.write(
66+
self.style.SUCCESS(
67+
f"✓ Deployment {deployment.pk} '{deployment.name}' "
68+
f"(project={deployment.project_id}, captures={deployment.captures_count})"
69+
)
70+
)
71+
gap = deployment.project.session_time_gap_seconds if deployment.project_id else None
72+
self.stdout.write(f" project session_time_gap_seconds = {gap!r}")
73+
before = Event.objects.filter(deployment=deployment).count()
74+
self.stdout.write(f" Events before run: {before}")
75+
return deployment
76+
77+
def _run_regroup(self, deployment: Deployment, options: dict) -> None:
78+
before = Event.objects.filter(deployment=deployment).count()
79+
job = self._make_regroup_job(deployment, suffix="e2e-regroup")
80+
self.stdout.write(f"\n🚀 RegroupEventsJob {job.pk} enqueueing")
81+
job.enqueue()
82+
self._monitor(job, options)
83+
after = Event.objects.filter(deployment=deployment).count()
84+
self.stdout.write(f"\nEvents after: {after}{after - before:+d})")
85+
self._assert_status(job, expected=JobState.SUCCESS)
86+
self._dump_stage_params(job)
87+
88+
def _run_sync(self, deployment: Deployment, options: dict) -> None:
89+
if not deployment.data_source_id:
90+
raise CommandError(
91+
f"Deployment {deployment.pk} has no data_source — DataStorageSyncJob would fail immediately."
92+
)
93+
before = Event.objects.filter(deployment=deployment).count()
94+
job = Job.objects.create(
95+
name=f"E2E sync→regroup chain (deployment {deployment.pk})",
96+
project=deployment.project,
97+
deployment=deployment,
98+
job_type_key=DataStorageSyncJob.key,
99+
)
100+
self.stdout.write(f"\n🚀 DataStorageSyncJob {job.pk} enqueueing")
101+
job.enqueue()
102+
self._monitor(job, options)
103+
after = Event.objects.filter(deployment=deployment).count()
104+
self.stdout.write(f"\nEvents after: {after}{after - before:+d})")
105+
self._assert_status(job, expected=JobState.SUCCESS)
106+
107+
stage_keys = [s.key for s in (job.progress.stages or [])]
108+
if DataStorageSyncJob.regroup_stage_key not in stage_keys:
109+
raise CommandError(
110+
f"❌ DataStorageSyncJob exposed stages {stage_keys!r} — missing "
111+
f"'{DataStorageSyncJob.regroup_stage_key}' regroup stage."
112+
)
113+
self.stdout.write(self.style.SUCCESS(f"✓ Sync Job exposed both stages: {stage_keys!r}"))
114+
self._dump_stage_params(job)
115+
116+
def _run_concurrent(self, deployment: Deployment, options: dict) -> None:
117+
before = Event.objects.filter(deployment=deployment).count()
118+
job_a = self._make_regroup_job(deployment, suffix="e2e-concurrent-A")
119+
job_b = self._make_regroup_job(deployment, suffix="e2e-concurrent-B")
120+
self.stdout.write(f"\n🚀 Enqueueing two RegroupEventsJobs back-to-back: {job_a.pk}, {job_b.pk}")
121+
job_a.enqueue()
122+
# No sleep between — we want both Celery tasks to race for the lock.
123+
job_b.enqueue()
124+
125+
self.stdout.write("\nMonitoring job A:")
126+
self._monitor(job_a, options)
127+
self.stdout.write("\nMonitoring job B:")
128+
self._monitor(job_b, options)
129+
130+
after = Event.objects.filter(deployment=deployment).count()
131+
self.stdout.write(f"\nEvents after both jobs: {after}{after - before:+d})")
132+
133+
for job in (job_a, job_b):
134+
self._assert_status(job, expected=JobState.SUCCESS)
135+
136+
params_a = self._stage_param_dict(job_a, RegroupEventsJob.key)
137+
params_b = self._stage_param_dict(job_b, RegroupEventsJob.key)
138+
self.stdout.write(f"\nJob A stage params: {params_a}")
139+
self.stdout.write(f"Job B stage params: {params_b}")
140+
141+
# Exactly one of A/B should have done real work (captures_grouped > 0);
142+
# the other should have short-circuited and reported the initial zeroes.
143+
worked_a = (params_a.get("captures_grouped") or 0) > 0
144+
worked_b = (params_b.get("captures_grouped") or 0) > 0
145+
if worked_a == worked_b:
146+
self.stdout.write(
147+
self.style.WARNING(
148+
f"⚠ Lock did not separate runs as expected — both jobs reported "
149+
f"captures_grouped={params_a.get('captures_grouped')}/"
150+
f"{params_b.get('captures_grouped')}. "
151+
f"This can happen if the worker ran them serially fast enough that the lock cleared between."
152+
)
153+
)
154+
else:
155+
winner = "A" if worked_a else "B"
156+
loser = "B" if worked_a else "A"
157+
self.stdout.write(
158+
self.style.SUCCESS(f"✓ Lock semantics held: job {winner} did the work, job {loser} short-circuited.")
159+
)
160+
161+
def _make_regroup_job(self, deployment: Deployment, suffix: str) -> Job:
162+
return Job.objects.create(
163+
name=f"E2E {suffix} (deployment {deployment.pk})",
164+
project=deployment.project,
165+
deployment=deployment,
166+
job_type_key=RegroupEventsJob.key,
167+
)
168+
169+
def _monitor(self, job: Job, options: dict) -> None:
170+
start = time.time()
171+
timeout = options["timeout"]
172+
interval = options["poll_interval"]
173+
last_status = None
174+
while True:
175+
job.refresh_from_db()
176+
elapsed = time.time() - start
177+
if job.status != last_status:
178+
self.stdout.write(f" [{elapsed:6.1f}s] Job {job.pk} status: {job.status}")
179+
last_status = job.status
180+
if job.status in JobState.final_states():
181+
self.stdout.write(f" [{elapsed:6.1f}s] Job {job.pk} reached final state {job.status}")
182+
return
183+
if elapsed > timeout:
184+
raise CommandError(
185+
f"❌ Job {job.pk} did not reach a final state within {timeout}s (status={job.status})"
186+
)
187+
time.sleep(interval)
188+
189+
def _assert_status(self, job: Job, expected: str) -> None:
190+
if job.status != expected:
191+
raise CommandError(
192+
f"❌ Job {job.pk} ended with status {job.status!r} (expected {expected!r}). "
193+
f"Stages: {[(s.key, s.status, s.progress) for s in (job.progress.stages or [])]}"
194+
)
195+
self.stdout.write(self.style.SUCCESS(f"✓ Job {job.pk} ended {expected}"))
196+
197+
def _stage_param_dict(self, job: Job, stage_key: str) -> dict:
198+
for stage in job.progress.stages or []:
199+
if stage.key == stage_key:
200+
return {param.key: param.value for param in (stage.params or [])}
201+
return {}
202+
203+
def _dump_stage_params(self, job: Job) -> None:
204+
for stage in job.progress.stages or []:
205+
self.stdout.write(
206+
f"\n Stage '{stage.name}' ({stage.key}): status={stage.status} progress={stage.progress}"
207+
)
208+
for param in stage.params or []:
209+
self.stdout.write(f" {param.name} [{param.key}]: {param.value}")

ami/jobs/models.py

Lines changed: 120 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -672,9 +672,37 @@ def process_images(cls, job, images):
672672
job.save()
673673

674674

675+
# Human-readable param names for the regroup stage, surfaced both on the
676+
# standalone RegroupEventsJob and on DataStorageSyncJob's regroup stage. The
677+
# stable retrieval key is the slugify(name) form (e.g. "captures-grouped"),
678+
# produced by JobProgress.make_key — see _REGROUP_STAGE_PARAM_KEYS in
679+
# ami.main.models for the corresponding kwarg map.
680+
REGROUP_STAGE_PARAM_NAMES = (
681+
"Captures grouped",
682+
"Events created",
683+
"Events touched",
684+
"Empty events deleted",
685+
"Duplicate timestamps",
686+
"Ungrouped captures",
687+
"Captures missing timestamp",
688+
)
689+
690+
675691
class DataStorageSyncJob(JobType):
692+
"""
693+
Sync captures from the deployment's data source, then regroup them into
694+
sessions as a separate tracked stage.
695+
696+
The regroup stage runs inside this job (not via ``Deployment.save()``
697+
autoregroup) so its logs land on the same Job row and a regroup failure
698+
flips the Job to FAILURE. Previously a sync would silently succeed even
699+
if the post-sync regroup raised — see #1157.
700+
"""
701+
676702
name = "Data storage sync"
677703
key = "data_storage_sync"
704+
regroup_stage_key = "regroup_sessions"
705+
regroup_stage_name = "Regroup sessions"
678706

679707
@classmethod
680708
def run(cls, job: "Job"):
@@ -683,38 +711,63 @@ def run(cls, job: "Job"):
683711
684712
This is meant to be called by an async task, not directly.
685713
"""
714+
from ami.main.models import group_images_into_events
686715

687-
job.progress.add_stage(cls.name)
716+
job.progress.add_stage(cls.name, key=cls.key)
688717
job.progress.add_stage_param(cls.key, "Total files", 0)
689718
job.progress.add_stage_param(cls.key, "Failed", 0)
719+
720+
job.progress.add_stage(cls.regroup_stage_name, key=cls.regroup_stage_key)
721+
for param_name in REGROUP_STAGE_PARAM_NAMES:
722+
job.progress.add_stage_param(cls.regroup_stage_key, param_name, 0)
723+
690724
job.update_status(JobState.STARTED)
691725
job.started_at = datetime.datetime.now()
692726
job.finished_at = None
693727
job.save()
694728

695729
if not job.deployment:
696730
raise ValueError("No deployment provided for data storage sync job")
697-
else:
698-
job.logger.info(f"Syncing captures for deployment {job.deployment}")
699-
job.progress.update_stage(
700-
cls.key,
701-
status=JobState.STARTED,
702-
progress=0,
703-
total_files=0,
704-
)
705-
job.save()
706731

707-
job.deployment.sync_captures(job=job)
732+
job.logger.info(f"Syncing captures for deployment {job.deployment}")
733+
job.progress.update_stage(
734+
cls.key,
735+
status=JobState.STARTED,
736+
progress=0,
737+
total_files=0,
738+
)
739+
job.save()
740+
741+
job.deployment.sync_captures(job=job, regroup_after=False)
708742

709-
job.logger.info(f"Finished syncing captures for deployment {job.deployment}")
710-
job.progress.update_stage(
711-
cls.key,
712-
status=JobState.SUCCESS,
713-
progress=1,
743+
job.logger.info(f"Finished syncing captures for deployment {job.deployment}")
744+
job.progress.update_stage(cls.key, status=JobState.SUCCESS, progress=1)
745+
job.save()
746+
747+
job.logger.info(f"Regrouping captures into sessions for deployment {job.deployment}")
748+
job.progress.update_stage(cls.regroup_stage_key, status=JobState.STARTED, progress=0)
749+
job.save()
750+
751+
events = group_images_into_events(job.deployment, job=job, stage_key=cls.regroup_stage_key)
752+
job.logger.info(f"Deployment {job.deployment} now has {len(events)} events after sync regroup.")
753+
754+
# The lock-miss branch in group_images_into_events returns []. If we
755+
# just synced new captures, that means those captures are now sitting
756+
# ungrouped because a concurrent regroup held the lock. They will be
757+
# picked up by the next Deployment.save autoregroup (e.g. the next
758+
# sync) — but flag it loudly on this Job so an admin watching this run
759+
# sees what happened rather than a silently-empty regroup stage.
760+
sync_total_files = job.progress.get_stage_param(cls.key, "total_files").value or 0
761+
if not events and sync_total_files:
762+
job.logger.warning(
763+
f"Sync added {sync_total_files} files but the regroup stage was skipped because "
764+
f"another regroup is in progress for deployment {job.deployment.pk}. The new captures "
765+
f"will be grouped by the next sync or save. If this keeps happening, check the Jobs "
766+
f"list for a stuck regroup_events task."
714767
)
715-
job.update_status(JobState.SUCCESS)
716-
job.save()
717768

769+
job.progress.update_stage(cls.regroup_stage_key, status=JobState.SUCCESS, progress=1)
770+
job.update_status(JobState.SUCCESS)
718771
job.finished_at = datetime.datetime.now()
719772
job.save()
720773

@@ -834,6 +887,54 @@ def run(cls, job: "Job"):
834887
job.save()
835888

836889

890+
class RegroupEventsJob(JobType):
891+
"""
892+
Regroup a deployment's captures into Events using the project's
893+
``session_time_gap_seconds`` setting.
894+
895+
Single-stage job: ``group_images_into_events`` is one mostly-atomic SQL
896+
pass with no per-image Python loop, so we cannot report incremental %
897+
progress meaningfully. Stage transitions are CREATED → STARTED (0%) →
898+
SUCCESS/FAILURE (100%). Summary stats (events created/touched/deleted,
899+
duplicates, ungrouped captures) are written to the stage params by
900+
``group_images_into_events`` itself before it returns. Closes #1157, #1158.
901+
902+
Scope: grouping only. Propagating ``project_id`` to children lives on
903+
``Deployment.save()`` via ``update_children()`` — save the deployment
904+
after moving it to push the new ``project_id`` down. Bare
905+
``ami.tasks.regroup_events`` has the same scope.
906+
"""
907+
908+
name = "Regroup sessions"
909+
key = "regroup_events"
910+
911+
@classmethod
912+
def run(cls, job: "Job"):
913+
from ami.main.models import group_images_into_events
914+
915+
if not job.deployment:
916+
raise ValueError("No deployment provided for regroup events job")
917+
918+
job.progress.add_stage(cls.name, key=cls.key)
919+
for param_name in REGROUP_STAGE_PARAM_NAMES:
920+
job.progress.add_stage_param(cls.key, param_name, 0)
921+
922+
job.update_status(JobState.STARTED)
923+
job.started_at = datetime.datetime.now()
924+
job.finished_at = None
925+
job.progress.update_stage(cls.key, status=JobState.STARTED, progress=0)
926+
job.save()
927+
928+
job.logger.info(f"Regrouping captures for deployment {job.deployment}")
929+
events = group_images_into_events(job.deployment, job=job, stage_key=cls.key)
930+
job.logger.info(f"Deployment {job.deployment} now has {len(events)} events after regrouping.")
931+
932+
job.progress.update_stage(cls.key, status=JobState.SUCCESS, progress=1)
933+
job.update_status(JobState.SUCCESS, save=False)
934+
job.finished_at = datetime.datetime.now()
935+
job.save()
936+
937+
837938
class UnknownJobType(JobType):
838939
name = "Unknown"
839940
key = "unknown"
@@ -847,6 +948,7 @@ def run(cls, job: "Job"):
847948
MLJob,
848949
SourceImageCollectionPopulateJob,
849950
DataStorageSyncJob,
951+
RegroupEventsJob,
850952
UnknownJobType,
851953
DataExportJob,
852954
PostProcessingJob,

0 commit comments

Comments
 (0)