feat: add recovery and connection maintenance (#2496)

* add feature flag (FF) for auto recovery

* implement connection locking, connection maintenance, auto recovery, bootstrap connections maintenance and fix bootstrap peers dropping

* add unit tests (UT) for peer manager changes

* implement unit tests (UT) for Connection Limiter

* increase connection maintenance interval

* update e2e test
This commit is contained in:
Sasha 2025-07-17 01:15:36 +02:00 committed by GitHub
parent 7f7f772d93
commit ed389ccbc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 505 additions and 849 deletions

View File

@ -1,5 +1,6 @@
import { Peer, PeerId } from "@libp2p/interface";
import {
CONNECTION_LOCKED_TAG,
ConnectionManagerOptions,
IWakuEventEmitter,
Libp2p,
@ -13,6 +14,8 @@ import { NetworkMonitor } from "./network_monitor.js";
const log = new Logger("connection-limiter");
const DEFAULT_CONNECTION_MONITOR_INTERVAL = 5 * 1_000;
type ConnectionLimiterConstructorOptions = {
libp2p: Libp2p;
events: IWakuEventEmitter;
@ -37,6 +40,7 @@ export class ConnectionLimiter implements IConnectionLimiter {
private readonly networkMonitor: NetworkMonitor;
private readonly dialer: Dialer;
private connectionMonitorInterval: NodeJS.Timeout | null = null;
private readonly options: ConnectionManagerOptions;
public constructor(options: ConnectionLimiterConstructorOptions) {
@ -48,7 +52,6 @@ export class ConnectionLimiter implements IConnectionLimiter {
this.options = options.options;
this.onWakuConnectionEvent = this.onWakuConnectionEvent.bind(this);
this.onConnectedEvent = this.onConnectedEvent.bind(this);
this.onDisconnectedEvent = this.onDisconnectedEvent.bind(this);
}
@ -56,12 +59,17 @@ export class ConnectionLimiter implements IConnectionLimiter {
// dial all known peers because libp2p might have emitted `peer:discovery` before initialization
void this.dialPeersFromStore();
this.events.addEventListener("waku:connection", this.onWakuConnectionEvent);
if (
this.options.enableAutoRecovery &&
this.connectionMonitorInterval === null
) {
this.connectionMonitorInterval = setInterval(
() => void this.maintainConnections(),
DEFAULT_CONNECTION_MONITOR_INTERVAL
);
}
this.libp2p.addEventListener(
"peer:connect",
this.onConnectedEvent as Libp2pEventHandler<PeerId>
);
this.events.addEventListener("waku:connection", this.onWakuConnectionEvent);
/**
* NOTE: Event is not being emitted on closing nor losing a connection.
@ -86,44 +94,31 @@ export class ConnectionLimiter implements IConnectionLimiter {
this.onWakuConnectionEvent
);
this.libp2p.removeEventListener(
"peer:connect",
this.onConnectedEvent as Libp2pEventHandler<PeerId>
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onDisconnectedEvent as Libp2pEventHandler<PeerId>
);
if (this.connectionMonitorInterval) {
clearInterval(this.connectionMonitorInterval);
this.connectionMonitorInterval = null;
}
}
/**
 * Listener for the `waku:connection` event.
 *
 * Does nothing (besides logging) when auto recovery is disabled. Otherwise,
 * if the network monitor reports the browser as connected, it kicks off a
 * fire-and-forget dial of the peers known to the peer store.
 */
private onWakuConnectionEvent(): void {
  const autoRecoveryEnabled = this.options.enableAutoRecovery;

  if (autoRecoveryEnabled) {
    if (this.networkMonitor.isBrowserConnected()) {
      // Intentionally not awaited: the listener itself is synchronous.
      void this.dialPeersFromStore();
    }

    return;
  }

  log.info(`Auto recovery is disabled, skipping`);
}
private async onConnectedEvent(evt: CustomEvent<PeerId>): Promise<void> {
log.info(`Connected to peer ${evt.detail.toString()}`);
const peerId = evt.detail;
const tags = await this.getTagsForPeer(peerId);
const isBootstrap = tags.includes(Tags.BOOTSTRAP);
if (!isBootstrap) {
log.info(
`Connected to peer ${peerId.toString()} is not a bootstrap peer`
);
return;
}
if (await this.hasMoreThanMaxBootstrapConnections()) {
log.info(
`Connected to peer ${peerId.toString()} and node has more than max bootstrap connections ${this.options.maxBootstrapPeers}. Dropping connection.`
);
await this.libp2p.hangUp(peerId);
}
/**
 * Runs a single maintenance pass over the node's connections.
 *
 * The two steps run sequentially: the overall connection count is adjusted
 * first, and only afterwards are bootstrap connections checked.
 */
private async maintainConnections(): Promise<void> {
// Step 1: dial more peers or drop connections to respect `maxConnections`.
await this.maintainConnectionsCount();
// Step 2: drop bootstrap connections exceeding `maxBootstrapPeers`.
await this.maintainBootstrapConnections();
}
private async onDisconnectedEvent(): Promise<void> {
@ -133,22 +128,98 @@ export class ConnectionLimiter implements IConnectionLimiter {
}
}
/**
 * Keeps the number of open connections close to `options.maxConnections`.
 *
 * - At or below the limit: dials prioritized, not-yet-connected peers, at most
 *   as many as there are free slots.
 * - Above the limit: hangs up exactly as many unlocked connections as the node
 *   has in excess. Connections tagged with `CONNECTION_LOCKED_TAG` are never
 *   dropped, so the node can stay above the limit when there are not enough
 *   unlocked connections.
 *
 * Failures while dialing or dropping are logged instead of being thrown, so a
 * single failed pass does not surface as an unhandled rejection.
 */
private async maintainConnectionsCount(): Promise<void> {
  log.info(`Maintaining connections count`);

  const connections = this.libp2p.getConnections();
  const maxConnections = this.options.maxConnections;

  try {
    if (connections.length <= maxConnections) {
      log.info(
        `Node has less than max connections ${maxConnections}, trying to dial more peers`
      );

      const peers = await this.getPrioritizedPeers();

      if (peers.length === 0) {
        log.info(`No peers to dial, node is utilizing all known peers`);
        return;
      }

      // Only fill the free slots; peers are already ordered by priority.
      const freeSlots = maxConnections - connections.length;
      await Promise.all(
        peers.slice(0, freeSlots).map((p) => this.dialer.dial(p.id))
      );

      return;
    }

    log.info(
      `Node has more than max connections ${maxConnections}, dropping connections`
    );

    // `excess` is >= 1 in this branch, so `slice(-excess)` never degenerates
    // into `slice(0)`. The count is derived from ALL connections, while the
    // candidates are restricted to unlocked ones; slicing the filtered list at
    // `maxConnections` would under-drop whenever locked connections exist.
    const excess = connections.length - maxConnections;
    const connectionsToDrop = connections
      .filter((c) => !c.tags.includes(CONNECTION_LOCKED_TAG))
      .slice(-excess);

    if (connectionsToDrop.length === 0) {
      log.info(`No connections to drop, skipping`);
      return;
    }

    await Promise.all(
      connectionsToDrop.map((c) => this.libp2p.hangUp(c.remotePeer))
    );

    log.info(`Dropped ${connectionsToDrop.length} connections`);
  } catch (error) {
    log.error(`Unexpected error while maintaining connections`, error);
  }
}
/**
 * Ensures the node is connected to no more than `options.maxBootstrapPeers`
 * bootstrap peers.
 *
 * Connected bootstrap peers beyond the limit (in the order returned by
 * `getBootstrapPeers`) are hung up. Errors raised while dropping are logged
 * and swallowed.
 */
private async maintainBootstrapConnections(): Promise<void> {
  log.info(`Maintaining bootstrap connections`);

  const connectedBootstrapPeers = await this.getBootstrapPeers();
  const limit = this.options.maxBootstrapPeers;

  const isOverLimit = connectedBootstrapPeers.length > limit;
  if (!isOverLimit) {
    return;
  }

  try {
    // Everything past the first `limit` entries is surplus.
    const surplus = connectedBootstrapPeers.slice(limit);

    log.info(
      `Dropping ${surplus.length} bootstrap connections because node has more than max bootstrap connections ${limit}`
    );

    await Promise.all(surplus.map((peer) => this.libp2p.hangUp(peer.id)));

    log.info(`Dropped ${surplus.length} bootstrap connections`);
  } catch (error) {
    log.error(
      `Unexpected error while maintaining bootstrap connections`,
      error
    );
  }
}
private async dialPeersFromStore(): Promise<void> {
log.info(`Dialing peers from store`);
const allPeers = await this.libp2p.peerStore.all();
const allConnections = this.libp2p.getConnections();
log.info(
`Found ${allPeers.length} peers in store, and found ${allConnections.length} connections`
);
const promises = allPeers
.filter((p) => !allConnections.some((c) => c.remotePeer.equals(p.id)))
.map((p) => this.dialer.dial(p.id));
try {
log.info(`Dialing ${promises.length} peers from store`);
const peers = await this.getPrioritizedPeers();
if (peers.length === 0) {
log.info(`No peers to dial, skipping`);
return;
}
const promises = peers.map((p) => this.dialer.dial(p.id));
log.info(`Dialing ${peers.length} peers from store`);
await Promise.all(promises);
log.info(`Dialed ${promises.length} peers from store`);
} catch (error) {
@ -156,27 +227,58 @@ export class ConnectionLimiter implements IConnectionLimiter {
}
}
private async hasMoreThanMaxBootstrapConnections(): Promise<boolean> {
try {
const peers = await Promise.all(
this.libp2p
.getConnections()
.map((conn) => conn.remotePeer)
.map((id) => this.getPeer(id))
);
/**
* Returns a list of peers ordered by priority:
* - bootstrap peers
* - peers from peer exchange
* - peers from local store (last because we are not sure that locally stored information is up to date)
*/
private async getPrioritizedPeers(): Promise<Peer[]> {
const allPeers = await this.libp2p.peerStore.all();
const allConnections = this.libp2p.getConnections();
const bootstrapPeers = peers.filter(
(peer) => peer && peer.tags.has(Tags.BOOTSTRAP)
);
log.info(
`Found ${allPeers.length} peers in store, and found ${allConnections.length} connections`
);
return bootstrapPeers.length > this.options.maxBootstrapPeers;
} catch (error) {
log.error(
`Unexpected error while checking for bootstrap connections`,
error
);
return false;
}
const notConnectedPeers = allPeers.filter(
(p) =>
!allConnections.some((c) => c.remotePeer.equals(p.id)) &&
p.addresses.some(
(a) =>
a.multiaddr.toString().includes("wss") ||
a.multiaddr.toString().includes("ws")
)
);
const bootstrapPeers = notConnectedPeers.filter((p) =>
p.tags.has(Tags.BOOTSTRAP)
);
const peerExchangePeers = notConnectedPeers.filter((p) =>
p.tags.has(Tags.PEER_EXCHANGE)
);
const localStorePeers = notConnectedPeers.filter((p) =>
p.tags.has(Tags.LOCAL)
);
return [...bootstrapPeers, ...peerExchangePeers, ...localStorePeers];
}
/**
 * Resolves the peer-store entries of all currently connected peers and
 * returns those tagged as bootstrap peers.
 *
 * Peers for which `getPeer` yields no entry are left out.
 */
private async getBootstrapPeers(): Promise<Peer[]> {
  const lookups: Promise<Peer | null>[] = [];

  for (const connection of this.libp2p.getConnections()) {
    lookups.push(this.getPeer(connection.remotePeer));
  }

  const resolved = await Promise.all(lookups);

  return resolved.filter(
    (candidate): candidate is Peer =>
      candidate != null && candidate.tags.has(Tags.BOOTSTRAP)
  );
}
private async getPeer(peerId: PeerId): Promise<Peer | null> {
@ -187,14 +289,4 @@ export class ConnectionLimiter implements IConnectionLimiter {
return null;
}
}
/**
 * Looks up a peer in the peer store and returns the names of its tags.
 *
 * @returns the tag names, or an empty array when the lookup fails (the
 * failure is logged).
 */
private async getTagsForPeer(peerId: PeerId): Promise<string[]> {
  let tagNames: string[];

  try {
    const { tags } = await this.libp2p.peerStore.get(peerId);
    tagNames = [...tags.keys()];
  } catch (error) {
    log.error(`Failed to get peer ${peerId}, error: ${error}`);
    tagNames = [];
  }

  return tagNames;
}
}

View File

@ -21,9 +21,11 @@ import { getPeerPing, mapToPeerId, mapToPeerIdOrMultiaddr } from "./utils.js";
const log = new Logger("connection-manager");
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 3;
const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60;
const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60;
const DEFAULT_ENABLE_AUTO_RECOVERY = true;
const DEFAULT_MAX_CONNECTIONS = 10;
const DEFAULT_MAX_DIALING_PEERS = 3;
const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60;
const DEFAULT_DIAL_COOLDOWN_SEC = 10;
@ -56,8 +58,10 @@ export class ConnectionManager implements IConnectionManager {
this.options = {
maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
maxConnections: DEFAULT_MAX_CONNECTIONS,
pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC,
relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC,
enableAutoRecovery: DEFAULT_ENABLE_AUTO_RECOVERY,
maxDialingPeers: DEFAULT_MAX_DIALING_PEERS,
failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC,
dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC,

View File

@ -38,7 +38,9 @@ describe("Dialer", () => {
relayKeepAlive: 300,
maxDialingPeers: 3,
failedDialCooldown: 60,
dialCooldown: 10
dialCooldown: 10,
maxConnections: 10,
enableAutoRecovery: true
};
mockPeerId = createMockPeerId("12D3KooWTest1");

View File

@ -3,21 +3,32 @@ import type { MultiaddrInput } from "@multiformats/multiaddr";
import type { PubsubTopic } from "./misc.js";
/**
 * Tags attached to peers in the peer store.
 * The connection limiter reads them to order peers for dialing
 * (bootstrap first, then peer exchange, then locally stored peers).
 */
export enum Tags {
// Peer is a bootstrap peer.
BOOTSTRAP = "bootstrap",
// Peer was found through peer exchange.
PEER_EXCHANGE = "peer-exchange",
// Peer comes from the local store; this information may be out of date.
LOCAL = "local-peer-cache"
}
/**
 * Tag placed on a connection's `tags` array to mark it as locked.
 * The peer manager adds it when locking a peer and removes it when unlocking;
 * the connection limiter does not drop connections that carry it.
 */
export const CONNECTION_LOCKED_TAG = "locked";
export type ConnectionManagerOptions = {
/**
* Max number of bootstrap peers allowed to be connected to initially.
* This encourages dialing non-bootstrap peers found through other discovery mechanisms (like Peer Exchange).
*
* @default 1
* @default 3
*/
maxBootstrapPeers: number;
/**
* Max number of connections allowed to be connected to.
*
* @default 10
*/
maxConnections: number;
/**
* Keep alive libp2p pings interval in seconds.
*
@ -32,6 +43,17 @@ export type ConnectionManagerOptions = {
*/
relayKeepAlive: number;
/**
* Enable automatic recovery of connections when the node does not have enough:
* - bootstrap peers
* - LightPush and Filter peers
* - connected peers
* When enabled, known peers are also dialed after the Internet connection is restored.
*
* @default true
*/
enableAutoRecovery: boolean;
/**
* Max number of peers to dial at once.
*

View File

@ -10,7 +10,7 @@ import { CreateNodeOptions, type Libp2pComponents } from "@waku/interfaces";
export function getPeerDiscoveries(
enabled?: CreateNodeOptions["discovery"]
): ((components: Libp2pComponents) => PeerDiscovery)[] {
const dnsEnrTrees = [enrTree["SANDBOX"]];
const dnsEnrTrees = [enrTree["SANDBOX"], enrTree["TEST"]];
const discoveries: ((components: Libp2pComponents) => PeerDiscovery)[] = [];

View File

@ -1,5 +1,10 @@
import { PeerId } from "@libp2p/interface";
import { IConnectionManager, Libp2p, Protocols } from "@waku/interfaces";
import {
CONNECTION_LOCKED_TAG,
IConnectionManager,
Libp2p,
Protocols
} from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
@ -10,6 +15,7 @@ describe("PeerManager", () => {
let peerManager: PeerManager;
let connectionManager: IConnectionManager;
let peers: any[];
let mockConnections: any[];
const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8";
const TEST_PROTOCOL = Protocols.LightPush;
@ -42,7 +48,6 @@ describe("PeerManager", () => {
};
beforeEach(() => {
libp2p = mockLibp2p();
peers = [
{
id: makePeerId("peer-1"),
@ -57,6 +62,21 @@ describe("PeerManager", () => {
protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store]
}
];
mockConnections = [
{
remotePeer: makePeerId("peer-1"),
tags: [] as string[]
},
{
remotePeer: makePeerId("peer-2"),
tags: [] as string[]
},
{
remotePeer: makePeerId("peer-3"),
tags: [] as string[]
}
];
libp2p = mockLibp2p(mockConnections);
connectionManager = {
pubsubTopics: [TEST_PUBSUB_TOPIC],
getConnectedPeers: async () => peers,
@ -222,11 +242,58 @@ describe("PeerManager", () => {
});
expect(true).to.be.true;
});
// Checks that the mock connection of the first peer returned by
// getPeersForTest() carries CONNECTION_LOCKED_TAG.
// NOTE(review): presumably getPeersForTest() locks the peers it returns —
// confirm against the helper, which is defined elsewhere in this file.
it("should add CONNECTION_LOCKED_TAG to peer connections when locking", async () => {
clearPeerState();
const result = await getPeersForTest();
// Returns early (the test passes without assertions) when the helper says to skip.
if (skipIfNoPeers(result)) return;
const peerId = result[0];
// Find the mock connection whose remote peer is the selected peer.
const connection = mockConnections.find((c) => c.remotePeer.equals(peerId));
expect(connection).to.exist;
expect(connection.tags).to.include(CONNECTION_LOCKED_TAG);
});
// Checks that after renewPeer() is called for a peer, its mock connection no
// longer carries CONNECTION_LOCKED_TAG.
// NOTE(review): presumably renewPeer() unlocks the given peer — confirm
// against PeerManager.renewPeer.
it("should remove CONNECTION_LOCKED_TAG from peer connections when unlocking", async () => {
clearPeerState();
const result = await getPeersForTest();
// Returns early (the test passes without assertions) when the helper says to skip.
if (skipIfNoPeers(result)) return;
const peerId = result[0];
await peerManager.renewPeer(peerId, {
protocol: TEST_PROTOCOL,
pubsubTopic: TEST_PUBSUB_TOPIC
});
// Find the mock connection whose remote peer is the renewed peer.
const connection = mockConnections.find((c) => c.remotePeer.equals(peerId));
expect(connection).to.exist;
expect(connection.tags).to.not.include(CONNECTION_LOCKED_TAG);
});
// Checks that the mock connection of a peer other than the first one returned
// by getPeersForTest() does not carry CONNECTION_LOCKED_TAG.
it("should not modify tags of connections for different peers", async () => {
clearPeerState();
const result = await getPeersForTest();
// Returns early (the test passes without assertions) when the helper says to skip.
if (skipIfNoPeers(result)) return;
const lockedPeerId = result[0];
// First entry in `peers` whose id differs from the selected peer, if any.
const otherPeerId = peers.find((p) => !p.id.equals(lockedPeerId))?.id;
// Also passes without assertions when there is no other peer to compare against.
if (!otherPeerId) return;
const otherConnection = mockConnections.find((c) =>
c.remotePeer.equals(otherPeerId)
);
expect(otherConnection).to.exist;
expect(otherConnection.tags).to.not.include(CONNECTION_LOCKED_TAG);
});
});
function mockLibp2p(): Libp2p {
function mockLibp2p(connections: any[]): Libp2p {
return {
getConnections: sinon.stub(),
getConnections: sinon.stub().returns(connections),
getPeers: sinon
.stub()
.returns([

View File

@ -10,7 +10,12 @@ import {
LightPushCodec,
StoreCodec
} from "@waku/core";
import { Libp2p, Libp2pEventHandler, Protocols } from "@waku/interfaces";
import {
CONNECTION_LOCKED_TAG,
Libp2p,
Libp2pEventHandler,
Protocols
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
const log = new Logger("peer-manager");
@ -229,6 +234,10 @@ export class PeerManager {
private lockPeer(id: PeerId): void {
  // Marks the peer as locked: records it in `lockedPeers`, tags every open
  // connection to it with CONNECTION_LOCKED_TAG, and clears any entry in
  // `unlockedPeers`.
  log.info(`Locking peer ${id}`);
  this.lockedPeers.add(id.toString());

  for (const connection of this.libp2p.getConnections()) {
    if (!connection.remotePeer.equals(id)) {
      continue;
    }

    // Locking an already locked peer must not pile up duplicate tags.
    if (!connection.tags.includes(CONNECTION_LOCKED_TAG)) {
      connection.tags.push(CONNECTION_LOCKED_TAG);
    }
  }

  this.unlockedPeers.delete(id.toString());
}
@ -239,6 +248,12 @@ export class PeerManager {
private unlockPeer(id: PeerId): void {
  // Reverses a lock: removes the peer from `lockedPeers`, strips
  // CONNECTION_LOCKED_TAG from every open connection to it, and records the
  // unlock time in `unlockedPeers`.
  log.info(`Unlocking peer ${id}`);

  const peerKey = id.toString();
  this.lockedPeers.delete(peerKey);

  for (const connection of this.libp2p.getConnections()) {
    if (connection.remotePeer.equals(id)) {
      connection.tags = connection.tags.filter(
        (tag) => tag !== CONNECTION_LOCKED_TAG
      );
    }
  }

  this.unlockedPeers.set(peerKey, Date.now());
}

View File

@ -61,7 +61,21 @@ describe("Connection Limiter", function () {
);
});
it("should discard bootstrap peers when has more than 1 (default limit)", async function () {
it("should discard bootstrap peers when has more than set limit", async function () {
this.timeout(15_000); // increase due to additional initialization
await teardownNodesWithRedundancy(serviceNodes, [waku]);
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
{ lightpush: true, filter: true, peerExchange: true },
false,
2,
true,
{ connectionManager: { maxBootstrapPeers: 1 } }
);
let peers = await waku.getConnectedPeers();
expect(peers.length).to.equal(
serviceNodes.nodes.length,