
Important MQTT features: NATS to MQTT QoS1/2 and MQTT shared subscriptions #6859


Open
toshede opened this issue May 1, 2025 · 2 comments
Labels
proposal Enhancement idea or proposal


toshede commented May 1, 2025

Proposed change

The current MQTT interface needs very little enhancement to become fully useful:

  1. NATS JetStream -> MQTT QoS 1/2

There are millions of devices speaking MQTT. A NATS client currently cannot send a reliable message (QoS > 0) to these devices, so one has to open an additional MQTT connection and send the message there with QoS 1 or higher. Allowing JetStream messages to be bridged to MQTT with QoS 1/2 would close this gap.

  2. Shared subscriptions. This is the most important feature. Although it is part of the v5 specification, it can also be implemented in MQTT v3.x (some MQTT brokers already do this), and it can be mapped to NATS queue groups.

Currently, clients have to use the NATS protocol if they want to operate in a highly available configuration. Adding this feature to the existing MQTT 3.x interface would allow the MQTT protocol to be used for the backend as well. NATS could then become a serious alternative to established MQTT brokers and ease the migration of existing projects.
I know there are a lot of requests for MQTT v5, and it would be great to have, but IMHO most of us are missing these two features.
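
To illustrate the proposed mapping, here is a minimal sketch of how a shared-subscription filter could be split into a queue group name and a topic filter. It assumes the MQTT v5 syntax `$share/<group>/<filter>`; the names `Subscription` and `parse_subscription` are hypothetical, and nothing here reflects how the NATS server is or would be implemented.

```python
from typing import NamedTuple, Optional


class Subscription(NamedTuple):
    # None means an ordinary (non-shared) subscription.
    queue_group: Optional[str]
    topic_filter: str


def parse_subscription(mqtt_filter: str) -> Subscription:
    """Split an MQTT v5-style shared filter into (group, topic filter).

    Hypothetical helper: the group name is what a broker could use as
    the NATS queue group for this subscription.
    """
    prefix = "$share/"
    if not mqtt_filter.startswith(prefix):
        return Subscription(None, mqtt_filter)
    # Everything up to the next "/" is the group; the rest is the filter.
    group, sep, topic_filter = mqtt_filter[len(prefix):].partition("/")
    if not group or not sep or not topic_filter:
        raise ValueError(f"malformed shared subscription: {mqtt_filter!r}")
    return Subscription(group, topic_filter)
```

With this sketch, `parse_subscription("$share/workers/devices/+/request")` yields the group `workers` and the filter `devices/+/request`, while a plain filter is returned unchanged with no group.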

Use case

Currently the following use case cannot be solved with only one connection from the backend to NATS:

MQTT IoT device -> (MQTT request with QoS 1) -> NATS Server -> Backend consumer (In a Queue Group/Shared subscription) -> NATS Server -> (MQTT Response with QoS 1) -> MQTT IoT Device

To make it work with NATS, the backend consumer needs an additional MQTT connection.

Contribution

No response

@toshede toshede added the proposal Enhancement idea or proposal label May 1, 2025
@roeschter

First, about high availability: as far as I remember, the MQTT subsystem in NATS runs in highly available JetStream mode with replicas=3 when used in a cluster with at least 3 JetStream nodes.

@roeschter

About sending and receiving QoS 1/2 messages

NATS uses (hidden) streams to store QoS 1/2 messages.

Streams used

| Stream | Description |
|---|---|
| $MQTT_msgs | Stores messages for persistent sessions. Interest-based. One consumer per session. Metadata is stored in $MQTT_sess. |
| $MQTT_out | Temporary QoS status messages for delivery. Should be (almost) empty. |
| $MQTT_qos2in | QoS 2 deduplication only. Should be (almost) empty. |
| $MQTT_sess | Stores sessions. Non-persistent sessions are deleted when the client disconnects. Persistent sessions stay. |
| $MQTT_rmsgs | Retained messages. Stores only the last message per subject. Last-message retention is independent of persistent interest. |

Can I send persistent messages to MQTT from NATS (QoS 1 and 2)?

You can publish JetStream messages directly to $MQTT.msgs.<subject>.

Note that the Nmqtt-Pub header MUST be set to the desired QoS.
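
As a rough sketch of the two points above, the helper below builds the subject and header for such a publish. The prefix `$MQTT.msgs.` and the header name `Nmqtt-Pub` come from this comment; the function name `build_mqtt_publish` is hypothetical, and encoding the QoS as a decimal string is an assumption that should be checked against the server version in use.

```python
from typing import Dict, Tuple

MQTT_STREAM_SUBJECT_PREFIX = "$MQTT.msgs."  # subject prefix named above
MQTT_QOS_HEADER = "Nmqtt-Pub"               # header name named above


def build_mqtt_publish(nats_subject: str, qos: int) -> Tuple[str, Dict[str, str]]:
    """Return (subject, headers) for a JetStream publish aimed at MQTT.

    Hypothetical helper. Assumption: the header value is the QoS level
    written as a decimal string ("1" or "2").
    """
    if qos not in (1, 2):
        raise ValueError("QoS must be 1 or 2 for a persistent message")
    if not nats_subject or "*" in nats_subject or ">" in nats_subject:
        raise ValueError("a concrete NATS subject without wildcards is required")
    subject = MQTT_STREAM_SUBJECT_PREFIX + nats_subject
    headers = {MQTT_QOS_HEADER: str(qos)}
    return subject, headers
```

The returned pair would then be handed to whatever JetStream publish call your NATS client offers, together with the payload; `build_mqtt_publish("foo", 1)` gives the subject `$MQTT.msgs.foo` and the header `Nmqtt-Pub: 1`.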

Can I receive JetStream messages through MQTT?

You can impersonate an MQTT session (recommended), or directly create an additional consumer on $MQTT_msgs (not recommended).

  1. Create a non-volatile MQTT consumer using the MQTT API (curl is sufficient for the task).
  2. Find the named MQTT session in $MQTT_sess and extract the NATS durable consumer name.
  3. Connect a regular NATS consumer instance to $MQTT_msgs with the discovered consumer name.

Sessions example - $MQTT_sess

Subject: $MQTT.sess.M75CkzIW

{
  "origin": "SCGxHPF8",
  "id": "mosquitto01",
  "subs": {
    "foo": 1
  },
  "cons": {
    "foo": {
      "durable_name": "M75CkzIW_c7Od0knFHyr7TU8RilU2IR",
      "deliver_policy": "new",
      "ack_policy": "explicit",
      "ack_wait": 30000000000,
      "filter_subject": "$MQTT.msgs.foo",
      "replay_policy": "instant",
      "max_ack_pending": 1024,
      "deliver_subject": "$MQTT.sub.c7Od0knFHyr7TU8RilU2DV",
      "num_replicas": 0
    }
  }
}
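
The lookup in step 2 can be sketched against a record shaped like the one above. This assumes the session message body is JSON with a `cons` object keyed by subscription, as in the example; the function name `durable_consumers` is hypothetical, and fetching the record from $MQTT_sess is left to your NATS client.

```python
import json
from typing import Dict


def durable_consumers(session_record: str) -> Dict[str, str]:
    """Map each subscription in a session record to its durable consumer name.

    Hypothetical helper; entries without a "durable_name" field are skipped.
    """
    record = json.loads(session_record)
    return {
        subscription: consumer["durable_name"]
        for subscription, consumer in record.get("cons", {}).items()
        if "durable_name" in consumer
    }


# Trimmed copy of the example session record shown above.
example = """
{
  "id": "mosquitto01",
  "subs": {"foo": 1},
  "cons": {
    "foo": {
      "durable_name": "M75CkzIW_c7Od0knFHyr7TU8RilU2IR",
      "filter_subject": "$MQTT.msgs.foo"
    }
  }
}
"""
names = durable_consumers(example)
```

Here `names` maps the subscription `foo` to `M75CkzIW_c7Od0knFHyr7TU8RilU2IR`, which is the consumer name a regular NATS client would bind to in step 3.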
