feat: store callback takes promises

This enables the consumer to decide between:

1. Waiting for all promises, less efficient but maintain order;
2. Process promises as they resolve, faster to get messages through but
disrupt message order.
This commit is contained in:
fryorcraken.eth 2022-09-12 11:35:24 +10:00
parent 930c7beaef
commit 65511a5888
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
2 changed files with 259 additions and 142 deletions

View File

@ -25,6 +25,12 @@ const log = debug("waku:test:store");
const TestContentTopic = "/test/1/waku-store/utf8";
const isWakuMessageDefined = (
msg: WakuMessage | undefined
): msg is WakuMessage => {
return !!msg;
};
describe("Waku Store", () => {
let waku: WakuFull;
let nwaku: Nwaku;
@ -57,7 +63,18 @@ describe("Waku Store", () => {
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages = await waku.store.queryHistory([]);
const messages: WakuMessage[] = [];
await waku.store.queryHistory([], async (msgPromises) => {
await Promise.all(
msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
messages.push(msg);
}
})
);
});
expect(messages?.length).eq(2);
const result = messages?.findIndex((msg) => {
@ -92,12 +109,16 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
let messages: WakuMessage[] = [];
await waku.store.queryHistory([], {
callback: (_msgs) => {
messages = messages.concat(_msgs);
},
const messages: WakuMessage[] = [];
await waku.store.queryHistory([], async (msgPromises) => {
await Promise.all(
msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
messages.push(msg);
}
})
);
});
expect(messages?.length).eq(totalMsgs);
@ -136,13 +157,16 @@ describe("Waku Store", () => {
let messages: WakuMessage[] = [];
const desiredMsgs = 14;
await waku.store.queryHistory([], {
pageSize: 7,
callback: (_msgs) => {
messages = messages.concat(_msgs);
await waku.store.queryHistory(
[],
async (msgPromises) => {
const msgsOrUndefined = await Promise.all(msgPromises);
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
messages = messages.concat(msgs);
return messages.length >= desiredMsgs;
},
});
{ pageSize: 7 }
);
expect(messages?.length).eq(desiredMsgs);
});
@ -171,9 +195,21 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages = await waku.store.queryHistory([], {
pageDirection: PageDirection.FORWARD,
});
let messages: WakuMessage[] = [];
await waku.store.queryHistory(
[],
async (msgPromises) => {
const msgsOrUndefined = await Promise.all(msgPromises);
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
// Note: messages within a page are ordered from oldest to most recent
// so the `concat` can only preserve order when `PageDirection`
// is forward
messages = messages.concat(msgs);
},
{
pageDirection: PageDirection.FORWARD,
}
);
expect(messages?.length).eq(15);
for (let index = 0; index < 2; index++) {
@ -218,9 +254,23 @@ describe("Waku Store", () => {
const nimPeerId = await nwaku.getPeerId();
const messages = await waku.store.queryHistory([], {
peerId: nimPeerId,
});
const messages: WakuMessage[] = [];
await waku.store.queryHistory(
[],
async (msgPromises) => {
await Promise.all(
msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
messages.push(msg);
}
})
);
},
{
peerId: nimPeerId,
}
);
expect(messages?.length).eq(2);
const result = messages?.findIndex((msg) => {
@ -246,6 +296,7 @@ describe("Waku Store", () => {
const symKey = generateSymmetricKey();
const publicKey = getPublicKey(privateKey);
const timestamp = new Date();
const [
encryptedAsymmetricMessage,
encryptedSymmetricMessage,
@ -257,6 +308,7 @@ describe("Waku Store", () => {
TestContentTopic,
{
encPublicKey: publicKey,
timestamp,
}
),
WakuMessage.fromUtf8String(
@ -264,16 +316,19 @@ describe("Waku Store", () => {
TestContentTopic,
{
symKey: symKey,
timestamp: new Date(timestamp.valueOf() + 1),
}
),
WakuMessage.fromUtf8String(clearMessageText, TestContentTopic),
WakuMessage.fromUtf8String(clearMessageText, TestContentTopic, {
timestamp: new Date(timestamp.valueOf() + 2),
}),
WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, {
encPublicKey: getPublicKey(generatePrivateKey()),
timestamp: new Date(timestamp.valueOf() + 3),
}),
]);
log("Messages have been encrypted");
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
createFullNode({
staticNoiseKey: NOISE_KEY_1,
@ -308,13 +363,23 @@ describe("Waku Store", () => {
waku2.store.addDecryptionKey(symKey);
log("Retrieve messages from store");
const messages = await waku2.store.queryHistory([], {
decryptionParams: [{ key: privateKey }],
});
let messages: WakuMessage[] = [];
await waku2.store.queryHistory(
[],
async (msgPromises) => {
const msgsOrUndefined = await Promise.all(msgPromises);
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
messages = messages.concat(msgs);
},
{
decryptionParams: [{ key: privateKey }],
}
);
expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText);
// Messages are ordered from oldest to latest within a page (1 page query)
expect(messages[0]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[2]?.payloadAsUtf8).to.eq(clearMessageText);
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
@ -341,6 +406,7 @@ describe("Waku Store", () => {
const symKey = generateSymmetricKey();
const publicKey = getPublicKey(privateKey);
const timestamp = new Date();
const [
encryptedAsymmetricMessage,
encryptedSymmetricMessage,
@ -352,6 +418,7 @@ describe("Waku Store", () => {
encryptedAsymmetricContentTopic,
{
encPublicKey: publicKey,
timestamp,
}
),
WakuMessage.fromUtf8String(
@ -359,17 +426,20 @@ describe("Waku Store", () => {
encryptedSymmetricContentTopic,
{
symKey: symKey,
timestamp: new Date(timestamp.valueOf() + 1),
}
),
WakuMessage.fromUtf8String(
clearMessageText,
encryptedAsymmetricContentTopic
encryptedAsymmetricContentTopic,
{ timestamp: new Date(timestamp.valueOf() + 2) }
),
WakuMessage.fromUtf8String(
otherEncMessageText,
encryptedSymmetricContentTopic,
{
encPublicKey: getPublicKey(generatePrivateKey()),
timestamp: new Date(timestamp.valueOf() + 3),
}
),
]);
@ -412,16 +482,26 @@ describe("Waku Store", () => {
method: DecryptionMethod.Symmetric,
});
let messages: WakuMessage[] = [];
log("Retrieve messages from store");
const messages = await waku2.store.queryHistory([], {
decryptionParams: [{ key: privateKey }],
});
await waku2.store.queryHistory(
[],
async (msgPromises) => {
const msgsOrUndefined = await Promise.all(msgPromises);
const msgs = msgsOrUndefined.filter(isWakuMessageDefined);
messages = messages.concat(msgs);
},
{
decryptionParams: [{ key: privateKey }],
}
);
expect(messages?.length).eq(3);
if (!messages) throw "Length was tested";
expect(messages[0].payloadAsUtf8).to.eq(clearMessageText);
// Messages are ordered from oldest to latest within a page (1 page query)
expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(clearMessageText);
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
@ -473,22 +553,50 @@ describe("Waku Store", () => {
const nwakuPeerId = await nwaku.getPeerId();
const firstMessage = await waku.store.queryHistory([], {
peerId: nwakuPeerId,
timeFilter: { startTime, endTime: message1Timestamp },
});
const bothMessages = await waku.store.queryHistory([], {
peerId: nwakuPeerId,
timeFilter: {
startTime,
endTime,
const firstMessages: WakuMessage[] = [];
await waku.store.queryHistory(
[],
async (msgPromises) => {
await Promise.all(
msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
firstMessages.push(msg);
}
})
);
},
});
{
peerId: nwakuPeerId,
timeFilter: { startTime, endTime: message1Timestamp },
}
);
expect(firstMessage?.length).eq(1);
const bothMessages: WakuMessage[] = [];
await waku.store.queryHistory(
[],
async (msgPromises) => {
await Promise.all(
msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
bothMessages.push(msg);
}
})
);
},
{
peerId: nwakuPeerId,
timeFilter: {
startTime,
endTime,
},
}
);
expect(firstMessage[0]?.payloadAsUtf8).eq("Message 0");
expect(firstMessages?.length).eq(1);
expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0");
expect(bothMessages?.length).eq(2);
});

View File

@ -1,3 +1,4 @@
import type { Connection } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import debug from "debug";
@ -19,7 +20,7 @@ import {
WakuMessage,
} from "../waku_message";
import { HistoryRPC, PageDirection } from "./history_rpc";
import { HistoryRPC, PageDirection, Params } from "./history_rpc";
import HistoryError = HistoryResponse.HistoryError;
@ -77,18 +78,6 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Callback called on pages of stored messages as they are retrieved.
*
* Allows for a faster access to the results as it is called as soon as a page
* is received. Traversal of the pages is done automatically so this function
* will invoked for each retrieved page.
*
* If the call on a page returns `true`, then traversal of the pages is aborted.
* For example, this can be used for the caller to stop the query after a
* specific message is found.
*/
callback?: (messages: WakuMessage[]) => void | boolean;
/**
* Keys that will be used to decrypt messages.
*
@ -121,6 +110,8 @@ export class WakuStore {
*
* @param contentTopics The content topics to pass to the query, leave empty to
* retrieve all messages.
* @param callback called on a page of retrieved messages. If the callback returns `true`
* then pagination is stopped.
* @param options Optional parameters.
*
* @throws If not able to reach a Waku Store peer to query
@ -128,8 +119,11 @@ export class WakuStore {
*/
async queryHistory(
contentTopics: string[],
callback: (
messages: Array<Promise<WakuMessage | undefined>>
) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<WakuMessage[]> {
): Promise<void> {
let startTime, endTime;
if (options?.timeFilter) {
@ -137,7 +131,7 @@ export class WakuStore {
endTime = options.timeFilter.endTime;
}
const opts = Object.assign(
const queryOpts = Object.assign(
{
pubSubTopic: this.pubSubTopic,
pageDirection: PageDirection.BACKWARD,
@ -155,7 +149,7 @@ export class WakuStore {
const res = await selectPeerForProtocol(
this.libp2p.peerStore,
Object.values(StoreCodecs),
opts?.peerId
options?.peerId
);
if (!res) {
@ -180,90 +174,20 @@ export class WakuStore {
// Add the decryption keys passed to this function against the
// content topics also passed to this function.
if (opts.decryptionParams) {
decryptionParams = decryptionParams.concat(opts.decryptionParams);
if (options?.decryptionParams) {
decryptionParams = decryptionParams.concat(options.decryptionParams);
}
const messages: WakuMessage[] = [];
let cursor = undefined;
while (true) {
const stream = await connection.newStream(protocol);
const queryOpts = Object.assign(opts, { cursor });
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
log("Querying store peer", connections[0].remoteAddr.toString());
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
const reply = historyRpcQuery.decode(bytes);
if (!reply.response) {
log("No message returned from store: `response` field missing");
return messages;
}
const response = reply.response as protoV2Beta4.HistoryResponse;
if (
response.error &&
response.error !== HistoryError.ERROR_NONE_UNSPECIFIED
) {
throw "History response contains an Error: " + response.error;
}
if (!response.messages || !response.messages.length) {
// No messages left (or stored)
log("No message returned from store: `messages` array empty");
return messages;
}
log(
`${response.messages.length} messages retrieved for (${opts.pubSubTopic})`,
contentTopics
);
const pageMessages: WakuMessage[] = [];
await Promise.all(
response.messages.map(async (protoMsg) => {
const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams);
if (msg) {
messages.push(msg);
pageMessages.push(msg);
}
})
);
let abort = false;
if (opts.callback) {
abort = Boolean(opts.callback(pageMessages));
}
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
abort ||
// Response page size smaller than query, meaning this is the last page
(responsePageSize && queryPageSize && responsePageSize < queryPageSize)
) {
return messages;
}
cursor = response.pagingInfo?.cursor;
if (cursor === undefined) {
// If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop
log("Store response does not contain a cursor, stopping pagination");
return messages;
for await (const messagePromises of paginate(
connection,
protocol,
queryOpts,
decryptionParams
)) {
const abort = Boolean(await callback(messagePromises));
if (abort) {
// TODO: Also abort underlying generator
break;
}
}
}
@ -306,3 +230,88 @@ export class WakuStore {
return getPeersForProtocol(this.libp2p.peerStore, codecs);
}
}
async function* paginate(
connection: Connection,
protocol: string,
queryOpts: Params,
decryptionParams: DecryptionParams[]
): AsyncGenerator<Promise<WakuMessage | undefined>[]> {
let cursor = undefined;
while (true) {
queryOpts = Object.assign(queryOpts, { cursor });
const stream = await connection.newStream(protocol);
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
log(
"Querying store peer",
connection.remoteAddr.toString(),
`for (${queryOpts.pubSubTopic})`,
queryOpts.contentTopics
);
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});
const reply = historyRpcQuery.decode(bytes);
if (!reply.response) {
log("Stopping pagination due to store `response` field missing");
break;
}
const response = reply.response as protoV2Beta4.HistoryResponse;
if (
response.error &&
response.error !== HistoryError.ERROR_NONE_UNSPECIFIED
) {
throw "History response contains an Error: " + response.error;
}
if (!response.messages) {
log(
"Stopping pagination due to store `response.messages` field missing or empty"
);
break;
}
log(`${response.messages.length} messages retrieved from store`);
yield response.messages.map((protoMsg) =>
WakuMessage.decodeProto(protoMsg, decryptionParams)
);
cursor = response.pagingInfo?.cursor;
if (typeof cursor === "undefined") {
// If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop
log(
"Stopping pagination due to `response.pagingInfo.cursor` missing from store response"
);
break;
}
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (
// Response page size smaller than query, meaning this is the last page
responsePageSize &&
queryPageSize &&
responsePageSize < queryPageSize
) {
break;
}
}
}