From ed389ccbc970c8e41761c5c427d151bcf9f72725 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Thu, 17 Jul 2025 01:15:36 +0200 Subject: [PATCH] feat: add recovery and connection maintenance (#2496) * add FF for auto recovery * implement connection locking, connection maintenance, auto recovery, bootstrap connections maintenance and fix bootstrap peers dropping * add ut for peer manager changes * implement UT for Connection Limiter * increase connection maintenance interval * update e2e test --- .../connection_limiter.spec.ts | 972 ++++-------------- .../connection_manager/connection_limiter.ts | 238 +++-- .../connection_manager/connection_manager.ts | 6 +- .../src/lib/connection_manager/dialer.spec.ts | 4 +- packages/interfaces/src/connection_manager.ts | 24 +- packages/sdk/src/create/discovery.ts | 2 +- .../sdk/src/peer_manager/peer_manager.spec.ts | 75 +- packages/sdk/src/peer_manager/peer_manager.ts | 17 +- .../connection_limiter.spec.ts | 16 +- 9 files changed, 505 insertions(+), 849 deletions(-) diff --git a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts index 856c57f5f5..e57bced3a1 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts @@ -1,5 +1,10 @@ import { type Connection, type Peer, type PeerId } from "@libp2p/interface"; -import { IWakuEventEmitter, Tags } from "@waku/interfaces"; +import { multiaddr } from "@multiformats/multiaddr"; +import { + CONNECTION_LOCKED_TAG, + IWakuEventEmitter, + Tags +} from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -23,7 +28,13 @@ describe("ConnectionLimiter", () => { const createMockPeerId = (id: string): PeerId => ({ toString: () => id, - equals: (other: PeerId) => other.toString() === id + equals: function (other: PeerId) { + return ( + other && + typeof other.toString === "function" && + other.toString() === id + ); + } }) as PeerId; const createMockPeer = (id: string, tags: string[] = []): Peer => @@ -42,23 +53,37 @@ describe("ConnectionLimiter", () => { ): Connection => ({ remotePeer: peerId, - tags + tags: tags || [] }) as Connection; const defaultOptions = { + maxConnections: 5, maxBootstrapPeers: 2, pingKeepAlive: 300, relayKeepAlive: 300, + enableAutoRecovery: true, maxDialingPeers: 3, failedDialCooldown: 60, dialCooldown: 10 }; + function createLimiter( + opts: Partial = {} + ): ConnectionLimiter { + return new ConnectionLimiter({ + libp2p, + events, + dialer, + networkMonitor, + options: { ...defaultOptions, ...opts } + }); + } + beforeEach(() => { mockPeerId = createMockPeerId("12D3KooWTest1"); mockPeer = createMockPeer("12D3KooWTest1", [Tags.BOOTSTRAP]); - mockPeer2 = createMockPeer("12D3KooWTest2", []); + mockPeer2 = createMockPeer("12D3KooWTest2", [Tags.BOOTSTRAP]); // Ensure mockPeer2 is prioritized and dialed mockConnection = createMockConnection(mockPeerId, [Tags.BOOTSTRAP]); libp2p = { @@ -79,12 +104,6 @@ describe("ConnectionLimiter", () => { dispatchEvent: sinon.stub() } as any; - dialer = { - start: sinon.stub(), - stop: sinon.stub(), - dial: sinon.stub().resolves() - } as unknown as sinon.SinonStubbedInstance; - networkMonitor = { start: sinon.stub(), stop: sinon.stub(), @@ -101,48 +120,15 @@ describe("ConnectionLimiter", () => { sinon.restore(); }); - describe("constructor", () => { - it("should create ConnectionLimiter with required options", () => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - - expect(connectionLimiter).to.be.instanceOf(ConnectionLimiter); - }); - - it("should store libp2p and options references", () => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - - expect(connectionLimiter).to.have.property("libp2p"); - expect(connectionLimiter).to.have.property("options"); - }); - }); - describe("start", () => { beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + connectionLimiter = createLimiter(); }); it("should dial peers from store on start", async () => { const dialPeersStub = sinon.stub( - connectionLimiter, - "dialPeersFromStore" as any + connectionLimiter as any, + "dialPeersFromStore" ); connectionLimiter.start(); @@ -150,7 +136,7 @@ describe("ConnectionLimiter", () => { expect(dialPeersStub.calledOnce).to.be.true; }); - it("should add event listeners for waku:connection, peer connect and disconnect", () => { + it("should add event listeners for waku:connection and peer:disconnect", () => { connectionLimiter.start(); expect((events.addEventListener as sinon.SinonStub).calledOnce).to.be @@ -162,10 +148,7 @@ describe("ConnectionLimiter", () => { ) ).to.be.true; - expect(libp2p.addEventListener.calledTwice).to.be.true; - expect( - libp2p.addEventListener.calledWith("peer:connect", sinon.match.func) - ).to.be.true; + expect(libp2p.addEventListener.calledOnce).to.be.true; expect( libp2p.addEventListener.calledWith("peer:disconnect", sinon.match.func) ).to.be.true; @@ -178,19 +161,13 @@ describe("ConnectionLimiter", () => { expect((events.addEventListener as sinon.SinonStub).callCount).to.equal( 2 ); - expect(libp2p.addEventListener.callCount).to.equal(4); + expect(libp2p.addEventListener.callCount).to.equal(2); }); }); describe("stop", () => { beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + connectionLimiter = createLimiter(); connectionLimiter.start(); }); @@ -206,10 +183,7 @@ describe("ConnectionLimiter", () => { ) ).to.be.true; - expect(libp2p.removeEventListener.calledTwice).to.be.true; - expect( - libp2p.removeEventListener.calledWith("peer:connect", sinon.match.func) - ).to.be.true; + expect(libp2p.removeEventListener.calledOnce).to.be.true; expect( libp2p.removeEventListener.calledWith( "peer:disconnect", @@ -225,7 +199,7 @@ describe("ConnectionLimiter", () => { expect( (events.removeEventListener as sinon.SinonStub).callCount ).to.equal(2); - expect(libp2p.removeEventListener.callCount).to.equal(4); + expect(libp2p.removeEventListener.callCount).to.equal(2); }); }); @@ -233,13 +207,7 @@ describe("ConnectionLimiter", () => { let eventHandler: () => void; beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + connectionLimiter = createLimiter(); connectionLimiter.start(); const addEventListenerStub = events.addEventListener as sinon.SinonStub; @@ -248,8 +216,8 @@ describe("ConnectionLimiter", () => { it("should dial peers from store when browser is connected", () => { const dialPeersStub = sinon.stub( - connectionLimiter, - "dialPeersFromStore" as any + connectionLimiter as any, + "dialPeersFromStore" ); networkMonitor.isBrowserConnected.returns(true); @@ -260,8 +228,8 @@ describe("ConnectionLimiter", () => { it("should not dial peers from store when browser is not connected", () => { const dialPeersStub = sinon.stub( - connectionLimiter, - "dialPeersFromStore" as any + connectionLimiter as any, + "dialPeersFromStore" ); networkMonitor.isBrowserConnected.returns(false); @@ -271,212 +239,86 @@ describe("ConnectionLimiter", () => { }); }); - describe("onConnectedEvent", () => { - let eventHandler: (event: CustomEvent) => Promise; + describe("onDisconnectedEvent", () => { + let eventHandler: () => Promise; beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + connectionLimiter = createLimiter(); connectionLimiter.start(); const addEventListenerStub = libp2p.addEventListener as sinon.SinonStub; eventHandler = addEventListenerStub.getCall(0).args[1]; }); - it("should handle connection event", async () => { - const mockEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await eventHandler(mockEvent); - - expect(libp2p.peerStore.get.calledWith(mockPeerId)).to.be.true; - }); - - it("should get tags for the connected peer", async () => { - const mockEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await eventHandler(mockEvent); - - expect(libp2p.peerStore.get.calledWith(mockPeerId)).to.be.true; - }); - - it("should do nothing if peer is not a bootstrap peer", async () => { - const nonBootstrapPeer = createMockPeer("12D3KooWNonBootstrap", []); - libp2p.peerStore.get.resolves(nonBootstrapPeer); - - const mockEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await eventHandler(mockEvent); - - expect(libp2p.hangUp.called).to.be.false; - }); - - it("should not hang up bootstrap peer if under limit", async () => { - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - const connectedBootstrapPeer = createMockPeer( - "12D3KooWConnectedBootstrap", - [Tags.BOOTSTRAP] - ); - - libp2p.getConnections.returns([mockConnection]); - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get - .withArgs(mockConnection.remotePeer) - .resolves(connectedBootstrapPeer); - - const mockEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await eventHandler(mockEvent); - - expect(libp2p.hangUp.called).to.be.false; - }); - - it("should hang up bootstrap peer if over limit", async () => { - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer1 = createMockPeer("12D3KooWBootstrap1", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer2 = createMockPeer("12D3KooWBootstrap2", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer3 = createMockPeer("12D3KooWBootstrap3", [ - Tags.BOOTSTRAP - ]); - - const peerId1 = createMockPeerId("peer1"); - const peerId2 = createMockPeerId("peer2"); - const peerId3 = createMockPeerId("peer3"); - - const bootstrapConnections = [ - createMockConnection(peerId1, [Tags.BOOTSTRAP]), - createMockConnection(peerId2, [Tags.BOOTSTRAP]), - createMockConnection(peerId3, [Tags.BOOTSTRAP]) - ]; - - libp2p.getConnections.returns(bootstrapConnections); - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get.withArgs(peerId1).resolves(bootstrapPeer1); - libp2p.peerStore.get.withArgs(peerId2).resolves(bootstrapPeer2); - libp2p.peerStore.get.withArgs(peerId3).resolves(bootstrapPeer3); - - const mockEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await eventHandler(mockEvent); - - expect(libp2p.hangUp.calledWith(mockPeerId)).to.be.true; - }); - - it("should handle errors in getTagsForPeer gracefully", async () => { - libp2p.peerStore.get.rejects(new Error("Peer not found")); - - const mockEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await eventHandler(mockEvent); - - expect(libp2p.hangUp.called).to.be.false; - }); - }); - - describe("onDisconnectedEvent", () => { - let eventHandler: () => Promise; - - beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - connectionLimiter.start(); - - const addEventListenerStub = libp2p.addEventListener as sinon.SinonStub; - eventHandler = addEventListenerStub.getCall(1).args[1]; - }); - it("should dial peers from store when no connections remain", async () => { libp2p.getConnections.returns([]); const dialPeersStub = sinon.stub( - connectionLimiter, - "dialPeersFromStore" as any + connectionLimiter as any, + "dialPeersFromStore" ); - await eventHandler(); - expect(dialPeersStub.calledOnce).to.be.true; }); it("should do nothing when connections still exist", async () => { libp2p.getConnections.returns([mockConnection]); const dialPeersStub = sinon.stub( - connectionLimiter, - "dialPeersFromStore" as any + connectionLimiter as any, + "dialPeersFromStore" ); - await eventHandler(); - expect(dialPeersStub.called).to.be.false; }); }); describe("dialPeersFromStore", () => { beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + dialer = { + start: sinon.stub(), + stop: sinon.stub(), + dial: sinon.stub().resolves() + } as unknown as sinon.SinonStubbedInstance; + libp2p.hangUp = sinon.stub().resolves(); + connectionLimiter = createLimiter(); + mockPeer.addresses = [ + { + multiaddr: multiaddr("/dns4/mockpeer/tcp/443/wss"), + isCertified: false + } + ]; + mockPeer2.addresses = [ + { + multiaddr: multiaddr("/dns4/mockpeer2/tcp/443/wss"), + isCertified: false + } + ]; }); it("should get all peers from store", async () => { libp2p.peerStore.all.resolves([mockPeer, mockPeer2]); libp2p.getConnections.returns([]); - await (connectionLimiter as any).dialPeersFromStore(); - expect(libp2p.peerStore.all.calledOnce).to.be.true; }); it("should filter out already connected peers", async () => { + dialer.dial.resetHistory(); + libp2p.hangUp.resetHistory(); libp2p.peerStore.all.resolves([mockPeer, mockPeer2]); - libp2p.getConnections.returns([mockConnection]); - + libp2p.getConnections.returns([createMockConnection(mockPeer.id, [])]); await (connectionLimiter as any).dialPeersFromStore(); - expect(dialer.dial.calledOnce).to.be.true; expect(dialer.dial.calledWith(mockPeer2.id)).to.be.true; expect(dialer.dial.calledWith(mockPeer.id)).to.be.false; }); it("should dial all remaining peers", async () => { + dialer.dial.resetHistory(); + libp2p.hangUp.resetHistory(); libp2p.peerStore.all.resolves([mockPeer, mockPeer2]); libp2p.getConnections.returns([]); - await (connectionLimiter as any).dialPeersFromStore(); - - expect(dialer.dial.calledTwice).to.be.true; + expect(dialer.dial.callCount).to.equal(2); expect(dialer.dial.calledWith(mockPeer.id)).to.be.true; expect(dialer.dial.calledWith(mockPeer2.id)).to.be.true; }); @@ -485,88 +327,28 @@ describe("ConnectionLimiter", () => { libp2p.peerStore.all.resolves([mockPeer]); libp2p.getConnections.returns([]); dialer.dial.rejects(new Error("Dial failed")); - await (connectionLimiter as any).dialPeersFromStore(); - expect(dialer.dial.calledOnce).to.be.true; }); it("should handle case with no peers in store", async () => { libp2p.peerStore.all.resolves([]); libp2p.getConnections.returns([]); - await (connectionLimiter as any).dialPeersFromStore(); - expect(dialer.dial.called).to.be.false; }); it("should handle case with all peers already connected", async () => { libp2p.peerStore.all.resolves([mockPeer]); - libp2p.getConnections.returns([mockConnection]); - + libp2p.getConnections.returns([createMockConnection(mockPeer.id)]); await (connectionLimiter as any).dialPeersFromStore(); - expect(dialer.dial.called).to.be.false; }); }); - describe("getTagsForPeer", () => { - beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - }); - - it("should return tags for existing peer", async () => { - const tags = await (connectionLimiter as any).getTagsForPeer(mockPeerId); - - expect(libp2p.peerStore.get.calledWith(mockPeerId)).to.be.true; - expect(tags).to.deep.equal([Tags.BOOTSTRAP]); - }); - - it("should return empty array for non-existent peer", async () => { - libp2p.peerStore.get.rejects(new Error("Peer not found")); - - const tags = await (connectionLimiter as any).getTagsForPeer(mockPeerId); - - expect(tags).to.deep.equal([]); - }); - - it("should handle peer store errors gracefully", async () => { - libp2p.peerStore.get.rejects(new Error("Database error")); - - const tags = await (connectionLimiter as any).getTagsForPeer(mockPeerId); - - expect(tags).to.deep.equal([]); - }); - - it("should convert tags map to array of keys", async () => { - const peerWithMultipleTags = createMockPeer("12D3KooWMultiTag", [ - Tags.BOOTSTRAP, - Tags.PEER_EXCHANGE - ]); - libp2p.peerStore.get.resolves(peerWithMultipleTags); - - const tags = await (connectionLimiter as any).getTagsForPeer(mockPeerId); - - expect(tags).to.include(Tags.BOOTSTRAP); - expect(tags).to.include(Tags.PEER_EXCHANGE); - }); - }); - describe("getPeer", () => { beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + connectionLimiter = createLimiter(); }); it("should return peer for existing peer", async () => { @@ -593,491 +375,149 @@ describe("ConnectionLimiter", () => { }); }); - describe("hasMoreThanMaxBootstrapConnections", () => { - beforeEach(() => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + describe("autoRecovery flag", () => { + it("should not dial on waku:connection if enableAutoRecovery is false, but should dial on start", () => { + connectionLimiter = createLimiter({ enableAutoRecovery: false }); + const dialPeersStub = sinon.stub( + connectionLimiter as any, + "dialPeersFromStore" + ); + connectionLimiter.start(); + expect(connectionLimiter["connectionMonitorInterval"]).to.be.null; + connectionLimiter["onWakuConnectionEvent"](); + expect(dialPeersStub.calledOnce).to.be.true; }); - it("should return false when no connections", async () => { - libp2p.getConnections.returns([]); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should return false when under bootstrap limit", async () => { - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - libp2p.getConnections.returns([mockConnection]); - libp2p.peerStore.get.resolves(bootstrapPeer); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should return false when at bootstrap limit", async () => { - const bootstrapPeer1 = createMockPeer("12D3KooWBootstrap1", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer2 = createMockPeer("12D3KooWBootstrap2", [ - Tags.BOOTSTRAP - ]); - const connection1 = createMockConnection(bootstrapPeer1.id, [ - Tags.BOOTSTRAP - ]); - const connection2 = createMockConnection(bootstrapPeer2.id, [ - Tags.BOOTSTRAP - ]); - - libp2p.getConnections.returns([connection1, connection2]); - libp2p.peerStore.get.withArgs(bootstrapPeer1.id).resolves(bootstrapPeer1); - libp2p.peerStore.get.withArgs(bootstrapPeer2.id).resolves(bootstrapPeer2); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should return true when over bootstrap limit", async () => { - const bootstrapPeer1 = createMockPeer("12D3KooWBootstrap1", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer2 = createMockPeer("12D3KooWBootstrap2", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer3 = createMockPeer("12D3KooWBootstrap3", [ - Tags.BOOTSTRAP - ]); - const connection1 = createMockConnection(bootstrapPeer1.id, [ - Tags.BOOTSTRAP - ]); - const connection2 = createMockConnection(bootstrapPeer2.id, [ - Tags.BOOTSTRAP - ]); - const connection3 = createMockConnection(bootstrapPeer3.id, [ - Tags.BOOTSTRAP - ]); - - libp2p.getConnections.returns([connection1, connection2, connection3]); - libp2p.peerStore.get.withArgs(bootstrapPeer1.id).resolves(bootstrapPeer1); - libp2p.peerStore.get.withArgs(bootstrapPeer2.id).resolves(bootstrapPeer2); - libp2p.peerStore.get.withArgs(bootstrapPeer3.id).resolves(bootstrapPeer3); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.true; - }); - - it("should return false when connections are non-bootstrap peers", async () => { - const nonBootstrapPeer1 = createMockPeer("12D3KooWNonBootstrap1", []); - const nonBootstrapPeer2 = createMockPeer("12D3KooWNonBootstrap2", []); - const connection1 = createMockConnection(nonBootstrapPeer1.id, []); - const connection2 = createMockConnection(nonBootstrapPeer2.id, []); - - libp2p.getConnections.returns([connection1, connection2]); - libp2p.peerStore.get - .withArgs(nonBootstrapPeer1.id) - .resolves(nonBootstrapPeer1); - libp2p.peerStore.get - .withArgs(nonBootstrapPeer2.id) - .resolves(nonBootstrapPeer2); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should handle mixed bootstrap and non-bootstrap peers", async () => { - const bootstrapPeer1 = createMockPeer("12D3KooWBootstrap1", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer2 = createMockPeer("12D3KooWBootstrap2", [ - Tags.BOOTSTRAP - ]); - const nonBootstrapPeer = createMockPeer("12D3KooWNonBootstrap", []); - const connection1 = createMockConnection(bootstrapPeer1.id, [ - Tags.BOOTSTRAP - ]); - const connection2 = createMockConnection(bootstrapPeer2.id, [ - Tags.BOOTSTRAP - ]); - const connection3 = createMockConnection(nonBootstrapPeer.id, []); - - libp2p.getConnections.returns([connection1, connection2, connection3]); - libp2p.peerStore.get.withArgs(bootstrapPeer1.id).resolves(bootstrapPeer1); - libp2p.peerStore.get.withArgs(bootstrapPeer2.id).resolves(bootstrapPeer2); - libp2p.peerStore.get - .withArgs(nonBootstrapPeer.id) - .resolves(nonBootstrapPeer); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should handle peer store errors gracefully", async () => { - libp2p.getConnections.returns([mockConnection]); - libp2p.peerStore.get.rejects(new Error("Peer store error")); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should handle null peers returned by getPeer", async () => { - const getPeerStub = sinon.stub(connectionLimiter, "getPeer" as any); - getPeerStub.resolves(null); - - libp2p.getConnections.returns([mockConnection]); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.false; - }); - - it("should work with custom bootstrap limits", async () => { - const customOptions = { - maxBootstrapPeers: 1, - pingKeepAlive: 300, - relayKeepAlive: 300, - maxDialingPeers: 3, - failedDialCooldown: 60, - dialCooldown: 10 - }; - - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: customOptions - }); - - const bootstrapPeer1 = createMockPeer("12D3KooWBootstrap1", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer2 = createMockPeer("12D3KooWBootstrap2", [ - Tags.BOOTSTRAP - ]); - const connection1 = createMockConnection(bootstrapPeer1.id, [ - Tags.BOOTSTRAP - ]); - const connection2 = createMockConnection(bootstrapPeer2.id, [ - Tags.BOOTSTRAP - ]); - - libp2p.getConnections.returns([connection1, connection2]); - libp2p.peerStore.get.withArgs(bootstrapPeer1.id).resolves(bootstrapPeer1); - libp2p.peerStore.get.withArgs(bootstrapPeer2.id).resolves(bootstrapPeer2); - - const result = await ( - connectionLimiter as any - ).hasMoreThanMaxBootstrapConnections(); - - expect(result).to.be.true; + it("should start connection monitor interval and dial on waku:connection if enableAutoRecovery is true", () => { + connectionLimiter = createLimiter({ enableAutoRecovery: true }); + const dialPeersStub = sinon.stub( + connectionLimiter as any, + "dialPeersFromStore" + ); + connectionLimiter.start(); + expect(connectionLimiter["connectionMonitorInterval"]).to.not.be.null; + connectionLimiter["onWakuConnectionEvent"](); + expect(dialPeersStub.calledTwice).to.be.true; }); }); - describe("integration tests", () => { - it("should handle full lifecycle (start -> events -> stop)", async () => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); + describe("maintainConnectionsCount", () => { + beforeEach(() => { + dialer = { + start: sinon.stub(), + stop: sinon.stub(), + dial: sinon.stub().resolves() + } as unknown as sinon.SinonStubbedInstance; + libp2p.hangUp = sinon.stub().resolves(); + connectionLimiter = createLimiter({ maxConnections: 2 }); + mockPeer.addresses = [ + { + multiaddr: multiaddr("/dns4/mockpeer/tcp/443/wss"), + isCertified: false + } + ]; + mockPeer2.addresses = [ + { + multiaddr: multiaddr("/dns4/mockpeer2/tcp/443/wss"), + isCertified: false + } + ]; + }); - connectionLimiter.start(); - expect((events.addEventListener as sinon.SinonStub).calledOnce).to.be - .true; - expect(libp2p.addEventListener.calledTwice).to.be.true; - - const connectEventHandler = libp2p.addEventListener.getCall(0).args[1]; - const connectEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - await connectEventHandler(connectEvent); - expect(libp2p.peerStore.get.calledWith(mockPeerId)).to.be.true; - - const disconnectEventHandler = libp2p.addEventListener.getCall(1).args[1]; + it("should dial more peers if under maxConnections", async () => { libp2p.getConnections.returns([]); - await disconnectEventHandler(); - expect(libp2p.peerStore.all.called).to.be.true; - - connectionLimiter.stop(); - expect((events.removeEventListener as sinon.SinonStub).calledOnce).to.be - .true; - expect(libp2p.removeEventListener.calledTwice).to.be.true; - }); - - it("should handle multiple bootstrap peers with different limits", async () => { - const customOptions = { - maxBootstrapPeers: 1, - pingKeepAlive: 300, - relayKeepAlive: 300, - maxDialingPeers: 3, - failedDialCooldown: 60, - dialCooldown: 10 - }; - - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: customOptions - }); - connectionLimiter.start(); - - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer1 = createMockPeer("12D3KooWBootstrap1", [ - Tags.BOOTSTRAP - ]); - const bootstrapPeer2 = createMockPeer("12D3KooWBootstrap2", [ - Tags.BOOTSTRAP - ]); - - const peerId1 = createMockPeerId("peer1"); - const peerId2 = createMockPeerId("peer2"); - - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get.withArgs(peerId1).resolves(bootstrapPeer1); - libp2p.peerStore.get.withArgs(peerId2).resolves(bootstrapPeer2); - - libp2p.getConnections.returns([ - createMockConnection(peerId1, [Tags.BOOTSTRAP]), - createMockConnection(peerId2, [Tags.BOOTSTRAP]) - ]); - - const connectEventHandler = libp2p.addEventListener.getCall(0).args[1]; - const connectEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await connectEventHandler(connectEvent); - - expect(libp2p.hangUp.calledWith(mockPeerId)).to.be.true; - }); - - it("should handle bootstrap limit of 1 correctly", async () => { - const customOptions = { - maxBootstrapPeers: 1, - pingKeepAlive: 300, - relayKeepAlive: 300, - maxDialingPeers: 3, - failedDialCooldown: 60, - dialCooldown: 10 - }; - - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: customOptions - }); - connectionLimiter.start(); - - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - const existingBootstrapPeer = createMockPeer( - "12D3KooWExistingBootstrap", - [Tags.BOOTSTRAP] - ); - const existingPeerId = createMockPeerId("existing"); - - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get - .withArgs(existingPeerId) - .resolves(existingBootstrapPeer); - - // Include the new peer in connections since peer:connect is fired after connection is established - libp2p.getConnections.returns([ - createMockConnection(existingPeerId, [Tags.BOOTSTRAP]), - createMockConnection(mockPeerId, [Tags.BOOTSTRAP]) - ]); - - const connectEventHandler = libp2p.addEventListener.getCall(0).args[1]; - const connectEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await connectEventHandler(connectEvent); - - expect(libp2p.hangUp.calledWith(mockPeerId)).to.be.true; - }); - - it("should handle high bootstrap limit correctly", async () => { - const customOptions = { - maxBootstrapPeers: 10, - pingKeepAlive: 300, - relayKeepAlive: 300, - maxDialingPeers: 3, - failedDialCooldown: 60, - dialCooldown: 10 - }; - - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: customOptions - }); - connectionLimiter.start(); - - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - const existingBootstrapPeer = createMockPeer( - "12D3KooWExistingBootstrap", - [Tags.BOOTSTRAP] - ); - const existingPeerId = createMockPeerId("existing"); - - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get - .withArgs(existingPeerId) - .resolves(existingBootstrapPeer); - - libp2p.getConnections.returns([ - createMockConnection(existingPeerId, [Tags.BOOTSTRAP]) - ]); - - const connectEventHandler = libp2p.addEventListener.getCall(0).args[1]; - const connectEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await connectEventHandler(connectEvent); - - expect(libp2p.hangUp.called).to.be.false; - }); - - it("should handle mixed peer types with bootstrap limiting", async () => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - connectionLimiter.start(); - - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP - ]); - const existingBootstrapPeer = createMockPeer( - "12D3KooWExistingBootstrap", - [Tags.BOOTSTRAP] - ); - const nonBootstrapPeer = createMockPeer("12D3KooWNonBootstrap", []); - - const existingBootstrapPeerId = createMockPeerId("existing-bootstrap"); - const nonBootstrapPeerId = createMockPeerId("non-bootstrap"); - - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get - .withArgs(existingBootstrapPeerId) - .resolves(existingBootstrapPeer); - libp2p.peerStore.get - .withArgs(nonBootstrapPeerId) - .resolves(nonBootstrapPeer); - - libp2p.getConnections.returns([ - createMockConnection(existingBootstrapPeerId, [Tags.BOOTSTRAP]), - createMockConnection(nonBootstrapPeerId, []) - ]); - - const connectEventHandler = libp2p.addEventListener.getCall(0).args[1]; - const connectEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await connectEventHandler(connectEvent); - - expect(libp2p.hangUp.called).to.be.false; - }); - - it("should redial peers when all connections are lost", async () => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - connectionLimiter.start(); - - const disconnectEventHandler = libp2p.addEventListener.getCall(1).args[1]; - - libp2p.getConnections.returns([]); - libp2p.peerStore.all.resolves([mockPeer, mockPeer2]); - - await disconnectEventHandler(); - - expect(libp2p.peerStore.all.called).to.be.true; + sinon + .stub(connectionLimiter as any, "getPrioritizedPeers") + .resolves([mockPeer, mockPeer2]); + await (connectionLimiter as any).maintainConnectionsCount(); expect(dialer.dial.calledTwice).to.be.true; }); - it("should handle peer store errors during connection limiting", async () => { - connectionLimiter = new ConnectionLimiter({ - libp2p, - events, - dialer, - networkMonitor, - options: defaultOptions - }); - connectionLimiter.start(); - - const bootstrapPeer = createMockPeer("12D3KooWBootstrap", [ - Tags.BOOTSTRAP + it("should drop only non-locked connections when over maxConnections", async () => { + dialer.dial.resetHistory(); + libp2p.hangUp.resetHistory(); + const lockedConn = createMockConnection(mockPeerId, [ + CONNECTION_LOCKED_TAG ]); + const normalConn1 = createMockConnection(createMockPeerId("p2"), []); + const normalConn2 = createMockConnection(createMockPeerId("p3"), []); + const normalConn3 = createMockConnection(createMockPeerId("p4"), []); + const connections = [lockedConn, normalConn1, normalConn2, normalConn3]; + libp2p.getConnections.returns(connections); + sinon.stub(connectionLimiter as any, "getPrioritizedPeers").resolves([]); + await (connectionLimiter as any).maintainConnectionsCount(); - libp2p.peerStore.get.withArgs(mockPeerId).resolves(bootstrapPeer); - libp2p.peerStore.get - .withArgs(mockConnection.remotePeer) - .rejects(new Error("Peer store error")); - - libp2p.getConnections.returns([mockConnection]); - - const connectEventHandler = libp2p.addEventListener.getCall(0).args[1]; - const connectEvent = new CustomEvent("peer:connect", { - detail: mockPeerId - }); - - await connectEventHandler(connectEvent); + expect(libp2p.hangUp.callCount).to.equal(1); + expect(libp2p.hangUp.calledWith(normalConn3.remotePeer)).to.be.true; + expect(libp2p.hangUp.calledWith(normalConn1.remotePeer)).to.be.false; + expect(libp2p.hangUp.calledWith(normalConn2.remotePeer)).to.be.false; + expect(libp2p.hangUp.calledWith(lockedConn.remotePeer)).to.be.false; + }); + it("should do nothing if no non-locked connections to drop", async () => { + const lockedConn1 = createMockConnection(createMockPeerId("p1"), [ + CONNECTION_LOCKED_TAG + ]); + const lockedConn2 = createMockConnection(createMockPeerId("p2"), [ + CONNECTION_LOCKED_TAG + ]); + libp2p.getConnections.returns([lockedConn1, lockedConn2]); + sinon.stub(connectionLimiter as any, "getPrioritizedPeers").resolves([]); + await (connectionLimiter as any).maintainConnectionsCount(); expect(libp2p.hangUp.called).to.be.false; }); }); + + describe("maintainBootstrapConnections", () => { + beforeEach(() => { + connectionLimiter = createLimiter({ maxBootstrapPeers: 2 }); + }); + + it("should do nothing if at or below maxBootstrapPeers", async () => { + sinon + .stub(connectionLimiter as any, "getBootstrapPeers") + .resolves([mockPeer, mockPeer2]); + await (connectionLimiter as any).maintainBootstrapConnections(); + expect(libp2p.hangUp.called).to.be.false; + }); + + it("should drop excess bootstrap peers if over maxBootstrapPeers", async () => { + const p1 = createMockPeer("p1", [Tags.BOOTSTRAP]); + const p2 = createMockPeer("p2", [Tags.BOOTSTRAP]); + const p3 = createMockPeer("p3", [Tags.BOOTSTRAP]); + sinon + .stub(connectionLimiter as any, "getBootstrapPeers") + .resolves([p1, p2, p3]); + await (connectionLimiter as any).maintainBootstrapConnections(); + expect(libp2p.hangUp.calledOnce).to.be.true; + expect(libp2p.hangUp.calledWith(p3.id)).to.be.true; + }); + }); + + describe("dialPeersFromStore prioritization", () => { + beforeEach(() => { + connectionLimiter = createLimiter(); + }); + + it("should prioritize bootstrap, then peer exchange, then local peers", async () => { + const bootstrapPeer = createMockPeer("b", [Tags.BOOTSTRAP]); + bootstrapPeer.addresses = [ + { multiaddr: multiaddr("/dns4/b/tcp/443/wss"), isCertified: false } + ]; + const pxPeer = createMockPeer("px", [Tags.PEER_EXCHANGE]); + pxPeer.addresses = [ + { multiaddr: multiaddr("/dns4/px/tcp/443/wss"), isCertified: false } + ]; + const localPeer = createMockPeer("l", [Tags.LOCAL]); + localPeer.addresses = [ + { multiaddr: multiaddr("/dns4/l/tcp/443/wss"), isCertified: false } + ]; + libp2p.peerStore.all.resolves([bootstrapPeer, pxPeer, localPeer]); + libp2p.getConnections.returns([]); + const peers = await (connectionLimiter as any).getPrioritizedPeers(); + expect(peers[0].id.toString()).to.equal("b"); + expect(peers[1].id.toString()).to.equal("px"); + expect(peers[2].id.toString()).to.equal("l"); + }); + }); }); diff --git a/packages/core/src/lib/connection_manager/connection_limiter.ts b/packages/core/src/lib/connection_manager/connection_limiter.ts index 4210a8f761..d1e614a12c 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.ts @@ -1,5 +1,6 @@ import { Peer, PeerId } from "@libp2p/interface"; import { + CONNECTION_LOCKED_TAG, ConnectionManagerOptions, IWakuEventEmitter, Libp2p, @@ -13,6 +14,8 @@ import { NetworkMonitor } from "./network_monitor.js"; const log = new Logger("connection-limiter"); +const DEFAULT_CONNECTION_MONITOR_INTERVAL = 5 * 1_000; + type ConnectionLimiterConstructorOptions = { libp2p: Libp2p; events: IWakuEventEmitter; @@ -37,6 +40,7 @@ export class ConnectionLimiter implements IConnectionLimiter { private readonly networkMonitor: NetworkMonitor; private readonly dialer: Dialer; + private connectionMonitorInterval: NodeJS.Timeout | null = null; private readonly options: ConnectionManagerOptions; public constructor(options: ConnectionLimiterConstructorOptions) { @@ -48,7 +52,6 @@ export class ConnectionLimiter implements IConnectionLimiter { this.options = options.options; this.onWakuConnectionEvent = this.onWakuConnectionEvent.bind(this); - this.onConnectedEvent = this.onConnectedEvent.bind(this); this.onDisconnectedEvent = this.onDisconnectedEvent.bind(this); } @@ -56,12 +59,17 @@ export class ConnectionLimiter implements IConnectionLimiter { // dial all known peers because libp2p might have emitted `peer:discovery` before initialization void this.dialPeersFromStore(); - this.events.addEventListener("waku:connection", this.onWakuConnectionEvent); + if ( + this.options.enableAutoRecovery && + this.connectionMonitorInterval === null + ) { + this.connectionMonitorInterval = setInterval( + () => void this.maintainConnections(), + DEFAULT_CONNECTION_MONITOR_INTERVAL + ); + } - this.libp2p.addEventListener( - "peer:connect", - this.onConnectedEvent as Libp2pEventHandler - ); + this.events.addEventListener("waku:connection", this.onWakuConnectionEvent); /** * NOTE: Event is not being emitted on closing nor losing a connection. @@ -86,44 +94,31 @@ export class ConnectionLimiter implements IConnectionLimiter { this.onWakuConnectionEvent ); - this.libp2p.removeEventListener( - "peer:connect", - this.onConnectedEvent as Libp2pEventHandler - ); - this.libp2p.removeEventListener( "peer:disconnect", this.onDisconnectedEvent as Libp2pEventHandler ); + + if (this.connectionMonitorInterval) { + clearInterval(this.connectionMonitorInterval); + this.connectionMonitorInterval = null; + } } private onWakuConnectionEvent(): void { + if (!this.options.enableAutoRecovery) { + log.info(`Auto recovery is disabled, skipping`); + return; + } + if (this.networkMonitor.isBrowserConnected()) { void this.dialPeersFromStore(); } } - private async onConnectedEvent(evt: CustomEvent): Promise { - log.info(`Connected to peer ${evt.detail.toString()}`); - - const peerId = evt.detail; - - const tags = await this.getTagsForPeer(peerId); - const isBootstrap = tags.includes(Tags.BOOTSTRAP); - - if (!isBootstrap) { - log.info( - `Connected to peer ${peerId.toString()} is not a bootstrap peer` - ); - return; - } - - if (await this.hasMoreThanMaxBootstrapConnections()) { - log.info( - `Connected to peer ${peerId.toString()} and node has more than max bootstrap connections ${this.options.maxBootstrapPeers}. Dropping connection.` - ); - await this.libp2p.hangUp(peerId); - } + private async maintainConnections(): Promise { + await this.maintainConnectionsCount(); + await this.maintainBootstrapConnections(); } private async onDisconnectedEvent(): Promise { @@ -133,22 +128,98 @@ export class ConnectionLimiter implements IConnectionLimiter { } } + private async maintainConnectionsCount(): Promise { + log.info(`Maintaining connections count`); + + const connections = this.libp2p.getConnections(); + + if (connections.length <= this.options.maxConnections) { + log.info( + `Node has less than max connections ${this.options.maxConnections}, trying to dial more peers` + ); + + const peers = await this.getPrioritizedPeers(); + + if (peers.length === 0) { + log.info(`No peers to dial, node is utilizing all known peers`); + return; + } + + const promises = peers + .slice(0, this.options.maxConnections - connections.length) + .map((p) => this.dialer.dial(p.id)); + await Promise.all(promises); + + return; + } + + log.info( + `Node has more than max connections ${this.options.maxConnections}, dropping connections` + ); + + try { + const connectionsToDrop = connections + .filter((c) => !c.tags.includes(CONNECTION_LOCKED_TAG)) + .slice(this.options.maxConnections); + + if (connectionsToDrop.length === 0) { + log.info(`No connections to drop, skipping`); + return; + } + + const promises = connectionsToDrop.map((c) => + this.libp2p.hangUp(c.remotePeer) + ); + await Promise.all(promises); + + log.info(`Dropped ${connectionsToDrop.length} connections`); + } catch (error) { + log.error(`Unexpected error while maintaining connections`, error); + } + } + + private async maintainBootstrapConnections(): Promise { + log.info(`Maintaining bootstrap connections`); + + const bootstrapPeers = await this.getBootstrapPeers(); + + if (bootstrapPeers.length <= this.options.maxBootstrapPeers) { + return; + } + + try { + const peersToDrop = bootstrapPeers.slice(this.options.maxBootstrapPeers); + + log.info( + `Dropping ${peersToDrop.length} bootstrap connections because node has more than max bootstrap connections ${this.options.maxBootstrapPeers}` + ); + + const promises = peersToDrop.map((p) => this.libp2p.hangUp(p.id)); + await Promise.all(promises); + + log.info(`Dropped ${peersToDrop.length} bootstrap connections`); + } catch (error) { + log.error( + `Unexpected error while maintaining bootstrap connections`, + error + ); + } + } + private async dialPeersFromStore(): Promise { log.info(`Dialing peers from store`); - const allPeers = await this.libp2p.peerStore.all(); - const allConnections = this.libp2p.getConnections(); - - log.info( - `Found ${allPeers.length} peers in store, and found ${allConnections.length} connections` - ); - - const promises = allPeers - .filter((p) => !allConnections.some((c) => c.remotePeer.equals(p.id))) - .map((p) => this.dialer.dial(p.id)); - try { - log.info(`Dialing ${promises.length} peers from store`); + const peers = await this.getPrioritizedPeers(); + + if (peers.length === 0) { + log.info(`No peers to dial, skipping`); + return; + } + + const promises = peers.map((p) => this.dialer.dial(p.id)); + + log.info(`Dialing ${peers.length} peers from store`); await Promise.all(promises); log.info(`Dialed ${promises.length} peers from store`); } catch (error) { @@ -156,27 +227,58 @@ export class ConnectionLimiter implements IConnectionLimiter { } } - private async hasMoreThanMaxBootstrapConnections(): Promise { - try { - const peers = await Promise.all( - this.libp2p - .getConnections() - .map((conn) => conn.remotePeer) - .map((id) => this.getPeer(id)) - ); + /** + * Returns a list of peers ordered by priority: + * - bootstrap peers + * - peers from peer exchange + * - peers from local store (last because we are not sure that locally stored information is up to date) + */ + private async getPrioritizedPeers(): Promise { + const allPeers = await this.libp2p.peerStore.all(); + const allConnections = this.libp2p.getConnections(); - const bootstrapPeers = peers.filter( - (peer) => peer && peer.tags.has(Tags.BOOTSTRAP) - ); + log.info( + `Found ${allPeers.length} peers in store, and found ${allConnections.length} connections` + ); - return bootstrapPeers.length > this.options.maxBootstrapPeers; - } catch (error) { - log.error( - `Unexpected error while checking for bootstrap connections`, - error - ); - return false; - } + const notConnectedPeers = allPeers.filter( + (p) => + !allConnections.some((c) => c.remotePeer.equals(p.id)) && + p.addresses.some( + (a) => + a.multiaddr.toString().includes("wss") || + a.multiaddr.toString().includes("ws") + ) + ); + + const bootstrapPeers = notConnectedPeers.filter((p) => + p.tags.has(Tags.BOOTSTRAP) + ); + + const peerExchangePeers = notConnectedPeers.filter((p) => + p.tags.has(Tags.PEER_EXCHANGE) + ); + + const localStorePeers = notConnectedPeers.filter((p) => + p.tags.has(Tags.LOCAL) + ); + + return [...bootstrapPeers, ...peerExchangePeers, ...localStorePeers]; + } + + private async getBootstrapPeers(): Promise { + const peers = await Promise.all( + this.libp2p + .getConnections() + .map((conn) => conn.remotePeer) + .map((id) => this.getPeer(id)) + ); + + const bootstrapPeers = peers.filter( + (peer) => peer && peer.tags.has(Tags.BOOTSTRAP) + ) as Peer[]; + + return bootstrapPeers; } private async getPeer(peerId: PeerId): Promise { @@ -187,14 +289,4 @@ export class ConnectionLimiter implements IConnectionLimiter { return null; } } - - private async getTagsForPeer(peerId: PeerId): Promise { - try { - const peer = await this.libp2p.peerStore.get(peerId); - return Array.from(peer.tags.keys()); - } catch (error) { - log.error(`Failed to get peer ${peerId}, error: ${error}`); - return []; - } - } } diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 8b175f83a6..4dca6d9164 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -21,9 +21,11 @@ import { getPeerPing, mapToPeerId, mapToPeerIdOrMultiaddr } from "./utils.js"; const log = new Logger("connection-manager"); -const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1; +const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 3; const DEFAULT_PING_KEEP_ALIVE_SEC = 5 * 60; const DEFAULT_RELAY_KEEP_ALIVE_SEC = 5 * 60; +const DEFAULT_ENABLE_AUTO_RECOVERY = true; +const DEFAULT_MAX_CONNECTIONS = 10; const DEFAULT_MAX_DIALING_PEERS = 3; const DEFAULT_FAILED_DIAL_COOLDOWN_SEC = 60; const DEFAULT_DIAL_COOLDOWN_SEC = 10; @@ -56,8 +58,10 @@ export class ConnectionManager implements IConnectionManager { this.options = { maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, + maxConnections: DEFAULT_MAX_CONNECTIONS, pingKeepAlive: DEFAULT_PING_KEEP_ALIVE_SEC, relayKeepAlive: DEFAULT_RELAY_KEEP_ALIVE_SEC, + enableAutoRecovery: DEFAULT_ENABLE_AUTO_RECOVERY, maxDialingPeers: DEFAULT_MAX_DIALING_PEERS, failedDialCooldown: DEFAULT_FAILED_DIAL_COOLDOWN_SEC, dialCooldown: DEFAULT_DIAL_COOLDOWN_SEC, diff --git a/packages/core/src/lib/connection_manager/dialer.spec.ts b/packages/core/src/lib/connection_manager/dialer.spec.ts index 971f5dafcf..74690a79c9 100644 --- a/packages/core/src/lib/connection_manager/dialer.spec.ts +++ b/packages/core/src/lib/connection_manager/dialer.spec.ts @@ -38,7 +38,9 @@ describe("Dialer", () => { relayKeepAlive: 300, maxDialingPeers: 3, failedDialCooldown: 60, - dialCooldown: 10 + dialCooldown: 10, + maxConnections: 10, + enableAutoRecovery: true }; mockPeerId = createMockPeerId("12D3KooWTest1"); diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 6e1b0c1239..35640378df 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -3,21 +3,32 @@ import type { MultiaddrInput } from "@multiformats/multiaddr"; import type { PubsubTopic } from "./misc.js"; +// Peer tags export enum Tags { BOOTSTRAP = "bootstrap", PEER_EXCHANGE = "peer-exchange", LOCAL = "local-peer-cache" } +// Connection tag +export const CONNECTION_LOCKED_TAG = "locked"; + export type ConnectionManagerOptions = { /** * Max number of bootstrap peers allowed to be connected to initially. * This is used to increase intention of dialing non-bootstrap peers, found using other discovery mechanisms (like Peer Exchange). * - * @default 1 + * @default 3 */ maxBootstrapPeers: number; + /** + * Max number of connections allowed to be connected to. + * + * @default 10 + */ + maxConnections: number; + /** * Keep alive libp2p pings interval in seconds. * @@ -32,6 +43,17 @@ export type ConnectionManagerOptions = { */ relayKeepAlive: number; + /** + * Enable auto recovery of connections if has not enough: + * - bootstrap peers + * - LightPush and Filter peers + * - number of connected peers + * - dial known peers on reconnect to Internet + * + * @default true + */ + enableAutoRecovery: boolean; + /** * Max number of peers to dial at once. * diff --git a/packages/sdk/src/create/discovery.ts b/packages/sdk/src/create/discovery.ts index 87b48a48b8..68172cd826 100644 --- a/packages/sdk/src/create/discovery.ts +++ b/packages/sdk/src/create/discovery.ts @@ -10,7 +10,7 @@ import { CreateNodeOptions, type Libp2pComponents } from "@waku/interfaces"; export function getPeerDiscoveries( enabled?: CreateNodeOptions["discovery"] ): ((components: Libp2pComponents) => PeerDiscovery)[] { - const dnsEnrTrees = [enrTree["SANDBOX"]]; + const dnsEnrTrees = [enrTree["SANDBOX"], enrTree["TEST"]]; const discoveries: ((components: Libp2pComponents) => PeerDiscovery)[] = []; diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index 3b0532533e..81a5ec58d1 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -1,5 +1,10 @@ import { PeerId } from "@libp2p/interface"; -import { IConnectionManager, Libp2p, Protocols } from "@waku/interfaces"; +import { + CONNECTION_LOCKED_TAG, + IConnectionManager, + Libp2p, + Protocols +} from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -10,6 +15,7 @@ describe("PeerManager", () => { let peerManager: PeerManager; let connectionManager: IConnectionManager; let peers: any[]; + let mockConnections: any[]; const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8"; const TEST_PROTOCOL = Protocols.LightPush; @@ -42,7 +48,6 @@ describe("PeerManager", () => { }; beforeEach(() => { - libp2p = mockLibp2p(); peers = [ { id: makePeerId("peer-1"), @@ -57,6 +62,21 @@ describe("PeerManager", () => { protocols: [Protocols.LightPush, Protocols.Filter, Protocols.Store] } ]; + mockConnections = [ + { + remotePeer: makePeerId("peer-1"), + tags: [] as string[] + }, + { + remotePeer: makePeerId("peer-2"), + tags: [] as string[] + }, + { + remotePeer: makePeerId("peer-3"), + tags: [] as string[] + } + ]; + libp2p = mockLibp2p(mockConnections); connectionManager = { pubsubTopics: [TEST_PUBSUB_TOPIC], getConnectedPeers: async () => peers, @@ -222,11 +242,58 @@ describe("PeerManager", () => { }); expect(true).to.be.true; }); + + it("should add CONNECTION_LOCKED_TAG to peer connections when locking", async () => { + clearPeerState(); + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + + const peerId = result[0]; + const connection = mockConnections.find((c) => c.remotePeer.equals(peerId)); + + expect(connection).to.exist; + expect(connection.tags).to.include(CONNECTION_LOCKED_TAG); + }); + + it("should remove CONNECTION_LOCKED_TAG from peer connections when unlocking", async () => { + clearPeerState(); + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + + const peerId = result[0]; + await peerManager.renewPeer(peerId, { + protocol: TEST_PROTOCOL, + pubsubTopic: TEST_PUBSUB_TOPIC + }); + + const connection = mockConnections.find((c) => c.remotePeer.equals(peerId)); + + expect(connection).to.exist; + expect(connection.tags).to.not.include(CONNECTION_LOCKED_TAG); + }); + + it("should not modify tags of connections for different peers", async () => { + clearPeerState(); + const result = await getPeersForTest(); + if (skipIfNoPeers(result)) return; + + const lockedPeerId = result[0]; + const otherPeerId = peers.find((p) => !p.id.equals(lockedPeerId))?.id; + + if (!otherPeerId) return; + + const otherConnection = mockConnections.find((c) => + c.remotePeer.equals(otherPeerId) + ); + + expect(otherConnection).to.exist; + expect(otherConnection.tags).to.not.include(CONNECTION_LOCKED_TAG); + }); }); -function mockLibp2p(): Libp2p { +function mockLibp2p(connections: any[]): Libp2p { return { - getConnections: sinon.stub(), + getConnections: sinon.stub().returns(connections), getPeers: sinon .stub() .returns([ diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index b945bde100..a42baf7215 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -10,7 +10,12 @@ import { LightPushCodec, StoreCodec } from "@waku/core"; -import { Libp2p, Libp2pEventHandler, Protocols } from "@waku/interfaces"; +import { + CONNECTION_LOCKED_TAG, + Libp2p, + Libp2pEventHandler, + Protocols +} from "@waku/interfaces"; import { Logger } from "@waku/utils"; const log = new Logger("peer-manager"); @@ -229,6 +234,10 @@ export class PeerManager { private lockPeer(id: PeerId): void { log.info(`Locking peer ${id}`); this.lockedPeers.add(id.toString()); + this.libp2p + .getConnections() + .filter((c) => c.remotePeer.equals(id)) + .forEach((c) => c.tags.push(CONNECTION_LOCKED_TAG)); this.unlockedPeers.delete(id.toString()); } @@ -239,6 +248,12 @@ export class PeerManager { private unlockPeer(id: PeerId): void { log.info(`Unlocking peer ${id}`); this.lockedPeers.delete(id.toString()); + this.libp2p + .getConnections() + .filter((c) => c.remotePeer.equals(id)) + .forEach((c) => { + c.tags = c.tags.filter((t) => t !== CONNECTION_LOCKED_TAG); + }); this.unlockedPeers.set(id.toString(), Date.now()); } diff --git a/packages/tests/tests/connection-mananger/connection_limiter.spec.ts b/packages/tests/tests/connection-mananger/connection_limiter.spec.ts index 74b5207462..57aa374b14 100644 --- a/packages/tests/tests/connection-mananger/connection_limiter.spec.ts +++ b/packages/tests/tests/connection-mananger/connection_limiter.spec.ts @@ -61,7 +61,21 @@ describe("Connection Limiter", function () { ); }); - it("should discard bootstrap peers when has more than 1 (default limit)", async function () { + it("should discard bootstrap peers when has more than set limit", async function () { + this.timeout(15_000); // increase due to additional initialization + + await teardownNodesWithRedundancy(serviceNodes, [waku]); + + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + { lightpush: true, filter: true, peerExchange: true }, + false, + 2, + true, + { connectionManager: { maxBootstrapPeers: 1 } } + ); + let peers = await waku.getConnectedPeers(); expect(peers.length).to.equal( serviceNodes.nodes.length,