Skip to content

Commit 8924e96

Browse files
committed
[pac] add AssetCheckStatusCacheValue
1 parent fccb6fe commit 8924e96

File tree

2 files changed

+336
-21
lines changed

2 files changed

+336
-21
lines changed

python_modules/dagster/dagster/_core/asset_graph_view/asset_graph_view.py

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
import functools
32
from collections.abc import Awaitable, Iterable
43
from datetime import datetime, timedelta
@@ -22,6 +21,7 @@
2221
from dagster._core.definitions.freshness import FreshnessState
2322
from dagster._core.definitions.partitions.context import (
2423
PartitionLoadingContext,
24+
partition_loading_context,
2525
use_partition_loading_context,
2626
)
2727
from dagster._core.definitions.partitions.definition import (
@@ -564,9 +564,12 @@ async def compute_subset_with_status(
564564
"""Returns the subset of an asset check that matches a given status."""
565565
from dagster._core.storage.event_log.base import AssetCheckSummaryRecord
566566

567-
# Handle partitioned asset checks with partition-level granularity
567+
# Handle partitioned asset checks
568568
if self._get_partitions_def(key):
569-
return await self._get_partitioned_check_subset_with_status(key, status, from_subset)
569+
with partition_loading_context(new_ctx=self._partition_loading_context):
570+
return await self._get_partitioned_check_subset_with_status(
571+
key, status, from_subset
572+
)
570573

571574
# Handle non-partitioned asset checks with existing logic
572575
summary = await AssetCheckSummaryRecord.gen(self, key)
@@ -628,35 +631,39 @@ async def _compute_missing_check_subset(
628631
) -> EntitySubset[AssetCheckKey]:
629632
return await self.compute_subset_with_status(key, None, from_subset)
630633

631-
async def _get_asset_check_partition_status(
632-
self, key: AssetCheckKey, partition: str
633-
) -> Optional["AssetCheckExecutionResolvedStatus"]:
634-
# NOTE: we should add a LoadingContext-native version of this
635-
record = self.instance.event_log_storage.get_latest_asset_check_execution_by_key(
636-
[key], partition
637-
).get(key)
638-
if record:
639-
targets_latest = await record.targets_latest_materialization(self)
640-
return await record.resolve_status(self) if targets_latest else None
641-
else:
642-
return None
643-
634+
@use_partition_loading_context
644635
async def _get_partitioned_check_subset_with_status(
645636
self,
646637
key: AssetCheckKey,
647638
status: Optional["AssetCheckExecutionResolvedStatus"],
648639
from_subset: EntitySubset,
649640
) -> EntitySubset[AssetCheckKey]:
641+
from dagster._core.storage.asset_check_status_cache import (
642+
get_updated_asset_check_status_cache_value,
643+
)
644+
650645
check_node = self.asset_graph.get(key)
651646
if not check_node or not check_node.partitions_def:
652647
check.failed(f"Asset check {key} not found or not partitioned.")
653648

654-
partitions = list(from_subset.expensively_compute_partition_keys())
655-
statuses = await asyncio.gather(
656-
*(self._get_asset_check_partition_status(key, p) for p in partitions)
649+
cache_value = get_updated_asset_check_status_cache_value(
650+
key, check_node.partitions_def, self
657651
)
658-
matching_partitions = {p for p, s in zip(partitions, statuses) if s == status}
659-
return from_subset.compute_intersection_with_partition_keys(matching_partitions)
652+
653+
if status is None:
654+
known_statuses = self.get_empty_subset(key=key)
655+
for serializable_subset in cache_value.subsets.values():
656+
subset = self.get_subset_from_serializable_subset(serializable_subset)
657+
if subset:
658+
known_statuses = known_statuses.compute_union(subset)
659+
return from_subset.compute_difference(known_statuses) or self.get_empty_subset(key=key)
660+
else:
661+
serializable_subset = cache_value.subsets.get(status)
662+
if serializable_subset is None:
663+
return self.get_empty_subset(key=key)
664+
return self.get_subset_from_serializable_subset(
665+
serializable_subset
666+
) or self.get_empty_subset(key=key)
660667

661668
async def _compute_run_in_progress_asset_subset(self, key: AssetKey) -> EntitySubset[AssetKey]:
662669
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
from collections.abc import Mapping, Sequence
2+
from typing import Optional
3+
4+
from dagster_shared.record import record
5+
from dagster_shared.serdes import whitelist_for_serdes
6+
7+
from dagster._core.asset_graph_view.serializable_entity_subset import SerializableEntitySubset
8+
from dagster._core.definitions.asset_key import AssetCheckKey
9+
from dagster._core.definitions.partitions.definition.partitions_definition import (
10+
PartitionsDefinition,
11+
)
12+
from dagster._core.events.log import DagsterEventType
13+
from dagster._core.loader import LoadingContext
14+
from dagster._core.storage.asset_check_execution_record import (
15+
AssetCheckExecutionRecordStatus,
16+
AssetCheckExecutionResolvedStatus,
17+
AssetCheckPartitionRecord,
18+
)
19+
from dagster._core.storage.dagster_run import DagsterRunStatus, RunRecord
20+
21+
22+
@whitelist_for_serdes
23+
@record
24+
class AssetCheckStatusCacheValue:
25+
latest_check_record_storage_id: int
26+
latest_materialization_storage_id: int
27+
subsets: Mapping[AssetCheckExecutionResolvedStatus, SerializableEntitySubset[AssetCheckKey]]
28+
in_progress_runs: Mapping[str, SerializableEntitySubset[AssetCheckKey]]
29+
30+
def compatible_with(self, partitions_def: Optional[PartitionsDefinition]) -> bool:
31+
subset = next(iter(self.subsets.values()), None)
32+
if subset is None:
33+
return True
34+
return subset.is_compatible_with_partitions_def(partitions_def)
35+
36+
@classmethod
37+
def empty(cls) -> "AssetCheckStatusCacheValue":
38+
return cls(
39+
latest_check_record_storage_id=0,
40+
latest_materialization_storage_id=0,
41+
subsets={},
42+
in_progress_runs={},
43+
)
44+
45+
46+
def _identify_affected_partitions(
47+
check_records: Sequence[AssetCheckPartitionRecord],
48+
partitions_with_new_materializations: set[str],
49+
partitions_def: PartitionsDefinition,
50+
) -> set[str]:
51+
affected: set[str] = set()
52+
53+
# Partitions with new check records
54+
for check_record in check_records:
55+
pk = check_record.partition_key
56+
if pk is not None and partitions_def.has_partition_key(pk):
57+
affected.add(pk)
58+
59+
# Partitions with new materializations (these clear check status)
60+
for pk in partitions_with_new_materializations:
61+
if partitions_def.has_partition_key(pk):
62+
affected.add(pk)
63+
64+
return affected
65+
66+
67+
def _update_subsets_from_partition_records(
68+
partition_records: Mapping[str, AssetCheckPartitionRecord],
69+
latest_mat_by_partition: Mapping[str, int],
70+
affected_partitions: set[str],
71+
key: AssetCheckKey,
72+
partitions_def: PartitionsDefinition,
73+
initial_subsets: dict[
74+
AssetCheckExecutionResolvedStatus, SerializableEntitySubset[AssetCheckKey]
75+
],
76+
initial_in_progress_runs: dict[str, SerializableEntitySubset[AssetCheckKey]],
77+
) -> tuple[
78+
dict[AssetCheckExecutionResolvedStatus, SerializableEntitySubset[AssetCheckKey]],
79+
dict[str, SerializableEntitySubset[AssetCheckKey]],
80+
]:
81+
"""Returns a set of updated subsets based on new partition records and the latest materialization storage ids."""
82+
new_subsets = {
83+
status: initial_subsets.get(status, SerializableEntitySubset.empty(key, partitions_def))
84+
for status in AssetCheckExecutionResolvedStatus
85+
}
86+
new_in_progress_runs = dict(initial_in_progress_runs)
87+
empty_subset = SerializableEntitySubset.empty(key, partitions_def)
88+
89+
# Step 1: Clear all affected partitions from all status subsets
90+
for pk in affected_partitions:
91+
partition_subset = SerializableEntitySubset.from_coercible_value(key, pk, partitions_def)
92+
for status in new_subsets:
93+
new_subsets[status] = new_subsets[status].compute_difference(partition_subset)
94+
95+
# Step 2: For partitions with check records, resolve and set new status
96+
for pk, check_record in partition_records.items():
97+
if pk is None or not partitions_def.has_partition_key(pk):
98+
continue
99+
100+
partition_subset = SerializableEntitySubset.from_coercible_value(key, pk, partitions_def)
101+
102+
if check_record.last_execution_status == AssetCheckExecutionRecordStatus.PLANNED:
103+
# Add to IN_PROGRESS and track run
104+
new_subsets[AssetCheckExecutionResolvedStatus.IN_PROGRESS] = new_subsets[
105+
AssetCheckExecutionResolvedStatus.IN_PROGRESS
106+
].compute_union(partition_subset)
107+
run_id = check_record.last_planned_run_id
108+
new_in_progress_runs[run_id] = new_in_progress_runs.get(
109+
run_id, empty_subset
110+
).compute_union(partition_subset)
111+
112+
elif check_record.last_execution_status in (
113+
AssetCheckExecutionRecordStatus.SUCCEEDED,
114+
AssetCheckExecutionRecordStatus.FAILED,
115+
):
116+
# Check if check targets latest materialization
117+
latest_mat_storage_id = latest_mat_by_partition.get(pk)
118+
check_target_storage_id = check_record.last_execution_target_materialization_storage_id
119+
120+
is_current = (
121+
latest_mat_storage_id is None or check_target_storage_id == latest_mat_storage_id
122+
)
123+
124+
if is_current:
125+
# Check is current, set appropriate status
126+
status = (
127+
AssetCheckExecutionResolvedStatus.SUCCEEDED
128+
if check_record.last_execution_status
129+
== AssetCheckExecutionRecordStatus.SUCCEEDED
130+
else AssetCheckExecutionResolvedStatus.FAILED
131+
)
132+
new_subsets[status] = new_subsets[status].compute_union(partition_subset)
133+
# else: stale check, partition stays unknown (already cleared)
134+
135+
return new_subsets, new_in_progress_runs
136+
137+
138+
def _apply_in_progress_resolution(
139+
loading_context: LoadingContext,
140+
key: AssetCheckKey,
141+
partitions_def: PartitionsDefinition,
142+
subsets: dict[AssetCheckExecutionResolvedStatus, SerializableEntitySubset[AssetCheckKey]],
143+
in_progress_runs: dict[str, SerializableEntitySubset[AssetCheckKey]],
144+
) -> tuple[
145+
dict[AssetCheckExecutionResolvedStatus, SerializableEntitySubset[AssetCheckKey]],
146+
dict[str, SerializableEntitySubset[AssetCheckKey]],
147+
]:
148+
"""Resolve in-progress runs that have completed.
149+
150+
This checks if any runs tracked in in_progress_runs have finished,
151+
and moves their partitions to SKIPPED or EXECUTION_FAILED.
152+
"""
153+
if not in_progress_runs:
154+
return subsets, in_progress_runs
155+
156+
delta_skipped, delta_execution_failed, resolved_run_ids = _resolve_in_progress_subsets(
157+
loading_context, key, partitions_def, in_progress_runs
158+
)
159+
160+
new_in_progress_runs = {
161+
run_id: subset
162+
for run_id, subset in in_progress_runs.items()
163+
if run_id not in resolved_run_ids
164+
}
165+
166+
new_subsets = dict(subsets)
167+
new_subsets[AssetCheckExecutionResolvedStatus.IN_PROGRESS] = (
168+
new_subsets[AssetCheckExecutionResolvedStatus.IN_PROGRESS]
169+
.compute_difference(delta_skipped)
170+
.compute_difference(delta_execution_failed)
171+
)
172+
new_subsets[AssetCheckExecutionResolvedStatus.SKIPPED] = new_subsets[
173+
AssetCheckExecutionResolvedStatus.SKIPPED
174+
].compute_union(delta_skipped)
175+
new_subsets[AssetCheckExecutionResolvedStatus.EXECUTION_FAILED] = new_subsets[
176+
AssetCheckExecutionResolvedStatus.EXECUTION_FAILED
177+
].compute_union(delta_execution_failed)
178+
179+
return new_subsets, new_in_progress_runs
180+
181+
182+
def _resolve_in_progress_subsets(
183+
loading_context: LoadingContext,
184+
key: AssetCheckKey,
185+
partitions_def: PartitionsDefinition,
186+
in_progress_runs: Mapping[str, SerializableEntitySubset[AssetCheckKey]],
187+
) -> tuple[
188+
SerializableEntitySubset[AssetCheckKey],
189+
SerializableEntitySubset[AssetCheckKey],
190+
set[str],
191+
]:
192+
"""Resolve in-progress runs that have completed.
193+
194+
Returns:
195+
Tuple of (delta_skipped, delta_execution_failed, resolved_run_ids)
196+
"""
197+
run_ids = list(in_progress_runs.keys())
198+
run_records = RunRecord.blocking_get_many(loading_context, in_progress_runs.keys())
199+
200+
empty_subset = SerializableEntitySubset.empty(key, partitions_def)
201+
delta_skipped = empty_subset
202+
delta_execution_failed = empty_subset
203+
resolved_run_ids: set[str] = set()
204+
205+
for run_id, run_record in zip(run_ids, run_records):
206+
if run_record is None or run_record.dagster_run.is_finished:
207+
resolved_run_ids.add(run_id)
208+
if run_record and run_record.dagster_run.status == DagsterRunStatus.FAILURE:
209+
delta_execution_failed = delta_execution_failed.compute_union(
210+
in_progress_runs[run_id]
211+
)
212+
else:
213+
delta_skipped = delta_skipped.compute_union(in_progress_runs[run_id])
214+
215+
return delta_skipped, delta_execution_failed, resolved_run_ids
216+
217+
218+
def get_updated_asset_check_status_cache_value(
219+
key: AssetCheckKey,
220+
partitions_def: PartitionsDefinition,
221+
loading_context: LoadingContext,
222+
) -> AssetCheckStatusCacheValue:
223+
"""Compute an updated cache value for the given asset check using per-partition records.
224+
225+
This function fetches new check and materialization events, identifies affected partitions,
226+
and resolves the status for each partition by comparing the latest check execution against
227+
the latest materialization.
228+
"""
229+
current_value = None # TODO: actually store / load this
230+
if current_value is None or not current_value.compatible_with(partitions_def):
231+
current_value = AssetCheckStatusCacheValue.empty()
232+
233+
empty_subset = SerializableEntitySubset.empty(key, partitions_def)
234+
235+
# Phase 1: Fetch new check records and partitions with new materializations
236+
check_records = loading_context.instance.event_log_storage.get_asset_check_partition_records(
237+
key, after_event_storage_id=current_value.latest_check_record_storage_id
238+
)
239+
partitions_with_new_mats = (
240+
loading_context.instance.event_log_storage.get_materialized_partitions(
241+
key.asset_key, after_cursor=current_value.latest_materialization_storage_id
242+
)
243+
)
244+
245+
# Compute new check cursor
246+
new_check_cursor = max(
247+
(r.last_event_id for r in check_records),
248+
default=current_value.latest_check_record_storage_id,
249+
)
250+
251+
# Phase 2: Identify affected partitions
252+
affected_partitions = _identify_affected_partitions(
253+
check_records, partitions_with_new_mats, partitions_def
254+
)
255+
256+
# Phase 3: Get latest materialization storage_ids for affected partitions
257+
# This gives us both the cursor and the staleness comparison data
258+
latest_mat_by_partition = (
259+
loading_context.instance.get_latest_storage_id_by_partition(
260+
key.asset_key,
261+
DagsterEventType.ASSET_MATERIALIZATION,
262+
partitions=affected_partitions,
263+
)
264+
if affected_partitions
265+
else {}
266+
)
267+
268+
# Compute new materialization cursor from affected partitions
269+
new_mat_cursor = max(
270+
latest_mat_by_partition.values(),
271+
default=current_value.latest_materialization_storage_id,
272+
)
273+
274+
# Phase 4: Update subsets from partition records
275+
initial_subsets = {
276+
status: current_value.subsets.get(status, empty_subset)
277+
for status in AssetCheckExecutionResolvedStatus
278+
}
279+
if not affected_partitions:
280+
# No updates needed, just resolve in-progress runs
281+
new_subsets, new_in_progress_runs = initial_subsets, dict(current_value.in_progress_runs)
282+
else:
283+
# Filter out None partition keys (only handle partitioned assets)
284+
partition_records: dict[str, AssetCheckPartitionRecord] = {}
285+
for r in check_records:
286+
if r.partition_key is not None:
287+
partition_records[r.partition_key] = r
288+
new_subsets, new_in_progress_runs = _update_subsets_from_partition_records(
289+
partition_records,
290+
latest_mat_by_partition,
291+
affected_partitions,
292+
key,
293+
partitions_def,
294+
initial_subsets,
295+
dict(current_value.in_progress_runs),
296+
)
297+
298+
# Phase 5: Resolve completed in-progress runs
299+
new_subsets, new_in_progress_runs = _apply_in_progress_resolution(
300+
loading_context, key, partitions_def, new_subsets, new_in_progress_runs
301+
)
302+
303+
return AssetCheckStatusCacheValue(
304+
latest_check_record_storage_id=new_check_cursor,
305+
latest_materialization_storage_id=new_mat_cursor,
306+
subsets=new_subsets,
307+
in_progress_runs=new_in_progress_runs,
308+
)

0 commit comments

Comments
 (0)