Skip to content

Commit f8e3863

Browse files
support batch
1 parent 79edaca commit f8e3863

File tree

2 files changed

+110
-9
lines changed

2 files changed

+110
-9
lines changed

producer.go

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type Producer struct {
6767
// producers.
6868
type ProducerRequest struct {
6969
Topic string
70-
Message []byte
70+
Messages [][]byte
7171
Response chan<- error
7272
Deadline time.Time
7373
}
@@ -158,7 +158,17 @@ func (p *Producer) Stop() {
158158
// first unsuccessful attempt to publish the message. It is the responsibility
159159
// of the caller to retry if necessary.
160160
func (p *Producer) Publish(message []byte) (err error) {
161-
return p.PublishTo(p.topic, message)
161+
return p.MultiPublishTo(p.topic, [][]byte{message})
162+
}
163+
164+
// MultiPublish sends message batch using the producer p, returning an error if it was
165+
// already closed or if an error occurred while publishing the messages.
166+
//
167+
// Note that no retry is done internally, the producer will fail after the
168+
// first unsuccessful attempt to publish the message. It is the responsibility
169+
// of the caller to retry if necessary.
170+
func (p *Producer) MultiPublish(message [][]byte) (err error) {
171+
return p.MultiPublishTo(p.topic, message)
162172
}
163173

164174
// PublishTo sends a message to a specific topic using the producer p, returning
@@ -169,6 +179,17 @@ func (p *Producer) Publish(message []byte) (err error) {
169179
// first unsuccessful attempt to publish the message. It is the responsibility
170180
// of the caller to retry if necessary.
171181
func (p *Producer) PublishTo(topic string, message []byte) (err error) {
182+
return p.MultiPublishTo(p.topic, [][]byte{message})
183+
}
184+
185+
// MultiPublishTo sends a message batch to a specific topic using the producer p, returning
186+
// an error if it was already closed or if an error occurred while publishing the
187+
// message.
188+
//
189+
// Note that no retry is done internally, the producer will fail after the
190+
// first unsuccessful attempt to publish the message. It is the responsibility
191+
// of the caller to retry if necessary.
192+
func (p *Producer) MultiPublishTo(topic string, messages [][]byte) (err error) {
172193
defer func() {
173194
if recover() != nil {
174195
err = errors.New("publishing to a producer that was already stopped")
@@ -186,7 +207,7 @@ func (p *Producer) PublishTo(topic string, message []byte) (err error) {
186207
// it up.
187208
p.reqs <- ProducerRequest{
188209
Topic: topic,
189-
Message: message,
210+
Messages: messages,
190211
Response: response,
191212
Deadline: deadline,
192213
}
@@ -297,7 +318,7 @@ func (p *Producer) run() {
297318
continue
298319
}
299320

300-
if err := p.publish(conn, req.Topic, req.Message); err != nil {
321+
if err := p.publish(conn, req.Topic, req.Messages); err != nil {
301322
req.complete(err)
302323
shutdown(err)
303324
continue
@@ -365,11 +386,21 @@ func (p *Producer) write(conn *Conn, cmd Command) (err error) {
365386
return
366387
}
367388

368-
func (p *Producer) publish(conn *Conn, topic string, message []byte) error {
369-
return p.write(conn, Pub{
370-
Topic: topic,
371-
Message: message,
372-
})
389+
func (p *Producer) publish(conn *Conn, topic string, messages [][]byte) error {
390+
switch len(messages) {
391+
case 0:
392+
return nil
393+
case 1:
394+
return p.write(conn, Pub{
395+
Topic: topic,
396+
Message: messages[0],
397+
})
398+
default:
399+
return p.write(conn, MPub{
400+
Topic: topic,
401+
Messages: messages,
402+
})
403+
}
373404
}
374405

375406
func (p *Producer) ping(conn *Conn) error {

producer_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,73 @@ func TestProducer(t *testing.T) {
6666
})
6767
}
6868
}
69+
70+
func TestProducerBatch(t *testing.T) {
71+
batchSize := 10
72+
for _, n := range []int{1, 10, 100, 1000} {
73+
count := n
74+
topic := fmt.Sprintf("test-publisher-batch-%d", n)
75+
t.Run(topic, func(t *testing.T) {
76+
t.Parallel()
77+
78+
c, _ := StartConsumer(ConsumerConfig{
79+
Topic: topic,
80+
Channel: "channel",
81+
Address: "localhost:4150",
82+
ReadTimeout: 1 * time.Minute,
83+
})
84+
defer c.Stop()
85+
86+
// Give some time for the consumer to connect.
87+
time.Sleep(100 * time.Millisecond)
88+
89+
p, _ := StartProducer(ProducerConfig{
90+
Address: "localhost:4150",
91+
Topic: topic,
92+
MaxConcurrency: 3,
93+
})
94+
defer p.Stop()
95+
n := 0
96+
97+
for i := 0; i != count; i++ {
98+
batch := make([][]byte, batchSize)
99+
for j := 0; j != batchSize; j++ {
100+
batch[j] = []byte(strconv.Itoa(n))
101+
n++
102+
}
103+
104+
if err := p.MultiPublish(batch); err != nil {
105+
t.Error(err)
106+
return
107+
}
108+
}
109+
110+
buckets := make([]int, count*batchSize)
111+
112+
deadline := time.NewTimer(10 * time.Second)
113+
defer deadline.Stop()
114+
115+
for i := 0; i != count*batchSize; {
116+
select {
117+
case msg := <-c.Messages():
118+
b, err := strconv.Atoi(string(msg.Body))
119+
if err != nil {
120+
t.Error(err)
121+
}
122+
buckets[b]++
123+
i++
124+
msg.Finish()
125+
case <-deadline.C:
126+
t.Error("timeout")
127+
return
128+
}
129+
}
130+
131+
for i, b := range buckets {
132+
if b != 1 {
133+
t.Errorf("bucket at index %d has value %d", i, b)
134+
}
135+
}
136+
})
137+
}
138+
}

0 commit comments

Comments
 (0)