Skip to content

Commit 3055488

Browse files
committed
[pac] Add get_asset_check_partitions storage method
1 parent 0bf6f2e commit 3055488

File tree

6 files changed

+257
-2
lines changed

6 files changed

+257
-2
lines changed

python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from collections.abc import Iterable
33
from typing import NamedTuple, Optional, cast
44

5+
from dagster_shared.record import record
56
from dagster_shared.serdes import deserialize_value
67

78
import dagster._check as check
@@ -238,3 +239,28 @@ async def targets_latest_materialization(self, loading_context: LoadingContext)
238239
)
239240
else:
240241
check.failed(f"Unexpected check status {resolved_status}")
242+
243+
244+
@record
245+
class AssetCheckPartitionInfo:
246+
check_key: AssetCheckKey
247+
partition_key: Optional[str]
248+
# the status of the last execution of the check
249+
last_execution_status: AssetCheckExecutionRecordStatus
250+
# the storage id of the materialization that the last execution of the check targeted
251+
last_execution_target_materialization_storage_id: Optional[int]
252+
# the run id of the last planned event for the check
253+
last_planned_run_id: str
254+
# the storage id of the last event for the check
255+
last_storage_id: int
256+
# the storage id of the last materialization for the asset this check targets
257+
last_materialization_storage_id: Optional[int]
258+
259+
@property
260+
def is_current(self) -> bool:
261+
"""Returns True if the latest check execution targets the latest materialization event."""
262+
return (
263+
self.last_materialization_storage_id is None
264+
or self.last_materialization_storage_id
265+
== self.last_execution_target_materialization_storage_id
266+
)

