From 6e0cddf09625b1d0ae9e735c6cbf697ea0e1cbad Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 11 Sep 2024 13:51:47 +0530 Subject: [PATCH] chore: restructure to use `Receiver` based structuring --- packages/interfaces/src/filter.ts | 2 ++ packages/sdk/src/protocols/filter/index.ts | 26 +++++++++---------- .../protocols/message_reliability_monitor.ts | 24 ++++++++++++----- packages/sdk/src/waku.ts | 2 -- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 94d9efdab9..98a7121466 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -32,6 +32,8 @@ export type SubscribeOptions = { export type IFilter = IReceiver & IBaseProtocolCore; export interface ISubscriptionSDK { + readonly pubsubTopic: PubsubTopic; + subscribe( decoders: IDecoder | IDecoder[], callback: Callback, diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index ea05225807..1ff28ad714 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -27,7 +27,7 @@ import { } from "@waku/utils"; import { BaseProtocolSDK } from "../base_protocol.js"; -import { MessageReliabilityMonitor } from "../message_reliability_monitor.js"; +import { ReceiverReliabilityMonitor } from "../message_reliability_monitor.js"; import { SubscriptionManager } from "./subscription_manager.js"; @@ -72,17 +72,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { this._connectionManager = connectionManager; } - public setIncomingMessageHandler( - handler: ( - pubsubTopic: PubsubTopic, - message: WakuMessage, - peerIdStr: string - ) => void - ): void { - this.handleIncomingMessage = handler; - this.protocol.incomingMessageHandler = handler; - } - /** * Opens a subscription with the Filter protocol using the provided decoders and callback. * This method combines the functionality of creating a subscription and subscribing to it. @@ -216,7 +205,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ) ); - new MessageReliabilityMonitor(this, subscription); + new ReceiverReliabilityMonitor(this, subscription); } return { @@ -286,6 +275,17 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { return toAsyncIterator(this, decoders); } + public setIncomingMessageHandler( + handler: ( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr: string + ) => void + ): void { + this.handleIncomingMessage = handler; + this.protocol.incomingMessageHandler = handler; + } + //TODO: move to SubscriptionManager private getActiveSubscription( pubsubTopic: PubsubTopic diff --git a/packages/sdk/src/protocols/message_reliability_monitor.ts b/packages/sdk/src/protocols/message_reliability_monitor.ts index ee49cf5cb2..5bc7716a53 100644 --- a/packages/sdk/src/protocols/message_reliability_monitor.ts +++ b/packages/sdk/src/protocols/message_reliability_monitor.ts @@ -13,15 +13,14 @@ const log = new Logger("sdk:message_reliability_monitor"); const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; -export class MessageReliabilityManager { - public constructor(private filter: IFilterSDK) { - this.filter.activeSubscriptions.forEach((subscription) => { - new MessageReliabilityMonitor(this.filter, subscription); - }); - } +export class MessageReliabilityTracker { + public static receiverMonitor: Map = + new Map(); + + public constructor() {} } -export class MessageReliabilityMonitor { +export class ReceiverReliabilityMonitor { private receivedMessagesHashStr: string[] = []; private receivedMessagesHashes: { all: Set; @@ -34,6 +33,11 @@ export class MessageReliabilityMonitor { private filter: IFilterSDK, private subscription: ISubscriptionSDK ) { + MessageReliabilityTracker.receiverMonitor.set( + this.subscription.pubsubTopic, + this + ); + this.receivedMessagesHashes = { all: new Set(), nodes: {} @@ -42,6 +46,12 @@ export class MessageReliabilityMonitor { this.filter.setIncomingMessageHandler(this.handleFilterMessage.bind(this)); } + public destructor(): void { + MessageReliabilityTracker.receiverMonitor.delete( + this.subscription.pubsubTopic + ); + } + private handleFilterMessage( pubsubTopic: PubsubTopic, message: WakuMessage, diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index de5452cf8e..d63aa8fc34 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -18,7 +18,6 @@ import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter/index.js"; import { wakuLightPush } from "./protocols/light_push.js"; -import { MessageReliabilityManager } from "./protocols/message_reliability_monitor.js"; import { wakuStore } from "./protocols/store.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; @@ -116,7 +115,6 @@ export class WakuNode implements Waku { if (protocolsEnabled.filter) { const filter = wakuFilter(this.connectionManager); this.filter = filter(libp2p); - new MessageReliabilityManager(this.filter); } log.info(