diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index b615c9ade5..94d9efdab9 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -67,6 +67,11 @@ export type IFilterSDK = IReceiver & message: WakuMessage, peerIdStr: PeerIdStr ) => void; + readonly defaultHandleIncomingMessage: ( + pubsubTopic: ContentTopic, + message: WakuMessage, + peerIdStr: PeerIdStr + ) => void; }; export type SubscribeResult = SubscriptionSuccess | SubscriptionError; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index b830c26ecf..2acd1c4fe2 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -39,7 +39,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public activeSubscriptions = new Map(); - public handleIncomingMessage = ( + public readonly defaultHandleIncomingMessage = ( pubsubTopic: PubsubTopic, message: WakuMessage, peerIdStr: PeerIdStr @@ -54,6 +54,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { void subscription.processIncomingMessage(message); }; + public handleIncomingMessage = this.defaultHandleIncomingMessage; + public constructor( connectionManager: ConnectionManager, libp2p: Libp2p, @@ -78,6 +80,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ) => void ): void { this.handleIncomingMessage = handler; + this.protocol.incomingMessageHandler = handler; } /** diff --git a/packages/sdk/src/protocols/message_reliability_monitor.ts b/packages/sdk/src/protocols/message_reliability_monitor.ts index baff2a96eb..ee49cf5cb2 100644 --- a/packages/sdk/src/protocols/message_reliability_monitor.ts +++ b/packages/sdk/src/protocols/message_reliability_monitor.ts @@ -38,10 +38,7 @@ export class MessageReliabilityMonitor { all: new Set(), nodes: {} }; - this.initializeListeners(); - } - private initializeListeners(): void { this.filter.setIncomingMessageHandler(this.handleFilterMessage.bind(this)); } @@ -61,7 +58,7 @@ export class MessageReliabilityMonitor { void this.validatePreviousMessage(); - this.filter.handleIncomingMessage(pubsubTopic, message, peerIdStr); + this.filter.defaultHandleIncomingMessage(pubsubTopic, message, peerIdStr); } private isMessageAlreadyReceived(