Skip to content

Commit 644ce09

Browse files
committed
feat(dag_version): add soft-delete support and set dag_version FK to SET NULL
1 parent 7d69f2e commit 644ce09

File tree

10 files changed

+1448
-1291
lines changed

10 files changed

+1448
-1291
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4cda55eb221ee0340749c8dd41af7603220d7b300e2e808d733239a86e0c2837
1+
c81c0f0c5e071b67a71b959a1a78f5b9357893c0b73efa28eaf890b8a2e299db

airflow-core/docs/img/airflow_erd.svg

Lines changed: 1287 additions & 1282 deletions
Loading

airflow-core/docs/migrations-ref.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ Here's the list of all the Database Migrations that are executed via when you ru
3939
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4040
| Revision ID | Revises ID | Airflow Version | Description |
4141
+=========================+==================+===================+==============================================================+
42-
| ``0242ac120002`` (head) | ``dfee8bd5d574`` | ``3.1.0`` | Change the Deadline column in the Deadline table from |
42+
| ``1b2cd3e4f5a6`` (head) | ``03e36c7f30aa`` | ``3.1.0`` | Modify task_instance.dag_version_id FK ondelete behavior to |
43+
| | | | SET NULL. |
44+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
45+
| ``03e36c7f30aa`` | ``0242ac120002`` | ``3.1.0`` | Add soft-delete flag to DagVersion (is_active). |
46+
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
47+
| ``0242ac120002`` | ``dfee8bd5d574`` | ``3.1.0`` | Change the Deadline column in the Deadline table from |
4348
| | | | DateTime to UTC DateTime. |
4449
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
4550
| ``dfee8bd5d574`` | ``29ce7909c52b`` | ``3.1.0`` | Add Deadline to Dag. |

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,8 @@ def grid_data(
191191
task_node_map_exclude and ti.task_id not in task_node_map_exclude
192192
) or ti.state == TaskInstanceState.REMOVED:
193193
continue
194+
if ti.task_id not in task_node_map:
195+
continue
194196

195197
# Populate the Grouped Task Instances (All Task Instances except the Parent Task Instances)
196198
if ti.task_id in get_child_task_map(
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Add soft-delete flag to DagVersion (is_active).
20+
21+
Revision ID: 03e36c7f30aa
22+
Revises: 0242ac120002
23+
Create Date: 2025-05-20 00:00:00.000000
24+
"""
25+
26+
from __future__ import annotations
27+
28+
import sqlalchemy as sa
29+
from alembic import op
30+
31+
# revision identifiers, used by Alembic.
32+
revision = "03e36c7f30aa"
33+
down_revision = "0242ac120002"
34+
branch_labels = None
35+
depends_on = None
36+
airflow_version = "3.1.0"
37+
38+
39+
def upgrade():
40+
"""Add is_active column to dag_version and backfill to True."""
41+
op.add_column(
42+
"dag_version",
43+
sa.Column(
44+
"is_active",
45+
sa.Boolean(),
46+
nullable=False,
47+
server_default="1",
48+
comment="Soft-delete flag; only active versions show up in APIs",
49+
),
50+
)
51+
# Backfill existing rows to active
52+
op.execute("UPDATE dag_version SET is_active = 1")
53+
54+
55+
def downgrade():
56+
"""Remove is_active column from dag_version."""
57+
op.drop_column("dag_version", "is_active")
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Modify task_instance.dag_version_id FK ondelete behavior to SET NULL.
20+
21+
Revision ID: 1b2cd3e4f5a6
22+
Revises: 03e36c7f30aa
23+
Create Date: 2025-05-21 00:00:00.000000
24+
"""
25+
26+
from __future__ import annotations
27+
28+
from alembic import op
29+
from sqlalchemy_utils import UUIDType
30+
31+
# revision identifiers, used by Alembic.
32+
revision = "1b2cd3e4f5a6"
33+
down_revision = "03e36c7f30aa"
34+
branch_labels = None
35+
depends_on = None
36+
airflow_version = "3.1.0"
37+
38+
39+
def upgrade():
40+
"""Alter task_instance.dag_version_id to SET NULL on delete."""
41+
with op.batch_alter_table("task_instance", schema=None) as batch_op:
42+
batch_op.drop_constraint(batch_op.f("task_instance_dag_version_id_fkey"), type_="foreignkey")
43+
batch_op.alter_column(
44+
"dag_version_id",
45+
existing_type=UUIDType(binary=False),
46+
nullable=True,
47+
)
48+
batch_op.create_foreign_key(
49+
batch_op.f("task_instance_dag_version_id_fkey"),
50+
"dag_version",
51+
["dag_version_id"],
52+
["id"],
53+
ondelete="SET NULL",
54+
)
55+
56+
57+
def downgrade():
58+
"""Revert task_instance.dag_version_id FK ondelete back to CASCADE."""
59+
with op.batch_alter_table("task_instance", schema=None) as batch_op:
60+
batch_op.drop_constraint(batch_op.f("task_instance_dag_version_id_fkey"), type_="foreignkey")
61+
batch_op.alter_column(
62+
"dag_version_id",
63+
existing_type=UUIDType(binary=False),
64+
nullable=False,
65+
)
66+
batch_op.create_foreign_key(
67+
batch_op.f("task_instance_dag_version_id_fkey"),
68+
"dag_version",
69+
["dag_version_id"],
70+
["id"],
71+
ondelete="CASCADE",
72+
)

airflow-core/src/airflow/models/dag.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1973,7 +1973,10 @@ class DagModel(Base):
19731973
"scheduler", "max_dagruns_to_create_per_loop", fallback=10
19741974
)
19751975
dag_versions = relationship(
1976-
"DagVersion", back_populates="dag_model", cascade="all, delete, delete-orphan"
1976+
"DagVersion",
1977+
back_populates="dag_model",
1978+
cascade="save-update, merge",
1979+
passive_deletes=True,
19771980
)
19781981

