From 22c10fca53e0ee76c59bfccfabbe46f5315c7239 Mon Sep 17 00:00:00 2001 From: Sasha Date: Fri, 19 Sep 2025 00:34:33 +0200 Subject: [PATCH] implement TTL update for open connections, add re-bootstrapping in case reaches zero peers --- .../connection_manager/bootstrap_trigger.ts | 101 ------------------ .../connection_manager/connection_limiter.ts | 88 +++++++++++++-- .../connection_manager.spec.ts | 6 ++ .../connection_manager/connection_manager.ts | 8 -- .../core/src/lib/connection_manager/utils.ts | 23 ++++ 5 files changed, 111 insertions(+), 115 deletions(-) delete mode 100644 packages/core/src/lib/connection_manager/bootstrap_trigger.ts diff --git a/packages/core/src/lib/connection_manager/bootstrap_trigger.ts b/packages/core/src/lib/connection_manager/bootstrap_trigger.ts deleted file mode 100644 index 3052784060..0000000000 --- a/packages/core/src/lib/connection_manager/bootstrap_trigger.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { PeerId } from "@libp2p/interface"; -import { Libp2p, Tags } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; - -type BootstrapTriggerConstructorOptions = { - libp2p: Libp2p; -}; - -interface IBootstrapTrigger { - start(): void; - stop(): void; -} - -const log = new Logger("bootstrap-trigger"); - -const DEFAULT_BOOTSTRAP_TIMEOUT_MS = 1000; - -export class BootstrapTrigger implements IBootstrapTrigger { - private readonly libp2p: Libp2p; - private bootstrapTimeout: NodeJS.Timeout | null = null; - - public constructor(options: BootstrapTriggerConstructorOptions) { - this.libp2p = options.libp2p; - } - - public start(): void { - log.info("Starting bootstrap trigger"); - this.libp2p.addEventListener("peer:disconnect", this.onPeerDisconnectEvent); - } - - public stop(): void { - log.info("Stopping bootstrap trigger"); - this.libp2p.removeEventListener( - "peer:disconnect", - this.onPeerDisconnectEvent - ); - - if (this.bootstrapTimeout) { - clearTimeout(this.bootstrapTimeout); - this.bootstrapTimeout = null; - log.info("Cleared pending bootstrap timeout"); - } - } - - private onPeerDisconnectEvent = (event: CustomEvent): void => { - const peerId = event.detail; - const connections = this.libp2p.getConnections(); - log.info( - `Peer disconnected: ${peerId.toString()}, remaining connections: ${connections.length}` - ); - - if (connections.length !== 0) { - return; - } - - log.info( - `Last peer disconnected, scheduling bootstrap in ${DEFAULT_BOOTSTRAP_TIMEOUT_MS} milliseconds` - ); - - if (this.bootstrapTimeout) { - clearTimeout(this.bootstrapTimeout); - } - - this.bootstrapTimeout = setTimeout(() => { - log.info("Triggering bootstrap after timeout"); - this.triggerBootstrap(); - this.bootstrapTimeout = null; - }, DEFAULT_BOOTSTRAP_TIMEOUT_MS); - }; - - private triggerBootstrap(): void { - log.info("Triggering bootstrap discovery"); - - const bootstrapComponents = Object.values(this.libp2p.components.components) - .filter((c) => !!c) - .filter((c: unknown) => - [`@waku/${Tags.BOOTSTRAP}`, `@waku/${Tags.PEER_CACHE}`].includes( - (c as { [Symbol.toStringTag]: string })?.[Symbol.toStringTag] - ) - ); - - if (bootstrapComponents.length === 0) { - log.warn("No bootstrap components found to trigger"); - return; - } - - log.info( - `Found ${bootstrapComponents.length} bootstrap components, starting them` - ); - - bootstrapComponents.forEach((component) => { - try { - (component as { stop: () => void })?.stop?.(); - (component as { start: () => void })?.start?.(); - log.info("Successfully started bootstrap component"); - } catch (error) { - log.error("Failed to start bootstrap component", error); - } - }); - } -} diff --git a/packages/core/src/lib/connection_manager/connection_limiter.ts b/packages/core/src/lib/connection_manager/connection_limiter.ts index 3b59c5f286..83d85714c7 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.ts @@ -9,9 +9,11 @@ import { WakuEvent } from "@waku/interfaces"; import { Logger } from "@waku/utils"; +import { numberToBytes } from "@waku/utils/bytes"; import { Dialer } from "./dialer.js"; import { NetworkMonitor } from "./network_monitor.js"; +import { isAddressesSupported } from "./utils.js"; const log = new Logger("connection-limiter"); @@ -123,6 +125,7 @@ export class ConnectionLimiter implements IConnectionLimiter { private async maintainConnections(): Promise { await this.maintainConnectionsCount(); await this.maintainBootstrapConnections(); + await this.maintainTTLConnectedPeers(); } private async onDisconnectedEvent(): Promise { @@ -145,13 +148,15 @@ export class ConnectionLimiter implements IConnectionLimiter { const peers = await this.getPrioritizedPeers(); if (peers.length === 0) { - log.info(`No peers to dial, node is utilizing all known peers`); + log.info(`No peers to dial, skipping`); + await this.triggerBootstrap(); return; } const promises = peers .slice(0, this.options.maxConnections - connections.length) .map((p) => this.dialer.dial(p.id)); + await Promise.all(promises); return; @@ -210,6 +215,28 @@ export class ConnectionLimiter implements IConnectionLimiter { } } + private async maintainTTLConnectedPeers(): Promise { + log.info(`Maintaining TTL connected peers`); + + const promises = this.libp2p.getConnections().map(async (c) => { + try { + await this.libp2p.peerStore.merge(c.remotePeer, { + metadata: { + ttl: numberToBytes(Date.now()) + } + }); + log.info(`TTL updated for connected peer ${c.remotePeer.toString()}`); + } catch (error) { + log.error( + `Unexpected error while maintaining TTL connected peer`, + error + ); + } + }); + + await Promise.all(promises); + } + private async dialPeersFromStore(): Promise { log.info(`Dialing peers from store`); @@ -218,6 +245,7 @@ export class ConnectionLimiter implements IConnectionLimiter { if (peers.length === 0) { log.info(`No peers to dial, skipping`); + await this.triggerBootstrap(); return; } @@ -248,10 +276,9 @@ export class ConnectionLimiter implements IConnectionLimiter { const notConnectedPeers = allPeers.filter( (p) => !allConnections.some((c) => c.remotePeer.equals(p.id)) && - p.addresses.some( - (a) => - a.multiaddr.toString().includes("wss") || - a.multiaddr.toString().includes("ws") + isAddressesSupported( + this.libp2p, + p.addresses.map((a) => a.multiaddr) ) ); @@ -267,7 +294,19 @@ export class ConnectionLimiter implements IConnectionLimiter { p.tags.has(Tags.PEER_CACHE) ); - return [...bootstrapPeers, ...peerExchangePeers, ...localStorePeers]; + const restPeers = notConnectedPeers.filter( + (p) => + !p.tags.has(Tags.BOOTSTRAP) && + !p.tags.has(Tags.PEER_EXCHANGE) && + !p.tags.has(Tags.PEER_CACHE) + ); + + return [ + ...bootstrapPeers, + ...peerExchangePeers, + ...localStorePeers, + ...restPeers + ]; } private async getBootstrapPeers(): Promise { @@ -291,4 +330,41 @@ export class ConnectionLimiter implements IConnectionLimiter { return null; } } + + /** + * Triggers the bootstrap or peer cache discovery if they are mounted. + * @returns void + */ + private async triggerBootstrap(): Promise { + log.info("Triggering bootstrap discovery"); + + const bootstrapComponents = Object.values(this.libp2p.components.components) + .filter((c) => !!c) + .filter((c: unknown) => + [`@waku/${Tags.BOOTSTRAP}`, `@waku/${Tags.PEER_CACHE}`].includes( + (c as { [Symbol.toStringTag]: string })?.[Symbol.toStringTag] + ) + ); + + if (bootstrapComponents.length === 0) { + log.warn("No bootstrap components found to trigger"); + return; + } + + log.info( + `Found ${bootstrapComponents.length} bootstrap components, starting them` + ); + + const promises = bootstrapComponents.map(async (component) => { + try { + await (component as { stop: () => Promise })?.stop?.(); + await (component as { start: () => Promise })?.start?.(); + log.info("Successfully started bootstrap component"); + } catch (error) { + log.error("Failed to start bootstrap component", error); + } + }); + + await Promise.all(promises); + } } diff --git a/packages/core/src/lib/connection_manager/connection_manager.spec.ts b/packages/core/src/lib/connection_manager/connection_manager.spec.ts index 45d64781f6..d8a108625a 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.spec.ts @@ -52,6 +52,12 @@ describe("ConnectionManager", () => { dialProtocol: sinon.stub().resolves({} as Stream), hangUp: sinon.stub().resolves(), getPeers: sinon.stub().returns([]), + getConnections: sinon.stub().returns([]), + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + components: { + components: {} + }, peerStore: { get: sinon.stub().resolves(null), merge: sinon.stub().resolves() diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 8b044aa33f..f5d6ded196 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -11,7 +11,6 @@ import { import { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -import { BootstrapTrigger } from "./bootstrap_trigger.js"; import { ConnectionLimiter } from "./connection_limiter.js"; import { Dialer } from "./dialer.js"; import { DiscoveryDialer } from "./discovery_dialer.js"; @@ -46,7 +45,6 @@ export class ConnectionManager implements IConnectionManager { private readonly shardReader: ShardReader; private readonly networkMonitor: NetworkMonitor; private readonly connectionLimiter: ConnectionLimiter; - private readonly bootstrapTrigger: BootstrapTrigger; private readonly options: ConnectionManagerOptions; private libp2p: Libp2p; @@ -66,10 +64,6 @@ export class ConnectionManager implements IConnectionManager { ...options.config }; - this.bootstrapTrigger = new BootstrapTrigger({ - libp2p: options.libp2p - }); - this.keepAliveManager = new KeepAliveManager({ relay: options.relay, libp2p: options.libp2p, @@ -116,7 +110,6 @@ export class ConnectionManager implements IConnectionManager { this.discoveryDialer.start(); this.keepAliveManager.start(); this.connectionLimiter.start(); - this.bootstrapTrigger.start(); } public stop(): void { @@ -125,7 +118,6 @@ export class ConnectionManager implements IConnectionManager { this.discoveryDialer.stop(); this.keepAliveManager.stop(); this.connectionLimiter.stop(); - this.bootstrapTrigger.stop(); } public isConnected(): boolean { diff --git a/packages/core/src/lib/connection_manager/utils.ts b/packages/core/src/lib/connection_manager/utils.ts index 02fa68a2b6..ded582d80f 100644 --- a/packages/core/src/lib/connection_manager/utils.ts +++ b/packages/core/src/lib/connection_manager/utils.ts @@ -1,6 +1,7 @@ import { isPeerId, type Peer, type PeerId } from "@libp2p/interface"; import { peerIdFromString } from "@libp2p/peer-id"; import { Multiaddr, multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; +import { Libp2p } from "@waku/interfaces"; import { bytesToUtf8 } from "@waku/utils/bytes"; /** @@ -49,3 +50,25 @@ export const mapToPeerId = (input: PeerId | MultiaddrInput): PeerId => { ? input : peerIdFromString(multiaddr(input).getPeerId()!); }; + +/** + * Checks if the address is supported by the libp2p instance. + * @param libp2p - The libp2p instance. + * @param addresses - The addresses to check. + * @returns True if the addresses are supported, false otherwise. + */ +export const isAddressesSupported = ( + libp2p: Libp2p, + addresses: Multiaddr[] +): boolean => { + const transports = + libp2p?.components?.transportManager?.getTransports() || []; + + if (transports.length === 0) { + return false; + } + + return transports + .map((transport) => transport.dialFilter(addresses)) + .some((supportedAddresses) => supportedAddresses.length > 0); +};