feat: use a content topic only decoder for first pass decoding

This commit is contained in:
fryorcraken.eth 2022-09-19 16:20:33 +10:00
parent 52005f8963
commit c0c4965e28
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
10 changed files with 197 additions and 14 deletions

View File

@ -39,6 +39,10 @@
"./lib/waku_message/version_1": {
"types": "./dist/lib/waku_message/version_1.d.ts",
"import": "./dist/lib/waku_message/version_1.js"
},
"./lib/waku_message/topic_only_message": {
"types": "./dist/lib/waku_message/topic_only_message.d.ts",
"import": "./dist/lib/waku_message/topic_only_message.js"
}
},
"typesVersions": {

View File

@ -12,6 +12,8 @@ export default {
"lib/wait_for_remote_peer": "dist/lib/wait_for_remote_peer.js",
"lib/waku_message/version_0": "dist/lib/waku_message/version_0.js",
"lib/waku_message/version_1": "dist/lib/waku_message/version_1.js",
"lib/waku_message/topic_only_message":
"dist/lib/waku_message/topic_only_message.js",
},
output: {
dir: "bundle",

View File

@ -11,6 +11,7 @@ export * as enr from "./lib/enr";
export * as utils from "./lib/utils";
export * as proto_message from "./proto/message";
export * as proto_topic_only_message from "./proto/topic_only_message";
export * as waku from "./lib/waku";
export { WakuNode, Protocols } from "./lib/waku";

View File

@ -0,0 +1,28 @@
import debug from "debug";
import * as proto from "../../proto/topic_only_message";
import type { Decoder, Message, ProtoMessage } from "../interfaces";
const log = debug("waku:message:topic-only");
export class TopicOnlyMessage implements Message {
constructor(private proto: proto.TopicOnlyMessage) {}
get contentTopic(): string {
return this.proto.contentTopic ?? "";
}
}
export class TopicOnlyDecoder implements Decoder<TopicOnlyMessage> {
public contentTopic = "";
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
const protoMessage = proto.TopicOnlyMessage.decode(bytes);
log("Message decoded", protoMessage);
return Promise.resolve(protoMessage);
}
async decode(proto: ProtoMessage): Promise<TopicOnlyMessage | undefined> {
return new TopicOnlyMessage(proto);
}
}

View File

@ -160,7 +160,9 @@ describe("Waku Relay [node only]", () => {
payload: utf8ToBytes(fooMessageText),
});
await delay(200);
while (!fooMessages.length && !barMessages.length) {
await delay(100);
}
expect(fooMessages[0].contentTopic).to.eq(fooContentTopic);
expect(bytesToUtf8(fooMessages[0].payload!)).to.eq(fooMessageText);

View File

@ -14,7 +14,7 @@ import debug from "debug";
import { DefaultPubSubTopic } from "../constants";
import { Decoder, Encoder, Message } from "../interfaces";
import { pushOrInitMapSet } from "../push_or_init_map";
import { DecoderV0 } from "../waku_message/version_0";
import { TopicOnlyDecoder } from "../waku_message/topic_only_message";
import * as constants from "./constants";
@ -52,6 +52,7 @@ export type CreateOptions = {
*/
export class WakuRelay extends GossipSub {
pubSubTopic: string;
defaultDecoder: Decoder<Message>;
public static multicodec: string = constants.RelayCodecs[0];
/**
@ -72,6 +73,9 @@ export class WakuRelay extends GossipSub {
this.observers = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder();
}
/**
@ -136,30 +140,32 @@ export class WakuRelay extends GossipSub {
if (event.detail.msg.topic !== pubSubTopic) return;
log(`Message received on ${pubSubTopic}`);
const decoderV0 = new DecoderV0("");
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
const protoMsg = await decoderV0.decodeProto(event.detail.msg.data);
if (!protoMsg) {
return;
}
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic === "undefined") {
const topicOnlyMsg = await this.defaultDecoder.decodeProto(
event.detail.msg.data
);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log("Message does not have a content topic, skipping");
return;
}
const observers = this.observers.get(contentTopic);
const observers = this.observers.get(topicOnlyMsg.contentTopic);
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => {
const protoMsg = await decoder.decodeProto(event.detail.msg.data);
if (!protoMsg) {
log(
"Internal error: message previously decoded failed on 2nd pass."
);
return;
}
const msg = await decoder.decode(protoMsg);
if (msg) {
callback(msg);
} else {
log("Failed to decode messages on", contentTopic);
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
}
})
);

View File

@ -0,0 +1,67 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message } from "protons-runtime";
import type { Uint8ArrayList } from "uint8arraylist";
import type { Codec } from "protons-runtime";
export interface MessageTopicOnly {
contentTopic?: string;
}
export namespace MessageTopicOnly {
let _codec: Codec<MessageTopicOnly>;
export const codec = (): Codec<MessageTopicOnly> => {
if (_codec == null) {
_codec = message<MessageTopicOnly>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.contentTopic != null) {
writer.uint32(18);
writer.string(obj.contentTopic);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 2:
obj.contentTopic = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: MessageTopicOnly): Uint8Array => {
return encodeMessage(obj, MessageTopicOnly.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): MessageTopicOnly => {
return decodeMessage(buf, MessageTopicOnly.codec());
};
}

View File

@ -0,0 +1,5 @@
syntax = "proto3";
message TopicOnlyMessage {
optional string content_topic = 2;
}

View File

@ -0,0 +1,67 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message } from "protons-runtime";
import type { Uint8ArrayList } from "uint8arraylist";
import type { Codec } from "protons-runtime";
export interface TopicOnlyMessage {
contentTopic?: string;
}
export namespace TopicOnlyMessage {
let _codec: Codec<TopicOnlyMessage>;
export const codec = (): Codec<TopicOnlyMessage> => {
if (_codec == null) {
_codec = message<TopicOnlyMessage>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.contentTopic != null) {
writer.uint32(18);
writer.string(obj.contentTopic);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 2:
obj.contentTopic = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: TopicOnlyMessage): Uint8Array => {
return encodeMessage(obj, TopicOnlyMessage.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): TopicOnlyMessage => {
return decodeMessage(buf, TopicOnlyMessage.codec());
};
}

View File

@ -8,7 +8,8 @@
"./src/lib/predefined_bootstrap_nodes.ts",
"./src/lib/wait_for_remote_peer.ts",
"./src/lib/waku_message/version_0.ts",
"./src/lib/waku_message/version_1.ts"
"./src/lib/waku_message/version_1.ts",
"./src/lib/waku_message/topic_only_message.ts"
],
"out": "build/docs",
"exclude": ["**/*.spec.ts"],