From fa7ab2372521832950bad0a492b1428837cfd874 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 28 Aug 2024 12:07:57 +0530 Subject: [PATCH] chore: split SubscriptionManager into ReliabilityMonitor --- packages/sdk/src/protocols/filter.ts | 214 +++++++++++++-------------- 1 file changed, 106 insertions(+), 108 deletions(-) diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 18f1dc7521..0b5482b73c 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -57,28 +57,18 @@ const DEFAULT_KEEP_ALIVE = 30 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE }; -export class SubscriptionManager implements ISubscriptionSDK { - private readonly receivedMessagesHashStr: string[] = []; - private keepAliveTimer: number | null = null; - private readonly receivedMessagesHashes: ReceivedMessageHashes; + +class ReliabilityMonitor { + private receivedMessagesHashes: ReceivedMessageHashes; private peerFailures: Map = new Map(); private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; - private subscriptionCallbacks: Map< - ContentTopic, - SubscriptionCallback - >; - public constructor( - private readonly pubsubTopic: PubsubTopic, - private protocol: FilterCore, private getPeers: () => Peer[], private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { - this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); this.receivedMessagesHashes = { all: new Set(), @@ -89,11 +79,7 @@ export class SubscriptionManager implements ISubscriptionSDK { allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); } - public get messageHashes(): string[] { - return [...this.receivedMessagesHashes.all]; - } - - private addHash(hash: string, peerIdStr?: string): void { + public addHash(hash: string, peerIdStr?: string): void { this.receivedMessagesHashes.all.add(hash); if (peerIdStr) { @@ -101,16 +87,110 @@ export class SubscriptionManager implements ISubscriptionSDK { } } + public async validateMessage(): Promise { + for (const hash of this.receivedMessagesHashes.all) { + for (const [peerIdStr, hashes] of Object.entries( + this.receivedMessagesHashes.nodes + )) { + if (!hashes.has(hash)) { + this.incrementMissedMessageCount(peerIdStr); + if (this.shouldRenewPeer(peerIdStr)) { + log.info( + `Peer ${peerIdStr} has missed too many messages, renewing.` + ); + const peerId = this.getPeers().find( + (p) => p.id.toString() === peerIdStr + )?.id; + if (!peerId) { + log.error( + `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` + ); + continue; + } + try { + await this.renewAndSubscribePeer(peerId); + } catch (error) { + log.error(`Failed to renew peer ${peerIdStr}: ${error}`); + } + } + } + } + } + } + + public async handlePeerFailure(peerId: PeerId): Promise { + const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; + this.peerFailures.set(peerId.toString(), failures); + + if (failures > this.maxPingFailures) { + try { + await this.renewAndSubscribePeer(peerId); + this.peerFailures.delete(peerId.toString()); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); + } + } + } + + private async renewAndSubscribePeer( + peerId: PeerId + ): Promise { + try { + const newPeer = await this.renewPeer(peerId); + this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); + this.missedMessagesByPeer.set(newPeer.id.toString(), 0); + + return newPeer; + } catch (error) { + log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + return; + } finally { + this.peerFailures.delete(peerId.toString()); + this.missedMessagesByPeer.delete(peerId.toString()); + delete this.receivedMessagesHashes.nodes[peerId.toString()]; + } + } + + private incrementMissedMessageCount(peerIdStr: string): void { + const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; + this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); + } + + private shouldRenewPeer(peerIdStr: string): boolean { + const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; + return missedMessages > this.maxMissedMessagesThreshold; + } +} + +export class SubscriptionManager implements ISubscriptionSDK { + private readonly receivedMessagesHashStr: string[] = []; + private keepAliveTimer: number | null = null; + private subscriptionCallbacks: Map< + ContentTopic, + SubscriptionCallback + >; + private reliabilityMonitor: ReliabilityMonitor; + + public constructor( + private readonly pubsubTopic: PubsubTopic, + private protocol: FilterCore, + private getPeers: () => Peer[], + renewPeer: (peerToDisconnect: PeerId) => Promise + ) { + this.pubsubTopic = pubsubTopic; + this.subscriptionCallbacks = new Map(); + this.reliabilityMonitor = new ReliabilityMonitor( + getPeers.bind(this), + renewPeer.bind(this) + ); + } + public async subscribe( decoders: IDecoder | IDecoder[], callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE; - this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS; - this.maxMissedMessagesThreshold = - options.maxMissedMessagesThreshold || - DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -218,37 +298,6 @@ export class SubscriptionManager implements ISubscriptionSDK { return finalResult; } - private async validateMessage(): Promise { - for (const hash of this.receivedMessagesHashes.all) { - for (const [peerIdStr, hashes] of Object.entries( - this.receivedMessagesHashes.nodes - )) { - if (!hashes.has(hash)) { - this.incrementMissedMessageCount(peerIdStr); - if (this.shouldRenewPeer(peerIdStr)) { - log.info( - `Peer ${peerIdStr} has missed too many messages, renewing.` - ); - const peerId = this.getPeers().find( - (p) => p.id.toString() === peerIdStr - )?.id; - if (!peerId) { - log.error( - `Unexpected Error: Peer ${peerIdStr} not found in connected peers.` - ); - continue; - } - try { - await this.renewAndSubscribePeer(peerId); - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}`); - } - } - } - } - } - } - public async processIncomingMessage( message: WakuMessage, peerIdStr: PeerIdStr @@ -258,8 +307,8 @@ export class SubscriptionManager implements ISubscriptionSDK { message as IProtoMessage ); - this.addHash(hashedMessageStr, peerIdStr); - void this.validateMessage(); + this.reliabilityMonitor.addHash(hashedMessageStr, peerIdStr); + void this.reliabilityMonitor.validateMessage(); if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { log.info("Message already received, skipping"); @@ -322,13 +371,11 @@ export class SubscriptionManager implements ISubscriptionSDK { try { const result = await this.protocol.ping(peer); if (result.failure) { - await this.handlePeerFailure(peerId); - } else { - this.peerFailures.delete(peerId.toString()); + await this.reliabilityMonitor.handlePeerFailure(peerId); } return result; } catch (error) { - await this.handlePeerFailure(peerId); + await this.reliabilityMonitor.handlePeerFailure(peerId); return { success: null, failure: { @@ -339,45 +386,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private async handlePeerFailure(peerId: PeerId): Promise { - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures > this.maxPingFailures) { - try { - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } - } - } - - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - try { - const newPeer = await this.renewPeer(peerId); - await this.protocol.subscribe( - this.pubsubTopic, - newPeer, - Array.from(this.subscriptionCallbacks.keys()) - ); - - this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); - this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - - return newPeer; - } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); - return; - } finally { - this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; - } - } - private startKeepAlivePings(options: SubscribeOptions): void { const { keepAlive } = options; if (this.keepAliveTimer) { @@ -402,16 +410,6 @@ export class SubscriptionManager implements ISubscriptionSDK { clearInterval(this.keepAliveTimer); this.keepAliveTimer = null; } - - private incrementMissedMessageCount(peerIdStr: string): void { - const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; - this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); - } - - private shouldRenewPeer(peerIdStr: string): boolean { - const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0; - return missedMessages > this.maxMissedMessagesThreshold; - } } class FilterSDK extends BaseProtocolSDK implements IFilterSDK {