
MaxMsgsPerSubject not working as intended [v2.11.1] #6795


Open
Roh-codeur opened this issue Apr 13, 2025 · 2 comments
Labels
defect Suspected defect such as a bug or regression

Comments


Roh-codeur commented Apr 13, 2025

Observed behavior

Hi

I have a set of publishers and subscribers

  1. Publishers would publish to a set of topics
  2. Subscribers would subscribe to a set of topics
  3. Subscribers might not be alive when messages are first published
  4. When subscribers come online, they should receive the most recent message on the stream --> this appears to work
  5. If a subscriber is slow, it should keep receiving only the most recent message; in the code example below, however, it still receives every message the publisher has published.

Is this expected, even with MaxMsgsPerSubject set to 1 and DeliverPolicy set to DeliverLastPerSubject?

thanks

```python
import asyncio

import nats
from nats.js.api import ConsumerConfig, DeliverPolicy, StorageType, StreamConfig

stream_name = "TEST_STREAM_2"
topic = "TEST_TOPIC_1"


# Async publisher: publishes a counter value once per second.
async def publish_loop(nc, js):
    i = 0
    while True:
        try:
            await js.publish(topic, f"{i}".encode())
            print(f"Publishing: {i}")
            i += 1
            await asyncio.sleep(1)
        except Exception as e:
            print(f"Publish error: {e}")
            await asyncio.sleep(10)


# Async subscriber: starts from the last message per subject,
# then receives every subsequent message while connected.
async def subscribe_loop(nc, js):
    consumer_config = ConsumerConfig(deliver_policy=DeliverPolicy.LAST_PER_SUBJECT)
    sub = await js.subscribe(topic, config=consumer_config)
    await asyncio.sleep(5)
    try:
        while True:
            msg = await sub.next_msg()
            print("Received:", msg)
            await asyncio.sleep(1)  # simulate a slow consumer
    except Exception as e:
        print(f"Subscribe error: {e}")
    finally:
        await sub.unsubscribe()


async def main():
    nc = await nats.connect("nats://localhost:4222")
    js = nc.jetstream()
    stream_config = StreamConfig(
        name=stream_name,
        subjects=[topic],
        storage=StorageType.FILE,
        max_msgs_per_subject=1,
    )
    await js.add_stream(config=stream_config)
    try:
        await asyncio.gather(
            subscribe_loop(nc, js),
            publish_loop(nc, js),
        )
    finally:
        await nc.close()


if __name__ == "__main__":
    asyncio.run(main())
```

Expected behavior

The behaviour I am looking to replicate is sow_and_subscribe (AMPS supports this); I am looking to migrate away from AMPS. When a subscriber first comes online, it gets the most recent message. After that, a slow subscriber should always get only the most recent message from the producer, while a subscriber that is fast enough should receive every message from the producer.

Server and client version

nats-py-2.10.0
server: 2.11.1-binary

Host environment

Mac (Docker)

Steps to reproduce

See the code above, please.

Update: I played around with this a little more. It appears that if a subscriber is connected, the broker ensures that it receives all messages. To be honest, this appears to be the correct behaviour. Is there some way to achieve my use case with NATS, without making changes on the client side? I already have conflation in my client; the issue is that it can get quite busy.
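For reference, here is a minimal, NATS-independent sketch of the client-side conflation described above: keep only the newest unread payload per subject, so a slow consumer sees at most one (the latest) message per subject. The `Conflator` class and its method names are hypothetical, for illustration only.

```python
import asyncio


class Conflator:
    """Keep only the newest unread message per subject."""

    def __init__(self):
        self._latest = {}  # subject -> newest unread payload
        self._event = asyncio.Event()

    def publish(self, subject, payload):
        # Overwrite any unread message for this subject (conflation).
        self._latest[subject] = payload
        self._event.set()

    async def next_batch(self):
        # Wait until at least one subject has a pending message,
        # then hand over everything accumulated so far.
        await self._event.wait()
        self._event.clear()
        batch, self._latest = self._latest, {}
        return batch


async def demo():
    c = Conflator()
    # A fast producer publishes 0..4 before the slow consumer reads.
    for i in range(5):
        c.publish("TEST_TOPIC_1", str(i).encode())
    return await c.next_batch()


print(asyncio.run(demo()))  # {'TEST_TOPIC_1': b'4'}
```

A real integration would call `Conflator.publish` from the NATS message callback and drain `next_batch` at whatever pace the consumer can sustain.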

@Roh-codeur Roh-codeur added the defect Suspected defect such as a bug or regression label Apr 13, 2025

Jarema commented Apr 14, 2025

Regarding your config:

  • LastPerSubject affects the initial state of the Consumer: which of the messages already in the stream will be delivered. After that initial delivery, messages for the filtered subjects will be continuously delivered. It is Consumer config and thus does not affect the contents of the stream.
  • MaxMsgsPerSubject defines how many messages per subject a Stream will retain at a given time. The speed of the client should not affect which messages are received or dropped (unless it is so slow that it causes a slow consumer, or makes infrequent pull requests on a pull consumer); what matters is whether a client is connected at all, since in the meantime messages may be discarded from the Stream.

If I understand your use case correctly, I think that the Consumer behaviour with provided config gives you exactly what you are asking for.
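The retention-versus-delivery distinction above can be illustrated with a toy model (not NATS API; `run_model` is a hypothetical sketch): MaxMsgsPerSubject=1 bounds what the stream retains, while a connected push consumer still receives every message as it arrives.

```python
# Toy model: a stream with MaxMsgsPerSubject=1 keeps only the last
# message per subject, but a connected consumer receives each message
# at publish time, before any discard happens.
def run_model(messages, subscriber_connected_from):
    stream = {}      # subject -> last retained payload (MaxMsgsPerSubject=1)
    delivered = []   # what a connected push consumer actually receives
    for i, (subject, payload) in enumerate(messages):
        stream[subject] = payload          # retention: older msg discarded
        if i >= subscriber_connected_from:
            delivered.append(payload)      # live delivery: every message
    return stream, delivered


msgs = [("TEST_TOPIC_1", f"{i}".encode()) for i in range(5)]

# Subscriber connected the whole time: it receives all five messages,
# even though the stream ends up retaining only b"4".
stream, delivered = run_model(msgs, subscriber_connected_from=0)

# A subscriber that connects only after the run would find just the
# single retained message per subject.
late_stream, late_delivered = run_model(msgs, subscriber_connected_from=5)
```

This matches the behaviour the reporter observed: the connected subscriber sees everything, and only a late joiner is limited to one message per subject.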

I would defer to @caspervonb to comment on the python snippet.

@wallyqs wallyqs changed the title MaxMsgsPerSubject not working as intended MaxMsgsPerSubject not working as intended [v2.11.1] May 10, 2025
@Jgfrausing

Might this be related to #6921? It is not the exact same setup, but it seems somewhat aligned.
