diff --git a/package.json b/package.json index 5c5b5b20fa..437ada0485 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,10 @@ "./lib/waku_message/version_1": { "types": "./dist/lib/waku_message/version_1.d.ts", "import": "./dist/lib/waku_message/version_1.js" + }, + "./lib/waku_message/topic_only_message": { + "types": "./dist/lib/waku_message/topic_only_message.d.ts", + "import": "./dist/lib/waku_message/topic_only_message.js" } }, "typesVersions": { diff --git a/rollup.config.js b/rollup.config.js index cb58ac9f1c..c4867c7c61 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -12,6 +12,8 @@ export default { "lib/wait_for_remote_peer": "dist/lib/wait_for_remote_peer.js", "lib/waku_message/version_0": "dist/lib/waku_message/version_0.js", "lib/waku_message/version_1": "dist/lib/waku_message/version_1.js", + "lib/waku_message/topic_only_message": + "dist/lib/waku_message/topic_only_message.js", }, output: { dir: "bundle", diff --git a/src/index.ts b/src/index.ts index b1f3f85403..c6f2448a8e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,6 +11,7 @@ export * as enr from "./lib/enr"; export * as utils from "./lib/utils"; export * as proto_message from "./proto/message"; +export * as proto_topic_only_message from "./proto/topic_only_message"; export * as waku from "./lib/waku"; export { WakuNode, Protocols } from "./lib/waku"; diff --git a/src/lib/waku_message/topic_only_message.ts b/src/lib/waku_message/topic_only_message.ts new file mode 100644 index 0000000000..682c732ef9 --- /dev/null +++ b/src/lib/waku_message/topic_only_message.ts @@ -0,0 +1,28 @@ +import debug from "debug"; + +import * as proto from "../../proto/topic_only_message"; +import type { Decoder, Message, ProtoMessage } from "../interfaces"; + +const log = debug("waku:message:topic-only"); + +export class TopicOnlyMessage implements Message { + constructor(private proto: proto.TopicOnlyMessage) {} + + get contentTopic(): string { + return this.proto.contentTopic ?? ""; + } +} + +export class TopicOnlyDecoder implements Decoder { + public contentTopic = ""; + + decodeProto(bytes: Uint8Array): Promise { + const protoMessage = proto.TopicOnlyMessage.decode(bytes); + log("Message decoded", protoMessage); + return Promise.resolve(protoMessage); + } + + async decode(proto: ProtoMessage): Promise { + return new TopicOnlyMessage(proto); + } +} diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index 7ad8df7591..89d63281a4 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -160,7 +160,9 @@ describe("Waku Relay [node only]", () => { payload: utf8ToBytes(fooMessageText), }); - await delay(200); + while (!fooMessages.length && !barMessages.length) { + await delay(100); + } expect(fooMessages[0].contentTopic).to.eq(fooContentTopic); expect(bytesToUtf8(fooMessages[0].payload!)).to.eq(fooMessageText); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 913672f879..f9eb04fc1a 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -14,7 +14,7 @@ import debug from "debug"; import { DefaultPubSubTopic } from "../constants"; import { Decoder, Encoder, Message } from "../interfaces"; import { pushOrInitMapSet } from "../push_or_init_map"; -import { DecoderV0 } from "../waku_message/version_0"; +import { TopicOnlyDecoder } from "../waku_message/topic_only_message"; import * as constants from "./constants"; @@ -52,6 +52,7 @@ export type CreateOptions = { */ export class WakuRelay extends GossipSub { pubSubTopic: string; + defaultDecoder: Decoder; public static multicodec: string = constants.RelayCodecs[0]; /** @@ -72,6 +73,9 @@ export class WakuRelay extends GossipSub { this.observers = new Map(); this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; + + // TODO: User might want to decide what decoder should be used (e.g. for RLN) + this.defaultDecoder = new TopicOnlyDecoder(); } /** @@ -136,30 +140,32 @@ export class WakuRelay extends GossipSub { if (event.detail.msg.topic !== pubSubTopic) return; log(`Message received on ${pubSubTopic}`); - const decoderV0 = new DecoderV0(""); - // TODO: User might want to decide what decoder should be used (e.g. for RLN) - const protoMsg = await decoderV0.decodeProto(event.detail.msg.data); - if (!protoMsg) { - return; - } - const contentTopic = protoMsg.contentTopic; - - if (typeof contentTopic === "undefined") { + const topicOnlyMsg = await this.defaultDecoder.decodeProto( + event.detail.msg.data + ); + if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { log("Message does not have a content topic, skipping"); return; } - const observers = this.observers.get(contentTopic); + const observers = this.observers.get(topicOnlyMsg.contentTopic); if (!observers) { return; } await Promise.all( Array.from(observers).map(async ({ decoder, callback }) => { + const protoMsg = await decoder.decodeProto(event.detail.msg.data); + if (!protoMsg) { + log( + "Internal error: message previously decoded failed on 2nd pass." + ); + return; + } const msg = await decoder.decode(protoMsg); if (msg) { callback(msg); } else { - log("Failed to decode messages on", contentTopic); + log("Failed to decode messages on", topicOnlyMsg.contentTopic); } }) ); diff --git a/src/proto/message_topic_only.ts b/src/proto/message_topic_only.ts new file mode 100644 index 0000000000..9c7b505070 --- /dev/null +++ b/src/proto/message_topic_only.ts @@ -0,0 +1,67 @@ +/* eslint-disable import/export */ +/* eslint-disable @typescript-eslint/no-namespace */ + +import { encodeMessage, decodeMessage, message } from "protons-runtime"; +import type { Uint8ArrayList } from "uint8arraylist"; +import type { Codec } from "protons-runtime"; + +export interface MessageTopicOnly { + contentTopic?: string; +} + +export namespace MessageTopicOnly { + let _codec: Codec; + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message( + (obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork(); + } + + if (obj.contentTopic != null) { + writer.uint32(18); + writer.string(obj.contentTopic); + } + + if (opts.lengthDelimited !== false) { + writer.ldelim(); + } + }, + (reader, length) => { + const obj: any = {}; + + const end = length == null ? reader.len : reader.pos + length; + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 2: + obj.contentTopic = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + + return obj; + } + ); + } + + return _codec; + }; + + export const encode = (obj: MessageTopicOnly): Uint8Array => { + return encodeMessage(obj, MessageTopicOnly.codec()); + }; + + export const decode = ( + buf: Uint8Array | Uint8ArrayList + ): MessageTopicOnly => { + return decodeMessage(buf, MessageTopicOnly.codec()); + }; +} diff --git a/src/proto/topic_only_message.proto b/src/proto/topic_only_message.proto new file mode 100644 index 0000000000..6729bf39b9 --- /dev/null +++ b/src/proto/topic_only_message.proto @@ -0,0 +1,5 @@ +syntax = "proto3"; + +message TopicOnlyMessage { + optional string content_topic = 2; +} diff --git a/src/proto/topic_only_message.ts b/src/proto/topic_only_message.ts new file mode 100644 index 0000000000..a17ed89a36 --- /dev/null +++ b/src/proto/topic_only_message.ts @@ -0,0 +1,67 @@ +/* eslint-disable import/export */ +/* eslint-disable @typescript-eslint/no-namespace */ + +import { encodeMessage, decodeMessage, message } from "protons-runtime"; +import type { Uint8ArrayList } from "uint8arraylist"; +import type { Codec } from "protons-runtime"; + +export interface TopicOnlyMessage { + contentTopic?: string; +} + +export namespace TopicOnlyMessage { + let _codec: Codec; + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message( + (obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork(); + } + + if (obj.contentTopic != null) { + writer.uint32(18); + writer.string(obj.contentTopic); + } + + if (opts.lengthDelimited !== false) { + writer.ldelim(); + } + }, + (reader, length) => { + const obj: any = {}; + + const end = length == null ? reader.len : reader.pos + length; + + while (reader.pos < end) { + const tag = reader.uint32(); + + switch (tag >>> 3) { + case 2: + obj.contentTopic = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + + return obj; + } + ); + } + + return _codec; + }; + + export const encode = (obj: TopicOnlyMessage): Uint8Array => { + return encodeMessage(obj, TopicOnlyMessage.codec()); + }; + + export const decode = ( + buf: Uint8Array | Uint8ArrayList + ): TopicOnlyMessage => { + return decodeMessage(buf, TopicOnlyMessage.codec()); + }; +} diff --git a/typedoc.json b/typedoc.json index a8daf63237..31019f4fff 100644 --- a/typedoc.json +++ b/typedoc.json @@ -8,7 +8,8 @@ "./src/lib/predefined_bootstrap_nodes.ts", "./src/lib/wait_for_remote_peer.ts", "./src/lib/waku_message/version_0.ts", - "./src/lib/waku_message/version_1.ts" + "./src/lib/waku_message/version_1.ts", + "./src/lib/waku_message/topic_only_message.ts" ], "out": "build/docs", "exclude": ["**/*.spec.ts"],