@@ -65,6 +65,7 @@ void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeou
65
65
c -> readbuf = readbuf ;
66
66
c -> readbuf_size = readbuf_size ;
67
67
c -> isconnected = 0 ;
68
+ c -> cleansession = 0 ;
68
69
c -> ping_outstanding = 0 ;
69
70
c -> defaultMessageHandler = NULL ;
70
71
c -> next_packetid = 1 ;
@@ -232,6 +233,22 @@ int keepalive(MQTTClient* c)
232
233
}
233
234
234
235
236
+ void MQTTCleanSession (MQTTClient * c )
237
+ {
238
+ for (int i = 0 ; i < MAX_MESSAGE_HANDLERS ; ++ i )
239
+ c -> messageHandlers [i ].topicFilter = NULL ;
240
+ }
241
+
242
+
243
+ void MQTTCloseSession (MQTTClient * c )
244
+ {
245
+ c -> ping_outstanding = 0 ;
246
+ c -> isconnected = 0 ;
247
+ if (c -> cleansession )
248
+ MQTTCleanSession (c );
249
+ }
250
+
251
+
235
252
int cycle (MQTTClient * c , Timer * timer )
236
253
{
237
254
int len = 0 ,
@@ -309,8 +326,8 @@ int cycle(MQTTClient* c, Timer* timer)
309
326
exit :
310
327
if (rc == SUCCESS )
311
328
rc = packet_type ;
312
- else
313
- c -> isconnected = 0 ;
329
+ else if ( c -> isconnected )
330
+ MQTTCloseSession ( c ) ;
314
331
return rc ;
315
332
}
316
333
@@ -381,6 +398,8 @@ int waitfor(MQTTClient* c, int packet_type, Timer* timer)
381
398
}
382
399
383
400
401
+
402
+
384
403
int MQTTConnectWithResults (MQTTClient * c , MQTTPacket_connectData * options , MQTTConnackData * data )
385
404
{
386
405
Timer connect_timer ;
@@ -401,6 +420,7 @@ int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTC
401
420
options = & default_options ; /* set default options if none were supplied */
402
421
403
422
c -> keepAliveInterval = options -> keepAliveInterval ;
423
+ c -> cleansession = options -> cleansession ;
404
424
TimerCountdown (& c -> last_received , c -> keepAliveInterval );
405
425
if ((len = MQTTSerialize_connect (c -> buf , c -> buf_size , options )) <= 0 )
406
426
goto exit ;
@@ -512,8 +532,8 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo
512
532
{
513
533
int count = 0 ;
514
534
unsigned short mypacketid ;
515
- data -> grantedQoS = 0 ;
516
- if (MQTTDeserialize_suback (& mypacketid , 1 , & count , & data -> grantedQoS , c -> readbuf , c -> readbuf_size ) == 1 )
535
+ data -> grantedQoS = QOS0 ;
536
+ if (MQTTDeserialize_suback (& mypacketid , 1 , & count , ( int * ) & data -> grantedQoS , c -> readbuf , c -> readbuf_size ) == 1 )
517
537
{
518
538
if (data -> grantedQoS != 0x80 )
519
539
rc = MQTTSetMessageHandler (c , topicFilter , messageHandler );
@@ -523,6 +543,8 @@ int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qo
523
543
rc = FAILURE ;
524
544
525
545
exit :
546
+ if (rc == FAILURE )
547
+ MQTTCloseSession (c );
526
548
#if defined(MQTT_TASK )
527
549
MutexUnlock (& c -> mutex );
528
550
#endif
@@ -573,6 +595,8 @@ int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
573
595
rc = FAILURE ;
574
596
575
597
exit :
598
+ if (rc == FAILURE )
599
+ MQTTCloseSession (c );
576
600
#if defined(MQTT_TASK )
577
601
MutexUnlock (& c -> mutex );
578
602
#endif
@@ -633,6 +657,8 @@ int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
633
657
}
634
658
635
659
exit :
660
+ if (rc == FAILURE )
661
+ MQTTCloseSession (c );
636
662
#if defined(MQTT_TASK )
637
663
MutexUnlock (& c -> mutex );
638
664
#endif
@@ -652,14 +678,13 @@ int MQTTDisconnect(MQTTClient* c)
652
678
TimerInit (& timer );
653
679
TimerCountdownMS (& timer , c -> command_timeout_ms );
654
680
655
- len = MQTTSerialize_disconnect (c -> buf , c -> buf_size );
681
+ len = MQTTSerialize_disconnect (c -> buf , c -> buf_size );
656
682
if (len > 0 )
657
683
rc = sendPacket (c , len , & timer ); // send the disconnect packet
658
-
659
- c -> isconnected = 0 ;
684
+ MQTTCloseSession (c );
660
685
661
686
#if defined(MQTT_TASK )
662
- MutexUnlock (& c -> mutex );
687
+ MutexUnlock (& c -> mutex );
663
688
#endif
664
689
return rc ;
665
690
}
0 commit comments