Add callback option to store query

This commit is contained in:
Franck Royer 2021-05-17 16:11:01 +10:00
parent 4b6fe84392
commit 243b6629c3
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
4 changed files with 45 additions and 10 deletions

View File

@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Testing: Upgrade nim-waku node to v0.3. - Testing: Upgrade nim-waku node to v0.3.
- **Breaking**: Modify `WakuStore.queryHistory()` to accept one `Object` instead of multiple individual arguments.
## [0.3.0] - 2021-05-15 ## [0.3.0] - 2021-05-15

View File

@ -3,10 +3,17 @@ import { v4 as uuid } from 'uuid';
import * as proto from '../../proto/waku/v2/store'; import * as proto from '../../proto/waku/v2/store';
export enum Direction {
BACKWARD = 'backward',
FORWARD = 'forward',
}
export interface Options { export interface Options {
contentTopics: string[]; contentTopics: string[];
cursor?: proto.Index; cursor?: proto.Index;
pubsubTopic: string; pubsubTopic: string;
direction: Direction;
pageSize: number;
} }
export class HistoryRPC { export class HistoryRPC {
@ -16,10 +23,11 @@ export class HistoryRPC {
* Create History Query. * Create History Query.
*/ */
static createQuery(options: Options): HistoryRPC { static createQuery(options: Options): HistoryRPC {
const direction = directionToProto(options.direction);
const pagingInfo = { const pagingInfo = {
pageSize: 10, pageSize: options.pageSize,
cursor: options.cursor, cursor: options.cursor,
direction: proto.PagingInfo_Direction.DIRECTION_FORWARD, direction,
}; };
const contentFilters = options.contentTopics.map((contentTopic) => { const contentFilters = options.contentTopics.map((contentTopic) => {
@ -56,3 +64,14 @@ export class HistoryRPC {
return this.proto.response; return this.proto.response;
} }
} }
function directionToProto(direction: Direction): proto.PagingInfo_Direction {
switch (direction) {
case Direction.BACKWARD:
return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED;
case Direction.FORWARD:
return proto.PagingInfo_Direction.DIRECTION_FORWARD;
default:
return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED;
}
}

View File

@ -5,6 +5,8 @@ import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
import { Waku } from '../waku'; import { Waku } from '../waku';
import { DefaultContentTopic, WakuMessage } from '../waku_message'; import { DefaultContentTopic, WakuMessage } from '../waku_message';
import { Direction } from './history_rpc';
describe('Waku Store', () => { describe('Waku Store', () => {
let waku: Waku; let waku: Waku;
let nimWaku: NimWaku; let nimWaku: NimWaku;
@ -79,6 +81,7 @@ describe('Waku Store', () => {
const messages = await waku.store.queryHistory({ const messages = await waku.store.queryHistory({
peerId: nimPeerId, peerId: nimPeerId,
contentTopics: [DefaultContentTopic], contentTopics: [DefaultContentTopic],
direction: Direction.FORWARD,
}); });
expect(messages?.length).eq(15); expect(messages?.length).eq(15);

View File

@ -7,7 +7,7 @@ import PeerId from 'peer-id';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay'; import { DefaultPubsubTopic } from '../waku_relay';
import { HistoryRPC } from './history_rpc'; import { Direction, HistoryRPC } from './history_rpc';
export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
@ -15,6 +15,9 @@ export interface Options {
peerId: PeerId; peerId: PeerId;
contentTopics: string[]; contentTopics: string[];
pubsubTopic?: string; pubsubTopic?: string;
direction?: Direction;
pageSize?: number;
callback?: (messages: WakuMessage[]) => void;
} }
/** /**
@ -32,12 +35,15 @@ export class WakuStore {
* retrieve all messages. * retrieve all messages.
* @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes
* use the same pubsub topic. This is reserved for future applications. * use the same pubsub topic. This is reserved for future applications.
* @param options.callback Callback called on page of stored messages as they are retrieved
* @throws If not able to reach the peer to query. * @throws If not able to reach the peer to query.
*/ */
async queryHistory(options: Options): Promise<WakuMessage[] | null> { async queryHistory(options: Options): Promise<WakuMessage[] | null> {
const opts = Object.assign( const opts = Object.assign(
{ {
pubsubTopic: DefaultPubsubTopic, pubsubTopic: DefaultPubsubTopic,
direction: Direction.BACKWARD,
pageSize: 10,
}, },
options options
); );
@ -55,11 +61,8 @@ export class WakuStore {
try { try {
const { stream } = await connection.newStream(StoreCodec); const { stream } = await connection.newStream(StoreCodec);
try { try {
const historyRpcQuery = HistoryRPC.createQuery({ const queryOpts = Object.assign(opts, { cursor });
contentTopics: opts.contentTopics, const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
cursor,
pubsubTopic: opts.pubsubTopic,
});
const res = await pipe( const res = await pipe(
[historyRpcQuery.encode()], [historyRpcQuery.encode()],
lp.encode(), lp.encode(),
@ -82,8 +85,17 @@ export class WakuStore {
return messages; return messages;
} }
response.messages.map((protoMsg) => { const pageMessages = response.messages.map((protoMsg) => {
messages.push(new WakuMessage(protoMsg)); return new WakuMessage(protoMsg);
});
if (opts.callback) {
// TODO: Test the callback feature
opts.callback(pageMessages);
}
pageMessages.forEach((wakuMessage) => {
messages.push(wakuMessage);
}); });
const responsePageSize = response.pagingInfo?.pageSize; const responsePageSize = response.pagingInfo?.pageSize;