Skip to content

Commit f15b7db

Browse files
authored
Don't Check Partitions for Non-Partitioned Queues (#531)
This fixes an issue where a workflow is stuck when moved from a partitioned queue to the non-partitioned internal queue (for example when resuming a workflow).
1 parent d05e364 commit f15b7db

File tree

2 files changed

+56
-12
lines changed

2 files changed

+56
-12
lines changed

dbos/_sys_db.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,10 +1862,6 @@ def start_queued_workflows(
18621862
sa.select(sa.func.count())
18631863
.select_from(SystemSchema.workflow_status)
18641864
.where(SystemSchema.workflow_status.c.queue_name == queue.name)
1865-
.where(
1866-
SystemSchema.workflow_status.c.queue_partition_key
1867-
== queue_partition_key
1868-
)
18691865
.where(
18701866
SystemSchema.workflow_status.c.status
18711867
!= WorkflowStatusString.ENQUEUED.value
@@ -1875,6 +1871,11 @@ def start_queued_workflows(
18751871
> start_time_ms - limiter_period_ms
18761872
)
18771873
)
1874+
if queue_partition_key is not None:
1875+
query = query.where(
1876+
SystemSchema.workflow_status.c.queue_partition_key
1877+
== queue_partition_key
1878+
)
18781879
num_recent_queries = c.execute(query).fetchone()[0] # type: ignore
18791880
if num_recent_queries >= queue.limiter["limit"]:
18801881
return []
@@ -1890,16 +1891,17 @@ def start_queued_workflows(
18901891
)
18911892
.select_from(SystemSchema.workflow_status)
18921893
.where(SystemSchema.workflow_status.c.queue_name == queue.name)
1893-
.where(
1894-
SystemSchema.workflow_status.c.queue_partition_key
1895-
== queue_partition_key
1896-
)
18971894
.where(
18981895
SystemSchema.workflow_status.c.status
18991896
== WorkflowStatusString.PENDING.value
19001897
)
19011898
.group_by(SystemSchema.workflow_status.c.executor_id)
19021899
)
1900+
if queue_partition_key is not None:
1901+
pending_tasks_query = pending_tasks_query.where(
1902+
SystemSchema.workflow_status.c.queue_partition_key
1903+
== queue_partition_key
1904+
)
19031905
pending_workflows = c.execute(pending_tasks_query).fetchall()
19041906
pending_workflows_dict = {row[0]: row[1] for row in pending_workflows}
19051907
local_pending_workflows = pending_workflows_dict.get(executor_id, 0)
@@ -1935,10 +1937,6 @@ def start_queued_workflows(
19351937
)
19361938
.select_from(SystemSchema.workflow_status)
19371939
.where(SystemSchema.workflow_status.c.queue_name == queue.name)
1938-
.where(
1939-
SystemSchema.workflow_status.c.queue_partition_key
1940-
== queue_partition_key
1941-
)
19421940
.where(
19431941
SystemSchema.workflow_status.c.status
19441942
== WorkflowStatusString.ENQUEUED.value
@@ -1955,6 +1953,11 @@ def start_queued_workflows(
19551953
# to ensure all processes have a consistent view of the table.
19561954
.with_for_update(skip_locked=skip_locks, nowait=(not skip_locks))
19571955
)
1956+
if queue_partition_key is not None:
1957+
query = query.where(
1958+
SystemSchema.workflow_status.c.queue_partition_key
1959+
== queue_partition_key
1960+
)
19581961
if queue.priority_enabled:
19591962
query = query.order_by(
19601963
SystemSchema.workflow_status.c.priority.asc(),

tests/test_queue.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,47 @@ def regular_workflow() -> None:
10001000
assert queue_entries_are_cleaned_up(dbos)
10011001

10021002

1003+
def test_resuming_queued_partitioned_workflows(dbos: DBOS) -> None:
1004+
start_event = threading.Event()
1005+
blocking_event = threading.Event()
1006+
1007+
@DBOS.workflow()
1008+
def stuck_workflow() -> None:
1009+
start_event.set()
1010+
blocking_event.wait()
1011+
1012+
@DBOS.workflow()
1013+
def regular_workflow() -> None:
1014+
return
1015+
1016+
# Enqueue a blocked workflow and two regular workflows on a queue with concurrency 1
1017+
queue = Queue("test_queue", concurrency=1, partition_queue=True)
1018+
wfid = str(uuid.uuid4())
1019+
with SetEnqueueOptions(queue_partition_key="key"):
1020+
blocked_handle = queue.enqueue(stuck_workflow)
1021+
with SetWorkflowID(wfid):
1022+
regular_handle_1 = queue.enqueue(regular_workflow)
1023+
regular_handle_2 = queue.enqueue(regular_workflow)
1024+
1025+
# Verify that the blocked workflow starts and is PENDING while the regular workflows remain ENQUEUED.
1026+
start_event.wait()
1027+
assert blocked_handle.get_status().status == WorkflowStatusString.PENDING.value
1028+
assert regular_handle_1.get_status().status == WorkflowStatusString.ENQUEUED.value
1029+
assert regular_handle_2.get_status().status == WorkflowStatusString.ENQUEUED.value
1030+
1031+
# Resume a regular workflow. Verify it completes.
1032+
dbos.resume_workflow(wfid)
1033+
assert regular_handle_1.get_result() == None
1034+
1035+
# Complete the blocked workflow. Verify the second regular workflow also completes.
1036+
blocking_event.set()
1037+
assert blocked_handle.get_result() == None
1038+
assert regular_handle_2.get_result() == None
1039+
1040+
# Verify all queue entries eventually get cleaned up.
1041+
assert queue_entries_are_cleaned_up(dbos)
1042+
1043+
10031044
def test_dlq_enqueued_workflows(dbos: DBOS) -> None:
10041045
start_event = threading.Event()
10051046
blocking_event = threading.Event()

0 commit comments

Comments
 (0)