From 0fe48693c2e222fe773ad8bac43688798352c6e1 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 13 Sep 2024 15:20:31 +0530 Subject: [PATCH] chore: restructure reliabiltiy monitors --- packages/sdk/src/index.ts | 4 +- .../protocols/filter/subscription_manager.ts | 5 +- .../{light_push.ts => lightpush/index.ts} | 2 +- .../protocols/{store.ts => store/index.ts} | 2 +- packages/sdk/src/reliability_monitor/index.ts | 55 +++++++++++++++++++ .../receiver.ts} | 47 ---------------- packages/sdk/src/waku.ts | 6 +- 7 files changed, 65 insertions(+), 56 deletions(-) rename packages/sdk/src/protocols/{light_push.ts => lightpush/index.ts} (98%) rename packages/sdk/src/protocols/{store.ts => store/index.ts} (99%) create mode 100644 packages/sdk/src/reliability_monitor/index.ts rename packages/sdk/src/{protocols/filter/reliability_monitor.ts => reliability_monitor/receiver.ts} (82%) diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 3b0aa54dda..2cbcfce937 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -14,9 +14,9 @@ export { defaultLibp2p, createLibp2pAndUpdateOptions } from "./create/index.js"; -export { wakuLightPush } from "./protocols/light_push.js"; +export { wakuLightPush } from "./protocols/lightpush/index.js"; export { wakuFilter } from "./protocols/filter/index.js"; -export { wakuStore } from "./protocols/store.js"; +export { wakuStore } from "./protocols/store/index.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 6d3d50f13b..70fccae33f 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -20,11 +20,12 @@ import { import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; -import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; import { ReceiverReliabilityMonitor, ReliabilityMonitorManager -} from "./reliability_monitor.js"; +} from "../../reliability_monitor/receiver.js"; + +import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; const log = new Logger("sdk:filter:subscription_manager"); diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/lightpush/index.ts similarity index 98% rename from packages/sdk/src/protocols/light_push.ts rename to packages/sdk/src/protocols/lightpush/index.ts index 4b303118bd..2cfdadde01 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/lightpush/index.ts @@ -13,7 +13,7 @@ import { } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; +import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); diff --git a/packages/sdk/src/protocols/store.ts b/packages/sdk/src/protocols/store/index.ts similarity index 99% rename from packages/sdk/src/protocols/store.ts rename to packages/sdk/src/protocols/store/index.ts index c851fbefe4..3aba99b87c 100644 --- a/packages/sdk/src/protocols/store.ts +++ b/packages/sdk/src/protocols/store/index.ts @@ -10,7 +10,7 @@ import { import { messageHash } from "@waku/message-hash"; import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; +import { BaseProtocolSDK } from "../base_protocol.js"; const DEFAULT_NUM_PEERS = 1; diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts new file mode 100644 index 0000000000..936a54700f --- /dev/null +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -0,0 +1,55 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { + ContentTopic, + CoreProtocolResult, + PubsubTopic +} from "@waku/interfaces"; + +import { ReceiverReliabilityMonitor } from "./receiver.js"; + +export class ReliabilityMonitorManager { + private static receiverMonitors: Map< + PubsubTopic, + ReceiverReliabilityMonitor + > = new Map(); + + public static createReceiverMonitor( + pubsubTopic: PubsubTopic, + getPeers: () => Peer[], + renewPeer: (peerId: PeerId) => Promise, + getContentTopics: () => ContentTopic[], + protocolSubscribe: ( + pubsubTopic: PubsubTopic, + peer: Peer, + contentTopics: ContentTopic[] + ) => Promise + ): ReceiverReliabilityMonitor { + if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { + return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; + } + + const monitor = new ReceiverReliabilityMonitor( + pubsubTopic, + getPeers, + renewPeer, + getContentTopics, + protocolSubscribe + ); + ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); + return monitor; + } + + private constructor() {} + + public static destroy(pubsubTopic: PubsubTopic): void { + this.receiverMonitors.delete(pubsubTopic); + } + + public static destroyAll(): void { + for (const [pubsubTopic, monitor] of this.receiverMonitors) { + monitor.setMaxMissedMessagesThreshold(undefined); + monitor.setMaxPingFailures(undefined); + this.receiverMonitors.delete(pubsubTopic); + } + } +} diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/reliability_monitor/receiver.ts similarity index 82% rename from packages/sdk/src/protocols/filter/reliability_monitor.ts rename to packages/sdk/src/reliability_monitor/receiver.ts index 84d386f716..440e35829c 100644 --- a/packages/sdk/src/protocols/filter/reliability_monitor.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -21,53 +21,6 @@ const log = new Logger("sdk:receiver:reliability_monitor"); const DEFAULT_MAX_PINGS = 3; -export class ReliabilityMonitorManager { - private static receiverMonitors: Map< - PubsubTopic, - ReceiverReliabilityMonitor - > = new Map(); - - public static createReceiverMonitor( - pubsubTopic: PubsubTopic, - getPeers: () => Peer[], - renewPeer: (peerId: PeerId) => Promise, - getContentTopics: () => ContentTopic[], - protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise - ): ReceiverReliabilityMonitor { - if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { - return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; - } - - const monitor = new ReceiverReliabilityMonitor( - pubsubTopic, - getPeers, - renewPeer, - getContentTopics, - protocolSubscribe - ); - ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); - return monitor; - } - - private constructor() {} - - public static destroy(pubsubTopic: PubsubTopic): void { - this.receiverMonitors.delete(pubsubTopic); - } - - public static destroyAll(): void { - for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxMissedMessagesThreshold(undefined); - monitor.setMaxPingFailures(undefined); - this.receiverMonitors.delete(pubsubTopic); - } - } -} - export class ReceiverReliabilityMonitor { private receivedMessagesHashes: ReceivedMessageHashes; private missedMessagesByPeer: Map = new Map(); diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index 57c9c16d2a..2ed722ac1c 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter/index.js"; -import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js"; -import { wakuLightPush } from "./protocols/light_push.js"; -import { wakuStore } from "./protocols/store.js"; +import { wakuLightPush } from "./protocols/lightpush/index.js"; +import { wakuStore } from "./protocols/store/index.js"; +import { ReliabilityMonitorManager } from "./reliability_monitor/index.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60;