diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index ed4f57729f..06b2a7dec3 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Waku Message `ephemeral` field to mark messages as do-not-store. + ## @waku/core [0.0.5](https://github.com/waku-org/js-waku/compare/@waku/core@0.0.4...@waku/core@0.0.5) (2022-11-11) ### Changed diff --git a/packages/core/src/lib/to_proto_message.spec.ts b/packages/core/src/lib/to_proto_message.spec.ts index 4bc7ca0fa7..2280c99f12 100644 --- a/packages/core/src/lib/to_proto_message.spec.ts +++ b/packages/core/src/lib/to_proto_message.spec.ts @@ -20,5 +20,6 @@ describe("to proto message", () => { expect(keys).to.contain("version"); expect(keys).to.contain("timestamp"); expect(keys).to.contain("rateLimitProof"); + expect(keys).to.contain("ephemeral"); }); }); diff --git a/packages/core/src/lib/to_proto_message.ts b/packages/core/src/lib/to_proto_message.ts index a2a60fd0c4..b554581214 100644 --- a/packages/core/src/lib/to_proto_message.ts +++ b/packages/core/src/lib/to_proto_message.ts @@ -8,6 +8,7 @@ const EmptyMessage: ProtoMessage = { version: undefined, timestamp: undefined, rateLimitProof: undefined, + ephemeral: undefined, }; export function toProtoMessage(wire: WakuMessageProto): ProtoMessage { diff --git a/packages/core/src/lib/waku_message/topic_only_message.ts b/packages/core/src/lib/waku_message/topic_only_message.ts index 9514847d21..d720fce1db 100644 --- a/packages/core/src/lib/waku_message/topic_only_message.ts +++ b/packages/core/src/lib/waku_message/topic_only_message.ts @@ -1,14 +1,15 @@ -import type { Decoder, Message, ProtoMessage } from "@waku/interfaces"; +import type { DecodedMessage, Decoder, ProtoMessage } from "@waku/interfaces"; import debug from "debug"; import * as proto from "../../proto/topic_only_message"; const log = debug("waku:message:topic-only"); -export class TopicOnlyMessage implements Message { +export class TopicOnlyMessage implements DecodedMessage { public payload: undefined; public rateLimitProof: undefined; public timestamp: undefined; + public ephemeral: undefined; constructor(private proto: proto.TopicOnlyMessage) {} @@ -29,6 +30,7 @@ export class TopicOnlyDecoder implements Decoder { rateLimitProof: undefined, timestamp: undefined, version: undefined, + ephemeral: undefined, }); } diff --git a/packages/core/src/lib/waku_message/version_0.spec.ts b/packages/core/src/lib/waku_message/version_0.spec.ts index 29961a655a..8601538ae8 100644 --- a/packages/core/src/lib/waku_message/version_0.spec.ts +++ b/packages/core/src/lib/waku_message/version_0.spec.ts @@ -17,6 +17,25 @@ describe("Waku Message version 0", function () { expect(result.contentTopic).to.eq(TestContentTopic); expect(result.version).to.eq(0); + expect(result.ephemeral).to.be.false; + expect(result.payload).to.deep.eq(payload); + expect(result.timestamp).to.not.be.undefined; + }) + ); + }); + + it("Ephemeral", async function () { + await fc.assert( + fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => { + const encoder = new EncoderV0(TestContentTopic, true); + const bytes = await encoder.toWire({ payload }); + const decoder = new DecoderV0(TestContentTopic); + const protoResult = await decoder.fromWireToProtoObj(bytes); + const result = (await decoder.fromProtoObj(protoResult!)) as MessageV0; + + expect(result.contentTopic).to.eq(TestContentTopic); + expect(result.version).to.eq(0); + expect(result.ephemeral).to.be.true; expect(result.payload).to.deep.eq(payload); expect(result.timestamp).to.not.be.undefined; }) diff --git a/packages/core/src/lib/waku_message/version_0.ts b/packages/core/src/lib/waku_message/version_0.ts index ea2c1a3cbe..89d630b706 100644 --- a/packages/core/src/lib/waku_message/version_0.ts +++ b/packages/core/src/lib/waku_message/version_0.ts @@ -26,6 +26,10 @@ export class MessageV0 implements DecodedMessage { return; } + get ephemeral(): boolean { + return Boolean(this.proto.ephemeral); + } + get payload(): Uint8Array | undefined { return this._rawPayload; } @@ -68,7 +72,7 @@ export class MessageV0 implements DecodedMessage { } export class EncoderV0 implements Encoder { - constructor(public contentTopic: string) {} + constructor(public contentTopic: string, public ephemeral: boolean = false) {} async toWire(message: Partial): Promise { return proto.WakuMessage.encode(await this.toProtoObj(message)); @@ -83,12 +87,13 @@ export class EncoderV0 implements Encoder { contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, + ephemeral: this.ephemeral, }; } } export class DecoderV0 implements Decoder { - constructor(public contentTopic: string) {} + constructor(public contentTopic: string, public ephemeral: boolean = false) {} fromWireToProtoObj(bytes: Uint8Array): Promise { const protoMessage = proto.WakuMessage.decode(bytes); @@ -99,6 +104,7 @@ export class DecoderV0 implements Decoder { version: protoMessage.version ?? undefined, timestamp: protoMessage.timestamp ?? undefined, rateLimitProof: protoMessage.rateLimitProof ?? undefined, + ephemeral: protoMessage.ephemeral ?? false, }); } diff --git a/packages/core/src/proto/filter.ts b/packages/core/src/proto/filter.ts index 61b6bd3e49..87b2f95cc3 100644 --- a/packages/core/src/proto/filter.ts +++ b/packages/core/src/proto/filter.ts @@ -501,6 +501,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -544,6 +545,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -578,6 +584,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/core/src/proto/light_push.ts b/packages/core/src/proto/light_push.ts index 329d853cf5..bb34340e81 100644 --- a/packages/core/src/proto/light_push.ts +++ b/packages/core/src/proto/light_push.ts @@ -425,6 +425,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -468,6 +469,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -502,6 +508,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/core/src/proto/message.proto b/packages/core/src/proto/message.proto index bd4c0dc85d..49b5618033 100644 --- a/packages/core/src/proto/message.proto +++ b/packages/core/src/proto/message.proto @@ -17,5 +17,6 @@ message WakuMessage { optional double timestamp_deprecated = 4; optional sint64 timestamp = 10; optional RateLimitProof rate_limit_proof = 21; + optional bool ephemeral = 31; } diff --git a/packages/core/src/proto/message.ts b/packages/core/src/proto/message.ts index 0c949ea7a6..f24100e98e 100644 --- a/packages/core/src/proto/message.ts +++ b/packages/core/src/proto/message.ts @@ -203,6 +203,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -246,6 +247,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -280,6 +286,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/core/src/proto/store.ts b/packages/core/src/proto/store.ts index 0e9324f885..558b545f96 100644 --- a/packages/core/src/proto/store.ts +++ b/packages/core/src/proto/store.ts @@ -743,6 +743,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -786,6 +787,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -820,6 +826,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 5551483137..91258a3b4f 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -165,6 +165,7 @@ export interface ProtoMessage { version: number | undefined; timestamp: bigint | undefined; rateLimitProof: RateLimitProof | undefined; + ephemeral: boolean | undefined; } /** @@ -178,6 +179,7 @@ export interface Message { export interface Encoder { contentTopic: string; + ephemeral: boolean; toWire: (message: Message) => Promise; toProtoObj: (message: Message) => Promise; } @@ -187,6 +189,7 @@ export interface DecodedMessage { contentTopic: string | undefined; timestamp: Date | undefined; rateLimitProof: RateLimitProof | undefined; + ephemeral: boolean | undefined; } export interface Decoder { diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index ec84603f72..fc60c60a99 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -66,7 +66,8 @@ export class AsymEncoder implements Encoder { constructor( public contentTopic: string, private publicKey: Uint8Array, - private sigPrivKey?: Uint8Array + private sigPrivKey?: Uint8Array, + public ephemeral: boolean = false ) {} async toWire(message: Partial): Promise { @@ -94,6 +95,7 @@ export class AsymEncoder implements Encoder { contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, + ephemeral: this.ephemeral, }; } } @@ -102,7 +104,8 @@ export class SymEncoder implements Encoder { constructor( public contentTopic: string, private symKey: Uint8Array, - private sigPrivKey?: Uint8Array + private sigPrivKey?: Uint8Array, + public ephemeral: boolean = false ) {} async toWire(message: Partial): Promise { @@ -129,6 +132,7 @@ export class SymEncoder implements Encoder { contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, + ephemeral: this.ephemeral, }; } } diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index b7ff8c5076..5f17d30616 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -3,7 +3,7 @@ import { PageDirection } from "@waku/core"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { createFullNode } from "@waku/create"; -import type { Message, WakuFull } from "@waku/interfaces"; +import { DecodedMessage, Message, WakuFull } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { AsymDecoder, @@ -343,7 +343,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku2, [Protocols.Store]); - const messages: Message[] = []; + const messages: DecodedMessage[] = []; log("Retrieve messages from store"); for await (const msgPromises of waku2.store.queryGenerator([ @@ -369,6 +369,143 @@ describe("Waku Store", () => { !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); }); + it.skip("Ephemeral support", async function () { + this.timeout(15_000); + + const asymText = "This message is encrypted for me using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + + const symText = + "This message is encrypted for me using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; + + const clearText = "This is a clear text message for everyone to read"; + + const storeReadableText = "This message is readable by the store"; + const storeUnreadableText = "This message is not readable by the store"; + + const timestamp = new Date(); + + const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; + const symMsg = { + payload: utf8ToBytes(symText), + timestamp: new Date(timestamp.valueOf() + 1), + }; + const clearMsg = { + payload: utf8ToBytes(clearText), + timestamp: new Date(timestamp.valueOf() + 2), + }; + + const storeReadableMsg = { + payload: utf8ToBytes(storeReadableText), + }; + const storeUnreadableMsg = { + payload: utf8ToBytes(storeUnreadableText), + }; + + const privateKey = generatePrivateKey(); + const symKey = generateSymmetricKey(); + const publicKey = getPublicKey(privateKey); + + const storeWithAsymEncoder = new AsymEncoder( + asymTopic, + publicKey, + undefined, + false + ); + const storeWithSymEncoder = new SymEncoder( + symTopic, + symKey, + undefined, + false + ); + + const dontStoreWithAsymEncoder = new AsymEncoder( + asymTopic, + publicKey, + undefined, + true + ); + const dontStoreWithSymEncoder = new SymEncoder( + symTopic, + symKey, + undefined, + true + ); + + const storeEncoder = new EncoderV0(TestContentTopic, false); + const storeUnreadableEncoder = new EncoderV0(TestContentTopic, true); + + const asymDecoder = new AsymDecoder(asymTopic, privateKey); + const symDecoder = new SymDecoder(symTopic, symKey); + + const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ + createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }).then((waku) => waku.start().then(() => waku)), + createFullNode({ + staticNoiseKey: NOISE_KEY_2, + }).then((waku) => waku.start().then(() => waku)), + nwaku.getMultiaddrWithId(), + ]); + + log("Waku nodes created"); + + await Promise.all([ + waku1.dial(nimWakuMultiaddr), + waku2.dial(nimWakuMultiaddr), + ]); + + log("Waku nodes connected to nwaku"); + + await waitForRemotePeer(waku1, [Protocols.LightPush]); + + log("Sending messages using light push"); + await Promise.all([ + waku1.lightPush.push(storeWithAsymEncoder, asymMsg), + waku1.lightPush.push(storeWithSymEncoder, symMsg), + waku1.lightPush.push(dontStoreWithAsymEncoder, asymMsg), + waku1.lightPush.push(dontStoreWithSymEncoder, symMsg), + waku1.lightPush.push(TestEncoder, clearMsg), + waku1.lightPush.push(storeEncoder, storeReadableMsg), + waku1.lightPush.push(storeUnreadableEncoder, storeUnreadableMsg), + ]); + + await waitForRemotePeer(waku2, [Protocols.Store]); + + const messages: DecodedMessage[] = []; + log("Retrieve messages from store"); + + for await (const msgPromises of waku2.store.queryGenerator([ + asymDecoder, + symDecoder, + TestDecoder, + ])) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } + } + } + + // Messages are ordered from oldest to latest within a page (1 page query) + expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); + expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); + expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); + expect(bytesToUtf8(messages[3].payload!)).to.eq(storeReadableText); + expect(messages?.length).eq(4); + + // check for ephemeral + expect(messages[0].ephemeral).to.be.false; + expect(messages[1].ephemeral).to.be.false; + expect(messages[2].ephemeral).to.be.false; + expect(messages[3].ephemeral).to.be.false; + + !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); + !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + it("Ordered callback, using start and end time", async function () { this.timeout(20000);