Skip to content

Commit 3360a3c

Browse files
authored
Separate step configurations from deployment (#3739)
* Don't convert deployment to model * Skip recursive dehydration for some attributes * Skip more dehydration * Defer loading step configurations * Add option to include only subset of step configurations * Split up step configurations * Fix migration * Add step configuration unique constraint * Don't join load in WITH UPDATE statement * Don't join load in WITH UPDATE statement * Merge migrations * Linting
1 parent 4fb3d91 commit 3360a3c

15 files changed

+368
-68
lines changed

src/zenml/entrypoints/step_entrypoint_configuration.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
import os
1717
import sys
1818
from typing import TYPE_CHECKING, Any, List, Set
19+
from uuid import UUID
1920

2021
from zenml.client import Client
2122
from zenml.entrypoints.base_entrypoint_configuration import (
23+
DEPLOYMENT_ID_OPTION,
2224
BaseEntrypointConfiguration,
2325
)
2426
from zenml.integrations.registry import integration_registry
@@ -147,6 +149,18 @@ def get_entrypoint_arguments(
147149
kwargs[STEP_NAME_OPTION],
148150
]
149151

152+
def load_deployment(self) -> "PipelineDeploymentResponse":
153+
"""Loads the deployment.
154+
155+
Returns:
156+
The deployment.
157+
"""
158+
deployment_id = UUID(self.entrypoint_args[DEPLOYMENT_ID_OPTION])
159+
step_name = self.entrypoint_args[STEP_NAME_OPTION]
160+
return Client().zen_store.get_deployment(
161+
deployment_id=deployment_id, step_configuration_filter=[step_name]
162+
)
163+
150164
def run(self) -> None:
151165
"""Prepares the environment and runs the configured step."""
152166
deployment = self.load_deployment()

src/zenml/models/v2/core/pipeline_build.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ class PipelineBuildResponseBody(ProjectScopedResponseBody):
200200
class PipelineBuildResponseMetadata(ProjectScopedResponseMetadata):
201201
"""Response metadata for pipeline builds."""
202202

203+
__zenml_skip_dehydration__: ClassVar[List[str]] = [
204+
"images",
205+
]
206+
203207
pipeline: Optional["PipelineResponse"] = Field(
204208
default=None, title="The pipeline that was used for this build."
205209
)

src/zenml/models/v2/core/pipeline_deployment.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# permissions and limitations under the License.
1414
"""Models representing pipeline deployments."""
1515

16-
from typing import Any, Dict, Optional, Union
16+
from typing import Any, ClassVar, Dict, List, Optional, Union
1717
from uuid import UUID
1818

1919
from pydantic import Field
@@ -130,6 +130,13 @@ class PipelineDeploymentResponseBody(ProjectScopedResponseBody):
130130
class PipelineDeploymentResponseMetadata(ProjectScopedResponseMetadata):
131131
"""Response metadata for pipeline deployments."""
132132

133+
__zenml_skip_dehydration__: ClassVar[List[str]] = [
134+
"pipeline_configuration",
135+
"step_configurations",
136+
"client_environment",
137+
"pipeline_spec",
138+
]
139+
133140
run_name_template: str = Field(
134141
title="The run name template for runs created using this deployment.",
135142
)

src/zenml/models/v2/core/pipeline_run.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ class PipelineRunResponseBody(ProjectScopedResponseBody):
190190
class PipelineRunResponseMetadata(ProjectScopedResponseMetadata):
191191
"""Response metadata for pipeline runs."""
192192

193+
__zenml_skip_dehydration__: ClassVar[List[str]] = [
194+
"run_metadata",
195+
"config",
196+
"client_environment",
197+
"orchestrator_environment",
198+
]
199+
193200
run_metadata: Dict[str, MetadataType] = Field(
194201
default={},
195202
title="Metadata associated with this pipeline run.",

src/zenml/models/v2/core/step_run.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,12 @@ class StepRunResponseBody(ProjectScopedResponseBody):
199199
class StepRunResponseMetadata(ProjectScopedResponseMetadata):
200200
"""Response metadata for step runs."""
201201

202+
__zenml_skip_dehydration__: ClassVar[List[str]] = [
203+
"config",
204+
"spec",
205+
"metadata",
206+
]
207+
202208
# Configuration
203209
config: "StepConfiguration" = Field(title="The configuration of the step.")
204210
spec: "StepSpec" = Field(title="The spec of the step.")

src/zenml/zen_server/rbac/utils.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,16 @@ def dehydrate_response_model(
120120
)
121121

122122
dehydrated_values = {}
123+
skip_dehydration = getattr(model, "__zenml_skip_dehydration__", [])
123124
# See `get_subresources_for_model(...)` for a detailed explanation why we
124125
# need to use `model.__iter__()` here
125126
for key, value in model.__iter__():
126-
dehydrated_values[key] = _dehydrate_value(
127-
value, permissions=permissions
128-
)
127+
if key in skip_dehydration:
128+
dehydrated_values[key] = value
129+
else:
130+
dehydrated_values[key] = _dehydrate_value(
131+
value, permissions=permissions
132+
)
129133

130134
return type(model).model_validate(dehydrated_values)
131135

@@ -579,8 +583,10 @@ def get_subresources_for_model(
579583
for item in model:
580584
resources.update(_get_subresources_for_value(item))
581585
else:
582-
for _, value in model.__iter__():
583-
resources.update(_get_subresources_for_value(value))
586+
skip_dehydration = getattr(model, "__zenml_skip_dehydration__", [])
587+
for key, value in model.__iter__():
588+
if key not in skip_dehydration:
589+
resources.update(_get_subresources_for_value(value))
584590

585591
return resources
586592

src/zenml/zen_server/routers/pipeline_deployments_endpoints.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
# permissions and limitations under the License.
1414
"""Endpoint definitions for deployments."""
1515

16-
from typing import Any, Optional, Union
16+
from typing import Any, List, Optional, Union
1717
from uuid import UUID
1818

19-
from fastapi import APIRouter, Depends, Request, Security
19+
from fastapi import APIRouter, Depends, Query, Request, Security
2020

2121
from zenml.constants import API, PIPELINE_DEPLOYMENTS, VERSION_1
2222
from zenml.logging.step_logging import fetch_logs
@@ -201,6 +201,7 @@ def get_deployment(
201201
request: Request,
202202
deployment_id: UUID,
203203
hydrate: bool = True,
204+
step_configuration_filter: Optional[List[str]] = Query(None),
204205
_: AuthContext = Security(authorize),
205206
) -> Any:
206207
"""Gets a specific deployment using its unique id.
@@ -210,6 +211,9 @@ def get_deployment(
210211
deployment_id: ID of the deployment to get.
211212
hydrate: Flag deciding whether to hydrate the output model(s)
212213
by including metadata fields in the response.
214+
step_configuration_filter: List of step configurations to include in
215+
the response. If not given, all step configurations will be
216+
included.
213217
214218
Returns:
215219
A specific deployment object.
@@ -218,6 +222,7 @@ def get_deployment(
218222
id=deployment_id,
219223
get_method=zen_store().get_deployment,
220224
hydrate=hydrate,
225+
step_configuration_filter=step_configuration_filter,
221226
)
222227

223228
exclude = None
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""Split up step configurations [3d7e39f3ac92].
2+
3+
Revision ID: 3d7e39f3ac92
4+
Revises: 0.83.0
5+
Create Date: 2025-06-17 17:45:31.702617
6+
7+
"""
8+
9+
import json
10+
import uuid
11+
12+
import sqlalchemy as sa
13+
import sqlmodel
14+
from alembic import op
15+
from sqlalchemy.dialects import mysql
16+
17+
from zenml.utils.time_utils import utc_now
18+
19+
# revision identifiers, used by Alembic.
20+
revision = "3d7e39f3ac92"
21+
down_revision = "0.83.0"
22+
branch_labels = None
23+
depends_on = None
24+
25+
26+
def upgrade() -> None:
27+
"""Upgrade database schema and/or data, creating a new revision."""
28+
# ### commands auto generated by Alembic - please adjust! ###
29+
op.create_table(
30+
"step_configuration",
31+
sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
32+
sa.Column("created", sa.DateTime(), nullable=False),
33+
sa.Column("updated", sa.DateTime(), nullable=False),
34+
sa.Column("index", sa.Integer(), nullable=False),
35+
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
36+
sa.Column(
37+
"config",
38+
sa.String(length=16777215).with_variant(mysql.MEDIUMTEXT, "mysql"),
39+
nullable=False,
40+
),
41+
sa.Column(
42+
"deployment_id", sqlmodel.sql.sqltypes.GUID(), nullable=False
43+
),
44+
sa.ForeignKeyConstraint(
45+
["deployment_id"],
46+
["pipeline_deployment.id"],
47+
name="fk_step_configuration_deployment_id_pipeline_deployment",
48+
ondelete="CASCADE",
49+
),
50+
sa.PrimaryKeyConstraint("id"),
51+
sa.UniqueConstraint(
52+
"deployment_id", "name", name="unique_step_name_for_deployment"
53+
),
54+
)
55+
with op.batch_alter_table("pipeline_deployment", schema=None) as batch_op:
56+
batch_op.add_column(
57+
sa.Column("step_count", sa.Integer(), nullable=True)
58+
)
59+
60+
# Migrate existing step configurations
61+
connection = op.get_bind()
62+
meta = sa.MetaData()
63+
meta.reflect(
64+
bind=connection, only=("pipeline_deployment", "step_configuration")
65+
)
66+
pipeline_deployment_table = sa.Table("pipeline_deployment", meta)
67+
step_configuration_table = sa.Table("step_configuration", meta)
68+
69+
step_configurations_to_insert = []
70+
deployment_updates = []
71+
72+
for deployment_id, step_configurations_json in connection.execute(
73+
sa.select(
74+
pipeline_deployment_table.c.id,
75+
pipeline_deployment_table.c.step_configurations,
76+
)
77+
):
78+
step_configurations = json.loads(step_configurations_json)
79+
80+
step_count = len(step_configurations)
81+
deployment_updates.append(
82+
{
83+
"id_": deployment_id,
84+
"step_count": step_count,
85+
}
86+
)
87+
88+
for index, (step_name, step_config) in enumerate(
89+
step_configurations.items()
90+
):
91+
now = utc_now()
92+
step_configurations_to_insert.append(
93+
{
94+
"id": str(uuid.uuid4()).replace("-", ""),
95+
"created": now,
96+
"updated": now,
97+
"index": index,
98+
"name": step_name,
99+
"config": json.dumps(step_config),
100+
"deployment_id": deployment_id,
101+
}
102+
)
103+
104+
op.bulk_insert(
105+
step_configuration_table, rows=step_configurations_to_insert
106+
)
107+
if deployment_updates:
108+
connection.execute(
109+
sa.update(pipeline_deployment_table).where(
110+
pipeline_deployment_table.c.id == sa.bindparam("id_")
111+
),
112+
deployment_updates,
113+
)
114+
115+
with op.batch_alter_table("pipeline_deployment", schema=None) as batch_op:
116+
batch_op.alter_column(
117+
"step_count", existing_type=sa.Integer(), nullable=False
118+
)
119+
batch_op.drop_column("step_configurations")
120+
121+
# ### end Alembic commands ###
122+
123+
124+
def downgrade() -> None:
125+
"""Downgrade database schema and/or data back to the previous revision."""
126+
# ### commands auto generated by Alembic - please adjust! ###
127+
with op.batch_alter_table("pipeline_deployment", schema=None) as batch_op:
128+
batch_op.add_column(
129+
sa.Column(
130+
"step_configurations",
131+
sa.VARCHAR(length=16777215),
132+
nullable=False,
133+
)
134+
)
135+
batch_op.drop_column("step_count")
136+
137+
op.drop_table("step_configuration")
138+
# ### end Alembic commands ###

src/zenml/zen_stores/rest_zen_store.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1631,14 +1631,20 @@ def create_deployment(
16311631
)
16321632

16331633
def get_deployment(
1634-
self, deployment_id: UUID, hydrate: bool = True
1634+
self,
1635+
deployment_id: UUID,
1636+
hydrate: bool = True,
1637+
step_configuration_filter: Optional[List[str]] = None,
16351638
) -> PipelineDeploymentResponse:
16361639
"""Get a deployment with a given ID.
16371640
16381641
Args:
16391642
deployment_id: ID of the deployment.
16401643
hydrate: Flag deciding whether to hydrate the output model(s)
16411644
by including metadata fields in the response.
1645+
step_configuration_filter: List of step configurations to include in
1646+
the response. If not given, all step configurations will be
1647+
included.
16421648
16431649
Returns:
16441650
The deployment.
@@ -1647,7 +1653,10 @@ def get_deployment(
16471653
resource_id=deployment_id,
16481654
route=PIPELINE_DEPLOYMENTS,
16491655
response_model=PipelineDeploymentResponse,
1650-
params={"hydrate": hydrate},
1656+
params={
1657+
"hydrate": hydrate,
1658+
"step_configuration_filter": step_configuration_filter,
1659+
},
16511660
)
16521661

16531662
def list_deployments(

src/zenml/zen_stores/schemas/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from zenml.zen_stores.schemas.server_settings_schemas import ServerSettingsSchema
3636
from zenml.zen_stores.schemas.pipeline_deployment_schemas import (
3737
PipelineDeploymentSchema,
38+
StepConfigurationSchema,
3839
)
3940
from zenml.zen_stores.schemas.pipeline_run_schemas import PipelineRunSchema
4041
from zenml.zen_stores.schemas.pipeline_schemas import PipelineSchema
@@ -91,6 +92,7 @@
9192
"OAuthDeviceSchema",
9293
"PipelineBuildSchema",
9394
"PipelineDeploymentSchema",
95+
"StepConfigurationSchema",
9496
"PipelineRunSchema",
9597
"PipelineSchema",
9698
"RunMetadataResourceSchema",

0 commit comments

Comments
 (0)