@@ -16,6 +16,11 @@ import org.eclipse.paho.client.mqttv3.MqttConnectOptions
1616import org.eclipse.paho.client.mqttv3.MqttMessage
1717import com.github.shamil.Xid
1818
19+ /* *
20+ * MQTT 客户端 (v2 — 纯订阅模式)
21+ * 只订阅服务端推送的通知主题 heji/notify/{userId}/,不再发布任何消息。
22+ * 数据同步通过 HTTP API 完成(SyncTrigger 独立运行)。
23+ */
1924class MqttSyncClient {
2025 private var mqttClient: MqttAndroidClient ? = null
2126 private var brokerUrl = " "
@@ -30,11 +35,12 @@ class MqttSyncClient {
3035 private val syncScope = CoroutineScope (Dispatchers .Main + syncJob)
3136
3237 private val syncReceiver by lazy { SyncReceiver () }
33- private val syncTrigger by lazy { SyncTrigger (syncScope) }
3438
3539 companion object {
36- private const val TOPIC_BOOK_SYNC = " heji/book/%s/sync"
37- private const val TOPIC_USER_SYNC = " heji/user/%s/sync"
40+ // 下行通知主题:服务端 → 客户端
41+ private const val TOPIC_NOTIFY_BOOK = " heji/notify/%s/book"
42+ private const val TOPIC_NOTIFY_BILL = " heji/notify/%s/bill"
43+ private const val TOPIC_NOTIFY_IMAGE = " heji/notify/%s/image"
3844
3945 @Volatile
4046 private var instance: MqttSyncClient ? = null
@@ -50,24 +56,6 @@ class MqttSyncClient {
5056 }
5157 }
5258
53- fun send (message : SyncMessage ): Boolean {
54- val mqttMessage = MqttMessage ().apply {
55- payload = message.toJson().toByteArray()
56- qos = 1
57- isRetained = false
58- }
59- return try {
60- val bookId = Config .book.id
61- val topic = String .format(TOPIC_BOOK_SYNC , bookId)
62- mqttClient?.publish(topic, mqttMessage)
63- LogUtils .d(" MQTT published to $topic " )
64- true
65- } catch (e: Exception ) {
66- LogUtils .e(" MQTT publish failed" , e)
67- false
68- }
69- }
70-
7159 fun close () {
7260 LogUtils .d(" close mqtt" )
7361 try {
@@ -101,20 +89,18 @@ class MqttSyncClient {
10189 status = Status .CONNECTED
10290 subscribeTopics()
10391 syncReceiver.register()
104- syncTrigger.register()
10592 }
10693
10794 override fun connectionLost (cause : Throwable ? ) {
10895 LogUtils .w(" MQTT connection lost: ${cause?.message} " )
10996 status = Status .DISCONNECTED
11097 syncReceiver.unregister()
111- syncTrigger.unregister()
11298 }
11399
114100 override fun messageArrived (topic : String? , message : MqttMessage ? ) {
115101 message?.let {
116102 val text = String (it.payload)
117- LogUtils .d(" MQTT message from $topic : $text " )
103+ LogUtils .d(" MQTT notify from $topic : $text " )
118104 syncScope.launch(Dispatchers .IO ) {
119105 syncReceiver.onReceiver(text)
120106 }
@@ -152,16 +138,16 @@ class MqttSyncClient {
152138 private fun subscribeTopics () {
153139 try {
154140 val userId = Config .user.id
155- val userTopic = String .format(TOPIC_USER_SYNC , userId)
156- mqttClient?.subscribe(userTopic, 1 )
157- LogUtils .d(" MQTT subscribed: $userTopic " )
158-
159- val bookId = Config .book.id
160- val bookTopic = String .format(TOPIC_BOOK_SYNC , bookId)
141+ val bookTopic = String .format(TOPIC_NOTIFY_BOOK , userId)
142+ val billTopic = String .format(TOPIC_NOTIFY_BILL , userId)
143+ val imageTopic = String .format(TOPIC_NOTIFY_IMAGE , userId)
161144 mqttClient?.subscribe(bookTopic, 1 )
162- LogUtils .d(" MQTT subscribed: $bookTopic " )
145+ mqttClient?.subscribe(billTopic, 1 )
146+ mqttClient?.subscribe(imageTopic, 1 )
147+ LogUtils .d(" MQTT subscribed: $bookTopic , $billTopic , $imageTopic " )
163148 } catch (e: Exception ) {
164149 LogUtils .e(" MQTT subscribe error" , e)
165150 }
166151 }
167152}
153+
0 commit comments