diff --git a/.cspell.json b/.cspell.json index 61fedbe688..f4e425d684 100644 --- a/.cspell.json +++ b/.cspell.json @@ -61,6 +61,7 @@ "ineed", "IPAM", "ipfs", + "cooldown", "iwant", "jdev", "jswaku", diff --git a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts index 724356ccf5..856c57f5f5 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts @@ -48,7 +48,10 @@ describe("ConnectionLimiter", () => { const defaultOptions = { maxBootstrapPeers: 2, pingKeepAlive: 300, - relayKeepAlive: 300 + relayKeepAlive: 300, + maxDialingPeers: 3, + failedDialCooldown: 60, + dialCooldown: 10 }; beforeEach(() => { @@ -761,7 +764,10 @@ describe("ConnectionLimiter", () => { const customOptions = { maxBootstrapPeers: 1, pingKeepAlive: 300, - relayKeepAlive: 300 + relayKeepAlive: 300, + maxDialingPeers: 3, + failedDialCooldown: 60, + dialCooldown: 10 }; connectionLimiter = new ConnectionLimiter({ @@ -834,7 +840,10 @@ describe("ConnectionLimiter", () => { const customOptions = { maxBootstrapPeers: 1, pingKeepAlive: 300, - relayKeepAlive: 300 + relayKeepAlive: 300, + maxDialingPeers: 3, + failedDialCooldown: 60, + dialCooldown: 10 }; connectionLimiter = new ConnectionLimiter({ @@ -882,7 +891,10 @@ describe("ConnectionLimiter", () => { const customOptions = { maxBootstrapPeers: 1, pingKeepAlive: 300, - relayKeepAlive: 300 + relayKeepAlive: 300, + maxDialingPeers: 3, + failedDialCooldown: 60, + dialCooldown: 10 }; connectionLimiter = new ConnectionLimiter({ @@ -928,7 +940,10 @@ describe("ConnectionLimiter", () => { const customOptions = { maxBootstrapPeers: 10, pingKeepAlive: 300, - relayKeepAlive: 300 + relayKeepAlive: 300, + maxDialingPeers: 3, + failedDialCooldown: 60, + dialCooldown: 10 }; connectionLimiter = new ConnectionLimiter({ diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 9b5da9bf03..8b175f83a6 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -24,6 +24,9 @@ const log = new Logger("connection-manager"); const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1; const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60; const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60; +const DEFAULT_MAX_DIALING_PEERS = 3; +const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60; +const DEFAULT_DIAL_COOLDOWN_SEC = 10; type ConnectionManagerConstructorOptions = { libp2p: Libp2p; @@ -55,6 +58,9 @@ export class ConnectionManager implements IConnectionManager { maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC, relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC, + maxDialingPeers: DEFAULT_MAX_DIALING_PEERS, + failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC, + dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC, ...options.config }; @@ -74,7 +80,8 @@ export class ConnectionManager implements IConnectionManager { this.dialer = new Dialer({ libp2p: options.libp2p, - shardReader: this.shardReader + shardReader: this.shardReader, + options: this.options }); this.discoveryDialer = new DiscoveryDialer({ @@ -97,6 +104,7 @@ export class ConnectionManager implements IConnectionManager { } public start(): void { + this.dialer.start(); this.networkMonitor.start(); this.discoveryDialer.start(); this.keepAliveManager.start(); @@ -104,6 +112,7 @@ export class ConnectionManager implements IConnectionManager { } public stop(): void { + this.dialer.stop(); this.networkMonitor.stop(); this.discoveryDialer.stop(); this.keepAliveManager.stop(); diff --git a/packages/core/src/lib/connection_manager/dialer.spec.ts b/packages/core/src/lib/connection_manager/dialer.spec.ts index 8f2b0f1ea1..971f5dafcf 100644 --- a/packages/core/src/lib/connection_manager/dialer.spec.ts +++ b/packages/core/src/lib/connection_manager/dialer.spec.ts @@ -1,5 +1,5 @@ import { PeerId } from "@libp2p/interface"; -import { Libp2p } from "@waku/interfaces"; +import { ConnectionManagerOptions, Libp2p } from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -13,6 +13,7 @@ describe("Dialer", () => { let mockPeerId: PeerId; let mockPeerId2: PeerId; let clock: sinon.SinonFakeTimers; + let mockOptions: ConnectionManagerOptions; const createMockPeerId = (id: string): PeerId => ({ @@ -31,10 +32,21 @@ describe("Dialer", () => { isPeerOnNetwork: sinon.stub().resolves(true) } as unknown as sinon.SinonStubbedInstance; + mockOptions = { + maxBootstrapPeers: 1, + pingKeepAlive: 300, + relayKeepAlive: 300, + maxDialingPeers: 3, + failedDialCooldown: 60, + dialCooldown: 10 + }; + mockPeerId = createMockPeerId("12D3KooWTest1"); mockPeerId2 = createMockPeerId("12D3KooWTest2"); - clock = sinon.useFakeTimers(); + clock = sinon.useFakeTimers({ + now: 1000000000000 + }); }); afterEach(() => { @@ -49,7 +61,8 @@ describe("Dialer", () => { it("should create dialer with libp2p and shardReader", () => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); expect(dialer).to.be.instanceOf(Dialer); @@ -60,7 +73,8 @@ describe("Dialer", () => { beforeEach(() => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); }); @@ -98,7 +112,8 @@ describe("Dialer", () => { beforeEach(() => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); dialer.start(); }); @@ -135,7 +150,8 @@ describe("Dialer", () => { beforeEach(() => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); dialer.start(); }); @@ -152,14 +168,28 @@ describe("Dialer", () => { it("should add peer to queue when queue is not empty", async () => { const dialStub = libp2p.dial as sinon.SinonStub; - dialStub.resolves(); - void dialer.dial(mockPeerId); + let resolveFirstDial: () => void; + const firstDialPromise = new Promise((resolve) => { + resolveFirstDial = resolve; + }); + dialStub.onFirstCall().returns(firstDialPromise); + dialStub.onSecondCall().resolves(); + + const firstDialCall = dialer.dial(mockPeerId); - dialStub.resetHistory(); await dialer.dial(mockPeerId2); - expect(dialStub.called).to.be.true; + expect(dialStub.calledOnce).to.be.true; + expect(dialStub.calledWith(mockPeerId)).to.be.true; + + resolveFirstDial!(); + await firstDialCall; + + clock.tick(500); + await Promise.resolve(); + + expect(dialStub.calledTwice).to.be.true; expect(dialStub.calledWith(mockPeerId2)).to.be.true; }); @@ -186,7 +216,54 @@ describe("Dialer", () => { clock.tick(5000); await dialer.dial(mockPeerId); - expect(dialStub.called).to.be.true; + expect(dialStub.called).to.be.false; + }); + + it("should skip peer when failed to dial recently", async () => { + const dialStub = libp2p.dial as sinon.SinonStub; + dialStub.rejects(new Error("Dial failed")); + + await dialer.dial(mockPeerId); + expect(dialStub.calledOnce).to.be.true; + + dialStub.resetHistory(); + dialStub.resolves(); + + clock.tick(30000); + + await dialer.dial(mockPeerId); + expect(dialStub.called).to.be.false; + }); + + it("should populate queue if has active dial", async () => { + const dialStub = libp2p.dial as sinon.SinonStub; + const mockPeerId3 = createMockPeerId("12D3KooWTest3"); + + let resolveFirstDial: () => void; + const firstDialPromise = new Promise((resolve) => { + resolveFirstDial = resolve; + }); + dialStub.onFirstCall().returns(firstDialPromise); + dialStub.onSecondCall().resolves(); + dialStub.onThirdCall().resolves(); + + const firstDialCall = dialer.dial(mockPeerId); + + await dialer.dial(mockPeerId2); + await dialer.dial(mockPeerId3); + + expect(dialStub.calledOnce).to.be.true; + expect(dialStub.calledWith(mockPeerId)).to.be.true; + + resolveFirstDial!(); + await firstDialCall; + + clock.tick(500); + await Promise.resolve(); + + expect(dialStub.callCount).to.equal(3); + expect(dialStub.calledWith(mockPeerId2)).to.be.true; + expect(dialStub.calledWith(mockPeerId3)).to.be.true; }); it("should allow redial after cooldown period", async () => { @@ -252,13 +329,49 @@ describe("Dialer", () => { expect(dialStub.calledOnce).to.be.true; expect(dialStub.calledWith(mockPeerId)).to.be.true; }); + + it("should allow redial after failed dial cooldown expires", async () => { + const dialStub = libp2p.dial as sinon.SinonStub; + dialStub.onFirstCall().rejects(new Error("Dial failed")); + dialStub.onSecondCall().resolves(); + await dialer.dial(mockPeerId); + expect(dialStub.calledOnce).to.be.true; + clock.tick(60001); + await dialer.dial(mockPeerId); + expect(dialStub.calledTwice).to.be.true; + }); + + it("should handle queue overflow by adding peers to queue", async () => { + const dialStub = libp2p.dial as sinon.SinonStub; + const peers = []; + for (let i = 0; i < 100; i++) { + peers.push(createMockPeerId(`12D3KooWTest${i}`)); + } + let resolveFirstDial: () => void; + const firstDialPromise = new Promise((resolve) => { + resolveFirstDial = resolve; + }); + dialStub.onFirstCall().returns(firstDialPromise); + dialStub.resolves(); + const firstDialCall = dialer.dial(peers[0]); + for (let i = 1; i < 100; i++) { + await dialer.dial(peers[i]); + } + expect(dialStub.calledOnce).to.be.true; + resolveFirstDial!(); + await firstDialCall; + clock.tick(500); + await Promise.resolve(); + expect(dialStub.callCount).to.be.greaterThan(1); + }); }); describe("queue processing", () => { beforeEach(() => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); dialer.start(); }); @@ -334,7 +447,8 @@ describe("Dialer", () => { beforeEach(() => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); dialer.start(); }); @@ -368,7 +482,8 @@ describe("Dialer", () => { it("should handle complete dial lifecycle", async () => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); dialer.start(); @@ -386,7 +501,8 @@ describe("Dialer", () => { it("should handle multiple peers with different shard configurations", async () => { dialer = new Dialer({ libp2p, - shardReader: mockShardReader + shardReader: mockShardReader, + options: mockOptions }); dialer.start(); diff --git a/packages/core/src/lib/connection_manager/dialer.ts b/packages/core/src/lib/connection_manager/dialer.ts index a8f7cb34f3..21989c12aa 100644 --- a/packages/core/src/lib/connection_manager/dialer.ts +++ b/packages/core/src/lib/connection_manager/dialer.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import { Libp2p } from "@waku/interfaces"; +import { ConnectionManagerOptions, Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { ShardReader } from "./shard_reader.js"; @@ -9,6 +9,7 @@ const log = new Logger("dialer"); type DialerConstructorOptions = { libp2p: Libp2p; shardReader: ShardReader; + options: ConnectionManagerOptions; }; interface IDialer { @@ -20,18 +21,25 @@ interface IDialer { export class Dialer implements IDialer { private readonly libp2p: Libp2p; private readonly shardReader: ShardReader; + private readonly options: ConnectionManagerOptions; private dialingQueue: PeerId[] = []; private dialHistory: Map = new Map(); + private failedDials: Map = new Map(); private dialingInterval: NodeJS.Timeout | null = null; + private isProcessing = false; + private isImmediateDialing = false; public constructor(options: DialerConstructorOptions) { this.libp2p = options.libp2p; this.shardReader = options.shardReader; + this.options = options.options; } public start(): void { + log.info("Starting dialer"); + if (!this.dialingInterval) { this.dialingInterval = setInterval(() => { void this.processQueue(); @@ -39,15 +47,19 @@ export class Dialer implements IDialer { } this.dialHistory.clear(); + this.failedDials.clear(); } public stop(): void { + log.info("Stopping dialer"); + if (this.dialingInterval) { clearInterval(this.dialingInterval); this.dialingInterval = null; } this.dialHistory.clear(); + this.failedDials.clear(); } public async dial(peerId: PeerId): Promise { @@ -58,11 +70,17 @@ export class Dialer implements IDialer { return; } + const isEmptyQueue = this.dialingQueue.length === 0; + const isNotDialing = !this.isProcessing && !this.isImmediateDialing; + // If queue is empty and we're not currently processing, dial immediately - if (this.dialingQueue.length === 0 && !this.isProcessing) { + if (isEmptyQueue && isNotDialing) { + this.isImmediateDialing = true; + log.info("Dialed peer immediately"); await this.dialPeer(peerId); + this.isImmediateDialing = false; + log.info("Released immediate dial lock"); } else { - // Add to queue this.dialingQueue.push(peerId); log.info( `Added peer to dialing queue, queue size: ${this.dialingQueue.length}` @@ -71,12 +89,17 @@ export class Dialer implements IDialer { } private async processQueue(): Promise { - if (this.dialingQueue.length === 0 || this.isProcessing) return; + if (this.dialingQueue.length === 0 || this.isProcessing) { + return; + } this.isProcessing = true; try { - const peersToDial = this.dialingQueue.slice(0, 3); + const peersToDial = this.dialingQueue.slice( + 0, + this.options.maxDialingPeers + ); this.dialingQueue = this.dialingQueue.slice(peersToDial.length); log.info( @@ -95,10 +118,12 @@ export class Dialer implements IDialer { await this.libp2p.dial(peerId); this.dialHistory.set(peerId.toString(), Date.now()); + this.failedDials.delete(peerId.toString()); log.info(`Successfully dialed peer from queue: ${peerId}`); } catch (error) { log.error(`Error dialing peer ${peerId}`, error); + this.failedDials.set(peerId.toString(), Date.now()); } } @@ -109,14 +134,18 @@ export class Dialer implements IDialer { return true; } - const lastDialed = this.dialHistory.get(peerId.toString()); - if (lastDialed && Date.now() - lastDialed < 10_000) { + if (this.isRecentlyDialed(peerId)) { log.info( `Skipping peer ${peerId} - already dialed in the last 10 seconds` ); return true; } + if (this.isRecentlyFailed(peerId)) { + log.info(`Skipping peer ${peerId} - recently failed to dial`); + return true; + } + try { const hasShardInfo = await this.shardReader.hasShardInfo(peerId); if (!hasShardInfo) { @@ -136,4 +165,28 @@ export class Dialer implements IDialer { return true; // Skip peer when there's an error } } + + private isRecentlyDialed(peerId: PeerId): boolean { + const lastDialed = this.dialHistory.get(peerId.toString()); + if ( + lastDialed && + Date.now() - lastDialed < this.options.dialCooldown * 1000 + ) { + return true; + } + + return false; + } + + private isRecentlyFailed(peerId: PeerId): boolean { + const lastFailed = this.failedDials.get(peerId.toString()); + if ( + lastFailed && + Date.now() - lastFailed < this.options.failedDialCooldown * 1000 + ) { + return true; + } + + return false; + } } diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 8849a54e13..6e1b0c1239 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -31,6 +31,27 @@ export type ConnectionManagerOptions = { * @default 300 seconds */ relayKeepAlive: number; + + /** + * Max number of peers to dial at once. + * + * @default 3 + */ + maxDialingPeers: number; + + /** + * Time to wait before dialing failed peers again. + * + * @default 60 seconds + */ + failedDialCooldown: number; + + /** + * Time to wait before dialing a peer again. + * + * @default 10 seconds + */ + dialCooldown: number; }; export interface IConnectionManager {