Skip to content
This repository was archived by the owner on Dec 14, 2021. It is now read-only.

Commit 986b4b5

Browse files
committed
Re-implement Discovery Client
1 parent 9955a22 commit 986b4b5

29 files changed

+838
-660
lines changed

Makefile

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ttnctlpkg = ttnctl-$(GOOS)-$(GOARCH)
3131
ttnbin = $(ttnpkg)$(GOEXE)
3232
ttnctlbin = $(ttnctlpkg)$(GOEXE)
3333

34-
.PHONY: all clean deps update-deps test-deps proto-deps dev-deps cover-deps proto test fmt vet cover coveralls build install docker package
34+
.PHONY: all clean deps update-deps test-deps dev-deps cover-deps proto test fmt vet cover coveralls build install docker package
3535

3636
all: clean deps build package
3737

@@ -44,11 +44,11 @@ update-deps:
4444
test-deps:
4545
$(GOCMD) get -d -v $(TEST_DEPS)
4646

47-
proto-deps:
48-
$(GOCMD) get -v github.com/gogo/protobuf/protoc-gen-gofast
49-
50-
dev-deps: update-deps proto-deps test-deps
51-
$(GOCMD) get -v github.com/ddollar/forego
47+
dev-deps: update-deps test-deps
48+
$(GOCMD) get -u -v github.com/gogo/protobuf/protoc-gen-gofast
49+
$(GOCMD) get -u -v github.com/golang/mock/gomock
50+
$(GOCMD) get -u -v github.com/golang/mock/mockgen
51+
$(GOCMD) get -u -v github.com/ddollar/forego
5252

5353
cover-deps:
5454
if ! $(GOCMD) get github.com/golang/tools/cmd/cover; then $(GOCMD) get golang.org/x/tools/cmd/cover; fi
@@ -66,6 +66,10 @@ proto:
6666
@$(PROTOC)/api/discovery/discovery.proto
6767
@$(PROTOC)/api/noc/noc.proto
6868

69+
mocks:
70+
mockgen -source=./api/networkserver/networkserver.pb.go -package networkserver NetworkServerClient > api/networkserver/networkserver_mock.go
71+
mockgen -source=./api/discovery/client.go -package discovery NetworkServerClient > api/discovery/client_mock.go
72+
6973
test:
7074
$(select_pkgs) | xargs $(GOCMD) test
7175

api/discovery/announcement.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package discovery
55

66
import (
7+
"bytes"
78
"encoding/json"
89
"fmt"
910
)
@@ -102,3 +103,27 @@ func (announcement *Announcement) parseProperty(property string, value string) e
102103
}
103104
return nil
104105
}
106+
107+
// AddMetadata adds metadata to the announcement if it doesn't exist
108+
func (announcement *Announcement) AddMetadata(key Metadata_Key, value []byte) {
109+
for _, meta := range announcement.Metadata {
110+
if meta.Key == key && bytes.Equal(value, meta.Value) {
111+
return
112+
}
113+
}
114+
announcement.Metadata = append(announcement.Metadata, &Metadata{
115+
Key: key,
116+
Value: value,
117+
})
118+
}
119+
120+
// DeleteMetadata deletes metadata from the announcement if it exists
121+
func (announcement *Announcement) DeleteMetadata(key Metadata_Key, value []byte) {
122+
newMeta := make([]*Metadata, 0, len(announcement.Metadata))
123+
for _, meta := range announcement.Metadata {
124+
if !(meta.Key == key && bytes.Equal(value, meta.Value)) {
125+
newMeta = append(newMeta, meta)
126+
}
127+
}
128+
announcement.Metadata = newMeta
129+
}

