Skip to content

Commit 137c25d

Browse files
committed
[pac] Add get_asset_check_partitions storage method
1 parent 27b0720 commit 137c25d

File tree

6 files changed

+274
-2
lines changed

6 files changed

+274
-2
lines changed

python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from collections.abc import Iterable
33
from typing import NamedTuple, Optional, cast
44

5+
from dagster_shared.record import record
56
from dagster_shared.serdes import deserialize_value
67

78
import dagster._check as check
@@ -238,3 +239,30 @@ async def targets_latest_materialization(self, loading_context: LoadingContext)
238239
)
239240
else:
240241
check.failed(f"Unexpected check status {resolved_status}")
242+
243+
244+
@record
245+
class AssetCheckPartitionInfo:
246+
check_key: AssetCheckKey
247+
partition_key: Optional[str]
248+
# the status of the last execution of the check
249+
latest_execution_status: AssetCheckExecutionRecordStatus
250+
# the run id of the last planned event for the check
251+
latest_planned_run_id: str
252+
# the storage id of the last event (planned or evaluation) for the check
253+
latest_check_event_storage_id: int
254+
# the storage id of the last materialization for the asset / partition that this check targets
255+
# this is the latest overall materialization, independent of if there has been a check event
256+
# that targets it
257+
latest_materialization_storage_id: Optional[int]
258+
# the storage id of the materialization that the last execution of the check targeted
259+
latest_target_materialization_storage_id: Optional[int]
260+
261+
@property
262+
def is_current(self) -> bool:
263+
"""Returns True if the latest check execution targets the latest materialization event."""
264+
return (
265+
self.latest_materialization_storage_id is None
266+
or self.latest_materialization_storage_id
267+
== self.latest_target_materialization_storage_id
268+
)

