From 243b6629c3993c449bd8cdd139e7969133540a26 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 17 May 2021 16:11:01 +1000 Subject: [PATCH] Add callback option to store query --- CHANGELOG.md | 1 + src/lib/waku_store/history_rpc.ts | 23 +++++++++++++++++++++-- src/lib/waku_store/index.spec.ts | 3 +++ src/lib/waku_store/index.ts | 28 ++++++++++++++++++++-------- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e022ff32e6..fdd6b4b1fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Testing: Upgrade nim-waku node to v0.3. +- **Breaking**: Modify `WakuStore.queryHistory()` to accept one `Object` instead of multiple individual arguments. ## [0.3.0] - 2021-05-15 diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index a8b128c81d..27388faf90 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -3,10 +3,17 @@ import { v4 as uuid } from 'uuid'; import * as proto from '../../proto/waku/v2/store'; +export enum Direction { + BACKWARD = 'backward', + FORWARD = 'forward', +} + export interface Options { contentTopics: string[]; cursor?: proto.Index; pubsubTopic: string; + direction: Direction; + pageSize: number; } export class HistoryRPC { @@ -16,10 +23,11 @@ export class HistoryRPC { * Create History Query. */ static createQuery(options: Options): HistoryRPC { + const direction = directionToProto(options.direction); const pagingInfo = { - pageSize: 10, + pageSize: options.pageSize, cursor: options.cursor, - direction: proto.PagingInfo_Direction.DIRECTION_FORWARD, + direction, }; const contentFilters = options.contentTopics.map((contentTopic) => { @@ -56,3 +64,14 @@ export class HistoryRPC { return this.proto.response; } } + +function directionToProto(direction: Direction): proto.PagingInfo_Direction { + switch (direction) { + case Direction.BACKWARD: + return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED; + case Direction.FORWARD: + return proto.PagingInfo_Direction.DIRECTION_FORWARD; + default: + return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED; + } +} diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index bf26ce84ab..81eb4acbe6 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -5,6 +5,8 @@ import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { Waku } from '../waku'; import { DefaultContentTopic, WakuMessage } from '../waku_message'; +import { Direction } from './history_rpc'; + describe('Waku Store', () => { let waku: Waku; let nimWaku: NimWaku; @@ -79,6 +81,7 @@ describe('Waku Store', () => { const messages = await waku.store.queryHistory({ peerId: nimPeerId, contentTopics: [DefaultContentTopic], + direction: Direction.FORWARD, }); expect(messages?.length).eq(15); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 24b69ba3fc..abff009dbf 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -7,7 +7,7 @@ import PeerId from 'peer-id'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; -import { HistoryRPC } from './history_rpc'; +import { Direction, HistoryRPC } from './history_rpc'; export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; @@ -15,6 +15,9 @@ export interface Options { peerId: PeerId; contentTopics: string[]; pubsubTopic?: string; + direction?: Direction; + pageSize?: number; + callback?: (messages: WakuMessage[]) => void; } /** @@ -32,12 +35,15 @@ export class WakuStore { * retrieve all messages. * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes * use the same pubsub topic. This is reserved for future applications. + * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ async queryHistory(options: Options): Promise { const opts = Object.assign( { pubsubTopic: DefaultPubsubTopic, + direction: Direction.BACKWARD, + pageSize: 10, }, options ); @@ -55,11 +61,8 @@ export class WakuStore { try { const { stream } = await connection.newStream(StoreCodec); try { - const historyRpcQuery = HistoryRPC.createQuery({ - contentTopics: opts.contentTopics, - cursor, - pubsubTopic: opts.pubsubTopic, - }); + const queryOpts = Object.assign(opts, { cursor }); + const historyRpcQuery = HistoryRPC.createQuery(queryOpts); const res = await pipe( [historyRpcQuery.encode()], lp.encode(), @@ -82,8 +85,17 @@ export class WakuStore { return messages; } - response.messages.map((protoMsg) => { - messages.push(new WakuMessage(protoMsg)); + const pageMessages = response.messages.map((protoMsg) => { + return new WakuMessage(protoMsg); + }); + + if (opts.callback) { + // TODO: Test the callback feature + opts.callback(pageMessages); + } + + pageMessages.forEach((wakuMessage) => { + messages.push(wakuMessage); }); const responsePageSize = response.pagingInfo?.pageSize;