When the ackwait is exceeded, the stream still displays messages, but consumers cannot retrieve them again #6905

Open
birdycn opened this issue May 19, 2025 · 3 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@birdycn
birdycn commented May 19, 2025

Observed behavior

Image

Expected behavior

How can I get the messages again?

Server and client version

nats-server: v2.11.4-RC.1

Host environment

No response

Steps to reproduce

No response

@birdycn birdycn added the defect Suspected defect such as a bug or regression label May 19, 2025
@birdycn
Author

birdycn commented May 19, 2025

    "account_details": [
        {
            "name": "$G",
            "id": "$G",
            "memory": 0,
            "storage": 657,
            "reserved_memory": 18446744073709552000,
            "reserved_storage": 18446744073709552000,
            "accounts": 0,
            "ha_assets": 0,
            "api": {
                "level": 0,
                "total": 58,
                "errors": 4
            },
            "stream_detail": [
                {
                    "name": "testQueueS",
                    "created": "2025-05-19T12:28:50.7387314Z",
                    "cluster": {
                        "leader": "NB36THQKG4GHVCTCNQ4QRC22ZG72OCGXGYH3CILQWTJE6U2FOHAP4RQJ"
                    },
                    "state": {
                        "messages": 13,
                        "bytes": 657,
                        "first_seq": 23,
                        "first_ts": "2025-05-19T12:29:24.6426466Z",
                        "last_seq": 82,
                        "last_ts": "2025-05-19T12:40:46.9248638Z",
                        "num_subjects": 1,
                        "num_deleted": 47,
                        "consumer_count": 1
                    }
                }
            ]
        }
    ],
    "total": 1

@neilalexander
Member

You will likely want to configure redeliveries if you are using AckWait. What is your consumer config?

@birdycn
Author

birdycn commented May 21, 2025

> You will likely want to configure redeliveries if you are using AckWait. What is your consumer config?

func DefaultJetStreamConfig() *streamConfig {
	return &streamConfig{
		Retention: jetstream.WorkQueuePolicy,
		Discard:   jetstream.DiscardOld,
		Storage:   jetstream.FileStorage,
		MaxAge:    48 * time.Hour,
		MaxMsgs:   10240,
	}
}
func DefaultJetConsumerConfig() *consumerConfig {
	return &consumerConfig{
		AckPolicy:  jetstream.AckExplicitPolicy,
		MaxDeliver: 5,
		AckWait:    2 * time.Minute,
	}
}

func TestQueue(t *testing.T) {
	js, err := NewJetStreamClient("127.0.0.1:4222")
	assert.Equal(t, nil, err)

	const stream = "testQueueS"
	const consumer = "testQueueC"
	const topic = "test.queue"

	cfg := DefaultJetStreamConfig()
	cfg.Retention = jetstream.WorkQueuePolicy
	s, err := js.CreateStream(context.Background(), stream, []string{topic}, cfg)
	assert.Equal(t, nil, err)

	consumerCfg := DefaultJetConsumerConfig()
	consumerCfg.AckWait = 1 * time.Second

	c, err := s.CreateConsumer(context.Background(), consumer, consumerCfg)
	assert.Equal(t, nil, err)

	c.Subscribe(
		context.Background(),
		func(msg Message) error {
			// Sleep longer than the 1s AckWait so the ack below arrives late.
			time.Sleep(2 * time.Second)
			msg.Ack()
			t.Logf("stream:%s,consumer:%s,topic:%s,msg:%s",
				stream, consumer, topic, string(msg.Data()))
			return nil
		},
		WithHandleError(func(err error) {
			t.Log(err)
		}))

	var i int
	for {
		time.Sleep(500 * time.Millisecond)

		if err := c.Publish(
			context.Background(), topic,
			[]byte(fmt.Sprintf("sequence:%d", i))); err != nil {
			t.Log(err)
			if errors.Is(err, nats.ErrConnectionClosed) {
				break
			}
		}
		i++
	}
}
