diff --git a/.eslintrc.json b/.eslintrc.json index 67d7fd21d7..27db61f841 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -25,6 +25,7 @@ "error", { "newlines-between": "always", "alphabetize": { "order": "asc" } } ], + "no-constant-condition": ["error", { "checkLoops": false }], "sort-imports": [ "error", { "ignoreDeclarationSort": true, "ignoreCase": true } diff --git a/src/chat/index.ts b/src/chat/index.ts index d50dfa2f40..b2aef93765 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -72,11 +72,12 @@ const ChatContentTopic = 'dingpu'; console.log( `Retrieving archived messages from ${storePeerId.toB58String()}` ); - const msg = await waku.store.queryHistory(storePeerId, [ChatContentTopic]); - msg?.messages.map((msg) => { - const wakuMsg = WakuMessage.fromProto(msg); - if (wakuMsg.payload) { - const chatMsg = ChatMessage.decode(wakuMsg.payload); + const messages = await waku.store.queryHistory(storePeerId, [ + ChatContentTopic, + ]); + messages?.map((msg) => { + if (msg.payload) { + const chatMsg = ChatMessage.decode(msg.payload); printMessage(chatMsg); } }); diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index bee9c84343..ff4a8ede4f 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -7,11 +7,14 @@ import { DEFAULT_CONTENT_TOPIC } from '../waku_message'; export class HistoryRPC { public constructor(public proto: proto.HistoryRPC) {} - static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC { + static createQuery( + topics: string[] = [DEFAULT_CONTENT_TOPIC], + cursor?: proto.Index + ): HistoryRPC { const pagingInfo = { pageSize: 10, - cursor: undefined, - direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED, + cursor, + direction: proto.Direction.DIRECTION_FORWARD, }; return new HistoryRPC({ requestId: uuid(), @@ -29,6 +32,10 @@ export class HistoryRPC { return proto.HistoryRPC.encode(this.proto).finish(); } + get query(): proto.HistoryQuery | undefined { + return this.proto.query; + } + get response(): proto.HistoryResponse | undefined { return this.proto.response; } diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 686d199306..ccc21a89ac 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -33,19 +33,6 @@ describe('Waku Store', () => { await new Promise((resolve) => waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve) ); - - await waku0.relay.publish( - WakuMessage.fromUtf8String('A message from relay.') - ); - - await nimWaku.sendMessage( - WakuMessage.fromUtf8String('Another message from json rpc.') - ); - - waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); - await waku.dial(await nimWaku.getMultiaddrWithId()); - - await delay(500); }); afterEach(async function () { @@ -54,19 +41,51 @@ describe('Waku Store', () => { }); it('Retrieves history', async function () { + this.timeout(5_000); + + for (let i = 0; i < 2; i++) { + await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); + } + + waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + await delay(500); + const nimPeerId = await nimWaku.getPeerId(); - const response = await waku.store.queryHistory(nimPeerId); - const messages = response?.messages; + const messages = await waku.store.queryHistory(nimPeerId); expect(messages?.length).eq(2); - const result = messages - ?.map((protoMsg) => { - return WakuMessage.fromProto(protoMsg); - }) - .findIndex((msg) => { - return msg.utf8Payload() === 'A message from relay.'; - }); + const result = messages?.findIndex((msg) => { + return msg.utf8Payload() === 'Message 0'; + }); expect(result).to.not.eq(-1); }); + + it('Retrieves all historical elements in chronological order through paging', async function () { + this.timeout(5_000); + + for (let i = 0; i < 15; i++) { + await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); + } + + waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + await delay(500); + + const nimPeerId = await nimWaku.getPeerId(); + + const messages = await waku.store.queryHistory(nimPeerId); + + expect(messages?.length).eq(15); + for (let index = 0; index < 2; index++) { + expect( + messages?.findIndex((msg) => { + return msg.utf8Payload() === `Message ${index}`; + }) + ).to.eq(index); + } + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index ce4341a758..b18cf6b387 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -4,6 +4,8 @@ import pipe from 'it-pipe'; import Libp2p from 'libp2p'; import PeerId from 'peer-id'; +import { WakuMessage } from '../waku_message'; + import { HistoryRPC } from './history_rpc'; export const StoreCodec = '/vac/waku/store/2.0.0-beta1'; @@ -17,7 +19,10 @@ export class WakuStore { * @param topics * @throws if not able to reach peer */ - async queryHistory(peerId: PeerId, topics?: string[]) { + async queryHistory( + peerId: PeerId, + topics?: string[] + ): Promise { const peer = this.libp2p.peerStore.get(peerId); if (!peer) throw 'Peer is unknown'; if (!peer.protocols.includes(StoreCodec)) @@ -25,34 +30,68 @@ export class WakuStore { const connection = this.libp2p.connectionManager.get(peer.id); if (!connection) throw 'Failed to get a connection to the peer'; - try { - const { stream } = await connection.newStream(StoreCodec); - - const historyRpc = HistoryRPC.query(topics).encode(); + const messages: WakuMessage[] = []; + let cursor = undefined; + while (true) { try { - const res = await pipe( - [historyRpc], - lp.encode(), - stream, - lp.decode(), - concat - ); - const buf = res.slice(); + const { stream } = await connection.newStream(StoreCodec); try { - const reply = HistoryRPC.decode(buf); - return reply.response; + const historyRpcQuery = HistoryRPC.createQuery(topics, cursor); + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + concat + ); + try { + const reply = HistoryRPC.decode(res.slice()); + + const response = reply.response; + if (!response) { + console.log('No response in HistoryRPC'); + return null; + } + + if (!response.messages || !response.messages.length) { + // No messages left (or stored) + return messages; + } + + response.messages.map((protoMsg) => { + messages.push(WakuMessage.fromProto(protoMsg)); + }); + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + // Response page size smaller than query, meaning this is the last page + return messages; + } + + cursor = response.pagingInfo?.cursor; + if (cursor === undefined) { + // If the server does not return cursor then there is an issue, + // Need to abort or we end up in an infinite loop + console.log('No cursor returned by peer.'); + return messages; + } + } catch (err) { + console.log('Failed to decode store reply', err); + } } catch (err) { - console.log('Failed to decode store reply', err); + console.log('Failed to send waku store query', err); } } catch (err) { - console.log('Failed to send waku store query', err); + console.log( + 'Failed to negotiate waku store protocol stream with peer', + err + ); } - } catch (err) { - console.log( - 'Failed to negotiate waku store protocol stream with peer', - err - ); } - return null; } }