Skip to content

Commit 1306e55

Browse files
committed
[pac] Update user-facing APIs and snapshots
1 parent 0c3076c commit 1306e55

File tree

13 files changed

+816
-2
lines changed

13 files changed

+816
-2
lines changed

python_modules/dagster/dagster/_core/definitions/asset_checks/asset_check_result.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,15 @@ def to_asset_check_evaluation(
172172
else:
173173
target_materialization_data = None
174174

175+
if step_context.has_partition_key:
176+
check_spec = assets_def_for_check.get_spec_for_check_key(check_key)
177+
if check_spec.partitions_def is not None:
178+
partition = step_context.partition_key
179+
else:
180+
partition = None
181+
else:
182+
partition = None
183+
175184
return AssetCheckEvaluation(
176185
check_name=check_key.name,
177186
asset_key=check_key.asset_key,
@@ -181,6 +190,7 @@ def to_asset_check_evaluation(
181190
severity=self.severity,
182191
description=self.description,
183192
blocking=assets_def_for_check.get_spec_for_check_key(check_key).blocking,
193+
partition=partition,
184194
)
185195

186196
def with_metadata(self, metadata: Mapping[str, RawMetadataValue]) -> "AssetCheckResult": # pyright: ignore[reportIncompatibleMethodOverride]

python_modules/dagster/dagster/_core/definitions/asset_checks/asset_check_spec.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
replace,
1111
)
1212
from dagster_shared.serdes import whitelist_for_serdes
13+
from dagster_shared.utils.warnings import preview_warning
1314

1415
from dagster._annotations import PublicAttr, public
1516
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey, CoercibleToAssetKey
17+
from dagster._core.definitions.partitions.definition import PartitionsDefinition
1618

1719
if TYPE_CHECKING:
1820
from dagster._core.definitions.assets.definition.asset_dep import AssetDep, CoercibleToAssetDep
@@ -58,6 +60,7 @@ class AssetCheckSpec(IHaveNew, LegacyNamedTupleMixin):
5860
blocking: PublicAttr[bool]
5961
metadata: PublicAttr[Mapping[str, Any]]
6062
automation_condition: PublicAttr[Optional[LazyAutomationCondition]]
63+
partitions_def: PublicAttr[Optional[PartitionsDefinition]]
6164

6265
"""Defines information about an asset check, except how to execute it.
6366
@@ -80,6 +83,9 @@ class AssetCheckSpec(IHaveNew, LegacyNamedTupleMixin):
8083
that multi-asset is responsible for enforcing that downstream assets within the
8184
same step do not execute after a blocking asset check fails.
8285
metadata (Optional[Mapping[str, Any]]): A dict of static metadata for this asset check.
86+
automation_condition (Optional[AutomationCondition[AssetCheckKey]]): The AutomationCondition for this asset check.
87+
partitions_def (Optional[PartitionsDefinition]): The PartitionsDefinition for this asset check. Must be either None
88+
or the same as the PartitionsDefinition of the asset specified by `asset`.
8389
"""
8490

8591
def __new__(
@@ -92,11 +98,15 @@ def __new__(
9298
blocking: bool = False,
9399
metadata: Optional[Mapping[str, Any]] = None,
94100
automation_condition: Optional["AutomationCondition[AssetCheckKey]"] = None,
101+
partitions_def: Optional[PartitionsDefinition] = None,
95102
):
96103
from dagster._core.definitions.assets.definition.asset_dep import (
97104
coerce_to_deps_and_check_duplicates,
98105
)
99106

107+
if partitions_def is not None:
108+
preview_warning("Specifying a partitions_def on an AssetCheckSpec")
109+
100110
asset_key = AssetKey.from_coercible_or_definition(asset)
101111

102112
additional_asset_deps = coerce_to_deps_and_check_duplicates(
@@ -119,6 +129,7 @@ def __new__(
119129
blocking=blocking,
120130
metadata=metadata or {},
121131
automation_condition=automation_condition,
132+
partitions_def=partitions_def,
122133
)
123134

124135
def get_python_identifier(self) -> str:

python_modules/dagster/dagster/_core/definitions/assets/graph/asset_graph.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def __init__(
189189
v.get_spec_for_check_key(k).description,
190190
v.get_spec_for_check_key(k).automation_condition,
191191
v.get_spec_for_check_key(k).metadata,
192+
v.get_spec_for_check_key(k).partitions_def,
192193
)
193194
for k, v in assets_defs_by_check_key.items()
194195
}

python_modules/dagster/dagster/_core/definitions/assets/graph/base_asset_graph.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,15 @@ def __init__(
217217
description: Optional[str],
218218
automation_condition: Optional["AutomationCondition[AssetCheckKey]"],
219219
metadata: ArbitraryMetadataMapping,
220+
partitions_def: Optional[PartitionsDefinition],
220221
):
221222
self.key = key
222223
self.blocking = blocking
223224
self._automation_condition = automation_condition
224225
self._additional_deps = additional_deps
225226
self._description = description
226227
self._metadata = metadata
228+
self._partitions_def = partitions_def
227229

228230
@property
229231
def parent_entity_keys(self) -> AbstractSet[AssetKey]:
@@ -235,8 +237,7 @@ def child_entity_keys(self) -> AbstractSet[EntityKey]:
235237

236238
@property
237239
def partitions_def(self) -> Optional[PartitionsDefinition]:
238-
# all checks are unpartitioned
239-
return None
240+
return self._partitions_def
240241

241242
@property
242243
def partition_mappings(self) -> Mapping[EntityKey, PartitionMapping]:
@@ -266,6 +267,10 @@ class BaseAssetGraph(ABC, Generic[T_AssetNode]):
266267
def asset_nodes(self) -> Iterable[T_AssetNode]:
267268
return self._asset_nodes_by_key.values()
268269

270+
@property
271+
def asset_check_nodes(self) -> Iterable[AssetCheckNode]:
272+
return self._asset_check_nodes_by_key.values()
273+
269274
@property
270275
def nodes(self) -> Iterable[BaseEntityNode]:
271276
return [
@@ -668,6 +673,28 @@ def validate_partitions(self):
668673
f"Invalid partition mapping from {node.key.to_user_string()} to {parent.key.to_user_string()}"
669674
) from e
670675

676+
# Validate that asset checks have compatible partitions_def with their target asset
677+
for node in self.asset_check_nodes:
678+
if node.partitions_def is None:
679+
continue
680+
681+
target_asset_key = node.key.asset_key
682+
if not self.has(target_asset_key):
683+
raise DagsterInvalidDefinitionError(
684+
f"Partitioned asset check '{node.key.to_user_string()}' targets "
685+
f"asset '{target_asset_key.to_user_string()}' "
686+
"but the asset does not exist in the graph."
687+
)
688+
# If the check is partitioned, it must have the same partitions_def as the asset
689+
if node.partitions_def != self.get(target_asset_key).partitions_def:
690+
raise DagsterInvalidDefinitionError(
691+
f"Asset check '{node.key.to_user_string()}' targets asset '{target_asset_key.to_user_string()}' "
692+
"but has a different partitions definition. "
693+
f"Asset check partitions_def: {node.partitions_def}, "
694+
f"Asset partitions_def: {self.get(target_asset_key).partitions_def}. "
695+
"Partitioned asset checks must have the same partitions definition as their target asset."
696+
)
697+
671698
def upstream_key_iterator(self, asset_key: AssetKey) -> Iterator[AssetKey]:
672699
"""Iterates through all asset keys which are upstream of the given key."""
673700
visited: set[AssetKey] = set()

python_modules/dagster/dagster/_core/definitions/assets/graph/remote_asset_graph.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,9 @@ def _get_asset_check_node_from_remote_asset_check_node(
480480
remote_node.asset_check.description,
481481
remote_node.asset_check.automation_condition,
482482
{}, # metadata not yet on AssetCheckNodeSnap
483+
remote_node.asset_check.partitions_def_snapshot.get_partitions_definition()
484+
if remote_node.asset_check.partitions_def_snapshot
485+
else None,
483486
)
484487

485488
##### COMMON ASSET GRAPH INTERFACE

python_modules/dagster/dagster/_core/definitions/declarative_automation/automation_context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ def create(key: EntityKey, evaluator: "AutomationConditionEvaluator") -> "Automa
7373
condition = check.not_none(
7474
evaluator.asset_graph.get(key).automation_condition or evaluator.default_condition
7575
)
76+
7677
unique_ids = condition.get_node_unique_ids(
7778
parent_unique_ids=[None], child_indices=[None], target_key=None
7879
)

python_modules/dagster/dagster/_core/definitions/decorators/asset_check_decorator.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
from dagster._core.definitions.decorators.op_decorator import _Op
2929
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
3030
from dagster._core.definitions.output import Out
31+
from dagster._core.definitions.partitions.definition.partitions_definition import (
32+
PartitionsDefinition,
33+
)
3134
from dagster._core.definitions.policy import RetryPolicy
3235
from dagster._core.definitions.source_asset import SourceAsset
3336
from dagster._core.definitions.utils import DEFAULT_OUTPUT
@@ -113,6 +116,7 @@ def asset_check(
113116
metadata: Optional[Mapping[str, Any]] = None,
114117
automation_condition: Optional[AutomationCondition[AssetCheckKey]] = None,
115118
pool: Optional[str] = None,
119+
partitions_def: Optional[PartitionsDefinition] = None,
116120
) -> Callable[[AssetCheckFunction], AssetChecksDefinition]:
117121
"""Create a definition for how to execute an asset check.
118122
@@ -151,6 +155,7 @@ def asset_check(
151155
automation_condition (Optional[AutomationCondition]): An AutomationCondition which determines
152156
when this check should be executed.
153157
pool (Optional[str]): A string that identifies the concurrency pool that governs this asset check's execution.
158+
partitions_def (Optional[PartitionsDefinition]): The PartitionsDefinition for this asset check.
154159
155160
Produces an :py:class:`AssetChecksDefinition` object.
156161
@@ -218,6 +223,7 @@ def inner(fn: AssetCheckFunction) -> AssetChecksDefinition:
218223
blocking=blocking,
219224
metadata=metadata,
220225
automation_condition=automation_condition,
226+
partitions_def=partitions_def,
221227
)
222228

223229
resource_defs_for_execution = wrap_resources_for_execution(resource_defs)

python_modules/dagster/dagster/_core/execution/context/asset_check_execution_context.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from dagster._core.definitions.asset_checks.asset_check_spec import AssetCheckKey, AssetCheckSpec
77
from dagster._core.definitions.job_definition import JobDefinition
88
from dagster._core.definitions.op_definition import OpDefinition
9+
from dagster._core.definitions.partitions.partition_key_range import PartitionKeyRange
10+
from dagster._core.definitions.partitions.utils.time_window import TimeWindow
911
from dagster._core.definitions.repository_definition.repository_definition import (
1012
RepositoryDefinition,
1113
)
@@ -135,6 +137,43 @@ def step_launcher(self) -> Optional[StepLauncher]:
135137
def get_step_execution_context(self) -> StepExecutionContext:
136138
return self.op_execution_context.get_step_execution_context()
137139

140+
#### partition related
141+
@public
142+
@property
143+
@_copy_docs_from_op_execution_context
144+
def has_partition_key(self) -> bool:
145+
return self.op_execution_context.has_partition_key
146+
147+
@public
148+
@property
149+
@_copy_docs_from_op_execution_context
150+
def partition_key(self) -> str:
151+
return self.op_execution_context.partition_key
152+
153+
@public
154+
@property
155+
@_copy_docs_from_op_execution_context
156+
def partition_keys(self) -> Sequence[str]:
157+
return self.op_execution_context.partition_keys
158+
159+
@public
160+
@property
161+
@_copy_docs_from_op_execution_context
162+
def has_partition_key_range(self) -> bool:
163+
return self.op_execution_context.has_partition_key_range
164+
165+
@public
166+
@property
167+
@_copy_docs_from_op_execution_context
168+
def partition_key_range(self) -> PartitionKeyRange:
169+
return self.op_execution_context.partition_key_range
170+
171+
@public
172+
@property
173+
@_copy_docs_from_op_execution_context
174+
def partition_time_window(self) -> TimeWindow:
175+
return self.op_execution_context.partition_time_window
176+
138177
# misc
139178

140179
@public

python_modules/dagster/dagster/_core/remote_representation/external_data.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,7 @@ class AssetCheckNodeSnap(IHaveNew):
899899
additional_asset_keys: Sequence[AssetKey]
900900
automation_condition: Optional[AutomationCondition]
901901
automation_condition_snapshot: Optional[AutomationConditionSnapshot]
902+
partitions_def_snapshot: Optional[PartitionsSnap]
902903

903904
def __new__(
904905
cls,
@@ -911,6 +912,7 @@ def __new__(
911912
additional_asset_keys: Optional[Sequence[AssetKey]] = None,
912913
automation_condition: Optional[AutomationCondition] = None,
913914
automation_condition_snapshot: Optional[AutomationConditionSnapshot] = None,
915+
partitions_def_snapshot: Optional[PartitionsSnap] = None,
914916
):
915917
return super().__new__(
916918
cls,
@@ -923,6 +925,7 @@ def __new__(
923925
additional_asset_keys=additional_asset_keys or [],
924926
automation_condition=automation_condition,
925927
automation_condition_snapshot=automation_condition_snapshot,
928+
partitions_def_snapshot=partitions_def_snapshot,
926929
)
927930

928931
@property
@@ -1211,6 +1214,9 @@ def asset_check_node_snaps_from_repo(repo: RepositoryDefinition) -> Sequence[Ass
12111214
additional_asset_keys=[dep.asset_key for dep in spec.additional_deps],
12121215
automation_condition=automation_condition,
12131216
automation_condition_snapshot=automation_condition_snapshot,
1217+
partitions_def_snapshot=PartitionsSnap.from_def(spec.partitions_def)
1218+
if spec.partitions_def
1219+
else None,
12141220
)
12151221
)
12161222

0 commit comments

Comments
 (0)