Skip to content

Commit 75c12ba

Browse files
committed
nsqd: BenchmarkTopicMessagePump
1 parent 99e928b commit 75c12ba

File tree

1 file changed

+41
-0
lines changed

1 file changed

+41
-0
lines changed

nsqd/topic_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package nsqd
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"io/ioutil"
78
"net/http"
89
"os"
910
"runtime"
1011
"strconv"
12+
"sync"
1113
"testing"
1214
"time"
1315

@@ -273,3 +275,42 @@ func BenchmarkTopicToChannelPut(b *testing.B) {
273275
runtime.Gosched()
274276
}
275277
}
278+
279+
func BenchmarkTopicMessagePump(b *testing.B) {
280+
b.StopTimer()
281+
topicName := "bench_topic_put_throughput" + strconv.Itoa(b.N)
282+
opts := NewOptions()
283+
opts.Logger = test.NewTestLogger(b)
284+
opts.LogLevel = LOG_WARN
285+
opts.MemQueueSize = int64(b.N)
286+
_, _, nsqd := mustStartNSQD(opts)
287+
defer os.RemoveAll(opts.DataPath)
288+
defer nsqd.Exit()
289+
290+
topic := nsqd.GetOrCreateTopic(topicName)
291+
ch := topic.GetOrCreateChannel("ch")
292+
ctx, cancel := context.WithCancel(context.Background())
293+
294+
var wg sync.WaitGroup
295+
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
296+
wg.Add(1)
297+
go func() {
298+
defer wg.Done()
299+
for {
300+
select {
301+
case <-ch.memoryMsgChan:
302+
case <-ctx.Done():
303+
return
304+
}
305+
}
306+
}()
307+
}
308+
309+
b.StartTimer()
310+
for i := 0; i <= b.N; i++ {
311+
msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
312+
topic.PutMessage(msg)
313+
}
314+
cancel()
315+
wg.Wait()
316+
}

0 commit comments

Comments
 (0)