Added support for `startTime` and `endTime` in Store queries

This commit is contained in:
Franck Royer 2021-08-19 15:49:43 +10:00
parent c2109736d4
commit b7693853d2
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
4 changed files with 90 additions and 3 deletions

View File

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- New `bootstrap` option for `Waku.create` to easily connect to Waku nodes upon start up. - New `bootstrap` option for `Waku.create` to easily connect to Waku nodes upon start up.
- Support for `startTime` and `endTime` in Store queries to filter by time window as per [21/WAKU2-FTSTORE](https://rfc.vac.dev/spec/21/).
### Changed ### Changed
- Renamed `discover.getStatusFleetNodes` to `discovery.getBootstrapNodes`; - Renamed `discover.getStatusFleetNodes` to `discovery.getBootstrapNodes`;

View File

@ -10,10 +10,12 @@ export enum Direction {
export interface Params { export interface Params {
contentTopics: string[]; contentTopics: string[];
cursor?: proto.Index;
pubSubTopic: string; pubSubTopic: string;
direction: Direction; direction: Direction;
pageSize: number; pageSize: number;
startTime?: number;
endTime?: number;
cursor?: proto.Index;
} }
export class HistoryRPC { export class HistoryRPC {
@ -40,8 +42,8 @@ export class HistoryRPC {
pubSubTopic: params.pubSubTopic, pubSubTopic: params.pubSubTopic,
contentFilters, contentFilters,
pagingInfo, pagingInfo,
startTime: undefined, startTime: params.startTime,
endTime: undefined, endTime: params.endTime,
}, },
response: undefined, response: undefined,
}); });

View File

@ -245,4 +245,72 @@ describe('Waku Store', () => {
await Promise.all([waku1.stop(), waku2.stop()]); await Promise.all([waku1.stop(), waku2.stop()]);
}); });
it('Retrieves history using start and end time', async function () {
this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ persistMessages: true });
const startTime = new Date();
const message1Timestamp = new Date();
message1Timestamp.setTime(startTime.getTime() + 60 * 1000);
const message2Timestamp = new Date();
message2Timestamp.setTime(startTime.getTime() + 2 * 60 * 1000);
const messageTimestamps = [message1Timestamp, message2Timestamp];
const endTime = new Date();
endTime.setTime(startTime.getTime() + 3 * 60 * 1000);
let firstMessageTime;
for (let i = 0; i < 2; i++) {
expect(
await nimWaku.sendMessage(
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic, {
timestamp: messageTimestamps[i],
})
)
).to.be.true;
if (!firstMessageTime) firstMessageTime = Date.now() / 1000;
}
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
libp2p: { modules: { transport: [TCP] } },
});
await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
// TODO: This scenario can be tested once https://github.com/status-im/nim-waku/issues/706 is done
// const noMessage = await waku.store.queryHistory([], {
// peerId: nimPeerId,
// endTime: startTime,
// });
const firstMessage = await waku.store.queryHistory([], {
peerId: nimPeerId,
startTime,
endTime: message1Timestamp,
});
const bothMessages = await waku.store.queryHistory([], {
peerId: nimPeerId,
startTime,
endTime,
});
// expect(noMessage?.length).eq(0);
expect(firstMessage?.length).eq(1);
expect(firstMessage[0]?.payloadAsUtf8).eq('Message 0');
expect(bothMessages?.length).eq(2);
});
}); });

View File

@ -36,6 +36,8 @@ export interface QueryOptions {
pubSubTopic?: string; pubSubTopic?: string;
direction?: Direction; direction?: Direction;
pageSize?: number; pageSize?: number;
startTime?: Date;
endTime?: Date;
callback?: (messages: WakuMessage[]) => void; callback?: (messages: WakuMessage[]) => void;
decryptionKeys?: Uint8Array[]; decryptionKeys?: Uint8Array[];
} }
@ -61,6 +63,8 @@ export class WakuStore {
* retrieve all messages. * retrieve all messages.
* @param options * @param options
* @param options.peerId The peer to query.Options * @param options.peerId The peer to query.Options
* @param options.startTime Query messages with a timestamp greater than this value.
* @param options.endTime Query messages with a timestamp lesser than this value.
* @param options.pubSubTopic The pubsub topic to pass to the query. Defaults * @param options.pubSubTopic The pubsub topic to pass to the query. Defaults
* to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/). * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/).
* @param options.callback Callback called on page of stored messages as they are retrieved * @param options.callback Callback called on page of stored messages as they are retrieved
@ -73,6 +77,14 @@ export class WakuStore {
contentTopics: string[], contentTopics: string[],
options?: QueryOptions options?: QueryOptions
): Promise<WakuMessage[]> { ): Promise<WakuMessage[]> {
let startTime, endTime;
if (options?.startTime) {
startTime = options.startTime.getTime() / 1000;
}
if (options?.endTime) {
endTime = options.endTime.getTime() / 1000;
}
const opts = Object.assign( const opts = Object.assign(
{ {
pubSubTopic: this.pubSubTopic, pubSubTopic: this.pubSubTopic,
@ -80,6 +92,10 @@ export class WakuStore {
pageSize: 10, pageSize: 10,
}, },
options, options,
{
startTime,
endTime,
},
{ contentTopics } { contentTopics }
); );
dbg('Querying history with the following options', options); dbg('Querying history with the following options', options);