diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 3112f32d5f..02ea5fea20 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -38,32 +38,15 @@ export class ConnectionManager private currentActiveParallelDialCount = 0; private pendingPeerDialQueue: Array = []; - private online: boolean = false; + + private isP2PNetworkConnected: boolean = false; public isConnected(): boolean { - return this.online; - } - - private toggleOnline(): void { - if (!this.online) { - this.online = true; - this.dispatchEvent( - new CustomEvent(EConnectionStateEvents.CONNECTION_STATUS, { - detail: this.online - }) - ); + if (globalThis?.navigator && !globalThis?.navigator?.onLine) { + return false; } - } - private toggleOffline(): void { - if (this.online && this.libp2p.getConnections().length == 0) { - this.online = false; - this.dispatchEvent( - new CustomEvent(EConnectionStateEvents.CONNECTION_STATUS, { - detail: this.online - }) - ); - } + return this.isP2PNetworkConnected; } public static create( @@ -103,6 +86,7 @@ export class ConnectionManager "peer:discovery", this.onEventHandlers["peer:discovery"] ); + this.stopNetworkStatusListener(); } public async dropConnection(peerId: PeerId): Promise { @@ -193,7 +177,7 @@ export class ConnectionManager options: keepAliveOptions }); - this.run() + this.startEventListeners() .then(() => log.info(`Connection Manager is now running`)) .catch((error) => log.error(`Unexpected error while running service`, error) @@ -225,11 +209,12 @@ export class ConnectionManager } } - private async run(): Promise { - // start event listeners + private async startEventListeners(): Promise { this.startPeerDiscoveryListener(); this.startPeerConnectionListener(); this.startPeerDisconnectionListener(); + + this.startNetworkStatusListener(); } private async dialPeer(peerId: PeerId): Promise { @@ -428,14 +413,18 @@ export class ConnectionManager ) ); } - this.toggleOnline(); + + this.setP2PNetworkConnected(); })(); }, "peer:disconnect": (evt: CustomEvent): void => { void (async () => { this.keepAliveManager.stop(evt.detail); - this.toggleOffline(); + this.setP2PNetworkDisconnected(); })(); + }, + "browser:network": (): void => { + this.dispatchWakuConnectionEvent(); } }; @@ -572,4 +561,59 @@ export class ConnectionManager if (!shardInfoBytes) return undefined; return decodeRelayShard(shardInfoBytes); } + + private startNetworkStatusListener(): void { + try { + globalThis.addEventListener( + "online", + this.onEventHandlers["browser:network"] + ); + globalThis.addEventListener( + "offline", + this.onEventHandlers["browser:network"] + ); + } catch (err) { + log.error(`Failed to start network listener: ${err}`); + } + } + + private stopNetworkStatusListener(): void { + try { + globalThis.removeEventListener( + "online", + this.onEventHandlers["browser:network"] + ); + globalThis.removeEventListener( + "offline", + this.onEventHandlers["browser:network"] + ); + } catch (err) { + log.error(`Failed to stop network listener: ${err}`); + } + } + + private setP2PNetworkConnected(): void { + if (!this.isP2PNetworkConnected) { + this.isP2PNetworkConnected = true; + this.dispatchWakuConnectionEvent(); + } + } + + private setP2PNetworkDisconnected(): void { + if ( + this.isP2PNetworkConnected && + this.libp2p.getConnections().length === 0 + ) { + this.isP2PNetworkConnected = false; + this.dispatchWakuConnectionEvent(); + } + } + + private dispatchWakuConnectionEvent(): void { + this.dispatchEvent( + new CustomEvent(EConnectionStateEvents.CONNECTION_STATUS, { + detail: this.isConnected() + }) + ); + } } diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 18f1dc7521..0ef7756bbd 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -6,6 +6,7 @@ import { type ContentTopic, type CoreProtocolResult, type CreateSubscriptionResult, + EConnectionStateEvents, type IAsyncIterator, type IDecodedMessage, type IDecoder, @@ -65,20 +66,22 @@ export class SubscriptionManager implements ISubscriptionSDK { private missedMessagesByPeer: Map = new Map(); private maxPingFailures: number = DEFAULT_MAX_PINGS; private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD; + private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS; private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback - >; + > = new Map(); public constructor( private readonly pubsubTopic: PubsubTopic, - private protocol: FilterCore, - private getPeers: () => Peer[], + private readonly protocol: FilterCore, + private readonly connectionManager: ConnectionManager, + private readonly getPeers: () => Peer[], private readonly renewPeer: (peerToDisconnect: PeerId) => Promise ) { this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); + const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); this.receivedMessagesHashes = { all: new Set(), @@ -89,10 +92,6 @@ export class SubscriptionManager implements ISubscriptionSDK { allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); } - public get messageHashes(): string[] { - return [...this.receivedMessagesHashes.all]; - } - private addHash(hash: string, peerIdStr?: string): void { this.receivedMessagesHashes.all.add(hash); @@ -155,9 +154,8 @@ export class SubscriptionManager implements ISubscriptionSDK { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - if (options.keepAlive) { - this.startKeepAlivePings(options); - } + this.subscribeOptions = options; + this.startSubscriptionsMaintenance(options); return finalResult; } @@ -183,9 +181,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const finalResult = this.handleResult(results, "unsubscribe"); if (this.subscriptionCallbacks.size === 0) { - if (this.keepAliveTimer) { - this.stopKeepAlivePings(); - } + this.stopSubscriptionsMaintenance(); } return finalResult; @@ -211,9 +207,7 @@ export class SubscriptionManager implements ISubscriptionSDK { const finalResult = this.handleResult(results, "unsubscribeAll"); - if (this.keepAliveTimer) { - this.stopKeepAlivePings(); - } + this.stopSubscriptionsMaintenance(); return finalResult; } @@ -378,8 +372,19 @@ export class SubscriptionManager implements ISubscriptionSDK { } } - private startKeepAlivePings(options: SubscribeOptions): void { - const { keepAlive } = options; + private startSubscriptionsMaintenance(options: SubscribeOptions): void { + if (options?.keepAlive) { + this.startKeepAlivePings(options.keepAlive); + } + this.startConnectionListener(); + } + + private stopSubscriptionsMaintenance(): void { + this.stopKeepAlivePings(); + this.stopConnectionListener(); + } + + private startKeepAlivePings(interval: number): void { if (this.keepAliveTimer) { log.info("Recurring pings already set up."); return; @@ -389,7 +394,7 @@ export class SubscriptionManager implements ISubscriptionSDK { void this.ping().catch((error) => { log.error("Error in keep-alive ping cycle:", error); }); - }, keepAlive) as unknown as number; + }, interval) as unknown as number; } private stopKeepAlivePings(): void { @@ -403,6 +408,48 @@ export class SubscriptionManager implements ISubscriptionSDK { this.keepAliveTimer = null; } + private startConnectionListener(): void { + this.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.connectionListener.bind(this) as (v: CustomEvent) => void + ); + } + + private stopConnectionListener(): void { + this.connectionManager.removeEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.connectionListener.bind(this) as (v: CustomEvent) => void + ); + } + + private async connectionListener({ + detail: isConnected + }: CustomEvent): Promise { + if (!isConnected) { + this.stopKeepAlivePings(); + return; + } + + try { + const result = await this.ping(); + const renewPeerPromises = result.failures.map( + async (v): Promise => { + if (v.peerId) { + await this.renewAndSubscribePeer(v.peerId); + } + } + ); + + await Promise.all(renewPeerPromises); + } catch (err) { + log.error(`networkStateListener failed to recover: ${err}`); + } + + this.startKeepAlivePings( + this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive + ); + } + private incrementMissedMessageCount(peerIdStr: string): void { const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0; this.missedMessagesByPeer.set(peerIdStr, currentCount + 1); @@ -416,6 +463,7 @@ export class SubscriptionManager implements ISubscriptionSDK { class FilterSDK extends BaseProtocolSDK implements IFilterSDK { public readonly protocol: FilterCore; + private readonly _connectionManager: ConnectionManager; private activeSubscriptions = new Map(); @@ -445,8 +493,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ); this.protocol = this.core as FilterCore; - - this.activeSubscriptions = new Map(); + this._connectionManager = connectionManager; } /** @@ -576,6 +623,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { new SubscriptionManager( pubsubTopic, this.protocol, + this._connectionManager, () => this.connectedPeers, this.renewPeer.bind(this) ) diff --git a/packages/tests/tests/connection-mananger/events.spec.ts b/packages/tests/tests/connection-mananger/events.spec.ts index e8f1a5ba63..581646ccc0 100644 --- a/packages/tests/tests/connection-mananger/events.spec.ts +++ b/packages/tests/tests/connection-mananger/events.spec.ts @@ -1,5 +1,5 @@ import type { PeerId, PeerInfo } from "@libp2p/interface"; -import { CustomEvent } from "@libp2p/interface"; +import { CustomEvent, TypedEventEmitter } from "@libp2p/interface"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { EConnectionStateEvents, @@ -151,8 +151,34 @@ describe("Events", function () { }); }); - describe("peer:disconnect", () => { - it("should emit `waku:offline` event when all peers disconnect", async function () { + describe(EConnectionStateEvents.CONNECTION_STATUS, function () { + let navigatorMock: any; + + this.beforeEach(() => { + navigatorMock = { onLine: true }; + globalThis.navigator = navigatorMock; + + const eventEmmitter = new TypedEventEmitter(); + globalThis.addEventListener = + eventEmmitter.addEventListener.bind(eventEmmitter); + globalThis.removeEventListener = + eventEmmitter.removeEventListener.bind(eventEmmitter); + globalThis.dispatchEvent = + eventEmmitter.dispatchEvent.bind(eventEmmitter); + }); + + this.afterEach(() => { + // @ts-expect-error: resetting set value + globalThis.navigator = undefined; + // @ts-expect-error: resetting set value + globalThis.addEventListener = undefined; + // @ts-expect-error: resetting set value + globalThis.removeEventListener = undefined; + // @ts-expect-error: resetting set value + globalThis.dispatchEvent = undefined; + }); + + it(`should emit events and trasition isConnected state when has peers or no peers`, async function () { const peerIdPx = await createSecp256k1PeerId(); const peerIdPx2 = await createSecp256k1PeerId(); @@ -174,17 +200,8 @@ describe("Events", function () { } }); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx }) - ); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx2 }) - ); - - await delay(100); - let eventCount = 0; - const connectionStatus = new Promise((resolve) => { + const connectedStatus = new Promise((resolve) => { waku.connectionManager.addEventListener( EConnectionStateEvents.CONNECTION_STATUS, ({ detail: status }) => { @@ -194,40 +211,6 @@ describe("Events", function () { ); }); - expect(waku.isConnected()).to.be.true; - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:disconnect", { detail: peerIdPx }) - ); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) - ); - - expect(await connectionStatus).to.eq(false); - expect(eventCount).to.be.eq(1); - }); - it("isConnected should return false after all peers disconnect", async function () { - const peerIdPx = await createSecp256k1PeerId(); - const peerIdPx2 = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdPx, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - await waku.libp2p.peerStore.save(peerIdPx2, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - waku.libp2p.dispatchEvent( new CustomEvent("peer:connect", { detail: peerIdPx }) ); @@ -238,6 +221,17 @@ describe("Events", function () { await delay(100); expect(waku.isConnected()).to.be.true; + expect(await connectedStatus).to.eq(true); + expect(eventCount).to.be.eq(1); + + const disconnectedStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + resolve(status); + } + ); + }); waku.libp2p.dispatchEvent( new CustomEvent("peer:disconnect", { detail: peerIdPx }) @@ -247,6 +241,81 @@ describe("Events", function () { ); expect(waku.isConnected()).to.be.false; + expect(await disconnectedStatus).to.eq(false); + expect(eventCount).to.be.eq(2); + }); + + it("should be online or offline if network state changed", async function () { + // have to recreate js-waku for it to pick up new globalThis + waku = await createLightNode(); + + const peerIdPx = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + let eventCount = 0; + const connectedStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + + await delay(100); + + expect(waku.isConnected()).to.be.true; + expect(await connectedStatus).to.eq(true); + expect(eventCount).to.be.eq(1); + + const disconnectedStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + resolve(status); + } + ); + }); + + navigatorMock.onLine = false; + globalThis.dispatchEvent(new CustomEvent("offline")); + + await delay(100); + + expect(waku.isConnected()).to.be.false; + expect(await disconnectedStatus).to.eq(false); + expect(eventCount).to.be.eq(2); + + const connectionRecoveredStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + resolve(status); + } + ); + }); + + navigatorMock.onLine = true; + globalThis.dispatchEvent(new CustomEvent("online")); + + await delay(100); + + expect(waku.isConnected()).to.be.true; + expect(await connectionRecoveredStatus).to.eq(true); + expect(eventCount).to.be.eq(3); }); }); }); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 354f47db15..af281c039c 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -18,7 +18,8 @@ import { runMultipleNodes, ServiceNodesFleet, teardownNodesWithRedundancy, - TEST_STRING + TEST_STRING, + waitForConnections } from "../../src/index.js"; import { @@ -485,6 +486,52 @@ const runTests = (strictCheckNodes: boolean): void => { expectedPubsubTopic: TestPubsubTopic }); }); + + it("Renews subscription after lossing a connection", async function () { + // setup check + expect(waku.libp2p.getConnections()).has.length(2); + + await waku.filter.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + await serviceNodes.confirmMessageLength(1); + + // check renew logic + const nwakuPeers = await Promise.all( + serviceNodes.nodes.map((v) => v.getMultiaddrWithId()) + ); + await Promise.all(nwakuPeers.map((v) => waku.libp2p.hangUp(v))); + + expect(waku.libp2p.getConnections().length).eq(0); + + await Promise.all(nwakuPeers.map((v) => waku.libp2p.dial(v))); + await waitForConnections(nwakuPeers.length, waku); + + const testText = "second try"; + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testText) + }); + + expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq( + true + ); + serviceNodes.messageCollector.verifyReceivedMessage(1, { + expectedMessageText: testText, + expectedContentTopic: TestContentTopic + }); + }); }); };