diff --git a/CHANGELOG.md b/CHANGELOG.md index 49e1e6a5a1..fdd6b4b1fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- `callback` argument to `WakuStore.queryHistory()`, called as messages are retrieved + ; Messages are retrieved using pagination, and it may take some time to retrieve all messages, + with the `callback` function, messages are processed as soon as they are received. + ### Changed - Testing: Upgrade nim-waku node to v0.3. +- **Breaking**: Modify `WakuStore.queryHistory()` to accept one `Object` instead of multiple individual arguments. ## [0.3.0] - 2021-05-15 diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index 4bc9a53585..5fbba8ec18 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -76,9 +76,10 @@ export default async function startChat(): Promise { console.log( `Retrieving archived messages from ${peerId.toB58String()}` ); - const messages = await waku.store.queryHistory(peerId, [ - ChatContentTopic, - ]); + const messages = await waku.store.queryHistory({ + peerId, + contentTopics: [ChatContentTopic], + }); messages?.map((msg) => { if (msg.payload) { const chatMsg = ChatMessage.decode(msg.payload); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index eac45a4af3..ff697af13f 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -45,6 +45,29 @@ const themes = { export const ChatContentTopic = 'dingpu'; +async function retrieveStoreMessages( + waku: Waku, + peerId: PeerId, + setArchivedMessages: (value: ChatMessage[]) => void +): Promise { + const callback = (wakuMessages: WakuMessage[]): void => { + const messages = wakuMessages + .map((wakuMsg) => wakuMsg.payload) + .filter((payload) => !!payload) + .map((payload) => ChatMessage.decode(payload as Uint8Array)); + setArchivedMessages(messages); + }; + + const res = await waku.store.queryHistory({ + peerId, + contentTopics: [ChatContentTopic], + pageSize: 5, + callback, + }); + + return res ? res.length : 0; +} + export default function App() { let [newMessages, setNewMessages] = useState([]); let [archivedMessages, setArchivedMessages] = useState([]); @@ -61,6 +84,7 @@ export default function App() { } }; + // TODO: Split this const handleProtocolChange = async ( waku: Waku, { peerId, protocols }: { peerId: PeerId; protocols: string[] } @@ -68,17 +92,12 @@ export default function App() { if (protocols.includes(StoreCodec)) { console.log(`${peerId.toB58String()}: retrieving archived messages}`); try { - const response = await waku.store.queryHistory(peerId, [ - ChatContentTopic, - ]); - console.log(`${peerId.toB58String()}: messages retrieved:`, response); - if (response) { - const messages = response - .map((wakuMsg) => wakuMsg.payload) - .filter((payload) => !!payload) - .map((payload) => ChatMessage.decode(payload as Uint8Array)); - setArchivedMessages(messages); - } + const length = await retrieveStoreMessages( + waku, + peerId, + setArchivedMessages + ); + console.log(`${peerId.toB58String()}: messages retrieved:`, length); } catch (e) { console.log( `${peerId.toB58String()}: error encountered when retrieving archived messages`, diff --git a/examples/web-chat/src/ChatList.tsx b/examples/web-chat/src/ChatList.tsx index 9957d9df55..00273671e7 100644 --- a/examples/web-chat/src/ChatList.tsx +++ b/examples/web-chat/src/ChatList.tsx @@ -14,10 +14,11 @@ interface Props { export default function ChatList(props: Props) { const [messages, setMessages] = useState([]); + const [groupedMessages, setGroupedMessages] = useState([]); let updatedMessages; if (IsThereNewMessages(props.newMessages, messages)) { - updatedMessages = messages.slice().concat(props.newMessages); + updatedMessages = messages.concat(props.newMessages); if (IsThereNewMessages(props.archivedMessages, updatedMessages)) { updatedMessages = copyMergeUniqueReplace( props.archivedMessages, @@ -34,32 +35,31 @@ export default function ChatList(props: Props) { } if (updatedMessages) { + setGroupedMessages(groupMessagesBySender(updatedMessages)); setMessages(updatedMessages); } - const messagesGroupedBySender = groupMessagesBySender(messages).map( - (currentMessageGroup) => ( - - {currentMessageGroup.map((currentMessage) => ( - - {currentMessage.payloadAsUtf8} - - ))} - - ) - ); + const renderedGroupedMessages = groupedMessages.map((currentMessageGroup) => ( + + {currentMessageGroup.map((currentMessage) => ( + + {currentMessage.payloadAsUtf8} + + ))} + + )); return ( - {messagesGroupedBySender} + {renderedGroupedMessages} ); diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index 41d057ce8b..27388faf90 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -2,31 +2,42 @@ import { Reader } from 'protobufjs/minimal'; import { v4 as uuid } from 'uuid'; import * as proto from '../../proto/waku/v2/store'; -import { DefaultContentTopic } from '../waku_message'; -import { DefaultPubsubTopic } from '../waku_relay'; + +export enum Direction { + BACKWARD = 'backward', + FORWARD = 'forward', +} + +export interface Options { + contentTopics: string[]; + cursor?: proto.Index; + pubsubTopic: string; + direction: Direction; + pageSize: number; +} export class HistoryRPC { public constructor(public proto: proto.HistoryRPC) {} - static createQuery( - contentTopics: string[] = [DefaultContentTopic], - cursor?: proto.Index, - pubsubTopic: string = DefaultPubsubTopic - ): HistoryRPC { + /** + * Create History Query. + */ + static createQuery(options: Options): HistoryRPC { + const direction = directionToProto(options.direction); const pagingInfo = { - pageSize: 10, - cursor, - direction: proto.PagingInfo_Direction.DIRECTION_FORWARD, + pageSize: options.pageSize, + cursor: options.cursor, + direction, }; - const contentFilters = contentTopics.map((contentTopic) => { + const contentFilters = options.contentTopics.map((contentTopic) => { return { contentTopic }; }); return new HistoryRPC({ requestId: uuid(), query: { - pubsubTopic, + pubsubTopic: options.pubsubTopic, contentFilters, pagingInfo, startTime: undefined, @@ -53,3 +64,14 @@ export class HistoryRPC { return this.proto.response; } } + +function directionToProto(direction: Direction): proto.PagingInfo_Direction { + switch (direction) { + case Direction.BACKWARD: + return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED; + case Direction.FORWARD: + return proto.PagingInfo_Direction.DIRECTION_FORWARD; + default: + return proto.PagingInfo_Direction.DIRECTION_BACKWARD_UNSPECIFIED; + } +} diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 2f2d3f17f2..81eb4acbe6 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -3,7 +3,9 @@ import TCP from 'libp2p-tcp'; import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; import { Waku } from '../waku'; -import { WakuMessage } from '../waku_message'; +import { DefaultContentTopic, WakuMessage } from '../waku_message'; + +import { Direction } from './history_rpc'; describe('Waku Store', () => { let waku: Waku; @@ -39,7 +41,10 @@ describe('Waku Store', () => { const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory(nimPeerId); + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [], + }); expect(messages?.length).eq(2); const result = messages?.findIndex((msg) => { @@ -73,7 +78,11 @@ describe('Waku Store', () => { const nimPeerId = await nimWaku.getPeerId(); - const messages = await waku.store.queryHistory(nimPeerId); + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [DefaultContentTopic], + direction: Direction.FORWARD, + }); expect(messages?.length).eq(15); for (let index = 0; index < 2; index++) { diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 329265b04e..abff009dbf 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -5,11 +5,21 @@ import Libp2p from 'libp2p'; import PeerId from 'peer-id'; import { WakuMessage } from '../waku_message'; +import { DefaultPubsubTopic } from '../waku_relay'; -import { HistoryRPC } from './history_rpc'; +import { Direction, HistoryRPC } from './history_rpc'; export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; +export interface Options { + peerId: PeerId; + contentTopics: string[]; + pubsubTopic?: string; + direction?: Direction; + pageSize?: number; + callback?: (messages: WakuMessage[]) => void; +} + /** * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). */ @@ -19,19 +29,26 @@ export class WakuStore { /** * Query given peer using Waku Store. * - * @param peerId The peer to query. - * @param contentTopics The content topics to retrieve, leave empty to + * @param options + * @param options.peerId The peer to query. + * @param options.contentTopics The content topics to retrieve, leave empty to * retrieve all messages. - * @param pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes + * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes * use the same pubsub topic. This is reserved for future applications. + * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ - async queryHistory( - peerId: PeerId, - contentTopics?: string[], - pubsubTopic?: string - ): Promise { - const peer = this.libp2p.peerStore.get(peerId); + async queryHistory(options: Options): Promise { + const opts = Object.assign( + { + pubsubTopic: DefaultPubsubTopic, + direction: Direction.BACKWARD, + pageSize: 10, + }, + options + ); + + const peer = this.libp2p.peerStore.get(opts.peerId); if (!peer) throw 'Peer is unknown'; if (!peer.protocols.includes(StoreCodec)) throw 'Peer does not register waku store protocol'; @@ -44,11 +61,8 @@ export class WakuStore { try { const { stream } = await connection.newStream(StoreCodec); try { - const historyRpcQuery = HistoryRPC.createQuery( - contentTopics, - cursor, - pubsubTopic - ); + const queryOpts = Object.assign(opts, { cursor }); + const historyRpcQuery = HistoryRPC.createQuery(queryOpts); const res = await pipe( [historyRpcQuery.encode()], lp.encode(), @@ -71,8 +85,17 @@ export class WakuStore { return messages; } - response.messages.map((protoMsg) => { - messages.push(new WakuMessage(protoMsg)); + const pageMessages = response.messages.map((protoMsg) => { + return new WakuMessage(protoMsg); + }); + + if (opts.callback) { + // TODO: Test the callback feature + opts.callback(pageMessages); + } + + pageMessages.forEach((wakuMessage) => { + messages.push(wakuMessage); }); const responsePageSize = response.pagingInfo?.pageSize;