Skip to content

Commit 1ada326

Browse files
committed
allow for watching future values with watcher
Include StopIterSentinel oops
1 parent 7e7883e commit 1ada326

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

nats/js/kv.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@
3535
logger = logging.getLogger(__name__)
3636

3737

38+
class StopIterSentinel:
39+
"""A sentinel class used to indicate that iteration should stop."""
40+
41+
pass
42+
43+
3844
class KeyValue:
3945
"""
4046
KeyValue uses the JetStream KeyValue functionality.
@@ -278,6 +284,9 @@ async def purge_deletes(self, olderthan: int = 30 * 60) -> bool:
278284
watcher = await self.watchall()
279285
delete_markers = []
280286
async for update in watcher:
287+
if update is None:
288+
break
289+
281290
if update.operation == KV_DEL or update.operation == KV_PURGE:
282291
delete_markers.append(update)
283292

@@ -300,11 +309,11 @@ async def status(self) -> BucketStatus:
300309
return KeyValue.BucketStatus(stream_info=info, bucket=self._name)
301310

302311
class KeyWatcher:
312+
STOP_ITER = StopIterSentinel()
303313

304314
def __init__(self, js):
305315
self._js = js
306-
self._updates: asyncio.Queue[KeyValue.Entry
307-
| None] = asyncio.Queue(maxsize=256)
316+
self._updates: asyncio.Queue[KeyValue.Entry | None | StopIterSentinel] = asyncio.Queue(maxsize=256)
308317
self._sub = None
309318
self._pending: Optional[int] = None
310319

@@ -317,6 +326,7 @@ async def stop(self):
317326
stop will stop this watcher.
318327
"""
319328
await self._sub.unsubscribe()
329+
await self._updates.put(KeyValue.KeyWatcher.STOP_ITER)
320330

321331
async def updates(self, timeout=5.0):
322332
"""
@@ -331,10 +341,10 @@ def __aiter__(self):
331341
return self
332342

333343
async def __anext__(self):
334-
entry = await self._updates.get()
335-
if not entry:
336-
raise StopAsyncIteration
337-
else:
344+
while True:
345+
entry = await self._updates.get()
346+
if isinstance(entry, StopIterSentinel):
347+
raise StopAsyncIteration
338348
return entry
339349

340350
async def watchall(self, **kwargs) -> KeyWatcher:

nats/js/object_store.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
ObjectDeletedError,
3434
ObjectNotFoundError,
3535
)
36-
from nats.js.kv import MSG_ROLLUP_SUBJECT
36+
from nats.js.kv import MSG_ROLLUP_SUBJECT, StopIterSentinel
3737

3838
VALID_BUCKET_RE = re.compile(r"^[a-zA-Z0-9_-]+$")
3939
VALID_KEY_RE = re.compile(r"^[-/_=\.a-zA-Z0-9]+$")
@@ -427,10 +427,11 @@ async def update_meta(
427427
await self._js.purge_stream(self._stream, subject=meta_subj)
428428

429429
class ObjectWatcher:
430+
STOP_ITER = StopIterSentinel()
430431

431432
def __init__(self, js):
432433
self._js = js
433-
self._updates = asyncio.Queue(maxsize=256)
434+
self._updates: asyncio.Queue[Union[api.ObjectInfo, None, StopIterSentinel]] = asyncio.Queue(maxsize=256)
434435
self._sub = None
435436
self._pending: Optional[int] = None
436437

@@ -457,10 +458,11 @@ def __aiter__(self):
457458
return self
458459

459460
async def __anext__(self):
460-
entry = await self._updates.get()
461-
if not entry:
462-
raise StopAsyncIteration
463-
else:
461+
while True:
462+
entry = await self._updates.get()
463+
464+
if isinstance(entry, StopIterSentinel):
465+
raise StopAsyncIteration
464466
return entry
465467

466468
async def watch(

0 commit comments

Comments
 (0)