diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5d11835c94..6127e2b74b 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -31,12 +31,13 @@ export const FilterCodecs = { }; export class FilterCore extends BaseProtocol implements IBaseProtocolCore { + private handleIncomingMessage: ( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerId: string + ) => void = () => {}; + public constructor( - private handleIncomingMessage: ( - pubsubTopic: PubsubTopic, - wakuMessage: WakuMessage, - peerIdStr: string - ) => void, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { @@ -51,6 +52,16 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { }); } + public set incomingMessageHandler( + handler: ( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerId: string + ) => void + ) { + this.handleIncomingMessage = handler; + } + public async subscribe( pubsubTopic: PubsubTopic, peer: Peer, diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 3e34b6fc59..b830c26ecf 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -39,32 +39,12 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public activeSubscriptions = new Map(); - public constructor( - connectionManager: ConnectionManager, - libp2p: Libp2p, - options?: ProtocolCreateOptions - ) { - super( - new FilterCore( - (pubsubTopic, message, peerIdStr) => - this.handleIncomingMessage(pubsubTopic, message, peerIdStr), - - connectionManager.configuredPubsubTopics, - libp2p - ), - connectionManager, - { numPeersToUse: options?.numPeersToUse } - ); - - this.protocol = this.core as FilterCore; - this._connectionManager = connectionManager; - } - - public handleIncomingMessage: ( + public handleIncomingMessage = ( pubsubTopic: PubsubTopic, message: WakuMessage, peerIdStr: PeerIdStr - ) => void = (pubsubTopic, message) => { + ): void => { + log.info(`Received message from ${peerIdStr} on topic ${pubsubTopic}`); const subscription = this.getActiveSubscription(pubsubTopic); if (!subscription) { log.error(`No subscription locally registered for topic ${pubsubTopic}`); @@ -74,6 +54,22 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { void subscription.processIncomingMessage(message); }; + public constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { + super( + new FilterCore(connectionManager.configuredPubsubTopics, libp2p), + connectionManager, + { numPeersToUse: options?.numPeersToUse } + ); + + this.protocol = this.core as FilterCore; + this.protocol.incomingMessageHandler = this.handleIncomingMessage; + this._connectionManager = connectionManager; + } + public setIncomingMessageHandler( handler: ( pubsubTopic: PubsubTopic,