diff --git a/package-lock.json b/package-lock.json index e50a8d7164..d17408b2ec 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7536,9 +7536,17 @@ "version": "4.17.18", "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.18.tgz", "integrity": "sha512-KJ65INaxqxmU6EoCiJmRPZC9H9RVWCRd349tXM2M3O5NA7cY6YL7c0bHAHQ93NOfTObEQ004kd2QVHs/r0+m4g==", - "dev": true, "license": "MIT" }, + "node_modules/@types/lodash.debounce": { + "version": "4.0.9", + "resolved": "https://registry.npmjs.org/@types/lodash.debounce/-/lodash.debounce-4.0.9.tgz", + "integrity": "sha512-Ma5JcgTREwpLRwMM+XwBR7DaWe96nC38uCBDFKZWbNKD+osjVzdpnUSwBcqCptrp16sSOLBAUb50Car5I0TCsQ==", + "license": "MIT", + "dependencies": { + "@types/lodash": "*" + } + }, "node_modules/@types/markdown-it": { "version": "14.1.2", "resolved": "https://registry.npmjs.org/@types/markdown-it/-/markdown-it-14.1.2.tgz", @@ -37608,12 +37616,14 @@ "@libp2p/ping": "2.0.35", "@libp2p/websockets": "9.2.16", "@noble/hashes": "^1.3.3", + "@types/lodash.debounce": "^4.0.9", "@waku/core": "0.0.38", "@waku/discovery": "0.0.11", "@waku/interfaces": "0.0.33", "@waku/proto": "^0.0.13", "@waku/utils": "0.0.26", - "libp2p": "2.8.11" + "libp2p": "2.8.11", + "lodash.debounce": "^4.0.8" }, "devDependencies": { "@libp2p/interface": "2.10.4", diff --git a/packages/browser-tests/tests/server.spec.ts b/packages/browser-tests/tests/server.spec.ts index cebcbbbce0..85c8098c05 100644 --- a/packages/browser-tests/tests/server.spec.ts +++ b/packages/browser-tests/tests/server.spec.ts @@ -391,7 +391,7 @@ test.describe("Waku Server API", () => { }); await axios.post(`${API_URL}/admin/v1/start-node`); - // Connect to peers + // FilterConnect to peers const dialResponse = await axios.post(`${API_URL}/admin/v1/peers`, { peerMultiaddrs: PEERS }); @@ -425,7 +425,7 @@ test.describe("Waku Server API", () => { }); await axios.post(`${API_URL}/admin/v1/start-node`); - // Connect to peers + // FilterConnect to peers await axios.post(`${API_URL}/admin/v1/peers`, { peerMultiaddrs: PEERS }); @@ -465,7 +465,7 @@ test.describe("Waku Server API", () => { }); await axios.post(`${API_URL}/admin/v1/start-node`); - // Connect to peers + // FilterConnect to peers await axios.post(`${API_URL}/admin/v1/peers`, { peerMultiaddrs: PEERS }); @@ -577,7 +577,7 @@ test.describe("Waku Server API", () => { // Start node await axios.post(`${API_URL}/admin/v1/start-node`); - // Connect to peers + // FilterConnect to peers await axios.post(`${API_URL}/admin/v1/peers`, { peerMultiaddrs: PEERS }); diff --git a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts index ebdeb22f64..db1d650e60 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.spec.ts @@ -3,7 +3,8 @@ import { multiaddr } from "@multiformats/multiaddr"; import { CONNECTION_LOCKED_TAG, IWakuEventEmitter, - Tags + Tags, + WakuEvent } from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -143,7 +144,7 @@ describe("ConnectionLimiter", () => { .true; expect( (events.addEventListener as sinon.SinonStub).calledWith( - "waku:connection", + WakuEvent.Connection, sinon.match.func ) ).to.be.true; @@ -178,7 +179,7 @@ describe("ConnectionLimiter", () => { .true; expect( (events.removeEventListener as sinon.SinonStub).calledWith( - "waku:connection", + WakuEvent.Connection, sinon.match.func ) ).to.be.true; diff --git a/packages/core/src/lib/connection_manager/connection_limiter.ts b/packages/core/src/lib/connection_manager/connection_limiter.ts index 713c73b2dd..3b59c5f286 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.ts @@ -5,7 +5,8 @@ import { IWakuEventEmitter, Libp2p, Libp2pEventHandler, - Tags + Tags, + WakuEvent } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -69,7 +70,10 @@ export class ConnectionLimiter implements IConnectionLimiter { ); } - this.events.addEventListener("waku:connection", this.onWakuConnectionEvent); + this.events.addEventListener( + WakuEvent.Connection, + this.onWakuConnectionEvent + ); /** * NOTE: Event is not being emitted on closing nor losing a connection. @@ -90,7 +94,7 @@ export class ConnectionLimiter implements IConnectionLimiter { public stop(): void { this.events.removeEventListener( - "waku:connection", + WakuEvent.Connection, this.onWakuConnectionEvent ); @@ -274,11 +278,9 @@ export class ConnectionLimiter implements IConnectionLimiter { .map((id) => this.getPeer(id)) ); - const bootstrapPeers = peers.filter( + return peers.filter( (peer) => peer && peer.tags.has(Tags.BOOTSTRAP) ) as Peer[]; - - return bootstrapPeers; } private async getPeer(peerId: PeerId): Promise { diff --git a/packages/core/src/lib/connection_manager/network_monitor.spec.ts b/packages/core/src/lib/connection_manager/network_monitor.spec.ts index fb4359d6cf..355fc655e9 100644 --- a/packages/core/src/lib/connection_manager/network_monitor.spec.ts +++ b/packages/core/src/lib/connection_manager/network_monitor.spec.ts @@ -1,4 +1,4 @@ -import { IWakuEventEmitter, Libp2p } from "@waku/interfaces"; +import { IWakuEventEmitter, Libp2p, WakuEvent } from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -341,7 +341,7 @@ describe("NetworkMonitor", () => { const dispatchedEvent = dispatchEventStub.getCall(0) .args[0] as CustomEvent; expect(dispatchedEvent).to.be.instanceOf(CustomEvent); - expect(dispatchedEvent.type).to.equal("waku:connection"); + expect(dispatchedEvent.type).to.equal(WakuEvent.Connection); expect(dispatchedEvent.detail).to.be.true; }); }); diff --git a/packages/core/src/lib/connection_manager/network_monitor.ts b/packages/core/src/lib/connection_manager/network_monitor.ts index 9a518674e7..d099703e53 100644 --- a/packages/core/src/lib/connection_manager/network_monitor.ts +++ b/packages/core/src/lib/connection_manager/network_monitor.ts @@ -1,4 +1,4 @@ -import { IWakuEventEmitter, Libp2p } from "@waku/interfaces"; +import { IWakuEventEmitter, Libp2p, WakuEvent } from "@waku/interfaces"; type NetworkMonitorConstructorOptions = { libp2p: Libp2p; @@ -104,7 +104,7 @@ export class NetworkMonitor implements INetworkMonitor { private dispatchNetworkEvent(): void { this.events.dispatchEvent( - new CustomEvent("waku:connection", { + new CustomEvent(WakuEvent.Connection, { detail: this.isConnected() }) ); diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 146fcd73c2..f76203fbfb 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -42,20 +42,30 @@ export class FilterCore { public constructor( private handleIncomingMessage: IncomingMessageHandler, - libp2p: Libp2p + private libp2p: Libp2p ) { this.streamManager = new StreamManager( FilterCodecs.SUBSCRIBE, libp2p.components ); + } - libp2p - .handle(FilterCodecs.PUSH, this.onRequest.bind(this), { + public async start(): Promise { + try { + await this.libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this), { maxInboundStreams: 100 - }) - .catch((e) => { - log.error("Failed to register ", FilterCodecs.PUSH, e); }); + } catch (e) { + log.error("Failed to register ", FilterCodecs.PUSH, e); + } + } + + public async stop(): Promise { + try { + await this.libp2p.unhandle(FilterCodecs.PUSH); + } catch (e) { + log.error("Failed to unregister ", FilterCodecs.PUSH, e); + } } public async subscribe( diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 1eff38e21b..cf383fcf4e 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -4,6 +4,16 @@ import type { Callback } from "./protocols.js"; export type IFilter = { readonly multicodec: string; + /** + * Starts the filter protocol. + */ + start(): Promise; + + /** + * Stops the filter protocol. + */ + stop(): Promise; + /** * Subscribes to messages that match the filtering criteria defined in the specified decoders. * Executes a callback upon receiving each message. diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index 014842aaa6..8a1e91a451 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -1,3 +1,5 @@ +import type { PeerId } from "@libp2p/interface"; + import type { IDecodedMessage, IDecoder } from "./message.js"; export type StoreCursor = Uint8Array; @@ -73,6 +75,19 @@ export type QueryRequestParams = { * @default undefined */ paginationLimit?: number; + + /** + * The service node to use for queries. Will fail if: + * - this peer is not in the peer store. + * - we are not connected to this peer + * No fallback is done. Overrides any other peer selection option. + * + * Expected to be used with [[PeerManagerEventNames.StoreConnect]] so that + * we know we are connected to this peer before doing the store query. + * + * Only use if you know what you are doing. + */ + peerId?: PeerId; }; export type IStore = { diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 394e87a0b5..1fbec20cce 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -25,28 +25,33 @@ export type CreateEncoderParams = CreateDecoderParams & { ephemeral?: boolean; }; +export enum WakuEvent { + Connection = "waku:connection", + Health = "waku:health" +} + export interface IWakuEvents { /** * Emitted when a connection is established or lost. * * @example * ```typescript - * waku.addEventListener("waku:connection", (event) => { + * waku.addEventListener(WakuEvent.Connection, (event) => { * console.log(event.detail); // true if connected, false if disconnected * }); */ - "waku:connection": CustomEvent; + [WakuEvent.Connection]: CustomEvent; /** * Emitted when the health status changes. * * @example * ```typescript - * waku.addEventListener("waku:health", (event) => { + * waku.addEventListener(WakuEvent.Health, (event) => { * console.log(event.detail); // 'Unhealthy', 'MinimallyHealthy', or 'SufficientlyHealthy' * }); */ - "waku:health": CustomEvent; + [WakuEvent.Health]: CustomEvent; } export type IWakuEventEmitter = TypedEventEmitter; @@ -74,12 +79,12 @@ export interface IWaku { /** * Emits events related to the Waku node. * Those are: - * - "waku:connection" - * - "waku:health" + * - WakuEvent.Connection + * - WakuEvent.Health * * @example * ```typescript - * waku.events.addEventListener("waku:connection", (event) => { + * waku.events.addEventListener(WakuEvent.Connection, (event) => { * console.log(event.detail); // true if connected, false if disconnected * }); * ``` diff --git a/packages/rln/src/codec.ts b/packages/rln/src/codec.ts index 62cb68dad9..067e2f382a 100644 --- a/packages/rln/src/codec.ts +++ b/packages/rln/src/codec.ts @@ -13,7 +13,7 @@ import type { IdentityCredential } from "./identity.js"; import { RlnMessage, toRLNSignal } from "./message.js"; import { RLNInstance } from "./rln.js"; -const log = new Logger("waku:rln:encoder"); +const log = new Logger("rln:encoder"); export class RLNEncoder implements IEncoder { private readonly idSecretHash: Uint8Array; diff --git a/packages/rln/src/contract/rln_base_contract.ts b/packages/rln/src/contract/rln_base_contract.ts index d6fdbf2ff5..7546596cd0 100644 --- a/packages/rln/src/contract/rln_base_contract.ts +++ b/packages/rln/src/contract/rln_base_contract.ts @@ -21,7 +21,7 @@ import { RLNContractInitOptions } from "./types.js"; -const log = new Logger("waku:rln:contract:base"); +const log = new Logger("rln:contract:base"); export class RLNBaseContract { public contract: ethers.Contract; diff --git a/packages/rln/src/contract/rln_contract.ts b/packages/rln/src/contract/rln_contract.ts index 0f894a59c1..eae91b323e 100644 --- a/packages/rln/src/contract/rln_contract.ts +++ b/packages/rln/src/contract/rln_contract.ts @@ -9,7 +9,7 @@ import { BytesUtils } from "../utils/bytes.js"; import { RLNBaseContract } from "./rln_base_contract.js"; import { RLNContractInitOptions } from "./types.js"; -const log = new Logger("waku:rln:contract"); +const log = new Logger("rln:contract"); export class RLNContract extends RLNBaseContract { private instance: RLNInstance; diff --git a/packages/rln/src/credentials_manager.ts b/packages/rln/src/credentials_manager.ts index 8eca23b466..c4fdceec70 100644 --- a/packages/rln/src/credentials_manager.ts +++ b/packages/rln/src/credentials_manager.ts @@ -17,7 +17,7 @@ import { BytesUtils } from "./utils/bytes.js"; import { extractMetaMaskSigner } from "./utils/index.js"; import { Zerokit } from "./zerokit.js"; -const log = new Logger("waku:credentials"); +const log = new Logger("rln:credentials"); /** * Manages credentials for RLN diff --git a/packages/rln/src/keystore/keystore.ts b/packages/rln/src/keystore/keystore.ts index cb86ff25a7..880d707bf3 100644 --- a/packages/rln/src/keystore/keystore.ts +++ b/packages/rln/src/keystore/keystore.ts @@ -27,7 +27,7 @@ import type { Sha256Hash } from "./types.js"; -const log = new Logger("waku:rln:keystore"); +const log = new Logger("rln:keystore"); type NwakuCredential = { crypto: { diff --git a/packages/rln/src/rln.ts b/packages/rln/src/rln.ts index c677ad9d0a..00e7eaa249 100644 --- a/packages/rln/src/rln.ts +++ b/packages/rln/src/rln.ts @@ -26,7 +26,7 @@ import * as wc from "./resources/witness_calculator"; import { WitnessCalculator } from "./resources/witness_calculator"; import { Zerokit } from "./zerokit.js"; -const log = new Logger("waku:rln"); +const log = new Logger("rln"); type WakuRLNEncoderOptions = WakuEncoderOptions & { credentials: EncryptedCredentials | DecryptedCredentials; diff --git a/packages/rln/src/utils/epoch.ts b/packages/rln/src/utils/epoch.ts index 937e6641cd..19b2f81108 100644 --- a/packages/rln/src/utils/epoch.ts +++ b/packages/rln/src/utils/epoch.ts @@ -2,7 +2,7 @@ import { Logger } from "@waku/utils"; const DefaultEpochUnitSeconds = 10; // the rln-relay epoch length in seconds -const log = new Logger("waku:rln:epoch"); +const log = new Logger("rln:epoch"); export function dateToEpoch( timestamp: Date, diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 01a819032d..dd88c6f350 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -67,20 +67,22 @@ "@libp2p/ping": "2.0.35", "@libp2p/websockets": "9.2.16", "@noble/hashes": "^1.3.3", + "@types/lodash.debounce": "^4.0.9", "@waku/core": "0.0.38", "@waku/discovery": "0.0.11", "@waku/interfaces": "0.0.33", "@waku/proto": "^0.0.13", "@waku/utils": "0.0.26", - "libp2p": "2.8.11" + "libp2p": "2.8.11", + "lodash.debounce": "^4.0.8" }, "devDependencies": { "@libp2p/interface": "2.10.4", - "@types/chai": "^4.3.11", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@rollup/plugin-replace": "^5.0.5", + "@types/chai": "^4.3.11", "@types/mocha": "^10.0.9", "@waku/build-utils": "*", "chai": "^5.1.1", @@ -104,4 +106,4 @@ "LICENSE", "README.md" ] -} \ No newline at end of file +} diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index a0819b36a5..4eabec6969 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -91,7 +91,7 @@ describe("Filter SDK", () => { const message = createMockMessage(testContentTopic); const peerId = "peer1"; - await (filter as any).onIncomingMessage(testPubsubTopic, message, peerId); + await filter["onIncomingMessage"](testPubsubTopic, message, peerId); expect(subscriptionInvokeStub.calledOnce).to.be.true; expect(subscriptionInvokeStub.firstCall.args[0]).to.equal(message); diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 43895fab7c..4d12f8d32d 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -45,6 +45,14 @@ export class Filter implements IFilter { return this.protocol.multicodec; } + public async start(): Promise { + await this.protocol.start(); + } + + public async stop(): Promise { + await this.protocol.stop(); + } + public unsubscribeAll(): void { for (const subscription of this.subscriptions.values()) { subscription.stop(); diff --git a/packages/sdk/src/filter/subscription.ts b/packages/sdk/src/filter/subscription.ts index 00804f5601..3ab7273eb1 100644 --- a/packages/sdk/src/filter/subscription.ts +++ b/packages/sdk/src/filter/subscription.ts @@ -363,11 +363,11 @@ export class Subscription { private setupEventListeners(): void { this.peerManager.events.addEventListener( - PeerManagerEventNames.Connect, + PeerManagerEventNames.FilterConnect, this.onPeerConnected as Libp2pEventHandler ); this.peerManager.events.addEventListener( - PeerManagerEventNames.Disconnect, + PeerManagerEventNames.FilterDisconnect, this.onPeerDisconnected as Libp2pEventHandler ); } @@ -398,11 +398,11 @@ export class Subscription { private disposeEventListeners(): void { this.peerManager.events.removeEventListener( - PeerManagerEventNames.Connect, + PeerManagerEventNames.FilterConnect, this.onPeerConnected as Libp2pEventHandler ); this.peerManager.events.removeEventListener( - PeerManagerEventNames.Disconnect, + PeerManagerEventNames.FilterDisconnect, this.onPeerDisconnected as Libp2pEventHandler ); } diff --git a/packages/sdk/src/health_indicator/health_indicator.spec.ts b/packages/sdk/src/health_indicator/health_indicator.spec.ts index fdda7ede55..af2bcf6d8c 100644 --- a/packages/sdk/src/health_indicator/health_indicator.spec.ts +++ b/packages/sdk/src/health_indicator/health_indicator.spec.ts @@ -1,6 +1,11 @@ import { Connection, Peer } from "@libp2p/interface"; import { FilterCodecs, LightPushCodec } from "@waku/core"; -import { HealthStatus, IWakuEventEmitter, Libp2p } from "@waku/interfaces"; +import { + HealthStatus, + IWakuEventEmitter, + Libp2p, + WakuEvent +} from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -34,8 +39,9 @@ describe("HealthIndicator", () => { // Start monitoring const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -53,8 +59,9 @@ describe("HealthIndicator", () => { healthIndicator.start(); const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -76,8 +83,9 @@ describe("HealthIndicator", () => { healthIndicator.start(); const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -131,8 +139,9 @@ describe("HealthIndicator", () => { peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2); const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -144,8 +153,9 @@ describe("HealthIndicator", () => { ); const statusChangePromise2 = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -166,8 +176,9 @@ describe("HealthIndicator", () => { sinon.stub(libp2p.peerStore, "get").resolves(peer); const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -189,8 +200,9 @@ describe("HealthIndicator", () => { sinon.stub(libp2p.peerStore, "get").rejects(new Error("Peer not found")); const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); @@ -217,8 +229,9 @@ describe("HealthIndicator", () => { peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2); const statusChangePromise = new Promise((resolve) => { - events.addEventListener("waku:health", (e: CustomEvent) => - resolve(e.detail) + events.addEventListener( + WakuEvent.Health, + (e: CustomEvent) => resolve(e.detail) ); }); diff --git a/packages/sdk/src/health_indicator/health_indicator.ts b/packages/sdk/src/health_indicator/health_indicator.ts index b3d71fb9d0..1e7888fba6 100644 --- a/packages/sdk/src/health_indicator/health_indicator.ts +++ b/packages/sdk/src/health_indicator/health_indicator.ts @@ -1,7 +1,13 @@ import type { IdentifyResult, PeerId } from "@libp2p/interface"; import { FilterCodecs, LightPushCodec } from "@waku/core"; -import { HealthStatus, IWakuEventEmitter, Libp2p } from "@waku/interfaces"; +import { + HealthStatus, + IWakuEventEmitter, + Libp2p, + WakuEvent +} from "@waku/interfaces"; import { Logger } from "@waku/utils"; +import debounce from "lodash.debounce"; type PeerEvent = (_event: CustomEvent) => void; @@ -19,10 +25,13 @@ interface IHealthIndicator { } export class HealthIndicator implements IHealthIndicator { + private isStarted = false; + private readonly libp2p: Libp2p; private readonly events: IWakuEventEmitter; private value: HealthStatus = HealthStatus.Unhealthy; + private readonly debouncedAssessHealth: ReturnType; public constructor(params: HealthIndicatorParams) { this.libp2p = params.libp2p; @@ -30,9 +39,18 @@ export class HealthIndicator implements IHealthIndicator { this.onPeerIdentify = this.onPeerIdentify.bind(this); this.onPeerDisconnected = this.onPeerDisconnected.bind(this); + + this.debouncedAssessHealth = debounce(() => { + void this.assessHealth(); + }, 100); } public start(): void { + if (this.isStarted) { + return; + } + + this.isStarted = true; log.info("start: adding listeners to libp2p"); this.libp2p.addEventListener( @@ -44,10 +62,15 @@ export class HealthIndicator implements IHealthIndicator { this.onPeerDisconnected as PeerEvent ); - void this.assessHealth(); + this.debouncedAssessHealth(); } public stop(): void { + if (!this.isStarted) { + return; + } + + this.isStarted = false; log.info("stop: removing listeners to libp2p"); this.libp2p.removeEventListener( @@ -58,22 +81,22 @@ export class HealthIndicator implements IHealthIndicator { "peer:disconnect", this.onPeerDisconnected as PeerEvent ); + + this.debouncedAssessHealth.cancel(); } public toValue(): HealthStatus { return this.value; } - private async onPeerDisconnected(_event: CustomEvent): Promise { + private onPeerDisconnected(_event: CustomEvent): void { log.info(`onPeerDisconnected: received libp2p event`); - await this.assessHealth(); + this.debouncedAssessHealth(); } - private async onPeerIdentify( - _event: CustomEvent - ): Promise { + private onPeerIdentify(_event: CustomEvent): void { log.info(`onPeerIdentify: received libp2p event`); - await this.assessHealth(); + this.debouncedAssessHealth(); } private async assessHealth(): Promise { @@ -130,7 +153,7 @@ export class HealthIndicator implements IHealthIndicator { if (this.value !== newValue) { this.value = newValue; this.events.dispatchEvent( - new CustomEvent("waku:health", { + new CustomEvent(WakuEvent.Health, { detail: this.value }) ); diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index 8a4ce9ad84..60174a9078 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -43,10 +43,7 @@ describe("PeerManager", () => { }; const skipIfNoPeers = (result: PeerId[] | null): boolean => { - if (!result || result.length === 0) { - return true; - } - return false; + return !result || result.length === 0; }; beforeEach(() => { @@ -151,20 +148,27 @@ describe("PeerManager", () => { }); it("should dispatch connect and disconnect events", () => { - const connectSpy = sinon.spy(); - const disconnectSpy = sinon.spy(); + const filterConnectSpy = sinon.spy(); + const storeConnectSpy = sinon.spy(); + const filterDisconnectSpy = sinon.spy(); peerManager.events.addEventListener( - PeerManagerEventNames.Connect, - connectSpy + PeerManagerEventNames.FilterConnect, + filterConnectSpy ); peerManager.events.addEventListener( - PeerManagerEventNames.Disconnect, - disconnectSpy + PeerManagerEventNames.StoreConnect, + storeConnectSpy + ); + peerManager.events.addEventListener( + PeerManagerEventNames.FilterDisconnect, + filterDisconnectSpy ); peerManager["dispatchFilterPeerConnect"](peers[0].id); + peerManager["dispatchStorePeerConnect"](peers[0].id); peerManager["dispatchFilterPeerDisconnect"](peers[0].id); - expect(connectSpy.calledOnce).to.be.true; - expect(disconnectSpy.calledOnce).to.be.true; + expect(filterConnectSpy.calledOnce).to.be.true; + expect(storeConnectSpy.calledOnce).to.be.true; + expect(filterDisconnectSpy.calledOnce).to.be.true; }); it("should handle onConnected and onDisconnected", async () => { diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index 48c7a1efe2..5b62292b24 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -34,20 +34,26 @@ type GetPeersParams = { }; export enum PeerManagerEventNames { - Connect = "filter:connect", - Disconnect = "filter:disconnect" + FilterConnect = "filter:connect", + FilterDisconnect = "filter:disconnect", + StoreConnect = "store:connect" } -interface IPeerManagerEvents { +export interface IPeerManagerEvents { /** * Notifies about Filter peer being connected. */ - [PeerManagerEventNames.Connect]: CustomEvent; + [PeerManagerEventNames.FilterConnect]: CustomEvent; /** * Notifies about Filter peer being disconnected. */ - [PeerManagerEventNames.Disconnect]: CustomEvent; + [PeerManagerEventNames.FilterDisconnect]: CustomEvent; + + /** + * Notifies about a Store peer being connected. + */ + [PeerManagerEventNames.StoreConnect]: CustomEvent; } /** @@ -198,13 +204,14 @@ export class PeerManager { private async onConnected(event: CustomEvent): Promise { const result = event.detail; - const isFilterPeer = result.protocols.includes( - this.matchProtocolToCodec(Protocols.Filter) - ); - - if (isFilterPeer) { + if ( + result.protocols.includes(this.matchProtocolToCodec(Protocols.Filter)) + ) { this.dispatchFilterPeerConnect(result.peerId); } + if (result.protocols.includes(this.matchProtocolToCodec(Protocols.Store))) { + this.dispatchStorePeerConnect(result.peerId); + } } private async onDisconnected(event: CustomEvent): Promise { @@ -261,18 +268,24 @@ export class PeerManager { } const wasUnlocked = new Date(value).getTime(); - return Date.now() - wasUnlocked >= 10_000 ? true : false; + return Date.now() - wasUnlocked >= 10_000; } private dispatchFilterPeerConnect(id: PeerId): void { this.events.dispatchEvent( - new CustomEvent(PeerManagerEventNames.Connect, { detail: id }) + new CustomEvent(PeerManagerEventNames.FilterConnect, { detail: id }) + ); + } + + private dispatchStorePeerConnect(id: PeerId): void { + this.events.dispatchEvent( + new CustomEvent(PeerManagerEventNames.StoreConnect, { detail: id }) ); } private dispatchFilterPeerDisconnect(id: PeerId): void { this.events.dispatchEvent( - new CustomEvent(PeerManagerEventNames.Disconnect, { detail: id }) + new CustomEvent(PeerManagerEventNames.FilterDisconnect, { detail: id }) ); } diff --git a/packages/sdk/src/query_on_connect/index.ts b/packages/sdk/src/query_on_connect/index.ts new file mode 100644 index 0000000000..982b19a916 --- /dev/null +++ b/packages/sdk/src/query_on_connect/index.ts @@ -0,0 +1,5 @@ +export { + QueryOnConnectOptions, + QueryOnConnectEvent, + QueryOnConnect +} from "./query_on_connect.js"; diff --git a/packages/sdk/src/query_on_connect/query_on_connect.spec.ts b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts new file mode 100644 index 0000000000..cdbc6f2da6 --- /dev/null +++ b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts @@ -0,0 +1,803 @@ +import { type PeerId, TypedEventEmitter } from "@libp2p/interface"; +import { + HealthStatus, + type IDecodedMessage, + type IDecoder, + IWakuEventEmitter, + QueryRequestParams, + WakuEvent +} from "@waku/interfaces"; +import { delay } from "@waku/utils"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { + IPeerManagerEvents, + PeerManagerEventNames +} from "../peer_manager/peer_manager.js"; + +import { + calculateTimeRange, + QueryOnConnect, + QueryOnConnectEvent, + QueryOnConnectOptions +} from "./query_on_connect.js"; + +describe("QueryOnConnect", () => { + let queryOnConnect: QueryOnConnect; + let mockDecoders: IDecoder[]; + let mockPeerManagerEventEmitter: TypedEventEmitter; + let mockWakuEventEmitter: IWakuEventEmitter; + let mockQueryGenerator: sinon.SinonStub; + let mockPeerId: PeerId; + let options: QueryOnConnectOptions; + + beforeEach(() => { + // Mock decoders + mockDecoders = [ + { + contentTopic: "/test/1/content", + fromWireToProtoObj: sinon.stub(), + fromProtoObj: sinon.stub() + } as any, + { + contentTopic: "/test/2/content", + fromWireToProtoObj: sinon.stub(), + fromProtoObj: sinon.stub() + } as any + ]; + + // Mock peer manager event emitter + mockPeerManagerEventEmitter = { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + dispatchEvent: sinon.stub() + } as any; + + // Mock waku event emitter + mockWakuEventEmitter = { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + dispatchEvent: sinon.stub() + } as any; + + // Mock retrieve function + mockQueryGenerator = sinon.stub().callsFake(() => + (async function* () { + yield [ + Promise.resolve({ + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined, + hashStr: "12345" + } as IDecodedMessage) + ]; + })() + ); + + mockPeerId = { + toString: () => "QmTestPeerId" + } as unknown as PeerId; + + // Mock options + options = { + forceQueryThresholdMs: 10000 + }; + }); + + describe("constructor", () => { + it("should create QueryOnConnect instance with all required parameters", () => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + expect(queryOnConnect).to.be.instanceOf(QueryOnConnect); + expect(queryOnConnect.decoders).to.equal(mockDecoders); + }); + + it("should create QueryOnConnect instance without options", () => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator + ); + + expect(queryOnConnect).to.be.instanceOf(QueryOnConnect); + expect(queryOnConnect.decoders).to.equal(mockDecoders); + }); + + it("should accept empty decoders array", () => { + queryOnConnect = new QueryOnConnect( + [], + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + expect(queryOnConnect.decoders).to.deep.equal([]); + }); + }); + + describe("start and stop", () => { + beforeEach(() => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + it("should set up event listeners when started", () => { + const peerEventSpy = + mockPeerManagerEventEmitter.addEventListener as sinon.SinonSpy; + const wakuEventSpy = + mockWakuEventEmitter.addEventListener as sinon.SinonSpy; + + queryOnConnect.start(); + + expect(peerEventSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be + .true; + expect(wakuEventSpy.calledWith(WakuEvent.Health)).to.be.true; + }); + + it("should remove event listeners when stopped", () => { + const peerRemoveSpy = + mockPeerManagerEventEmitter.removeEventListener as sinon.SinonSpy; + const wakuRemoveSpy = + mockWakuEventEmitter.removeEventListener as sinon.SinonSpy; + + queryOnConnect.start(); + queryOnConnect.stop(); + + expect(peerRemoveSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be + .true; + expect(wakuRemoveSpy.calledWith(WakuEvent.Health)).to.be.true; + }); + }); + + describe("mock validation", () => { + beforeEach(() => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + it("should work with stubbed peer manager event emitter", () => { + expect(mockPeerManagerEventEmitter.addEventListener).to.be.a("function"); + expect(mockPeerManagerEventEmitter.removeEventListener).to.be.a( + "function" + ); + expect(mockPeerManagerEventEmitter.dispatchEvent).to.be.a("function"); + }); + + it("should work with stubbed waku event emitter", () => { + expect(mockWakuEventEmitter.addEventListener).to.be.a("function"); + expect(mockWakuEventEmitter.removeEventListener).to.be.a("function"); + expect(mockWakuEventEmitter.dispatchEvent).to.be.a("function"); + }); + + it("should work with stubbed retrieve function", () => { + expect(mockQueryGenerator).to.be.a("function"); + }); + + it("should work with mock decoders", () => { + expect(mockDecoders).to.be.an("array"); + expect(mockDecoders[0]).to.have.property("contentTopic"); + expect(mockDecoders[0]).to.have.property("fromWireToProtoObj"); + expect(mockDecoders[0]).to.have.property("fromProtoObj"); + }); + }); + + describe("event handling simulation", () => { + let addEventListenerStub: sinon.SinonStub; + let healthEventCallback: (event: CustomEvent) => void; + let storeConnectCallback: () => void; + + beforeEach(() => { + addEventListenerStub = sinon.stub(); + mockPeerManagerEventEmitter.addEventListener = addEventListenerStub; + mockWakuEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === WakuEvent.Health) { + healthEventCallback = callback; + } + }); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + it("should capture event listeners for testing", () => { + queryOnConnect.start(); + + expect( + addEventListenerStub.calledWith(PeerManagerEventNames.StoreConnect) + ).to.be.true; + + storeConnectCallback = addEventListenerStub.getCall(0).args[1]; + expect(storeConnectCallback).to.be.a("function"); + }); + + it("should properly setup health event callback", () => { + queryOnConnect.start(); + + expect(mockWakuEventEmitter.addEventListener).to.be.a("function"); + expect(healthEventCallback).to.be.a("function"); + }); + }); + + describe("async generator retrieve function mock", () => { + it("should work with async generator that yields promises", async () => { + const mockMessage: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + yield [Promise.resolve(undefined)]; + }; + + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + const generator = mockQueryGenerator(mockDecoders, {}); + const firstPage = await generator.next(); + expect(firstPage.done).to.be.false; + + const messages = await Promise.all(firstPage.value); + expect(messages[0]).to.deep.equal(mockMessage); + }); + + it("should handle retrieve function with query parameters", async () => { + const queryParams: Partial = { + timeStart: new Date(Date.now() - 1000), + timeEnd: new Date() + }; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + mockQueryGenerator(mockDecoders, queryParams); + + expect(mockQueryGenerator.calledWith(mockDecoders, queryParams)).to.be + .true; + }); + }); + + describe("message retrieval event emission conditions", () => { + let mockClock: sinon.SinonFakeTimers; + + beforeEach(() => { + mockClock = sinon.useFakeTimers(); + mockClock.tick(10); // always tick as now === 0 messes up the logic + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + afterEach(() => { + mockClock.restore(); + }); + + it("should trigger query when it went offline since the last successful query", async () => { + let healthEventCallback: + | ((event: CustomEvent) => void) + | undefined; + + // Capture the health event callback + mockWakuEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === WakuEvent.Health) { + healthEventCallback = callback; + } + }); + + queryOnConnect.start(); + + // Set lastSuccessfulQuery to simulate old query + await queryOnConnect["maybeQuery"](mockPeerId); + mockClock.tick(1); + + // goes offline + const healthEvent = new CustomEvent("health", { + detail: HealthStatus.Unhealthy + }); + expect(healthEventCallback).to.not.be.undefined; + healthEventCallback!.call(queryOnConnect, healthEvent); + mockClock.tick(1); + + // Call maybeQuery directly to test condition + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + + it("should not trigger query if health event is healthy since last successful query", async () => { + queryOnConnect.start(); + + // Set lastSuccessfulQuery to simulate old query + await queryOnConnect["maybeQuery"](mockPeerId); + + // goes offline + const healthEvent = new CustomEvent("health", { + detail: HealthStatus.SufficientlyHealthy + }); + mockWakuEventEmitter.dispatchEvent(healthEvent); + + // Call maybeQuery directly to test condition + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should trigger query when time since last query exceeds threshold", async function () { + const customThreshold = 10; + const customOptions: QueryOnConnectOptions = { + forceQueryThresholdMs: customThreshold + }; + + const queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + customOptions + ); + queryOnConnect.start(); + + // Set lastSuccessfulQuery to simulate old query + await queryOnConnect["maybeQuery"](mockPeerId); + + // Advance fake timer over the force threshold + mockClock.tick(20); + + // Call maybeQuery directly to test condition + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + + it("should not trigger query when a recent query happened under threshold", async () => { + const customThreshold = 2000; + const customOptions: QueryOnConnectOptions = { + forceQueryThresholdMs: customThreshold + }; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + customOptions + ); + + queryOnConnect.start(); + + // First call to set a successful call + await queryOnConnect["maybeQuery"](mockPeerId); + + // Second call should not trigger + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + }); + + describe("end-to-end message emission tests", () => { + let storeConnectCallback: (event: CustomEvent) => void; + let healthEventCallback: (event: CustomEvent) => void; + let messageEventPromise: Promise; + let resolveMessageEvent: (messages: IDecodedMessage[]) => void; + let rejectMessageEvent: (reason: string) => void; + let connectStoreEvent: CustomEvent; + + beforeEach(() => { + // Create a promise that resolves when a message event is emitted + messageEventPromise = new Promise( + (resolve, reject) => { + resolveMessageEvent = resolve; + rejectMessageEvent = reject; + } + ); + + // Setup event listener capture with proper binding + mockPeerManagerEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === PeerManagerEventNames.StoreConnect) { + storeConnectCallback = callback; + } + }); + + mockWakuEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === WakuEvent.Health) { + healthEventCallback = callback; + } + }); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + // Listen for message events + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + resolveMessageEvent(event.detail); + } + ); + + connectStoreEvent = new CustomEvent("connect:store", { + detail: mockPeerId + }); + + // Set a timeout to reject if no message is received + setTimeout( + () => rejectMessageEvent("No message received within timeout"), + 500 + ); + }); + + it("should emit message when we just started and store connect event occurs", async () => { + const mockMessage: IDecodedMessage = { + hash: utf8ToBytes("1234"), + hashStr: "1234", + version: 1, + timestamp: new Date(), + contentTopic: "/test/offline/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return the mock message + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect.start(); + + // Step 2: Simulate store peer reconnection + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Step 4: Wait for message emission + const receivedMessage = await messageEventPromise; + + expect(receivedMessage).to.deep.equal([mockMessage]); + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should emit message when we went offline since last successful query and store reconnect event occurs", async () => { + const mockMessage: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "1234", + version: 1, + timestamp: new Date(), + contentTopic: "/test/offline/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return the mock message + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect.start(); + + // Step 1: Simulate successful query in the past + await queryOnConnect["maybeQuery"](mockPeerId); + await delay(100); + + // Step 2: Simulate going offline after the successful query + const healthEvent = new CustomEvent("health", { + detail: HealthStatus.Unhealthy + }); + healthEventCallback.call(queryOnConnect, healthEvent); + + // Step 3: Simulate store peer reconnection + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Step 4: Wait for message emission + const receivedMessages = await messageEventPromise; + + expect(receivedMessages).to.deep.equal([mockMessage]); + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + + it("should emit message when store reconnect event occurs and last query was over max time threshold", async () => { + const mockMessage: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/timeout/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([4, 5, 6]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return the mock message + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + { forceQueryThresholdMs: 5000 } // 5 second threshold + ); + + // Re-setup event listeners for new instance + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + resolveMessageEvent(event.detail); + } + ); + + queryOnConnect.start(); + + // Step 1: Simulate old successful query (over threshold) + await queryOnConnect["maybeQuery"](mockPeerId); + + // Step 3: Simulate store peer reconnection + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Step 4: Wait for message emission + const receivedMessages = await messageEventPromise; + + expect(receivedMessages).to.deep.equal([mockMessage]); + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should emit multiple messages when query returns multiple messages", async () => { + const mockMessage1: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/multi/content1", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + const mockMessage2: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/multi/content2", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([4, 5, 6]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return multiple messages + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage1)]; + yield [Promise.resolve(mockMessage2)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + const receivedMessages: IDecodedMessage[] = []; + let messageCount = 0; + + // Create a new promise for multiple messages + const multipleMessagesPromise = new Promise((resolve) => { + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + receivedMessages.push(...event.detail); + messageCount++; + if (messageCount === 2) { + resolve(); + } + } + ); + }); + + queryOnConnect.start(); + + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Wait for all messages with timeout + await Promise.race([ + multipleMessagesPromise, + delay(200).then(() => + Promise.reject(new Error("Timeout waiting for messages")) + ) + ]); + + expect(receivedMessages).to.have.length(2); + expect(receivedMessages[0]).to.deep.equal(mockMessage1); + expect(receivedMessages[1]).to.deep.equal(mockMessage2); + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should not emit message when conditions are not met (recent query, no offline)", async () => { + queryOnConnect.start(); + + await queryOnConnect["maybeQuery"](mockPeerId); + + // Override promise to reject if any message is received + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + () => { + rejectMessageEvent("Unexpected message emission"); + } + ); + + await delay(10); + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Wait briefly to ensure no message is emitted + await delay(50); + + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should handle retrieve errors gracefully without emitting messages", async () => { + // Setup retrieve function to throw an error + mockQueryGenerator.rejects(new Error("Retrieval failed")); + + queryOnConnect.start(); + + // Override promise to reject if any message is received + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (_event: CustomEvent) => { + rejectMessageEvent("Unexpected message emission after error"); + } + ); + + await queryOnConnect["maybeQuery"](mockPeerId); + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Wait briefly to ensure no message is emitted + await delay(100); + + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + }); +}); + +describe("calculateTimeRange", () => { + it("should return start time to last successful query since last query is less than max range", () => { + const now = 1000000; // Some arbitrary timestamp + const lastSuccessfulQuery = now - 100; // 100ms ago + const maxTimeRangeQueryMs = 500; // 500ms max range + + const result = calculateTimeRange( + now, + lastSuccessfulQuery, + maxTimeRangeQueryMs + ); + + const expectedTimeStart = new Date(lastSuccessfulQuery); + const expectedTimeEnd = new Date(now); + + expect(result.timeStart).to.deep.equal(expectedTimeStart); + expect(result.timeEnd).to.deep.equal(expectedTimeEnd); + }); + + it("should return start time to match max range", () => { + const now = 1000000; + const lastSuccessfulQuery = 1000000 - 800; // 800ms ago + const maxTimeRangeQueryMs = 500; // 500ms max range + + const result = calculateTimeRange( + now, + lastSuccessfulQuery, + maxTimeRangeQueryMs + ); + + const expectedTimeStart = new Date(now - maxTimeRangeQueryMs); + const expectedTimeEnd = new Date(now); + + expect(result.timeStart).to.deep.equal(expectedTimeStart); + expect(result.timeEnd).to.deep.equal(expectedTimeEnd); + }); + + it("should handle zero lastSuccessfulQuery (never queried before)", () => { + const now = 1000000; + const lastSuccessfulQuery = 0; // Never queried + const maxTimeRangeQueryMs = 500; + + const result = calculateTimeRange( + now, + lastSuccessfulQuery, + maxTimeRangeQueryMs + ); + + const expectedTimeStart = new Date(now - maxTimeRangeQueryMs); // 1000000 - 1000000 = 0 + const expectedTimeEnd = new Date(now); // 1000000 + + expect(result.timeStart).to.deep.equal(expectedTimeStart); + expect(result.timeEnd).to.deep.equal(expectedTimeEnd); + }); +}); diff --git a/packages/sdk/src/query_on_connect/query_on_connect.ts b/packages/sdk/src/query_on_connect/query_on_connect.ts new file mode 100644 index 0000000000..f42c2ada91 --- /dev/null +++ b/packages/sdk/src/query_on_connect/query_on_connect.ts @@ -0,0 +1,208 @@ +import { type PeerId, TypedEventEmitter } from "@libp2p/interface"; +import { + HealthStatus, + type IDecodedMessage, + type IDecoder, + IWakuEventEmitter, + QueryRequestParams, + WakuEvent +} from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +import { + IPeerManagerEvents, + PeerManagerEventNames +} from "../peer_manager/peer_manager.js"; + +const log = new Logger("sdk:query-on-connect"); + +export const DEFAULT_FORCE_QUERY_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes +export const MAX_TIME_RANGE_QUERY_MS = 24 * 60 * 60 * 1000; // 24 hours + +export interface QueryOnConnectOptions { + /** + * Elapsed time since the last successful query, after which we proceed with + * a store query, on a connection event, no matter the conditions. + * @default [[DEFAULT_FORCE_QUERY_THRESHOLD_MS]] + */ + forceQueryThresholdMs?: number; +} + +export enum QueryOnConnectEvent { + /** + * A message has been retrieved. + */ + MessagesRetrieved = "messages:retrieved" +} + +export type QueryOnConnectEvents = { + [QueryOnConnectEvent.MessagesRetrieved]: CustomEvent; +}; + +/** + * Proceed with time-range store queries after connection to a store node. + * Partial implementation of [Waku P2P Reliability](https://github.com/waku-org/specs/blob/master/standards/application/p2p-reliability.md) + * + * @emits message retrieved on "messages" + */ +export class QueryOnConnect< + T extends IDecodedMessage +> extends TypedEventEmitter { + private lastSuccessfulQuery: number; + private lastTimeOffline: number; + private readonly forceQueryThresholdMs: number; + + public constructor( + public decoders: IDecoder[], + private readonly peerManagerEventEmitter: TypedEventEmitter, + private readonly wakuEventEmitter: IWakuEventEmitter, + private readonly _queryGenerator: ( + decoders: IDecoder[], + options?: Partial + ) => AsyncGenerator[]>, + options?: QueryOnConnectOptions + ) { + super(); + this.lastSuccessfulQuery = 0; + this.lastTimeOffline = 0; + this.forceQueryThresholdMs = + options?.forceQueryThresholdMs ?? DEFAULT_FORCE_QUERY_THRESHOLD_MS; + } + + public start(): void { + log.info("starting query-on-connect service"); + this.setupEventListeners(); + } + + public stop(): void { + this.unsetEventListeners(); + } + + /** + * Mainly exposed for testing. Only use if you know what you are doing. + * + * Proceed with a query if: + * - No successful query has happened + * - OR, We detected that we were offline since last successful query + * - OR, It bas been more than `forceQueryThresholdMs` than last query + * + * [[QueryOnConnect]] handles the listening to event to call this function. + * + * @param peerId A store peer id. Must be passed as we expect this to be trigger + * upon a detected connection to a store peer. + */ + private async maybeQuery(peerId: PeerId): Promise { + const timeSinceLastQuery = Date.now() - this.lastSuccessfulQuery; + log.info( + `maybe do store query to ${peerId.toString()}`, + this.lastSuccessfulQuery, + this.lastTimeOffline, + timeSinceLastQuery, + this.forceQueryThresholdMs + ); + + if ( + this.lastSuccessfulQuery === 0 || + this.lastTimeOffline > this.lastSuccessfulQuery || + timeSinceLastQuery > this.forceQueryThresholdMs + ) { + await this.query(peerId); + } else { + log.info(`no querying`); + } + } + + private async query(peerId: PeerId): Promise { + log.info(`perform store query to ${peerId.toString()}`); + const { timeStart, timeEnd } = this.queryTimeRange(); + try { + for await (const page of this._queryGenerator(this.decoders, { + timeStart, + timeEnd, + peerId + })) { + // Await for decoding + const messages = (await Promise.all(page)).filter( + (m) => m !== undefined + ); + // Bundle the messages to help batch process by sds + this.dispatchMessages(messages); + } + + // Didn't throw, so it didn't fail + this.lastSuccessfulQuery = Date.now(); + } catch (err) { + log.warn(`store query to ${peerId.toString()} failed`, err); + } + } + + private queryTimeRange(): { timeStart: Date; timeEnd: Date } { + return calculateTimeRange( + Date.now(), + this.lastSuccessfulQuery, + MAX_TIME_RANGE_QUERY_MS + ); + } + + private dispatchMessages(messages: T[]): void { + log.info( + "dispatching messages", + messages.map((m) => m.hashStr) + ); + this.dispatchEvent( + new CustomEvent( + QueryOnConnectEvent.MessagesRetrieved, + { + detail: messages + } + ) + ); + } + + private setupEventListeners(): void { + this.peerManagerEventEmitter.addEventListener( + PeerManagerEventNames.StoreConnect, + (event) => + void this.maybeQuery(event.detail).catch((err) => + log.error("query-on-connect error", err) + ) + ); + + this.wakuEventEmitter.addEventListener( + WakuEvent.Health, + this.updateLastOfflineDate.bind(this) + ); + } + + private unsetEventListeners(): void { + this.peerManagerEventEmitter.removeEventListener( + PeerManagerEventNames.StoreConnect, + (event) => + void this.maybeQuery(event.detail).catch((err) => + log.error("query-on-connect error", err) + ) + ); + + this.wakuEventEmitter.removeEventListener( + WakuEvent.Health, + this.updateLastOfflineDate.bind(this) + ); + } + + private updateLastOfflineDate(event: CustomEvent): void { + if (event.detail === HealthStatus.Unhealthy) { + this.lastTimeOffline = Date.now(); + } + } +} + +export function calculateTimeRange( + now: number, + lastSuccessfulQuery: number, + maxTimeRangeQueryMs: number +): { timeStart: Date; timeEnd: Date } { + const timeRange = Math.min(now - lastSuccessfulQuery, maxTimeRangeQueryMs); + const timeStart = new Date(now - timeRange); + const timeEnd = new Date(now); + return { timeStart, timeEnd }; +} diff --git a/packages/sdk/src/store/store.spec.ts b/packages/sdk/src/store/store.spec.ts index 4931aafa7c..8983ba2335 100644 --- a/packages/sdk/src/store/store.spec.ts +++ b/packages/sdk/src/store/store.spec.ts @@ -1,3 +1,4 @@ +import { type PeerId } from "@libp2p/interface"; import { StoreCore } from "@waku/core"; import type { IDecodedMessage, IDecoder, Libp2p } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -292,5 +293,36 @@ describe("Store", () => { expect(mockPeerManager.getPeers.called).to.be.true; }); + + it("should use peerId from options when provided to queryGenerator", async () => { + const customPeerId = { + toString: () => "QmCustomPeerId" + } as unknown as PeerId; + + const mockMessages = [Promise.resolve(mockMessage)]; + const mockResponseGenerator = (async function* () { + yield mockMessages; + })(); + + mockStoreCore.queryPerPage.returns(mockResponseGenerator); + + const generator = store.queryGenerator([mockDecoder], { + peerId: customPeerId + }); + + const results = []; + for await (const messages of generator) { + results.push(messages); + } + + expect(mockPeerManager.getPeers.called).to.be.false; + + expect(mockStoreCore.queryPerPage.called).to.be.true; + const callArgs = mockStoreCore.queryPerPage.getCall(0).args; + expect(callArgs[2]).to.equal(customPeerId); + + expect(results).to.have.length(1); + expect(results[0]).to.equal(mockMessages); + }); }); }); diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 1297060cf2..2165005899 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -65,8 +65,8 @@ export class Store implements IStore { ); for (const queryOption of queryOptions) { - const peer = await this.getPeerToUse(queryOption.pubsubTopic); - + const peer = + options?.peerId ?? (await this.getPeerToUse(queryOption.pubsubTopic)); if (!peer) { log.error("No peers available to query"); throw new Error("No peers available to query"); diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index a3c36ef384..6fc689d1b1 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -40,7 +40,7 @@ import { Store } from "../store/index.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; -const log = new Logger("waku"); +const log = new Logger("sdk:waku"); type ProtocolsEnabled = { filter?: boolean; @@ -227,6 +227,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; await this.libp2p.start(); + await this.filter?.start(); this.connectionManager.start(); this.peerManager.start(); this.healthIndicator.start(); @@ -244,6 +245,7 @@ export class WakuNode implements IWaku { this.sender?.stop(); this.lightPush?.stop(); + await this.filter?.stop(); this.healthIndicator.stop(); this.peerManager.stop(); this.connectionManager.stop(); diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index f5e4586fe0..3c1fb30cb1 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -14,7 +14,8 @@ export { type HistoryEntry, type ChannelId, type MessageChannelEvents, - type SenderId + type SenderId, + type MessageId } from "./message_channel/index.js"; export { BloomFilter }; diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 2d2f22abae..8218bdf2f9 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -23,9 +23,15 @@ export class MemLocalHistory { this.validateMessage(item); } - // Add new items and ensure uniqueness by messageId using sortedUniqBy + // Add new items and sort by timestamp, ensuring uniqueness by messageId // The valueOf() method on ContentMessage enables native < operator sorting - this.items = _.sortedUniqBy([...this.items, ...items], "messageId"); + const combinedItems = [...this.items, ...items]; + + // Sort by timestamp (using valueOf() which creates timestamp_messageId string) + combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); + + // Remove duplicates by messageId while maintaining order + this.items = _.uniqBy(combinedItems, "messageId"); return this.items.length; } @@ -56,6 +62,17 @@ export class MemLocalHistory { return this.items.find(predicate, thisArg); } + public findIndex( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): number { + return this.items.findIndex(predicate, thisArg); + } + private validateMessage(message: ContentMessage): void { if (!isContentMessage(message)) { throw new Error( diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 9b4a2e0841..37dfd5db28 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -1,3 +1,4 @@ +import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; @@ -32,6 +33,27 @@ describe("Message serialization", () => { expect(decBloomFilter.lookup(messageId)).to.be.true; }); + + it("Retrieval Hint", () => { + const depMessageId = "dependency"; + const depRetrievalHint = utf8ToBytes("dependency"); + const message = new Message( + "123", + "my-channel", + "me", + [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], + 0, + undefined, + undefined + ); + + const bytes = message.encode(); + const decMessage = Message.decode(bytes); + + expect(decMessage.causalHistory).to.deep.equal([ + { messageId: depMessageId, retrievalHint: depRetrievalHint } + ]); + }); }); describe("ContentMessage comparison with < operator", () => { diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 624e9a97e8..6132f469d9 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -23,7 +23,7 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { }; const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { - return (channel as any).filter as DefaultBloomFilter; + return channel["filter"] as DefaultBloomFilter; }; const messagesA = ["message-1", "message-2"]; @@ -54,9 +54,10 @@ const sendSyncMessage = async ( const receiveMessage = async ( channel: MessageChannel, - message: Message + message: Message, + retrievalHint?: Uint8Array ): Promise => { - channel.pushIncomingMessage(message); + channel.pushIncomingMessage(message, retrievalHint); await channel.processTasks(); }; @@ -71,16 +72,16 @@ describe("MessageChannel", function () { }); it("should increase lamport timestamp", async () => { - const timestampBefore = (channelA as any).lamportTimestamp; + const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelA, utf8ToBytes("message"), callback); - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore + 1); }); it("should push the message to the outgoing buffer", async () => { - const bufferLengthBefore = (channelA as any).outgoingBuffer.length; + const bufferLengthBefore = channelA["outgoingBuffer"].length; await sendMessage(channelA, utf8ToBytes("message"), callback); - const bufferLengthAfter = (channelA as any).outgoingBuffer.length; + const bufferLengthAfter = channelA["outgoingBuffer"].length; expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); }); @@ -94,10 +95,10 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const payload = utf8ToBytes("message"); - const expectedTimestamp = (channelA as any).lamportTimestamp + 1; + const expectedTimestamp = channelA["lamportTimestamp"] + 1; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); - const messageIdLog = (channelA as any).localHistory as ILocalHistory; + const messageIdLog = channelA["localHistory"] as ILocalHistory; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( @@ -108,9 +109,30 @@ describe("MessageChannel", function () { ).to.equal(true); }); + it("should add sent message to localHistory with retrievalHint", async () => { + const payload = utf8ToBytes("message with retrieval hint"); + const messageId = MessageChannel.getMessageId(payload); + const testRetrievalHint = utf8ToBytes("test-retrieval-hint-data"); + + await sendMessage(channelA, payload, async (_message) => { + // Simulate successful sending with retrievalHint + return { success: true, retrievalHint: testRetrievalHint }; + }); + + const localHistory = channelA["localHistory"] as ILocalHistory; + expect(localHistory.length).to.equal(1); + + // Find the message in local history + const historyEntry = localHistory.find( + (entry) => entry.messageId === messageId + ); + expect(historyEntry).to.exist; + expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); + }); + it("should attach causal history and bloom filter to each message", async () => { const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; const filterBytes = new Array(); const messages = new Array(causalHistorySize + 5) .fill("message") @@ -122,7 +144,7 @@ describe("MessageChannel", function () { bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); } - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(messages.length); outgoingBuffer.forEach((message, index) => { @@ -153,12 +175,12 @@ describe("MessageChannel", function () { }); it("should increase lamport timestamp", async () => { - const timestampBefore = (channelA as any).lamportTimestamp; + const timestampBefore = channelA["lamportTimestamp"]; await sendMessage(channelB, utf8ToBytes("message"), async (message) => { await receiveMessage(channelA, message); return { success: true }; }); - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore + 1); }); @@ -172,7 +194,7 @@ describe("MessageChannel", function () { return { success: true }; }); } - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(messagesB.length); }); @@ -182,7 +204,7 @@ describe("MessageChannel", function () { await sendMessage(channelA, utf8ToBytes(m), async (message) => { timestamp++; await receiveMessage(channelB, message); - expect((channelB as any).lamportTimestamp).to.equal(timestamp); + expect(channelB["lamportTimestamp"]).to.equal(timestamp); return { success: true }; }); } @@ -191,15 +213,15 @@ describe("MessageChannel", function () { await sendMessage(channelB, utf8ToBytes(m), async (message) => { timestamp++; await receiveMessage(channelA, message); - expect((channelA as any).lamportTimestamp).to.equal(timestamp); + expect(channelA["lamportTimestamp"]).to.equal(timestamp); return { success: true }; }); } const expectedLength = messagesA.length + messagesB.length; - expect((channelA as any).lamportTimestamp).to.equal(expectedLength); - expect((channelA as any).lamportTimestamp).to.equal( - (channelB as any).lamportTimestamp + expect(channelA["lamportTimestamp"]).to.equal(expectedLength); + expect(channelA["lamportTimestamp"]).to.equal( + channelB["lamportTimestamp"] ); }); @@ -220,7 +242,7 @@ describe("MessageChannel", function () { } let receivedMessage: Message | null = null; - const timestampBefore = (channelB as any).lamportTimestamp; + const timestampBefore = channelB["lamportTimestamp"]; await sendMessage( channelA, @@ -232,26 +254,180 @@ describe("MessageChannel", function () { } ); - const incomingBuffer = (channelB as any).incomingBuffer as Message[]; + const incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer[0].messageId).to.equal(receivedMessage!.messageId); // Since the dependency is not met, the lamport timestamp should not increase - const timestampAfter = (channelB as any).lamportTimestamp; + const timestampAfter = channelB["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); // Message should not be in local history - const localHistory = (channelB as any).localHistory as { - timestamp: number; - historyEntry: HistoryEntry; - }[]; + const localHistory = channelB["localHistory"]; expect( localHistory.some( - ({ historyEntry: { messageId } }) => - messageId === receivedMessage!.messageId + ({ messageId }) => messageId === receivedMessage!.messageId ) ).to.equal(false); }); + + it("should add received message to localHistory with retrievalHint", async () => { + const payload = utf8ToBytes("message with retrieval hint"); + const messageId = MessageChannel.getMessageId(payload); + const testRetrievalHint = utf8ToBytes("test-retrieval-hint-data"); + + await receiveMessage( + channelA, + new Message( + messageId, + channelA.channelId, + "not-alice", + [], + 1, + undefined, + payload, + testRetrievalHint + ), + testRetrievalHint + ); + + const localHistory = channelA["localHistory"] as ILocalHistory; + console.log("localHistory", localHistory); + expect(localHistory.length).to.equal(1); + + // Find the message in local history + const historyEntry = localHistory.find( + (entry) => entry.messageId === messageId + ); + console.log("history entry", historyEntry); + expect(historyEntry).to.exist; + expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); + }); + + it("should maintain chronological order of messages in localHistory", async () => { + // Send messages with different timestamps (including own messages) + const message1Payload = utf8ToBytes("message 1"); + const message2Payload = utf8ToBytes("message 2"); + const message3Payload = utf8ToBytes("message 3"); + + const message1Id = MessageChannel.getMessageId(message1Payload); + const message2Id = MessageChannel.getMessageId(message2Payload); + const message3Id = MessageChannel.getMessageId(message3Payload); + + // Send own message first (timestamp will be 1) + await sendMessage(channelA, message1Payload, callback); + + // Receive a message from another sender with higher timestamp (3) + await receiveMessage( + channelA, + new ContentMessage( + message3Id, + channelA.channelId, + "bob", + [], + 3, // Higher timestamp + undefined, + message3Payload + ) + ); + + // Receive a message from another sender with middle timestamp (2) + await receiveMessage( + channelA, + new ContentMessage( + message2Id, + channelA.channelId, + "carol", + [], + 2, // Middle timestamp + undefined, + message2Payload + ) + ); + + const localHistory = channelA["localHistory"]; + expect(localHistory.length).to.equal(3); + + // Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3) + + const first = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === message1Id && lamportTimestamp === 1; + } + ); + expect(first).to.eq(0); + + const second = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === message2Id && lamportTimestamp === 2; + } + ); + expect(second).to.eq(1); + + const third = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === message3Id && lamportTimestamp === 3; + } + ); + expect(third).to.eq(2); + }); + + it("should handle messages with same timestamp ordered by messageId", async () => { + const message1Payload = utf8ToBytes("message a"); + const message2Payload = utf8ToBytes("message b"); + + const message1Id = MessageChannel.getMessageId(message1Payload); + const message2Id = MessageChannel.getMessageId(message2Payload); + + // Receive messages with same timestamp but different message IDs + // The valueOf() method ensures ordering by messageId when timestamps are equal + await receiveMessage( + channelA, + new ContentMessage( + message2Id, // This will come second alphabetically by messageId + channelA.channelId, + "bob", + [], + 5, // Same timestamp + undefined, + message2Payload + ) + ); + + await receiveMessage( + channelA, + new ContentMessage( + message1Id, // This will come first alphabetically by messageId + channelA.channelId, + "carol", + [], + 5, // Same timestamp + undefined, + message1Payload + ) + ); + + const localHistory = channelA["localHistory"] as ILocalHistory; + expect(localHistory.length).to.equal(2); + + // When timestamps are equal, should be ordered by messageId lexicographically + // The valueOf() method creates "000000000000005_messageId" for comparison + const expectedOrder = [message1Id, message2Id].sort(); + + const first = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === expectedOrder[0] && lamportTimestamp == 5; + } + ); + expect(first).to.eq(0); + + const second = localHistory.findIndex( + ({ messageId, lamportTimestamp }) => { + return messageId === expectedOrder[1] && lamportTimestamp == 5; + } + ); + expect(second).to.eq(1); + }); }); describe("reviewing ack status", () => { @@ -283,9 +459,7 @@ describe("MessageChannel", function () { await channelA.processTasks(); await channelB.processTasks(); - expect((channelA as any).outgoingBuffer.length).to.equal( - messagesA.length + 1 - ); + expect(channelA["outgoingBuffer"].length).to.equal(messagesA.length + 1); await sendMessage( channelB, @@ -302,7 +476,7 @@ describe("MessageChannel", function () { // Since B received message-1, message-2, and not-in-history (3 messages), // and causalHistorySize is 3, it will only include the last 2 in its causal history // So message-1 won't be acknowledged, only message-2 and not-in-history - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(1); // The remaining message should be message-1 (not acknowledged) expect(outgoingBuffer[0].messageId).to.equal( @@ -320,15 +494,12 @@ describe("MessageChannel", function () { await channelA.processTasks(); // All messages remain in the buffer - expect((channelA as any).outgoingBuffer.length).to.equal( - messagesA.length - ); + expect(channelA["outgoingBuffer"].length).to.equal(messagesA.length); }); it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { - const possibleAcksThreshold = (channelA as any).possibleAcksThreshold; - - const causalHistorySize = (channelA as any).causalHistorySize; + const possibleAcksThreshold = channelA["possibleAcksThreshold"]; + const causalHistorySize = channelA["causalHistorySize"]; const unacknowledgedMessages = [ "unacknowledged-message-1", @@ -358,8 +529,8 @@ describe("MessageChannel", function () { } ); - const possibleAcks: ReadonlyMap = (channelA as any) - .possibleAcks; + const possibleAcks: ReadonlyMap = + channelA["possibleAcks"]; // Other than the message IDs which were included in causal history, // the remaining messages sent by channel A should be considered possibly acknowledged // for having been included in the bloom filter sent from channel B @@ -396,12 +567,12 @@ describe("MessageChannel", function () { expect(possibleAcks.size).to.equal(0); // Messages that were not acknowledged should still be in the outgoing buffer - expect((channelA as any).outgoingBuffer.length).to.equal( + expect(channelA["outgoingBuffer"].length).to.equal( unacknowledgedMessages.length ); unacknowledgedMessages.forEach((m) => { expect( - ((channelA as any).outgoingBuffer as Message[]).some( + (channelA["outgoingBuffer"] as Message[]).some( (message) => message.messageId === MessageChannel.getMessageId(utf8ToBytes(m)) ) @@ -417,9 +588,8 @@ describe("MessageChannel", function () { }); } - const possibleAcks: ReadonlyMap = (channelA as any) - .possibleAcks; - + const possibleAcks: ReadonlyMap = + channelA["possibleAcks"]; expect(possibleAcks.size).to.equal(0); }); @@ -478,7 +648,7 @@ describe("MessageChannel", function () { }); it("should detect messages with missing dependencies", async () => { - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; for (const m of messagesA) { await sendMessage(channelA, utf8ToBytes(m), callback); } @@ -492,7 +662,7 @@ describe("MessageChannel", function () { } ); - const incomingBuffer = (channelB as any).incomingBuffer as Message[]; + const incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer[0].messageId).to.equal( MessageChannel.getMessageId(utf8ToBytes(messagesB[0])) @@ -506,7 +676,7 @@ describe("MessageChannel", function () { }); it("should deliver messages after dependencies are met", async () => { - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; const sentMessages = new Array(); // First, send messages from A but DON'T deliver them to B yet for (const m of messagesA) { @@ -537,7 +707,7 @@ describe("MessageChannel", function () { MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) ); - let incomingBuffer = (channelB as any).incomingBuffer as Message[]; + let incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); // Now deliver the missing dependencies @@ -550,7 +720,7 @@ describe("MessageChannel", function () { const missingMessages2 = channelB.sweepIncomingBuffer(); expect(missingMessages2.length).to.equal(0); - incomingBuffer = (channelB as any).incomingBuffer as Message[]; + incomingBuffer = channelB["incomingBuffer"]; expect(incomingBuffer.length).to.equal(0); }); @@ -594,8 +764,74 @@ describe("MessageChannel", function () { expect(irretrievablyLost).to.be.true; }); + it("should emit InMessageLost event with retrievalHint when timeout is exceeded", async () => { + const testRetrievalHint = utf8ToBytes("lost-message-hint"); + let lostMessages: HistoryEntry[] = []; + + // Create a channel with very short timeout + const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + timeoutForLostMessagesMs: 10 + }); + + channelC.addEventListener(MessageChannelEvent.InMessageLost, (event) => { + lostMessages = event.detail; + }); + + // Send message from A with retrievalHint + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (message) => { + message.retrievalHint = testRetrievalHint; + return { success: true, retrievalHint: testRetrievalHint }; + } + ); + + // Send another message from A + await sendMessage(channelA, utf8ToBytes(messagesA[1]), callback); + + // Send a message to C that depends on the previous messages + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelC, message); + return { success: true }; + } + ); + + // First sweep - should detect missing messages + channelC.sweepIncomingBuffer(); + + // Wait for timeout + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Second sweep - should mark messages as lost + channelC.sweepIncomingBuffer(); + + expect(lostMessages.length).to.equal(2); + + // Verify retrievalHint is included in the lost message + const lostMessageWithHint = lostMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + expect(lostMessageWithHint).to.exist; + expect(lostMessageWithHint!.retrievalHint).to.deep.equal( + testRetrievalHint + ); + + // Verify message without retrievalHint has undefined + const lostMessageWithoutHint = lostMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[1])) + ); + expect(lostMessageWithoutHint).to.exist; + expect(lostMessageWithoutHint!.retrievalHint).to.be.undefined; + }); + it("should remove messages without delivering if timeout is exceeded", async () => { - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; // Create a channel with very very short timeout const channelC: MessageChannel = new MessageChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 @@ -616,15 +852,173 @@ describe("MessageChannel", function () { const missingMessages = channelC.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); - let incomingBuffer = (channelC as any).incomingBuffer as Message[]; + let incomingBuffer = channelC["incomingBuffer"]; expect(incomingBuffer.length).to.equal(1); await new Promise((resolve) => setTimeout(resolve, 20)); channelC.sweepIncomingBuffer(); - incomingBuffer = (channelC as any).incomingBuffer as Message[]; + incomingBuffer = channelC["incomingBuffer"]; expect(incomingBuffer.length).to.equal(0); }); + + it("should return HistoryEntry with retrievalHint from sweepIncomingBuffer", async () => { + const testRetrievalHint = utf8ToBytes("test-retrieval-hint"); + + // Send message from A with a retrievalHint + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (message) => { + message.retrievalHint = testRetrievalHint; + return { success: true, retrievalHint: testRetrievalHint }; + } + ); + + // Send another message from A that depends on the first one + await sendMessage( + channelA, + utf8ToBytes(messagesA[1]), + async (_message) => { + // Don't send to B yet - we want B to have missing dependencies + return { success: true }; + } + ); + + // Send a message from A to B that depends on previous messages + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + + // Sweep should detect missing dependencies and return them with retrievalHint + const missingMessages = channelB.sweepIncomingBuffer(); + expect(missingMessages.length).to.equal(2); + + // Find the first message in missing dependencies + const firstMissingMessage = missingMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + expect(firstMissingMessage).to.exist; + expect(firstMissingMessage!.retrievalHint).to.deep.equal( + testRetrievalHint + ); + }); + + it("should emit InMessageMissing event with retrievalHint", async () => { + const testRetrievalHint1 = utf8ToBytes("hint-for-message-1"); + const testRetrievalHint2 = utf8ToBytes("hint-for-message-2"); + let eventReceived = false; + let emittedMissingMessages: HistoryEntry[] = []; + + // Listen for InMessageMissing event + channelB.addEventListener( + MessageChannelEvent.InMessageMissing, + (event) => { + eventReceived = true; + emittedMissingMessages = event.detail; + } + ); + + // Send messages from A with retrievalHints + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (message) => { + message.retrievalHint = testRetrievalHint1; + return { success: true, retrievalHint: testRetrievalHint1 }; + } + ); + + await sendMessage( + channelA, + utf8ToBytes(messagesA[1]), + async (message) => { + message.retrievalHint = testRetrievalHint2; + return { success: true, retrievalHint: testRetrievalHint2 }; + } + ); + + // Send a message to B that depends on the previous messages + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + + // Sweep should trigger InMessageMissing event + channelB.sweepIncomingBuffer(); + + expect(eventReceived).to.be.true; + expect(emittedMissingMessages.length).to.equal(2); + + // Verify retrievalHints are included in the event + const firstMissing = emittedMissingMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + const secondMissing = emittedMissingMessages.find( + (m) => + m.messageId === MessageChannel.getMessageId(utf8ToBytes(messagesA[1])) + ); + + expect(firstMissing).to.exist; + expect(firstMissing!.retrievalHint).to.deep.equal(testRetrievalHint1); + expect(secondMissing).to.exist; + expect(secondMissing!.retrievalHint).to.deep.equal(testRetrievalHint2); + }); + + it("should handle missing messages with undefined retrievalHint", async () => { + let emittedMissingMessages: HistoryEntry[] = []; + + channelB.addEventListener( + MessageChannelEvent.InMessageMissing, + (event) => { + emittedMissingMessages = event.detail; + } + ); + + // Send message from A without retrievalHint + await sendMessage( + channelA, + utf8ToBytes(messagesA[0]), + async (_message) => { + // Don't set retrievalHint + return { success: true }; + } + ); + + // Send a message to B that depends on the previous message + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + + // Sweep should handle missing message with undefined retrievalHint + const missingMessages = channelB.sweepIncomingBuffer(); + + expect(missingMessages.length).to.equal(1); + expect(missingMessages[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); + expect(missingMessages[0].retrievalHint).to.be.undefined; + + // Event should also reflect undefined retrievalHint + expect(emittedMissingMessages.length).to.equal(1); + expect(emittedMissingMessages[0].retrievalHint).to.be.undefined; + }); }); describe("Sweeping outgoing buffer", () => { @@ -649,7 +1043,7 @@ describe("MessageChannel", function () { expect(possiblyAcknowledged.length).to.equal(0); // Make sure messages sent by channel A are not in causal history - const causalHistorySize = (channelA as any).causalHistorySize; + const causalHistorySize = channelA["causalHistorySize"]; for (const m of messagesB.slice(0, causalHistorySize)) { await sendMessage(channelB, utf8ToBytes(m), callback); } @@ -690,7 +1084,7 @@ describe("MessageChannel", function () { it("should not be added to outgoing buffer, bloom filter, or local log", async () => { await channelA.pushOutgoingSyncMessage(); - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(0); const bloomFilter = getBloomFilter(channelA); @@ -698,26 +1092,20 @@ describe("MessageChannel", function () { bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) ).to.equal(false); - const localLog = (channelA as any).localHistory as { - timestamp: number; - messageId: MessageId; - }[]; + const localLog = channelA["localHistory"]; expect(localLog.length).to.equal(0); }); it("should not be delivered", async () => { - const timestampBefore = (channelB as any).lamportTimestamp; + const timestampBefore = channelB["lamportTimestamp"]; await channelA.pushOutgoingSyncMessage(async (message) => { await receiveMessage(channelB, message); return true; }); - const timestampAfter = (channelB as any).lamportTimestamp; + const timestampAfter = channelB["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); - const localLog = (channelB as any).localHistory as { - timestamp: number; - messageId: MessageId; - }[]; + const localLog = channelB["localHistory"]; expect(localLog.length).to.equal(0); const bloomFilter = getBloomFilter(channelB); @@ -739,8 +1127,8 @@ describe("MessageChannel", function () { return true; }); - const causalHistorySize = (channelA as any).causalHistorySize; - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const causalHistorySize = channelA["causalHistorySize"]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal( messagesA.length - causalHistorySize ); @@ -753,7 +1141,7 @@ describe("MessageChannel", function () { }); it("should be sent without a timestamp, causal history, or bloom filter", async () => { - const timestampBefore = (channelA as any).lamportTimestamp; + const timestampBefore = channelA["lamportTimestamp"]; await channelA.pushOutgoingEphemeralMessage( new Uint8Array(), async (message) => { @@ -764,10 +1152,10 @@ describe("MessageChannel", function () { } ); - const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + const outgoingBuffer = channelA["outgoingBuffer"] as Message[]; expect(outgoingBuffer.length).to.equal(0); - const timestampAfter = (channelA as any).lamportTimestamp; + const timestampAfter = channelA["lamportTimestamp"]; expect(timestampAfter).to.equal(timestampBefore); }); @@ -775,9 +1163,9 @@ describe("MessageChannel", function () { const channelB = new MessageChannel(channelId, "bob"); // Track initial state - const localHistoryBefore = (channelB as any).localHistory.length; - const incomingBufferBefore = (channelB as any).incomingBuffer.length; - const timestampBefore = (channelB as any).lamportTimestamp; + const localHistoryBefore = channelB["localHistory"].length; + const incomingBufferBefore = channelB["incomingBuffer"].length; + const timestampBefore = channelB["lamportTimestamp"]; await channelA.pushOutgoingEphemeralMessage( utf8ToBytes(messagesA[0]), @@ -793,15 +1181,11 @@ describe("MessageChannel", function () { // Verify ephemeral message behavior: // 1. Not added to local history - expect((channelB as any).localHistory.length).to.equal( - localHistoryBefore - ); + expect(channelB["localHistory"].length).to.equal(localHistoryBefore); // 2. Not added to incoming buffer - expect((channelB as any).incomingBuffer.length).to.equal( - incomingBufferBefore - ); + expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore); // 3. Doesn't update lamport timestamp - expect((channelB as any).lamportTimestamp).to.equal(timestampBefore); + expect(channelB["lamportTimestamp"]).to.equal(timestampBefore); }); }); }); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 6ae3799801..a9cd980a71 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -30,7 +30,7 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = { const DEFAULT_CAUSAL_HISTORY_SIZE = 200; const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; -const log = new Logger("waku:sds:message-channel"); +const log = new Logger("sds:message-channel"); export interface MessageChannelOptions { causalHistorySize?: number; @@ -50,7 +50,7 @@ export interface MessageChannelOptions { export type ILocalHistory = Pick< Array, - "some" | "push" | "slice" | "find" | "length" + "some" | "push" | "slice" | "find" | "length" | "findIndex" >; export class MessageChannel extends TypedEventEmitter { @@ -61,7 +61,7 @@ export class MessageChannel extends TypedEventEmitter { private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; private incomingBuffer: Array; - private localHistory: ILocalHistory; + private readonly localHistory: ILocalHistory; private timeReceived: Map; private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; @@ -226,7 +226,7 @@ export class MessageChannel extends TypedEventEmitter { * proper dependency resolution and causal ordering. * * @param message - The message to receive and process - * + * @param retrievalHint - The retrieval hint for the message, provided by the transport layer * @example * ```typescript * const channel = new MessageChannel("chat-room"); @@ -238,7 +238,12 @@ export class MessageChannel extends TypedEventEmitter { * await channel.processTasks(); * ``` */ - public pushIncomingMessage(message: Message): void { + public pushIncomingMessage( + message: Message, + retrievalHint: Uint8Array | undefined + ): void { + message.retrievalHint = retrievalHint; + this.tasks.push({ command: Command.Receive, params: { @@ -282,7 +287,9 @@ export class MessageChannel extends TypedEventEmitter { this.senderId, message.messageId, "is missing dependencies", - missingDependencies.map((ch) => ch.messageId) + missingDependencies.map(({ messageId, retrievalHint }) => { + return { messageId, retrievalHint }; + }) ); // Optionally, if a message has not been received after a predetermined amount of time, @@ -395,7 +402,15 @@ export class MessageChannel extends TypedEventEmitter { } private _pushIncomingMessage(message: Message): void { - log.info(this.senderId, "incoming message", message.messageId); + if (message.channelId !== this.channelId) { + log.warn("dropping message on different channel", message.channelId); + return; + } + + log.info( + `${this.senderId} incoming message ${message.messageId}`, + `retrieval hint: ${bytesToHex(message.retrievalHint ?? new Uint8Array())}` + ); const isDuplicate = message.content && message.content.length > 0 && @@ -589,14 +604,10 @@ export class MessageChannel extends TypedEventEmitter { * Return true if the message was "delivered" * * @param message - * @param retrievalHint * @private */ // See https://rfc.vac.dev/vac/raw/sds/#deliver-message - private deliverMessage( - message: ContentMessage, - retrievalHint?: Uint8Array - ): boolean { + private deliverMessage(message: ContentMessage): boolean { if (!isContentMessage(message)) { // Messages with empty content are sync messages. // Messages with no timestamp are ephemeral messages. @@ -605,7 +616,12 @@ export class MessageChannel extends TypedEventEmitter { return false; } - log.info(this.senderId, "delivering message", message.messageId); + log.info( + this.senderId, + "delivering message", + message.messageId, + message.retrievalHint + ); if (message.lamportTimestamp > this.lamportTimestamp) { this.lamportTimestamp = message.lamportTimestamp; } @@ -620,7 +636,9 @@ export class MessageChannel extends TypedEventEmitter { return true; } - message.retrievalHint = retrievalHint; + if (!message.retrievalHint) { + log.warn("message delivered without a retrieval hint", message.messageId); + } this.localHistory.push(message); return true; diff --git a/packages/tests/tests/connection-mananger/network_monitor.spec.ts b/packages/tests/tests/connection-mananger/network_monitor.spec.ts index bfc0c2c322..76f2ec6b31 100644 --- a/packages/tests/tests/connection-mananger/network_monitor.spec.ts +++ b/packages/tests/tests/connection-mananger/network_monitor.spec.ts @@ -3,7 +3,7 @@ import type { PeerId } from "@libp2p/interface"; import { TypedEventEmitter } from "@libp2p/interface"; import { peerIdFromPrivateKey } from "@libp2p/peer-id"; import { Multiaddr } from "@multiformats/multiaddr"; -import { LightNode, Protocols, Tags } from "@waku/interfaces"; +import { LightNode, Protocols, Tags, WakuEvent } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; import { createLightNode } from "@waku/sdk"; import { expect } from "chai"; @@ -65,10 +65,13 @@ describe("Connection state", function () { it("should emit `waku:online` event only when first peer is connected", async function () { let eventCount = 0; const connectionStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - eventCount++; - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); }); await waku.dial(nwaku1PeerId, [Protocols.Filter]); @@ -87,10 +90,13 @@ describe("Connection state", function () { let eventCount = 0; const connectionStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - eventCount++; - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); }); await nwaku1.stop(); @@ -116,18 +122,24 @@ describe("Connection state", function () { let eventCount1 = 0; const connectionStatus1 = new Promise((resolve) => { - waku1.events.addEventListener("waku:connection", ({ detail: status }) => { - eventCount1++; - resolve(status); - }); + waku1.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + eventCount1++; + resolve(status); + } + ); }); let eventCount2 = 0; const connectionStatus2 = new Promise((resolve) => { - waku2.events.addEventListener("waku:connection", ({ detail: status }) => { - eventCount2++; - resolve(status); - }); + waku2.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + eventCount2++; + resolve(status); + } + ); }); await waku1.libp2p.peerStore.merge(waku2.peerId, { @@ -191,7 +203,7 @@ describe("Connection state", function () { }); }); -describe("waku:connection", function () { +describe(WakuEvent.Connection, function () { let navigatorMock: any; let originalNavigator: any; @@ -259,10 +271,13 @@ describe("waku:connection", function () { let eventCount = 0; const connectedStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - eventCount++; - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); }); waku.libp2p.dispatchEvent( @@ -279,9 +294,12 @@ describe("waku:connection", function () { expect(eventCount).to.be.eq(1); const disconnectedStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + resolve(status); + } + ); }); waku.libp2p.dispatchEvent( @@ -314,10 +332,13 @@ describe("waku:connection", function () { let eventCount = 0; const connectedStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - eventCount++; - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); }); waku.libp2p.dispatchEvent( @@ -331,9 +352,12 @@ describe("waku:connection", function () { expect(eventCount).to.be.eq(1); const disconnectedStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + resolve(status); + } + ); }); navigatorMock.onLine = false; @@ -346,9 +370,12 @@ describe("waku:connection", function () { expect(eventCount).to.be.eq(2); const connectionRecoveredStatus = new Promise((resolve) => { - waku.events.addEventListener("waku:connection", ({ detail: status }) => { - resolve(status); - }); + waku.events.addEventListener( + WakuEvent.Connection, + ({ detail: status }) => { + resolve(status); + } + ); }); navigatorMock.onLine = true; diff --git a/packages/utils/src/common/sharding/index.ts b/packages/utils/src/common/sharding/index.ts index 495ea42c6e..c4e816f389 100644 --- a/packages/utils/src/common/sharding/index.ts +++ b/packages/utils/src/common/sharding/index.ts @@ -1,165 +1,3 @@ -import { sha256 } from "@noble/hashes/sha256"; -import { - type ClusterId, - ContentTopic, - PubsubTopic, - type ShardId -} from "@waku/interfaces"; - -import { concat, utf8ToBytes } from "../../bytes/index.js"; - export * from "./type_guards.js"; export * from "./routing_info.js"; - -export const formatPubsubTopic = ( - clusterId: ClusterId, - shard: ShardId -): PubsubTopic => { - return `/waku/2/rs/${clusterId}/${shard}`; -}; - -/** - * @deprecated will be removed - */ -export const pubsubTopicToSingleShardInfo = ( - pubsubTopics: PubsubTopic -): { clusterId: ClusterId; shard: ShardId } => { - const parts = pubsubTopics.split("/"); - - if ( - parts.length != 6 || - parts[1] !== "waku" || - parts[2] !== "2" || - parts[3] !== "rs" - ) - throw new Error("Invalid pubsub topic"); - - const clusterId = parseInt(parts[4]); - const shard = parseInt(parts[5]); - - if (isNaN(clusterId) || isNaN(shard)) - throw new Error("Invalid clusterId or shard"); - - return { - clusterId, - shard - }; -}; - -interface ParsedContentTopic { - generation: number; - application: string; - version: string; - topicName: string; - encoding: string; -} - -/** - * Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/ - * @param contentTopic String to validate - * @returns Object with each content topic field as an attribute - */ -export function ensureValidContentTopic( - contentTopic: ContentTopic -): ParsedContentTopic { - const parts = (contentTopic as string).split("/"); - if (parts.length < 5 || parts.length > 6) { - throw Error(`Content topic format is invalid: ${contentTopic}`); - } - // Validate generation field if present - let generation = 0; - if (parts.length == 6) { - generation = parseInt(parts[1]); - if (isNaN(generation)) { - throw new Error( - `Invalid generation field in content topic: ${contentTopic}` - ); - } - if (generation > 0) { - throw new Error( - `Generation greater than 0 is not supported: ${contentTopic}` - ); - } - } - // Validate remaining fields - const fields = parts.splice(-4); - // Validate application field - if (fields[0].length == 0) { - throw new Error(`Application field cannot be empty: ${contentTopic}`); - } - // Validate version field - if (fields[1].length == 0) { - throw new Error(`Version field cannot be empty: ${contentTopic}`); - } - // Validate topic name field - if (fields[2].length == 0) { - throw new Error(`Topic name field cannot be empty: ${contentTopic}`); - } - // Validate encoding field - if (fields[3].length == 0) { - throw new Error(`Encoding field cannot be empty: ${contentTopic}`); - } - - return { - generation, - application: fields[0], - version: fields[1], - topicName: fields[2], - encoding: fields[3] - }; -} - -/** - * Given a string, determines which autoshard index to use for its pubsub topic. - * Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm - */ -export function contentTopicToShardIndex( - contentTopic: ContentTopic, - numShardsInCluster: number -): number { - const { application, version } = ensureValidContentTopic(contentTopic); - const digest = sha256( - concat([utf8ToBytes(application), utf8ToBytes(version)]) - ); - const dataview = new DataView(digest.buffer.slice(-8)); - return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster)); -} - -export function contentTopicToPubsubTopic( - contentTopic: ContentTopic, - clusterId: number, - numShardsInCluster: number -): string { - if (!contentTopic) { - throw Error("Content topic must be specified"); - } - - const shardIndex = contentTopicToShardIndex(contentTopic, numShardsInCluster); - return `/waku/2/rs/${clusterId}/${shardIndex}`; -} - -/** - * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. - * If any of the content topics are not properly formatted, the function will throw an error. - */ -export function contentTopicsByPubsubTopic( - contentTopics: ContentTopic[], - clusterId: number, - networkShards: number -): Map> { - const groupedContentTopics = new Map(); - for (const contentTopic of contentTopics) { - const pubsubTopic = contentTopicToPubsubTopic( - contentTopic, - clusterId, - networkShards - ); - let topics = groupedContentTopics.get(pubsubTopic); - if (!topics) { - groupedContentTopics.set(pubsubTopic, []); - topics = groupedContentTopics.get(pubsubTopic); - } - topics.push(contentTopic); - } - return groupedContentTopics; -} +export * from "./topics.js"; diff --git a/packages/utils/src/common/sharding/routing_info.ts b/packages/utils/src/common/sharding/routing_info.ts index a51de7cfc3..45194ba360 100644 --- a/packages/utils/src/common/sharding/routing_info.ts +++ b/packages/utils/src/common/sharding/routing_info.ts @@ -13,9 +13,9 @@ import { contentTopicToShardIndex, ensureValidContentTopic, formatPubsubTopic, - isAutoSharding, pubsubTopicToSingleShardInfo -} from "./index.js"; +} from "./topics.js"; +import { isAutoSharding } from "./type_guards.js"; export type RoutingInfo = AutoShardingRoutingInfo | StaticShardingRoutingInfo; diff --git a/packages/utils/src/common/sharding/index.spec.ts b/packages/utils/src/common/sharding/topics.spec.ts similarity index 75% rename from packages/utils/src/common/sharding/index.spec.ts rename to packages/utils/src/common/sharding/topics.spec.ts index 28b5aeba2e..2460983aab 100644 --- a/packages/utils/src/common/sharding/index.spec.ts +++ b/packages/utils/src/common/sharding/topics.spec.ts @@ -7,7 +7,7 @@ import { contentTopicToShardIndex, ensureValidContentTopic, pubsubTopicToSingleShardInfo -} from "./index.js"; +} from "./topics.js"; const ClusterId = 0; const NumShardsInCluster = 8; @@ -292,78 +292,3 @@ describe("pubsubTopicToSingleShardInfo with various invalid formats", () => { }); }); }); - -// describe("ensureShardingConfigured", () => { -// it("should return valid sharding parameters for static sharding", () => { -// const shardInfo = { clusterId: 1, shards: [0, 1] }; -// const result = ensureShardingConfigured(shardInfo); -// expect(result.shardInfo).to.deep.include({ -// clusterId: 1, -// shards: [0, 1] -// }); -// expect(result.shardInfo).to.deep.include({ clusterId: 1, shards: [0, 1] }); -// expect(result.pubsubTopics).to.have.members([ -// "/waku/2/rs/1/0", -// "/waku/2/rs/1/1" -// ]); -// }); -// -// it("should return valid sharding parameters for content topics autosharding", () => { -// const contentTopicInfo = { contentTopics: ["/app/v1/topic1/proto"] }; -// const result = ensureShardingConfigured(contentTopicInfo); -// const expectedPubsubTopic = contentTopicToPubsubTopic( -// "/app/v1/topic1/proto", -// DEFAULT_CLUSTER_ID -// ); -// expect(result.shardInfo.shards).to.include( -// contentTopicToShardIndex("/app/v1/topic1/proto") -// ); -// expect(result.pubsubTopics).to.include(expectedPubsubTopic); -// }); -// -// it("should throw an error for missing sharding configuration", () => { -// const shardInfo = {} as any as NetworkConfig; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// -// it("handles empty shards array correctly", () => { -// const shardInfo = { clusterId: 1, shards: [] }; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// -// it("handles empty contentTopics array correctly", () => { -// const shardInfo = { contentTopics: [] }; -// expect(() => ensureShardingConfigured(shardInfo)).to.throw(); -// }); -// }); -// -// describe("contentTopicToPubsubTopic", () => { -// it("should correctly map a content topic to a pubsub topic", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// expect(contentTopicToPubsubTopic(contentTopic)).to.equal("/waku/2/rs/1/4"); -// }); -// -// it("should map different content topics to different pubsub topics based on shard index", () => { -// const contentTopic1 = "/app/v1/topic1/proto"; -// const contentTopic2 = "/app/v2/topic2/proto"; -// const pubsubTopic1 = contentTopicToPubsubTopic(contentTopic1); -// const pubsubTopic2 = contentTopicToPubsubTopic(contentTopic2); -// expect(pubsubTopic1).not.to.equal(pubsubTopic2); -// }); -// -// it("should use the provided clusterId for the pubsub topic", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// const clusterId = 2; -// expect(contentTopicToPubsubTopic(contentTopic, clusterId)).to.equal( -// "/waku/2/rs/2/4" -// ); -// }); -// -// it("should correctly map a content topic to a pubsub topic for different network shard sizes", () => { -// const contentTopic = "/app/v1/topic1/proto"; -// const networkShards = 16; -// expect(contentTopicToPubsubTopic(contentTopic, 1, networkShards)).to.equal( -// "/waku/2/rs/1/4" -// ); -// }); -// }); diff --git a/packages/utils/src/common/sharding/topics.ts b/packages/utils/src/common/sharding/topics.ts new file mode 100644 index 0000000000..fb377315e7 --- /dev/null +++ b/packages/utils/src/common/sharding/topics.ts @@ -0,0 +1,162 @@ +import { sha256 } from "@noble/hashes/sha256"; +import { + type ClusterId, + ContentTopic, + PubsubTopic, + type ShardId +} from "@waku/interfaces"; + +import { concat, utf8ToBytes } from "../../bytes/index.js"; + +export const formatPubsubTopic = ( + clusterId: ClusterId, + shard: ShardId +): PubsubTopic => { + return `/waku/2/rs/${clusterId}/${shard}`; +}; + +/** + * @deprecated will be removed + */ +export const pubsubTopicToSingleShardInfo = ( + pubsubTopics: PubsubTopic +): { clusterId: ClusterId; shard: ShardId } => { + const parts = pubsubTopics.split("/"); + + if ( + parts.length != 6 || + parts[1] !== "waku" || + parts[2] !== "2" || + parts[3] !== "rs" + ) + throw new Error("Invalid pubsub topic"); + + const clusterId = parseInt(parts[4]); + const shard = parseInt(parts[5]); + + if (isNaN(clusterId) || isNaN(shard)) + throw new Error("Invalid clusterId or shard"); + + return { + clusterId, + shard + }; +}; + +interface ParsedContentTopic { + generation: number; + application: string; + version: string; + topicName: string; + encoding: string; +} + +/** + * Given a string, will throw an error if it is not formatted as a valid content topic for autosharding based on https://rfc.vac.dev/spec/51/ + * @param contentTopic String to validate + * @returns Object with each content topic field as an attribute + */ +export function ensureValidContentTopic( + contentTopic: ContentTopic +): ParsedContentTopic { + const parts = (contentTopic as string).split("/"); + if (parts.length < 5 || parts.length > 6) { + throw Error(`Content topic format is invalid: ${contentTopic}`); + } + // Validate generation field if present + let generation = 0; + if (parts.length == 6) { + generation = parseInt(parts[1]); + if (isNaN(generation)) { + throw new Error( + `Invalid generation field in content topic: ${contentTopic}` + ); + } + if (generation > 0) { + throw new Error( + `Generation greater than 0 is not supported: ${contentTopic}` + ); + } + } + // Validate remaining fields + const fields = parts.splice(-4); + // Validate application field + if (fields[0].length == 0) { + throw new Error(`Application field cannot be empty: ${contentTopic}`); + } + // Validate version field + if (fields[1].length == 0) { + throw new Error(`Version field cannot be empty: ${contentTopic}`); + } + // Validate topic name field + if (fields[2].length == 0) { + throw new Error(`Topic name field cannot be empty: ${contentTopic}`); + } + // Validate encoding field + if (fields[3].length == 0) { + throw new Error(`Encoding field cannot be empty: ${contentTopic}`); + } + + return { + generation, + application: fields[0], + version: fields[1], + topicName: fields[2], + encoding: fields[3] + }; +} + +/** + * Given a string, determines which autoshard index to use for its pubsub topic. + * Based on the algorithm described in the RFC: https://rfc.vac.dev/spec/51//#algorithm + */ +export function contentTopicToShardIndex( + contentTopic: ContentTopic, + numShardsInCluster: number +): number { + const { application, version } = ensureValidContentTopic(contentTopic); + const digest = sha256( + concat([utf8ToBytes(application), utf8ToBytes(version)]) + ); + const dataview = new DataView(digest.buffer.slice(-8)); + return Number(dataview.getBigUint64(0, false) % BigInt(numShardsInCluster)); +} + +export function contentTopicToPubsubTopic( + contentTopic: ContentTopic, + clusterId: number, + numShardsInCluster: number +): string { + if (!contentTopic) { + throw Error("Content topic must be specified"); + } + + const shardIndex = contentTopicToShardIndex(contentTopic, numShardsInCluster); + return `/waku/2/rs/${clusterId}/${shardIndex}`; +} + +/** + * Given an array of content topics, groups them together by their Pubsub topic as derived using the algorithm for autosharding. + * If any of the content topics are not properly formatted, the function will throw an error. + */ +export function contentTopicsByPubsubTopic( + contentTopics: ContentTopic[], + clusterId: number, + networkShards: number +): Map> { + const groupedContentTopics = new Map(); + for (const contentTopic of contentTopics) { + const pubsubTopic = contentTopicToPubsubTopic( + contentTopic, + clusterId, + networkShards + ); + let topics = groupedContentTopics.get(pubsubTopic); + if (!topics) { + groupedContentTopics.set(pubsubTopic, []); + topics = groupedContentTopics.get(pubsubTopic); + } + topics.push(contentTopic); + } + return groupedContentTopics; +}