From 59df4374906b9fecc212e38035488da962eb0817 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 13 Apr 2021 11:47:15 +1000 Subject: [PATCH 1/5] test: History messages through several pages are retrieved --- src/lib/waku_store/index.spec.ts | 64 +++++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 686d199306..a6e1ddaba5 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -33,19 +33,6 @@ describe('Waku Store', () => { await new Promise((resolve) => waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve) ); - - await waku0.relay.publish( - WakuMessage.fromUtf8String('A message from relay.') - ); - - await nimWaku.sendMessage( - WakuMessage.fromUtf8String('Another message from json rpc.') - ); - - waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); - await waku.dial(await nimWaku.getMultiaddrWithId()); - - await delay(500); }); afterEach(async function () { @@ -54,6 +41,17 @@ describe('Waku Store', () => { }); it('Retrieves history', async function () { + this.timeout(5_000); + + for (let i = 0; i < 2; i++) { + await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); + } + + waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + await delay(500); + const nimPeerId = await nimWaku.getPeerId(); const response = await waku.store.queryHistory(nimPeerId); @@ -65,8 +63,46 @@ describe('Waku Store', () => { return WakuMessage.fromProto(protoMsg); }) .findIndex((msg) => { - return msg.utf8Payload() === 'A message from relay.'; + return msg.utf8Payload() === 'Message 0'; }); expect(result).to.not.eq(-1); }); + + it('Retrieves all history element through paging', async function () { + this.timeout(5_000); + + for (let i = 0; i < 20; i++) { + await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); + } + + waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + await delay(500); + + const nimPeerId = await nimWaku.getPeerId(); + + const response = await waku.store.queryHistory(nimPeerId); + const messages = response?.messages; + + expect(messages?.length).eq(20); + expect( + messages + ?.map((protoMsg) => { + return WakuMessage.fromProto(protoMsg); + }) + .findIndex((msg) => { + return msg.utf8Payload() === 'Message 0'; + }) + ).to.not.eq(-1); + expect( + messages + ?.map((protoMsg) => { + return WakuMessage.fromProto(protoMsg); + }) + .findIndex((msg) => { + return msg.utf8Payload() === 'Message 19'; + }) + ).to.not.eq(-1); + }); }); From 1e10eeb5f592c677acd66faf8ec04961fee723ae Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 13 Apr 2021 12:51:04 +1000 Subject: [PATCH 2/5] Change API to directly return array of Waku Messages --- src/chat/index.ts | 11 +++++----- src/lib/waku_store/index.spec.ts | 36 ++++++++++---------------------- src/lib/waku_store/index.ts | 17 +++++++++++++-- 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/chat/index.ts b/src/chat/index.ts index d50dfa2f40..b2aef93765 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -72,11 +72,12 @@ const ChatContentTopic = 'dingpu'; console.log( `Retrieving archived messages from ${storePeerId.toB58String()}` ); - const msg = await waku.store.queryHistory(storePeerId, [ChatContentTopic]); - msg?.messages.map((msg) => { - const wakuMsg = WakuMessage.fromProto(msg); - if (wakuMsg.payload) { - const chatMsg = ChatMessage.decode(wakuMsg.payload); + const messages = await waku.store.queryHistory(storePeerId, [ + ChatContentTopic, + ]); + messages?.map((msg) => { + if (msg.payload) { + const chatMsg = ChatMessage.decode(msg.payload); printMessage(chatMsg); } }); diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index a6e1ddaba5..2fe5d8c09b 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -54,17 +54,12 @@ describe('Waku Store', () => { const nimPeerId = await nimWaku.getPeerId(); - const response = await waku.store.queryHistory(nimPeerId); - const messages = response?.messages; + const messages = await waku.store.queryHistory(nimPeerId); expect(messages?.length).eq(2); - const result = messages - ?.map((protoMsg) => { - return WakuMessage.fromProto(protoMsg); - }) - .findIndex((msg) => { - return msg.utf8Payload() === 'Message 0'; - }); + const result = messages?.findIndex((msg) => { + return msg.utf8Payload() === 'Message 0'; + }); expect(result).to.not.eq(-1); }); @@ -82,27 +77,18 @@ describe('Waku Store', () => { const nimPeerId = await nimWaku.getPeerId(); - const response = await waku.store.queryHistory(nimPeerId); - const messages = response?.messages; + const messages = await waku.store.queryHistory(nimPeerId); expect(messages?.length).eq(20); expect( - messages - ?.map((protoMsg) => { - return WakuMessage.fromProto(protoMsg); - }) - .findIndex((msg) => { - return msg.utf8Payload() === 'Message 0'; - }) + messages?.findIndex((msg) => { + return msg.utf8Payload() === 'Message 0'; + }) ).to.not.eq(-1); expect( - messages - ?.map((protoMsg) => { - return WakuMessage.fromProto(protoMsg); - }) - .findIndex((msg) => { - return msg.utf8Payload() === 'Message 19'; - }) + messages?.findIndex((msg) => { + return msg.utf8Payload() === 'Message 19'; + }) ).to.not.eq(-1); }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index ce4341a758..a2dae137f6 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -4,6 +4,8 @@ import pipe from 'it-pipe'; import Libp2p from 'libp2p'; import PeerId from 'peer-id'; +import { WakuMessage } from '../waku_message'; + import { HistoryRPC } from './history_rpc'; export const StoreCodec = '/vac/waku/store/2.0.0-beta1'; @@ -17,7 +19,10 @@ export class WakuStore { * @param topics * @throws if not able to reach peer */ - async queryHistory(peerId: PeerId, topics?: string[]) { + async queryHistory( + peerId: PeerId, + topics?: string[] + ): Promise { const peer = this.libp2p.peerStore.get(peerId); if (!peer) throw 'Peer is unknown'; if (!peer.protocols.includes(StoreCodec)) @@ -40,7 +45,15 @@ export class WakuStore { const buf = res.slice(); try { const reply = HistoryRPC.decode(buf); - return reply.response; + + if (!reply.response) { + console.log('No response in HistoryRPC'); + return null; + } + + return reply.response.messages.map((protoMsg) => { + return WakuMessage.fromProto(protoMsg); + }); } catch (err) { console.log('Failed to decode store reply', err); } From 433f0432b3e8b5f6fea58c34a8333fd61a4f5fa1 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 13 Apr 2021 14:48:05 +1000 Subject: [PATCH 3/5] Use pagination to retrieve all results from store --- src/lib/waku_store/history_rpc.ts | 11 +++- src/lib/waku_store/index.spec.ts | 6 +-- src/lib/waku_store/index.ts | 85 ++++++++++++++++++++----------- 3 files changed, 68 insertions(+), 34 deletions(-) diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index bee9c84343..5985434328 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -7,10 +7,13 @@ import { DEFAULT_CONTENT_TOPIC } from '../waku_message'; export class HistoryRPC { public constructor(public proto: proto.HistoryRPC) {} - static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC { + static createQuery( + topics: string[] = [DEFAULT_CONTENT_TOPIC], + cursor?: proto.Index + ): HistoryRPC { const pagingInfo = { pageSize: 10, - cursor: undefined, + cursor, direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED, }; return new HistoryRPC({ @@ -29,6 +32,10 @@ export class HistoryRPC { return proto.HistoryRPC.encode(this.proto).finish(); } + get query(): proto.HistoryQuery | undefined { + return this.proto.query; + } + get response(): proto.HistoryResponse | undefined { return this.proto.response; } diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 2fe5d8c09b..5319f48226 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -66,7 +66,7 @@ describe('Waku Store', () => { it('Retrieves all history element through paging', async function () { this.timeout(5_000); - for (let i = 0; i < 20; i++) { + for (let i = 0; i < 15; i++) { await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`)); } @@ -79,7 +79,7 @@ describe('Waku Store', () => { const messages = await waku.store.queryHistory(nimPeerId); - expect(messages?.length).eq(20); + expect(messages?.length).eq(15); expect( messages?.findIndex((msg) => { return msg.utf8Payload() === 'Message 0'; @@ -87,7 +87,7 @@ describe('Waku Store', () => { ).to.not.eq(-1); expect( messages?.findIndex((msg) => { - return msg.utf8Payload() === 'Message 19'; + return msg.utf8Payload() === 'Message 14'; }) ).to.not.eq(-1); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index a2dae137f6..70bd4268e7 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -30,42 +30,69 @@ export class WakuStore { const connection = this.libp2p.connectionManager.get(peer.id); if (!connection) throw 'Failed to get a connection to the peer'; - try { - const { stream } = await connection.newStream(StoreCodec); - - const historyRpc = HistoryRPC.query(topics).encode(); + const messages: WakuMessage[] = []; + let cursor = undefined; + do { try { - const res = await pipe( - [historyRpc], - lp.encode(), - stream, - lp.decode(), - concat - ); - const buf = res.slice(); + const { stream } = await connection.newStream(StoreCodec); try { - const reply = HistoryRPC.decode(buf); + const historyRpcQuery = HistoryRPC.createQuery(topics, cursor); + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + concat + ); + try { + const reply = HistoryRPC.decode(res.slice()); - if (!reply.response) { - console.log('No response in HistoryRPC'); - return null; + const response = reply.response; + if (!response) { + console.log('No response in HistoryRPC'); + return null; + } + + if (!response.messages || !response.messages.length) { + // No messages left (or stored) + return messages; + } + + response.messages.map((protoMsg) => { + messages.push(WakuMessage.fromProto(protoMsg)); + }); + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + // Response page size smaller than query, meaning this is the last page + return messages; + } + + cursor = response.pagingInfo?.cursor; + if (cursor === undefined) { + // If the server does not return cursor then there is an issue, + // Need to abort or we end up in an infinite loop + console.log('No cursor returned by peer.'); + return messages; + } + } catch (err) { + console.log('Failed to decode store reply', err); } - - return reply.response.messages.map((protoMsg) => { - return WakuMessage.fromProto(protoMsg); - }); } catch (err) { - console.log('Failed to decode store reply', err); + console.log('Failed to send waku store query', err); } } catch (err) { - console.log('Failed to send waku store query', err); + console.log( + 'Failed to negotiate waku store protocol stream with peer', + err + ); } - } catch (err) { - console.log( - 'Failed to negotiate waku store protocol stream with peer', - err - ); - } - return null; + // eslint-disable-next-line no-constant-condition + } while (true); } } From 0e9b0b1b7472b45850e614168fe17c232809aa42 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 13 Apr 2021 14:58:21 +1000 Subject: [PATCH 4/5] Ensure store waku calls returns messages in chronological order Oldest at the start of the list. --- src/lib/waku_store/history_rpc.ts | 2 +- src/lib/waku_store/index.spec.ts | 19 ++++++++----------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts index 5985434328..ff4a8ede4f 100644 --- a/src/lib/waku_store/history_rpc.ts +++ b/src/lib/waku_store/history_rpc.ts @@ -14,7 +14,7 @@ export class HistoryRPC { const pagingInfo = { pageSize: 10, cursor, - direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED, + direction: proto.Direction.DIRECTION_FORWARD, }; return new HistoryRPC({ requestId: uuid(), diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 5319f48226..ccc21a89ac 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -63,7 +63,7 @@ describe('Waku Store', () => { expect(result).to.not.eq(-1); }); - it('Retrieves all history element through paging', async function () { + it('Retrieves all historical elements in chronological order through paging', async function () { this.timeout(5_000); for (let i = 0; i < 15; i++) { @@ -80,15 +80,12 @@ describe('Waku Store', () => { const messages = await waku.store.queryHistory(nimPeerId); expect(messages?.length).eq(15); - expect( - messages?.findIndex((msg) => { - return msg.utf8Payload() === 'Message 0'; - }) - ).to.not.eq(-1); - expect( - messages?.findIndex((msg) => { - return msg.utf8Payload() === 'Message 14'; - }) - ).to.not.eq(-1); + for (let index = 0; index < 2; index++) { + expect( + messages?.findIndex((msg) => { + return msg.utf8Payload() === `Message ${index}`; + }) + ).to.eq(index); + } }); }); From ee8db698b3ac1088cf7a5f8e71899e6d6b821de0 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 13 Apr 2021 15:00:47 +1000 Subject: [PATCH 5/5] Allow constant condition for loops --- .eslintrc.json | 1 + src/lib/waku_store/index.ts | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index 67d7fd21d7..27db61f841 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -25,6 +25,7 @@ "error", { "newlines-between": "always", "alphabetize": { "order": "asc" } } ], + "no-constant-condition": ["error", { "checkLoops": false }], "sort-imports": [ "error", { "ignoreDeclarationSort": true, "ignoreCase": true } diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 70bd4268e7..b18cf6b387 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -32,7 +32,7 @@ export class WakuStore { const messages: WakuMessage[] = []; let cursor = undefined; - do { + while (true) { try { const { stream } = await connection.newStream(StoreCodec); try { @@ -92,7 +92,6 @@ export class WakuStore { err ); } - // eslint-disable-next-line no-constant-condition - } while (true); + } } }