From f02581f1108db266500dfaeba693f37ed372d99b Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 18 Sep 2024 11:00:14 +0530 Subject: [PATCH] chore: improvements chore: add logs for subscription maintenance chore: update logging chore: improve logs & add debug chore: add TODO --- packages/core/src/lib/base_protocol.ts | 7 +- packages/core/src/lib/filter/index.ts | 15 +++- packages/interfaces/src/protocols.ts | 2 +- packages/sdk/src/protocols/base_protocol.ts | 74 ++++++++++++++----- .../sdk/src/protocols/filter/constants.ts | 2 +- packages/sdk/src/protocols/filter/index.ts | 3 + .../protocols/filter/subscription_manager.ts | 13 +++- packages/sdk/src/reliability_monitor/index.ts | 4 +- .../sdk/src/reliability_monitor/receiver.ts | 24 +++--- .../sdk/src/reliability_monitor/sender.ts | 22 ++++-- packages/utils/src/logger/index.ts | 11 ++- 11 files changed, 131 insertions(+), 46 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 648dd5c3eb..4b3b7c5a14 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -59,8 +59,11 @@ export class BaseProtocol implements IBaseProtocolCore { public async connectedPeers(withOpenStreams = false): Promise { const peers = await this.allPeers(); return peers.filter((peer) => { - return ( - this.components.connectionManager.getConnections(peer.id).length > 0 + const connections = this.components.connectionManager.getConnections( + peer.id + ); + return connections.some((c) => + c.streams.some((s) => s.protocol === this.multicodec) ); }); } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5300718b2b..fdc19698a2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -37,6 +37,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { wakuMessage: WakuMessage, peerIdStr: string ) => Promise, + private handleError: (error: Error) => Promise, public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { @@ -301,8 +302,18 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { () => { log.info("Receiving pipe closed."); }, - (e) => { - log.error("Error with receiving pipe", e); + async (e) => { + log.error( + "Error with receiving pipe", + e, + " -- ", + "on peer ", + connection.remotePeer.toString(), + " -- ", + "stream ", + stream + ); + await this.handleError(e); } ); } catch (e) { diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 3c681d3849..4f324287fc 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -25,7 +25,7 @@ export type IBaseProtocolCore = { export type IBaseProtocolSDK = { readonly connectedPeers: Peer[]; - renewPeer: (peerToDisconnect: PeerId) => Promise; + renewPeer: (peerToDisconnect: PeerId) => Promise; readonly numPeersToUse: number; }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 0c79ef0326..d091bd0724 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -46,6 +46,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; + this.log.info( + `Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms` + ); // void this.setupEventListeners(); void this.startMaintainPeersInterval(maintainPeersInterval); } @@ -59,18 +62,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param peerToDisconnect The peer to disconnect from. * @returns The new peer that was found and connected to. */ - public async renewPeer(peerToDisconnect: PeerId): Promise { - this.log.info(`Renewing peer ${peerToDisconnect}`); + public async renewPeer(peerToDisconnect: PeerId): Promise { + this.log.info(`Attempting to renew peer ${peerToDisconnect}`); await this.connectionManager.dropConnection(peerToDisconnect); - const peer = (await this.findAndAddPeers(1))[0]; - if (!peer) { - this.log.error( - "Failed to find a new peer to replace the disconnected one." - ); - } - const updatedPeers = this.peers.filter( (peer) => !peer.id.equals(peerToDisconnect) ); @@ -80,9 +76,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { `Peer ${peerToDisconnect} disconnected and removed from the peer list` ); + const newPeer = await this.findAndAddPeers(1); + if (newPeer.length === 0) { + this.log.error( + "Failed to find a new peer to replace the disconnected one" + ); + return undefined; + } + this.renewPeersLocker.lock(peerToDisconnect); - return peer; + return newPeer[0]; } /** @@ -93,9 +97,12 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { clearInterval(this.maintainPeersIntervalId); this.maintainPeersIntervalId = null; this.log.info("Maintain peers interval stopped"); + } else { + this.log.debug("Maintain peers interval was not running"); } } + //TODO: validate if adding event listeners for peer connect and disconnect is needed // private setupEventListeners(): void { // this.core.addLibp2pEventListener( // "peer:connect", @@ -157,18 +164,26 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } for (let attempts = 0; attempts < maxAttempts; attempts++) { + this.log.debug( + `Attempt ${attempts + 1}/${maxAttempts} to reach required number of peers` + ); await this.maintainPeers(); if (this.connectedPeers.length >= this.numPeersToUse) { + this.log.info( + `Required number of peers (${this.numPeersToUse}) reached` + ); return true; } this.log.warn( - `Found only ${this.connectedPeers.length} peers, expected ${this.numPeersToUse}. Retrying...` + `Found only ${this.connectedPeers.length}/${this.numPeersToUse} required peers. Retrying...` ); } - this.log.error("Failed to find required number of peers"); + this.log.error( + `Failed to find required number of peers (${this.numPeersToUse}) after ${maxAttempts} attempts` + ); return false; } @@ -177,17 +192,18 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param interval The interval in milliseconds to maintain the peers. */ private async startMaintainPeersInterval(interval: number): Promise { - this.log.info("Starting maintain peers interval"); + this.log.info( + `Starting maintain peers interval with ${interval}ms interval` + ); try { // await this.maintainPeers(); this.maintainPeersIntervalId = setInterval(() => { + this.log.debug("Running scheduled peer maintenance"); this.maintainPeers().catch((error) => { - this.log.error("Error during maintain peers interval:", error); + this.log.error("Error during scheduled peer maintenance:", error); }); }, interval); - this.log.info( - `Maintain peers interval started with interval ${interval}ms` - ); + this.log.info("Maintain peers interval started successfully"); } catch (error) { this.log.error("Error starting maintain peers interval:", error); throw error; @@ -207,6 +223,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { }); this.maintainPeersLock = true; + await this.confirmPeers(); this.log.info(`Maintaining peers, current count: ${this.peers.length}`); try { await this.confirmPeers(); @@ -214,13 +231,36 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { const numPeersToAdd = this.numPeersToUse - this.peers.size; if (numPeersToAdd > 0) { + this.log.info(`Attempting to add ${numPeersToAdd} peer(s)`); await this.peerManager.findAndAddPeers(numPeersToAdd); } else { + this.log.info( + `Attempting to remove ${Math.abs(numPeersToAdd)} excess peer(s)` + ); await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd)); } }); } + private async confirmPeers(): Promise { + const connectedPeers = await this.core.connectedPeers(); + const currentPeers = this.peers; + const peersToAdd = connectedPeers.filter( + (p) => !currentPeers.some((cp) => cp.id.equals(p.id)) + ); + const peersToRemove = currentPeers.filter( + (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) + ); + + peersToAdd.forEach((p) => this.peers.push(p)); + peersToRemove.forEach((p) => { + const index = this.peers.findIndex((cp) => cp.id.equals(p.id)); + if (index !== -1) this.peers.splice(index, 1); + }); + + this.updatePeers(this.peers); + } + /** * Finds and adds new peers to the peers list. * @param numPeers The number of peers to find and add. diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 3889e7638b..01cea6859f 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,4 +1,4 @@ -export const DEFAULT_KEEP_ALIVE = 30 * 1000; +export const DEFAULT_KEEP_ALIVE = 10_000; export const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index c8840ea0e4..c9db3cd80e 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -54,6 +54,9 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, + async (error: Error) => { + log.error("Error with receiving pipe", error); + }, connectionManager.configuredPubsubTopics, libp2p ), diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 51a9e16e8d..4300246624 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -41,7 +41,9 @@ export class SubscriptionManager implements ISubscriptionSDK { private readonly protocol: FilterCore, private readonly connectionManager: ConnectionManager, private readonly getPeers: () => Peer[], - private readonly renewPeer: (peerToDisconnect: PeerId) => Promise + private readonly renewPeer: ( + peerToDisconnect: PeerId + ) => Promise ) { this.pubsubTopic = pubsubTopic; this.subscriptionCallbacks = new Map(); @@ -251,11 +253,13 @@ export class SubscriptionManager implements ISubscriptionSDK { } private startSubscriptionsMaintenance(interval: number): void { + log.info("Starting subscriptions maintenance"); this.startKeepAlivePings(interval); this.startConnectionListener(); } private stopSubscriptionsMaintenance(): void { + log.info("Stopping subscriptions maintenance"); this.stopKeepAlivePings(); this.stopConnectionListener(); } @@ -299,9 +303,10 @@ export class SubscriptionManager implements ISubscriptionSDK { } this.keepAliveTimer = setInterval(() => { - void this.ping().catch((error) => { - log.error("Error in keep-alive ping cycle:", error); - }); + log.info("Sending keep-alive ping"); + void this.ping() + .then(() => log.info("Keep-alive ping successful")) + .catch((error) => log.error("Error in keep-alive ping cycle:", error)); }, interval) as unknown as number; } diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index 1e420dd921..b0d8dc1c3a 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -18,7 +18,7 @@ export class ReliabilityMonitorManager { public static createReceiverMonitor( pubsubTopic: PubsubTopic, getPeers: () => Peer[], - renewPeer: (peerId: PeerId) => Promise, + renewPeer: (peerId: PeerId) => Promise, getContentTopics: () => ContentTopic[], protocolSubscribe: ( pubsubTopic: PubsubTopic, @@ -42,7 +42,7 @@ export class ReliabilityMonitorManager { } public static createSenderMonitor( - renewPeer: (peerId: PeerId) => Promise + renewPeer: (peerId: PeerId) => Promise ): SenderReliabilityMonitor { if (!ReliabilityMonitorManager.senderMonitor) { ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index 440e35829c..985c52a59d 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -32,7 +32,7 @@ export class ReceiverReliabilityMonitor { public constructor( private readonly pubsubTopic: PubsubTopic, private getPeers: () => Peer[], - private renewPeer: (peerId: PeerId) => Promise, + private renewPeer: (peerId: PeerId) => Promise, private getContentTopics: () => ContentTopic[], private protocolSubscribe: ( pubsubTopic: PubsubTopic, @@ -163,15 +163,21 @@ export class ReceiverReliabilityMonitor { private async renewAndSubscribePeer( peerId: PeerId ): Promise { + const peerIdStr = peerId.toString(); try { - if (this.peerRenewalLocks.has(peerId.toString())) { - log.info(`Peer ${peerId.toString()} is already being renewed.`); + if (this.peerRenewalLocks.has(peerIdStr)) { + log.info(`Peer ${peerIdStr} is already being renewed.`); return; } - this.peerRenewalLocks.add(peerId.toString()); + this.peerRenewalLocks.add(peerIdStr); const newPeer = await this.renewPeer(peerId); + if (!newPeer) { + log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`); + return; + } + await this.protocolSubscribe( this.pubsubTopic, newPeer, @@ -181,16 +187,16 @@ export class ReceiverReliabilityMonitor { this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); this.missedMessagesByPeer.set(newPeer.id.toString(), 0); - this.peerFailures.delete(peerId.toString()); - this.missedMessagesByPeer.delete(peerId.toString()); - delete this.receivedMessagesHashes.nodes[peerId.toString()]; + this.peerFailures.delete(peerIdStr); + this.missedMessagesByPeer.delete(peerIdStr); + delete this.receivedMessagesHashes.nodes[peerIdStr]; return newPeer; } catch (error) { - log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); + log.error(`Failed to renew peer ${peerIdStr}: ${error}.`); return; } finally { - this.peerRenewalLocks.delete(peerId.toString()); + this.peerRenewalLocks.delete(peerIdStr); } } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts index 0ffe9a1659..914c321da8 100644 --- a/packages/sdk/src/reliability_monitor/sender.ts +++ b/packages/sdk/src/reliability_monitor/sender.ts @@ -11,7 +11,9 @@ export class SenderReliabilityMonitor { private readonly maxAttemptsBeforeRenewal = DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; - public constructor(private renewPeer: (peerId: PeerId) => Promise) {} + public constructor( + private renewPeer: (peerId: PeerId) => Promise + ) {} public async attemptRetriesOrRenew( peerId: PeerId, @@ -42,13 +44,19 @@ export class SenderReliabilityMonitor { } else { try { const newPeer = await this.renewPeer(peerId); - log.info( - `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` - ); + if (newPeer) { + log.info( + `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); - this.attempts.delete(peerIdStr); - this.attempts.set(newPeer.id.toString(), 0); - await protocolSend(); + this.attempts.delete(peerIdStr); + this.attempts.set(newPeer.id.toString(), 0); + await protocolSend(); + } else { + log.error( + `Failed to renew peer ${peerId.toString()}: New peer is undefined` + ); + } } catch (error) { log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); } diff --git a/packages/utils/src/logger/index.ts b/packages/utils/src/logger/index.ts index 725d55f665..1ece843d06 100644 --- a/packages/utils/src/logger/index.ts +++ b/packages/utils/src/logger/index.ts @@ -3,6 +3,7 @@ import debug, { Debugger } from "debug"; const APP_NAME = "waku"; export class Logger { + private _debug: Debugger; private _info: Debugger; private _warn: Debugger; private _error: Debugger; @@ -12,11 +13,16 @@ export class Logger { } public constructor(prefix?: string) { + this._debug = debug(Logger.createDebugNamespace("debug", prefix)); this._info = debug(Logger.createDebugNamespace("info", prefix)); this._warn = debug(Logger.createDebugNamespace("warn", prefix)); this._error = debug(Logger.createDebugNamespace("error", prefix)); } + public get debug(): Debugger { + return this._debug; + } + public get info(): Debugger { return this._info; } @@ -29,7 +35,10 @@ export class Logger { return this._error; } - public log(level: "info" | "warn" | "error", ...args: unknown[]): void { + public log( + level: "debug" | "info" | "warn" | "error", + ...args: unknown[] + ): void { const logger = this[level] as (...args: unknown[]) => void; logger(...args); }