diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5300718b2b..fdc19698a2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -37,6 +37,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { wakuMessage: WakuMessage, peerIdStr: string ) => Promise, + private handleError: (error: Error) => Promise, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { @@ -301,8 +302,18 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { () => { log.info("Receiving pipe closed."); }, - (e) => { - log.error("Error with receiving pipe", e); + async (e) => { + log.error( + "Error with receiving pipe", + e, + " -- ", + "on peer ", + connection.remotePeer.toString(), + " -- ", + "stream ", + stream + ); + await this.handleError(e); } ); } catch (e) { diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index ad48298bbd..67a1d31c9c 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -54,6 +54,9 @@ class Filter extends BaseProtocolSDK implements IFilter { await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, + async (error: Error) => { + log.error("Error with receiving pipe", error); + }, connectionManager.configuredPubsubTopics, libp2p ),