Subscribers might not be alive when messages are first published
When subscribers come online, they should receive the most recent message on this stream. This appears to work.
If a subscriber is slow, it should continue to receive only the most recent message. In the code example below, however, it still appears to receive every message the publisher has published.
Is this expected, even if I set MaxMsgsPerSubject to 1 and DeliverPolicy to DeliverLastPerSubject?
thanks
import asyncio

import nats
from nats.js.api import ConsumerConfig, DeliverPolicy, StreamConfig

stream_name = "TEST_STREAM_2"
topic = "TEST_TOPIC_1"


# Async publisher function: publishes an incrementing counter once per second
async def publish_loop(nc, js):
    i = 0
    while True:
        try:
            await js.publish(topic, str.encode(f'{i}'))
            # Log the value that was just published, then advance the counter
            print(f"Publishing: {i}")
            i = i + 1
            await asyncio.sleep(1)
        except Exception as e:
            print(f"Publish error: {e}")
            await asyncio.sleep(10)


# Async subscriber function: starts from the last message per subject
async def subscribe_loop(nc, js):
    # sub = await nc.subscribe("foo")
    consumer_config = ConsumerConfig()
    consumer_config.deliver_policy = DeliverPolicy.LAST_PER_SUBJECT
    sub = await js.subscribe(topic, config=consumer_config)
    # Let the publisher get ahead before the first read
    await asyncio.sleep(5)
    try:
        while True:
            msg = await sub.next_msg()
            print("Received:", msg)
            await asyncio.sleep(1)  # Keep the loop running
    except Exception as e:
        print(f"Subscribe error: {e}")
    finally:
        await sub.unsubscribe()


async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    # Retain at most one message per subject in the stream
    stream_config = StreamConfig()
    stream_config.max_msgs_per_subject = 1
    await js.add_stream(name=stream_name, subjects=[topic], storage='file', config=stream_config)
    try:
        await asyncio.gather(
            subscribe_loop(nc, js),
            publish_loop(nc, js)
        )
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
Expected behavior
The behaviour I am looking to replicate is sow_and_subscribe (AMPS supports this; I am looking to migrate away from AMPS). When a subscriber first comes online, it gets the most recent message. After that, if it is slow, it should always get the most recent message from the producer; if it is fast enough, it should receive all the messages from the producer.
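To make the expected semantics concrete, here is a minimal sketch of a conflating single-slot buffer in plain asyncio. It is only an illustration of the behaviour described above, not anything NATS or AMPS provides; the names `LatestValue` and `demo` are made up for this example.

```python
import asyncio


class LatestValue:
    """Hypothetical single-slot buffer: an unread value is overwritten by a newer one."""

    def __init__(self):
        self._value = None
        self._fresh = asyncio.Event()

    def put(self, value):
        # Overwrite whatever is in the slot and mark it as unread.
        self._value = value
        self._fresh.set()

    async def get(self):
        # Wait until there is an unread value, then hand out the newest one.
        await self._fresh.wait()
        self._fresh.clear()
        return self._value


async def demo():
    slot = LatestValue()
    seen = []
    # Slow reader: three updates arrive before it reads, only the newest survives.
    for i in range(3):
        slot.put(i)
    seen.append(await slot.get())
    # Fast reader: it reads after every update, so nothing is skipped.
    for i in range(3, 5):
        slot.put(i)
        seen.append(await slot.get())
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints [2, 3, 4]
```

The slow phase sees only `2` (values `0` and `1` were conflated away), while the fast phase sees both `3` and `4`.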
Server and client version
nats-py-2.10.0
server: 2.11.1-binary
Host environment
Mac (Docker)
Steps to reproduce
see code above pls
Update: I played around with this a little more. It appears that if a subscriber is connected, the broker ensures that it receives all messages. To be honest, this appears to be the correct behaviour. Is there some way to achieve my use case with NATS without making changes on the client side? I already have conflation on my client; the issue is that it can get quite busy.
LastPerSubject affects the initial state of the Consumer: which of the messages already in the stream will be delivered. After the initial delivery, messages for the filtered subjects will be delivered continuously. It is Consumer config and thus does not affect the contents of the stream.
MaxMsgsPerSubject defines how many messages per subject a Stream retains at a given time. The speed of the client should not affect which messages are received or dropped (unless it's so slow that it becomes a slow consumer, or a PullConsumer issues pull requests infrequently). What matters is whether a client is connected or not, since messages may be discarded from the Stream in the meantime.
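The distinction between the two settings can be sketched with a toy in-memory model. This is not how the NATS server is implemented, and `ToyStream` and its methods are hypothetical names used only for illustration: retention is a property of the stream, the deliver policy only picks the starting set, and a connected consumer then receives every new message.

```python
from collections import defaultdict, deque


class ToyStream:
    """Toy stand-in for a stream; an assumption-laden model, NOT the NATS implementation."""

    def __init__(self, max_msgs_per_subject):
        # Retention: keep at most N messages per subject, dropping the oldest.
        self.kept = defaultdict(lambda: deque(maxlen=max_msgs_per_subject))
        self._inboxes = []

    def publish(self, subject, data):
        self.kept[subject].append(data)
        # Every already-connected consumer gets every new message.
        for inbox in self._inboxes:
            inbox.append((subject, data))

    def subscribe_last_per_subject(self):
        # Initial state only: the newest retained message for each subject.
        inbox = [(s, msgs[-1]) for s, msgs in self.kept.items() if msgs]
        self._inboxes.append(inbox)
        return inbox


stream = ToyStream(max_msgs_per_subject=1)
for i in range(3):
    stream.publish("TEST_TOPIC_1", i)           # published while nobody is connected

inbox = stream.subscribe_last_per_subject()     # starts with the last message only
for i in range(3, 5):
    stream.publish("TEST_TOPIC_1", i)           # published while connected

print(inbox)                                    # [('TEST_TOPIC_1', 2), ('TEST_TOPIC_1', 3), ('TEST_TOPIC_1', 4)]
print(list(stream.kept["TEST_TOPIC_1"]))        # [4]
```

In this model the stream itself never holds more than one message per subject, yet the connected consumer still sees `3` and `4` in full, which matches the behaviour reported in the issue.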
If I understand your use case correctly, I think that the Consumer behaviour with provided config gives you exactly what you are asking for.
I would defer to @caspervonb to comment on the python snippet.
wallyqs changed the title from "MaxMsgsPerSubject not working as intended" to "MaxMsgsPerSubject not working as intended [v2.11.1]" on May 10, 2025
Observed behavior
Hi
I have a set of publishers and subscribers