Skip to content

Commit bd387a5

Browse files
Merge pull request #57 from segmentio/support-batch
Support publishing batch messages
2 parents 79edaca + 241959e commit bd387a5

File tree

7 files changed

+117
-18
lines changed

7 files changed

+117
-18
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ jobs:
88
# github.com/actions/setup-go/tags
99
uses: actions/setup-go@v5
1010
with:
11-
go-version: 1.22.x
11+
go-version: 1.24.x
1212
# github.com/actions/checkout/tags
1313
- uses: actions/checkout@v4
1414
with:
1515
path: './src/github.com/segmentio/nsq-go'
1616
- run: echo "PATH=$GITHUB_WORKSPACE/bin:$PATH" >> $GITHUB_ENV
1717
- name: Run tests
1818
run: |
19-
docker-compose up -d
19+
docker compose up -d
2020
go vet ./...
2121
go run honnef.co/go/tools/cmd/staticcheck@latest ./...
2222
go test -race -v ./...

docker-compose.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
version: '2'
2-
31
# Use this docker-compose file to setup the test environment before running the
42
# tests.
53
services:

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/segmentio/nsq-go
22

3-
go 1.18
3+
go 1.24
44

55
require (
66
github.com/pkg/errors v0.9.1

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
2+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
23
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
4+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
35
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
46
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
57
github.com/segmentio/asm v1.1.3 h1:WM03sfUOENvvKexOLp+pCqgb/WDjsi7EK8gIsICtzhc=
@@ -21,7 +23,9 @@ golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
2123
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
2224
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2325
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
26+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
2427
gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM=
28+
gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
2529
gopkg.in/go-playground/mold.v2 v2.2.0 h1:Y4IYB4/HYQfuq43zaKh6vs9cVelLE9qbqe2fkyfCTWQ=
2630
gopkg.in/go-playground/mold.v2 v2.2.0/go.mod h1:XMyyRsGtakkDPbxXbrA5VODo6bUXyvoDjLd5l3T0XoA=
2731
gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY=

nsqlookup/engine_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@ const (
1515
tombTimeout = 50 * time.Millisecond
1616
)
1717

18-
func init() {
19-
rand.Seed(time.Now().UnixNano())
20-
}
21-
2218
func testEngine(t *testing.T, do func(context.Context, *testing.T, Engine)) {
2319
tests := []struct {
2420
Type string

producer.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type Producer struct {
6767
// producers.
6868
type ProducerRequest struct {
6969
Topic string
70-
Message []byte
70+
Messages [][]byte
7171
Response chan<- error
7272
Deadline time.Time
7373
}
@@ -158,7 +158,17 @@ func (p *Producer) Stop() {
158158
// first unsuccessful attempt to publish the message. It is the responsibility
159159
// of the caller to retry if necessary.
160160
func (p *Producer) Publish(message []byte) (err error) {
161-
return p.PublishTo(p.topic, message)
161+
return p.MultiPublishTo(p.topic, [][]byte{message})
162+
}
163+
164+
// MultiPublish sends message batch using the producer p, returning an error if it was
165+
// already closed or if an error occurred while publishing the messages.
166+
//
167+
// Note that no retry is done internally, the producer will fail after the
168+
// first unsuccessful attempt to publish the message. It is the responsibility
169+
// of the caller to retry if necessary.
170+
func (p *Producer) MultiPublish(message [][]byte) (err error) {
171+
return p.MultiPublishTo(p.topic, message)
162172
}
163173

164174
// PublishTo sends a message to a specific topic using the producer p, returning
@@ -169,6 +179,17 @@ func (p *Producer) Publish(message []byte) (err error) {
169179
// first unsuccessful attempt to publish the message. It is the responsibility
170180
// of the caller to retry if necessary.
171181
func (p *Producer) PublishTo(topic string, message []byte) (err error) {
182+
return p.MultiPublishTo(p.topic, [][]byte{message})
183+
}
184+
185+
// MultiPublishTo sends a message batch to a specific topic using the producer p, returning
186+
// an error if it was already closed or if an error occurred while publishing the
187+
// message.
188+
//
189+
// Note that no retry is done internally, the producer will fail after the
190+
// first unsuccessful attempt to publish the message. It is the responsibility
191+
// of the caller to retry if necessary.
192+
func (p *Producer) MultiPublishTo(topic string, messages [][]byte) (err error) {
172193
defer func() {
173194
if recover() != nil {
174195
err = errors.New("publishing to a producer that was already stopped")
@@ -186,7 +207,7 @@ func (p *Producer) PublishTo(topic string, message []byte) (err error) {
186207
// it up.
187208
p.reqs <- ProducerRequest{
188209
Topic: topic,
189-
Message: message,
210+
Messages: messages,
190211
Response: response,
191212
Deadline: deadline,
192213
}
@@ -297,7 +318,7 @@ func (p *Producer) run() {
297318
continue
298319
}
299320

300-
if err := p.publish(conn, req.Topic, req.Message); err != nil {
321+
if err := p.publish(conn, req.Topic, req.Messages); err != nil {
301322
req.complete(err)
302323
shutdown(err)
303324
continue
@@ -365,11 +386,21 @@ func (p *Producer) write(conn *Conn, cmd Command) (err error) {
365386
return
366387
}
367388

368-
func (p *Producer) publish(conn *Conn, topic string, message []byte) error {
369-
return p.write(conn, Pub{
370-
Topic: topic,
371-
Message: message,
372-
})
389+
func (p *Producer) publish(conn *Conn, topic string, messages [][]byte) error {
390+
switch len(messages) {
391+
case 0:
392+
return nil
393+
case 1:
394+
return p.write(conn, Pub{
395+
Topic: topic,
396+
Message: messages[0],
397+
})
398+
default:
399+
return p.write(conn, MPub{
400+
Topic: topic,
401+
Messages: messages,
402+
})
403+
}
373404
}
374405

375406
func (p *Producer) ping(conn *Conn) error {

producer_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,73 @@ func TestProducer(t *testing.T) {
6666
})
6767
}
6868
}
69+
70+
func TestProducerBatch(t *testing.T) {
71+
batchSize := 10
72+
for _, n := range []int{1, 10, 100, 1000} {
73+
count := n
74+
topic := fmt.Sprintf("test-publisher-batch-%d", n)
75+
t.Run(topic, func(t *testing.T) {
76+
t.Parallel()
77+
78+
c, _ := StartConsumer(ConsumerConfig{
79+
Topic: topic,
80+
Channel: "channel",
81+
Address: "localhost:4150",
82+
ReadTimeout: 1 * time.Minute,
83+
})
84+
defer c.Stop()
85+
86+
// Give some time for the consumer to connect.
87+
time.Sleep(100 * time.Millisecond)
88+
89+
p, _ := StartProducer(ProducerConfig{
90+
Address: "localhost:4150",
91+
Topic: topic,
92+
MaxConcurrency: 3,
93+
})
94+
defer p.Stop()
95+
n := 0
96+
97+
for i := 0; i != count; i++ {
98+
batch := make([][]byte, batchSize)
99+
for j := 0; j != batchSize; j++ {
100+
batch[j] = []byte(strconv.Itoa(n))
101+
n++
102+
}
103+
104+
if err := p.MultiPublish(batch); err != nil {
105+
t.Error(err)
106+
return
107+
}
108+
}
109+
110+
buckets := make([]int, count*batchSize)
111+
112+
deadline := time.NewTimer(10 * time.Second)
113+
defer deadline.Stop()
114+
115+
for i := 0; i != count*batchSize; {
116+
select {
117+
case msg := <-c.Messages():
118+
b, err := strconv.Atoi(string(msg.Body))
119+
if err != nil {
120+
t.Error(err)
121+
}
122+
buckets[b]++
123+
i++
124+
msg.Finish()
125+
case <-deadline.C:
126+
t.Error("timeout")
127+
return
128+
}
129+
}
130+
131+
for i, b := range buckets {
132+
if b != 1 {
133+
t.Errorf("bucket at index %d has value %d", i, b)
134+
}
135+
}
136+
})
137+
}
138+
}

0 commit comments

Comments
 (0)