From 777f497d136b2e7bcc20279e7d32b03cefa86c1e Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 10 Sep 2024 16:13:12 +0530 Subject: [PATCH] feat: abstract a ReliabilityMonitor over FilterSDK --- packages/core/src/lib/filter/index.ts | 4 +- packages/interfaces/src/filter.ts | 35 ++++- packages/sdk/src/protocols/filter/index.ts | 40 +++-- .../protocols/filter/subscription_manager.ts | 108 +------------ .../protocols/message_reliability_monitor.ts | 147 ++++++++++++++++++ packages/sdk/src/waku.ts | 2 + 6 files changed, 213 insertions(+), 123 deletions(-) create mode 100644 packages/sdk/src/protocols/message_reliability_monitor.ts diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5300718b2b..5d11835c94 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -36,7 +36,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { pubsubTopic: PubsubTopic, wakuMessage: WakuMessage, peerIdStr: string - ) => Promise, + ) => void, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { @@ -291,7 +291,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { return; } - await this.handleIncomingMessage( + this.handleIncomingMessage( pubsubTopic, wakuMessage, connection.remotePeer.toString() diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 67946a02a9..b615c9ade5 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,7 +1,13 @@ -import type { PeerId } from "@libp2p/interface"; +import type { Peer, PeerId } from "@libp2p/interface"; +import { WakuMessage } from "@waku/proto"; import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { ContentTopic, ThisOrThat } from "./misc.js"; +import type { + ContentTopic, + PeerIdStr, + PubsubTopic, + ThisOrThat +} from "./misc.js"; import type { Callback, IBaseProtocolCore, @@ -12,6 +18,11 @@ import type { } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; +export type SubscriptionCallback = { + decoders: IDecoder[]; + callback: Callback; +}; + export type SubscribeOptions = { keepAlive?: number; pingsBeforePeerRenewed?: number; @@ -26,12 +37,11 @@ export interface ISubscriptionSDK { callback: Callback, options?: SubscribeOptions ): Promise; - unsubscribe(contentTopics: ContentTopic[]): Promise; - ping(peerId?: PeerId): Promise; - unsubscribeAll(): Promise; + + renewAndSubscribePeer(peerId: PeerId): Promise; } export type IFilterSDK = IReceiver & @@ -42,6 +52,21 @@ export type IFilterSDK = IReceiver & protocolUseOptions?: ProtocolUseOptions, subscribeOptions?: SubscribeOptions ): Promise; + + activeSubscriptions: Map; + + setIncomingMessageHandler( + handler: ( + pubsubTopic: ContentTopic, + message: WakuMessage, + peerIdStr: PeerIdStr + ) => void + ): void; + handleIncomingMessage: ( + pubsubTopic: ContentTopic, + message: WakuMessage, + peerIdStr: PeerIdStr + ) => void; }; export type SubscribeResult = SubscriptionSuccess | SubscriptionError; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index eb8a517c14..b57dfad394 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -8,6 +8,7 @@ import { type IFilterSDK, type Libp2p, NetworkConfig, + PeerIdStr, type ProtocolCreateOptions, ProtocolError, type ProtocolUseOptions, @@ -16,6 +17,7 @@ import { SubscribeResult, type Unsubscribe } from "@waku/interfaces"; +import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, groupByContentTopic, @@ -34,7 +36,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public readonly protocol: FilterCore; private readonly _connectionManager: ConnectionManager; - private activeSubscriptions = new Map(); + public activeSubscriptions = new Map(); public constructor( connectionManager: ConnectionManager, @@ -43,17 +45,9 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ) { super( new FilterCore( - async (pubsubTopic, wakuMessage, peerIdStr) => { - const subscription = this.getActiveSubscription(pubsubTopic); - if (!subscription) { - log.error( - `No subscription locally registered for topic ${pubsubTopic}` - ); - return; - } + (pubsubTopic, message, peerIdStr) => + this.handleIncomingMessage(pubsubTopic, message, peerIdStr), - await subscription.processIncomingMessage(wakuMessage, peerIdStr); - }, connectionManager.configuredPubsubTopics, libp2p ), @@ -65,6 +59,30 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { this._connectionManager = connectionManager; } + public handleIncomingMessage: ( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr: PeerIdStr + ) => void = (pubsubTopic, message) => { + const subscription = this.getActiveSubscription(pubsubTopic); + if (!subscription) { + log.error(`No subscription locally registered for topic ${pubsubTopic}`); + return; + } + + void subscription.processIncomingMessage(message); + }; + + public setIncomingMessageHandler( + handler: ( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr: string + ) => void + ): void { + this.handleIncomingMessage = handler; + } + /** * Opens a subscription with the Filter protocol using the provided decoders and callback. * This method combines the functionality of creating a subscription and subscribing to it. diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 13c673854a..91fc11100b 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -9,24 +9,15 @@ import { IDecoder, IProtoMessage, ISubscriptionSDK, - PeerIdStr, ProtocolError, PubsubTopic, SDKProtocolResult, SubscribeOptions, SubscriptionCallback } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; -type ReceivedMessageHashes = { - all: Set; - nodes: { - [peerId: PeerIdStr]: Set; - }; -}; - export const DEFAULT_MAX_PINGS = 2; export const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; export const DEFAULT_KEEP_ALIVE = 60 * 1000; @@ -38,48 +29,21 @@ export class SubscriptionManager implements ISubscriptionSDK { ContentTopic, SubscriptionCallback > = new Map(); - private readonly receivedMessagesHashStr: string[] = []; private peerFailures: Map = new Map(); - private readonly receivedMessagesHashes: ReceivedMessageHashes; - private missedMessagesByPeer: Map = new Map(); private keepAliveInterval: number = DEFAULT_KEEP_ALIVE; private maxPingFailures: number = DEFAULT_MAX_PINGS; - private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; private keepAliveTimer: number | null = null; public constructor( - private readonly pubsubTopic: PubsubTopic, + public readonly pubsubTopic: PubsubTopic, private readonly protocol: FilterCore, private readonly connectionManager: ConnectionManager, private readonly getPeers: () => Peer[], private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { this.pubsubTopic = pubsubTopic; - - const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); - this.receivedMessagesHashes = { - all: new Set(), - nodes: { - ...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()])) - } - }; - allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); - } - - private addHash(hash: string, peerIdStr?: string): void { - this.receivedMessagesHashes.all.add(hash); - - if (!peerIdStr) { - return; - } - - if (!this.receivedMessagesHashes.nodes[peerIdStr]) { - this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); - } - - this.receivedMessagesHashes.nodes[peerIdStr].add(hash); } public async subscribe( @@ -89,9 +53,6 @@ export class SubscriptionManager implements ISubscriptionSDK { ): Promise { this.keepAliveInterval = options.keepAlive || DEFAULT_KEEP_ALIVE; this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; - this.maxMissedMessagesThreshold = - options.maxMissedMessagesThreshold || - DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -193,55 +154,7 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - private async validateMessage(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } - } - - public async processIncomingMessage( - message: WakuMessage, - peerIdStr: PeerIdStr - ): Promise { - const hashedMessageStr = messageHashStr( - this.pubsubTopic, - message as IProtoMessage - ); - - this.addHash(hashedMessageStr, peerIdStr); - void this.validateMessage(); - - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { - log.info("Message already received, skipping"); - return; - } - this.receivedMessagesHashStr.push(hashedMessageStr); - + public async processIncomingMessage(message: WakuMessage): Promise { const { contentTopic } = message; const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); if (!subscriptionCallback) { @@ -328,7 +241,7 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async renewAndSubscribePeer( + public async renewAndSubscribePeer( peerId: PeerId ): Promise { try { @@ -339,17 +252,12 @@ export class SubscriptionManager implements ISubscriptionSDK { Array.from(this.subscriptionCallbacks.keys()) ); - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - return newPeer; } catch (error) { log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); return; } finally { this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; } } @@ -426,16 +334,6 @@ export class SubscriptionManager implements ISubscriptionSDK { this.startKeepAlivePings(this.keepAliveInterval); } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } async function pushMessage( diff --git a/packages/sdk/src/protocols/message_reliability_monitor.ts b/packages/sdk/src/protocols/message_reliability_monitor.ts new file mode 100644 index 0000000000..baff2a96eb --- /dev/null +++ b/packages/sdk/src/protocols/message_reliability_monitor.ts @@ -0,0 +1,147 @@ +import { + IFilterSDK, + IProtoMessage, + ISubscriptionSDK, + PeerIdStr, + PubsubTopic +} from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; +import { WakuMessage } from "@waku/proto"; +import { Logger } from "@waku/utils"; + +const log = new Logger("sdk:message_reliability_monitor"); + +const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; + +export class MessageReliabilityManager { + public constructor(private filter: IFilterSDK) { + this.filter.activeSubscriptions.forEach((subscription) => { + new MessageReliabilityMonitor(this.filter, subscription); + }); + } +} + +export class MessageReliabilityMonitor { + private receivedMessagesHashStr: string[] = []; + private receivedMessagesHashes: { + all: Set; + nodes: Record>; + }; + private missedMessagesByPeer: Map = new Map(); + private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + + public constructor( + private filter: IFilterSDK, + private subscription: ISubscriptionSDK + ) { + this.receivedMessagesHashes = { + all: new Set(), + nodes: {} + }; + this.initializeListeners(); + } + + private initializeListeners(): void { + this.filter.setIncomingMessageHandler(this.handleFilterMessage.bind(this)); + } + + private handleFilterMessage( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr: string + ): void { + const isReceived = this.isMessageAlreadyReceived( + pubsubTopic, + message, + peerIdStr + ); + if (isReceived) { + return; + } + + void this.validatePreviousMessage(); + + this.filter.handleIncomingMessage(pubsubTopic, message, peerIdStr); + } + + private isMessageAlreadyReceived( + pubsubTopic: PubsubTopic, + message: WakuMessage, + peerIdStr?: string + ): boolean { + const hashedMessageStr = messageHashStr( + pubsubTopic, + message as IProtoMessage + ); + + this.receivedMessagesHashes.all.add(hashedMessageStr); + + if (peerIdStr) { + if (!this.receivedMessagesHashes.nodes[peerIdStr]) { + this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); + } + this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr); + } + + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + return true; + } else { + this.receivedMessagesHashStr.push(hashedMessageStr); + return false; + } + } + + private async validatePreviousMessage(): Promise { + if (this.receivedMessagesHashStr.length < 2) { + return; // Not enough messages to validate + } + + const previousMessageHash = + this.receivedMessagesHashStr[this.receivedMessagesHashStr.length - 2]; + + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(previousMessageHash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`); + await this.renewPeer(peerIdStr); + } + } + } + } + + private async renewPeer(peerIdStr: PeerIdStr): Promise { + try { + const peers = await this.filter.protocol.peerStore.all(); + const peerId = peers.find((p) => p.id.toString() === peerIdStr)?.id; + if (!peerId) { + log.error(`Peer ${peerIdStr} not found in peer store`); + return; + } + + await this.subscription.renewAndSubscribePeer(peerId); + + this.missedMessagesByPeer.delete(peerIdStr); + this.receivedMessagesHashes.nodes[peerIdStr] = new Set(); + log.info(`Successfully renewed peer ${peerIdStr}`); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}`, error); + } + } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } + + public setMaxMissedMessagesThreshold(value: number): void { + this.maxMissedMessagesThreshold = value; + } +} diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index d63aa8fc34..1aa3cf1f18 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -18,6 +18,7 @@ import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter/index.js"; import { wakuLightPush } from "./protocols/light_push.js"; +import { MessageReliabilityMonitor } from "./protocols/message_reliability_monitor.js"; import { wakuStore } from "./protocols/store.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; @@ -115,6 +116,7 @@ export class WakuNode implements Waku { if (protocolsEnabled.filter) { const filter = wakuFilter(this.connectionManager); this.filter = filter(libp2p); + new MessageReliabilityMonitor(this.filter); } log.info(