feat: don't dial peers that failed before, make dialer use dial queue (#2478)

* make dialer use dial queue

* skip undiable peers

* cover new cases in tests

* expose dialer config at connection manager

* update tests with new config

* add more tests
This commit is contained in:
Sasha 2025-07-15 23:13:58 +02:00 committed by GitHub
parent 27292edabc
commit 35acdf8fa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 243 additions and 28 deletions

View File

@ -61,6 +61,7 @@
"ineed",
"IPAM",
"ipfs",
"cooldown",
"iwant",
"jdev",
"jswaku",

View File

@ -48,7 +48,10 @@ describe("ConnectionLimiter", () => {
const defaultOptions = {
maxBootstrapPeers: 2,
pingKeepAlive: 300,
relayKeepAlive: 300
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
};
beforeEach(() => {
@ -761,7 +764,10 @@ describe("ConnectionLimiter", () => {
const customOptions = {
maxBootstrapPeers: 1,
pingKeepAlive: 300,
relayKeepAlive: 300
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
};
connectionLimiter = new ConnectionLimiter({
@ -834,7 +840,10 @@ describe("ConnectionLimiter", () => {
const customOptions = {
maxBootstrapPeers: 1,
pingKeepAlive: 300,
relayKeepAlive: 300
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
};
connectionLimiter = new ConnectionLimiter({
@ -882,7 +891,10 @@ describe("ConnectionLimiter", () => {
const customOptions = {
maxBootstrapPeers: 1,
pingKeepAlive: 300,
relayKeepAlive: 300
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
};
connectionLimiter = new ConnectionLimiter({
@ -928,7 +940,10 @@ describe("ConnectionLimiter", () => {
const customOptions = {
maxBootstrapPeers: 10,
pingKeepAlive: 300,
relayKeepAlive: 300
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
};
connectionLimiter = new ConnectionLimiter({

View File

@ -24,6 +24,9 @@ const log = new Logger("connection-manager");
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60;
const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60;
const DEFAULT_MAX_DIALING_PEERS = 3;
const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60;
const DEFAULT_DIAL_COOLDOWN_SEC = 10;
type ConnectionManagerConstructorOptions = {
libp2p: Libp2p;
@ -55,6 +58,9 @@ export class ConnectionManager implements IConnectionManager {
maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC,
relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC,
maxDialingPeers: DEFAULT_MAX_DIALING_PEERS,
failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC,
dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC,
...options.config
};
@ -74,7 +80,8 @@ export class ConnectionManager implements IConnectionManager {
this.dialer = new Dialer({
libp2p: options.libp2p,
shardReader: this.shardReader
shardReader: this.shardReader,
options: this.options
});
this.discoveryDialer = new DiscoveryDialer({
@ -97,6 +104,7 @@ export class ConnectionManager implements IConnectionManager {
}
public start(): void {
this.dialer.start();
this.networkMonitor.start();
this.discoveryDialer.start();
this.keepAliveManager.start();
@ -104,6 +112,7 @@ export class ConnectionManager implements IConnectionManager {
}
public stop(): void {
this.dialer.stop();
this.networkMonitor.stop();
this.discoveryDialer.stop();
this.keepAliveManager.stop();

View File

@ -1,5 +1,5 @@
import { PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { ConnectionManagerOptions, Libp2p } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
@ -13,6 +13,7 @@ describe("Dialer", () => {
let mockPeerId: PeerId;
let mockPeerId2: PeerId;
let clock: sinon.SinonFakeTimers;
let mockOptions: ConnectionManagerOptions;
const createMockPeerId = (id: string): PeerId =>
({
@ -31,10 +32,21 @@ describe("Dialer", () => {
isPeerOnNetwork: sinon.stub().resolves(true)
} as unknown as sinon.SinonStubbedInstance<ShardReader>;
mockOptions = {
maxBootstrapPeers: 1,
pingKeepAlive: 300,
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
};
mockPeerId = createMockPeerId("12D3KooWTest1");
mockPeerId2 = createMockPeerId("12D3KooWTest2");
clock = sinon.useFakeTimers();
clock = sinon.useFakeTimers({
now: 1000000000000
});
});
afterEach(() => {
@ -49,7 +61,8 @@ describe("Dialer", () => {
it("should create dialer with libp2p and shardReader", () => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
expect(dialer).to.be.instanceOf(Dialer);
@ -60,7 +73,8 @@ describe("Dialer", () => {
beforeEach(() => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
});
@ -98,7 +112,8 @@ describe("Dialer", () => {
beforeEach(() => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
dialer.start();
});
@ -135,7 +150,8 @@ describe("Dialer", () => {
beforeEach(() => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
dialer.start();
});
@ -152,14 +168,28 @@ describe("Dialer", () => {
it("should add peer to queue when queue is not empty", async () => {
const dialStub = libp2p.dial as sinon.SinonStub;
dialStub.resolves();
void dialer.dial(mockPeerId);
let resolveFirstDial: () => void;
const firstDialPromise = new Promise<void>((resolve) => {
resolveFirstDial = resolve;
});
dialStub.onFirstCall().returns(firstDialPromise);
dialStub.onSecondCall().resolves();
const firstDialCall = dialer.dial(mockPeerId);
dialStub.resetHistory();
await dialer.dial(mockPeerId2);
expect(dialStub.called).to.be.true;
expect(dialStub.calledOnce).to.be.true;
expect(dialStub.calledWith(mockPeerId)).to.be.true;
resolveFirstDial!();
await firstDialCall;
clock.tick(500);
await Promise.resolve();
expect(dialStub.calledTwice).to.be.true;
expect(dialStub.calledWith(mockPeerId2)).to.be.true;
});
@ -186,7 +216,54 @@ describe("Dialer", () => {
clock.tick(5000);
await dialer.dial(mockPeerId);
expect(dialStub.called).to.be.true;
expect(dialStub.called).to.be.false;
});
it("should skip peer when failed to dial recently", async () => {
const dialStub = libp2p.dial as sinon.SinonStub;
dialStub.rejects(new Error("Dial failed"));
await dialer.dial(mockPeerId);
expect(dialStub.calledOnce).to.be.true;
dialStub.resetHistory();
dialStub.resolves();
clock.tick(30000);
await dialer.dial(mockPeerId);
expect(dialStub.called).to.be.false;
});
it("should populate queue if has active dial", async () => {
const dialStub = libp2p.dial as sinon.SinonStub;
const mockPeerId3 = createMockPeerId("12D3KooWTest3");
let resolveFirstDial: () => void;
const firstDialPromise = new Promise<void>((resolve) => {
resolveFirstDial = resolve;
});
dialStub.onFirstCall().returns(firstDialPromise);
dialStub.onSecondCall().resolves();
dialStub.onThirdCall().resolves();
const firstDialCall = dialer.dial(mockPeerId);
await dialer.dial(mockPeerId2);
await dialer.dial(mockPeerId3);
expect(dialStub.calledOnce).to.be.true;
expect(dialStub.calledWith(mockPeerId)).to.be.true;
resolveFirstDial!();
await firstDialCall;
clock.tick(500);
await Promise.resolve();
expect(dialStub.callCount).to.equal(3);
expect(dialStub.calledWith(mockPeerId2)).to.be.true;
expect(dialStub.calledWith(mockPeerId3)).to.be.true;
});
it("should allow redial after cooldown period", async () => {
@ -252,13 +329,49 @@ describe("Dialer", () => {
expect(dialStub.calledOnce).to.be.true;
expect(dialStub.calledWith(mockPeerId)).to.be.true;
});
it("should allow redial after failed dial cooldown expires", async () => {
const dialStub = libp2p.dial as sinon.SinonStub;
dialStub.onFirstCall().rejects(new Error("Dial failed"));
dialStub.onSecondCall().resolves();
await dialer.dial(mockPeerId);
expect(dialStub.calledOnce).to.be.true;
clock.tick(60001);
await dialer.dial(mockPeerId);
expect(dialStub.calledTwice).to.be.true;
});
it("should handle queue overflow by adding peers to queue", async () => {
const dialStub = libp2p.dial as sinon.SinonStub;
const peers = [];
for (let i = 0; i < 100; i++) {
peers.push(createMockPeerId(`12D3KooWTest${i}`));
}
let resolveFirstDial: () => void;
const firstDialPromise = new Promise<void>((resolve) => {
resolveFirstDial = resolve;
});
dialStub.onFirstCall().returns(firstDialPromise);
dialStub.resolves();
const firstDialCall = dialer.dial(peers[0]);
for (let i = 1; i < 100; i++) {
await dialer.dial(peers[i]);
}
expect(dialStub.calledOnce).to.be.true;
resolveFirstDial!();
await firstDialCall;
clock.tick(500);
await Promise.resolve();
expect(dialStub.callCount).to.be.greaterThan(1);
});
});
describe("queue processing", () => {
beforeEach(() => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
dialer.start();
});
@ -334,7 +447,8 @@ describe("Dialer", () => {
beforeEach(() => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
dialer.start();
});
@ -368,7 +482,8 @@ describe("Dialer", () => {
it("should handle complete dial lifecycle", async () => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
dialer.start();
@ -386,7 +501,8 @@ describe("Dialer", () => {
it("should handle multiple peers with different shard configurations", async () => {
dialer = new Dialer({
libp2p,
shardReader: mockShardReader
shardReader: mockShardReader,
options: mockOptions
});
dialer.start();

View File

@ -1,5 +1,5 @@
import type { PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { ConnectionManagerOptions, Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { ShardReader } from "./shard_reader.js";
@ -9,6 +9,7 @@ const log = new Logger("dialer");
type DialerConstructorOptions = {
libp2p: Libp2p;
shardReader: ShardReader;
options: ConnectionManagerOptions;
};
interface IDialer {
@ -20,18 +21,25 @@ interface IDialer {
export class Dialer implements IDialer {
private readonly libp2p: Libp2p;
private readonly shardReader: ShardReader;
private readonly options: ConnectionManagerOptions;
private dialingQueue: PeerId[] = [];
private dialHistory: Map<string, number> = new Map();
private failedDials: Map<string, number> = new Map();
private dialingInterval: NodeJS.Timeout | null = null;
private isProcessing = false;
private isImmediateDialing = false;
public constructor(options: DialerConstructorOptions) {
this.libp2p = options.libp2p;
this.shardReader = options.shardReader;
this.options = options.options;
}
public start(): void {
log.info("Starting dialer");
if (!this.dialingInterval) {
this.dialingInterval = setInterval(() => {
void this.processQueue();
@ -39,15 +47,19 @@ export class Dialer implements IDialer {
}
this.dialHistory.clear();
this.failedDials.clear();
}
public stop(): void {
log.info("Stopping dialer");
if (this.dialingInterval) {
clearInterval(this.dialingInterval);
this.dialingInterval = null;
}
this.dialHistory.clear();
this.failedDials.clear();
}
public async dial(peerId: PeerId): Promise<void> {
@ -58,11 +70,17 @@ export class Dialer implements IDialer {
return;
}
const isEmptyQueue = this.dialingQueue.length === 0;
const isNotDialing = !this.isProcessing && !this.isImmediateDialing;
// If queue is empty and we're not currently processing, dial immediately
if (this.dialingQueue.length === 0 && !this.isProcessing) {
if (isEmptyQueue && isNotDialing) {
this.isImmediateDialing = true;
log.info("Dialed peer immediately");
await this.dialPeer(peerId);
this.isImmediateDialing = false;
log.info("Released immediate dial lock");
} else {
// Add to queue
this.dialingQueue.push(peerId);
log.info(
`Added peer to dialing queue, queue size: ${this.dialingQueue.length}`
@ -71,12 +89,17 @@ export class Dialer implements IDialer {
}
private async processQueue(): Promise<void> {
if (this.dialingQueue.length === 0 || this.isProcessing) return;
if (this.dialingQueue.length === 0 || this.isProcessing) {
return;
}
this.isProcessing = true;
try {
const peersToDial = this.dialingQueue.slice(0, 3);
const peersToDial = this.dialingQueue.slice(
0,
this.options.maxDialingPeers
);
this.dialingQueue = this.dialingQueue.slice(peersToDial.length);
log.info(
@ -95,10 +118,12 @@ export class Dialer implements IDialer {
await this.libp2p.dial(peerId);
this.dialHistory.set(peerId.toString(), Date.now());
this.failedDials.delete(peerId.toString());
log.info(`Successfully dialed peer from queue: ${peerId}`);
} catch (error) {
log.error(`Error dialing peer ${peerId}`, error);
this.failedDials.set(peerId.toString(), Date.now());
}
}
@ -109,14 +134,18 @@ export class Dialer implements IDialer {
return true;
}
const lastDialed = this.dialHistory.get(peerId.toString());
if (lastDialed && Date.now() - lastDialed < 10_000) {
if (this.isRecentlyDialed(peerId)) {
log.info(
`Skipping peer ${peerId} - already dialed in the last 10 seconds`
);
return true;
}
if (this.isRecentlyFailed(peerId)) {
log.info(`Skipping peer ${peerId} - recently failed to dial`);
return true;
}
try {
const hasShardInfo = await this.shardReader.hasShardInfo(peerId);
if (!hasShardInfo) {
@ -136,4 +165,28 @@ export class Dialer implements IDialer {
return true; // Skip peer when there's an error
}
}
private isRecentlyDialed(peerId: PeerId): boolean {
const lastDialed = this.dialHistory.get(peerId.toString());
if (
lastDialed &&
Date.now() - lastDialed < this.options.dialCooldown * 1000
) {
return true;
}
return false;
}
private isRecentlyFailed(peerId: PeerId): boolean {
const lastFailed = this.failedDials.get(peerId.toString());
if (
lastFailed &&
Date.now() - lastFailed < this.options.failedDialCooldown * 1000
) {
return true;
}
return false;
}
}

View File

@ -31,6 +31,27 @@ export type ConnectionManagerOptions = {
* @default 300 seconds
*/
relayKeepAlive: number;
/**
* Max number of peers to dial at once.
*
* @default 3
*/
maxDialingPeers: number;
/**
* Time to wait before dialing failed peers again.
*
* @default 60 seconds
*/
failedDialCooldown: number;
/**
* Time to wait before dialing a peer again.
*
* @default 10 seconds
*/
dialCooldown: number;
};
export interface IConnectionManager {