Skip to content

Commit a19307d

Browse files
committed
allow for watching future values with watcher
Include StopIterSentinel oops
1 parent 4302a50 commit a19307d

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

nats/js/kv.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@
3535
logger = logging.getLogger(__name__)
3636

3737

38+
class StopIterSentinel:
39+
"""A sentinel class used to indicate that iteration should stop."""
40+
41+
pass
42+
43+
3844
class KeyValue:
3945
"""
4046
KeyValue uses the JetStream KeyValue functionality.
@@ -275,6 +281,9 @@ async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:
275281
watcher = await self.watchall()
276282
delete_markers = []
277283
async for update in watcher:
284+
if update is None:
285+
break
286+
278287
if update.operation == KV_DEL or update.operation == KV_PURGE:
279288
delete_markers.append(update)
280289

@@ -299,11 +308,11 @@ async def status(self) -> BucketStatus:
299308
return KeyValue.BucketStatus(stream_info=info, bucket=self._name)
300309

301310
class KeyWatcher:
311+
STOP_ITER = StopIterSentinel()
302312

303313
def __init__(self, js):
304314
self._js = js
305-
self._updates: asyncio.Queue[KeyValue.Entry
306-
| None] = asyncio.Queue(maxsize=256)
315+
self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256)
307316
self._sub = None
308317
self._pending: Optional[int] = None
309318

@@ -316,6 +325,7 @@ async def stop(self):
316325
stop will stop this watcher.
317326
"""
318327
await self._sub.unsubscribe()
328+
await self._updates.put(KeyValue.KeyWatcher.STOP_ITER)
319329

320330
async def updates(self, timeout=5.0):
321331
"""
@@ -330,10 +340,10 @@ def __aiter__(self):
330340
return self
331341

332342
async def __anext__(self):
333-
entry = await self._updates.get()
334-
if not entry:
335-
raise StopAsyncIteration
336-
else:
343+
while True:
344+
entry = await self._updates.get()
345+
if isinstance(entry, StopIterSentinel):
346+
raise StopAsyncIteration
337347
return entry
338348

339349
async def watchall(self, **kwargs) -> KeyWatcher:

nats/js/object_store.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
ObjectDeletedError,
3434
ObjectNotFoundError,
3535
)
36-
from nats.js.kv import MSG_ROLLUP_SUBJECT
36+
from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel
3737

3838
VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
3939
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
@@ -424,10 +424,11 @@ async def update_meta(
424424
await self._js.purge_stream(self._stream, subject=meta_subj)
425425

426426
class ObjectWatcher:
427+
STOP_ITER = StopIterSentinel()
427428

428429
def __init__(self, js):
429430
self._js = js
430-
self._updates = asyncio.Queue(maxsize=256)
431+
self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256)
431432
self._sub = None
432433
self._pending: Optional[int] = None
433434

@@ -454,10 +455,11 @@ def __aiter__(self):
454455
return self
455456

456457
async def __anext__(self):
457-
entry = await self._updates.get()
458-
if not entry:
459-
raise StopAsyncIteration
460-
else:
458+
while True:
459+
entry = await self._updates.get()
460+
461+
if isinstance(entry, StopIterSentinel):
462+
raise StopAsyncIteration
461463
return entry
462464

463465
async def watch(

0 commit comments

Comments
 (0)