Skip to content

Commit 30f62c2

Browse files
committed
Merge branch 'release/v0.5.1'
2 parents d8efe1c + 8c9dad1 commit 30f62c2

File tree

8 files changed

+148
-38
lines changed

8 files changed

+148
-38
lines changed

group.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ UnexpectedEnd:
8686
}
8787

8888
func (g group) toString(rname string, tokens []string) string {
89+
if g == nil {
90+
return rname
91+
}
8992
l := len(g)
9093
if l == 0 {
91-
return rname
94+
return ""
9295
}
9396
if l == 1 && g[0].str != "" {
9497
return g[0].str

mux.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,15 @@ func (m *Mux) Handle(pattern string, hf ...Option) {
200200
// AddHandler register a handler for the given resource pattern.
201201
// The pattern used is the same as described for Handle.
202202
func (m *Mux) AddHandler(pattern string, hs Handler) {
203+
var g group
204+
if hs.Parallel {
205+
g = []gpart{}
206+
} else {
207+
g = parseGroup(hs.Group, pattern)
208+
}
203209
h := regHandler{
204210
Handler: hs,
205-
group: parseGroup(hs.Group, pattern),
211+
group: g,
206212
}
207213

208214
m.add(pattern, &h)

restest/assert.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212
// AssertEqualJSON expects that a and b json marshals into equal values, and
1313
// returns true if they do, otherwise logs a fatal error and returns false.
1414
func AssertEqualJSON(t *testing.T, name string, result, expected interface{}, ctx ...interface{}) bool {
15-
aa, aj := jsonMap(t, result)
16-
bb, bj := jsonMap(t, expected)
15+
aa, aj := jsonMap(result)
16+
bb, bj := jsonMap(expected)
1717

1818
if !reflect.DeepEqual(aa, bb) {
1919
t.Fatalf("expected %s to be:\n\t%s\nbut got:\n\t%s%s", name, bj, aj, ctxString(ctx))
@@ -117,7 +117,7 @@ func ctxString(ctx []interface{}) string {
117117
return "\nin " + fmt.Sprint(ctx...)
118118
}
119119

120-
func jsonMap(t *testing.T, v interface{}) (interface{}, []byte) {
120+
func jsonMap(v interface{}) (interface{}, []byte) {
121121
var err error
122122
j, err := json.Marshal(v)
123123
if err != nil {

restest/natsrequest.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ type NATSRequest struct {
88
inb string
99
}
1010

11+
// NATSRequests represents a slice of requests sent over NATS to the service,
12+
// but that may get responses in undetermined order.
13+
type NATSRequests []*NATSRequest
14+
1115
// Response gets the next pending message that is published to NATS by the
1216
// service.
1317
//
@@ -19,6 +23,26 @@ func (nr *NATSRequest) Response() *Msg {
1923
return m
2024
}
2125

26+
// Response gets the next pending message that is published to NATS by the
27+
// service, and matches it to one of the requests.
28+
//
29+
// If no message is received within a set amount of time, or if the message is
30+
// not a response to one of the requests, it will log it as a fatal error.
31+
//
32+
// The matching request will be set to nil.
33+
func (nrs NATSRequests) Response(c *MockConn) *Msg {
34+
m := c.GetMsg()
35+
for i := 0; i < len(nrs); i++ {
36+
nr := nrs[i]
37+
if nr != nil && nr.inb == m.Subject {
38+
nrs[i] = nil
39+
return m
40+
}
41+
}
42+
c.t.Fatalf("expected to find request matching response %s, but found none", m.Subject)
43+
return nil
44+
}
45+
2246
// Get sends a get request to the service.
2347
//
2448
// The resource ID, rid, may contain a query part:

service.go

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ type Handler struct {
137137
// resource name will be used as identifier.
138138
Group string
139139

140+
// Parallel is a flag telling that all requests to the handler may be
141+
// handled in parallel on different goroutines. If set to true, any value in
142+
// Group will be ignored.
143+
Parallel bool
144+
140145
// OnRegister is callback that is to be call when the handler has been
141146
// registered to a service.
142147
//
@@ -196,7 +201,9 @@ type Service struct {
196201
nc Conn // NATS Server connection
197202
inCh chan *nats.Msg // Channel for incoming nats messages
198203
rwork map[string]*work // map of resource work
199-
workCh chan *work // Resource work channel, listened to by the workers
204+
workqueue []*work // Resource work queue.
205+
workbuf []*work // Underlying buffer of the workqueue
206+
workcond sync.Cond // Cond waited on by workers and signaled when work is added to workqueue
200207
wg sync.WaitGroup // WaitGroup for all workers
201208
mu sync.Mutex // Mutex to protect rwork map
202209
logger logger.Logger // Logger
@@ -517,6 +524,16 @@ func Group(group string) Option {
517524
})
518525
}
519526

527+
// Parallel sets the parallel flag. All requests for the handler may be handled
528+
// in parallel on different worker goroutines.
529+
//
530+
// If set to true, any value in Group will be ignored.
531+
func Parallel(parallel bool) Option {
532+
return OptionFunc(func(hs *Handler) {
533+
hs.Parallel = parallel
534+
})
535+
}
536+
520537
// OnRegister sets a callback to be called when the handler is registered to a
521538
// service.
522539
//
@@ -648,14 +665,16 @@ func (s *Service) serve(nc Conn) error {
648665
workCh := make(chan *work, 1)
649666
s.nc = nc
650667
s.inCh = inCh
651-
s.workCh = workCh
652-
s.rwork = make(map[string]*work)
668+
s.workcond = sync.Cond{L: &s.mu}
669+
s.workbuf = make([]*work, s.inChannelSize)
670+
s.workqueue = s.workbuf[:0]
671+
s.rwork = make(map[string]*work, s.inChannelSize)
653672
s.queryTQ = timerqueue.New(s.queryEventExpire, s.queryDuration)
654673

655674
// Start workers
656675
s.wg.Add(s.workerCount)
657676
for i := 0; i < s.workerCount; i++ {
658-
go s.startWorker(s.workCh)
677+
go s.startWorker()
659678
}
660679

661680
atomic.StoreInt32(&s.state, stateStarted)
@@ -699,7 +718,6 @@ func (s *Service) Shutdown() error {
699718

700719
s.inCh = nil
701720
s.nc = nil
702-
s.workCh = nil
703721

704722
atomic.StoreInt32(&s.state, stateStopped)
705723

@@ -709,6 +727,11 @@ func (s *Service) Shutdown() error {
709727

710728
// close calls Close on the NATS connection, and closes the incoming channel
711729
func (s *Service) close() {
730+
s.mu.Lock()
731+
s.workqueue = nil
732+
s.mu.Unlock()
733+
s.workcond.Broadcast()
734+
712735
s.nc.Close()
713736
close(s.inCh)
714737
}
@@ -956,17 +979,25 @@ func (s *Service) runWith(wid string, cb func()) {
956979

957980
s.mu.Lock()
958981
// Get current work queue for the resource
959-
w, ok := s.rwork[wid]
982+
var w *work
983+
var ok bool
984+
if wid != "" {
985+
w, ok = s.rwork[wid]
986+
}
960987
if !ok {
961988
// Create a new work queue and pass it to a worker
962989
w = &work{
963-
s: s,
964-
wid: wid,
965-
queue: []func(){cb},
990+
s: s,
991+
wid: wid,
992+
single: [1]func(){cb},
993+
}
994+
w.queue = w.single[:1]
995+
if wid != "" {
996+
s.rwork[wid] = w
966997
}
967-
s.rwork[wid] = w
998+
s.workqueue = append(s.workqueue, w)
968999
s.mu.Unlock()
969-
s.workCh <- w
1000+
s.workcond.Signal()
9701001
} else {
9711002
// Append callback to existing work queue
9721003
w.queue = append(w.queue, cb)

test/00service_test.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,7 @@ func TestServiceSetOnServe_ValidCallback_IsCalledOnServe(t *testing.T) {
203203
select {
204204
case <-ch:
205205
case <-time.After(timeoutDuration):
206-
if t == nil {
207-
t.Fatal("expected OnServe callback to be called, but it wasn't")
208-
}
206+
t.Fatal("expected OnServe callback to be called, but it wasn't")
209207
}
210208
})
211209
}
@@ -257,9 +255,7 @@ func TestServiceWithResource_WithMatchingResource_CallsCallback(t *testing.T) {
257255
select {
258256
case <-ch:
259257
case <-time.After(timeoutDuration):
260-
if t == nil {
261-
t.Fatal("expected WithResource callback to be called, but it wasn't")
262-
}
258+
t.Fatal("expected WithResource callback to be called, but it wasn't")
263259
}
264260
})
265261
}
@@ -276,9 +272,7 @@ func TestServiceWithGroup_WithMatchingResource_CallsCallback(t *testing.T) {
276272
select {
277273
case <-ch:
278274
case <-time.After(timeoutDuration):
279-
if t == nil {
280-
t.Fatal("expected WithGroup callback to be called, but it wasn't")
281-
}
275+
t.Fatal("expected WithGroup callback to be called, but it wasn't")
282276
}
283277
})
284278
}
@@ -380,3 +374,41 @@ func TestServiceSetInChannelSize_GreaterThanZero_DoesNotPanic(t *testing.T) {
380374
s.SetInChannelSize(10)
381375
}, nil, restest.WithoutReset)
382376
}
377+
378+
func TestServiceWithParallel_WithMultipleCallsOnSameResource_CallsCallbacksInParallel(t *testing.T) {
379+
ch := make(chan bool)
380+
done := make(chan bool)
381+
runTest(t, func(s *res.Service) {
382+
s.Handle("model",
383+
res.Parallel(true),
384+
res.GetResource(func(r res.GetRequest) {
385+
ch <- true
386+
<-done
387+
r.NotFound()
388+
}),
389+
)
390+
}, func(s *restest.Session) {
391+
// Test getting the same model twice
392+
reqs := restest.NATSRequests{
393+
s.Get("test.model"),
394+
s.Get("test.model"),
395+
s.Get("test.model"),
396+
}
397+
398+
for i := 0; i < len(reqs); i++ {
399+
select {
400+
case <-ch:
401+
case <-time.After(timeoutDuration):
402+
t.Fatal("expected get handler to be called twice in parallel, but it wasn't")
403+
}
404+
}
405+
406+
close(done)
407+
408+
for i := len(reqs); i > 0; i-- {
409+
reqs.
410+
Response(s.MockConn).
411+
AssertError(res.ErrNotFound)
412+
}
413+
})
414+
}

test/22query_event_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -580,9 +580,7 @@ func TestInvalidQueryResponse(t *testing.T) {
580580
select {
581581
case <-ch:
582582
case <-time.After(timeoutDuration):
583-
if t == nil {
584-
t.Fatal("expected query request to get a query response, but it timed out")
585-
}
583+
t.Fatal("expected query request to get a query response, but it timed out")
586584
}
587585
if t.Failed() {
588586
t.Logf("failed on test idx %d", i)

worker.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,49 @@
11
package res
22

33
type work struct {
4-
s *Service
5-
wid string // Worker ID for the work queue
6-
queue []func() // Callback queue
4+
s *Service
5+
wid string // Worker ID for the work queue
6+
single [1]func()
7+
queue []func() // Callback queue
78
}
89

910
// startWorker starts a new resource worker that will listen for resources to
1011
// process requests on.
11-
func (s *Service) startWorker(ch chan *work) {
12-
for w := range ch {
12+
func (s *Service) startWorker() {
13+
s.mu.Lock()
14+
defer s.mu.Unlock()
15+
defer s.wg.Done()
16+
// workqueue being nil signals we the service is closing
17+
for s.workqueue != nil {
18+
for len(s.workqueue) == 0 {
19+
s.workcond.Wait()
20+
if s.workqueue == nil {
21+
return
22+
}
23+
}
24+
w := s.workqueue[0]
25+
if len(s.workqueue) == 1 {
26+
s.workqueue = s.workbuf[:0]
27+
} else {
28+
s.workqueue = s.workqueue[1:]
29+
}
1330
w.processQueue()
1431
}
15-
s.wg.Done()
1632
}
1733

1834
func (w *work) processQueue() {
1935
var f func()
2036
idx := 0
2137

22-
w.s.mu.Lock()
2338
for len(w.queue) > idx {
2439
f = w.queue[idx]
2540
w.s.mu.Unlock()
2641
idx++
2742
f()
2843
w.s.mu.Lock()
2944
}
30-
// Work complete
31-
delete(w.s.rwork, w.wid)
32-
w.s.mu.Unlock()
45+
// Work complete. Delete if it has a work ID.
46+
if w.wid != "" {
47+
delete(w.s.rwork, w.wid)
48+
}
3349
}

0 commit comments

Comments
 (0)