From 4526a9a96379fe4d88e83ac7d09ffb881f824c5d Mon Sep 17 00:00:00 2001 From: isit666 Date: Sun, 11 Jan 2026 14:36:04 +0100 Subject: [PATCH] add dialTimeout, change dialingQueue to Map --- .../connection_limiter.spec.ts | 3 +- .../connection_manager/connection_manager.ts | 2 ++ .../src/lib/connection_manager/dialer.spec.ts | 1 + .../core/src/lib/connection_manager/dialer.ts | 35 ++++++++++++------- packages/interfaces/src/connection_manager.ts | 7 ++++ 5 files changed, 35 insertions(+), 13 deletions(-) diff --git a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts index 57d068b904..4c242870bb 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts @@ -65,7 +65,8 @@ describe("ConnectionLimiter", () => { enableAutoRecovery: true, maxDialingPeers: 3, failedDialCooldown: 60, - dialCooldown: 10 + dialCooldown: 10, + dialTimeout: 30 }; function createLimiter( diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index f5d6ded196..0e8964d06f 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -29,6 +29,7 @@ const DEFAULT_MAX_CONNECTIONS = 10; const DEFAULT_MAX_DIALING_PEERS = 3; const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60; const DEFAULT_DIAL_COOLDOWN_SEC = 10; +const DEFAULT_DIAL_TIMEOUT_SEC = 30; type ConnectionManagerConstructorOptions = { libp2p: Libp2p; @@ -61,6 +62,7 @@ export class ConnectionManager implements IConnectionManager { maxDialingPeers: DEFAULT_MAX_DIALING_PEERS, failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC, dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC, + dialTimeout: DEFAULT_DIAL_TIMEOUT_SEC, ...options.config }; diff --git a/packages/core/src/lib/connection_manager/dialer.spec.ts b/packages/core/src/lib/connection_manager/dialer.spec.ts index d7112a5b02..d7596c2c64 100644 --- a/packages/core/src/lib/connection_manager/dialer.spec.ts +++ b/packages/core/src/lib/connection_manager/dialer.spec.ts @@ -39,6 +39,7 @@ describe("Dialer", () => { maxDialingPeers: 3, failedDialCooldown: 60, dialCooldown: 10, + dialTimeout: 30, maxConnections: 10, enableAutoRecovery: true }; diff --git a/packages/core/src/lib/connection_manager/dialer.ts b/packages/core/src/lib/connection_manager/dialer.ts index fbe317d3d2..ccd51f7889 100644 --- a/packages/core/src/lib/connection_manager/dialer.ts +++ b/packages/core/src/lib/connection_manager/dialer.ts @@ -23,7 +23,7 @@ export class Dialer implements IDialer { private readonly shardReader: ShardReader; private readonly options: ConnectionManagerOptions; - private dialingQueue: PeerId[] = []; + private dialingQueue: Map = new Map(); private dialHistory: Map = new Map(); private failedDials: Map = new Map(); private dialingInterval: NodeJS.Timeout | null = null; @@ -70,7 +70,7 @@ export class Dialer implements IDialer { return; } - const isEmptyQueue = this.dialingQueue.length === 0; + const isEmptyQueue = this.dialingQueue.size === 0; const isNotDialing = !this.isProcessing && !this.isImmediateDialing; // If queue is empty and we're not currently processing, dial immediately @@ -81,29 +81,28 @@ export class Dialer implements IDialer { this.isImmediateDialing = false; log.info("Released immediate dial lock"); } else { - this.dialingQueue.push(peerId); + this.dialingQueue.set(peerId.toString(), peerId); log.info( - `Added peer to dialing queue, queue size: ${this.dialingQueue.length}` + `Added peer to dialing queue, queue size: ${this.dialingQueue.size}` ); } } private async processQueue(): Promise { - if (this.dialingQueue.length === 0 || this.isProcessing) { + if (this.dialingQueue.size === 0 || this.isProcessing) { return; } this.isProcessing = true; try { - const peersToDial = this.dialingQueue.slice( - 0, - this.options.maxDialingPeers - ); - this.dialingQueue = this.dialingQueue.slice(peersToDial.length); + const allPeers = Array.from(this.dialingQueue.values()); + const peersToDial = allPeers.slice(0, this.options.maxDialingPeers); + + peersToDial.forEach((peer) => this.dialingQueue.delete(peer.toString())); log.info( - `Processing dial queue: dialing ${peersToDial.length} peers, ${this.dialingQueue.length} remaining in queue` + `Processing dial queue: dialing ${peersToDial.length} peers, ${this.dialingQueue.size} remaining in queue` ); await Promise.all(peersToDial.map((peerId) => this.dialPeer(peerId))); @@ -116,7 +115,19 @@ export class Dialer implements IDialer { try { log.info(`Dialing peer from queue: ${peerId}`); - await this.libp2p.dial(peerId); + await Promise.race([ + this.libp2p.dial(peerId), + new Promise((_, reject) => + setTimeout( + () => + reject( + new Error(`Dial timeout after ${this.options.dialTimeout}s`) + ), + this.options.dialTimeout * 1000 + ) + ) + ]); + this.dialHistory.set(peerId.toString(), Date.now()); this.failedDials.delete(peerId.toString()); diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index b25a225989..d397462f23 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -74,6 +74,13 @@ export type ConnectionManagerOptions = { * @default 10 seconds */ dialCooldown: number; + + /** + * Time to wait for a dial attempt to complete before timing out. + * + * @default 30 seconds + */ + dialTimeout: number; }; export interface IConnectionManager {