19791982
def __init__(self, **kwargs):

airflow-core/src/airflow/models/dag_version.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from typing import TYPE_CHECKING
2222

2323
import uuid6
24-
from sqlalchemy import Column, ForeignKey, Integer, UniqueConstraint, select
24+
from sqlalchemy import Boolean, Column, ForeignKey, Integer, UniqueConstraint, select
2525
from sqlalchemy.orm import joinedload, relationship
2626
from sqlalchemy_utils import UUIDType
2727

@@ -68,6 +68,13 @@ class DagVersion(Base):
6868
__table_args__ = (
6969
UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"),
7070
)
71+
is_active = Column(
72+
Boolean,
73+
nullable=False,
74+
default=True,
75+
server_default="1",
76+
comment="Soft-delete flag; only active versions show up in APIs",
77+
)
7178

7279
def __repr__(self):
7380
"""Represent the object as a string."""
@@ -121,7 +128,7 @@ def _latest_version_select(
121128
:param dag_id: The DAG ID.
122129
:return: The select object.
123130
"""
124-
query = select(cls).where(cls.dag_id == dag_id)
131+
query = select(cls).where(cls.dag_id == dag_id, cls.is_active)
125132
if bundle_version:
126133
query = query.where(cls.bundle_version == bundle_version)
127134

@@ -163,16 +170,18 @@ def get_version(
163170
session: Session = NEW_SESSION,
164171
) -> DagVersion | None:
165172
"""
166-
Get the version of the DAG.
173+
Get the version of the DAG, if version_number is given, otherwise only active versions.
167174
168175
:param dag_id: The DAG ID.
169176
:param version_number: The version number.
170177
:param session: The database session.
171178
:return: The version of the DAG or None if not found.
172179
"""
173180
version_select_obj = select(cls).where(cls.dag_id == dag_id)
174-
if version_number:
181+
if version_number is not None:
175182
version_select_obj = version_select_obj.where(cls.version_number == version_number)
183+
else:
184+
version_select_obj = version_select_obj.where(cls.is_active)
176185

177186
return session.scalar(version_select_obj.order_by(cls.id.desc()).limit(1))
178187

airflow-core/src/airflow/models/taskinstance.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,11 @@ class TaskInstance(Base, LoggingMixin):
639639
next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
640640

641641
_task_display_property_value = Column("task_display_name", String(2000), nullable=True)
642-
dag_version_id = Column(UUIDType(binary=False), ForeignKey("dag_version.id", ondelete="CASCADE"))
642+
dag_version_id = Column(
643+
UUIDType(binary=False),
644+
ForeignKey("dag_version.id", ondelete="SET NULL"),
645+
nullable=True,
646+
)
643647
dag_version = relationship("DagVersion", back_populates="task_instances")
644648

645649
__table_args__ = (

airflow-core/src/airflow/utils/db.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class MappedClassProtocol(Protocol):
9393
"2.10.0": "22ed7efa9da2",
9494
"2.10.3": "5f2621c13b39",
9595
"3.0.0": "29ce7909c52b",
96-
"3.1.0": "0242ac120002",
96+
"3.1.0": "1b2cd3e4f5a6",
9797
}
9898

9999

0 commit comments

Comments
 (0)