Replies: 1 comment
-
|
Thanks for filing this from the Slack thread — wanted to add a closer trace before we change the docs, because I think the root-cause attribution is slightly off.
annotations = {}
if user_priority:
annotations["priority"] = user_priority
...
annotations = merge(dask.get_annotations(), annotations)With The "no observable effect" symptom is almost certainly real, but the more likely explanations are:
Workarounds today:
Proposed follow-ups:
Minimal repro to disambiguate the three theories: import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def slow(i):
import time; time.sleep(0.5)
return i
@flow(task_runner=DaskTaskRunner(
cluster_kwargs={"n_workers": 1, "threads_per_worker": 1}, # force a queue
))
def f():
futs = []
for i in range(10):
with dask.annotate(priority=i): # later submissions get HIGHER priority
futs.append(slow.submit(i))
return [f.result() for f in futs]Task 0 starts immediately on the only worker; tasks 1–9 queue. If annotations reach the scheduler, the queued tasks should finish in order 9, 8, 7, …, 1 (highest priority first). If they finish in submission order (1, 2, …, 9) instead, the annotation is being dropped — next thing to check is whether the actual |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
This discussion was created from a Slack thread conversation.
Original Thread: https://prefect-community.slack.com/archives/C079VLLH5D3/p1776339347743129
Summary
The Prefect docs page at https://docs.prefect.io/integrations/prefect-dask#with-prefect shows an example suggesting users can influence task execution order with dask.annotate(priority=...). In Prefect 3.x with the DaskTaskRunner, this annotation does not change scheduling order because Prefect’s Dask client passes an explicit priority=0 to distributed.Client.submit(...) for each task, which overrides any annotation context. The result is that annotated priorities do not take effect in Prefect-managed task submission.
Impact
Users trying to prioritize tasks (e.g., when m > n workers) by using dask.annotate(...) will see no effect and get confused, as reported by a user in Slack. This contradicts the current docs example.
Observed behavior
priorityrun earlier.Likely root cause (code-level)
PrefectDaskClient.submit(...)forwards an explicitprioritykwarg todistributed.Client.submit(...), defaulting to 0.priority=argument overrides anyannotate(...)context, includingdistributed.annotate.DaskTaskRunner.submit(...)does not expose apriorityparameter, andTask.submit(...)does not either, so users cannot pass priority through Prefect at this time.Requested doc correction
dask.annotate(priority=...)(and evendistributed.annotate) will not affect Prefect-managed task submission because an explicit priority is set when submitting to Dask.wait_forfor strict sequencing.priorityparameter to Prefect’sTask.submit(...)and/orDaskTaskRunner.submit(...)that passes through to the Dask client.Links for context
If maintainers confirm this behavior is intended, updating the docs would prevent confusion for users trying to control order with annotations under Prefect’s DaskTaskRunner.
This discussion was automatically created by the Marvin bot to preserve valuable community insights.
Beta Was this translation helpful? Give feedback.
All reactions