Merge pull request #963 from waku-org/template-decoder

This commit is contained in:
fryorcraken.eth 2022-09-21 11:04:24 +10:00 committed by GitHub
commit dc50c176c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 340 additions and 134 deletions

View File

@ -7,19 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- Waku message encoding and decoding is more generic, to enable upcoming feature such as [RLN](https://rfc.vac.dev/spec/17/) & [Noise](https://rfc.vac.dev/spec/43/);
it also enables separating the `version_1` module out to reduce bundle size and improve cross-platform compatibility when not used.
- Due to the change above, all APIs that handle messages have changed to receive a `Decoder` or `Encoder`.
## [0.28.1] - 2022-09-20
### Added
- `WakuRelay.addObserver` now returns a function to delete the observer.
- `WakuLightPush.push` and `WakuRelay.send` returns `SendResult` with the list of recipients.
### Changed
- `queryCallbackOnPromise`'s return value has been simplified to `Promise<void>`.
- doc: clarified behaviour of `WakuStore` query functions.
- Waku message encoding and decoding is more generic, to enable upcoming feature such as [RLN](https://rfc.vac.dev/spec/17/) & [Noise](https://rfc.vac.dev/spec/43/);
it also enables separating the `version_1` module out to reduce bundle size and improve cross-platform compatibility when not used.
- Due to the change above, all APIs that handle messages have changed to receive a `Decoder` or `Encoder`.
### Deleted

View File

@ -1,7 +0,0 @@
version: v1beta1
plugins:
- name: ts_proto
out: ./src/proto
strategy: all
opt: grpc_js,esModuleInterop=true,forceLong=long

View File

@ -1,5 +0,0 @@
version: v1beta1
build:
roots:
- ./proto

View File

@ -39,6 +39,10 @@
"./lib/waku_message/version_1": {
"types": "./dist/lib/waku_message/version_1.d.ts",
"import": "./dist/lib/waku_message/version_1.js"
},
"./lib/waku_message/topic_only_message": {
"types": "./dist/lib/waku_message/topic_only_message.d.ts",
"import": "./dist/lib/waku_message/topic_only_message.js"
}
},
"typesVersions": {
@ -197,6 +201,9 @@
],
"*.{ts,json,conf*.*js}": [
"prettier --write"
],
"./*.*js": [
"prettier --write"
]
}
}

View File

