Skip to content

Commit a92c38d

Browse files
authored
Explicit Queue Listening (#537)
By default, a process running DBOS dequeues from all declared queues. Now, you can instead use `DBOS.listen_queues` to explicitly tell a process running DBOS to only dequeue workflows from a specific set of queues. ```python DBOS.listen_queues( queues: List[Queue] ) ``` You can use this to manage heterogeneous workers, so workers of type A only dequeue and execute workflows from queue A while workers of type B only dequeue and execute workflows from queue B. For example: ```python cpu_queue = Queue("cpu_queue") gpu_queue = Queue("gpu_queue") if __name__ == "__main__": config = ... worker_type = ... # "cpu" or "gpu" DBOS(config=config) if worker_type == "gpu": # GPU workers will only dequeue and execute workflows from the GPU queue DBOS.listen_queues([gpu_queue]) elif worker_type == "cpu": # CPU workers will only dequeue and execute workflows from the CPU queue DBOS.listen_queues([cpu_queue]) DBOS.launch() ``` Also add logging of all queues being listened to.
1 parent 977f769 commit a92c38d

File tree

3 files changed

+84
-2
lines changed

3 files changed

+84
-2
lines changed

dbos/_dbos.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ def __init__(
326326
self._app_db_field: Optional[ApplicationDatabase] = None
327327
self._registry: DBOSRegistry = _get_or_create_dbos_registry()
328328
self._registry.dbos = self
329+
self._listening_queues: Optional[List[Queue]] = None
329330
self._admin_server_field: Optional[AdminServer] = None
330331
# Stop internal background threads (queue thread, timeout threads, etc.)
331332
self.background_thread_stop_events: List[threading.Event] = []
@@ -1576,6 +1577,21 @@ def tracer(self) -> DBOSTracer:
15761577
"""Return the DBOS OpenTelemetry tracer."""
15771578
return dbos_tracer
15781579

1580+
@classmethod
def listen_queues(cls, queues: List[Queue]) -> None:
    """
    Restrict this DBOS process to dequeuing workflows from a specific set of queues.

    By default a process dequeues from every declared queue; calling this before
    launch narrows that to only the queues given here.

    Args:
        queues(List[Queue]): The list of queues to listen to

    Raises:
        DBOSException: if called after launch, or called a second time.
    """
    instance = _get_dbos_instance()
    # Queue listeners are wired up during launch, so this must run before it.
    if instance._launched:
        raise DBOSException("listen_queues called after DBOS is launched")
    # Guard against conflicting configurations from multiple calls.
    if instance._listening_queues is not None:
        raise DBOSException("listen_queues called more than once")
    instance._listening_queues = queues
15791595

15801596
class WorkflowHandle(Generic[R], Protocol):
15811597
"""

dbos/_queue.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,25 @@ def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
175175
queue_threads: dict[str, threading.Thread] = {}
176176
check_interval = 1.0 # Check for new queues every second
177177

178+
if dbos._listening_queues is not None:
179+
listening_queues = dbos._listening_queues
180+
else:
181+
listening_queues = list(dbos._registry.queue_info_map.values())
182+
listening_queues = [
183+
q for q in listening_queues if q.name != INTERNAL_QUEUE_NAME
184+
]
185+
dbos.logger.info(f"Listening to {len(listening_queues)} queues:")
186+
log_queues(listening_queues)
187+
178188
while not stop_event.is_set():
179-
# Check for new queues
180-
current_queues = dict(dbos._registry.queue_info_map)
189+
if dbos._listening_queues is not None:
190+
# If explicitly listening for queues, only use those queues
191+
current_queues = {queue.name: queue for queue in dbos._listening_queues}
192+
# Always listen to the internal queue
193+
current_queues[INTERNAL_QUEUE_NAME] = dbos._registry.get_internal_queue()
194+
else:
195+
# Else, check all declared queues
196+
current_queues = dict(dbos._registry.queue_info_map)
181197

182198
# Start threads for new queues
183199
for queue_name, queue in current_queues.items():
@@ -212,3 +228,21 @@ def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
212228
dbos.logger.debug(
213229
f"Queue worker thread for {queue_name} stopped successfully"
214230
)
231+
232+
233+
def log_queues(queues: list[Queue]) -> None:
    """Helper function to log queues on DBOS launch."""
    for queue in queues:
        # Collect the non-default options configured on this queue.
        details: list[str] = []
        if queue.concurrency is not None:
            details.append(f"concurrency={queue.concurrency}")
        if queue.worker_concurrency is not None:
            details.append(f"worker_concurrency={queue.worker_concurrency}")
        if queue.limiter is not None:
            details.append(f"limit={queue.limiter['limit']}/{queue.limiter['period']}s")
        if queue.priority_enabled:
            details.append("priority")
        if queue.partition_queue:
            details.append("partitioned")
        # Only render the parenthesized suffix when there is something to show.
        suffix = f" ({', '.join(details)})" if details else ""
        dbos_logger.info(f"Queue: {queue.name}{suffix}")

tests/test_queue.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1752,3 +1752,35 @@ def workflow() -> str:
17521752
start_time = time.time()
17531753
assert queue.enqueue(workflow).get_result(polling_interval_sec=0.1)
17541754
assert time.time() - start_time < 1.0
1755+
1756+
1757+
def test_listen_queue(dbos: DBOS, config: DBOSConfig) -> None:
    """Verify that listen_queues restricts a process to dequeuing from the given queues only."""
    # Recreate DBOS from scratch so listen_queues can be configured pre-launch.
    DBOS.destroy(destroy_registry=True)
    DBOS(config=config)

    queue_one = Queue("queue_one")
    queue_two = Queue("queue_two")

    @DBOS.workflow()
    def workflow() -> str:
        assert DBOS.workflow_id
        return DBOS.workflow_id

    DBOS.listen_queues([queue_one])
    DBOS.launch()

    # Only queue one is being listened to, so only its workflow should run;
    # the workflow on queue two must stay ENQUEUED.
    handle_one = queue_one.enqueue(workflow)
    handle_two = queue_two.enqueue(workflow)
    assert handle_one.get_result()
    assert handle_two.get_status().status == "ENQUEUED"

    # Restart the process listening to queue two instead.
    DBOS.destroy()
    DBOS(config=config)
    DBOS.listen_queues([queue_two])
    DBOS.launch()

    # Now the previously-stuck workflow on queue two completes.
    assert DBOS.retrieve_workflow(handle_two.workflow_id).get_result()
    # The internal queue is always listened to, so fork_workflow still works.
    assert DBOS.fork_workflow(handle_two.workflow_id, 0).get_result()

0 commit comments

Comments
 (0)