From 00635b7afe60c2ed739f2ccd1f07b2a6cc04f797 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Tue, 16 Jul 2024 18:35:24 +0200 Subject: [PATCH] feat: fix peer renewal, change Filter keep alive (#2065) * move util, create stream manager folder * change default keep alive, improve peer renewal * address nit --- packages/core/src/index.ts | 2 +- packages/core/src/lib/base_protocol.ts | 3 +- packages/core/src/lib/connection_manager.ts | 57 +++++++-------- packages/core/src/lib/keep_alive_manager.ts | 41 ++++++----- packages/core/src/lib/stream_manager/index.ts | 1 + .../{ => stream_manager}/stream_manager.ts | 3 +- packages/core/src/lib/stream_manager/utils.ts | 22 ++++++ packages/sdk/src/protocols/base_protocol.ts | 73 +++++++++++++++---- packages/sdk/src/protocols/filter.ts | 4 +- packages/utils/src/libp2p/index.ts | 21 ------ 10 files changed, 139 insertions(+), 88 deletions(-) create mode 100644 packages/core/src/lib/stream_manager/index.ts rename packages/core/src/lib/{ => stream_manager}/stream_manager.ts (98%) create mode 100644 packages/core/src/lib/stream_manager/utils.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f71ac2568d..af6d03b3c5 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -22,6 +22,6 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; export { ConnectionManager } from "./lib/connection_manager.js"; export { KeepAliveManager } from "./lib/keep_alive_manager.js"; -export { StreamManager } from "./lib/stream_manager.js"; +export { StreamManager } from "./lib/stream_manager/index.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 6ede084500..69a7e29067 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -14,7 +14,7 @@ import { } from "@waku/utils/libp2p"; import { filterPeersByDiscovery } from "./filterPeers.js"; -import { StreamManager } from "./stream_manager.js"; +import { StreamManager } from "./stream_manager/index.js"; /** * A class with predefined helpers, to be used as a base to implement Waku @@ -47,6 +47,7 @@ export class BaseProtocol implements IBaseProtocolCore { this.addLibp2pEventListener ); } + protected async getStream(peer: Peer): Promise { return this.streamManager.getStream(peer); } diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 95f209e739..6b23713eb5 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -89,7 +89,7 @@ export class ConnectionManager return instance; } - stop(): void { + public stop(): void { this.keepAliveManager.stopAll(); this.libp2p.removeEventListener( "peer:connect", @@ -105,7 +105,7 @@ export class ConnectionManager ); } - async dropConnection(peerId: PeerId): Promise { + public async dropConnection(peerId: PeerId): Promise { try { this.keepAliveManager.stop(peerId); await this.libp2p.hangUp(peerId); @@ -187,7 +187,11 @@ export class ConnectionManager ...options }; - this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay); + this.keepAliveManager = new KeepAliveManager({ + relay, + libp2p, + options: keepAliveOptions + }); this.run() .then(() => log.info(`Connection Manager is now running`)) @@ -250,6 +254,7 @@ export class ConnectionManager this.dialAttemptsForPeer.set(peerId.toString(), -1); // Dialing succeeded, break the loop + this.keepAliveManager.start(peerId); break; } catch (error) { if (error instanceof AggregateError) { @@ -356,7 +361,7 @@ export class ConnectionManager ); } - private async attemptDial(peerId: PeerId): Promise { + public async attemptDial(peerId: PeerId): Promise { if (!(await this.shouldDialPeer(peerId))) return; if (this.currentActiveParallelDialCount >= this.options.maxParallelDials) { @@ -364,9 +369,7 @@ export class ConnectionManager return; } - this.dialPeer(peerId).catch((err) => { - log.error(`Error dialing peer ${peerId.toString()} : ${err}`); - }); + await this.dialPeer(peerId); } private onEventHandlers = { @@ -389,11 +392,7 @@ export class ConnectionManager const peerId = evt.detail; - this.keepAliveManager.start( - peerId, - this.libp2p.services.ping, - this.libp2p.peerStore - ); + this.keepAliveManager.start(peerId); const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( Tags.BOOTSTRAP @@ -449,38 +448,40 @@ export class ConnectionManager * @returns true if the peer should be dialed, false otherwise */ private async shouldDialPeer(peerId: PeerId): Promise { - // if we're already connected to the peer, don't dial const isConnected = this.libp2p.getConnections(peerId).length > 0; if (isConnected) { log.warn(`Already connected to peer ${peerId.toString()}. Not dialing.`); return false; } - // if the peer is not part of any of the configured pubsub topics, don't dial - if (!(await this.isPeerTopicConfigured(peerId))) { + const isSameShard = await this.isPeerTopicConfigured(peerId); + if (!isSameShard) { const shardInfo = await this.getPeerShardInfo( peerId, this.libp2p.peerStore ); + log.warn( `Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ this.configuredPubsubTopics }). Not dialing.` ); + return false; } - // if the peer is not dialable based on bootstrap status, don't dial - if (!(await this.isPeerDialableBasedOnBootstrapStatus(peerId))) { + const isPreferredBasedOnBootstrap = + await this.isPeerDialableBasedOnBootstrapStatus(peerId); + if (!isPreferredBasedOnBootstrap) { log.warn( `Peer ${peerId.toString()} is not dialable based on bootstrap status. Not dialing.` ); return false; } - // If the peer is already already has an active dial attempt, or has been dialed before, don't dial it - if (this.dialAttemptsForPeer.has(peerId.toString())) { + const hasBeenDialed = this.dialAttemptsForPeer.has(peerId.toString()); + if (hasBeenDialed) { log.warn( `Peer ${peerId.toString()} has already been attempted dial before, or already has a dial attempt in progress, skipping dial` ); @@ -502,19 +503,17 @@ export class ConnectionManager const isBootstrap = tagNames.some((tagName) => tagName === Tags.BOOTSTRAP); - if (isBootstrap) { - const currentBootstrapConnections = this.libp2p - .getConnections() - .filter((conn) => { - return conn.tags.find((name) => name === Tags.BOOTSTRAP); - }).length; - if (currentBootstrapConnections < this.options.maxBootstrapPeersAllowed) - return true; - } else { + if (!isBootstrap) { return true; } - return false; + const currentBootstrapConnections = this.libp2p + .getConnections() + .filter((conn) => { + return conn.tags.find((name) => name === Tags.BOOTSTRAP); + }).length; + + return currentBootstrapConnections < this.options.maxBootstrapPeersAllowed; } private async dispatchDiscoveryEvent(peerId: PeerId): Promise { diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 3051a53f5b..e16e4a0a85 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -1,6 +1,5 @@ -import type { PeerId, PeerStore } from "@libp2p/interface"; -import type { PingService } from "@libp2p/ping"; -import type { IRelay, PeerIdStr } from "@waku/interfaces"; +import type { PeerId } from "@libp2p/interface"; +import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; @@ -10,24 +9,30 @@ import { createEncoder } from "./message/version_0.js"; export const RelayPingContentTopic = "/relay-ping/1/ping/null"; const log = new Logger("keep-alive"); -export class KeepAliveManager { - private pingKeepAliveTimers: Map>; - private relayKeepAliveTimers: Map[]>; - private options: KeepAliveOptions; - private relay?: IRelay; +type CreateKeepAliveManagerOptions = { + options: KeepAliveOptions; + libp2p: Libp2p; + relay?: IRelay; +}; - constructor(options: KeepAliveOptions, relay?: IRelay) { - this.pingKeepAliveTimers = new Map(); - this.relayKeepAliveTimers = new Map(); +export class KeepAliveManager { + private readonly relay?: IRelay; + private readonly libp2p: Libp2p; + + private readonly options: KeepAliveOptions; + + private pingKeepAliveTimers: Map> = + new Map(); + private relayKeepAliveTimers: Map[]> = + new Map(); + + constructor({ options, relay, libp2p }: CreateKeepAliveManagerOptions) { this.options = options; this.relay = relay; + this.libp2p = libp2p; } - public start( - peerId: PeerId, - libp2pPing: PingService, - peerStore: PeerStore - ): void { + public start(peerId: PeerId): void { // Just in case a timer already exists for this peer this.stop(peerId); @@ -46,7 +51,7 @@ export class KeepAliveManager { // ping the peer for keep alive // also update the peer store with the latency try { - ping = await libp2pPing.ping(peerId); + ping = await this.libp2p.services.ping.ping(peerId); log.info(`Ping succeeded (${peerIdStr})`, ping); } catch (error) { log.error(`Ping failed for peer (${peerIdStr}). @@ -56,7 +61,7 @@ export class KeepAliveManager { } try { - await peerStore.merge(peerId, { + await this.libp2p.peerStore.merge(peerId, { metadata: { ping: utf8ToBytes(ping.toString()) } diff --git a/packages/core/src/lib/stream_manager/index.ts b/packages/core/src/lib/stream_manager/index.ts new file mode 100644 index 0000000000..7260f3064d --- /dev/null +++ b/packages/core/src/lib/stream_manager/index.ts @@ -0,0 +1 @@ +export { StreamManager } from "./stream_manager.js"; diff --git a/packages/core/src/lib/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts similarity index 98% rename from packages/core/src/lib/stream_manager.ts rename to packages/core/src/lib/stream_manager/stream_manager.ts index aeb6072d0e..b602d47e71 100644 --- a/packages/core/src/lib/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -2,7 +2,8 @@ import type { PeerUpdate, Stream } from "@libp2p/interface"; import type { Peer, PeerId } from "@libp2p/interface"; import { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -import { selectConnection } from "@waku/utils/libp2p"; + +import { selectConnection } from "./utils.js"; const CONNECTION_TIMEOUT = 5_000; const RETRY_BACKOFF_BASE = 1_000; diff --git a/packages/core/src/lib/stream_manager/utils.ts b/packages/core/src/lib/stream_manager/utils.ts new file mode 100644 index 0000000000..d6f9ace5a8 --- /dev/null +++ b/packages/core/src/lib/stream_manager/utils.ts @@ -0,0 +1,22 @@ +import type { Connection } from "@libp2p/interface"; + +export function selectConnection( + connections: Connection[] +): Connection | undefined { + if (!connections.length) return; + if (connections.length === 1) return connections[0]; + + let latestConnection: Connection | undefined; + + connections.forEach((connection) => { + if (connection.status === "open") { + if (!latestConnection) { + latestConnection = connection; + } else if (connection.timeline.open > latestConnection.timeline.open) { + latestConnection = connection; + } + } + }); + + return latestConnection; +} diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 6013e9dfbe..27ee1b8832 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -9,6 +9,7 @@ interface Options { maintainPeersInterval?: number; } +const RENEW_TIME_LOCK_DURATION = 30 * 1000; const DEFAULT_NUM_PEERS_TO_USE = 3; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; @@ -21,6 +22,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { log: Logger; private maintainPeersLock = false; + private readonly renewPeersLocker = new RenewPeerLocker( + RENEW_TIME_LOCK_DURATION + ); constructor( protected core: BaseProtocol, @@ -46,11 +50,6 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { */ public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); - await this.connectionManager.dropConnection(peerToDisconnect); - this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect); - this.log.info( - `Peer ${peerToDisconnect} disconnected and removed from the peer list` - ); const peer = (await this.findAndAddPeers(1))[0]; if (!peer) { @@ -59,6 +58,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ); } + await this.connectionManager.dropConnection(peerToDisconnect); + this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect)); + this.log.info( + `Peer ${peerToDisconnect} disconnected and removed from the peer list` + ); + + this.renewPeersLocker.lock(peerToDisconnect); + return peer; } @@ -163,6 +170,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.log.info( `Peer maintenance completed, current count: ${this.peers.length}` ); + this.renewPeersLocker.cleanUnlocked(); } finally { this.maintainPeersLock = false; } @@ -177,6 +185,12 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.log.info(`Finding and adding ${numPeers} new peers`); try { const additionalPeers = await this.findAdditionalPeers(numPeers); + const dials = additionalPeers.map((peer) => + this.connectionManager.attemptDial(peer.id) + ); + + await Promise.all(dials); + this.peers = [...this.peers, ...additionalPeers]; this.log.info( `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` @@ -198,22 +212,19 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { private async findAdditionalPeers(numPeers: number): Promise { this.log.info(`Finding ${numPeers} additional peers`); try { - let newPeers = await this.core.getPeers({ - maxBootstrapPeers: 0, - numPeers: 0 - }); + let newPeers = await this.core.allPeers(); if (newPeers.length === 0) { - this.log.warn("No new peers found, trying with bootstrap peers"); - newPeers = await this.core.getPeers({ - maxBootstrapPeers: numPeers, - numPeers: 0 - }); + this.log.warn("No new peers found."); } newPeers = newPeers - .filter((peer) => this.peers.some((p) => p.id === peer.id) === false) + .filter( + (peer) => this.peers.some((p) => p.id.equals(peer.id)) === false + ) + .filter((peer) => !this.renewPeersLocker.isLocked(peer.id)) .slice(0, numPeers); + return newPeers; } catch (error) { this.log.error("Error finding additional peers:", error); @@ -221,3 +232,35 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } } + +class RenewPeerLocker { + private readonly peers: Map = new Map(); + + constructor(private lockDuration: number) {} + + public lock(id: PeerId): void { + this.peers.set(id.toString(), Date.now()); + } + + public isLocked(id: PeerId): boolean { + const time = this.peers.get(id.toString()); + + if (time && !this.isTimeUnlocked(time)) { + return true; + } + + return false; + } + + public cleanUnlocked(): void { + Object.entries(this.peers).forEach(([id, lock]) => { + if (this.isTimeUnlocked(lock)) { + this.peers.delete(id.toString()); + } + }); + } + + private isTimeUnlocked(time: number): boolean { + return Date.now() - time >= this.lockDuration; + } +} diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index c10e9582db..35da1a4c92 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -41,11 +41,11 @@ type SubscriptionCallback = { const log = new Logger("sdk:filter"); -const MINUTE = 60 * 1000; const DEFAULT_MAX_PINGS = 3; +const DEFAULT_KEEP_ALIVE = 30 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: MINUTE + keepAlive: DEFAULT_KEEP_ALIVE }; export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 04c37c7d6d..fa9b97c85d 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -105,24 +105,3 @@ export async function getConnectedPeersForProtocolAndShard( const peersWithNulls = await Promise.all(peerPromises); return peersWithNulls.filter((peer): peer is Peer => peer !== null); } - -export function selectConnection( - connections: Connection[] -): Connection | undefined { - if (!connections.length) return; - if (connections.length === 1) return connections[0]; - - let latestConnection: Connection | undefined; - - connections.forEach((connection) => { - if (connection.status === "open") { - if (!latestConnection) { - latestConnection = connection; - } else if (connection.timeline.open > latestConnection.timeline.open) { - latestConnection = connection; - } - } - }); - - return latestConnection; -}