From 9ff602da7ebe3836d345d2048df65e8a8335299f Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 20 Sep 2022 12:38:04 +1000 Subject: [PATCH 1/8] doc: fix changelog --- CHANGELOG.md | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6b1b7d6474..966710ecce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Waku message encoding and decoding is more generic, to enable upcoming feature such as [RLN](https://rfc.vac.dev/spec/17/) & [Noise](https://rfc.vac.dev/spec/43/); + it also enables separating the `version_1` module out to reduce bundle size and improve cross-platform compatibility when not used. +- Due to the change above, all APIs that handle messages have changed to receive a `Decoder` or `Encoder`. + ## [0.28.1] - 2022-09-20 ### Added @@ -17,9 +23,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `queryCallbackOnPromise`'s return value has been simplified to `Promise`. - doc: clarified behaviour of `WakuStore` query functions. -- Waku message encoding and decoding is more generic, to enable upcoming feature such as [RLN](https://rfc.vac.dev/spec/17/) & [Noise](https://rfc.vac.dev/spec/43/); - it also enables separating the `version_1` module out to reduce bundle size and improve cross-platform compatibility when not used. -- Due to the change above, all APIs that handle messages have changed to receive a `Decoder` or `Encoder`. ### Deleted From 52005f8963cac3fd597be4e4c81d78f372a802f3 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 19 Sep 2022 16:06:03 +1000 Subject: [PATCH 2/8] feat: enable custom Message type on Decoder This enables the type passed on the callback functions to match the decoder's so the consumer can access implementation specific fields. --- src/lib/interfaces.ts | 4 ++-- src/lib/waku_filter/index.ts | 16 ++++++++++------ src/lib/waku_message/version_0.ts | 4 ++-- src/lib/waku_message/version_1.ts | 4 ++-- src/lib/waku_relay/index.node.spec.ts | 4 +--- src/lib/waku_relay/index.ts | 14 +++++++++++--- src/lib/waku_store/index.ts | 16 ++++++++-------- 7 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/lib/interfaces.ts b/src/lib/interfaces.ts index 4df89f1b47..1b210be585 100644 --- a/src/lib/interfaces.ts +++ b/src/lib/interfaces.ts @@ -70,8 +70,8 @@ export interface Encoder { encodeProto: (message: Message) => Promise; } -export interface Decoder { +export interface Decoder { contentTopic: string; decodeProto: (bytes: Uint8Array) => Promise; - decode: (proto: ProtoMessage) => Promise; + decode: (proto: ProtoMessage) => Promise; } diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index 094c7d7e02..003642a480 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -63,9 +63,9 @@ export type UnsubscribeFunction = () => Promise; export class WakuFilter { pubSubTopic: string; private subscriptions: Map; - public decoders: Map< + private decoders: Map< string, // content topic - Set + Set> >; constructor(public libp2p: Libp2p, options?: CreateOptions) { @@ -83,8 +83,8 @@ export class WakuFilter { * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @returns Unsubscribe function that can be used to end the subscription. */ - async subscribe( - decoders: Decoder[], + async subscribe( + decoders: Decoder[], callback: FilterCallback, opts?: FilterSubscriptionOpts ): Promise { @@ -217,7 +217,9 @@ export class WakuFilter { this.subscriptions.delete(requestId); } - private addDecoders(decoders: Map>): void { + private addDecoders( + decoders: Map>> + ): void { decoders.forEach((decoders, contentTopic) => { const currDecs = this.decoders.get(contentTopic); if (!currDecs) { @@ -228,7 +230,9 @@ export class WakuFilter { }); } - private deleteDecoders(decoders: Map>): void { + private deleteDecoders( + decoders: Map>> + ): void { decoders.forEach((decoders, contentTopic) => { const currDecs = this.decoders.get(contentTopic); if (currDecs) { diff --git a/src/lib/waku_message/version_0.ts b/src/lib/waku_message/version_0.ts index 8ba75a0713..d316c3ceb1 100644 --- a/src/lib/waku_message/version_0.ts +++ b/src/lib/waku_message/version_0.ts @@ -75,7 +75,7 @@ export class EncoderV0 implements Encoder { } } -export class DecoderV0 implements Decoder { +export class DecoderV0 implements Decoder { constructor(public contentTopic: string) {} decodeProto(bytes: Uint8Array): Promise { @@ -84,7 +84,7 @@ export class DecoderV0 implements Decoder { return Promise.resolve(protoMessage); } - async decode(proto: ProtoMessage): Promise { + async decode(proto: ProtoMessage): Promise { // https://github.com/status-im/js-waku/issues/921 if (proto.version === undefined) { proto.version = 0; diff --git a/src/lib/waku_message/version_1.ts b/src/lib/waku_message/version_1.ts index 99b8e2e1cb..f0af9a7223 100644 --- a/src/lib/waku_message/version_1.ts +++ b/src/lib/waku_message/version_1.ts @@ -110,7 +110,7 @@ export class SymEncoder implements Encoder { } } -export class AsymDecoder extends DecoderV0 implements Decoder { +export class AsymDecoder extends DecoderV0 implements Decoder { constructor(contentTopic: string, private privateKey: Uint8Array) { super(contentTopic); } @@ -166,7 +166,7 @@ export class AsymDecoder extends DecoderV0 implements Decoder { } } -export class SymDecoder extends DecoderV0 implements Decoder { +export class SymDecoder extends DecoderV0 implements Decoder { constructor(contentTopic: string, private symKey: Uint8Array) { super(contentTopic); } diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index ea3045e58c..7ad8df7591 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -381,9 +381,7 @@ describe("Waku Relay [node only]", () => { const messageText = "Here is another message."; const receivedMsgPromise: Promise = new Promise((resolve) => { - waku.relay.addObserver(TestDecoder, (msg) => - resolve(msg as unknown as MessageV0) - ); + waku.relay.addObserver(TestDecoder, (msg) => resolve(msg)); }); await nwaku.sendMessage( diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index d132e7e17b..913672f879 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -20,7 +20,12 @@ import * as constants from "./constants"; const log = debug("waku:relay"); -export type Callback = (msg: Message) => void; +export type Callback = (msg: T) => void; + +export type Observer = { + decoder: Decoder; + callback: Callback; +}; export type CreateOptions = { /** @@ -53,7 +58,7 @@ export class WakuRelay extends GossipSub { * observers called when receiving new message. * Observers under key `""` are always called. */ - public observers: Map>; + public observers: Map>>; constructor(options?: Partial) { options = Object.assign(options ?? {}, { @@ -101,7 +106,10 @@ export class WakuRelay extends GossipSub { * * @returns Function to delete the observer */ - addObserver(decoder: Decoder, callback: Callback): () => void { + addObserver( + decoder: Decoder, + callback: Callback + ): () => void { const observer = { decoder, callback, diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 158601fdf3..8874942c45 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -103,8 +103,8 @@ export class WakuStore { * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryOrderedCallback( - decoder: Decoder, + async queryOrderedCallback( + decoder: Decoder, callback: (message: Message) => Promise | boolean | void, options?: QueryOptions ): Promise { @@ -151,8 +151,8 @@ export class WakuStore { * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryCallbackOnPromise( - decoder: Decoder, + async queryCallbackOnPromise( + decoder: Decoder, callback: ( message: Promise ) => Promise | boolean | void, @@ -188,8 +188,8 @@ export class WakuStore { * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async *queryGenerator( - decoder: Decoder, + async *queryGenerator( + decoder: Decoder, options?: QueryOptions ): AsyncGenerator[]> { let startTime, endTime; @@ -256,11 +256,11 @@ export class WakuStore { } } -async function* paginate( +async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoder: Decoder + decoder: Decoder ): AsyncGenerator[]> { let cursor = undefined; while (true) { From c0c4965e289f1ac6000b17179607500b7a05a8e4 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 19 Sep 2022 16:20:33 +1000 Subject: [PATCH 3/8] feat: use a content topic only decoder for first pass decoding --- package.json | 4 ++ rollup.config.js | 2 + src/index.ts | 1 + src/lib/waku_message/topic_only_message.ts | 28 +++++++++ src/lib/waku_relay/index.node.spec.ts | 4 +- src/lib/waku_relay/index.ts | 30 ++++++---- src/proto/message_topic_only.ts | 67 ++++++++++++++++++++++ src/proto/topic_only_message.proto | 5 ++ src/proto/topic_only_message.ts | 67 ++++++++++++++++++++++ typedoc.json | 3 +- 10 files changed, 197 insertions(+), 14 deletions(-) create mode 100644 src/lib/waku_message/topic_only_message.ts create mode 100644 src/proto/message_topic_only.ts create mode 100644 src/proto/topic_only_message.proto create mode 100644 src/proto/topic_only_message.ts diff --git a/package.json b/package.json index 5c5b5b20fa..437ada0485 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,10 @@ "./lib/waku_message/version_1": { "types": "./dist/lib/waku_message/version_1.d.ts", "import": "./dist/lib/waku_message/version_1.js" + }, + "./lib/waku_message/topic_only_message": { + "types": "./dist/lib/waku_message/topic_only_message.d.ts", + "import": "./dist/lib/waku_message/topic_only_message.js" } }, "typesVersions": { diff --git a/rollup.config.js b/rollup.config.js index cb58ac9f1c..c4867c7c61 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -12,6 +12,8 @@ export default { "lib/wait_for_remote_peer": "dist/lib/wait_for_remote_peer.js", "lib/waku_message/version_0": "dist/lib/waku_message/version_0.js", "lib/waku_message/version_1": "dist/lib/waku_message/version_1.js", + "lib/waku_message/topic_only_message": + "dist/lib/waku_message/topic_only_message.js", }, output: { dir: "bundle", diff --git a/src/index.ts b/src/index.ts index b1f3f85403..c6f2448a8e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,7 @@ export * as enr from "./lib/enr"; export * as utils from "./lib/utils"; export * as proto_message from "./proto/message"; +export * as proto_topic_only_message from "./proto/topic_only_message"; export * as waku from "./lib/waku"; export { WakuNode, Protocols } from "./lib/waku"; diff --git a/src/lib/waku_message/topic_only_message.ts b/src/lib/waku_message/topic_only_message.ts new file mode 100644 index 0000000000..682c732ef9 --- /dev/null +++ b/src/lib/waku_message/topic_only_message.ts @@ -0,0 +1,28 @@ +import debug from "debug"; + +import * as proto from "../../proto/topic_only_message"; +import type { Decoder, Message, ProtoMessage } from "../interfaces"; + +const log = debug("waku:message:topic-only"); + +export class TopicOnlyMessage implements Message { + constructor(private proto: proto.TopicOnlyMessage) {} + + get contentTopic(): string { + return this.proto.contentTopic ?? ""; + } +} + +export class TopicOnlyDecoder implements Decoder { + public contentTopic = ""; + + decodeProto(bytes: Uint8Array): Promise { + const protoMessage = proto.TopicOnlyMessage.decode(bytes); + log("Message decoded", protoMessage); + return Promise.resolve(protoMessage); + } + + async decode(proto: ProtoMessage): Promise { + return new TopicOnlyMessage(proto); + } +} diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index 7ad8df7591..89d63281a4 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -160,7 +160,9 @@ describe("Waku Relay [node only]", () => { payload: utf8ToBytes(fooMessageText), }); - await delay(200); + while (!fooMessages.length && !barMessages.length) { + await delay(100); + } expect(fooMessages[0].contentTopic).to.eq(fooContentTopic); expect(bytesToUtf8(fooMessages[0].payload!)).to.eq(fooMessageText); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 913672f879..f9eb04fc1a 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -14,7 +14,7 @@ import debug from "debug"; import { DefaultPubSubTopic } from "../constants"; import { Decoder, Encoder, Message } from "../interfaces"; import { pushOrInitMapSet } from "../push_or_init_map"; -import { DecoderV0 } from "../waku_message/version_0"; +import { TopicOnlyDecoder } from "../waku_message/topic_only_message"; import * as constants from "./constants"; @@ -52,6 +52,7 @@ export type CreateOptions = { */ export class WakuRelay extends GossipSub { pubSubTopic: string; + defaultDecoder: Decoder; public static multicodec: string = constants.RelayCodecs[0]; /** @@ -72,6 +73,9 @@ export class WakuRelay extends GossipSub { this.observers = new Map(); this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; + + // TODO: User might want to decide what decoder should be used (e.g. for RLN) + this.defaultDecoder = new TopicOnlyDecoder(); } /** @@ -136,30 +140,32 @@ export class WakuRelay extends GossipSub { if (event.detail.msg.topic !== pubSubTopic) return; log(`Message received on ${pubSubTopic}`); - const decoderV0 = new DecoderV0(""); - // TODO: User might want to decide what decoder should be used (e.g. for RLN) - const protoMsg = await decoderV0.decodeProto(event.detail.msg.data); - if (!protoMsg) { - return; - } - const contentTopic = protoMsg.contentTopic; - - if (typeof contentTopic === "undefined") { + const topicOnlyMsg = await this.defaultDecoder.decodeProto( + event.detail.msg.data + ); + if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { log("Message does not have a content topic, skipping"); return; } - const observers = this.observers.get(contentTopic); + const observers = this.observers.get(topicOnlyMsg.contentTopic); if (!observers) { return; } await Promise.all( Array.from(observers).map(async ({ decoder, callback }) => { + const protoMsg = await decoder.decodeProto(event.detail.msg.data); + if (!protoMsg) { + log( + "Internal error: message previously decoded failed on 2nd pass." + ); + return; + } const msg = await decoder.decode(protoMsg); if (msg) { callback(msg); } else { - log("Failed to decode messages on", contentTopic); + log("Failed to decode messages on", topicOnlyMsg.contentTopic); } }) ); diff --git a/src/proto/message_topic_only.ts b/src/proto/message_topic_only.ts new file mode 100644 index 0000000000..9c7b505070 --- /dev/null +++ b/src/proto/message_topic_only.ts @@ -0,0 +1,67 @@ +/* eslint-disable import/export */ +/* eslint-disable @typescript-eslint/no-namespace */ + +import { encodeMessage, decodeMessage, message } from "protons-runtime"; +import type { Uint8ArrayList } from "uint8arraylist"; +import type { Codec } from "protons-runtime"; + +export interface MessageTopicOnly { + contentTopic?: string; +} + +export namespace MessageTopicOnly { + let _codec: Codec; + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message( + (obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork(); + } + + if (obj.contentTopic != null) { + writer.uint32(18); + writer.string(obj.contentTopic); + } + + if (opts.lengthDelimited !== false) { + writer.ldelim(); + } + }, + (reader, length) => { + const obj: any = {}; + + const end = length == null ? reader.len : reader.pos + length; + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 2: + obj.contentTopic = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + + return obj; + } + ); + } + + return _codec; + }; + + export const encode = (obj: MessageTopicOnly): Uint8Array => { + return encodeMessage(obj, MessageTopicOnly.codec()); + }; + + export const decode = ( + buf: Uint8Array | Uint8ArrayList + ): MessageTopicOnly => { + return decodeMessage(buf, MessageTopicOnly.codec()); + }; +} diff --git a/src/proto/topic_only_message.proto b/src/proto/topic_only_message.proto new file mode 100644 index 0000000000..6729bf39b9 --- /dev/null +++ b/src/proto/topic_only_message.proto @@ -0,0 +1,5 @@ +syntax = "proto3"; + +message TopicOnlyMessage { + optional string content_topic = 2; +} diff --git a/src/proto/topic_only_message.ts b/src/proto/topic_only_message.ts new file mode 100644 index 0000000000..a17ed89a36 --- /dev/null +++ b/src/proto/topic_only_message.ts @@ -0,0 +1,67 @@ +/* eslint-disable import/export */ +/* eslint-disable @typescript-eslint/no-namespace */ + +import { encodeMessage, decodeMessage, message } from "protons-runtime"; +import type { Uint8ArrayList } from "uint8arraylist"; +import type { Codec } from "protons-runtime"; + +export interface TopicOnlyMessage { + contentTopic?: string; +} + +export namespace TopicOnlyMessage { + let _codec: Codec; + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message( + (obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork(); + } + + if (obj.contentTopic != null) { + writer.uint32(18); + writer.string(obj.contentTopic); + } + + if (opts.lengthDelimited !== false) { + writer.ldelim(); + } + }, + (reader, length) => { + const obj: any = {}; + + const end = length == null ? reader.len : reader.pos + length; + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 2: + obj.contentTopic = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + + return obj; + } + ); + } + + return _codec; + }; + + export const encode = (obj: TopicOnlyMessage): Uint8Array => { + return encodeMessage(obj, TopicOnlyMessage.codec()); + }; + + export const decode = ( + buf: Uint8Array | Uint8ArrayList + ): TopicOnlyMessage => { + return decodeMessage(buf, TopicOnlyMessage.codec()); + }; +} diff --git a/typedoc.json b/typedoc.json index a8daf63237..31019f4fff 100644 --- a/typedoc.json +++ b/typedoc.json @@ -8,7 +8,8 @@ "./src/lib/predefined_bootstrap_nodes.ts", "./src/lib/wait_for_remote_peer.ts", "./src/lib/waku_message/version_0.ts", - "./src/lib/waku_message/version_1.ts" + "./src/lib/waku_message/version_1.ts", + "./src/lib/waku_message/topic_only_message.ts" ], "out": "build/docs", "exclude": ["**/*.spec.ts"], From 8679adcf80b78fd9b4ed03ed87ab65610c4dfb46 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 19 Sep 2022 16:33:07 +1000 Subject: [PATCH 4/8] feat: enable store queries with multiple content topics and decoders --- src/lib/waku_store/index.node.spec.ts | 53 +++++++++------------- src/lib/waku_store/index.ts | 65 ++++++++++++++++++++------- 2 files changed, 69 insertions(+), 49 deletions(-) diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 8156d4469c..ecbec36309 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -72,7 +72,7 @@ describe("Waku Store", () => { const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -103,7 +103,7 @@ describe("Waku Store", () => { const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -142,12 +142,15 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: Message[] = []; - await waku.store.queryCallbackOnPromise(TestDecoder, async (msgPromise) => { - const msg = await msgPromise; - if (msg) { - messages.push(msg); + await waku.store.queryCallbackOnPromise( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } } - }); + ); expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { @@ -182,7 +185,7 @@ describe("Waku Store", () => { const desiredMsgs = 14; const messages: Message[] = []; await waku.store.queryCallbackOnPromise( - TestDecoder, + [TestDecoder], async (msgPromise) => { const msg = await msgPromise; if (msg) { @@ -220,7 +223,7 @@ describe("Waku Store", () => { const messages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], async (msg) => { messages.push(msg); }, @@ -263,7 +266,7 @@ describe("Waku Store", () => { let messages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], async (msg) => { messages.push(msg); }, @@ -361,25 +364,11 @@ describe("Waku Store", () => { const messages: Message[] = []; log("Retrieve messages from store"); - for await (const msgPromises of waku2.store.queryGenerator(asymDecoder)) { - for (const promise of msgPromises) { - const msg = await promise; - if (msg) { - messages.push(msg); - } - } - } - - for await (const msgPromises of waku2.store.queryGenerator(symDecoder)) { - for (const promise of msgPromises) { - const msg = await promise; - if (msg) { - messages.push(msg); - } - } - } - - for await (const msgPromises of waku2.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku2.store.queryGenerator([ + asymDecoder, + symDecoder, + TestDecoder, + ])) { for (const promise of msgPromises) { const msg = await promise; if (msg) { @@ -443,7 +432,7 @@ describe("Waku Store", () => { const firstMessages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], (msg) => { if (msg) { firstMessages.push(msg); @@ -457,7 +446,7 @@ describe("Waku Store", () => { const bothMessages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], async (msg) => { bothMessages.push(msg); }, @@ -524,7 +513,7 @@ describe("Waku Store, custom pubsub topic", () => { const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 8874942c45..5f51f63e93 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -100,16 +100,17 @@ export class WakuStore { * If strong ordering is needed, you may need to handle this at application level * and set your own timestamps too (the WakuMessage timestamps are not certified). * - * @throws If not able to reach a Waku Store peer to query - * or if an error is encountered when processing the reply. + * @throws If not able to reach a Waku Store peer to query, + * or if an error is encountered when processing the reply, + * or if two decoders with the same content topic are passed. */ async queryOrderedCallback( - decoder: Decoder, + decoders: Decoder[], callback: (message: Message) => Promise | boolean | void, options?: QueryOptions ): Promise { const abort = false; - for await (const promises of this.queryGenerator(decoder, options)) { + for await (const promises of this.queryGenerator(decoders, options)) { if (abort) break; let messages = await Promise.all(promises); @@ -148,11 +149,12 @@ export class WakuStore { * break the order as it may rely on the browser decryption API, which in turn, * may have a different speed depending on the type of decryption. * - * @throws If not able to reach a Waku Store peer to query - * or if an error is encountered when processing the reply. + * @throws If not able to reach a Waku Store peer to query, + * or if an error is encountered when processing the reply, + * or if two decoders with the same content topic are passed. */ async queryCallbackOnPromise( - decoder: Decoder, + decoders: Decoder[], callback: ( message: Promise ) => Promise | boolean | void, @@ -160,7 +162,7 @@ export class WakuStore { ): Promise { let abort = false; let promises: Promise[] = []; - for await (const page of this.queryGenerator(decoder, options)) { + for await (const page of this.queryGenerator(decoders, options)) { const _promises = page.map(async (msg) => { if (!abort) { abort = Boolean(await callback(msg)); @@ -185,11 +187,12 @@ export class WakuStore { * * However, there is no way to guarantee the behavior of the remote node. * - * @throws If not able to reach a Waku Store peer to query - * or if an error is encountered when processing the reply. + * @throws If not able to reach a Waku Store peer to query, + * or if an error is encountered when processing the reply, + * or if two decoders with the same content topic are passed. */ async *queryGenerator( - decoder: Decoder, + decoders: Decoder[], options?: QueryOptions ): AsyncGenerator[]> { let startTime, endTime; @@ -199,7 +202,17 @@ export class WakuStore { endTime = options.timeFilter.endTime; } - const contentTopic = decoder.contentTopic; + const decodersAsMap = new Map(); + decoders.forEach((dec) => { + if (decodersAsMap.has(dec.contentTopic)) { + throw new Error( + "API does not support different decoder per content topic" + ); + } + decodersAsMap.set(dec.contentTopic, dec); + }); + + const contentTopics = decoders.map((dec) => dec.contentTopic); const queryOpts = Object.assign( { @@ -208,7 +221,7 @@ export class WakuStore { pageSize: DefaultPageSize, }, options, - { contentTopics: [contentTopic], startTime, endTime } + { contentTopics, startTime, endTime } ); log("Querying history with the following options", { @@ -236,7 +249,7 @@ export class WakuStore { connection, protocol, queryOpts, - decoder + decodersAsMap )) { yield messages; } @@ -260,8 +273,17 @@ async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoder: Decoder -): AsyncGenerator[]> { + decoders: Map> +): AsyncGenerator[]> { + if ( + queryOpts.contentTopics.toString() !== + Array.from(decoders.keys()).toString() + ) { + throw new Error( + "Internal error, the decoders should match the query's content topics" + ); + } + let cursor = undefined; while (true) { queryOpts = Object.assign(queryOpts, { cursor }); @@ -314,7 +336,16 @@ async function* paginate( log(`${response.messages.length} messages retrieved from store`); - yield response.messages.map((protoMsg) => decoder.decode(protoMsg)); + yield response.messages.map((protoMsg) => { + const contentTopic = protoMsg.contentTopic; + if (typeof contentTopic !== "undefined") { + const decoder = decoders.get(contentTopic); + if (decoder) { + return decoder.decode(protoMsg); + } + } + return Promise.resolve(undefined); + }); cursor = response.pagingInfo?.cursor; if (typeof cursor === "undefined") { From ae46640ba8920bea9990576081bf9dba8af26849 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 20 Sep 2022 12:41:37 +1000 Subject: [PATCH 5/8] test: group encoder/decoder declaration --- src/lib/waku_filter/index.node.spec.ts | 26 +++++++++------------- src/lib/waku_light_push/index.node.spec.ts | 7 +++--- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts index eea4a72716..205335de36 100644 --- a/src/lib/waku_filter/index.node.spec.ts +++ b/src/lib/waku_filter/index.node.spec.ts @@ -13,6 +13,8 @@ import { DecoderV0, EncoderV0 } from "../waku_message/version_0"; const log = debug("waku:test"); const TestContentTopic = "/test/1/waku-filter"; +const TestEncoder = new EncoderV0(TestContentTopic); +const TestDecoder = new DecoderV0(TestContentTopic); describe("Waku Filter", () => { let waku: WakuFull; @@ -50,16 +52,13 @@ describe("Waku Filter", () => { expect(bytesToUtf8(msg.payload!)).to.eq(messageText); }; - const decoder = new DecoderV0(TestContentTopic); - - await waku.filter.subscribe([decoder], callback); + await waku.filter.subscribe([TestDecoder], callback); // As the filter protocol does not cater for an ack of subscription // we cannot know whether the subscription happened. Something we want to // correct in future versions of the protocol. await delay(200); - const encoder = new EncoderV0(TestContentTopic); - await waku.lightPush.push(encoder, message); + await waku.lightPush.push(TestEncoder, message); while (messageCount === 0) { await delay(250); } @@ -74,15 +73,13 @@ describe("Waku Filter", () => { messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); }; - const decoder = new DecoderV0(TestContentTopic); - await waku.filter.subscribe([decoder], callback); + await waku.filter.subscribe([TestDecoder], callback); await delay(200); - const encoder = new EncoderV0(TestContentTopic); - await waku.lightPush.push(encoder, { + await waku.lightPush.push(TestEncoder, { payload: utf8ToBytes("Filtering works!"), }); - await waku.lightPush.push(encoder, { + await waku.lightPush.push(TestEncoder, { payload: utf8ToBytes("Filtering still works!"), }); while (messageCount < 2) { @@ -96,19 +93,16 @@ describe("Waku Filter", () => { const callback = (): void => { messageCount++; }; - const decoder = new DecoderV0(TestContentTopic); - const unsubscribe = await waku.filter.subscribe([decoder], callback); - - const encoder = new EncoderV0(TestContentTopic); + const unsubscribe = await waku.filter.subscribe([TestDecoder], callback); await delay(200); - await waku.lightPush.push(encoder, { + await waku.lightPush.push(TestEncoder, { payload: utf8ToBytes("This should be received"), }); await delay(100); await unsubscribe(); await delay(200); - await waku.lightPush.push(encoder, { + await waku.lightPush.push(TestEncoder, { payload: utf8ToBytes("This should not be received"), }); await delay(100); diff --git a/src/lib/waku_light_push/index.node.spec.ts b/src/lib/waku_light_push/index.node.spec.ts index 9cb96e6f6a..df7cf29f61 100644 --- a/src/lib/waku_light_push/index.node.spec.ts +++ b/src/lib/waku_light_push/index.node.spec.ts @@ -18,6 +18,7 @@ import { EncoderV0 } from "../waku_message/version_0"; const log = debug("waku:test:lightpush"); const TestContentTopic = "/test/1/waku-light-push/utf8"; +const TestEncoder = new EncoderV0(TestContentTopic); describe("Waku Light Push [node only]", () => { let waku: WakuFull; @@ -42,9 +43,8 @@ describe("Waku Light Push [node only]", () => { await waitForRemotePeer(waku, [Protocols.LightPush]); const messageText = "Light Push works!"; - const encoder = new EncoderV0(TestContentTopic); - const pushResponse = await waku.lightPush.push(encoder, { + const pushResponse = await waku.lightPush.push(TestEncoder, { payload: utf8ToBytes(messageText), }); expect(pushResponse?.isSuccess).to.be.true; @@ -79,11 +79,10 @@ describe("Waku Light Push [node only]", () => { const nimPeerId = await nwaku.getPeerId(); const messageText = "Light Push works!"; - const encoder = new EncoderV0(TestContentTopic); log("Send message via lightpush"); const pushResponse = await waku.lightPush.push( - encoder, + TestEncoder, { payload: utf8ToBytes(messageText) }, { peerId: nimPeerId, From 49e16de39655ecad0e6a0638908e9b9cb8bd99c4 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 20 Sep 2022 14:18:32 +1000 Subject: [PATCH 6/8] feat: `WakuLightPush.push` and `WakuRelay.send` returns `SendResult` with the list of recipients. --- CHANGELOG.md | 1 + src/lib/interfaces.ts | 4 ++++ src/lib/waku_light_push/index.node.spec.ts | 4 ++-- src/lib/waku_light_push/index.ts | 17 +++++++++++------ src/lib/waku_relay/index.ts | 8 ++------ 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 966710ecce..936a23b11a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `WakuRelay.addObserver` now returns a function to delete the observer. +- `WakuLightPush.push` and `WakuRelay.send` returns `SendResult` with the list of recipients. ### Changed diff --git a/src/lib/interfaces.ts b/src/lib/interfaces.ts index 1b210be585..da3e9ee727 100644 --- a/src/lib/interfaces.ts +++ b/src/lib/interfaces.ts @@ -75,3 +75,7 @@ export interface Decoder { decodeProto: (bytes: Uint8Array) => Promise; decode: (proto: ProtoMessage) => Promise; } + +export interface SendResult { + recipients: PeerId[]; +} diff --git a/src/lib/waku_light_push/index.node.spec.ts b/src/lib/waku_light_push/index.node.spec.ts index df7cf29f61..6a9eeb92f6 100644 --- a/src/lib/waku_light_push/index.node.spec.ts +++ b/src/lib/waku_light_push/index.node.spec.ts @@ -47,7 +47,7 @@ describe("Waku Light Push [node only]", () => { const pushResponse = await waku.lightPush.push(TestEncoder, { payload: utf8ToBytes(messageText), }); - expect(pushResponse?.isSuccess).to.be.true; + expect(pushResponse.recipients.length).to.eq(1); let msgs: MessageRpcResponse[] = []; @@ -90,7 +90,7 @@ describe("Waku Light Push [node only]", () => { } ); log("Ack received", pushResponse); - expect(pushResponse?.isSuccess).to.be.true; + expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); let msgs: MessageRpcResponse[] = []; diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 93ae0a6157..3681dfbf90 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -9,7 +9,7 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; -import { Encoder, Message } from "../interfaces"; +import { Encoder, Message, SendResult } from "../interfaces"; import { selectConnection } from "../select_connection"; import { getPeersForProtocol, @@ -55,7 +55,7 @@ export class WakuLightPush { encoder: Encoder, message: Message, opts?: PushOptions - ): Promise { + ): Promise { const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; const res = await selectPeerForProtocol( @@ -75,11 +75,14 @@ export class WakuLightPush { if (!connection) throw "Failed to get a connection to the peer"; const stream = await connection.newStream(LightPushCodec); + + const recipients: PeerId[] = []; + try { const protoMessage = await encoder.encodeProto(message); if (!protoMessage) { log("Failed to encode to protoMessage, aborting push"); - return; + return { recipients }; } const query = PushRPC.createRequest(protoMessage, pubSubTopic); const res = await pipe( @@ -99,17 +102,19 @@ export class WakuLightPush { if (!response) { log("No response in PushRPC"); - return; + return { recipients }; } - return response; + if (response.isSuccess) { + recipients.push(peer.id); + } } catch (err) { log("Failed to decode push reply", err); } } catch (err) { log("Failed to send waku light push request", err); } - return; + return { recipients }; } /** diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index f9eb04fc1a..6dd7f5c729 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -8,11 +8,10 @@ import { TopicStr, } from "@chainsafe/libp2p-gossipsub/dist/src/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; -import { PublishResult } from "@libp2p/interface-pubsub"; import debug from "debug"; import { DefaultPubSubTopic } from "../constants"; -import { Decoder, Encoder, Message } from "../interfaces"; +import { Decoder, Encoder, Message, SendResult } from "../interfaces"; import { pushOrInitMapSet } from "../push_or_init_map"; import { TopicOnlyDecoder } from "../waku_message/topic_only_message"; @@ -93,10 +92,7 @@ export class WakuRelay extends GossipSub { /** * Send Waku message. */ - public async send( - encoder: Encoder, - message: Message - ): Promise { + public async send(encoder: Encoder, message: Message): Promise { const msg = await encoder.encode(message); if (!msg) { log("Failed to encode message, aborting publish"); From dc639370bcbb4ecfd00e7881709b5b4f74cd5ee5 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 20 Sep 2022 14:47:58 +1000 Subject: [PATCH 7/8] chore: ensure size-limit conf file is formatted --- package.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/package.json b/package.json index 437ada0485..40f3d1bb90 100644 --- a/package.json +++ b/package.json @@ -201,6 +201,9 @@ ], "*.{ts,json,conf*.*js}": [ "prettier --write" + ], + "./*.*js": [ + "prettier --write" ] } } From 50348a6d9138f4694280714a43fc292fd8278c39 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 20 Sep 2022 14:48:28 +1000 Subject: [PATCH 8/8] chore: remove unused buf files --- buf.gen.yaml | 7 ------- buf.yaml | 5 ----- 2 files changed, 12 deletions(-) delete mode 100644 buf.gen.yaml delete mode 100644 buf.yaml diff --git a/buf.gen.yaml b/buf.gen.yaml deleted file mode 100644 index 4bbd3ae240..0000000000 --- a/buf.gen.yaml +++ /dev/null @@ -1,7 +0,0 @@ -version: v1beta1 - -plugins: - - name: ts_proto - out: ./src/proto - strategy: all - opt: grpc_js,esModuleInterop=true,forceLong=long diff --git a/buf.yaml b/buf.yaml deleted file mode 100644 index c7b2d892ef..0000000000 --- a/buf.yaml +++ /dev/null @@ -1,5 +0,0 @@ -version: v1beta1 - -build: - roots: - - ./proto