diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5507f94509..2782eb82ec 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -34,6 +34,7 @@ export type RequestID = string; type Subscription = { decoders: IDecoder[]; callback: Callback; + pubSubTopic: string; }; /** @@ -108,7 +109,8 @@ class Filter extends BaseProtocol implements IFilter { throw e; } - this.subscriptions.set(requestId, { callback, decoders }); + const subscription: Subscription = { callback, decoders, pubSubTopic }; + this.subscriptions.set(requestId, subscription); return async () => { await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer); @@ -150,7 +152,7 @@ class Filter extends BaseProtocol implements IFilter { log(`No subscription locally registered for request ID ${requestId}`); return; } - const { decoders, callback } = subscription; + const { decoders, callback, pubSubTopic } = subscription; if (!decoders || !decoders.length) { log(`No decoder registered for request ID ${requestId}`); @@ -170,7 +172,10 @@ class Filter extends BaseProtocol implements IFilter { // noinspection ES6MissingAwait decoders.forEach(async (dec: IDecoder) => { if (didDecodeMsg) return; - const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage)); + const decoded = await dec.fromProtoObj( + pubSubTopic, + toProtoMessage(protoMessage) + ); if (!decoded) { log("Not able to decode message"); return; diff --git a/packages/core/src/lib/message/topic_only_message.ts b/packages/core/src/lib/message/topic_only_message.ts index e4f77e6696..6bf90b3db4 100644 --- a/packages/core/src/lib/message/topic_only_message.ts +++ b/packages/core/src/lib/message/topic_only_message.ts @@ -14,7 +14,10 @@ export class TopicOnlyMessage implements IDecodedMessage { public timestamp: undefined; public ephemeral: undefined; - constructor(private proto: ProtoTopicOnlyMessage) {} + constructor( + public pubSubTopic: string, + private proto: ProtoTopicOnlyMessage + ) {} get contentTopic(): string { return this.proto.contentTopic; @@ -38,8 +41,9 @@ export class TopicOnlyDecoder implements IDecoder { } async fromProtoObj( + pubSubTopic: string, proto: IProtoMessage ): Promise { - return new TopicOnlyMessage(proto); + return new TopicOnlyMessage(pubSubTopic, proto); } } diff --git a/packages/core/src/lib/message/version_0.spec.ts b/packages/core/src/lib/message/version_0.spec.ts index 32e0aa3970..a71fc84f21 100644 --- a/packages/core/src/lib/message/version_0.spec.ts +++ b/packages/core/src/lib/message/version_0.spec.ts @@ -4,6 +4,7 @@ import fc from "fast-check"; import { createDecoder, createEncoder, DecodedMessage } from "./version_0.js"; const TestContentTopic = "/test/1/waku-message/utf8"; +const TestPubSubTopic = "/test/pubsub/topic"; describe("Waku Message version 0", function () { it("Round trip binary serialization", async function () { @@ -16,10 +17,12 @@ describe("Waku Message version 0", function () { const decoder = createDecoder(TestContentTopic); const protoResult = await decoder.fromWireToProtoObj(bytes); const result = (await decoder.fromProtoObj( + TestPubSubTopic, protoResult! )) as DecodedMessage; expect(result.contentTopic).to.eq(TestContentTopic); + expect(result.pubSubTopic).to.eq(TestPubSubTopic); expect(result.version).to.eq(0); expect(result.ephemeral).to.be.false; expect(result.payload).to.deep.eq(payload); @@ -39,14 +42,11 @@ describe("Waku Message version 0", function () { const decoder = createDecoder(TestContentTopic); const protoResult = await decoder.fromWireToProtoObj(bytes); const result = (await decoder.fromProtoObj( + TestPubSubTopic, protoResult! )) as DecodedMessage; - expect(result.contentTopic).to.eq(TestContentTopic); - expect(result.version).to.eq(0); expect(result.ephemeral).to.be.true; - expect(result.payload).to.deep.eq(payload); - expect(result.timestamp).to.not.be.undefined; }) ); }); diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index 00287359ed..38bd22fa26 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -17,7 +17,7 @@ export const Version = 0; export { proto }; export class DecodedMessage implements IDecodedMessage { - constructor(protected proto: proto.WakuMessage) {} + constructor(public pubSubTopic: string, protected proto: proto.WakuMessage) {} get ephemeral(): boolean { return Boolean(this.proto.ephemeral); @@ -115,6 +115,7 @@ export class Decoder implements IDecoder { } async fromProtoObj( + pubSubTopic: string, proto: IProtoMessage ): Promise { // https://rfc.vac.dev/spec/14/ @@ -129,7 +130,7 @@ export class Decoder implements IDecoder { return Promise.resolve(undefined); } - return new DecodedMessage(proto); + return new DecodedMessage(pubSubTopic, proto); } } diff --git a/packages/core/src/lib/relay/index.ts b/packages/core/src/lib/relay/index.ts index 5878509327..b588a79bb5 100644 --- a/packages/core/src/lib/relay/index.ts +++ b/packages/core/src/lib/relay/index.ts @@ -122,6 +122,7 @@ class Relay extends GossipSub implements IRelay { } private async processIncomingMessage( + pubSubTopic: string, bytes: Uint8Array ): Promise { const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes); @@ -143,7 +144,7 @@ class Relay extends GossipSub implements IRelay { log("Internal error: message previously decoded failed on 2nd pass."); return; } - const msg = await decoder.fromProtoObj(protoMsg); + const msg = await decoder.fromProtoObj(pubSubTopic, protoMsg); if (msg) { callback(msg); } else { @@ -165,9 +166,10 @@ class Relay extends GossipSub implements IRelay { if (event.detail.msg.topic !== pubSubTopic) return; log(`Message received on ${pubSubTopic}`); - this.processIncomingMessage(event.detail.msg.data).catch((e) => - log("Failed to process incoming message", e) - ); + this.processIncomingMessage( + event.detail.msg.topic, + event.detail.msg.data + ).catch((e) => log("Failed to process incoming message", e)); } ); diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index a1e6007e62..e50190d1ef 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -311,7 +311,10 @@ async function* paginate( if (typeof contentTopic !== "undefined") { const decoder = decoders.get(contentTopic); if (decoder) { - return decoder.fromProtoObj(toProtoMessage(protoMsg)); + return decoder.fromProtoObj( + queryOpts.pubSubTopic, + toProtoMessage(protoMsg) + ); } } return Promise.resolve(undefined); diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 45dd5a571c..d2e52f04f0 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -50,6 +50,7 @@ export interface IEncoder { export interface IDecodedMessage { payload: Uint8Array; contentTopic: string; + pubSubTopic: string; timestamp: Date | undefined; rateLimitProof: IRateLimitProof | undefined; ephemeral: boolean | undefined; @@ -58,5 +59,8 @@ export interface IDecodedMessage { export interface IDecoder { contentTopic: string; fromWireToProtoObj: (bytes: Uint8Array) => Promise; - fromProtoObj: (proto: IProtoMessage) => Promise; + fromProtoObj: ( + pubSubTopic: string, + proto: IProtoMessage + ) => Promise; } diff --git a/packages/message-encryption/src/decoded_message.ts b/packages/message-encryption/src/decoded_message.ts index 6963d2fc8f..72e1626419 100644 --- a/packages/message-encryption/src/decoded_message.ts +++ b/packages/message-encryption/src/decoded_message.ts @@ -11,12 +11,13 @@ export class DecodedMessage private readonly _decodedPayload: Uint8Array; constructor( + pubSubTopic: string, proto: proto.WakuMessage, decodedPayload: Uint8Array, public signature?: Uint8Array, public signaturePublicKey?: Uint8Array ) { - super(proto); + super(pubSubTopic, proto); this._decodedPayload = decodedPayload; } diff --git a/packages/message-encryption/src/ecies.spec.ts b/packages/message-encryption/src/ecies.spec.ts index cab9b9dc2e..fb2f969d99 100644 --- a/packages/message-encryption/src/ecies.spec.ts +++ b/packages/message-encryption/src/ecies.spec.ts @@ -5,6 +5,7 @@ import { getPublicKey } from "./crypto/index.js"; import { createDecoder, createEncoder } from "./ecies.js"; const TestContentTopic = "/test/1/waku-message/utf8"; +const TestPubSubTopic = "/test/pubsub/topic"; describe("Ecies Encryption", function () { it("Round trip binary encryption [ecies, no signature]", async function () { @@ -24,10 +25,14 @@ describe("Ecies Encryption", function () { const decoder = createDecoder(TestContentTopic, privateKey); const protoResult = await decoder.fromWireToProtoObj(bytes!); if (!protoResult) throw "Failed to proto decode"; - const result = await decoder.fromProtoObj(protoResult); + const result = await decoder.fromProtoObj( + TestPubSubTopic, + protoResult + ); if (!result) throw "Failed to decode"; expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.pubSubTopic).to.equal(TestPubSubTopic); expect(result.version).to.equal(1); expect(result?.payload).to.deep.equal(payload); expect(result.signature).to.be.undefined; @@ -59,10 +64,14 @@ describe("Ecies Encryption", function () { const decoder = createDecoder(TestContentTopic, bobPrivateKey); const protoResult = await decoder.fromWireToProtoObj(bytes!); if (!protoResult) throw "Failed to proto decode"; - const result = await decoder.fromProtoObj(protoResult); + const result = await decoder.fromProtoObj( + TestPubSubTopic, + protoResult + ); if (!result) throw "Failed to decode"; expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.pubSubTopic).to.equal(TestPubSubTopic); expect(result.version).to.equal(1); expect(result?.payload).to.deep.equal(payload); expect(result.signature).to.not.be.undefined; diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index e4a5f315fb..c991db9c69 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -95,6 +95,7 @@ class Decoder extends DecoderV0 implements IDecoder { } async fromProtoObj( + pubSubTopic: string, protoMessage: IProtoMessage ): Promise { const cipherPayload = protoMessage.payload; @@ -135,6 +136,7 @@ class Decoder extends DecoderV0 implements IDecoder { log("Message decrypted", protoMessage); return new DecodedMessage( + pubSubTopic, protoMessage, res.payload, res.sig?.signature, diff --git a/packages/message-encryption/src/symmetric.spec.ts b/packages/message-encryption/src/symmetric.spec.ts index a12d68159b..477848477f 100644 --- a/packages/message-encryption/src/symmetric.spec.ts +++ b/packages/message-encryption/src/symmetric.spec.ts @@ -5,6 +5,7 @@ import { getPublicKey } from "./crypto/index.js"; import { createDecoder, createEncoder } from "./symmetric.js"; const TestContentTopic = "/test/1/waku-message/utf8"; +const TestPubSubTopic = "/test/pubsub/topic"; describe("Symmetric Encryption", function () { it("Round trip binary encryption [symmetric, no signature]", async function () { @@ -22,10 +23,14 @@ describe("Symmetric Encryption", function () { const decoder = createDecoder(TestContentTopic, symKey); const protoResult = await decoder.fromWireToProtoObj(bytes!); if (!protoResult) throw "Failed to proto decode"; - const result = await decoder.fromProtoObj(protoResult); + const result = await decoder.fromProtoObj( + TestPubSubTopic, + protoResult + ); if (!result) throw "Failed to decode"; expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.pubSubTopic).to.equal(TestPubSubTopic); expect(result.version).to.equal(1); expect(result?.payload).to.deep.equal(payload); expect(result.signature).to.be.undefined; @@ -54,10 +59,14 @@ describe("Symmetric Encryption", function () { const decoder = createDecoder(TestContentTopic, symKey); const protoResult = await decoder.fromWireToProtoObj(bytes!); if (!protoResult) throw "Failed to proto decode"; - const result = await decoder.fromProtoObj(protoResult); + const result = await decoder.fromProtoObj( + TestPubSubTopic, + protoResult + ); if (!result) throw "Failed to decode"; expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.pubSubTopic).to.equal(TestPubSubTopic); expect(result.version).to.equal(1); expect(result?.payload).to.deep.equal(payload); expect(result.signature).to.not.be.undefined; diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 50a5a8212c..6e18301b7f 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -90,6 +90,7 @@ class Decoder extends DecoderV0 implements IDecoder { } async fromProtoObj( + pubSubTopic: string, protoMessage: IProtoMessage ): Promise { const cipherPayload = protoMessage.payload; @@ -130,6 +131,7 @@ class Decoder extends DecoderV0 implements IDecoder { log("Message decrypted", protoMessage); return new DecodedMessage( + pubSubTopic, protoMessage, res.payload, res.sig?.signature, diff --git a/packages/tests/tests/filter.node.spec.ts b/packages/tests/tests/filter.node.spec.ts index b958d13433..dde262ac0c 100644 --- a/packages/tests/tests/filter.node.spec.ts +++ b/packages/tests/tests/filter.node.spec.ts @@ -2,6 +2,7 @@ import { createDecoder, createEncoder, DecodedMessage, + DefaultPubSubTopic, waitForRemotePeer, } from "@waku/core"; import { createLightNode } from "@waku/create"; @@ -52,6 +53,7 @@ describe("Waku Filter", () => { log("Got a message"); messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); + expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic); expect(bytesToUtf8(msg.payload)).to.eq(messageText); }; diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index 95d19fbc7d..5b8e86f99f 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -334,6 +334,7 @@ describe("Waku Relay [node only]", () => { await waku3NoMsgPromise; expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); + expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic); }); }); diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 5c94349462..444e81ec2e 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -615,6 +615,7 @@ describe("Waku Store, custom pubsub topic", () => { const msg = await promise; if (msg) { messages.push(msg); + expect(msg.pubSubTopic).to.eq(customPubSubTopic); } });