python_modules/dagster/dagster/_core/storage/event_log/base.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from dagster._core.storage.asset_check_execution_record import (
3737
AssetCheckExecutionRecord,
3838
AssetCheckExecutionRecordStatus,
39+
AssetCheckPartitionInfo,
3940
)
4041
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
4142
from dagster._core.storage.partition_status_cache import get_and_update_asset_status_cache_value
@@ -648,6 +649,16 @@ def get_latest_asset_check_execution_by_key(
648649
"""Get the latest executions for a list of asset checks."""
649650
pass
650651

652+
@abstractmethod
653+
def get_asset_check_partition_info(
654+
self,
655+
keys: Sequence[AssetCheckKey],
656+
after_storage_id: Optional[int] = None,
657+
partition_keys: Optional[Sequence[str]] = None,
658+
) -> Sequence[AssetCheckPartitionInfo]:
659+
"""Get asset check partition records with execution status and planned run info."""
660+
pass
661+
651662
@abstractmethod
652663
def fetch_materializations(
653664
self,

python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
COMPLETED_ASSET_CHECK_EXECUTION_RECORD_STATUSES,
6060
AssetCheckExecutionRecord,
6161
AssetCheckExecutionRecordStatus,
62+
AssetCheckPartitionInfo,
6263
)
6364
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
6465
from dagster._core.storage.event_log.base import (
@@ -3008,6 +3009,7 @@ def _store_asset_check_evaluation_planned(
30083009
execution_status=AssetCheckExecutionRecordStatus.PLANNED.value,
30093010
evaluation_event=serialize_value(event),
30103011
evaluation_event_timestamp=self._event_insert_timestamp(event),
3012+
evaluation_event_storage_id=event_id,
30113013
)
30123014
for partition_key in partition_keys
30133015
]
@@ -3232,6 +3234,113 @@ def get_latest_asset_check_execution_by_key(
32323234
results[check_key] = AssetCheckExecutionRecord.from_db_row(row, key=check_key)
32333235
return results
32343236

3237+
def _get_asset_check_partition_info_for_key(
3238+
self,
3239+
check_key: AssetCheckKey,
3240+
after_storage_id: Optional[int],
3241+
partition_keys: Optional[Sequence[str]],
3242+
) -> Sequence[AssetCheckPartitionInfo]:
3243+
# Build the base filter conditions
3244+
filter_conditions = [
3245+
AssetCheckExecutionsTable.c.asset_key == check_key.asset_key.to_string(),
3246+
AssetCheckExecutionsTable.c.check_name == check_key.name,
3247+
# Historical records may have NULL in the evaluation_event_storage_id column for
3248+
# PLANNED events
3249+
AssetCheckExecutionsTable.c.evaluation_event_storage_id.isnot(None),
3250+
]
3251+
if partition_keys is not None:
3252+
filter_conditions.append(AssetCheckExecutionsTable.c.partition.in_(partition_keys))
3253+
3254+
# Subquery to find the max id for each partition
3255+
latest_check_ids_subquery = db_subquery(
3256+
db_select(
3257+
[
3258+
db.func.max(AssetCheckExecutionsTable.c.id).label("id"),
3259+
AssetCheckExecutionsTable.c.partition.label("partition"),
3260+
]
3261+
)
3262+
.where(db.and_(*filter_conditions))
3263+
.group_by(AssetCheckExecutionsTable.c.partition),
3264+
"latest_check_ids_subquery",
3265+
)
3266+
3267+
# Subquery to find the latest materialization storage id for each partition of the
3268+
# target asset. Note: we don't filter by after_storage_id here because we always want
3269+
# to return the latest materialization storage id, even if it's older than after_storage_id.
3270+
latest_materialization_ids_subquery = self._latest_event_ids_by_partition_subquery(
3271+
check_key.asset_key,
3272+
[DagsterEventType.ASSET_MATERIALIZATION],
3273+
asset_partitions=partition_keys,
3274+
)
3275+
3276+
# Main query to get all columns for the latest records, joined with latest
3277+
# materialization storage ids
3278+
query = db_select(
3279+
[
3280+
AssetCheckExecutionsTable.c.id,
3281+
AssetCheckExecutionsTable.c.partition,
3282+
AssetCheckExecutionsTable.c.execution_status,
3283+
AssetCheckExecutionsTable.c.evaluation_event_storage_id,
3284+
AssetCheckExecutionsTable.c.materialization_event_storage_id,
3285+
AssetCheckExecutionsTable.c.run_id,
3286+
latest_materialization_ids_subquery.c.id.label("latest_materialization_storage_id"),
3287+
]
3288+
).select_from(
3289+
AssetCheckExecutionsTable.join(
3290+
latest_check_ids_subquery,
3291+
AssetCheckExecutionsTable.c.id == latest_check_ids_subquery.c.id,
3292+
).join(
3293+
latest_materialization_ids_subquery,
3294+
AssetCheckExecutionsTable.c.partition
3295+
== latest_materialization_ids_subquery.c.partition,
3296+
isouter=True,
3297+
)
3298+
)
3299+
3300+
# these filters are applied to the main query rather than the individual subqueries to ensure
3301+
# we don't miss records that only have a new materialization or a new check execution but not both
3302+
if after_storage_id is not None:
3303+
query = query.where(
3304+
db.or_(
3305+
AssetCheckExecutionsTable.c.evaluation_event_storage_id > after_storage_id,
3306+
latest_materialization_ids_subquery.c.id > after_storage_id,
3307+
)
3308+
)
3309+
3310+
with self.index_connection() as conn:
3311+
rows = db_fetch_mappings(conn, query)
3312+
3313+
return [
3314+
AssetCheckPartitionInfo(
3315+
check_key=check_key,
3316+
partition_key=row["partition"],
3317+
latest_execution_status=AssetCheckExecutionRecordStatus(row["execution_status"]),
3318+
latest_target_materialization_storage_id=row["materialization_event_storage_id"],
3319+
latest_planned_run_id=row["run_id"],
3320+
latest_check_event_storage_id=row["evaluation_event_storage_id"],
3321+
latest_materialization_storage_id=row["latest_materialization_storage_id"],
3322+
)
3323+
for row in rows
3324+
]
3325+
3326+
def get_asset_check_partition_info(
3327+
self,
3328+
keys: Sequence[AssetCheckKey],
3329+
after_storage_id: Optional[int] = None,
3330+
partition_keys: Optional[Sequence[str]] = None,
3331+
) -> Sequence[AssetCheckPartitionInfo]:
3332+
check.list_param(keys, "keys", of_type=AssetCheckKey)
3333+
check.opt_int_param(after_storage_id, "after_storage_id")
3334+
3335+
infos = []
3336+
# the inner query is not feasible to join in a single query because the latest materialization ids subquery,
3337+
# so for now we fetch the info for each key separately
3338+
for key in keys:
3339+
infos.extend(
3340+
self._get_asset_check_partition_info_for_key(key, after_storage_id, partition_keys)
3341+
)
3342+
return infos
3343+
32353344
@property
32363345
def supports_asset_checks(self): # pyright: ignore[reportIncompatibleMethodOverride]
32373346
return self.has_table(AssetCheckExecutionsTable.name)

python_modules/dagster/dagster/_core/storage/event_log/sqlite/sqlite_event_log.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,12 @@ def store_event(self, event: EventLogEntry) -> None:
276276
self.store_asset_event_tags([event], [event_id])
277277

278278
if event.is_dagster_event and event.dagster_event_type in ASSET_CHECK_EVENTS:
279-
self.store_asset_check_event(event, None)
279+
# mirror the event in the cross-run index database
280+
with self.index_connection() as conn:
281+
result = conn.execute(insert_event_statement)
282+
event_id = result.inserted_primary_key[0]
283+
284+
self.store_asset_check_event(event, event_id)
280285

281286
if event.is_dagster_event and event.dagster_event_type in EVENT_TYPE_TO_PIPELINE_RUN_STATUS:
282287
# should mirror run status change events in the index shard

python_modules/dagster/dagster/_core/storage/legacy_storage.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from dagster._core.storage.asset_check_execution_record import (
1717
AssetCheckExecutionRecord,
1818
AssetCheckExecutionRecordStatus,
19+
AssetCheckPartitionInfo,
1920
)
2021
from dagster._core.storage.base_storage import DagsterStorage
2122
from dagster._core.storage.event_log.base import (
@@ -765,6 +766,16 @@ def get_latest_asset_check_execution_by_key(
765766
check_keys, partition=partition
766767
)
767768

769+
def get_asset_check_partition_info(
770+
self,
771+
keys: Sequence["AssetCheckKey"],
772+
after_storage_id: Optional[int] = None,
773+
partition_keys: Optional[Sequence[str]] = None,
774+
) -> Sequence[AssetCheckPartitionInfo]:
775+
return self._storage.event_log_storage.get_asset_check_partition_info(
776+
keys=keys, after_storage_id=after_storage_id, partition_keys=partition_keys
777+
)
778+
768779

769780
class LegacyScheduleStorage(ScheduleStorage, ConfigurableClass):
770781
def __init__(self, storage: DagsterStorage, inst_data: Optional[ConfigurableClassData] = None):

python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7119,6 +7119,53 @@ def test_asset_check_partitioned_planned_and_evaluation(
71197119
assert latest_c[check_key].partition == key_c
71207120
assert latest_c[check_key].status == AssetCheckExecutionRecordStatus.PLANNED
71217121

7122+
# Test get_asset_check_partition_records - returns all partitions with latest status
7123+
partition_records = storage.get_asset_check_partition_info([check_key])
7124+
assert len(partition_records) == 3
7125+
7126+
records_by_partition = {r.partition_key: r for r in partition_records}
7127+
assert set(records_by_partition.keys()) == set(partition_keys)
7128+
7129+
filtered_partition_records = storage.get_asset_check_partition_info(
7130+
[check_key], partition_keys=[key_a, key_b]
7131+
)
7132+
assert len(filtered_partition_records) == 2
7133+
assert set(r.partition_key for r in filtered_partition_records) == {key_a, key_b}
7134+
7135+
# Verify each partition has correct status
7136+
assert (
7137+
records_by_partition[key_a].latest_execution_status
7138+
== AssetCheckExecutionRecordStatus.SUCCEEDED
7139+
)
7140+
assert (
7141+
records_by_partition[key_b].latest_execution_status
7142+
== AssetCheckExecutionRecordStatus.FAILED
7143+
)
7144+
assert (
7145+
records_by_partition[key_c].latest_execution_status
7146+
== AssetCheckExecutionRecordStatus.PLANNED
7147+
)
7148+
7149+
# Verify all records have the same run_id (all planned in same run)
7150+
assert records_by_partition[key_a].latest_planned_run_id == run_id
7151+
assert records_by_partition[key_b].latest_planned_run_id == run_id
7152+
assert records_by_partition[key_c].latest_planned_run_id == run_id
7153+
7154+
# Verify last_storage_id is set for all records (this is the row id in the table)
7155+
assert records_by_partition[key_a].latest_check_event_storage_id is not None
7156+
assert records_by_partition[key_b].latest_check_event_storage_id is not None
7157+
assert records_by_partition[key_c].latest_check_event_storage_id is not None
7158+
7159+
# Verify last_materialization_storage_id is None for all records (no materializations)
7160+
assert records_by_partition[key_a].latest_materialization_storage_id is None
7161+
assert records_by_partition[key_b].latest_materialization_storage_id is None
7162+
assert records_by_partition[key_c].latest_materialization_storage_id is None
7163+
7164+
filtered_records = storage.get_asset_check_partition_info(
7165+
[check_key], after_storage_id=999999
7166+
)
7167+
assert len(filtered_records) == 0
7168+
71227169
def test_asset_check_partitioned_multiple_runs_same_partition(
71237170
self,
71247171
storage: EventLogStorage,
@@ -7134,22 +7181,54 @@ def test_asset_check_partitioned_multiple_runs_same_partition(
71347181
partitions_def = dg.StaticPartitionsDefinition(["a"])
71357182
partitions_subset = partitions_def.subset_with_partition_keys(["a"])
71367183

7137-
# Run 1: Store planned + evaluation for partition "a" with passed=True
7184+
# Run 1: Store planned event for partition "a"
71387185
storage.store_event(
71397186
_create_check_planned_event(run_id_1, check_key, partitions_subset=partitions_subset)
71407187
)
7188+
7189+
# status for partition "a" should be PLANNED
7190+
partition_records = storage.get_asset_check_partition_info([check_key])
7191+
assert len(partition_records) == 1
7192+
record = partition_records[0]
7193+
assert record.partition_key == "a"
7194+
assert record.latest_execution_status == AssetCheckExecutionRecordStatus.PLANNED
7195+
assert record.latest_planned_run_id == run_id_1
7196+
7197+
# Run 1: Now store evaluation event for partition "a" with passed=True
71417198
storage.store_event(
71427199
_create_check_evaluation_event(run_id_1, check_key, passed=True, partition="a")
71437200
)
71447201

7202+
# status for partition "a" should be SUCCEEDED
7203+
partition_records = storage.get_asset_check_partition_info([check_key])
7204+
assert len(partition_records) == 1
7205+
record = partition_records[0]
7206+
assert record.partition_key == "a"
7207+
assert record.latest_execution_status == AssetCheckExecutionRecordStatus.SUCCEEDED
7208+
assert record.latest_planned_run_id == run_id_1
7209+
71457210
# Run 2: Store planned + evaluation for partition "a" with passed=False
71467211
storage.store_event(
71477212
_create_check_planned_event(run_id_2, check_key, partitions_subset=partitions_subset)
71487213
)
7214+
7215+
# back to PLANNED
7216+
partition_records = storage.get_asset_check_partition_info([check_key])
7217+
record = partition_records[0]
7218+
assert record.latest_execution_status == AssetCheckExecutionRecordStatus.PLANNED
7219+
assert record.latest_planned_run_id == run_id_2
7220+
7221+
# Run 2: Now store evaluation event for partition "a" with passed=False
71497222
storage.store_event(
71507223
_create_check_evaluation_event(run_id_2, check_key, passed=False, partition="a")
71517224
)
71527225

7226+
# onto FAILED
7227+
partition_records = storage.get_asset_check_partition_info([check_key])
7228+
record = partition_records[0]
7229+
assert record.latest_execution_status == AssetCheckExecutionRecordStatus.FAILED
7230+
assert record.latest_planned_run_id == run_id_2
7231+
71537232
# Verify get_asset_check_execution_history returns 2 records for partition "a"
71547233
checks = storage.get_asset_check_execution_history(check_key, limit=10, partition="a")
71557234
assert len(checks) == 2
@@ -7176,6 +7255,16 @@ def test_asset_check_partitioned_multiple_runs_same_partition(
71767255
assert check_key in latest_overall
71777256
assert latest_overall[check_key].run_id == run_id_2
71787257

7258+
# Test get_asset_check_partition_records returns only the latest record per partition
7259+
partition_records = storage.get_asset_check_partition_info([check_key])
7260+
assert len(partition_records) == 1 # Only partition "a" exists
7261+
7262+
record = partition_records[0]
7263+
assert record.partition_key == "a"
7264+
# Should be the latest execution (run_id_2, FAILED)
7265+
assert record.latest_execution_status == AssetCheckExecutionRecordStatus.FAILED
7266+
assert record.latest_planned_run_id == run_id_2
7267+
71797268
def test_asset_check_partitioned_with_target_materialization(
71807269
self,
71817270
storage: EventLogStorage,
@@ -7237,6 +7326,12 @@ def test_asset_check_partitioned_with_target_materialization(
72377326
assert check_data.target_materialization_data
72387327
assert check_data.target_materialization_data.storage_id == m1_storage_id
72397328

7329+
# Verify get_asset_check_partition_records returns M1 as the latest materialization
7330+
partition_records = storage.get_asset_check_partition_info([check_key])
7331+
assert len(partition_records) == 1
7332+
record = partition_records[0]
7333+
assert record.latest_materialization_storage_id == m1_storage_id
7334+
72407335
# Store materialization M2 for partition "a"
72417336
storage.store_event(_create_materialization_event(run_id_2, asset_key, partition="a"))
72427337

@@ -7264,6 +7359,19 @@ def test_asset_check_partitioned_with_target_materialization(
72647359
assert check_data.target_materialization_data.storage_id == m1_storage_id
72657360
assert check_data.target_materialization_data.storage_id != m2_storage_id
72667361

7362+
# Test get_asset_check_partition_records includes target_materialization_storage_id
7363+
partition_records = storage.get_asset_check_partition_info([check_key])
7364+
assert len(partition_records) == 1
7365+
7366+
record = partition_records[0]
7367+
assert record.partition_key == "a"
7368+
assert record.latest_execution_status == AssetCheckExecutionRecordStatus.SUCCEEDED
7369+
assert record.latest_planned_run_id == run_id_1
7370+
# Verify last_execution_target_materialization_storage_id matches M1 (what check targeted)
7371+
assert record.latest_target_materialization_storage_id == m1_storage_id
7372+
# Verify last_materialization_storage_id matches M2 (the current latest materialization)
7373+
assert record.latest_materialization_storage_id == m2_storage_id
7374+
72677375

72687376
def _create_check_planned_event(
72697377
run_id: str,

0 commit comments

Comments
 (0)