From 8542d04bf5c9472f955ef8c9e5bc9e89c70f4738 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Thu, 28 Aug 2025 13:58:16 +1000 Subject: [PATCH] feat: query on connect (#2602) * feat: query on connect Perform store time-range queries upon connecting to a store node. Some heuristics are applied to ensure the store queries are not too frequent. * make `maybeQuery` private * query-on-connect: use index.ts only for re-export * query-on-connect: update doc --- packages/interfaces/src/store.ts | 15 + packages/sdk/src/query_on_connect/index.ts | 5 + .../query_on_connect/query_on_connect.spec.ts | 803 ++++++++++++++++++ .../src/query_on_connect/query_on_connect.ts | 208 +++++ packages/sdk/src/store/store.spec.ts | 32 + packages/sdk/src/store/store.ts | 4 +- 6 files changed, 1065 insertions(+), 2 deletions(-) create mode 100644 packages/sdk/src/query_on_connect/index.ts create mode 100644 packages/sdk/src/query_on_connect/query_on_connect.spec.ts create mode 100644 packages/sdk/src/query_on_connect/query_on_connect.ts diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index 014842aaa6..8a1e91a451 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -1,3 +1,5 @@ +import type { PeerId } from "@libp2p/interface"; + import type { IDecodedMessage, IDecoder } from "./message.js"; export type StoreCursor = Uint8Array; @@ -73,6 +75,19 @@ export type QueryRequestParams = { * @default undefined */ paginationLimit?: number; + + /** + * The service node to use for queries. Will fail if: + * - this peer is not in the peer store. + * - we are not connected to this peer + * No fallback is done. Overrides any other peer selection option. + * + * Expected to be used with [[PeerManagerEventNames.StoreConnect]] so that + * we know we are connected to this peer before doing the store query. + * + * Only use if you know what you are doing. + */ + peerId?: PeerId; }; export type IStore = { diff --git a/packages/sdk/src/query_on_connect/index.ts b/packages/sdk/src/query_on_connect/index.ts new file mode 100644 index 0000000000..982b19a916 --- /dev/null +++ b/packages/sdk/src/query_on_connect/index.ts @@ -0,0 +1,5 @@ +export { + QueryOnConnectOptions, + QueryOnConnectEvent, + QueryOnConnect +} from "./query_on_connect.js"; diff --git a/packages/sdk/src/query_on_connect/query_on_connect.spec.ts b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts new file mode 100644 index 0000000000..cdbc6f2da6 --- /dev/null +++ b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts @@ -0,0 +1,803 @@ +import { type PeerId, TypedEventEmitter } from "@libp2p/interface"; +import { + HealthStatus, + type IDecodedMessage, + type IDecoder, + IWakuEventEmitter, + QueryRequestParams, + WakuEvent +} from "@waku/interfaces"; +import { delay } from "@waku/utils"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { + IPeerManagerEvents, + PeerManagerEventNames +} from "../peer_manager/peer_manager.js"; + +import { + calculateTimeRange, + QueryOnConnect, + QueryOnConnectEvent, + QueryOnConnectOptions +} from "./query_on_connect.js"; + +describe("QueryOnConnect", () => { + let queryOnConnect: QueryOnConnect; + let mockDecoders: IDecoder[]; + let mockPeerManagerEventEmitter: TypedEventEmitter; + let mockWakuEventEmitter: IWakuEventEmitter; + let mockQueryGenerator: sinon.SinonStub; + let mockPeerId: PeerId; + let options: QueryOnConnectOptions; + + beforeEach(() => { + // Mock decoders + mockDecoders = [ + { + contentTopic: "/test/1/content", + fromWireToProtoObj: sinon.stub(), + fromProtoObj: sinon.stub() + } as any, + { + contentTopic: "/test/2/content", + fromWireToProtoObj: sinon.stub(), + fromProtoObj: sinon.stub() + } as any + ]; + + // Mock peer manager event emitter + mockPeerManagerEventEmitter = { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + dispatchEvent: sinon.stub() + } as any; + + // Mock waku event emitter + mockWakuEventEmitter = { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub(), + dispatchEvent: sinon.stub() + } as any; + + // Mock retrieve function + mockQueryGenerator = sinon.stub().callsFake(() => + (async function* () { + yield [ + Promise.resolve({ + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined, + hashStr: "12345" + } as IDecodedMessage) + ]; + })() + ); + + mockPeerId = { + toString: () => "QmTestPeerId" + } as unknown as PeerId; + + // Mock options + options = { + forceQueryThresholdMs: 10000 + }; + }); + + describe("constructor", () => { + it("should create QueryOnConnect instance with all required parameters", () => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + expect(queryOnConnect).to.be.instanceOf(QueryOnConnect); + expect(queryOnConnect.decoders).to.equal(mockDecoders); + }); + + it("should create QueryOnConnect instance without options", () => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator + ); + + expect(queryOnConnect).to.be.instanceOf(QueryOnConnect); + expect(queryOnConnect.decoders).to.equal(mockDecoders); + }); + + it("should accept empty decoders array", () => { + queryOnConnect = new QueryOnConnect( + [], + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + expect(queryOnConnect.decoders).to.deep.equal([]); + }); + }); + + describe("start and stop", () => { + beforeEach(() => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + it("should set up event listeners when started", () => { + const peerEventSpy = + mockPeerManagerEventEmitter.addEventListener as sinon.SinonSpy; + const wakuEventSpy = + mockWakuEventEmitter.addEventListener as sinon.SinonSpy; + + queryOnConnect.start(); + + expect(peerEventSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be + .true; + expect(wakuEventSpy.calledWith(WakuEvent.Health)).to.be.true; + }); + + it("should remove event listeners when stopped", () => { + const peerRemoveSpy = + mockPeerManagerEventEmitter.removeEventListener as sinon.SinonSpy; + const wakuRemoveSpy = + mockWakuEventEmitter.removeEventListener as sinon.SinonSpy; + + queryOnConnect.start(); + queryOnConnect.stop(); + + expect(peerRemoveSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be + .true; + expect(wakuRemoveSpy.calledWith(WakuEvent.Health)).to.be.true; + }); + }); + + describe("mock validation", () => { + beforeEach(() => { + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + it("should work with stubbed peer manager event emitter", () => { + expect(mockPeerManagerEventEmitter.addEventListener).to.be.a("function"); + expect(mockPeerManagerEventEmitter.removeEventListener).to.be.a( + "function" + ); + expect(mockPeerManagerEventEmitter.dispatchEvent).to.be.a("function"); + }); + + it("should work with stubbed waku event emitter", () => { + expect(mockWakuEventEmitter.addEventListener).to.be.a("function"); + expect(mockWakuEventEmitter.removeEventListener).to.be.a("function"); + expect(mockWakuEventEmitter.dispatchEvent).to.be.a("function"); + }); + + it("should work with stubbed retrieve function", () => { + expect(mockQueryGenerator).to.be.a("function"); + }); + + it("should work with mock decoders", () => { + expect(mockDecoders).to.be.an("array"); + expect(mockDecoders[0]).to.have.property("contentTopic"); + expect(mockDecoders[0]).to.have.property("fromWireToProtoObj"); + expect(mockDecoders[0]).to.have.property("fromProtoObj"); + }); + }); + + describe("event handling simulation", () => { + let addEventListenerStub: sinon.SinonStub; + let healthEventCallback: (event: CustomEvent) => void; + let storeConnectCallback: () => void; + + beforeEach(() => { + addEventListenerStub = sinon.stub(); + mockPeerManagerEventEmitter.addEventListener = addEventListenerStub; + mockWakuEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === WakuEvent.Health) { + healthEventCallback = callback; + } + }); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + it("should capture event listeners for testing", () => { + queryOnConnect.start(); + + expect( + addEventListenerStub.calledWith(PeerManagerEventNames.StoreConnect) + ).to.be.true; + + storeConnectCallback = addEventListenerStub.getCall(0).args[1]; + expect(storeConnectCallback).to.be.a("function"); + }); + + it("should properly setup health event callback", () => { + queryOnConnect.start(); + + expect(mockWakuEventEmitter.addEventListener).to.be.a("function"); + expect(healthEventCallback).to.be.a("function"); + }); + }); + + describe("async generator retrieve function mock", () => { + it("should work with async generator that yields promises", async () => { + const mockMessage: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/1/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + yield [Promise.resolve(undefined)]; + }; + + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + const generator = mockQueryGenerator(mockDecoders, {}); + const firstPage = await generator.next(); + expect(firstPage.done).to.be.false; + + const messages = await Promise.all(firstPage.value); + expect(messages[0]).to.deep.equal(mockMessage); + }); + + it("should handle retrieve function with query parameters", async () => { + const queryParams: Partial = { + timeStart: new Date(Date.now() - 1000), + timeEnd: new Date() + }; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + mockQueryGenerator(mockDecoders, queryParams); + + expect(mockQueryGenerator.calledWith(mockDecoders, queryParams)).to.be + .true; + }); + }); + + describe("message retrieval event emission conditions", () => { + let mockClock: sinon.SinonFakeTimers; + + beforeEach(() => { + mockClock = sinon.useFakeTimers(); + mockClock.tick(10); // always tick as now === 0 messes up the logic + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + }); + + afterEach(() => { + mockClock.restore(); + }); + + it("should trigger query when it went offline since the last successful query", async () => { + let healthEventCallback: + | ((event: CustomEvent) => void) + | undefined; + + // Capture the health event callback + mockWakuEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === WakuEvent.Health) { + healthEventCallback = callback; + } + }); + + queryOnConnect.start(); + + // Set lastSuccessfulQuery to simulate old query + await queryOnConnect["maybeQuery"](mockPeerId); + mockClock.tick(1); + + // goes offline + const healthEvent = new CustomEvent("health", { + detail: HealthStatus.Unhealthy + }); + expect(healthEventCallback).to.not.be.undefined; + healthEventCallback!.call(queryOnConnect, healthEvent); + mockClock.tick(1); + + // Call maybeQuery directly to test condition + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + + it("should not trigger query if health event is healthy since last successful query", async () => { + queryOnConnect.start(); + + // Set lastSuccessfulQuery to simulate old query + await queryOnConnect["maybeQuery"](mockPeerId); + + // goes offline + const healthEvent = new CustomEvent("health", { + detail: HealthStatus.SufficientlyHealthy + }); + mockWakuEventEmitter.dispatchEvent(healthEvent); + + // Call maybeQuery directly to test condition + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should trigger query when time since last query exceeds threshold", async function () { + const customThreshold = 10; + const customOptions: QueryOnConnectOptions = { + forceQueryThresholdMs: customThreshold + }; + + const queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + customOptions + ); + queryOnConnect.start(); + + // Set lastSuccessfulQuery to simulate old query + await queryOnConnect["maybeQuery"](mockPeerId); + + // Advance fake timer over the force threshold + mockClock.tick(20); + + // Call maybeQuery directly to test condition + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + + it("should not trigger query when a recent query happened under threshold", async () => { + const customThreshold = 2000; + const customOptions: QueryOnConnectOptions = { + forceQueryThresholdMs: customThreshold + }; + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + customOptions + ); + + queryOnConnect.start(); + + // First call to set a successful call + await queryOnConnect["maybeQuery"](mockPeerId); + + // Second call should not trigger + await queryOnConnect["maybeQuery"](mockPeerId); + + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + }); + + describe("end-to-end message emission tests", () => { + let storeConnectCallback: (event: CustomEvent) => void; + let healthEventCallback: (event: CustomEvent) => void; + let messageEventPromise: Promise; + let resolveMessageEvent: (messages: IDecodedMessage[]) => void; + let rejectMessageEvent: (reason: string) => void; + let connectStoreEvent: CustomEvent; + + beforeEach(() => { + // Create a promise that resolves when a message event is emitted + messageEventPromise = new Promise( + (resolve, reject) => { + resolveMessageEvent = resolve; + rejectMessageEvent = reject; + } + ); + + // Setup event listener capture with proper binding + mockPeerManagerEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === PeerManagerEventNames.StoreConnect) { + storeConnectCallback = callback; + } + }); + + mockWakuEventEmitter.addEventListener = sinon + .stub() + .callsFake((eventType, callback) => { + if (eventType === WakuEvent.Health) { + healthEventCallback = callback; + } + }); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + options + ); + + // Listen for message events + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + resolveMessageEvent(event.detail); + } + ); + + connectStoreEvent = new CustomEvent("connect:store", { + detail: mockPeerId + }); + + // Set a timeout to reject if no message is received + setTimeout( + () => rejectMessageEvent("No message received within timeout"), + 500 + ); + }); + + it("should emit message when we just started and store connect event occurs", async () => { + const mockMessage: IDecodedMessage = { + hash: utf8ToBytes("1234"), + hashStr: "1234", + version: 1, + timestamp: new Date(), + contentTopic: "/test/offline/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return the mock message + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect.start(); + + // Step 2: Simulate store peer reconnection + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Step 4: Wait for message emission + const receivedMessage = await messageEventPromise; + + expect(receivedMessage).to.deep.equal([mockMessage]); + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should emit message when we went offline since last successful query and store reconnect event occurs", async () => { + const mockMessage: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "1234", + version: 1, + timestamp: new Date(), + contentTopic: "/test/offline/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return the mock message + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect.start(); + + // Step 1: Simulate successful query in the past + await queryOnConnect["maybeQuery"](mockPeerId); + await delay(100); + + // Step 2: Simulate going offline after the successful query + const healthEvent = new CustomEvent("health", { + detail: HealthStatus.Unhealthy + }); + healthEventCallback.call(queryOnConnect, healthEvent); + + // Step 3: Simulate store peer reconnection + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Step 4: Wait for message emission + const receivedMessages = await messageEventPromise; + + expect(receivedMessages).to.deep.equal([mockMessage]); + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + + it("should emit message when store reconnect event occurs and last query was over max time threshold", async () => { + const mockMessage: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/timeout/content", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([4, 5, 6]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return the mock message + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + queryOnConnect = new QueryOnConnect( + mockDecoders, + mockPeerManagerEventEmitter, + mockWakuEventEmitter, + mockQueryGenerator, + { forceQueryThresholdMs: 5000 } // 5 second threshold + ); + + // Re-setup event listeners for new instance + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + resolveMessageEvent(event.detail); + } + ); + + queryOnConnect.start(); + + // Step 1: Simulate old successful query (over threshold) + await queryOnConnect["maybeQuery"](mockPeerId); + + // Step 3: Simulate store peer reconnection + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Step 4: Wait for message emission + const receivedMessages = await messageEventPromise; + + expect(receivedMessages).to.deep.equal([mockMessage]); + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should emit multiple messages when query returns multiple messages", async () => { + const mockMessage1: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/multi/content1", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([1, 2, 3]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + const mockMessage2: IDecodedMessage = { + hash: new Uint8Array(), + hashStr: "", + version: 1, + timestamp: new Date(), + contentTopic: "/test/multi/content2", + pubsubTopic: "/waku/2/default-waku/proto", + payload: new Uint8Array([4, 5, 6]), + rateLimitProof: undefined, + ephemeral: false, + meta: undefined + }; + + // Setup retrieve function to return multiple messages + const mockAsyncGenerator = async function* (): AsyncGenerator< + Promise[] + > { + yield [Promise.resolve(mockMessage1)]; + yield [Promise.resolve(mockMessage2)]; + }; + mockQueryGenerator.returns(mockAsyncGenerator()); + + const receivedMessages: IDecodedMessage[] = []; + let messageCount = 0; + + // Create a new promise for multiple messages + const multipleMessagesPromise = new Promise((resolve) => { + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (event: CustomEvent) => { + receivedMessages.push(...event.detail); + messageCount++; + if (messageCount === 2) { + resolve(); + } + } + ); + }); + + queryOnConnect.start(); + + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Wait for all messages with timeout + await Promise.race([ + multipleMessagesPromise, + delay(200).then(() => + Promise.reject(new Error("Timeout waiting for messages")) + ) + ]); + + expect(receivedMessages).to.have.length(2); + expect(receivedMessages[0]).to.deep.equal(mockMessage1); + expect(receivedMessages[1]).to.deep.equal(mockMessage2); + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should not emit message when conditions are not met (recent query, no offline)", async () => { + queryOnConnect.start(); + + await queryOnConnect["maybeQuery"](mockPeerId); + + // Override promise to reject if any message is received + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + () => { + rejectMessageEvent("Unexpected message emission"); + } + ); + + await delay(10); + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Wait briefly to ensure no message is emitted + await delay(50); + + expect(mockQueryGenerator.calledOnce).to.be.true; + }); + + it("should handle retrieve errors gracefully without emitting messages", async () => { + // Setup retrieve function to throw an error + mockQueryGenerator.rejects(new Error("Retrieval failed")); + + queryOnConnect.start(); + + // Override promise to reject if any message is received + queryOnConnect.addEventListener( + QueryOnConnectEvent.MessagesRetrieved, + (_event: CustomEvent) => { + rejectMessageEvent("Unexpected message emission after error"); + } + ); + + await queryOnConnect["maybeQuery"](mockPeerId); + storeConnectCallback.call(queryOnConnect, connectStoreEvent); + + // Wait briefly to ensure no message is emitted + await delay(100); + + expect(mockQueryGenerator.calledTwice).to.be.true; + }); + }); +}); + +describe("calculateTimeRange", () => { + it("should return start time to last successful query since last query is less than max range", () => { + const now = 1000000; // Some arbitrary timestamp + const lastSuccessfulQuery = now - 100; // 100ms ago + const maxTimeRangeQueryMs = 500; // 500ms max range + + const result = calculateTimeRange( + now, + lastSuccessfulQuery, + maxTimeRangeQueryMs + ); + + const expectedTimeStart = new Date(lastSuccessfulQuery); + const expectedTimeEnd = new Date(now); + + expect(result.timeStart).to.deep.equal(expectedTimeStart); + expect(result.timeEnd).to.deep.equal(expectedTimeEnd); + }); + + it("should return start time to match max range", () => { + const now = 1000000; + const lastSuccessfulQuery = 1000000 - 800; // 800ms ago + const maxTimeRangeQueryMs = 500; // 500ms max range + + const result = calculateTimeRange( + now, + lastSuccessfulQuery, + maxTimeRangeQueryMs + ); + + const expectedTimeStart = new Date(now - maxTimeRangeQueryMs); + const expectedTimeEnd = new Date(now); + + expect(result.timeStart).to.deep.equal(expectedTimeStart); + expect(result.timeEnd).to.deep.equal(expectedTimeEnd); + }); + + it("should handle zero lastSuccessfulQuery (never queried before)", () => { + const now = 1000000; + const lastSuccessfulQuery = 0; // Never queried + const maxTimeRangeQueryMs = 500; + + const result = calculateTimeRange( + now, + lastSuccessfulQuery, + maxTimeRangeQueryMs + ); + + const expectedTimeStart = new Date(now - maxTimeRangeQueryMs); // 1000000 - 1000000 = 0 + const expectedTimeEnd = new Date(now); // 1000000 + + expect(result.timeStart).to.deep.equal(expectedTimeStart); + expect(result.timeEnd).to.deep.equal(expectedTimeEnd); + }); +}); diff --git a/packages/sdk/src/query_on_connect/query_on_connect.ts b/packages/sdk/src/query_on_connect/query_on_connect.ts new file mode 100644 index 0000000000..f42c2ada91 --- /dev/null +++ b/packages/sdk/src/query_on_connect/query_on_connect.ts @@ -0,0 +1,208 @@ +import { type PeerId, TypedEventEmitter } from "@libp2p/interface"; +import { + HealthStatus, + type IDecodedMessage, + type IDecoder, + IWakuEventEmitter, + QueryRequestParams, + WakuEvent +} from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +import { + IPeerManagerEvents, + PeerManagerEventNames +} from "../peer_manager/peer_manager.js"; + +const log = new Logger("sdk:query-on-connect"); + +export const DEFAULT_FORCE_QUERY_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes +export const MAX_TIME_RANGE_QUERY_MS = 24 * 60 * 60 * 1000; // 24 hours + +export interface QueryOnConnectOptions { + /** + * Elapsed time since the last successful query, after which we proceed with + * a store query, on a connection event, no matter the conditions. + * @default [[DEFAULT_FORCE_QUERY_THRESHOLD_MS]] + */ + forceQueryThresholdMs?: number; +} + +export enum QueryOnConnectEvent { + /** + * A message has been retrieved. + */ + MessagesRetrieved = "messages:retrieved" +} + +export type QueryOnConnectEvents = { + [QueryOnConnectEvent.MessagesRetrieved]: CustomEvent; +}; + +/** + * Proceed with time-range store queries after connection to a store node. + * Partial implementation of [Waku P2P Reliability](https://github.com/waku-org/specs/blob/master/standards/application/p2p-reliability.md) + * + * @emits message retrieved on "messages" + */ +export class QueryOnConnect< + T extends IDecodedMessage +> extends TypedEventEmitter { + private lastSuccessfulQuery: number; + private lastTimeOffline: number; + private readonly forceQueryThresholdMs: number; + + public constructor( + public decoders: IDecoder[], + private readonly peerManagerEventEmitter: TypedEventEmitter, + private readonly wakuEventEmitter: IWakuEventEmitter, + private readonly _queryGenerator: ( + decoders: IDecoder[], + options?: Partial + ) => AsyncGenerator[]>, + options?: QueryOnConnectOptions + ) { + super(); + this.lastSuccessfulQuery = 0; + this.lastTimeOffline = 0; + this.forceQueryThresholdMs = + options?.forceQueryThresholdMs ?? DEFAULT_FORCE_QUERY_THRESHOLD_MS; + } + + public start(): void { + log.info("starting query-on-connect service"); + this.setupEventListeners(); + } + + public stop(): void { + this.unsetEventListeners(); + } + + /** + * Mainly exposed for testing. Only use if you know what you are doing. + * + * Proceed with a query if: + * - No successful query has happened + * - OR, We detected that we were offline since last successful query + * - OR, It bas been more than `forceQueryThresholdMs` than last query + * + * [[QueryOnConnect]] handles the listening to event to call this function. + * + * @param peerId A store peer id. Must be passed as we expect this to be trigger + * upon a detected connection to a store peer. + */ + private async maybeQuery(peerId: PeerId): Promise { + const timeSinceLastQuery = Date.now() - this.lastSuccessfulQuery; + log.info( + `maybe do store query to ${peerId.toString()}`, + this.lastSuccessfulQuery, + this.lastTimeOffline, + timeSinceLastQuery, + this.forceQueryThresholdMs + ); + + if ( + this.lastSuccessfulQuery === 0 || + this.lastTimeOffline > this.lastSuccessfulQuery || + timeSinceLastQuery > this.forceQueryThresholdMs + ) { + await this.query(peerId); + } else { + log.info(`no querying`); + } + } + + private async query(peerId: PeerId): Promise { + log.info(`perform store query to ${peerId.toString()}`); + const { timeStart, timeEnd } = this.queryTimeRange(); + try { + for await (const page of this._queryGenerator(this.decoders, { + timeStart, + timeEnd, + peerId + })) { + // Await for decoding + const messages = (await Promise.all(page)).filter( + (m) => m !== undefined + ); + // Bundle the messages to help batch process by sds + this.dispatchMessages(messages); + } + + // Didn't throw, so it didn't fail + this.lastSuccessfulQuery = Date.now(); + } catch (err) { + log.warn(`store query to ${peerId.toString()} failed`, err); + } + } + + private queryTimeRange(): { timeStart: Date; timeEnd: Date } { + return calculateTimeRange( + Date.now(), + this.lastSuccessfulQuery, + MAX_TIME_RANGE_QUERY_MS + ); + } + + private dispatchMessages(messages: T[]): void { + log.info( + "dispatching messages", + messages.map((m) => m.hashStr) + ); + this.dispatchEvent( + new CustomEvent( + QueryOnConnectEvent.MessagesRetrieved, + { + detail: messages + } + ) + ); + } + + private setupEventListeners(): void { + this.peerManagerEventEmitter.addEventListener( + PeerManagerEventNames.StoreConnect, + (event) => + void this.maybeQuery(event.detail).catch((err) => + log.error("query-on-connect error", err) + ) + ); + + this.wakuEventEmitter.addEventListener( + WakuEvent.Health, + this.updateLastOfflineDate.bind(this) + ); + } + + private unsetEventListeners(): void { + this.peerManagerEventEmitter.removeEventListener( + PeerManagerEventNames.StoreConnect, + (event) => + void this.maybeQuery(event.detail).catch((err) => + log.error("query-on-connect error", err) + ) + ); + + this.wakuEventEmitter.removeEventListener( + WakuEvent.Health, + this.updateLastOfflineDate.bind(this) + ); + } + + private updateLastOfflineDate(event: CustomEvent): void { + if (event.detail === HealthStatus.Unhealthy) { + this.lastTimeOffline = Date.now(); + } + } +} + +export function calculateTimeRange( + now: number, + lastSuccessfulQuery: number, + maxTimeRangeQueryMs: number +): { timeStart: Date; timeEnd: Date } { + const timeRange = Math.min(now - lastSuccessfulQuery, maxTimeRangeQueryMs); + const timeStart = new Date(now - timeRange); + const timeEnd = new Date(now); + return { timeStart, timeEnd }; +} diff --git a/packages/sdk/src/store/store.spec.ts b/packages/sdk/src/store/store.spec.ts index 4931aafa7c..8983ba2335 100644 --- a/packages/sdk/src/store/store.spec.ts +++ b/packages/sdk/src/store/store.spec.ts @@ -1,3 +1,4 @@ +import { type PeerId } from "@libp2p/interface"; import { StoreCore } from "@waku/core"; import type { IDecodedMessage, IDecoder, Libp2p } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; @@ -292,5 +293,36 @@ describe("Store", () => { expect(mockPeerManager.getPeers.called).to.be.true; }); + + it("should use peerId from options when provided to queryGenerator", async () => { + const customPeerId = { + toString: () => "QmCustomPeerId" + } as unknown as PeerId; + + const mockMessages = [Promise.resolve(mockMessage)]; + const mockResponseGenerator = (async function* () { + yield mockMessages; + })(); + + mockStoreCore.queryPerPage.returns(mockResponseGenerator); + + const generator = store.queryGenerator([mockDecoder], { + peerId: customPeerId + }); + + const results = []; + for await (const messages of generator) { + results.push(messages); + } + + expect(mockPeerManager.getPeers.called).to.be.false; + + expect(mockStoreCore.queryPerPage.called).to.be.true; + const callArgs = mockStoreCore.queryPerPage.getCall(0).args; + expect(callArgs[2]).to.equal(customPeerId); + + expect(results).to.have.length(1); + expect(results[0]).to.equal(mockMessages); + }); }); }); diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 1297060cf2..2165005899 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -65,8 +65,8 @@ export class Store implements IStore { ); for (const queryOption of queryOptions) { - const peer = await this.getPeerToUse(queryOption.pubsubTopic); - + const peer = + options?.peerId ?? (await this.getPeerToUse(queryOption.pubsubTopic)); if (!peer) { log.error("No peers available to query"); throw new Error("No peers available to query");