Use pagination to retrieve all results from store

This commit is contained in:
Franck Royer 2021-04-13 14:48:05 +10:00
parent 1e10eeb5f5
commit 433f0432b3
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 68 additions and 34 deletions

View File

@ -7,10 +7,13 @@ import { DEFAULT_CONTENT_TOPIC } from '../waku_message';
export class HistoryRPC {
public constructor(public proto: proto.HistoryRPC) {}
static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC {
static createQuery(
topics: string[] = [DEFAULT_CONTENT_TOPIC],
cursor?: proto.Index
): HistoryRPC {
const pagingInfo = {
pageSize: 10,
cursor: undefined,
cursor,
direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED,
};
return new HistoryRPC({
@ -29,6 +32,10 @@ export class HistoryRPC {
return proto.HistoryRPC.encode(this.proto).finish();
}
get query(): proto.HistoryQuery | undefined {
return this.proto.query;
}
get response(): proto.HistoryResponse | undefined {
return this.proto.response;
}

View File

@ -66,7 +66,7 @@ describe('Waku Store', () => {
it('Retrieves all history element through paging', async function () {
this.timeout(5_000);
for (let i = 0; i < 20; i++) {
for (let i = 0; i < 15; i++) {
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
}
@ -79,7 +79,7 @@ describe('Waku Store', () => {
const messages = await waku.store.queryHistory(nimPeerId);
expect(messages?.length).eq(20);
expect(messages?.length).eq(15);
expect(
messages?.findIndex((msg) => {
return msg.utf8Payload() === 'Message 0';
@ -87,7 +87,7 @@ describe('Waku Store', () => {
).to.not.eq(-1);
expect(
messages?.findIndex((msg) => {
return msg.utf8Payload() === 'Message 19';
return msg.utf8Payload() === 'Message 14';
})
).to.not.eq(-1);
});

View File

@ -30,30 +30,56 @@ export class WakuStore {
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) throw 'Failed to get a connection to the peer';
const messages: WakuMessage[] = [];
let cursor = undefined;
do {
try {
const { stream } = await connection.newStream(StoreCodec);
const historyRpc = HistoryRPC.query(topics).encode();
try {
const historyRpcQuery = HistoryRPC.createQuery(topics, cursor);
const res = await pipe(
[historyRpc],
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
concat
);
const buf = res.slice();
try {
const reply = HistoryRPC.decode(buf);
const reply = HistoryRPC.decode(res.slice());
if (!reply.response) {
const response = reply.response;
if (!response) {
console.log('No response in HistoryRPC');
return null;
}
return reply.response.messages.map((protoMsg) => {
return WakuMessage.fromProto(protoMsg);
if (!response.messages || !response.messages.length) {
// No messages left (or stored)
return messages;
}
response.messages.map((protoMsg) => {
messages.push(WakuMessage.fromProto(protoMsg));
});
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
// Response page size smaller than query, meaning this is the last page
return messages;
}
cursor = response.pagingInfo?.cursor;
if (cursor === undefined) {
// If the server does not return cursor then there is an issue,
// Need to abort or we end up in an infinite loop
console.log('No cursor returned by peer.');
return messages;
}
} catch (err) {
console.log('Failed to decode store reply', err);
}
@ -66,6 +92,7 @@ export class WakuStore {
err
);
}
return null;
// eslint-disable-next-line no-constant-condition
} while (true);
}
}