Skip to content

Commit 3af7e2f

Browse files
committed
Add mesh event / command payloads.
This refactors the mesh-heartbeat into a generic event/mesh message, which can be used for 'known' event and proprietary events. This also adds a command/mesh message, which can be used for sending commands to the Relay Gateways within the Gateway Mesh (this depends on using the ChirpStack Gateway Mesh).
1 parent daab3fe commit 3af7e2f

File tree

8 files changed

+182
-142
lines changed

8 files changed

+182
-142
lines changed

Cargo.lock

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"usage",
1717
"derive",
1818
] }
19-
chirpstack_api = { version = "4.12", default-features = false, features = [
19+
chirpstack_api = { version = "4.13.0-test.1", default-features = false, features = [
2020
"json",
2121
] }
2222
lrwn_filters = { version = "4.9", features = ["serde"] }
@@ -26,7 +26,6 @@
2626
log = "0.4"
2727
simple_logger = "5.0"
2828
syslog = "7.0"
29-
prost = "0.13"
3029
anyhow = "1.0"
3130
toml = "0.8"
3231
chrono = "0.4"

src/backend/concentratord.rs

Lines changed: 81 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,17 @@
1-
use std::io::Cursor;
21
use std::sync::{Arc, Mutex};
32
use std::thread::sleep;
43
use std::time::Duration;
54

65
use anyhow::Result;
76
use async_trait::async_trait;
8-
use chirpstack_api::gw;
7+
use chirpstack_api::{gw, prost::Message};
98
use log::{debug, error, info, trace, warn};
10-
use prost::Message;
119
use tokio::task;
1210

1311
use super::Backend as BackendTrait;
1412
use crate::config::Configuration;
1513
use crate::metadata;
16-
use crate::mqtt::{send_gateway_stats, send_mesh_heartbeat, send_tx_ack, send_uplink_frame};
14+
use crate::mqtt::{send_gateway_stats, send_mesh_event, send_tx_ack, send_uplink_frame};
1715

1816
pub struct Backend {
1917
gateway_id: String,
@@ -45,26 +43,31 @@ impl Backend {
4543

4644
info!("Reading gateway id");
4745

48-
// send 'gateway_id' command with empty payload.
49-
cmd_sock.send("gateway_id", zmq::SNDMORE)?;
50-
cmd_sock.send("", 0)?;
46+
// Request Gateway ID.
47+
let req = gw::Command {
48+
command: Some(gw::command::Command::GetGatewayId(
49+
gw::GetGatewayIdRequest {},
50+
)),
51+
};
52+
cmd_sock.send(req.encode_to_vec(), 0)?;
5153

5254
// set poller so that we can timeout after 100ms
5355
let mut items = [cmd_sock.as_poll_item(zmq::POLLIN)];
5456
zmq::poll(&mut items, 100)?;
5557
if !items[0].is_readable() {
5658
return Err(anyhow!("Could not read gateway id"));
5759
}
58-
let gateway_id = cmd_sock.recv_bytes(0)?;
59-
if gateway_id.len() != 8 {
60+
61+
// Read response.
62+
let resp = cmd_sock.recv_bytes(0)?;
63+
let resp = gw::GetGatewayIdResponse::decode(resp.as_slice())?;
64+
if resp.gateway_id.len() != 16 {
6065
return Err(anyhow!(
61-
"Invalid gateway id, expected 8 bytes, received {}",
62-
gateway_id.len()
66+
"Invalid Gateway ID length, gateway_id: {}",
67+
resp.gateway_id
6368
));
6469
}
65-
let gateway_id = hex::encode(gateway_id);
66-
67-
info!("Received gateway id, gateway_id: {}", gateway_id);
70+
info!("Received gateway id, gateway_id: {}", resp.gateway_id);
6871

6972
tokio::spawn({
7073
let forward_crc_ok = conf.backend.filters.forward_crc_ok;
@@ -88,18 +91,17 @@ impl Backend {
8891
});
8992

9093
Ok(Backend {
91-
gateway_id,
94+
gateway_id: resp.gateway_id,
9295
ctx: zmq_ctx,
9396
cmd_url: conf.backend.concentratord.command_url.clone(),
9497
cmd_sock: Mutex::new(cmd_sock),
9598
})
9699
}
97100

98-
fn send_command(&self, cmd: &str, b: &[u8]) -> Result<Vec<u8>> {
101+
fn send_command(&self, cmd: gw::Command) -> Result<Vec<u8>> {
99102
let res = || -> Result<Vec<u8>> {
100103
let cmd_sock = self.cmd_sock.lock().unwrap();
101-
cmd_sock.send(cmd, zmq::SNDMORE)?;
102-
cmd_sock.send(b, 0)?;
104+
cmd_sock.send(cmd.encode_to_vec(), 0)?;
103105

104106
// set poller so that we can timeout after 100ms
105107
let mut items = [cmd_sock.as_poll_item(zmq::POLLIN)];
@@ -109,8 +111,8 @@ impl Backend {
109111
}
110112

111113
// red tx ack response
112-
let resp_b: &[u8] = &cmd_sock.recv_bytes(0)?;
113-
Ok(resp_b.to_vec())
114+
let resp_b = cmd_sock.recv_bytes(0)?;
115+
Ok(resp_b)
114116
}();
115117

116118
if res.is_err() {
@@ -153,13 +155,16 @@ impl BackendTrait for Backend {
153155
Ok(self.gateway_id.clone())
154156
}
155157

156-
async fn send_downlink_frame(&self, pl: &gw::DownlinkFrame) -> Result<()> {
158+
async fn send_downlink_frame(&self, pl: gw::DownlinkFrame) -> Result<()> {
157159
info!("Sending downlink frame, downlink_id: {}", pl.downlink_id);
160+
let downlink_id = pl.downlink_id;
158161

159162
let tx_ack = {
160-
let b = pl.encode_to_vec();
161-
let resp_b = self.send_command("down", &b)?;
162-
gw::DownlinkTxAck::decode(&mut Cursor::new(resp_b))?
163+
let cmd = gw::Command {
164+
command: Some(gw::command::Command::SendDownlinkFrame(pl)),
165+
};
166+
let resp_b = self.send_command(cmd)?;
167+
gw::DownlinkTxAck::decode(resp_b.as_slice())?
163168
};
164169

165170
let ack_items: Vec<String> = tx_ack
@@ -170,17 +175,30 @@ impl BackendTrait for Backend {
170175

171176
info!(
172177
"Received ack, items: {:?}, downlink_id: {}",
173-
ack_items, pl.downlink_id
178+
ack_items, downlink_id
174179
);
175180

176181
send_tx_ack(&tx_ack).await
177182
}
178183

179-
async fn send_configuration_command(&self, pl: &gw::GatewayConfiguration) -> Result<()> {
184+
async fn send_configuration_command(&self, pl: gw::GatewayConfiguration) -> Result<()> {
180185
info!("Sending configuration command, version: {}", pl.version);
181186

182-
let b = pl.encode_to_vec();
183-
let _ = self.send_command("config", &b)?;
187+
let cmd = gw::Command {
188+
command: Some(gw::command::Command::SetGatewayConfiguration(pl)),
189+
};
190+
let _ = self.send_command(cmd)?;
191+
192+
Ok(())
193+
}
194+
195+
async fn send_mesh_command(&self, pl: gw::MeshCommand) -> Result<()> {
196+
info!("Sending mesh command");
197+
198+
let cmd = gw::Command {
199+
command: Some(gw::command::Command::Mesh(pl)),
200+
};
201+
let _ = self.send_command(cmd)?;
184202

185203
Ok(())
186204
}
@@ -197,37 +215,37 @@ async fn event_loop(
197215
let event_sock = Arc::new(Mutex::new(event_sock));
198216

199217
loop {
200-
let res = task::spawn_blocking({
218+
let event = task::spawn_blocking({
201219
let event_sock = event_sock.clone();
202220

203-
move || -> Result<Vec<Vec<u8>>> {
221+
move || -> Result<Option<gw::Event>> {
204222
let event_sock = event_sock.lock().unwrap();
205223

206224
// set poller so that we can timeout after 100ms
207225
let mut items = [event_sock.as_poll_item(zmq::POLLIN)];
208226
zmq::poll(&mut items, 100)?;
209227
if !items[0].is_readable() {
210-
return Ok(vec![]);
228+
return Ok(None);
211229
}
212230

213-
let msg = event_sock.recv_multipart(0)?;
214-
if msg.len() != 2 {
215-
return Err(anyhow!("Event must have two frames"));
216-
}
217-
Ok(msg)
231+
let msg = event_sock.recv_bytes(0)?;
232+
Ok(Some(gw::Event::decode(msg.as_slice())?))
218233
}
219234
})
220235
.await;
221236

222-
match res {
223-
Ok(Ok(msg)) => {
224-
if msg.len() != 2 {
225-
continue;
226-
}
237+
let event = match event {
238+
Ok(v) => v,
239+
Err(e) => {
240+
error!("Task error: {}", e);
241+
continue;
242+
}
243+
};
227244

245+
match event {
246+
Ok(Some(v)) => {
228247
if let Err(err) = handle_event_msg(
229-
&msg[0],
230-
&msg[1],
248+
v,
231249
&filters,
232250
forward_crc_ok,
233251
forward_crc_invalid,
@@ -239,72 +257,60 @@ async fn event_loop(
239257
continue;
240258
}
241259
}
242-
Ok(Err(err)) => {
243-
error!("Receive event error, error: {}", err);
244-
continue;
245-
}
246-
Err(err) => {
247-
error!("{}", err);
260+
Ok(None) => continue,
261+
Err(e) => {
262+
error!("Error reading event, error: {}", e);
248263
continue;
249264
}
250265
}
251266
}
252267
}
253268

254269
async fn handle_event_msg(
255-
event: &[u8],
256-
pl: &[u8],
270+
event: gw::Event,
257271
filters: &lrwn_filters::Filters,
258272
forward_crc_ok: bool,
259273
forward_crc_invalid: bool,
260274
forward_crc_missing: bool,
261275
) -> Result<()> {
262-
let event = String::from_utf8(event.to_vec())?;
263-
let pl = Cursor::new(pl.to_vec());
264-
265-
match event.as_str() {
266-
"up" => {
267-
let pl = gw::UplinkFrame::decode(pl)?;
268-
if let Some(rx_info) = &pl.rx_info {
276+
match event.event {
277+
Some(gw::event::Event::UplinkFrame(v)) => {
278+
if let Some(rx_info) = &v.rx_info {
269279
if !((rx_info.crc_status() == gw::CrcStatus::CrcOk && forward_crc_ok)
270280
|| (rx_info.crc_status() == gw::CrcStatus::BadCrc && forward_crc_invalid)
271281
|| (rx_info.crc_status() == gw::CrcStatus::NoCrc && forward_crc_missing))
272282
{
273283
debug!(
274284
"Ignoring uplink frame because of forward_crc_ flags, uplink_id: {}",
275-
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
285+
v.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
276286
);
277287
return Ok(());
278288
}
279289
}
280290

281-
if lrwn_filters::matches(&pl.phy_payload, filters) {
291+
if lrwn_filters::matches(&v.phy_payload, filters) {
282292
info!(
283293
"Received uplink frame, uplink_id: {}",
284-
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
294+
v.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default(),
285295
);
286-
send_uplink_frame(&pl).await?;
296+
send_uplink_frame(&v).await?;
287297
} else {
288298
debug!(
289299
"Ignoring uplink frame because of dev_addr and join_eui filters, uplink_id: {}",
290-
pl.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default()
300+
v.rx_info.as_ref().map(|v| v.uplink_id).unwrap_or_default()
291301
);
292302
}
293303
}
294-
"stats" => {
295-
let mut pl = gw::GatewayStats::decode(pl)?;
296-
info!("Received gateway stats");
297-
pl.metadata.extend(metadata::get().await?);
298-
send_gateway_stats(&pl).await?;
299-
}
300-
"mesh_heartbeat" => {
301-
let pl = gw::MeshHeartbeat::decode(pl)?;
302-
info!("Received mesh heartbeat");
303-
send_mesh_heartbeat(&pl).await?;
304+
Some(gw::event::Event::GatewayStats(mut v)) => {
305+
info!("received gateway stats");
306+
v.metadata.extend(metadata::get().await?);
307+
send_gateway_stats(&v).await?;
304308
}
305-
_ => {
306-
return Err(anyhow!("Unexpected event: {}", event));
309+
Some(gw::event::Event::Mesh(v)) => {
310+
info!("Received mesh event");
311+
send_mesh_event(&v).await?;
307312
}
313+
None => {}
308314
}
309315

310316
Ok(())

src/backend/mod.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ static BACKEND: OnceCell<Box<dyn Backend + Sync + Send>> = OnceCell::const_new()
1919
#[async_trait]
2020
pub trait Backend {
2121
async fn get_gateway_id(&self) -> Result<String>;
22-
async fn send_downlink_frame(&self, pl: &gw::DownlinkFrame) -> Result<()>;
23-
async fn send_configuration_command(&self, pl: &gw::GatewayConfiguration) -> Result<()>;
22+
async fn send_downlink_frame(&self, pl: gw::DownlinkFrame) -> Result<()>;
23+
async fn send_configuration_command(&self, pl: gw::GatewayConfiguration) -> Result<()>;
24+
async fn send_mesh_command(&self, pl: gw::MeshCommand) -> Result<()>;
2425
}
2526

2627
pub async fn setup(conf: &Configuration) -> Result<()> {
@@ -74,18 +75,26 @@ pub async fn get_gateway_id() -> Result<String> {
7475
Err(anyhow!("BACKEND is not set"))
7576
}
7677

77-
pub async fn send_downlink_frame(pl: &gw::DownlinkFrame) -> Result<()> {
78+
pub async fn send_downlink_frame(pl: gw::DownlinkFrame) -> Result<()> {
7879
if let Some(b) = BACKEND.get() {
7980
return b.send_downlink_frame(pl).await;
8081
}
8182

8283
Err(anyhow!("BACKEND is not set"))
8384
}
8485

85-
pub async fn send_configuration_command(pl: &gw::GatewayConfiguration) -> Result<()> {
86+
pub async fn send_configuration_command(pl: gw::GatewayConfiguration) -> Result<()> {
8687
if let Some(b) = BACKEND.get() {
8788
return b.send_configuration_command(pl).await;
8889
}
8990

9091
Err(anyhow!("BACKEND is not set"))
9192
}
93+
94+
pub async fn send_mesh_command(pl: gw::MeshCommand) -> Result<()> {
95+
if let Some(b) = BACKEND.get() {
96+
return b.send_mesh_command(pl).await;
97+
}
98+
99+
Err(anyhow!("BACKEND is not set"))
100+
}

0 commit comments

Comments
 (0)