Skip to content

Commit 9cc5d8b

Browse files
committed
GetOrCreateTopic, GetOrCreateChannel: return *Obj, error
1 parent 2cb1881 commit 9cc5d8b

11 files changed

+127
-120
lines changed

nsqadmin/http_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ func TestHTTPChannelGET(t *testing.T) {
253253
defer nsqadmin1.Exit()
254254

255255
topicName := "test_channel_get" + strconv.Itoa(int(time.Now().Unix()))
256-
topic := nsqds[0].GetOrCreateTopic(topicName)
256+
topic, err := nsqds[0].GetOrCreateTopic(topicName)
257+
test.Nil(t, err)
257258
topic.GetOrCreateChannel("ch")
258259
time.Sleep(100 * time.Millisecond)
259260

@@ -292,7 +293,8 @@ func TestHTTPNodesSingleGET(t *testing.T) {
292293
defer nsqadmin1.Exit()
293294

294295
topicName := "test_nodes_single_get" + strconv.Itoa(int(time.Now().Unix()))
295-
topic := nsqds[0].GetOrCreateTopic(topicName)
296+
topic, err := nsqds[0].GetOrCreateTopic(topicName)
297+
test.Nil(t, err)
296298
topic.GetOrCreateChannel("ch")
297299
time.Sleep(100 * time.Millisecond)
298300

@@ -419,7 +421,8 @@ func TestHTTPDeleteChannelPOST(t *testing.T) {
419421
defer nsqadmin1.Exit()
420422

421423
topicName := "test_delete_channel_post" + strconv.Itoa(int(time.Now().Unix()))
422-
topic := nsqds[0].GetOrCreateTopic(topicName)
424+
topic, err := nsqds[0].GetOrCreateTopic(topicName)
425+
test.Nil(t, err)
423426
topic.GetOrCreateChannel("ch")
424427
time.Sleep(100 * time.Millisecond)
425428

@@ -474,7 +477,8 @@ func TestHTTPPauseChannelPOST(t *testing.T) {
474477
defer nsqadmin1.Exit()
475478

476479
topicName := "test_pause_channel_post" + strconv.Itoa(int(time.Now().Unix()))
477-
topic := nsqds[0].GetOrCreateTopic(topicName)
480+
topic, err := nsqds[0].GetOrCreateTopic(topicName)
481+
test.Nil(t, err)
478482
topic.GetOrCreateChannel("ch")
479483
time.Sleep(100 * time.Millisecond)
480484

@@ -509,7 +513,8 @@ func TestHTTPEmptyTopicPOST(t *testing.T) {
509513
defer nsqadmin1.Exit()
510514

511515
topicName := "test_empty_topic_post" + strconv.Itoa(int(time.Now().Unix()))
512-
topic := nsqds[0].GetOrCreateTopic(topicName)
516+
topic, err := nsqds[0].GetOrCreateTopic(topicName)
517+
test.Nil(t, err)
513518
topic.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))
514519
test.Equal(t, int64(1), topic.Depth())
515520
time.Sleep(100 * time.Millisecond)
@@ -537,7 +542,8 @@ func TestHTTPEmptyChannelPOST(t *testing.T) {
537542
defer nsqadmin1.Exit()
538543

539544
topicName := "test_empty_channel_post" + strconv.Itoa(int(time.Now().Unix()))
540-
topic := nsqds[0].GetOrCreateTopic(topicName)
545+
topic, err := nsqds[0].GetOrCreateTopic(topicName)
546+
test.Nil(t, err)
541547
channel := topic.GetOrCreateChannel("ch")
542548
channel.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))
543549

nsqd/channel_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ func TestPutMessage(t *testing.T) {
2121
defer nsqd.Exit()
2222

2323
topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
24-
topic := nsqd.GetOrCreateTopic(topicName)
25-
channel1 := topic.GetOrCreateChannel("ch")
24+
topic, _ := nsqd.GetOrCreateTopic(topicName)
25+
channel1, _ := topic.GetOrCreateChannel("ch")
2626

2727
var id MessageID
2828
msg := NewMessage(id, []byte("test"))
@@ -42,9 +42,9 @@ func TestPutMessage2Chan(t *testing.T) {
4242
defer nsqd.Exit()
4343

4444
topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix()))
45-
topic := nsqd.GetOrCreateTopic(topicName)
46-
channel1 := topic.GetOrCreateChannel("ch1")
47-
channel2 := topic.GetOrCreateChannel("ch2")
45+
topic, _ := nsqd.GetOrCreateTopic(topicName)
46+
channel1, _ := topic.GetOrCreateChannel("ch1")
47+
channel2, _ := topic.GetOrCreateChannel("ch2")
4848

4949
var id MessageID
5050
msg := NewMessage(id, []byte("test"))
@@ -71,8 +71,8 @@ func TestInFlightWorker(t *testing.T) {
7171
defer nsqd.Exit()
7272

7373
topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
74-
topic := nsqd.GetOrCreateTopic(topicName)
75-
channel := topic.GetOrCreateChannel("channel")
74+
topic, _ := nsqd.GetOrCreateTopic(topicName)
75+
channel, _ := topic.GetOrCreateChannel("channel")
7676

7777
for i := 0; i < count; i++ {
7878
msg := NewMessage(topic.GenerateID(), []byte("test"))
@@ -112,8 +112,8 @@ func TestChannelEmpty(t *testing.T) {
112112
defer nsqd.Exit()
113113

114114
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
115-
topic := nsqd.GetOrCreateTopic(topicName)
116-
channel := topic.GetOrCreateChannel("channel")
115+
topic, _ := nsqd.GetOrCreateTopic(topicName)
116+
channel, _ := topic.GetOrCreateChannel("channel")
117117

118118
msgs := make([]*Message, 0, 25)
119119
for i := 0; i < 25; i++ {
@@ -148,8 +148,8 @@ func TestChannelEmptyConsumer(t *testing.T) {
148148
defer conn.Close()
149149

150150
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
151-
topic := nsqd.GetOrCreateTopic(topicName)
152-
channel := topic.GetOrCreateChannel("channel")
151+
topic, _ := nsqd.GetOrCreateTopic(topicName)
152+
channel, _ := topic.GetOrCreateChannel("channel")
153153
client := newClientV2(0, conn, nsqd)
154154
client.SetReadyCount(25)
155155
err := channel.AddClient(client.ID, client)
@@ -186,8 +186,8 @@ func TestMaxChannelConsumers(t *testing.T) {
186186
defer conn.Close()
187187

188188
topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().Unix()))
189-
topic := nsqd.GetOrCreateTopic(topicName)
190-
channel := topic.GetOrCreateChannel("channel")
189+
topic, _ := nsqd.GetOrCreateTopic(topicName)
190+
channel, _ := topic.GetOrCreateChannel("channel")
191191

192192
client1 := newClientV2(1, conn, nsqd)
193193
client1.SetReadyCount(25)
@@ -209,9 +209,9 @@ func TestChannelHealth(t *testing.T) {
209209
defer os.RemoveAll(opts.DataPath)
210210
defer nsqd.Exit()
211211

212-
topic := nsqd.GetOrCreateTopic("test")
212+
topic, _ := nsqd.GetOrCreateTopic("test")
213213

214-
channel := topic.GetOrCreateChannel("channel")
214+
channel, _ := topic.GetOrCreateChannel("channel")
215215

216216
channel.backend = &errorBackendQueue{}
217217

@@ -258,8 +258,8 @@ func TestChannelDraining(t *testing.T) {
258258
defer nsqd.Exit()
259259

260260
topicName := "test_drain_channel" + strconv.Itoa(int(time.Now().Unix()))
261-
topic := nsqd.GetOrCreateTopic(topicName)
262-
channel1 := topic.GetOrCreateChannel("ch")
261+
topic, _ := nsqd.GetOrCreateTopic(topicName)
262+
channel1, _ := topic.GetOrCreateChannel("ch")
263263

264264
msg := NewMessage(topic.GenerateID(), []byte("test"))
265265
topic.PutMessage(msg)

nsqd/http.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, e
201201
if !protocol.IsValidTopicName(topicName) {
202202
return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
203203
}
204-
topic := s.nsqd.GetOrCreateTopic(topicName)
205-
if topic == nil {
204+
topic, err := s.nsqd.GetOrCreateTopic(topicName)
205+
if err != nil {
206206
return nil, nil, http_api.Err{503, "EXITING"}
207207
}
208208

@@ -416,8 +416,8 @@ func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, p
416416
if err != nil {
417417
return nil, err
418418
}
419-
ch := topic.GetOrCreateChannel(channelName)
420-
if ch == nil {
419+
_, err = topic.GetOrCreateChannel(channelName)
420+
if err != nil {
421421
return nil, http_api.Err{503, "EXITING"}
422422
}
423423
return nil, nil

nsqd/http_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestHTTPpub(t *testing.T) {
4646
defer nsqd.Exit()
4747

4848
topicName := "test_http_pub" + strconv.Itoa(int(time.Now().Unix()))
49-
topic := nsqd.GetOrCreateTopic(topicName)
49+
topic, _ := nsqd.GetOrCreateTopic(topicName)
5050

5151
buf := bytes.NewBuffer([]byte("test message"))
5252
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
@@ -69,7 +69,7 @@ func TestHTTPpubEmpty(t *testing.T) {
6969
defer nsqd.Exit()
7070

7171
topicName := "test_http_pub_empty" + strconv.Itoa(int(time.Now().Unix()))
72-
topic := nsqd.GetOrCreateTopic(topicName)
72+
topic, _ := nsqd.GetOrCreateTopic(topicName)
7373

7474
buf := bytes.NewBuffer([]byte(""))
7575
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
@@ -93,7 +93,7 @@ func TestHTTPmpub(t *testing.T) {
9393
defer nsqd.Exit()
9494

9595
topicName := "test_http_mpub" + strconv.Itoa(int(time.Now().Unix()))
96-
topic := nsqd.GetOrCreateTopic(topicName)
96+
topic, _ := nsqd.GetOrCreateTopic(topicName)
9797

9898
msg := []byte("test message")
9999
msgs := make([][]byte, 4)
@@ -122,7 +122,7 @@ func TestHTTPmpubEmpty(t *testing.T) {
122122
defer nsqd.Exit()
123123

124124
topicName := "test_http_mpub_empty" + strconv.Itoa(int(time.Now().Unix()))
125-
topic := nsqd.GetOrCreateTopic(topicName)
125+
topic, _ := nsqd.GetOrCreateTopic(topicName)
126126

127127
msg := []byte("test message")
128128
msgs := make([][]byte, 4)
@@ -153,7 +153,7 @@ func TestHTTPmpubBinary(t *testing.T) {
153153
defer nsqd.Exit()
154154

155155
topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
156-
topic := nsqd.GetOrCreateTopic(topicName)
156+
topic, _ := nsqd.GetOrCreateTopic(topicName)
157157

158158
mpub := make([][]byte, 5)
159159
for i := range mpub {
@@ -182,7 +182,7 @@ func TestHTTPmpubForNonNormalizedBinaryParam(t *testing.T) {
182182
defer nsqd.Exit()
183183

184184
topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
185-
topic := nsqd.GetOrCreateTopic(topicName)
185+
topic, _ := nsqd.GetOrCreateTopic(topicName)
186186

187187
mpub := make([][]byte, 5)
188188
for i := range mpub {
@@ -211,8 +211,8 @@ func TestHTTPpubDefer(t *testing.T) {
211211
defer nsqd.Exit()
212212

213213
topicName := "test_http_pub_defer" + strconv.Itoa(int(time.Now().Unix()))
214-
topic := nsqd.GetOrCreateTopic(topicName)
215-
ch := topic.GetOrCreateChannel("ch")
214+
topic, _ := nsqd.GetOrCreateTopic(topicName)
215+
ch, _ := topic.GetOrCreateChannel("ch")
216216

217217
buf := bytes.NewBuffer([]byte("test message"))
218218
url := fmt.Sprintf("http://%s/pub?topic=%s&defer=%d", httpAddr, topicName, 1000)
@@ -242,7 +242,7 @@ func TestHTTPSRequire(t *testing.T) {
242242
defer nsqd.Exit()
243243

244244
topicName := "test_http_pub_req" + strconv.Itoa(int(time.Now().Unix()))
245-
topic := nsqd.GetOrCreateTopic(topicName)
245+
topic, _ := nsqd.GetOrCreateTopic(topicName)
246246

247247
buf := bytes.NewBuffer([]byte("test message"))
248248
url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
@@ -289,7 +289,7 @@ func TestHTTPSRequireVerify(t *testing.T) {
289289

290290
httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
291291
topicName := "test_http_pub_req_verf" + strconv.Itoa(int(time.Now().Unix()))
292-
topic := nsqd.GetOrCreateTopic(topicName)
292+
topic, _ := nsqd.GetOrCreateTopic(topicName)
293293

294294
// no cert
295295
buf := bytes.NewBuffer([]byte("test message"))
@@ -353,7 +353,7 @@ func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
353353
defer nsqd.Exit()
354354

355355
topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
356-
topic := nsqd.GetOrCreateTopic(topicName)
356+
topic, _ := nsqd.GetOrCreateTopic(topicName)
357357

358358
// no cert
359359
buf := bytes.NewBuffer([]byte("test message"))
@@ -761,7 +761,7 @@ func TestEmptyChannel(t *testing.T) {
761761
test.Equal(t, 404, resp.StatusCode)
762762
test.HTTPError(t, resp, 404, "TOPIC_NOT_FOUND")
763763

764-
topic := nsqd.GetOrCreateTopic(topicName)
764+
topic, _ := nsqd.GetOrCreateTopic(topicName)
765765

766766
url = fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", httpAddr, topicName, channelName)
767767
resp, err = http.Post(url, "application/json", nil)

nsqd/nsqd.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,8 @@ func (n *NSQD) LoadMetadata() error {
357357
n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
358358
continue
359359
}
360-
topic := n.GetOrCreateTopic(t.Name)
361-
if topic == nil {
360+
topic, err := n.GetOrCreateTopic(t.Name)
361+
if err != nil {
362362
n.logf(LOG_WARN, "skipping creation of topic, nsqd draining %s", t.Name)
363363
continue
364364
}
@@ -370,8 +370,8 @@ func (n *NSQD) LoadMetadata() error {
370370
n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
371371
continue
372372
}
373-
channel := topic.GetOrCreateChannel(c.Name)
374-
if c.Paused && channel != nil {
373+
channel, err := topic.GetOrCreateChannel(c.Name)
374+
if c.Paused && err != nil {
375375
channel.Pause()
376376
}
377377
}
@@ -487,26 +487,26 @@ func (n *NSQD) Exit() {
487487

488488
// GetOrCreateTopic performs a thread safe operation to get an existing topic or create a new one
489489
//
490-
// The creation might fail if nsqd is draining
491-
func (n *NSQD) GetOrCreateTopic(topicName string) *Topic {
490+
// An error will be returned if nsqd is draining
491+
func (n *NSQD) GetOrCreateTopic(topicName string) (*Topic, error) {
492492
// most likely, we already have this topic, so try read lock first.
493493
n.RLock()
494494
t, ok := n.topicMap[topicName]
495495
n.RUnlock()
496496
if ok {
497-
return t
497+
return t, nil
498498
}
499499

500500
n.Lock()
501501

502502
t, ok = n.topicMap[topicName]
503503
if ok {
504504
n.Unlock()
505-
return t
505+
return t, nil
506506
}
507507
if atomic.LoadInt32(&n.isDraining) == 1 {
508508
// don't create new topics when nsqd is draining
509-
return nil
509+
return nil, errors.New("nsqd draining")
510510
}
511511

512512
deleteCallback := func(t *Topic) {
@@ -533,7 +533,7 @@ func (n *NSQD) GetOrCreateTopic(topicName string) *Topic {
533533

534534
// if loading metadata at startup, no lookupd connections yet, topic started after load
535535
if atomic.LoadInt32(&n.isLoading) == 1 {
536-
return t
536+
return t, nil
537537
}
538538

539539
// if using lookupd, make a blocking call to get the topics, and immediately create them.
@@ -556,7 +556,7 @@ func (n *NSQD) GetOrCreateTopic(topicName string) *Topic {
556556

557557
// now that all channels are added, start topic messagePump
558558
t.Start()
559-
return t
559+
return t, nil
560560
}
561561

562562
// GetExistingTopic gets a topic only if it exists

nsqd/nsqd_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ func TestStartup(t *testing.T) {
7373
atomic.StoreInt32(&nsqd.isLoading, 0)
7474

7575
body := make([]byte, 256)
76-
topic := nsqd.GetOrCreateTopic(topicName)
76+
topic, _ := nsqd.GetOrCreateTopic(topicName)
7777
for i := 0; i < iterations; i++ {
7878
msg := NewMessage(topic.GenerateID(), body)
7979
topic.PutMessage(msg)
8080
}
8181

8282
t.Logf("pulling from channel")
83-
channel1 := topic.GetOrCreateChannel("ch1")
83+
channel1, _ := topic.GetOrCreateChannel("ch1")
8484

8585
t.Logf("read %d msgs", iterations/2)
8686
for i := 0; i < iterations/2; i++ {
@@ -124,12 +124,12 @@ func TestStartup(t *testing.T) {
124124
doneExitChan <- 1
125125
}()
126126

127-
topic = nsqd.GetOrCreateTopic(topicName)
127+
topic, _ = nsqd.GetOrCreateTopic(topicName)
128128
// should be empty; channel should have drained everything
129129
count := topic.Depth()
130130
test.Equal(t, int64(0), count)
131131

132-
channel1 = topic.GetOrCreateChannel("ch1")
132+
channel1, _ = topic.GetOrCreateChannel("ch1")
133133

134134
for {
135135
if channel1.Depth() == int64(iterations/2) {
@@ -176,8 +176,8 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
176176
}()
177177

178178
body := []byte("an_ephemeral_message")
179-
topic := nsqd.GetOrCreateTopic(topicName)
180-
ephemeralChannel := topic.GetOrCreateChannel("ch1#ephemeral")
179+
topic, _ := nsqd.GetOrCreateTopic(topicName)
180+
ephemeralChannel, _ := topic.GetOrCreateChannel("ch1#ephemeral")
181181
client := newClientV2(0, nil, nsqd)
182182
err := ephemeralChannel.AddClient(client.ID, client)
183183
test.Equal(t, err, nil)
@@ -215,8 +215,8 @@ func TestPauseMetadata(t *testing.T) {
215215
// avoid concurrency issue of async PersistMetadata() calls
216216
atomic.StoreInt32(&nsqd.isLoading, 1)
217217
topicName := "pause_metadata" + strconv.Itoa(int(time.Now().Unix()))
218-
topic := nsqd.GetOrCreateTopic(topicName)
219-
channel := topic.GetOrCreateChannel("ch")
218+
topic, _ := nsqd.GetOrCreateTopic(topicName)
219+
channel, _ := topic.GetOrCreateChannel("ch")
220220
atomic.StoreInt32(&nsqd.isLoading, 0)
221221
nsqd.PersistMetadata()
222222

0 commit comments

Comments
 (0)