From 4f4b91a5a161746fae3d5c11442c19a6aa1936da Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 13:16:07 +0530 Subject: [PATCH] feat: introduce ReliabilityMonitor --- .../protocols/filter/reliability_monitor.ts | 114 ++++++++++++++++++ .../protocols/filter/subscription_manager.ts | 112 ++++------------- 2 files changed, 139 insertions(+), 87 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/reliability_monitor.ts diff --git a/packages/sdk/src/protocols/filter/reliability_monitor.ts b/packages/sdk/src/protocols/filter/reliability_monitor.ts new file mode 100644 index 0000000000..557e79c440 --- /dev/null +++ b/packages/sdk/src/protocols/filter/reliability_monitor.ts @@ -0,0 +1,114 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { IProtoMessage, PeerIdStr, PubsubTopic } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; +import { WakuMessage } from "@waku/proto"; +import { Logger } from "@waku/utils"; + +type ReceivedMessageHashes = { + all: Set; + nodes: { + [peerId: PeerIdStr]: Set; + }; +}; + +const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; + +const log = new Logger("sdk:filter:reliability_monitor"); + +export class ReliabilityMonitor { + public receivedMessagesHashStr: string[] = []; + public receivedMessagesHashes: ReceivedMessageHashes; + public missedMessagesByPeer: Map = new Map(); + public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + + public constructor( + private getPeers: () => Peer[], + private renewAndSubscribePeer: (peerId: PeerId) => Promise + ) { + const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); + + this.receivedMessagesHashes = { + all: new Set(), + nodes: { + ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) + } + }; + allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); + } + + public setMaxMissedMessagesThreshold(value: number | undefined): void { + if (value === undefined) { + return; + } + this.maxMissedMessagesThreshold = value; + } + + public get messageHashes(): string[] { + return [...this.receivedMessagesHashes.all]; + } + + public addMessage( + message: WakuMessage, + pubsubTopic: PubsubTopic, + peerIdStr?: string + ): boolean { + const hashedMessageStr = messageHashStr( + pubsubTopic, + message as IProtoMessage + ); + + this.receivedMessagesHashes.all.add(hashedMessageStr); + + if (peerIdStr) { + this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); + } + + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + return false; + } else { + this.receivedMessagesHashStr.push(hashedMessageStr); + return true; + } + } + + public async validateMessage(): Promise { + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + continue; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + } + } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } +} diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index b8e814d1e8..909c4604e5 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -16,31 +16,21 @@ import { type SubscribeOptions, SubscriptionCallback } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; -type ReceivedMessageHashes = { - all: Set; - nodes: { - [peerId: PeerIdStr]: Set; - }; -}; +import { ReliabilityMonitor } from "./reliability_monitor.js"; const DEFAULT_MAX_PINGS = 3; -const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscriptionSDK { - private readonly receivedMessagesHashStr: string[] = []; private keepAliveTimer: number | null = null; - private readonly receivedMessagesHashes: ReceivedMessageHashes; private peerFailures: Map = new Map(); - private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; - private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + private reliabilityMonitor: ReliabilityMonitor; private subscriptionCallbacks: Map< ContentTopic, @@ -55,26 +45,10 @@ export class SubscriptionManager implements ISubscriptionSDK { ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); - const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); - this.receivedMessagesHashes = { - all: new Set(), - nodes: { - ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) - } - }; - allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - } - - public get messageHashes(): string[] { - return [...this.receivedMessagesHashes.all]; - } - - private addHash(hash: string, peerIdStr?: string): void { - this.receivedMessagesHashes.all.add(hash); - - if (peerIdStr) { - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); - } + this.reliabilityMonitor = new ReliabilityMonitor( + getPeers.bind(this), + this.renewAndSubscribePeer.bind(this) + ); } public async subscribe( @@ -84,9 +58,9 @@ export class SubscriptionManager implements ISubscriptionSDK { ): Promise { this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; - this.maxMissedMessagesThreshold = - options.maxMissedMessagesThreshold || - DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + this.reliabilityMonitor.setMaxMissedMessagesThreshold( + options.maxMissedMessagesThreshold + ); const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -194,54 +168,21 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - private async validateMessage(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } - } - public async processIncomingMessage( message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const hashedMessageStr = messageHashStr( + const includesMessage = this.reliabilityMonitor.addMessage( + message, this.pubsubTopic, - message as IProtoMessage + peerIdStr ); + void this.reliabilityMonitor.validateMessage(); - this.addHash(hashedMessageStr, peerIdStr); - void this.validateMessage(); - - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + if (includesMessage) { log.info("Message already received, skipping"); return; } - this.receivedMessagesHashStr.push(hashedMessageStr); const { contentTopic } = message; const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); @@ -340,8 +281,13 @@ export class SubscriptionManager implements ISubscriptionSDK { Array.from(this.subscriptionCallbacks.keys()) ); - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + this.reliabilityMonitor.receivedMessagesHashes.nodes[ + newPeer.id.toString() + ] = new Set(); + this.reliabilityMonitor.missedMessagesByPeer.set( + newPeer.id.toString(), + 0 + ); return newPeer; } catch (error) { @@ -349,8 +295,10 @@ export class SubscriptionManager implements ISubscriptionSDK { return; } finally { this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; + this.reliabilityMonitor.missedMessagesByPeer.delete(peerId.toString()); + delete this.reliabilityMonitor.receivedMessagesHashes.nodes[ + peerId.toString() + ]; } } @@ -378,16 +326,6 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } async function pushMessage(