api/discovery/client.go

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
// Copyright © 2016 The Things Network
2+
// Use of this source code is governed by the MIT license that can be found in the LICENSE file.
3+
4+
package discovery
5+
6+
// Client is used to manage applications and devices on a handler
7+
import (
8+
"fmt"
9+
"sync"
10+
"time"
11+
12+
"github.com/TheThingsNetwork/ttn/core/types"
13+
"github.com/bluele/gcache"
14+
15+
"golang.org/x/net/context"
16+
17+
"google.golang.org/grpc"
18+
"google.golang.org/grpc/metadata"
19+
)
20+
21+
// CacheSize indicates the number of components that are cached
22+
var CacheSize = 1000
23+
24+
// CacheExpiration indicates the time a cached item is valid
25+
var CacheExpiration = 5 * time.Minute
26+
27+
type Client interface {
28+
Announce(token string) error
29+
GetAll(serviceName string) ([]*Announcement, error)
30+
Get(serviceName, id string) (*Announcement, error)
31+
AddMetadata(key Metadata_Key, value []byte, token string) error
32+
DeleteMetadata(key Metadata_Key, value []byte, token string) error
33+
GetAllForMetadata(serviceName string, key Metadata_Key, matchFunc func(value []byte) bool) ([]*Announcement, error)
34+
GetAllBrokersForDevAddr(devAddr types.DevAddr) ([]*Announcement, error)
35+
GetAllHandlersForAppID(appID string) ([]*Announcement, error)
36+
}
37+
38+
// NewClient returns a new Client
39+
func NewClient(conn *grpc.ClientConn, announcement *Announcement, tokenFunc func() string) Client {
40+
client := &DefaultClient{
41+
lists: make(map[string][]*Announcement),
42+
listsUpdated: make(map[string]time.Time),
43+
self: announcement,
44+
tokenFunc: tokenFunc,
45+
conn: conn,
46+
client: NewDiscoveryClient(conn),
47+
}
48+
client.cache = gcache.
49+
New(CacheSize).
50+
Expiration(CacheExpiration).
51+
ARC().
52+
LoaderFunc(func(k interface{}) (interface{}, error) {
53+
key, ok := k.(cacheKey)
54+
if !ok {
55+
return nil, fmt.Errorf("wrong type for cacheKey: %T", k)
56+
}
57+
return client.get(key.serviceName, key.id)
58+
}).
59+
Build()
60+
return client
61+
}
62+
63+
// DefaultClient is a wrapper around DiscoveryClient
64+
type DefaultClient struct {
65+
sync.Mutex
66+
cache gcache.Cache
67+
listsUpdated map[string]time.Time
68+
lists map[string][]*Announcement
69+
self *Announcement
70+
tokenFunc func() string
71+
conn *grpc.ClientConn
72+
client DiscoveryClient
73+
}
74+
75+
type cacheKey struct {
76+
serviceName string
77+
id string
78+
}
79+
80+
func (c *DefaultClient) getContext(token string) context.Context {
81+
if token == "" {
82+
token = c.tokenFunc()
83+
}
84+
md := metadata.Pairs(
85+
"service-name", c.self.ServiceName,
86+
"id", c.self.Id,
87+
"token", token,
88+
"net-address", c.self.NetAddress,
89+
)
90+
ctx := metadata.NewContext(context.Background(), md)
91+
return ctx
92+
}
93+
94+
func (c *DefaultClient) get(serviceName, id string) (*Announcement, error) {
95+
res, err := c.client.Get(c.getContext(""), &GetRequest{
96+
ServiceName: serviceName,
97+
Id: id,
98+
})
99+
if err != nil {
100+
return nil, err
101+
}
102+
return res, nil
103+
}
104+
105+
func (c *DefaultClient) getAll(serviceName string) ([]*Announcement, error) {
106+
res, err := c.client.GetAll(c.getContext(""), &GetAllRequest{ServiceName: serviceName})
107+
if err != nil {
108+
return nil, err
109+
}
110+
c.lists[serviceName] = res.Services
111+
c.listsUpdated[serviceName] = time.Now()
112+
for _, announcement := range res.Services {
113+
c.cache.Set(&cacheKey{serviceName: announcement.ServiceName, id: announcement.Id}, announcement)
114+
}
115+
return res.Services, nil
116+
}
117+
118+
// Announce announces the configured announcement to the discovery server
119+
func (c *DefaultClient) Announce(token string) error {
120+
_, err := c.client.Announce(c.getContext(token), c.self)
121+
return err
122+
}
123+
124+
// GetAll returns all services of the given service type
125+
func (c *DefaultClient) GetAll(serviceName string) ([]*Announcement, error) {
126+
c.Lock()
127+
defer c.Unlock()
128+
129+
// If list initialized, return cached version
130+
if list, ok := c.lists[serviceName]; ok && len(list) > 0 {
131+
// And update if expired
132+
if c.listsUpdated[serviceName].Add(CacheExpiration).After(time.Now()) {
133+
go func() {
134+
c.Lock()
135+
defer c.Unlock()
136+
c.getAll(serviceName)
137+
}()
138+
}
139+
return list, nil
140+
}
141+
142+
// If list not initialized, do request
143+
return c.getAll(serviceName)
144+
}
145+
146+
// Get returns the (cached) service annoucement for the given service type and id
147+
func (c *DefaultClient) Get(serviceName, id string) (*Announcement, error) {
148+
res, err := c.cache.Get(cacheKey{serviceName, id})
149+
if err != nil {
150+
return nil, err
151+
}
152+
return res.(*Announcement), nil
153+
}
154+
155+
// AddMetadata publishes metadata for the current component to the Discovery server
156+
func (c *DefaultClient) AddMetadata(key Metadata_Key, value []byte, token string) error {
157+
_, err := c.client.AddMetadata(c.getContext(token), &MetadataRequest{
158+
ServiceName: c.self.ServiceName,
159+
Id: c.self.Id,
160+
Metadata: &Metadata{
161+
Key: key,
162+
Value: value,
163+
},
164+
})
165+
return err
166+
}
167+
168+
// DeleteMetadata deletes metadata for the current component from the Discovery server
169+
func (c *DefaultClient) DeleteMetadata(key Metadata_Key, value []byte, token string) error {
170+
_, err := c.client.DeleteMetadata(c.getContext(token), &MetadataRequest{
171+
ServiceName: c.self.ServiceName,
172+
Id: c.self.Id,
173+
Metadata: &Metadata{
174+
Key: key,
175+
Value: value,
176+
},
177+
})
178+
return err
179+
}
180+
181+
// GetAllForMetadata returns all annoucements of given type that contain given metadata and match the given function
182+
func (c *DefaultClient) GetAllForMetadata(serviceName string, key Metadata_Key, matchFunc func(value []byte) bool) ([]*Announcement, error) {
183+
announcements, err := c.GetAll(serviceName)
184+
if err != nil {
185+
return nil, err
186+
}
187+
res := make([]*Announcement, 0, len(announcements))
188+
nextAnnouncement:
189+
for _, announcement := range announcements {
190+
for _, meta := range announcement.Metadata {
191+
if meta.Key == key && matchFunc(meta.Value) {
192+
res = append(res, announcement)
193+
continue nextAnnouncement
194+
}
195+
}
196+
}
197+
return res, nil
198+
}
199+
200+
// GetAllBrokersForDevAddr returns all brokers that can handle the given DevAddr
201+
func (c *DefaultClient) GetAllBrokersForDevAddr(devAddr types.DevAddr) ([]*Announcement, error) {
202+
return c.GetAllForMetadata("broker", Metadata_PREFIX, func(value []byte) bool {
203+
if len(value) != 5 {
204+
return false
205+
}
206+
var prefix types.DevAddrPrefix
207+
copy(prefix.DevAddr[:], value[1:])
208+
prefix.Length = int(value[0])
209+
return devAddr.HasPrefix(prefix)
210+
})
211+
}
212+
213+
// GetAllHandlersForAppID returns all handlers that can handle the given AppID
214+
func (c *DefaultClient) GetAllHandlersForAppID(appID string) ([]*Announcement, error) {
215+
return c.GetAllForMetadata("handler", Metadata_APP_ID, func(value []byte) bool {
216+
return string(value) == appID
217+
})
218+
}

0 commit comments

Comments
 (0)