Skip to content

Commit 471caaa

Browse files
committed
added remove_checkpoints instead of single checkpoint
1 parent 2b9f5ab commit 471caaa

3 files changed

Lines changed: 13 additions & 8 deletions

File tree

src/datachain/catalog/catalog.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2145,8 +2145,7 @@ def cleanup_checkpoints(self, ttl_seconds: int | None = None) -> int:
21452145
self._cleanup_udf_tables(f"udf_{group_id}_", "_input")
21462146

21472147
checkpoints = list(self.metastore.list_checkpoints(job_id=inactive_job_ids))
2148-
for ch in checkpoints:
2149-
self.metastore.remove_checkpoint(ch.id)
2148+
self.metastore.remove_checkpoints([ch.id for ch in checkpoints])
21502149

21512150
logger.info(
21522151
"Checkpoint cleanup complete: removed %d checkpoints from %d jobs",

src/datachain/data_storage/metastore.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,10 @@ def get_or_create_checkpoint(
591591
"""Get or create checkpoint. Must be atomic and idempotent."""
592592

593593
@abstractmethod
594-
def remove_checkpoint(self, checkpoint_id: str, conn: Any | None = None) -> None:
595-
"""Removes a checkpoint by ID"""
594+
def remove_checkpoints(
595+
self, checkpoint_ids: list[str], conn: Any | None = None
596+
) -> None:
597+
"""Soft-delete checkpoints by IDs."""
596598

597599
@abstractmethod
598600
def get_jobs_with_expired_checkpoints(
@@ -2743,10 +2745,14 @@ def get_dataset_version_for_job_ancestry(
27432745

27442746
return self.dataset_version_class.parse(*results[0])
27452747

2746-
def remove_checkpoint(self, checkpoint_id: str, conn: Any | None = None) -> None:
2748+
def remove_checkpoints(
2749+
self, checkpoint_ids: list[str], conn: Any | None = None
2750+
) -> None:
2751+
if not checkpoint_ids:
2752+
return
27472753
self.db.execute(
27482754
self._checkpoints.update()
2749-
.where(self._checkpoints.c.id == checkpoint_id)
2755+
.where(self._checkpoints.c.id.in_(checkpoint_ids))
27502756
.values(status=CheckpointStatus.DELETED),
27512757
conn=conn,
27522758
)

src/datachain/query/dataset.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,7 +1155,7 @@ def _run_from_scratch(
11551155
)
11561156

11571157
if partial_checkpoint:
1158-
self.metastore.remove_checkpoint(partial_checkpoint.id)
1158+
self.metastore.remove_checkpoints([partial_checkpoint.id])
11591159
self.metastore.get_or_create_checkpoint(self.job.id, hash_output)
11601160
logger.debug(
11611161
"UDF(%s) [job=%s run_group=%s]: Promoted partial to final, hash=%s",
@@ -1291,7 +1291,7 @@ def _continue_udf(
12911291
partial_table, udf_output_table_name(self.job.id, hash_output)
12921292
)
12931293

1294-
self.metastore.remove_checkpoint(partial_checkpoint.id)
1294+
self.metastore.remove_checkpoints([partial_checkpoint.id])
12951295
self.metastore.get_or_create_checkpoint(self.job.id, hash_output)
12961296
logger.debug(
12971297
"UDF(%s) [job=%s run_group=%s]: Promoted partial to final, hash=%s",

0 commit comments

Comments
 (0)