30: Implement pagination for waku store r=D4nte a=D4nte

Resolve #29 

Co-authored-by: Franck Royer <franck@royer.one>
This commit is contained in:
bors[bot] 2021-04-13 05:05:16 +00:00 committed by GitHub
commit 09d89ebd78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 120 additions and 53 deletions

View File

@ -25,6 +25,7 @@
"error",
{ "newlines-between": "always", "alphabetize": { "order": "asc" } }
],
"no-constant-condition": ["error", { "checkLoops": false }],
"sort-imports": [
"error",
{ "ignoreDeclarationSort": true, "ignoreCase": true }

View File

@ -72,11 +72,12 @@ const ChatContentTopic = 'dingpu';
console.log(
`Retrieving archived messages from ${storePeerId.toB58String()}`
);
const msg = await waku.store.queryHistory(storePeerId, [ChatContentTopic]);
msg?.messages.map((msg) => {
const wakuMsg = WakuMessage.fromProto(msg);
if (wakuMsg.payload) {
const chatMsg = ChatMessage.decode(wakuMsg.payload);
const messages = await waku.store.queryHistory(storePeerId, [
ChatContentTopic,
]);
messages?.map((msg) => {
if (msg.payload) {
const chatMsg = ChatMessage.decode(msg.payload);
printMessage(chatMsg);
}
});

View File

@ -7,11 +7,14 @@ import { DEFAULT_CONTENT_TOPIC } from '../waku_message';
export class HistoryRPC {
public constructor(public proto: proto.HistoryRPC) {}
static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC {
static createQuery(
topics: string[] = [DEFAULT_CONTENT_TOPIC],
cursor?: proto.Index
): HistoryRPC {
const pagingInfo = {
pageSize: 10,
cursor: undefined,
direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED,
cursor,
direction: proto.Direction.DIRECTION_FORWARD,
};
return new HistoryRPC({
requestId: uuid(),
@ -29,6 +32,10 @@ export class HistoryRPC {
return proto.HistoryRPC.encode(this.proto).finish();
}
get query(): proto.HistoryQuery | undefined {
return this.proto.query;
}
get response(): proto.HistoryResponse | undefined {
return this.proto.response;
}

View File

@ -33,19 +33,6 @@ describe('Waku Store', () => {
await new Promise((resolve) =>
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
await waku0.relay.publish(
WakuMessage.fromUtf8String('A message from relay.')
);
await nimWaku.sendMessage(
WakuMessage.fromUtf8String('Another message from json rpc.')
);
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(500);
});
afterEach(async function () {
@ -54,19 +41,51 @@ describe('Waku Store', () => {
});
it('Retrieves history', async function () {
this.timeout(5_000);
for (let i = 0; i < 2; i++) {
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
}
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(500);
const nimPeerId = await nimWaku.getPeerId();
const response = await waku.store.queryHistory(nimPeerId);
const messages = response?.messages;
const messages = await waku.store.queryHistory(nimPeerId);
expect(messages?.length).eq(2);
const result = messages
?.map((protoMsg) => {
return WakuMessage.fromProto(protoMsg);
})
.findIndex((msg) => {
return msg.utf8Payload() === 'A message from relay.';
});
const result = messages?.findIndex((msg) => {
return msg.utf8Payload() === 'Message 0';
});
expect(result).to.not.eq(-1);
});
it('Retrieves all historical elements in chronological order through paging', async function () {
this.timeout(5_000);
for (let i = 0; i < 15; i++) {
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`));
}
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(500);
const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory(nimPeerId);
expect(messages?.length).eq(15);
for (let index = 0; index < 2; index++) {
expect(
messages?.findIndex((msg) => {
return msg.utf8Payload() === `Message ${index}`;
})
).to.eq(index);
}
});
});

View File

@ -4,6 +4,8 @@ import pipe from 'it-pipe';
import Libp2p from 'libp2p';
import PeerId from 'peer-id';
import { WakuMessage } from '../waku_message';
import { HistoryRPC } from './history_rpc';
export const StoreCodec = '/vac/waku/store/2.0.0-beta1';
@ -17,7 +19,10 @@ export class WakuStore {
* @param topics
* @throws if not able to reach peer
*/
async queryHistory(peerId: PeerId, topics?: string[]) {
async queryHistory(
peerId: PeerId,
topics?: string[]
): Promise<WakuMessage[] | null> {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'Peer is unknown';
if (!peer.protocols.includes(StoreCodec))
@ -25,34 +30,68 @@ export class WakuStore {
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) throw 'Failed to get a connection to the peer';
try {
const { stream } = await connection.newStream(StoreCodec);
const historyRpc = HistoryRPC.query(topics).encode();
const messages: WakuMessage[] = [];
let cursor = undefined;
while (true) {
try {
const res = await pipe(
[historyRpc],
lp.encode(),
stream,
lp.decode(),
concat
);
const buf = res.slice();
const { stream } = await connection.newStream(StoreCodec);
try {
const reply = HistoryRPC.decode(buf);
return reply.response;
const historyRpcQuery = HistoryRPC.createQuery(topics, cursor);
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
concat
);
try {
const reply = HistoryRPC.decode(res.slice());
const response = reply.response;
if (!response) {
console.log('No response in HistoryRPC');
return null;
}
if (!response.messages || !response.messages.length) {
// No messages left (or stored)
return messages;
}
response.messages.map((protoMsg) => {
messages.push(WakuMessage.fromProto(protoMsg));
});
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
// Response page size smaller than query, meaning this is the last page
return messages;
}
cursor = response.pagingInfo?.cursor;
if (cursor === undefined) {
// If the server does not return cursor then there is an issue,
// Need to abort or we end up in an infinite loop
console.log('No cursor returned by peer.');
return messages;
}
} catch (err) {
console.log('Failed to decode store reply', err);
}
} catch (err) {
console.log('Failed to decode store reply', err);
console.log('Failed to send waku store query', err);
}
} catch (err) {
console.log('Failed to send waku store query', err);
console.log(
'Failed to negotiate waku store protocol stream with peer',
err
);
}
} catch (err) {
console.log(
'Failed to negotiate waku store protocol stream with peer',
err
);
}
return null;
}
}