python_modules/dagster/dagster/_core/storage/event_log/base.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from dagster._core.storage.asset_check_execution_record import (
3737
AssetCheckExecutionRecord,
3838
AssetCheckExecutionRecordStatus,
39+
AssetCheckPartitionInfo,
3940
)
4041
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
4142
from dagster._core.storage.partition_status_cache import get_and_update_asset_status_cache_value
@@ -648,6 +649,15 @@ def get_latest_asset_check_execution_by_key(
648649
"""Get the latest executions for a list of asset checks."""
649650
pass
650651

652+
@abstractmethod
653+
def get_asset_check_partition_info(
654+
self,
655+
keys: Sequence[AssetCheckKey],
656+
after_storage_id: Optional[int] = None,
657+
) -> Sequence[AssetCheckPartitionInfo]:
658+
"""Get asset check partition records with execution status and planned run info."""
659+
pass
660+
651661
@abstractmethod
652662
def fetch_materializations(
653663
self,

python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
COMPLETED_ASSET_CHECK_EXECUTION_RECORD_STATUSES,
6060
AssetCheckExecutionRecord,
6161
AssetCheckExecutionRecordStatus,
62+
AssetCheckPartitionInfo,
6263
)
6364
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
6465
from dagster._core.storage.event_log.base import (
@@ -3008,6 +3009,7 @@ def _store_asset_check_evaluation_planned(
30083009
execution_status=AssetCheckExecutionRecordStatus.PLANNED.value,
30093010
evaluation_event=serialize_value(event),
30103011
evaluation_event_timestamp=self._event_insert_timestamp(event),
3012+
evaluation_event_storage_id=event_id,
30113013
)
30123014
for partition_key in partition_keys
30133015
]
@@ -3232,6 +3234,106 @@ def get_latest_asset_check_execution_by_key(
32323234
results[check_key] = AssetCheckExecutionRecord.from_db_row(row, key=check_key)
32333235
return results
32343236

3237+
def _get_asset_check_partition_info_for_key(
3238+
self, check_key: AssetCheckKey, after_storage_id: Optional[int]
3239+
) -> Sequence[AssetCheckPartitionInfo]:
3240+
# Build the base filter conditions
3241+
filter_conditions = [
3242+
AssetCheckExecutionsTable.c.asset_key == check_key.asset_key.to_string(),
3243+
AssetCheckExecutionsTable.c.check_name == check_key.name,
3244+
# Historical records may have NULL in the evaluation_event_storage_id column for
3245+
# PLANNED events
3246+
AssetCheckExecutionsTable.c.evaluation_event_storage_id.isnot(None),
3247+
]
3248+
3249+
# Subquery to find the max id for each partition
3250+
latest_check_ids_subquery = db_subquery(
3251+
db_select(
3252+
[
3253+
db.func.max(AssetCheckExecutionsTable.c.id).label("id"),
3254+
AssetCheckExecutionsTable.c.partition.label("partition"),
3255+
]
3256+
)
3257+
.where(db.and_(*filter_conditions))
3258+
.group_by(AssetCheckExecutionsTable.c.partition),
3259+
"latest_check_ids_subquery",
3260+
)
3261+
3262+
# Subquery to find the latest materialization storage id for each partition of the
3263+
# target asset. Note: we don't filter by after_storage_id here because we always want
3264+
# to return the latest materialization storage id, even if it's older than after_storage_id.
3265+
latest_materialization_ids_subquery = self._latest_event_ids_by_partition_subquery(
3266+
check_key.asset_key,
3267+
[DagsterEventType.ASSET_MATERIALIZATION],
3268+
)
3269+
3270+
# Main query to get all columns for the latest records, joined with latest
3271+
# materialization storage ids
3272+
query = db_select(
3273+
[
3274+
AssetCheckExecutionsTable.c.id,
3275+
AssetCheckExecutionsTable.c.partition,
3276+
AssetCheckExecutionsTable.c.execution_status,
3277+
AssetCheckExecutionsTable.c.evaluation_event_storage_id,
3278+
AssetCheckExecutionsTable.c.materialization_event_storage_id,
3279+
AssetCheckExecutionsTable.c.run_id,
3280+
latest_materialization_ids_subquery.c.id.label("latest_materialization_storage_id"),
3281+
]
3282+
).select_from(
3283+
AssetCheckExecutionsTable.join(
3284+
latest_check_ids_subquery,
3285+
AssetCheckExecutionsTable.c.id == latest_check_ids_subquery.c.id,
3286+
).join(
3287+
latest_materialization_ids_subquery,
3288+
AssetCheckExecutionsTable.c.partition
3289+
== latest_materialization_ids_subquery.c.partition,
3290+
isouter=True,
3291+
)
3292+
)
3293+
3294+
# these filters are applied to the main query rather than the individual subqueries to ensure
3295+
# we don't miss records that only have a new materialization or a new check execution but not both
3296+
if after_storage_id is not None:
3297+
query = query.where(
3298+
db.or_(
3299+
AssetCheckExecutionsTable.c.evaluation_event_storage_id > after_storage_id,
3300+
latest_materialization_ids_subquery.c.id > after_storage_id,
3301+
)
3302+
)
3303+
3304+
with self.index_connection() as conn:
3305+
rows = db_fetch_mappings(conn, query)
3306+
3307+
return [
3308+
AssetCheckPartitionInfo(
3309+
check_key=check_key,
3310+
partition_key=row["partition"],
3311+
last_execution_status=AssetCheckExecutionRecordStatus(row["execution_status"]),
3312+
last_execution_target_materialization_storage_id=row[
3313+
"materialization_event_storage_id"
3314+
],
3315+
last_planned_run_id=row["run_id"],
3316+
last_storage_id=row["evaluation_event_storage_id"],
3317+
last_materialization_storage_id=row["latest_materialization_storage_id"],
3318+
)
3319+
for row in rows
3320+
]
3321+
3322+
def get_asset_check_partition_info(
3323+
self,
3324+
keys: Sequence[AssetCheckKey],
3325+
after_storage_id: Optional[int] = None,
3326+
) -> Sequence[AssetCheckPartitionInfo]:
3327+
check.list_param(keys, "keys", of_type=AssetCheckKey)
3328+
check.opt_int_param(after_storage_id, "after_storage_id")
3329+
3330+
infos = []
3331+
# fetching all keys in a single query is not feasible because the latest materialization ids subquery
3332+
# is built for a single asset key, so for now we fetch the info for each key separately
3333+
for key in keys:
3334+
infos.extend(self._get_asset_check_partition_info_for_key(key, after_storage_id))
3335+
return infos
3336+
32353337
@property
32363338
def supports_asset_checks(self): # pyright: ignore[reportIncompatibleMethodOverride]
32373339
return self.has_table(AssetCheckExecutionsTable.name)

python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,12 @@ def store_event(self, event: EventLogEntry) -> None:
276276
self.store_asset_event_tags([event], [event_id])
277277

278278
if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS:
279-
self.store_asset_check_event(event, None)
279+
# mirror the event in the cross-run index database
280+
with self.index_connection() as conn:
281+
result = conn.execute(insert_event_statement)
282+
event_id = result.inserted_primary_key[0]
283+
284+
self.store_asset_check_event(event, event_id)
280285

281286
if event.is_dagster_event and event.dagster_event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS:
282287
# should mirror run status change events in the index shard

python_modules/dagster/dagster/_core/storage/legacy_storage.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from dagster._core.storage.asset_check_execution_record import (
1717
AssetCheckExecutionRecord,
1818
AssetCheckExecutionRecordStatus,
19+
AssetCheckPartitionInfo,
1920
)
2021
from dagster._core.storage.base_storage import DagsterStorage
2122
from dagster._core.storage.event_log.base import (
@@ -765,6 +766,15 @@ def get_latest_asset_check_execution_by_key(
765766
check_keys, partition=partition
766767
)
767768

769+
def get_asset_check_partition_info(
770+
self,
771+
keys: Sequence["AssetCheckKey"],
772+
after_storage_id: Optional[int] = None,
773+
) -> Sequence[AssetCheckPartitionInfo]:
774+
return self._storage.event_log_storage.get_asset_check_partition_info(
775+
keys=keys, after_storage_id=after_storage_id
776+
)
777+
768778

769779
class LegacyScheduleStorage(ScheduleStorage, ConfigurableClass):
770780
def __init__(self, storage: DagsterStorage, inst_data: Optional[ConfigurableClassData] = None):

python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7092,6 +7092,47 @@ def test_asset_check_partitioned_planned_and_evaluation(
70927092
assert latest_c[check_key].partition == "c"
70937093
assert latest_c[check_key].status == AssetCheckExecutionRecordStatus.PLANNED
70947094

7095+
# Test get_asset_check_partition_info - returns all partitions with latest status
7096+
partition_records = storage.get_asset_check_partition_info([check_key])
7097+
assert len(partition_records) == 3
7098+
7099+
records_by_partition = {r.partition_key: r for r in partition_records}
7100+
assert set(records_by_partition.keys()) == {"a", "b", "c"}
7101+
7102+
# Verify each partition has correct status
7103+
assert (
7104+
records_by_partition["a"].last_execution_status
7105+
== AssetCheckExecutionRecordStatus.SUCCEEDED
7106+
)
7107+
assert (
7108+
records_by_partition["b"].last_execution_status
7109+
== AssetCheckExecutionRecordStatus.FAILED
7110+
)
7111+
assert (
7112+
records_by_partition["c"].last_execution_status
7113+
== AssetCheckExecutionRecordStatus.PLANNED
7114+
)
7115+
7116+
# Verify all records have the same run_id (all planned in same run)
7117+
assert records_by_partition["a"].last_planned_run_id == run_id
7118+
assert records_by_partition["b"].last_planned_run_id == run_id
7119+
assert records_by_partition["c"].last_planned_run_id == run_id
7120+
7121+
# Verify last_storage_id is set for all records (this is the storage id of the latest check event)
7122+
assert records_by_partition["a"].last_storage_id is not None
7123+
assert records_by_partition["b"].last_storage_id is not None
7124+
assert records_by_partition["c"].last_storage_id is not None
7125+
7126+
# Verify last_materialization_storage_id is None for all records (no materializations)
7127+
assert records_by_partition["a"].last_materialization_storage_id is None
7128+
assert records_by_partition["b"].last_materialization_storage_id is None
7129+
assert records_by_partition["c"].last_materialization_storage_id is None
7130+
7131+
filtered_records = storage.get_asset_check_partition_info(
7132+
[check_key], after_storage_id=999999
7133+
)
7134+
assert len(filtered_records) == 0
7135+
70957136
def test_asset_check_partitioned_multiple_runs_same_partition(
70967137
self,
70977138
storage: EventLogStorage,
@@ -7107,22 +7148,54 @@ def test_asset_check_partitioned_multiple_runs_same_partition(
71077148
partitions_def = dg.StaticPartitionsDefinition(["a"])
71087149
partitions_subset = partitions_def.subset_with_partition_keys(["a"])
71097150

7110-
# Run 1: Store planned + evaluation for partition "a" with passed=True
7151+
# Run 1: Store planned event for partition "a"
71117152
storage.store_event(
71127153
_create_check_planned_event(run_id_1, check_key, partitions_subset=partitions_subset)
71137154
)
7155+
7156+
# status for partition "a" should be PLANNED
7157+
partition_records = storage.get_asset_check_partition_info([check_key])
7158+
assert len(partition_records) == 1
7159+
record = partition_records[0]
7160+
assert record.partition_key == "a"
7161+
assert record.last_execution_status == AssetCheckExecutionRecordStatus.PLANNED
7162+
assert record.last_planned_run_id == run_id_1
7163+
7164+
# Run 1: Now store evaluation event for partition "a" with passed=True
71147165
storage.store_event(
71157166
_create_check_evaluation_event(run_id_1, check_key, passed=True, partition="a")
71167167
)
71177168

7169+
# status for partition "a" should be SUCCEEDED
7170+
partition_records = storage.get_asset_check_partition_info([check_key])
7171+
assert len(partition_records) == 1
7172+
record = partition_records[0]
7173+
assert record.partition_key == "a"
7174+
assert record.last_execution_status == AssetCheckExecutionRecordStatus.SUCCEEDED
7175+
assert record.last_planned_run_id == run_id_1
7176+
71187177
# Run 2: Store planned event for partition "a"
71197178
storage.store_event(
71207179
_create_check_planned_event(run_id_2, check_key, partitions_subset=partitions_subset)
71217180
)
7181+
7182+
# back to PLANNED
7183+
partition_records = storage.get_asset_check_partition_info([check_key])
7184+
record = partition_records[0]
7185+
assert record.last_execution_status == AssetCheckExecutionRecordStatus.PLANNED
7186+
assert record.last_planned_run_id == run_id_2
7187+
7188+
# Run 2: Now store evaluation event for partition "a" with passed=False
71227189
storage.store_event(
71237190
_create_check_evaluation_event(run_id_2, check_key, passed=False, partition="a")
71247191
)
71257192

7193+
# onto FAILED
7194+
partition_records = storage.get_asset_check_partition_info([check_key])
7195+
record = partition_records[0]
7196+
assert record.last_execution_status == AssetCheckExecutionRecordStatus.FAILED
7197+
assert record.last_planned_run_id == run_id_2
7198+
71267199
# Verify get_asset_check_execution_history returns 2 records for partition "a"
71277200
checks = storage.get_asset_check_execution_history(check_key, limit=10, partition="a")
71287201
assert len(checks) == 2
@@ -7149,6 +7222,16 @@ def test_asset_check_partitioned_multiple_runs_same_partition(
71497222
assert check_key in latest_overall
71507223
assert latest_overall[check_key].run_id == run_id_2
71517224

7225+
# Test get_asset_check_partition_info returns only the latest record per partition
7226+
partition_records = storage.get_asset_check_partition_info([check_key])
7227+
assert len(partition_records) == 1 # Only partition "a" exists
7228+
7229+
record = partition_records[0]
7230+
assert record.partition_key == "a"
7231+
# Should be the latest execution (run_id_2, FAILED)
7232+
assert record.last_execution_status == AssetCheckExecutionRecordStatus.FAILED
7233+
assert record.last_planned_run_id == run_id_2
7234+
71527235
def test_asset_check_partitioned_with_target_materialization(
71537236
self,
71547237
storage: EventLogStorage,
@@ -7210,6 +7293,12 @@ def test_asset_check_partitioned_with_target_materialization(
72107293
assert check_data.target_materialization_data
72117294
assert check_data.target_materialization_data.storage_id == m1_storage_id
72127295

7296+
# Verify get_asset_check_partition_info returns M1 as the latest materialization
7297+
partition_records = storage.get_asset_check_partition_info([check_key])
7298+
assert len(partition_records) == 1
7299+
record = partition_records[0]
7300+
assert record.last_materialization_storage_id == m1_storage_id
7301+
72137302
# Store materialization M2 for partition "a"
72147303
storage.store_event(_create_materialization_event(run_id_2, asset_key, partition="a"))
72157304

@@ -7237,6 +7326,19 @@ def test_asset_check_partitioned_with_target_materialization(
72377326
assert check_data.target_materialization_data.storage_id == m1_storage_id
72387327
assert check_data.target_materialization_data.storage_id != m2_storage_id
72397328

7329+
# Test get_asset_check_partition_info includes last_execution_target_materialization_storage_id
7330+
partition_records = storage.get_asset_check_partition_info([check_key])
7331+
assert len(partition_records) == 1
7332+
7333+
record = partition_records[0]
7334+
assert record.partition_key == "a"
7335+
assert record.last_execution_status == AssetCheckExecutionRecordStatus.SUCCEEDED
7336+
assert record.last_planned_run_id == run_id_1
7337+
# Verify last_execution_target_materialization_storage_id matches M1 (what check targeted)
7338+
assert record.last_execution_target_materialization_storage_id == m1_storage_id
7339+
# Verify last_materialization_storage_id matches M2 (the current latest materialization)
7340+
assert record.last_materialization_storage_id == m2_storage_id
7341+
72407342

72417343
def _create_check_planned_event(
72427344
run_id: str,

0 commit comments

Comments
 (0)