Skip to content

Commit a750522

Browse files
author
Halimjon Juraev
committed
Fixed nats request subscription leaks
1 parent 2338ff1 commit a750522

File tree

4 files changed

+34
-21
lines changed

4 files changed

+34
-21
lines changed

Sources/Nats/Handlers/NatsHandler.swift

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,6 @@ public final class NatsHandler: ChannelInboundHandler {
237237
numberOfResponses = "\(Proto.UNSUB.rawValue) \(reply.uuidString) \(count.count)\r\n"
238238
case .single:
239239
numberOfResponses = "\(Proto.UNSUB.rawValue) \(reply.uuidString) \(1)\r\n"
240-
case .unlimited:
241-
numberOfResponses = "\(Proto.UNSUB.rawValue) \(reply.uuidString)\r\n"
242240
}
243241

244242
let request = "\(Proto.SUB.rawValue) \(reply.uuidString) \(reply.uuidString)\r\n\(numberOfResponses)".data(using: .utf8)!
@@ -347,8 +345,8 @@ public final class NatsHandler: ChannelInboundHandler {
347345
return self.write(ctx: ctx, data: sub.sub())
348346
}
349347

350-
351-
public func unsubscribe(_ subject: String, max: UInt32 = 0) -> EventLoopFuture<Void>{
348+
@discardableResult
349+
public func unsubscribe(_ subject: String, max: Int = 0) -> EventLoopFuture<Void>{
352350
guard let ctx = self.ctx else {
353351
return container.eventLoop.newFailedFuture(error: NatsRequestError.coundNotFindChannelContextToUse)
354352
}
@@ -364,13 +362,35 @@ public final class NatsHandler: ChannelInboundHandler {
364362

365363
let promise = ctx.eventLoop.newPromise(of: NatsMessage.self)
366364

367-
let schedule = ctx.eventLoop.scheduleTask(in: .seconds(timeout), {
368-
self.subscriptions.removeValue(forKey: uuid)
365+
let schedule = ctx.eventLoop.scheduleTask(in: .seconds(timeout), { [weak self] in
366+
guard let StrongSelf = self, let callback = StrongSelf.subscriptions[uuid] else {return}
367+
self?.subscriptions.removeValue(forKey: uuid)
368+
switch callback.callback {
369+
case .REQ(let request):
370+
switch request.numberOfResponse {
371+
case .multiple(_):
372+
StrongSelf.unsubscribe(callback.subject)
373+
case .single:
374+
return
375+
}
376+
break
377+
default:
378+
break
379+
}
369380
let error = NatsGeneralError(identifier: "NATS TIMEOUT", reason: "TIMEOUT SUBJECT: \(subject)")
370381
promise.fail(error: error)
371382
})
372383
var finalData: Data = Data()
373-
let sub = "\(Proto.SUB.rawValue) \(uuid.uuidString) \(uuid.uuidString)\r\n\(Proto.UNSUB.rawValue) \(uuid.uuidString) \(1)\r\n".data(using: .utf8)!
384+
385+
var sub: Data
386+
387+
switch numberOfResponse {
388+
case .multiple(let count):
389+
sub = "\(Proto.SUB.rawValue) \(uuid.uuidString) \(uuid.uuidString)\r\n\(Proto.UNSUB.rawValue) \(uuid.uuidString) \(count.count)\r\n".data(using: .utf8)!
390+
case .single:
391+
sub = "\(Proto.SUB.rawValue) \(uuid.uuidString) \(uuid.uuidString)\r\n\(Proto.UNSUB.rawValue) \(uuid.uuidString) \(1)\r\n".data(using: .utf8)!
392+
}
393+
374394
let request = NatsRequest(promise: promise, scheduler: schedule, numberOfResponse: numberOfResponse)
375395
let natsCallback = NatsCallbacks(id: uuid, subject: subject, queueGroup: "", callback: .REQ(request))
376396
subscriptions.updateValue(natsCallback, forKey: uuid)
@@ -429,17 +449,17 @@ public final class NatsHandler: ChannelInboundHandler {
429449
callback(newMessage)
430450
case .REQ(let promise):
431451
let newMessage = NatsMessage(msg: message, container: container, ctx: ctx)
432-
promise.scheduler?.cancel()
433452

434453
switch promise.numberOfResponse {
435454
case .single:
455+
promise.scheduler?.cancel()
436456
subscriptions.removeValue(forKey: message.headers.sid)
437457
case .multiple(let count):
438458
if !count.counter() {
439459
subscriptions.removeValue(forKey: message.headers.sid)
460+
promise.scheduler?.cancel()
440461
}
441-
case .unlimited:
442-
break
462+
443463
}
444464
promise.promise.succeed(result: newMessage)
445465
case .pubAck(let proto):

Sources/Nats/Nats.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public final class NATS: Service {
222222
return handler.subscribe(subject, queueGroup: queueGroup, callback: callback)
223223
}
224224
@discardableResult
225-
public func unsubscribe(_ subject: String, max: UInt32 = 0) -> EventLoopFuture<Void> {
225+
public func unsubscribe(_ subject: String, max: Int = 0) -> EventLoopFuture<Void> {
226226
guard let handler = handlerCacher.currentValue else {
227227
fatalError("Internal Error, Channel handler not found for this thread")
228228
}

Sources/Nats/NatsMessage/Message.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public final class NatsMessage: ContainerAlias, DatabaseConnectable, CustomStri
5959

6060

6161
@discardableResult
62-
public func unsubscribe(_ subject: String, max: UInt32 = 0) -> EventLoopFuture<Void> {
62+
public func unsubscribe(_ subject: String, max: Int = 0) -> EventLoopFuture<Void> {
6363
guard let handler = ctx.handler as? NatsHandler else {
6464
let error = NatsGeneralError(identifier: "NATS Handler error", reason: "More likely incorrect thread")
6565
return ctx.eventLoop.newFailedFuture(error: error)

Sources/Nats/NatsMessage/NatProtocol.swift

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public class NatsRequest {
5454
public enum NumberOfResponse {
5555
case single
5656
case multiple(MultipleResponses)
57-
case unlimited
5857
}
5958

6059
public class MultipleResponses {
@@ -105,14 +104,8 @@ public class NatsCallbacks {
105104
return "\(Proto.SUB.rawValue) \(subject) \(group())\(id.uuidString)\r\n".data(using: .utf8) ?? Data()
106105
}
107106

108-
public func unsub(_ max: UInt32) -> Data {
109-
let wait: () -> String = {
110-
if max > 0 {
111-
return " \(max)"
112-
}
113-
return ""
114-
}
115-
return "\(Proto.UNSUB.rawValue) \(id)\(wait())\r\n".data(using: .utf8) ?? Data()
107+
public func unsub(_ max: Int) -> Data {
108+
return "\(Proto.UNSUB.rawValue) \(id)\(max)\r\n".data(using: .utf8) ?? Data()
116109
}
117110

118111
func counter() {

0 commit comments

Comments
 (0)