diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index bee9c84343..5985434328 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -7,10 +7,13 @@ import { DEFAULT_CONTENT_TOPIC } from '../waku_message'; export class HistoryRPC { public constructor(public proto: proto.HistoryRPC) {} - static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC { + static createQuery( + topics: string[] = [DEFAULT_CONTENT_TOPIC], + cursor?: proto.Index + ): HistoryRPC { const pagingInfo = { pageSize: 10, - cursor: undefined, + cursor, direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED, }; return new HistoryRPC({ @@ -29,6 +32,10 @@ export class HistoryRPC { return proto.HistoryRPC.encode(this.proto).finish(); } + get query(): proto.HistoryQuery | undefined { + return this.proto.query; + } + get response(): proto.HistoryResponse | undefined { return this.proto.response; } diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 2fe5d8c09b..5319f48226 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -66,7 +66,7 @@ describe('Waku Store', () => { it('Retrieves all history element through paging', async function () { this.timeout(5_000); - for (let i = 0; i < 20; i++) { + for (let i = 0; i < 15; i++) { await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); } @@ -79,7 +79,7 @@ describe('Waku Store', () => { const messages = await waku.store.queryHistory(nimPeerId); - expect(messages?.length).eq(20); + expect(messages?.length).eq(15); expect( messages?.findIndex((msg) => { return msg.utf8Payload() === 'Message 0'; @@ -87,7 +87,7 @@ describe('Waku Store', () => { ).to.not.eq(-1); expect( messages?.findIndex((msg) => { - return msg.utf8Payload() === 'Message 19'; + return msg.utf8Payload() === 'Message 14'; }) ).to.not.eq(-1); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index a2dae137f6..70bd4268e7 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -30,42 +30,69 @@ export class WakuStore { const connection = this.libp2p.connectionManager.get(peer.id); if (!connection) throw 'Failed to get a connection to the peer'; - try { - const { stream } = await connection.newStream(StoreCodec); - - const historyRpc = HistoryRPC.query(topics).encode(); + const messages: WakuMessage[] = []; + let cursor = undefined; + do { try { - const res = await pipe( - [historyRpc], - lp.encode(), - stream, - lp.decode(), - concat - ); - const buf = res.slice(); + const { stream } = await connection.newStream(StoreCodec); try { - const reply = HistoryRPC.decode(buf); + const historyRpcQuery = HistoryRPC.createQuery(topics, cursor); + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + concat + ); + try { + const reply = HistoryRPC.decode(res.slice()); - if (!reply.response) { - console.log('No response in HistoryRPC'); - return null; + const response = reply.response; + if (!response) { + console.log('No response in HistoryRPC'); + return null; + } + + if (!response.messages || !response.messages.length) { + // No messages left (or stored) + return messages; + } + + response.messages.map((protoMsg) => { + messages.push(WakuMessage.fromProto(protoMsg)); + }); + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + // Response page size smaller than query, meaning this is the last page + return messages; + } + + cursor = response.pagingInfo?.cursor; + if (cursor === undefined) { + // If the server does not return cursor then there is an issue, + // Need to abort or we end up in an infinite loop + console.log('No cursor returned by peer.'); + return messages; + } + } catch (err) { + console.log('Failed to decode store reply', err); } - - return reply.response.messages.map((protoMsg) => { - return WakuMessage.fromProto(protoMsg); - }); } catch (err) { - console.log('Failed to decode store reply', err); + console.log('Failed to send waku store query', err); } } catch (err) { - console.log('Failed to send waku store query', err); + console.log( + 'Failed to negotiate waku store protocol stream with peer', + err + ); } - } catch (err) { - console.log( - 'Failed to negotiate waku store protocol stream with peer', - err - ); - } - return null; + // eslint-disable-next-line no-constant-condition + } while (true); } }