@ -12,6 +12,8 @@ export default {
"lib/wait_for_remote_peer": "dist/lib/wait_for_remote_peer.js",
"lib/waku_message/version_0": "dist/lib/waku_message/version_0.js",
"lib/waku_message/version_1": "dist/lib/waku_message/version_1.js",
"lib/waku_message/topic_only_message":
"dist/lib/waku_message/topic_only_message.js",
},
output: {
dir: "bundle",

View File

@ -11,6 +11,7 @@ export * as enr from "./lib/enr";
export * as utils from "./lib/utils";
export * as proto_message from "./proto/message";
export * as proto_topic_only_message from "./proto/topic_only_message";
export * as waku from "./lib/waku";
export { WakuNode, Protocols } from "./lib/waku";

View File

@ -70,8 +70,12 @@ export interface Encoder {
encodeProto: (message: Message) => Promise<ProtoMessage | undefined>;
}
export interface Decoder {
export interface Decoder<T extends Message> {
contentTopic: string;
decodeProto: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
decode: (proto: ProtoMessage) => Promise<Message | undefined>;
decode: (proto: ProtoMessage) => Promise<T | undefined>;
}
export interface SendResult {
recipients: PeerId[];
}

View File

@ -13,6 +13,8 @@ import { DecoderV0, EncoderV0 } from "../waku_message/version_0";
const log = debug("waku:test");
const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = new EncoderV0(TestContentTopic);
const TestDecoder = new DecoderV0(TestContentTopic);
describe("Waku Filter", () => {
let waku: WakuFull;
@ -50,16 +52,13 @@ describe("Waku Filter", () => {
expect(bytesToUtf8(msg.payload!)).to.eq(messageText);
};
const decoder = new DecoderV0(TestContentTopic);
await waku.filter.subscribe([decoder], callback);
await waku.filter.subscribe([TestDecoder], callback);
// As the filter protocol does not cater for an ack of subscription
// we cannot know whether the subscription happened. Something we want to
// correct in future versions of the protocol.
await delay(200);
const encoder = new EncoderV0(TestContentTopic);
await waku.lightPush.push(encoder, message);
await waku.lightPush.push(TestEncoder, message);
while (messageCount === 0) {
await delay(250);
}
@ -74,15 +73,13 @@ describe("Waku Filter", () => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
const decoder = new DecoderV0(TestContentTopic);
await waku.filter.subscribe([decoder], callback);
await waku.filter.subscribe([TestDecoder], callback);
await delay(200);
const encoder = new EncoderV0(TestContentTopic);
await waku.lightPush.push(encoder, {
await waku.lightPush.push(TestEncoder, {
payload: utf8ToBytes("Filtering works!"),
});
await waku.lightPush.push(encoder, {
await waku.lightPush.push(TestEncoder, {
payload: utf8ToBytes("Filtering still works!"),
});
while (messageCount < 2) {
@ -96,19 +93,16 @@ describe("Waku Filter", () => {
const callback = (): void => {
messageCount++;
};
const decoder = new DecoderV0(TestContentTopic);
const unsubscribe = await waku.filter.subscribe([decoder], callback);
const encoder = new EncoderV0(TestContentTopic);
const unsubscribe = await waku.filter.subscribe([TestDecoder], callback);
await delay(200);
await waku.lightPush.push(encoder, {
await waku.lightPush.push(TestEncoder, {
payload: utf8ToBytes("This should be received"),
});
await delay(100);
await unsubscribe();
await delay(200);
await waku.lightPush.push(encoder, {
await waku.lightPush.push(TestEncoder, {
payload: utf8ToBytes("This should not be received"),
});
await delay(100);

View File

@ -63,9 +63,9 @@ export type UnsubscribeFunction = () => Promise<void>;
export class WakuFilter {
pubSubTopic: string;
private subscriptions: Map<string, FilterCallback>;
public decoders: Map<
private decoders: Map<
string, // content topic
Set<Decoder>
Set<Decoder<any>>
>;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
@ -83,8 +83,8 @@ export class WakuFilter {
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe(
decoders: Decoder[],
async subscribe<T extends Message>(
decoders: Decoder<T>[],
callback: FilterCallback,
opts?: FilterSubscriptionOpts
): Promise<UnsubscribeFunction> {
@ -217,7 +217,9 @@ export class WakuFilter {
this.subscriptions.delete(requestId);
}
private addDecoders(decoders: Map<string, Array<Decoder>>): void {
private addDecoders<T extends Message>(
decoders: Map<string, Array<Decoder<T>>>
): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (!currDecs) {
@ -228,7 +230,9 @@ export class WakuFilter {
});
}
private deleteDecoders(decoders: Map<string, Array<Decoder>>): void {
private deleteDecoders<T extends Message>(
decoders: Map<string, Array<Decoder<T>>>
): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (currDecs) {

View File

@ -18,6 +18,7 @@ import { EncoderV0 } from "../waku_message/version_0";
const log = debug("waku:test:lightpush");
const TestContentTopic = "/test/1/waku-light-push/utf8";
const TestEncoder = new EncoderV0(TestContentTopic);
describe("Waku Light Push [node only]", () => {
let waku: WakuFull;
@ -42,12 +43,11 @@ describe("Waku Light Push [node only]", () => {
await waitForRemotePeer(waku, [Protocols.LightPush]);
const messageText = "Light Push works!";
const encoder = new EncoderV0(TestContentTopic);
const pushResponse = await waku.lightPush.push(encoder, {
const pushResponse = await waku.lightPush.push(TestEncoder, {
payload: utf8ToBytes(messageText),
});
expect(pushResponse?.isSuccess).to.be.true;
expect(pushResponse.recipients.length).to.eq(1);
let msgs: MessageRpcResponse[] = [];
@ -79,11 +79,10 @@ describe("Waku Light Push [node only]", () => {
const nimPeerId = await nwaku.getPeerId();
const messageText = "Light Push works!";
const encoder = new EncoderV0(TestContentTopic);
log("Send message via lightpush");
const pushResponse = await waku.lightPush.push(
encoder,
TestEncoder,
{ payload: utf8ToBytes(messageText) },
{
peerId: nimPeerId,
@ -91,7 +90,7 @@ describe("Waku Light Push [node only]", () => {
}
);
log("Ack received", pushResponse);
expect(pushResponse?.isSuccess).to.be.true;
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
let msgs: MessageRpcResponse[] = [];

View File

@ -9,7 +9,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants";
import { Encoder, Message } from "../interfaces";
import { Encoder, Message, SendResult } from "../interfaces";
import { selectConnection } from "../select_connection";
import {
getPeersForProtocol,
@ -55,7 +55,7 @@ export class WakuLightPush {
encoder: Encoder,
message: Message,
opts?: PushOptions
): Promise<PushResponse | undefined> {
): Promise<SendResult> {
const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic;
const res = await selectPeerForProtocol(
@ -75,11 +75,14 @@ export class WakuLightPush {
if (!connection) throw "Failed to get a connection to the peer";
const stream = await connection.newStream(LightPushCodec);
const recipients: PeerId[] = [];
try {
const protoMessage = await encoder.encodeProto(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
return;
return { recipients };
}
const query = PushRPC.createRequest(protoMessage, pubSubTopic);
const res = await pipe(
@ -99,17 +102,19 @@ export class WakuLightPush {
if (!response) {
log("No response in PushRPC");
return;
return { recipients };
}
return response;
if (response.isSuccess) {
recipients.push(peer.id);
}
} catch (err) {
log("Failed to decode push reply", err);
}
} catch (err) {
log("Failed to send waku light push request", err);
}
return;
return { recipients };
}
/**

View File

@ -0,0 +1,28 @@
import debug from "debug";
import * as proto from "../../proto/topic_only_message";
import type { Decoder, Message, ProtoMessage } from "../interfaces";
const log = debug("waku:message:topic-only");
export class TopicOnlyMessage implements Message {
constructor(private proto: proto.TopicOnlyMessage) {}
get contentTopic(): string {
return this.proto.contentTopic ?? "";
}
}
export class TopicOnlyDecoder implements Decoder<TopicOnlyMessage> {
public contentTopic = "";
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
const protoMessage = proto.TopicOnlyMessage.decode(bytes);
log("Message decoded", protoMessage);
return Promise.resolve(protoMessage);
}
async decode(proto: ProtoMessage): Promise<TopicOnlyMessage | undefined> {
return new TopicOnlyMessage(proto);
}
}

View File

@ -75,7 +75,7 @@ export class EncoderV0 implements Encoder {
}
}
export class DecoderV0 implements Decoder {
export class DecoderV0 implements Decoder<MessageV0> {
constructor(public contentTopic: string) {}
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
@ -84,7 +84,7 @@ export class DecoderV0 implements Decoder {
return Promise.resolve(protoMessage);
}
async decode(proto: ProtoMessage): Promise<Message | undefined> {
async decode(proto: ProtoMessage): Promise<MessageV0 | undefined> {
// https://github.com/status-im/js-waku/issues/921
if (proto.version === undefined) {
proto.version = 0;

View File

@ -110,7 +110,7 @@ export class SymEncoder implements Encoder {
}
}
export class AsymDecoder extends DecoderV0 implements Decoder {
export class AsymDecoder extends DecoderV0 implements Decoder<MessageV1> {
constructor(contentTopic: string, private privateKey: Uint8Array) {
super(contentTopic);
}
@ -166,7 +166,7 @@ export class AsymDecoder extends DecoderV0 implements Decoder {
}
}
export class SymDecoder extends DecoderV0 implements Decoder {
export class SymDecoder extends DecoderV0 implements Decoder<MessageV1> {
constructor(contentTopic: string, private symKey: Uint8Array) {
super(contentTopic);
}

View File

@ -160,7 +160,9 @@ describe("Waku Relay [node only]", () => {
payload: utf8ToBytes(fooMessageText),
});
await delay(200);
while (!fooMessages.length && !barMessages.length) {
await delay(100);
}
expect(fooMessages[0].contentTopic).to.eq(fooContentTopic);
expect(bytesToUtf8(fooMessages[0].payload!)).to.eq(fooMessageText);
@ -381,9 +383,7 @@ describe("Waku Relay [node only]", () => {
const messageText = "Here is another message.";
const receivedMsgPromise: Promise<MessageV0> = new Promise((resolve) => {
waku.relay.addObserver(TestDecoder, (msg) =>
resolve(msg as unknown as MessageV0)
);
waku.relay.addObserver<MessageV0>(TestDecoder, (msg) => resolve(msg));
});
await nwaku.sendMessage(

View File

@ -8,19 +8,23 @@ import {
TopicStr,
} from "@chainsafe/libp2p-gossipsub/dist/src/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import { PublishResult } from "@libp2p/interface-pubsub";
import debug from "debug";
import { DefaultPubSubTopic } from "../constants";
import { Decoder, Encoder, Message } from "../interfaces";
import { Decoder, Encoder, Message, SendResult } from "../interfaces";
import { pushOrInitMapSet } from "../push_or_init_map";
import { DecoderV0 } from "../waku_message/version_0";
import { TopicOnlyDecoder } from "../waku_message/topic_only_message";
import * as constants from "./constants";
const log = debug("waku:relay");
export type Callback = (msg: Message) => void;
export type Callback<T extends Message> = (msg: T) => void;
export type Observer<T extends Message> = {
decoder: Decoder<T>;
callback: Callback<T>;
};
export type CreateOptions = {
/**
@ -47,13 +51,14 @@ export type CreateOptions = {
*/
export class WakuRelay extends GossipSub {
pubSubTopic: string;
defaultDecoder: Decoder<Message>;
public static multicodec: string = constants.RelayCodecs[0];
/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
public observers: Map<string, Set<{ decoder: Decoder; callback: Callback }>>;
public observers: Map<string, Set<Observer<any>>>;
constructor(options?: Partial<CreateOptions>) {
options = Object.assign(options ?? {}, {
@ -67,6 +72,9 @@ export class WakuRelay extends GossipSub {
this.observers = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder();
}
/**
@ -84,10 +92,7 @@ export class WakuRelay extends GossipSub {
/**
* Send Waku message.
*/
public async send(
encoder: Encoder,
message: Message
): Promise<PublishResult> {
public async send(encoder: Encoder, message: Message): Promise<SendResult> {
const msg = await encoder.encode(message);
if (!msg) {
log("Failed to encode message, aborting publish");
@ -101,7 +106,10 @@ export class WakuRelay extends GossipSub {
*
* @returns Function to delete the observer
*/
addObserver(decoder: Decoder, callback: Callback): () => void {
addObserver<T extends Message>(
decoder: Decoder<T>,
callback: Callback<T>
): () => void {
const observer = {
decoder,
callback,
@ -128,30 +136,32 @@ export class WakuRelay extends GossipSub {
if (event.detail.msg.topic !== pubSubTopic) return;
log(`Message received on ${pubSubTopic}`);
const decoderV0 = new DecoderV0("");
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
const protoMsg = await decoderV0.decodeProto(event.detail.msg.data);
if (!protoMsg) {
return;
}
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic === "undefined") {
const topicOnlyMsg = await this.defaultDecoder.decodeProto(
event.detail.msg.data
);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log("Message does not have a content topic, skipping");
return;
}
const observers = this.observers.get(contentTopic);
const observers = this.observers.get(topicOnlyMsg.contentTopic);
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => {
const protoMsg = await decoder.decodeProto(event.detail.msg.data);
if (!protoMsg) {
log(
"Internal error: message previously decoded failed on 2nd pass."
);
return;
}
const msg = await decoder.decode(protoMsg);
if (msg) {
callback(msg);
} else {
log("Failed to decode messages on", contentTopic);
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
}
})
);

View File

@ -72,7 +72,7 @@ describe("Waku Store", () => {
const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) {
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
@ -103,7 +103,7 @@ describe("Waku Store", () => {
const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) {
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
@ -142,12 +142,15 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: Message[] = [];
await waku.store.queryCallbackOnPromise(TestDecoder, async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
await waku.store.queryCallbackOnPromise(
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
}
}
});
);
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
@ -182,7 +185,7 @@ describe("Waku Store", () => {
const desiredMsgs = 14;
const messages: Message[] = [];
await waku.store.queryCallbackOnPromise(
TestDecoder,
[TestDecoder],
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
@ -220,7 +223,7 @@ describe("Waku Store", () => {
const messages: Message[] = [];
await waku.store.queryOrderedCallback(
TestDecoder,
[TestDecoder],
async (msg) => {
messages.push(msg);
},
@ -263,7 +266,7 @@ describe("Waku Store", () => {
let messages: Message[] = [];
await waku.store.queryOrderedCallback(
TestDecoder,
[TestDecoder],
async (msg) => {
messages.push(msg);
},
@ -361,25 +364,11 @@ describe("Waku Store", () => {
const messages: Message[] = [];
log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator(asymDecoder)) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
for await (const msgPromises of waku2.store.queryGenerator(symDecoder)) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
for await (const msgPromises of waku2.store.queryGenerator(TestDecoder)) {
for await (const msgPromises of waku2.store.queryGenerator([
asymDecoder,
symDecoder,
TestDecoder,
])) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
@ -443,7 +432,7 @@ describe("Waku Store", () => {
const firstMessages: Message[] = [];
await waku.store.queryOrderedCallback(
TestDecoder,
[TestDecoder],
(msg) => {
if (msg) {
firstMessages.push(msg);
@ -457,7 +446,7 @@ describe("Waku Store", () => {
const bothMessages: Message[] = [];
await waku.store.queryOrderedCallback(
TestDecoder,
[TestDecoder],
async (msg) => {
bothMessages.push(msg);
},
@ -524,7 +513,7 @@ describe("Waku Store, custom pubsub topic", () => {
const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) {
for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {

View File

@ -100,16 +100,17 @@ export class WakuStore {
* If strong ordering is needed, you may need to handle this at application level
* and set your own timestamps too (the WakuMessage timestamps are not certified).
*
* @throws If not able to reach a Waku Store peer to query
* or if an error is encountered when processing the reply.
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryOrderedCallback(
decoder: Decoder,
async queryOrderedCallback<T extends Message>(
decoders: Decoder<T>[],
callback: (message: Message) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
const abort = false;
for await (const promises of this.queryGenerator(decoder, options)) {
for await (const promises of this.queryGenerator(decoders, options)) {
if (abort) break;
let messages = await Promise.all(promises);
@ -148,11 +149,12 @@ export class WakuStore {
* break the order as it may rely on the browser decryption API, which in turn,
* may have a different speed depending on the type of decryption.
*
* @throws If not able to reach a Waku Store peer to query
* or if an error is encountered when processing the reply.
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async queryCallbackOnPromise(
decoder: Decoder,
async queryCallbackOnPromise<T extends Message>(
decoders: Decoder<T>[],
callback: (
message: Promise<Message | undefined>
) => Promise<void | boolean> | boolean | void,
@ -160,7 +162,7 @@ export class WakuStore {
): Promise<void> {
let abort = false;
let promises: Promise<void>[] = [];
for await (const page of this.queryGenerator(decoder, options)) {
for await (const page of this.queryGenerator(decoders, options)) {
const _promises = page.map(async (msg) => {
if (!abort) {
abort = Boolean(await callback(msg));
@ -185,11 +187,12 @@ export class WakuStore {
*
* However, there is no way to guarantee the behavior of the remote node.
*
* @throws If not able to reach a Waku Store peer to query
* or if an error is encountered when processing the reply.
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
*/
async *queryGenerator(
decoder: Decoder,
async *queryGenerator<T extends Message>(
decoders: Decoder<T>[],
options?: QueryOptions
): AsyncGenerator<Promise<Message | undefined>[]> {
let startTime, endTime;
@ -199,7 +202,17 @@ export class WakuStore {
endTime = options.timeFilter.endTime;
}
const contentTopic = decoder.contentTopic;
const decodersAsMap = new Map();
decoders.forEach((dec) => {
if (decodersAsMap.has(dec.contentTopic)) {
throw new Error(
"API does not support different decoder per content topic"
);
}
decodersAsMap.set(dec.contentTopic, dec);
});
const contentTopics = decoders.map((dec) => dec.contentTopic);
const queryOpts = Object.assign(
{
@ -208,7 +221,7 @@ export class WakuStore {
pageSize: DefaultPageSize,
},
options,
{ contentTopics: [contentTopic], startTime, endTime }
{ contentTopics, startTime, endTime }
);
log("Querying history with the following options", {
@ -236,7 +249,7 @@ export class WakuStore {
connection,
protocol,
queryOpts,
decoder
decodersAsMap
)) {
yield messages;
}
@ -256,12 +269,21 @@ export class WakuStore {
}
}
async function* paginate(
async function* paginate<T extends Message>(
connection: Connection,
protocol: string,
queryOpts: Params,
decoder: Decoder
): AsyncGenerator<Promise<Message | undefined>[]> {
decoders: Map<string, Decoder<T>>
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Array.from(decoders.keys()).toString()
) {
throw new Error(
"Internal error, the decoders should match the query's content topics"
);
}
let cursor = undefined;
while (true) {
queryOpts = Object.assign(queryOpts, { cursor });
@ -314,7 +336,16 @@ async function* paginate(
log(`${response.messages.length} messages retrieved from store`);
yield response.messages.map((protoMsg) => decoder.decode(protoMsg));
yield response.messages.map((protoMsg) => {
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic !== "undefined") {
const decoder = decoders.get(contentTopic);
if (decoder) {
return decoder.decode(protoMsg);
}
}
return Promise.resolve(undefined);
});
cursor = response.pagingInfo?.cursor;
if (typeof cursor === "undefined") {

View File

@ -0,0 +1,67 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message } from "protons-runtime";
import type { Uint8ArrayList } from "uint8arraylist";
import type { Codec } from "protons-runtime";
export interface MessageTopicOnly {
contentTopic?: string;
}
export namespace MessageTopicOnly {
let _codec: Codec<MessageTopicOnly>;
export const codec = (): Codec<MessageTopicOnly> => {
if (_codec == null) {
_codec = message<MessageTopicOnly>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.contentTopic != null) {
writer.uint32(18);
writer.string(obj.contentTopic);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 2:
obj.contentTopic = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: MessageTopicOnly): Uint8Array => {
return encodeMessage(obj, MessageTopicOnly.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): MessageTopicOnly => {
return decodeMessage(buf, MessageTopicOnly.codec());
};
}

View File

@ -0,0 +1,5 @@
syntax = "proto3";
message TopicOnlyMessage {
optional string content_topic = 2;
}

View File

@ -0,0 +1,67 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message } from "protons-runtime";
import type { Uint8ArrayList } from "uint8arraylist";
import type { Codec } from "protons-runtime";
export interface TopicOnlyMessage {
contentTopic?: string;
}
export namespace TopicOnlyMessage {
let _codec: Codec<TopicOnlyMessage>;
export const codec = (): Codec<TopicOnlyMessage> => {
if (_codec == null) {
_codec = message<TopicOnlyMessage>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.contentTopic != null) {
writer.uint32(18);
writer.string(obj.contentTopic);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 2:
obj.contentTopic = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: TopicOnlyMessage): Uint8Array => {
return encodeMessage(obj, TopicOnlyMessage.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): TopicOnlyMessage => {
return decodeMessage(buf, TopicOnlyMessage.codec());
};
}

View File

@ -8,7 +8,8 @@
"./src/lib/predefined_bootstrap_nodes.ts",
"./src/lib/wait_for_remote_peer.ts",
"./src/lib/waku_message/version_0.ts",
"./src/lib/waku_message/version_1.ts"
"./src/lib/waku_message/version_1.ts",
"./src/lib/waku_message/topic_only_message.ts"
],
"out": "build/docs",
"exclude": ["**/*.spec.ts"],