Skip to content

deliver policy can not be updated after the container reconstructed #778

@newdee

Description

@newdee

Observed behavior

I run the it in docker container, the first time it create consumer normally, but after reconstructed the container, it throws error:

Traceback (most recent call last):
  File "/app/.venv/lib/python3.13/site-packages/nats/js/client.py", line 877, in recreate_consumer
    cinfo = await self._js._jsm.add_consumer(
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
  File "/app/.venv/lib/python3.13/site-packages/nats/js/manager.py", line 262, in add_consumer
    resp = await self._api_request(subject, req_data, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/nats/js/manager.py", line 421, in _api_request
    raise APIError.from_error(resp["error"])
          ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.13/site-packages/nats/js/errors.py", line 85, in from_error
    raise ServerError(**err)
nats.js.errors.ServerError: nats: ServerError: code=500 err_code=10012 description='deliver policy can not be updated'
nats: encountered error

here is the code:

class NatsSubscriber:
    def __init__(
        self,
        *,
        server: str = "localhost",
        stream: str = "sample-stream",
        subject: str = "foo",
        queue_name: str = "default-queue",
        durable: bool = True,
        ordered: bool = False,
    ):
        self.server = server
        self.stream = stream
        self.subject = subject
        self.queue_name = queue_name
        self.ordered = ordered
        self.durable = durable
        self.nc: Client | None = None
        self.js: JetStreamContext | None = None

    async def connect(self) -> Self:
        self.nc = await nats.connect(self.server)

        if self.durable:
            self.js = self.nc.jetstream()
            try:
                await self.js.stream_info(self.stream)
                logger.info(f"stream {self.stream} already exist")
            except NotFoundError:
                logger.info(f"creating stream {self.stream}")
                await self.js.add_stream(name=self.stream, subjects=[self.subject])
        return self

    async def subscribe(
        self, handler: Callable[[Msg], Awaitable[None]]
    ) -> Subscription:
        if self.js:
            return await self.js.subscribe(
                subject=self.subject,
                durable=self.queue_name,
                cb=handler,
                ordered_consumer=self.ordered,
                deliver_policy=DeliverPolicy.ALL,
            )

        if self.nc:
            return await self.nc.subscribe(
                subject=self.subject,
                queue=self.queue_name,
                cb=handler,
            )

        raise RuntimeError("Subscriber not connected")

    async def close(self):
        if self.nc:
            await self.nc.drain()

any answer will be welcome, thanks!

Expected behavior

start consumer normally and consuming message

Server and client version

client: "nats-py>=2.10.0",
server: docker latest images

Host environment

No response

Steps to reproduce

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    defectSuspected defect such as a bug or regression

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions