diff --git a/CHANGELOG.md b/CHANGELOG.md index 49e1e6a5a1..e022ff32e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `callback` argument to `WakuStore.queryHistory()`, called as messages are retrieved + ; Messages are retrieved using pagination, and it may take some time to retrieve all messages, + with the `callback` function, messages are processed as soon as they are received. + ### Changed - Testing: Upgrade nim-waku node to v0.3. diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index 4bc9a53585..5fbba8ec18 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -76,9 +76,10 @@ export default async function startChat(): Promise { console.log( `Retrieving archived messages from ${peerId.toB58String()}` ); - const messages = await waku.store.queryHistory(peerId, [ - ChatContentTopic, - ]); + const messages = await waku.store.queryHistory({ + peerId, + contentTopics: [ChatContentTopic], + }); messages?.map((msg) => { if (msg.payload) { const chatMsg = ChatMessage.decode(msg.payload); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index eac45a4af3..63c41f5b68 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -68,9 +68,10 @@ export default function App() { if (protocols.includes(StoreCodec)) { console.log(`${peerId.toB58String()}: retrieving archived messages}`); try { - const response = await waku.store.queryHistory(peerId, [ - ChatContentTopic, - ]); + const response = await waku.store.queryHistory({ + peerId, + contentTopics: [ChatContentTopic], + }); console.log(`${peerId.toB58String()}: messages retrieved:`, response); if (response) { const messages = response diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index 41d057ce8b..a8b128c81d 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -2,31 +2,34 @@ import { Reader } from 'protobufjs/minimal'; import { v4 as uuid } from 'uuid'; import * as proto from '../../proto/waku/v2/store'; -import { DefaultContentTopic } from '../waku_message'; -import { DefaultPubsubTopic } from '../waku_relay'; + +export interface Options { + contentTopics: string[]; + cursor?: proto.Index; + pubsubTopic: string; +} export class HistoryRPC { public constructor(public proto: proto.HistoryRPC) {} - static createQuery( - contentTopics: string[] = [DefaultContentTopic], - cursor?: proto.Index, - pubsubTopic: string = DefaultPubsubTopic - ): HistoryRPC { + /** + * Create History Query. + */ + static createQuery(options: Options): HistoryRPC { const pagingInfo = { pageSize: 10, - cursor, + cursor: options.cursor, direction: proto.PagingInfo_Direction.DIRECTION_FORWARD, }; - const contentFilters = contentTopics.map((contentTopic) => { + const contentFilters = options.contentTopics.map((contentTopic) => { return { contentTopic }; }); return new HistoryRPC({ requestId: uuid(), query: { - pubsubTopic, + pubsubTopic: options.pubsubTopic, contentFilters, pagingInfo, startTime: undefined, diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 2f2d3f17f2..bf26ce84ab 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -3,7 +3,7 @@ import TCP from 'libp2p-tcp'; import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { Waku } from '../waku'; -import { WakuMessage } from '../waku_message'; +import { DefaultContentTopic, WakuMessage } from '../waku_message'; describe('Waku Store', () => { let waku: Waku; @@ -39,7 +39,10 @@ describe('Waku Store', () => { const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory(nimPeerId); + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [], + }); expect(messages?.length).eq(2); const result = messages?.findIndex((msg) => { @@ -73,7 +76,10 @@ describe('Waku Store', () => { const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory(nimPeerId); + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [DefaultContentTopic], + }); expect(messages?.length).eq(15); for (let index = 0; index < 2; index++) { diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 329265b04e..24b69ba3fc 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -5,11 +5,18 @@ import Libp2p from 'libp2p'; import PeerId from 'peer-id'; import { WakuMessage } from '../waku_message'; +import { DefaultPubsubTopic } from '../waku_relay'; import { HistoryRPC } from './history_rpc'; export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; +export interface Options { + peerId: PeerId; + contentTopics: string[]; + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). */ @@ -19,19 +26,23 @@ export class WakuStore { /** * Query given peer using Waku Store. * - * @param peerId The peer to query. - * @param contentTopics The content topics to retrieve, leave empty to + * @param options + * @param options.peerId The peer to query. + * @param options.contentTopics The content topics to retrieve, leave empty to * retrieve all messages. - * @param pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes + * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes * use the same pubsub topic. This is reserved for future applications. * @throws If not able to reach the peer to query. */ - async queryHistory( - peerId: PeerId, - contentTopics?: string[], - pubsubTopic?: string - ): Promise { - const peer = this.libp2p.peerStore.get(peerId); + async queryHistory(options: Options): Promise { + const opts = Object.assign( + { + pubsubTopic: DefaultPubsubTopic, + }, + options + ); + + const peer = this.libp2p.peerStore.get(opts.peerId); if (!peer) throw 'Peer is unknown'; if (!peer.protocols.includes(StoreCodec)) throw 'Peer does not register waku store protocol'; @@ -44,11 +55,11 @@ export class WakuStore { try { const { stream } = await connection.newStream(StoreCodec); try { - const historyRpcQuery = HistoryRPC.createQuery( - contentTopics, + const historyRpcQuery = HistoryRPC.createQuery({ + contentTopics: opts.contentTopics, cursor, - pubsubTopic - ); + pubsubTopic: opts.pubsubTopic, + }); const res = await pipe( [historyRpcQuery.encode()], lp.encode(),