feat: enable store queries with multiple content topics and decoders

This commit is contained in:
fryorcraken.eth 2022-09-19 16:33:07 +10:00
parent c0c4965e28
commit 8679adcf80
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
2 changed files with 69 additions and 49 deletions

View File

@ -72,7 +72,7 @@ describe("Waku Store", () => {
const messages: Message[] = []; const messages: Message[] = [];
let promises: Promise<void>[] = []; let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => { const _promises = msgPromises.map(async (promise) => {
const msg = await promise; const msg = await promise;
if (msg) { if (msg) {
@ -103,7 +103,7 @@ describe("Waku Store", () => {
const messages: Message[] = []; const messages: Message[] = [];
let promises: Promise<void>[] = []; let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => { const _promises = msgPromises.map(async (promise) => {
const msg = await promise; const msg = await promise;
if (msg) { if (msg) {
@ -142,12 +142,15 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]); await waitForRemotePeer(waku, [Protocols.Store]);
const messages: Message[] = []; const messages: Message[] = [];
await waku.store.queryCallbackOnPromise(TestDecoder, async (msgPromise) => { await waku.store.queryCallbackOnPromise(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise; const msg = await msgPromise;
if (msg) { if (msg) {
messages.push(msg); messages.push(msg);
} }
}); }
);
expect(messages?.length).eq(totalMsgs); expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => { const result = messages?.findIndex((msg) => {
@ -182,7 +185,7 @@ describe("Waku Store", () => {
const desiredMsgs = 14; const desiredMsgs = 14;
const messages: Message[] = []; const messages: Message[] = [];
await waku.store.queryCallbackOnPromise( await waku.store.queryCallbackOnPromise(
TestDecoder, [TestDecoder],
async (msgPromise) => { async (msgPromise) => {
const msg = await msgPromise; const msg = await msgPromise;
if (msg) { if (msg) {
@ -220,7 +223,7 @@ describe("Waku Store", () => {
const messages: Message[] = []; const messages: Message[] = [];
await waku.store.queryOrderedCallback( await waku.store.queryOrderedCallback(
TestDecoder, [TestDecoder],
async (msg) => { async (msg) => {
messages.push(msg); messages.push(msg);
}, },
@ -263,7 +266,7 @@ describe("Waku Store", () => {
let messages: Message[] = []; let messages: Message[] = [];
await waku.store.queryOrderedCallback( await waku.store.queryOrderedCallback(
TestDecoder, [TestDecoder],
async (msg) => { async (msg) => {
messages.push(msg); messages.push(msg);
}, },
@ -361,25 +364,11 @@ describe("Waku Store", () => {
const messages: Message[] = []; const messages: Message[] = [];
log("Retrieve messages from store"); log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator(asymDecoder)) { for await (const msgPromises of waku2.store.queryGenerator([
for (const promise of msgPromises) { asymDecoder,
const msg = await promise; symDecoder,
if (msg) { TestDecoder,
messages.push(msg); ])) {
}
}
}
for await (const msgPromises of waku2.store.queryGenerator(symDecoder)) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
for await (const msgPromises of waku2.store.queryGenerator(TestDecoder)) {
for (const promise of msgPromises) { for (const promise of msgPromises) {
const msg = await promise; const msg = await promise;
if (msg) { if (msg) {
@ -443,7 +432,7 @@ describe("Waku Store", () => {
const firstMessages: Message[] = []; const firstMessages: Message[] = [];
await waku.store.queryOrderedCallback( await waku.store.queryOrderedCallback(
TestDecoder, [TestDecoder],
(msg) => { (msg) => {
if (msg) { if (msg) {
firstMessages.push(msg); firstMessages.push(msg);
@ -457,7 +446,7 @@ describe("Waku Store", () => {
const bothMessages: Message[] = []; const bothMessages: Message[] = [];
await waku.store.queryOrderedCallback( await waku.store.queryOrderedCallback(
TestDecoder, [TestDecoder],
async (msg) => { async (msg) => {
bothMessages.push(msg); bothMessages.push(msg);
}, },
@ -524,7 +513,7 @@ describe("Waku Store, custom pubsub topic", () => {
const messages: Message[] = []; const messages: Message[] = [];
let promises: Promise<void>[] = []; let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => { const _promises = msgPromises.map(async (promise) => {
const msg = await promise; const msg = await promise;
if (msg) { if (msg) {

View File

@ -100,16 +100,17 @@ export class WakuStore {
* If strong ordering is needed, you may need to handle this at application level * If strong ordering is needed, you may need to handle this at application level
* and set your own timestamps too (the WakuMessage timestamps are not certified). * and set your own timestamps too (the WakuMessage timestamps are not certified).
* *
* @throws If not able to reach a Waku Store peer to query * @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply. * or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/ */
async queryOrderedCallback<T extends Message>( async queryOrderedCallback<T extends Message>(
decoder: Decoder<T>, decoders: Decoder<T>[],
callback: (message: Message) => Promise<void | boolean> | boolean | void, callback: (message: Message) => Promise<void | boolean> | boolean | void,
options?: QueryOptions options?: QueryOptions
): Promise<void> { ): Promise<void> {
const abort = false; const abort = false;
for await (const promises of this.queryGenerator(decoder, options)) { for await (const promises of this.queryGenerator(decoders, options)) {
if (abort) break; if (abort) break;
let messages = await Promise.all(promises); let messages = await Promise.all(promises);
@ -148,11 +149,12 @@ export class WakuStore {
* break the order as it may rely on the browser decryption API, which in turn, * break the order as it may rely on the browser decryption API, which in turn,
* may have a different speed depending on the type of decryption. * may have a different speed depending on the type of decryption.
* *
* @throws If not able to reach a Waku Store peer to query * @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply. * or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/ */
async queryCallbackOnPromise<T extends Message>( async queryCallbackOnPromise<T extends Message>(
decoder: Decoder<T>, decoders: Decoder<T>[],
callback: ( callback: (
message: Promise<Message | undefined> message: Promise<Message | undefined>
) => Promise<void | boolean> | boolean | void, ) => Promise<void | boolean> | boolean | void,
@ -160,7 +162,7 @@ export class WakuStore {
): Promise<void> { ): Promise<void> {
let abort = false; let abort = false;
let promises: Promise<void>[] = []; let promises: Promise<void>[] = [];
for await (const page of this.queryGenerator(decoder, options)) { for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msg) => { const _promises = page.map(async (msg) => {
if (!abort) { if (!abort) {
abort = Boolean(await callback(msg)); abort = Boolean(await callback(msg));
@ -185,11 +187,12 @@ export class WakuStore {
* *
* However, there is no way to guarantee the behavior of the remote node. * However, there is no way to guarantee the behavior of the remote node.
* *
* @throws If not able to reach a Waku Store peer to query * @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply. * or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/ */
async *queryGenerator<T extends Message>( async *queryGenerator<T extends Message>(
decoder: Decoder<T>, decoders: Decoder<T>[],
options?: QueryOptions options?: QueryOptions
): AsyncGenerator<Promise<Message | undefined>[]> { ): AsyncGenerator<Promise<Message | undefined>[]> {
let startTime, endTime; let startTime, endTime;
@ -199,7 +202,17 @@ export class WakuStore {
endTime = options.timeFilter.endTime; endTime = options.timeFilter.endTime;
} }
const contentTopic = decoder.contentTopic; const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
throw new Error(
"API does not support different decoder per content topic"
);
}
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders.map((dec) => dec.contentTopic);
const queryOpts = Object.assign( const queryOpts = Object.assign(
{ {
@ -208,7 +221,7 @@ export class WakuStore {
pageSize: DefaultPageSize, pageSize: DefaultPageSize,
}, },
options, options,
{ contentTopics: [contentTopic], startTime, endTime } { contentTopics, startTime, endTime }
); );
log("Querying history with the following options", { log("Querying history with the following options", {
@ -236,7 +249,7 @@ export class WakuStore {
connection, connection,
protocol, protocol,
queryOpts, queryOpts,
decoder decodersAsMap
)) { )) {
yield messages; yield messages;
} }
@ -260,8 +273,17 @@ async function* paginate<T extends Message>(
connection: Connection, connection: Connection,
protocol: string, protocol: string,
queryOpts: Params, queryOpts: Params,
decoder: Decoder<T> decoders: Map<string, Decoder<T>>
): AsyncGenerator<Promise<Message | undefined>[]> { ): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Array.from(decoders.keys()).toString()
) {
throw new Error(
"Internal error, the decoders should match the query's content topics"
);
}
let cursor = undefined; let cursor = undefined;
while (true) { while (true) {
queryOpts = Object.assign(queryOpts, { cursor }); queryOpts = Object.assign(queryOpts, { cursor });
@ -314,7 +336,16 @@ async function* paginate<T extends Message>(
log(`${response.messages.length} messages retrieved from store`); log(`${response.messages.length} messages retrieved from store`);
yield response.messages.map((protoMsg) => decoder.decode(protoMsg)); yield response.messages.map((protoMsg) => {
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic !== "undefined") {
const decoder = decoders.get(contentTopic);
if (decoder) {
return decoder.decode(protoMsg);
}
}
return Promise.resolve(undefined);
});
cursor = response.pagingInfo?.cursor; cursor = response.pagingInfo?.cursor;
if (typeof cursor === "undefined") { if (typeof cursor === "undefined") {