From b7693853d24586d808cd69c45905478ef0f72991 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Thu, 19 Aug 2021 15:49:43 +1000 Subject: [PATCH] Added support for `startTime` and `endTime` in Store queries --- CHANGELOG.md | 1 + src/lib/waku_store/history_rpc.ts | 8 ++-- src/lib/waku_store/index.spec.ts | 68 +++++++++++++++++++++++++++++++ src/lib/waku_store/index.ts | 16 ++++++++ 4 files changed, 90 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b9b657e23..5b461677dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - New `bootstrap` option for `Waku.create` to easily connect to Waku nodes upon start up. +- Support for `startTime` and `endTime` in Store queries to filter by time window as per [21/WAKU2-FTSTORE](https://rfc.vac.dev/spec/21/). ### Changed - Renamed `discover.getStatusFleetNodes` to `discovery.getBootstrapNodes`; diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index dd2ad185ae..cdf328fb57 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -10,10 +10,12 @@ export enum Direction { export interface Params { contentTopics: string[]; - cursor?: proto.Index; pubSubTopic: string; direction: Direction; pageSize: number; + startTime?: number; + endTime?: number; + cursor?: proto.Index; } export class HistoryRPC { @@ -40,8 +42,8 @@ export class HistoryRPC { pubSubTopic: params.pubSubTopic, contentFilters, pagingInfo, - startTime: undefined, - endTime: undefined, + startTime: params.startTime, + endTime: params.endTime, }, response: undefined, }); diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index a41bb84cc2..5fb7afd320 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -245,4 +245,72 @@ describe('Waku Store', () => { await Promise.all([waku1.stop(), waku2.stop()]); }); + + it('Retrieves history using start and end time', async function () { + this.timeout(5_000); + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true }); + + const startTime = new Date(); + + const message1Timestamp = new Date(); + message1Timestamp.setTime(startTime.getTime() + 60 * 1000); + const message2Timestamp = new Date(); + message2Timestamp.setTime(startTime.getTime() + 2 * 60 * 1000); + const messageTimestamps = [message1Timestamp, message2Timestamp]; + + const endTime = new Date(); + endTime.setTime(startTime.getTime() + 3 * 60 * 1000); + + let firstMessageTime; + for (let i = 0; i < 2; i++) { + expect( + await nimWaku.sendMessage( + await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic, { + timestamp: messageTimestamps[i], + }) + ) + ).to.be.true; + if (!firstMessageTime) firstMessageTime = Date.now() / 1000; + } + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + // TODO: This scenario can be tested once https://github.com/status-im/nim-waku/issues/706 is done + // const noMessage = await waku.store.queryHistory([], { + // peerId: nimPeerId, + // endTime: startTime, + // }); + + const firstMessage = await waku.store.queryHistory([], { + peerId: nimPeerId, + startTime, + endTime: message1Timestamp, + }); + + const bothMessages = await waku.store.queryHistory([], { + peerId: nimPeerId, + startTime, + endTime, + }); + + // expect(noMessage?.length).eq(0); + expect(firstMessage?.length).eq(1); + + expect(firstMessage[0]?.payloadAsUtf8).eq('Message 0'); + + expect(bothMessages?.length).eq(2); + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 2ecbcecc5d..84f94161b3 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -36,6 +36,8 @@ export interface QueryOptions { pubSubTopic?: string; direction?: Direction; pageSize?: number; + startTime?: Date; + endTime?: Date; callback?: (messages: WakuMessage[]) => void; decryptionKeys?: Uint8Array[]; } @@ -61,6 +63,8 @@ export class WakuStore { * retrieve all messages. * @param options * @param options.peerId The peer to query.Options + * @param options.startTime Query messages with a timestamp greater than this value. + * @param options.endTime Query messages with a timestamp lesser than this value. * @param options.pubSubTopic The pubsub topic to pass to the query. Defaults * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/). * @param options.callback Callback called on page of stored messages as they are retrieved @@ -73,6 +77,14 @@ export class WakuStore { contentTopics: string[], options?: QueryOptions ): Promise { + let startTime, endTime; + if (options?.startTime) { + startTime = options.startTime.getTime() / 1000; + } + if (options?.endTime) { + endTime = options.endTime.getTime() / 1000; + } + const opts = Object.assign( { pubSubTopic: this.pubSubTopic, @@ -80,6 +92,10 @@ export class WakuStore { pageSize: 10, }, options, + { + startTime, + endTime, + }, { contentTopics } ); dbg('Querying history with the following options', options);