From 1d0e2ace7fa5b44ab192505c7ebce01a7ce343e0 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Mon, 27 Nov 2023 03:44:49 -0800 Subject: [PATCH] feat: track node connection state (#1719) Co-authored-by: chair <29414216+chair28980@users.noreply.github.com> Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com> --- packages/core/src/lib/connection_manager.ts | 41 +++- packages/core/src/lib/keep_alive_manager.ts | 6 + packages/core/src/lib/waku.ts | 4 + packages/interfaces/src/connection_manager.ts | 11 +- packages/interfaces/src/waku.ts | 2 + .../tests/tests/connection_manager.spec.ts | 218 +++++++++++++++++- 6 files changed, 274 insertions(+), 8 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 98ba706626..ca74079479 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -6,8 +6,10 @@ import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; import { decodeRelayShard } from "@waku/enr"; import { ConnectionManagerOptions, + EConnectionStateEvents, EPeersByDiscoveryEvents, IConnectionManager, + IConnectionStateEvents, IPeersByDiscoveryEvents, IRelay, KeepAliveOptions, @@ -28,7 +30,7 @@ export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3; export const DEFAULT_MAX_PARALLEL_DIALS = 3; export class ConnectionManager - extends EventEmitter + extends EventEmitter implements IConnectionManager { private static instances = new Map(); @@ -40,6 +42,33 @@ export class ConnectionManager private currentActiveParallelDialCount = 0; private pendingPeerDialQueue: Array = []; + private online: boolean = false; + + public isConnected(): boolean { + return this.online; + } + + private toggleOnline(): void { + if (!this.online) { + this.online = true; + this.dispatchEvent( + new CustomEvent(EConnectionStateEvents.CONNECTION_STATUS, { + detail: this.online + }) + ); + } + } + + private toggleOffline(): void { + if (this.online && this.libp2p.getConnections().length == 0) { + this.online = false; + this.dispatchEvent( + new CustomEvent(EConnectionStateEvents.CONNECTION_STATUS, { + detail: this.online + }) + ); + } + } public static create( peerId: string, @@ -393,12 +422,14 @@ export class ConnectionManager ) ); } + this.toggleOnline(); })(); }, - "peer:disconnect": () => { - return (evt: CustomEvent): void => { + "peer:disconnect": (evt: CustomEvent): void => { + void (async () => { this.keepAliveManager.stop(evt.detail); - }; + this.toggleOffline(); + })(); } }; @@ -427,7 +458,7 @@ export class ConnectionManager log.warn( `Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ this.configuredPubsubTopics - }). + }). Not dialing.` ); return false; diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 8b6f2fd507..df63b891a5 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -111,6 +111,12 @@ export class KeepAliveManager { this.relayKeepAliveTimers.clear(); } + public connectionsExist(): boolean { + return ( + this.pingKeepAliveTimers.size > 0 || this.relayKeepAliveTimers.size > 0 + ); + } + private scheduleRelayPings( relay: IRelay, relayPeriodSecs: number, diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index da8d15356a..0affa301cd 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -178,6 +178,10 @@ export class WakuNode implements Waku { return this.libp2p.isStarted(); } + isConnected(): boolean { + return this.connectionManager.isConnected(); + } + /** * Return the local multiaddr with peer id on which libp2p is listening. * diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 721f9b0f77..8b332fe4a7 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -49,8 +49,17 @@ export interface PeersByDiscoveryResult { }; } +export enum EConnectionStateEvents { + CONNECTION_STATUS = "waku:connection" +} + +export interface IConnectionStateEvents { + // true when online, false when offline + [EConnectionStateEvents.CONNECTION_STATUS]: CustomEvent; +} + export interface IConnectionManager - extends EventEmitter { + extends EventEmitter { getPeersByDiscovery(): Promise; stop(): void; } diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index ff8587bbca..876403ecea 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -26,6 +26,8 @@ export interface Waku { stop(): Promise; isStarted(): boolean; + + isConnected(): boolean; } export interface LightNode extends Waku { diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts index ff777e6b82..1978e2e365 100644 --- a/packages/tests/tests/connection_manager.spec.ts +++ b/packages/tests/tests/connection_manager.spec.ts @@ -2,18 +2,26 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerInfo } from "@libp2p/interface/peer-info"; import { CustomEvent } from "@libp2p/interfaces/events"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces"; +import { Multiaddr } from "@multiformats/multiaddr"; +import { + EConnectionStateEvents, + EPeersByDiscoveryEvents, + LightNode, + Protocols, + Tags +} from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { expect } from "chai"; import sinon, { SinonSpy, SinonStub } from "sinon"; import { delay } from "../dist/delay.js"; -import { tearDownNodes } from "../src/index.js"; +import { makeLogFileName, NimGoNode, tearDownNodes } from "../src/index.js"; const TEST_TIMEOUT = 10_000; const DELAY_MS = 1_000; describe("ConnectionManager", function () { + this.timeout(20_000); let waku: LightNode; beforeEach(async function () { @@ -156,6 +164,105 @@ describe("ConnectionManager", function () { expect(await peerConnectedPeerExchange).to.eq(true); }); }); + + describe("peer:disconnect", () => { + it("should emit `waku:offline` event when all peers disconnect", async function () { + const peerIdPx = await createSecp256k1PeerId(); + const peerIdPx2 = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + await waku.libp2p.peerStore.save(peerIdPx2, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx2 }) + ); + + await delay(100); + + let eventCount = 0; + const connectionStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + expect(waku.isConnected()).to.be.true; + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) + ); + + expect(await connectionStatus).to.eq(false); + expect(eventCount).to.be.eq(1); + }); + it("isConnected should return false after all peers disconnect", async function () { + const peerIdPx = await createSecp256k1PeerId(); + const peerIdPx2 = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + await waku.libp2p.peerStore.save(peerIdPx2, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx2 }) + ); + + await delay(100); + + expect(waku.isConnected()).to.be.true; + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) + ); + + expect(waku.isConnected()).to.be.false; + }); + }); }); describe("Dials", () => { @@ -376,4 +483,111 @@ describe("ConnectionManager", function () { }); }); }); + + describe("Connection state", () => { + this.timeout(20_000); + let nwaku1: NimGoNode; + let nwaku2: NimGoNode; + let nwaku1PeerId: Multiaddr; + let nwaku2PeerId: Multiaddr; + + beforeEach(async () => { + this.timeout(20_000); + nwaku1 = new NimGoNode(makeLogFileName(this.ctx) + "1"); + nwaku2 = new NimGoNode(makeLogFileName(this.ctx) + "2"); + await nwaku1.start({ + filter: true + }); + + await nwaku2.start({ + filter: true + }); + + nwaku1PeerId = await nwaku1.getMultiaddrWithId(); + nwaku2PeerId = await nwaku2.getMultiaddrWithId(); + }); + + afterEach(async () => { + this.timeout(15000); + await tearDownNodes([nwaku1, nwaku2], []); + }); + + it("should emit `waku:online` event only when first peer is connected", async function () { + this.timeout(20_000); + + let eventCount = 0; + const connectionStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + // await waku.start(); + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + + await delay(250); + + expect(await connectionStatus).to.eq(true); + expect(eventCount).to.be.eq(1); + }); + + it("isConnected should return true after first peer connects", async function () { + this.timeout(20_000); + expect(waku.isConnected()).to.be.false; + + // await waku.start(); + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + + await delay(250); + + expect(waku.isConnected()).to.be.true; + }); + + it("should emit `waku:offline` event only when all peers disconnect", async function () { + this.timeout(20_000); + expect(waku.isConnected()).to.be.false; + + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + + await delay(250); + + let eventCount = 0; + const connectionStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + await waku.libp2p.hangUp(nwaku1PeerId); + await waku.libp2p.hangUp(nwaku2PeerId); + expect(await connectionStatus).to.eq(false); + expect(eventCount).to.be.eq(1); + }); + + it("isConnected should return false after all peers disconnect", async function () { + this.timeout(20_000); + expect(waku.isConnected()).to.be.false; + + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + + await delay(250); + expect(waku.isConnected()).to.be.true; + + await waku.libp2p.hangUp(nwaku1PeerId); + await waku.libp2p.hangUp(nwaku2PeerId); + expect(waku.isConnected()).to.be.false; + }); + }); });