Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 08dc3c9

Browse files
committedApr 11, 2021
Publish ConnState message on gateway (un)subscribe.
This introduces a new topic hierarchy gateway/ID/state/STATE for publishing state messages. State messages are published with the retained flag. The ConnState message is published to gateway/ID/state/conn and contains the state ONLINE or OFFLINE.
1 parent 5f22f6b commit 08dc3c9

File tree

15 files changed

+342
-19
lines changed

15 files changed

+342
-19
lines changed
 

‎cmd/chirpstack-gateway-bridge/cmd/configfile.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@ marshaler="{{ .Integration.Marshaler }}"
221221
# Event topic template.
222222
event_topic_template="{{ .Integration.MQTT.EventTopicTemplate }}"
223223
224+
# State topic template.
225+
#
226+
# States are sent by the gateway as retained MQTT messages so that the last
227+
# message will be stored by the MQTT broker. When set to a blank string, this
228+
# feature will be disabled. This feature is only supported when using the
229+
# generic authentication type.
230+
state_topic_template="{{ .Integration.MQTT.StateTopicTemplate }}"
231+
224232
# Command topic template.
225233
command_topic_template="{{ .Integration.MQTT.CommandTopicTemplate }}"
226234

‎cmd/chirpstack-gateway-bridge/cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ func init() {
5757
viper.SetDefault("integration.mqtt.auth.type", "generic")
5858

5959
viper.SetDefault("integration.mqtt.event_topic_template", "gateway/{{ .GatewayID }}/event/{{ .EventType }}")
60+
viper.SetDefault("integration.mqtt.state_topic_template", "gateway/{{ .GatewayID }}/state/{{ .StateType }}")
6061
viper.SetDefault("integration.mqtt.command_topic_template", "gateway/{{ .GatewayID }}/command/#")
6162
viper.SetDefault("integration.mqtt.keep_alive", 30*time.Second)
6263
viper.SetDefault("integration.mqtt.max_reconnect_interval", time.Minute)

‎cmd/chirpstack-gateway-bridge/cmd/root_run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ func run(cmd *cobra.Command, args []string) error {
4747
log.WithField("signal", <-sigChan).Info("signal received")
4848
log.Warning("shutting down server")
4949

50+
integration.GetIntegration().Stop()
51+
5052
return nil
5153
}
5254

‎go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/brocaar/chirpstack-gateway-bridge
33
go 1.16
44

55
require (
6-
github.com/brocaar/chirpstack-api/go/v3 v3.8.1
6+
github.com/brocaar/chirpstack-api/go/v3 v3.9.7
77
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303
88
github.com/dgrijalva/jwt-go v3.2.0+incompatible
99
github.com/eclipse/paho.mqtt.golang v1.3.0

‎go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
5151
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
5252
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI=
5353
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI=
54-
github.com/brocaar/chirpstack-api/go/v3 v3.8.1 h1:8xNpG/GZqiL8XYkkAUWYNZJu7yn5SamK6oPBx1hCQe0=
55-
github.com/brocaar/chirpstack-api/go/v3 v3.8.1/go.mod h1:ex/wqXQaClwDMa2zDN6crp9ZiMGc1GMVQhjxiB+OJcg=
54+
github.com/brocaar/chirpstack-api/go/v3 v3.9.7 h1:n5Zte6zIg+qbqtb4dwp3vGQwIXpXsk5nMR4WwMUcLgA=
55+
github.com/brocaar/chirpstack-api/go/v3 v3.9.7/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
5656
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303 h1:LkE19tFPfDaRh1HIKWLCZKSBZNonMu0rIOJPCLvEjC0=
5757
github.com/brocaar/lorawan v0.0.0-20201030140234-f23da2d4a303/go.mod h1:CciUmQHIpUYTHHMeICtyamM7d+47VV+WBZ5ReDozpoc=
5858
github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw=

‎internal/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type Config struct {
5858
MQTT struct {
5959
EventTopicTemplate string `mapstructure:"event_topic_template"`
6060
CommandTopicTemplate string `mapstructure:"command_topic_template"`
61+
StateTopicTemplate string `mapstructure:"state_topic_template"`
6162
KeepAlive time.Duration `mapstructure:"keep_alive"`
6263
MaxReconnectInterval time.Duration `mapstructure:"max_reconnect_interval"`
6364
TerminateOnConnectError bool `mapstructure:"terminate_on_connect_error"`

‎internal/integration/integration.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ type Integration interface {
4747
// PublishEvent publishes the given event.
4848
PublishEvent(lorawan.EUI64, string, uuid.UUID, proto.Message) error
4949

50+
// PublishState publishes the given state as retained message.
51+
PublishState(lorawan.EUI64, string, proto.Message) error
52+
5053
// SetDownlinkFrameFunc sets the DownlinkFrame handler func.
5154
SetDownlinkFrameFunc(func(gw.DownlinkFrame))
5255

‎internal/integration/mqtt/auth/auth.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,18 @@ import (
88

99
mqtt "github.com/eclipse/paho.mqtt.golang"
1010
"github.com/pkg/errors"
11+
12+
"github.com/brocaar/lorawan"
1113
)
1214

1315
// Authentication defines the authentication interface.
1416
type Authentication interface {
1517
// Init applies the initial configuration.
1618
Init(*mqtt.ClientOptions) error
1719

20+
// GetGatewayID returns the GatewayID if available.
21+
GetGatewayID() *lorawan.EUI64
22+
1823
// Update updates the authentication options.
1924
Update(*mqtt.ClientOptions) error
2025

‎internal/integration/mqtt/auth/azure_iot_hub.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/pkg/errors"
1616

1717
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
18+
"github.com/brocaar/lorawan"
1819
)
1920

2021
// See:
@@ -201,6 +202,12 @@ func (a *AzureIoTHubAuthentication) Init(opts *mqtt.ClientOptions) error {
201202
return nil
202203
}
203204

205+
// GetGatewayID returns the GatewayID if available.
206+
// TODO: implement.
207+
func (a *AzureIoTHubAuthentication) GetGatewayID() *lorawan.EUI64 {
208+
return nil
209+
}
210+
204211
// Update updates the authentication options.
205212
func (a *AzureIoTHubAuthentication) Update(opts *mqtt.ClientOptions) error {
206213
if a.authType == authTypeSymmetric {

‎internal/integration/mqtt/auth/gcp_cloud_iot_core.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/pkg/errors"
1212

1313
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
14+
"github.com/brocaar/lorawan"
1415
)
1516

1617
// GCPCloudIoTCoreAuthentication implements the Google Cloud IoT Core authentication.
@@ -59,6 +60,12 @@ func (a *GCPCloudIoTCoreAuthentication) Init(opts *mqtt.ClientOptions) error {
5960
return nil
6061
}
6162

63+
// GetGatewayID returns the GatewayID if available.
64+
// TODO: implement.
65+
func (a *GCPCloudIoTCoreAuthentication) GetGatewayID() *lorawan.EUI64 {
66+
return nil
67+
}
68+
6269
// Update updates the authentication options.
6370
func (a *GCPCloudIoTCoreAuthentication) Update(opts *mqtt.ClientOptions) error {
6471
token := jwt.NewWithClaims(a.siginingMethod, jwt.StandardClaims{

‎internal/integration/mqtt/auth/generic.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66

77
mqtt "github.com/eclipse/paho.mqtt.golang"
88
"github.com/pkg/errors"
9+
log "github.com/sirupsen/logrus"
910

1011
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
12+
"github.com/brocaar/lorawan"
1113
)
1214

1315
// GenericAuthentication implements a generic MQTT authentication.
@@ -59,6 +61,24 @@ func (a *GenericAuthentication) Init(opts *mqtt.ClientOptions) error {
5961
return nil
6062
}
6163

64+
// GetGatewayID returns the GatewayID if available.
65+
func (a *GenericAuthentication) GetGatewayID() *lorawan.EUI64 {
66+
if a.clientID == "" {
67+
return nil
68+
}
69+
70+
// Try to decode the client ID as gateway ID.
71+
var gatewayID lorawan.EUI64
72+
if err := gatewayID.UnmarshalText([]byte(a.clientID)); err != nil {
73+
log.WithError(err).WithFields(log.Fields{
74+
"client_id": a.clientID,
75+
}).Warning("integration/mqtt/auth: could not decode client ID to gateway ID")
76+
return nil
77+
}
78+
79+
return &gatewayID
80+
}
81+
6282
// Update updates the authentication options.
6383
func (a *GenericAuthentication) Update(opts *mqtt.ClientOptions) error {
6484
return nil
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package auth
2+
3+
import (
4+
"testing"
5+
6+
"github.com/brocaar/chirpstack-gateway-bridge/internal/config"
7+
"github.com/brocaar/lorawan"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestGenericAuthentication(t *testing.T) {
12+
gatewayID := lorawan.EUI64{1, 2, 3, 4, 5, 6, 7, 8}
13+
14+
var conf config.Config
15+
conf.Integration.Marshaler = "json"
16+
conf.Integration.MQTT.EventTopicTemplate = "gateway/{{ .GatewayID }}/event/{{ .EventType }}"
17+
conf.Integration.MQTT.StateTopicTemplate = "gateway/{{ .GatewayID }}/state/{{ .StateType }}"
18+
conf.Integration.MQTT.CommandTopicTemplate = "gateway/{{ .GatewayID }}/command/#"
19+
conf.Integration.MQTT.Auth.Type = "generic"
20+
conf.Integration.MQTT.Auth.Generic.Servers = []string{"tcp://localhost:1883"}
21+
conf.Integration.MQTT.Auth.Generic.Username = "foo"
22+
conf.Integration.MQTT.Auth.Generic.Password = "bar"
23+
conf.Integration.MQTT.Auth.Generic.CleanSession = true
24+
conf.Integration.MQTT.Auth.Generic.ClientID = gatewayID.String()
25+
26+
t.Run("New", func(t *testing.T) {
27+
assert := require.New(t)
28+
29+
auth, err := NewGenericAuthentication(conf)
30+
assert.NoError(err)
31+
32+
t.Run("GetGatewayID", func(t *testing.T) {
33+
assert := require.New(t)
34+
assert.Equal(&gatewayID, auth.GetGatewayID())
35+
})
36+
})
37+
}

‎internal/integration/mqtt/backend.go

Lines changed: 146 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type Backend struct {
4343

4444
qos uint8
4545
eventTopicTemplate *template.Template
46+
stateTopicTemplate *template.Template
4647
commandTopicTemplate *template.Template
4748

4849
marshal func(msg proto.Message) ([]byte, error)
@@ -75,6 +76,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
7576

7677
conf.Integration.MQTT.EventTopicTemplate = "/devices/gw-{{ .GatewayID }}/events/{{ .EventType }}"
7778
conf.Integration.MQTT.CommandTopicTemplate = "/devices/gw-{{ .GatewayID }}/commands/#"
79+
conf.Integration.MQTT.StateTopicTemplate = ""
7880
case "azure_iot_hub":
7981
b.auth, err = auth.NewAzureIoTHubAuthentication(conf)
8082
if err != nil {
@@ -83,6 +85,7 @@ func NewBackend(conf config.Config) (*Backend, error) {
8385

8486
conf.Integration.MQTT.EventTopicTemplate = "devices/{{ .GatewayID }}/messages/events/{{ .EventType }}"
8587
conf.Integration.MQTT.CommandTopicTemplate = "devices/{{ .GatewayID }}/messages/devicebound/#"
88+
conf.Integration.MQTT.StateTopicTemplate = ""
8689
default:
8790
return nil, fmt.Errorf("integration/mqtt: unknown auth type: %s", conf.Integration.MQTT.Auth.Type)
8891
}
@@ -121,6 +124,13 @@ func NewBackend(conf config.Config) (*Backend, error) {
121124
return nil, errors.Wrap(err, "integration/mqtt: parse event-topic template error")
122125
}
123126

127+
if conf.Integration.MQTT.StateTopicTemplate != "" {
128+
b.stateTopicTemplate, err = template.New("state").Parse(conf.Integration.MQTT.StateTopicTemplate)
129+
if err != nil {
130+
return nil, errors.Wrap(err, "integration/mqtt: parse state-topic template error")
131+
}
132+
}
133+
124134
b.commandTopicTemplate, err = template.New("event").Parse(conf.Integration.MQTT.CommandTopicTemplate)
125135
if err != nil {
126136
return nil, errors.Wrap(err, "integration/mqtt: parse event-topic template error")
@@ -137,6 +147,43 @@ func NewBackend(conf config.Config) (*Backend, error) {
137147
return nil, errors.Wrap(err, "mqtt: init authentication error")
138148
}
139149

150+
if gatewayID := b.auth.GetGatewayID(); gatewayID != nil {
151+
log.WithFields(log.Fields{
152+
"gateway_id": gatewayID,
153+
}).Info("integration/mqtt: gateway id provided by authentication method")
154+
155+
// Add GatewayID to list of gateways we must subscribe to.
156+
b.gateways[*gatewayID] = struct{}{}
157+
158+
// As we know the Gateway ID and a state topic has been configured, we set
159+
// the last will and testament.
160+
if b.stateTopicTemplate != nil {
161+
pl := gw.ConnState{
162+
GatewayId: gatewayID[:],
163+
State: gw.ConnState_OFFLINE,
164+
}
165+
bb, err := b.marshal(&pl)
166+
if err != nil {
167+
return nil, errors.Wrap(err, "marshal error")
168+
}
169+
170+
topic := bytes.NewBuffer(nil)
171+
if err := b.stateTopicTemplate.Execute(topic, struct {
172+
GatewayID lorawan.EUI64
173+
StateType string
174+
}{*gatewayID, "conn"}); err != nil {
175+
return nil, errors.Wrap(err, "execute state template error")
176+
}
177+
178+
log.WithFields(log.Fields{
179+
"gateway_id": gatewayID,
180+
"topic": topic.String(),
181+
}).Info("integration/mqtt: setting last will and testament")
182+
183+
b.clientOpts.SetBinaryWill(topic.String(), bb, b.qos, true)
184+
}
185+
}
186+
140187
return &b, nil
141188
}
142189

@@ -153,6 +200,20 @@ func (b *Backend) Stop() error {
153200
b.connMux.Lock()
154201
defer b.connMux.Unlock()
155202

203+
b.gatewaysMux.Lock()
204+
defer b.gatewaysMux.Unlock()
205+
206+
// Set gateway state to offline for all gateways.
207+
for gatewayID := range b.gateways {
208+
pl := gw.ConnState{
209+
GatewayId: gatewayID[:],
210+
State: gw.ConnState_OFFLINE,
211+
}
212+
if err := b.PublishState(gatewayID, "conn", &pl); err != nil {
213+
log.WithError(err).Error("integration/mqtt: publish state error")
214+
}
215+
}
216+
156217
b.conn.Disconnect(250)
157218
b.connClosed = true
158219
return nil
@@ -183,6 +244,16 @@ func (b *Backend) SetRawPacketForwarderCommandFunc(f func(gw.RawPacketForwarderC
183244
// race conditions in case of connection issues. This way, the gateways map
184245
// always reflect the desired state.
185246
func (b *Backend) SetGatewaySubscription(subscribe bool, gatewayID lorawan.EUI64) error {
247+
// In this case we don't want to (un)subscribe as the Gateway ID is provided by
248+
// the authentication and is set before connect.
249+
if id := b.auth.GetGatewayID(); id != nil && *id == gatewayID {
250+
log.WithFields(log.Fields{
251+
"gateway_id": gatewayID,
252+
"subscribe": subscribe,
253+
}).Debug("integration/mqtt: ignoring SetGatewaySubscription as gateway id is set by authentication")
254+
return nil
255+
}
256+
186257
log.WithFields(log.Fields{
187258
"gateway_id": gatewayID,
188259
"subscribe": subscribe,
@@ -242,11 +313,48 @@ func (b *Backend) PublishEvent(gatewayID lorawan.EUI64, event string, id uuid.UU
242313
"exec": "exec_",
243314
"raw": "raw_",
244315
}
245-
return b.publish(gatewayID, event, log.Fields{
316+
return b.publishEvent(gatewayID, event, log.Fields{
246317
idPrefix[event] + "id": id,
247318
}, v)
248319
}
249320

321+
// PublishState publishes the given state as retained message.
322+
func (b *Backend) PublishState(gatewayID lorawan.EUI64, state string, v proto.Message) error {
323+
if b.stateTopicTemplate == nil {
324+
log.WithFields(log.Fields{
325+
"state": state,
326+
"gateway_id": gatewayID,
327+
}).Debug("integration/mqtt: ignoring publish state, no state_topic_template configured")
328+
return nil
329+
}
330+
331+
mqttStateCounter(state).Inc()
332+
333+
topic := bytes.NewBuffer(nil)
334+
if err := b.stateTopicTemplate.Execute(topic, struct {
335+
GatewayID lorawan.EUI64
336+
StateType string
337+
}{gatewayID, state}); err != nil {
338+
return errors.Wrap(err, "execute state template error")
339+
}
340+
341+
bytes, err := b.marshal(v)
342+
if err != nil {
343+
return errors.Wrap(err, "marshal message error")
344+
}
345+
346+
log.WithFields(log.Fields{
347+
"topic": topic.String(),
348+
"qos": b.qos,
349+
"state": state,
350+
"gateway_id": gatewayID,
351+
}).Info("integration/mqtt: publishing state")
352+
if token := b.conn.Publish(topic.String(), b.qos, true, bytes); token.Wait() && token.Error() != nil {
353+
return token.Error()
354+
}
355+
return nil
356+
}
357+
250358
func (b *Backend) connect() error {
251359
b.connMux.Lock()
252360
defer b.connMux.Unlock()
@@ -293,13 +401,10 @@ func (b *Backend) disconnect() error {
293401
func (b *Backend) reconnectLoop() {
294402
if b.auth.ReconnectAfter() > 0 {
295403
for {
296-
b.connMux.RLock()
297-
closed := b.connClosed
298-
b.connMux.RUnlock()
299-
300-
if closed {
404+
if b.isClosed() {
301405
break
302406
}
407+
303408
time.Sleep(b.auth.ReconnectAfter())
304409
log.Info("mqtt: re-connect triggered")
305410

@@ -329,13 +434,16 @@ func (b *Backend) onConnected(c paho.Client) {
329434

330435
func (b *Backend) subscribeLoop() {
331436
for {
332-
b.connMux.RLock()
333-
closed := b.connClosed
334-
b.connMux.RUnlock()
335-
if closed {
437+
time.Sleep(time.Millisecond * 100)
438+
439+
if b.isClosed() {
336440
break
337441
}
338442

443+
if !b.conn.IsConnected() {
444+
continue
445+
}
446+
339447
var subscribe []lorawan.EUI64
340448
var unsubscribe []lorawan.EUI64
341449

@@ -361,23 +469,40 @@ func (b *Backend) subscribeLoop() {
361469
b.gatewaysMux.RUnlock()
362470

363471
for _, gatewayID := range subscribe {
472+
statePL := gw.ConnState{
473+
GatewayId: gatewayID[:],
474+
State: gw.ConnState_ONLINE,
475+
}
476+
364477
if err := b.subscribeGateway(gatewayID); err != nil {
365478
log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: subscribe gateway error")
366479
} else {
367-
b.gatewaysSubscribed[gatewayID] = struct{}{}
480+
if err := b.PublishState(gatewayID, "conn", &statePL); err != nil {
481+
log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: publish conn state error")
482+
} else {
483+
b.gatewaysSubscribed[gatewayID] = struct{}{}
484+
}
368485
}
369486
}
370487

371488
for _, gatewayID := range unsubscribe {
489+
statePL := gw.ConnState{
490+
GatewayId: gatewayID[:],
491+
State: gw.ConnState_OFFLINE,
492+
}
493+
372494
if err := b.unsubscribeGateway(gatewayID); err != nil {
373495
log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: unsubscribe gateway error")
374496
} else {
375-
delete(b.gatewaysSubscribed, gatewayID)
497+
if err := b.PublishState(gatewayID, "conn", &statePL); err != nil {
498+
log.WithError(err).WithField("gateway_id", gatewayID).Error("integration/mqtt: publish conn state error")
499+
} else {
500+
delete(b.gatewaysSubscribed, gatewayID)
501+
}
376502
}
377503
}
378504

379505
b.gatewaysSubscribedMux.Unlock()
380-
time.Sleep(time.Millisecond * 100)
381506
}
382507
}
383508

@@ -513,7 +638,7 @@ func (b *Backend) handleCommand(c paho.Client, msg paho.Message) {
513638
}
514639
}
515640

516-
func (b *Backend) publish(gatewayID lorawan.EUI64, event string, fields log.Fields, msg proto.Message) error {
641+
func (b *Backend) publishEvent(gatewayID lorawan.EUI64, event string, fields log.Fields, msg proto.Message) error {
517642
topic := bytes.NewBuffer(nil)
518643
if err := b.eventTopicTemplate.Execute(topic, struct {
519644
GatewayID lorawan.EUI64
@@ -537,3 +662,10 @@ func (b *Backend) publish(gatewayID lorawan.EUI64, event string, fields log.Fiel
537662
}
538663
return nil
539664
}
665+
666+
// isClosed returns true when the integration is shutting down.
667+
func (b *Backend) isClosed() bool {
668+
b.connMux.RLock()
669+
defer b.connMux.RUnlock()
670+
return b.connClosed
671+
}

‎internal/integration/mqtt/backend_test.go

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,42 +55,105 @@ func (ts *MQTTBackendTestSuite) SetupSuite() {
5555
var conf config.Config
5656
conf.Integration.Marshaler = "json"
5757
conf.Integration.MQTT.EventTopicTemplate = "gateway/{{ .GatewayID }}/event/{{ .EventType }}"
58+
conf.Integration.MQTT.StateTopicTemplate = "gateway/{{ .GatewayID }}/state/{{ .StateType }}"
5859
conf.Integration.MQTT.CommandTopicTemplate = "gateway/{{ .GatewayID }}/command/#"
5960
conf.Integration.MQTT.Auth.Type = "generic"
6061
conf.Integration.MQTT.Auth.Generic.Servers = []string{server}
6162
conf.Integration.MQTT.Auth.Generic.Username = username
6263
conf.Integration.MQTT.Auth.Generic.Password = password
6364
conf.Integration.MQTT.Auth.Generic.CleanSession = true
65+
conf.Integration.MQTT.Auth.Generic.ClientID = ts.gatewayID.String()
6466

6567
var err error
6668
ts.backend, err = NewBackend(conf)
6769
assert.NoError(err)
6870
assert.NoError(ts.backend.Start())
69-
assert.NoError(ts.backend.SetGatewaySubscription(true, ts.gatewayID))
70-
time.Sleep(100 * time.Millisecond)
71+
72+
// The subscribe loop runs every 100ms, we will wait twice the time to make
73+
// sure the subscription is set.
74+
time.Sleep(200 * time.Millisecond)
7175
}
7276

7377
func (ts *MQTTBackendTestSuite) TearDownSuite() {
7478
ts.mqttClient.Disconnect(0)
7579
ts.backend.Stop()
7680
}
7781

82+
func (ts *MQTTBackendTestSuite) TestLastWill() {
83+
assert := require.New(ts.T())
84+
85+
assert.True(ts.backend.clientOpts.WillEnabled)
86+
assert.Equal("gateway/0807060504030201/state/conn", ts.backend.clientOpts.WillTopic)
87+
assert.Equal(`{"gatewayID":"CAcGBQQDAgE=","state":"OFFLINE"}`, string(ts.backend.clientOpts.WillPayload))
88+
assert.True(ts.backend.clientOpts.WillRetained)
89+
}
90+
91+
func (ts *MQTTBackendTestSuite) TestConnStateOnline() {
92+
assert := require.New(ts.T())
93+
94+
connStateChan := make(chan gw.ConnState)
95+
token := ts.mqttClient.Subscribe("gateway/0807060504030201/state/conn", 0, func(c paho.Client, msg paho.Message) {
96+
var pl gw.ConnState
97+
assert.NoError(ts.backend.unmarshal(msg.Payload(), &pl))
98+
connStateChan <- pl
99+
})
100+
token.Wait()
101+
assert.NoError(token.Error())
102+
103+
assert.Equal(gw.ConnState{
104+
GatewayId: ts.gatewayID[:],
105+
State: gw.ConnState_ONLINE,
106+
}, <-connStateChan)
107+
108+
token = ts.mqttClient.Unsubscribe("gateway/0807060504030201/state/conn")
109+
token.Wait()
110+
assert.NoError(token.Error())
111+
}
112+
78113
func (ts *MQTTBackendTestSuite) TestSubscribeGateway() {
79114
assert := require.New(ts.T())
80115

81116
gatewayID := lorawan.EUI64{1, 2, 3, 4, 5, 6, 7, 8}
117+
connStateChan := make(chan gw.ConnState)
82118

83119
assert.NoError(ts.backend.SetGatewaySubscription(true, gatewayID))
84120
_, ok := ts.backend.gateways[gatewayID]
85121
assert.True(ok)
86122

123+
// Wait 200ms to make sure that the subscribe loop has picked up the
124+
// change and set the ConnState. If we subscribe too early, it is
125+
// possible that we get an (old) OFFLINE retained message.
126+
time.Sleep(200 * time.Millisecond)
127+
128+
token := ts.mqttClient.Subscribe("gateway/0102030405060708/state/conn", 0, func(c paho.Client, msg paho.Message) {
129+
var pl gw.ConnState
130+
assert.NoError(ts.backend.unmarshal(msg.Payload(), &pl))
131+
connStateChan <- pl
132+
})
133+
token.Wait()
134+
assert.NoError(token.Error())
135+
136+
assert.Equal(gw.ConnState{
137+
GatewayId: gatewayID[:],
138+
State: gw.ConnState_ONLINE,
139+
}, <-connStateChan)
140+
87141
ts.T().Run("Unsubscribe", func(t *testing.T) {
88142
assert := require.New(t)
89143

90144
assert.NoError(ts.backend.SetGatewaySubscription(false, gatewayID))
91145
_, ok := ts.backend.gateways[gatewayID]
92146
assert.False(ok)
147+
148+
assert.Equal(gw.ConnState{
149+
GatewayId: gatewayID[:],
150+
State: gw.ConnState_OFFLINE,
151+
}, <-connStateChan)
93152
})
153+
154+
token = ts.mqttClient.Unsubscribe("gateway/0102030405060708/state/conn")
155+
token.Wait()
156+
assert.NoError(token.Error())
94157
}
95158

96159
func (ts *MQTTBackendTestSuite) TestPublishUplinkFrame() {
@@ -169,10 +232,38 @@ func (ts *MQTTBackendTestSuite) TestPublishDownlinkTXAck() {
169232
assert.NoError(token.Error())
170233

171234
assert.NoError(ts.backend.PublishEvent(ts.gatewayID, "ack", id, &txAck))
235+
172236
txAckReceived := <-txAckChan
173237
assert.Equal(txAck, txAckReceived)
174238
}
175239

240+
func (ts *MQTTBackendTestSuite) TestPublishConnState() {
241+
assert := require.New(ts.T())
242+
243+
// We publish first
244+
state := gw.ConnState{
245+
GatewayId: ts.gatewayID[:],
246+
State: gw.ConnState_ONLINE,
247+
}
248+
assert.NoError(ts.backend.PublishState(ts.gatewayID, "conn", &state))
249+
250+
// And then subscribe to test that the message has been retained
251+
stateChan := make(chan gw.ConnState)
252+
token := ts.mqttClient.Subscribe("gateway/0807060504030201/state/conn", 0, func(c paho.Client, msg paho.Message) {
253+
var pl gw.ConnState
254+
assert.NoError(ts.backend.unmarshal(msg.Payload(), &pl))
255+
stateChan <- pl
256+
})
257+
token.Wait()
258+
assert.NoError(token.Error())
259+
260+
assert.Equal(state, <-stateChan)
261+
262+
token = ts.mqttClient.Unsubscribe("gateway/0807060504030201/state/conn")
263+
token.Wait()
264+
assert.NoError(token.Error())
265+
}
266+
176267
func (ts *MQTTBackendTestSuite) TestDownlinkFrameHandler() {
177268
assert := require.New(ts.T())
178269
downlinkFrameChan := make(chan gw.DownlinkFrame, 1)

‎internal/integration/mqtt/metrics.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ var (
1111
Help: "The number of gateway events published by the MQTT integration (per event).",
1212
}, []string{"event"})
1313

14+
sc = promauto.NewCounterVec(prometheus.CounterOpts{
15+
Name: "integration_mqtt_state_count",
16+
Help: "The number of gateway states published by the MQTT integration (per state).",
17+
}, []string{"state"})
18+
1419
cc = promauto.NewCounterVec(prometheus.CounterOpts{
1520
Name: "integration_mqtt_command_count",
1621
Help: "The number of commands received by the MQTT integration (per command).",
@@ -36,6 +41,10 @@ func mqttEventCounter(e string) prometheus.Counter {
3641
return pc.With(prometheus.Labels{"event": e})
3742
}
3843

44+
func mqttStateCounter(s string) prometheus.Counter {
45+
return sc.With(prometheus.Labels{"state": s})
46+
}
47+
3948
func mqttCommandCounter(c string) prometheus.Counter {
4049
return cc.With(prometheus.Labels{"command": c})
4150
}

0 commit comments

Comments
 (0)
Please sign in to comment.