@@ -28,6 +28,7 @@ import { KafkaLogger, KafkaParser } from '../helpers';
2828import {
2929 CustomTransportStrategy ,
3030 KafkaOptions ,
31+ MessageHandler ,
3132 OutgoingResponse ,
3233 ReadPacket ,
3334} from '../interfaces' ;
@@ -49,6 +50,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
4950 protected clientId : string ;
5051 protected groupId : string ;
5152
53+ protected registeredPatterns : any [ ] ;
54+
5255 constructor ( protected readonly options : KafkaOptions [ 'options' ] ) {
5356 super ( ) ;
5457
@@ -120,13 +123,12 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
120123 }
121124
122125 public async bindEvents ( consumer : Consumer ) {
123- const registeredPatterns = [ ...this . messageHandlers . keys ( ) ] ;
124126 const consumerSubscribeOptions = this . options . subscribe || { } ;
125127
126- if ( registeredPatterns . length > 0 ) {
128+ if ( this . registeredPatterns . length > 0 ) {
127129 await this . consumer . subscribe ( {
128130 ...consumerSubscribeOptions ,
129- topics : registeredPatterns ,
131+ topics : this . registeredPatterns ,
130132 } ) ;
131133 }
132134
@@ -317,4 +319,14 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
317319 protected initializeDeserializer ( options : KafkaOptions [ 'options' ] ) {
318320 this . deserializer = options ?. deserializer ?? new KafkaRequestDeserializer ( ) ;
319321 }
322+
323+ public addHandler (
324+ pattern : any ,
325+ callback : MessageHandler ,
326+ isEventHandler : boolean = false ,
327+ extras : Record < string , any > = { } ,
328+ ) {
329+ this . registeredPatterns . push ( pattern ) ;
330+ super . addHandler ( pattern , callback , isEventHandler , extras ) ;
331+ }
320332}
0 commit comments