Open
Labels
defect: Suspected defect such as a bug or regression
Description
Observed behavior
I run it in a Docker container. The first time, it creates the consumer normally, but after I recreate the container, it throws this error:
Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/nats/js/client.py", line 877, in recreate_consumer
    cinfo = await self._js._jsm.add_consumer(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/nats/js/manager.py", line 262, in add_consumer
    resp = await self._api_request(subject, req_data, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/nats/js/manager.py", line 421, in _api_request
    raise APIError.from_error(resp["error"])
          ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/nats/js/errors.py", line 85, in from_error
    raise ServerError(**err)
nats.js.errors.ServerError: nats: ServerError: code=500 err_code=10012 description='deliver policy can not be updated'
nats: encountered error
Here is the code:

import logging
from typing import Awaitable, Callable, Self

import nats
from nats.aio.client import Client
from nats.aio.msg import Msg
from nats.aio.subscription import Subscription
from nats.js import JetStreamContext
from nats.js.api import DeliverPolicy
from nats.js.errors import NotFoundError

logger = logging.getLogger(__name__)


class NatsSubscriber:
    def __init__(
        self,
        *,
        server: str = "localhost",
        stream: str = "sample-stream",
        subject: str = "foo",
        queue_name: str = "default-queue",
        durable: bool = True,
        ordered: bool = False,
    ):
        self.server = server
        self.stream = stream
        self.subject = subject
        # Used as the durable consumer name (JetStream) or the queue group (core NATS).
        self.queue_name = queue_name
        self.ordered = ordered
        self.durable = durable
        self.nc: Client | None = None
        self.js: JetStreamContext | None = None

    async def connect(self) -> Self:
        self.nc = await nats.connect(self.server)
        if self.durable:
            self.js = self.nc.jetstream()
            # Create the stream only if it does not exist yet.
            try:
                await self.js.stream_info(self.stream)
                logger.info(f"stream {self.stream} already exists")
            except NotFoundError:
                logger.info(f"creating stream {self.stream}")
                await self.js.add_stream(name=self.stream, subjects=[self.subject])
        return self

    async def subscribe(
        self, handler: Callable[[Msg], Awaitable[None]]
    ) -> Subscription:
        # Prefer a JetStream push subscription when a context is available.
        if self.js:
            return await self.js.subscribe(
                subject=self.subject,
                durable=self.queue_name,
                cb=handler,
                ordered_consumer=self.ordered,
                deliver_policy=DeliverPolicy.ALL,
            )
        # Otherwise fall back to a core NATS queue subscription.
        if self.nc:
            return await self.nc.subscribe(
                subject=self.subject,
                queue=self.queue_name,
                cb=handler,
            )
        raise RuntimeError("Subscriber not connected")

    async def close(self):
        if self.nc:
            await self.nc.drain()
Any answer is welcome, thanks!
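To help narrow this down, the sketch below compares the deliver policy the server has stored for the durable consumer with the one the code requests. It is not a confirmed diagnosis: the error text only suggests that the request is being treated as an update to an existing consumer with a different deliver policy. The helper name `deliver_policy_matches` is hypothetical. It assumes `js` is the `JetStreamContext` from `nc.jetstream()`, and it relies on `consumer_info(stream, consumer)` returning an object with a `config.deliver_policy` attribute.

```python
from typing import Any


async def deliver_policy_matches(
    js: Any, stream: str, durable: str, requested: Any
) -> bool:
    """Compare the stored deliver policy of a durable consumer with the requested one.

    `js` is assumed to be a JetStream context exposing `consumer_info`.
    """
    # Ask the server for the consumer as it currently exists.
    info = await js.consumer_info(stream, durable)
    stored = info.config.deliver_policy
    if stored != requested:
        print(f"stored deliver_policy={stored!r}, requested={requested!r}")
        return False
    return True
```

With the class above, this could be called after `connect()` as `await deliver_policy_matches(self.js, self.stream, self.queue_name, DeliverPolicy.ALL)`. If the consumer does not exist yet, `consumer_info` is expected to raise `NotFoundError`, which this sketch does not handle.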
Expected behavior
The consumer starts normally and consumes messages.
Server and client version
client: "nats-py>=2.10.0",
server: latest Docker image
Host environment
No response
Steps to reproduce
No response