From 9cdc9f96ac761de1ff544ffc9161b9159ec3b9f7 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 13:50:11 +1100 Subject: [PATCH 01/12] test: nwaku's --persist-message is now deprecated `--store=true` is enough to enable message persist for waku store. --- packages/tests/src/nwaku.ts | 1 - packages/tests/tests/store.node.spec.ts | 3 +-- packages/tests/tests/wait_for_remote_peer.node.spec.ts | 3 --- packages/tests/tests/waku.node.spec.ts | 1 - 4 files changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/tests/src/nwaku.ts b/packages/tests/src/nwaku.ts index 036196446e..d3e9e3f9d9 100644 --- a/packages/tests/src/nwaku.ts +++ b/packages/tests/src/nwaku.ts @@ -44,7 +44,6 @@ export interface Args { nodekey?: string; portsShift?: number; logLevel?: LogLevel; - persistMessages?: boolean; lightpush?: boolean; filter?: boolean; store?: boolean; diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index da6deeece3..3de49b6386 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -32,7 +32,7 @@ describe("Waku Store", () => { beforeEach(async function () { this.timeout(15_000); nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); + await nwaku.start({ store: true, lightpush: true }); }); afterEach(async function () { @@ -505,7 +505,6 @@ describe("Waku Store, custom pubsub topic", () => { this.timeout(15_000); nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start({ - persistMessages: true, store: true, topics: customPubSubTopic, }); diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 96aeba1ed3..08bbb4da5c 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -101,7 +101,6 @@ describe("Wait for remote peer", function () { relay: false, lightpush: false, filter: false, - persistMessages: true, }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); @@ -128,7 +127,6 @@ describe("Wait for remote peer", function () { relay: false, lightpush: false, filter: false, - persistMessages: true, }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); @@ -213,7 +211,6 @@ describe("Wait for remote peer", function () { lightpush: true, relay: false, store: true, - persistMessages: true, }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 587f6ab808..4d8def7b4d 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -38,7 +38,6 @@ describe("Waku Dial [node only]", function () { filter: true, store: true, lightpush: true, - persistMessages: true, }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); From 66270836f87d71f1d4e26801b04635c7cb0f206c Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 14:09:04 +1100 Subject: [PATCH 02/12] test: need to specify in memory sqlite So that messages are not persisted between tests. --- packages/tests/src/nwaku.ts | 4 +++- packages/tests/tests/nwaku.node.spec.ts | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/tests/src/nwaku.ts b/packages/tests/src/nwaku.ts index d3e9e3f9d9..e366e48b50 100644 --- a/packages/tests/src/nwaku.ts +++ b/packages/tests/src/nwaku.ts @@ -47,6 +47,7 @@ export interface Args { lightpush?: boolean; filter?: boolean; store?: boolean; + storeMessageDbUrl?: string; topics?: string; rpcPrivate?: boolean; websocketSupport?: boolean; @@ -422,7 +423,7 @@ export function argsToArray(args: Args): Array { for (const [key, value] of Object.entries(args)) { // Change the key from camelCase to kebab-case - const kebabKey = key.replace(/([A-Z])/, (_, capital) => { + const kebabKey = key.replace(/([A-Z])/g, (_, capital) => { return "-" + capital.toLowerCase(); }); @@ -441,6 +442,7 @@ export function defaultArgs(): Args { rpc: true, rpcAdmin: true, websocketSupport: true, + storeMessageDbUrl: "sqlite://:memory:", logLevel: LogLevel.Debug, }; } diff --git a/packages/tests/tests/nwaku.node.spec.ts b/packages/tests/tests/nwaku.node.spec.ts index 5299aab3f6..b1602bb2da 100644 --- a/packages/tests/tests/nwaku.node.spec.ts +++ b/packages/tests/tests/nwaku.node.spec.ts @@ -16,6 +16,7 @@ describe("nwaku", () => { "--rpc=true", "--rpc-admin=true", "--websocket-support=true", + "--store-message-db-url=sqlite://:memory:", "--log-level=DEBUG", "--ports-shift=42", ]; From 584fe29b3951900e8e11007d159ce51c45e75cf8 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 14:15:14 +1100 Subject: [PATCH 03/12] chore: improve log format --- packages/core/src/lib/waku_store/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 6bef492059..efcbf10b72 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -227,8 +227,8 @@ export class WakuStore { ); log("Querying history with the following options", { - peerId: options?.peerId?.toString(), ...options, + peerId: options?.peerId?.toString(), }); const res = await selectPeerForProtocol( From 862a33f239dac42d21a6f07dc70e9e5fa58f511f Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 14:15:29 +1100 Subject: [PATCH 04/12] test: reduce log verbosity --- packages/tests/src/nwaku.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/src/nwaku.ts b/packages/tests/src/nwaku.ts index e366e48b50..1a240e6dff 100644 --- a/packages/tests/src/nwaku.ts +++ b/packages/tests/src/nwaku.ts @@ -407,7 +407,7 @@ export class Nwaku { headers: new Headers({ "Content-Type": "application/json" }), }); const json = await res.json(); - log(`RPC Response: `, res, JSON.stringify(json)); + log(`RPC Response: `, JSON.stringify(json)); return json.result; } From c3c3833b1ba1b713d6d02eb58a5adad6ff17b161 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 14:15:55 +1100 Subject: [PATCH 05/12] test: nwaku's store now reject messages older than 20s --- packages/tests/tests/store.node.spec.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 3de49b6386..3895d52e2f 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -385,21 +385,21 @@ describe("Waku Store", () => { const now = new Date(); const startTime = new Date(); - // Set start time 5 minutes in the past - startTime.setTime(now.getTime() - 5 * 60 * 1000); + // Set start time 15 seconds in the past + startTime.setTime(now.getTime() - 15 * 1000); const message1Timestamp = new Date(); - // Set first message was 4 minutes in the past - message1Timestamp.setTime(now.getTime() - 4 * 60 * 1000); + // Set first message was 10 seconds in the past + message1Timestamp.setTime(now.getTime() - 10 * 1000); const message2Timestamp = new Date(); - // Set second message 2 minutes in the past - message2Timestamp.setTime(now.getTime() - 2 * 60 * 1000); + // Set second message 2 seconds in the past + message2Timestamp.setTime(now.getTime() - 2 * 1000); const messageTimestamps = [message1Timestamp, message2Timestamp]; const endTime = new Date(); - // Set end time 1 minute in the past - endTime.setTime(now.getTime() - 60 * 1000); + // Set end time 1 second in the past + endTime.setTime(now.getTime() - 1000); for (let i = 0; i < 2; i++) { expect( From 11c9823a9d42453641ad104a65272d7c4477f033 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 14:28:41 +1100 Subject: [PATCH 06/12] test: use byte payload to more easily read nwaku's logs --- packages/tests/tests/store.node.spec.ts | 28 ++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 3895d52e2f..62a56c8315 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -48,7 +48,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }) ) @@ -78,7 +78,7 @@ describe("Waku Store", () => { expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; + return msg.payload![0]! === 0; }); expect(result).to.not.eq(-1); }); @@ -119,7 +119,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }) ) @@ -146,7 +146,7 @@ describe("Waku Store", () => { expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; + return msg.payload![0]! === 0; }); expect(result).to.not.eq(-1); }); @@ -160,7 +160,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }) ) @@ -199,7 +199,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }) ) @@ -228,7 +228,7 @@ describe("Waku Store", () => { for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === `Message ${index}`; + return msg.payload![0]! === index; }) ).to.eq(index); } @@ -242,7 +242,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }) ) @@ -273,7 +273,7 @@ describe("Waku Store", () => { for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === `Message ${index}`; + return msg.payload![0]! === index; }) ).to.eq(index); } @@ -405,7 +405,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, timestamp: messageTimestamps[i], }) @@ -453,7 +453,7 @@ describe("Waku Store", () => { expect(firstMessages?.length).eq(1); - expect(bytesToUtf8(firstMessages[0].payload!)).eq("Message 0"); + expect(firstMessages[0].payload![0]!).eq(0); expect(bothMessages?.length).eq(2); }); @@ -467,7 +467,7 @@ describe("Waku Store", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }) ) @@ -523,7 +523,7 @@ describe("Waku Store, custom pubsub topic", () => { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), + payload: new Uint8Array([i]), contentTopic: TestContentTopic, }), customPubSubTopic @@ -555,7 +555,7 @@ describe("Waku Store, custom pubsub topic", () => { expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; + return msg.payload![0]! === 0; }); expect(result).to.not.eq(-1); }); From 74d7bb90c7fe2a59ef8bd353b03ed26d2ec1f258 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 15:32:34 +1100 Subject: [PATCH 07/12] test: compare all messages at once Makes it easier to understand the order mismatch. --- packages/tests/tests/store.node.spec.ts | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 62a56c8315..b7ff8c5076 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -225,13 +225,8 @@ describe("Waku Store", () => { ); expect(messages?.length).eq(totalMsgs); - for (let index = 0; index < totalMsgs; index++) { - expect( - messages?.findIndex((msg) => { - return msg.payload![0]! === index; - }) - ).to.eq(index); - } + const payloads = messages.map((msg) => msg.payload![0]!); + expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); }); it("Ordered Callback - Backward", async function () { @@ -270,13 +265,8 @@ describe("Waku Store", () => { messages = messages.reverse(); expect(messages?.length).eq(totalMsgs); - for (let index = 0; index < totalMsgs; index++) { - expect( - messages?.findIndex((msg) => { - return msg.payload![0]! === index; - }) - ).to.eq(index); - } + const payloads = messages.map((msg) => msg.payload![0]!); + expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); }); it("Generator, with asymmetric & symmetric encrypted messages", async function () { From 1b85373472ea937aba11e885616adfa1f0b78b4e Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 20:58:30 +1100 Subject: [PATCH 08/12] test: use `esnext` for NodeJS tests --- packages/tests/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/package.json b/packages/tests/package.json index 5b4ce611c2..f2eaabaade 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -48,7 +48,7 @@ "check:spelling": "cspell \"{README.md,{tests,src}/**/*.ts}\"", "check:tsc": "tsc -p tsconfig.dev.json", "test": "run-s test:*", - "test:node": "TS_NODE_PROJECT=./tsconfig.json mocha", + "test:node": "TS_NODE_PROJECT=./tsconfig.dev.json mocha", "reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build" }, "engines": { From 12fa89c9308a618dbb1120231b55e0eb29c8dc5c Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 21:23:27 +1100 Subject: [PATCH 09/12] fix: defaults to mounted protocol when dialing --- packages/core/src/lib/waku.ts | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index f792eb6c56..6d73436a80 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -13,8 +13,8 @@ import { FilterCodec, WakuFilter } from "./waku_filter"; import { LightPushCodec, WakuLightPush } from "./waku_light_push"; import { EncoderV0 } from "./waku_message/version_0"; import { WakuRelay } from "./waku_relay"; -import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants"; import * as relayConstants from "./waku_relay/constants"; +import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants"; import { StoreCodec, WakuStore } from "./waku_store"; export const DefaultPingKeepAliveValueSecs = 0; @@ -109,13 +109,20 @@ export class WakuNode implements Waku { * Dials to the provided peer. * * @param peer The peer to dial - * @param protocols Waku protocols we expect from the peer; Default to Relay + * @param protocols Waku protocols we expect from the peer; Defaults to mounted protocols */ async dial( peer: PeerId | Multiaddr, protocols?: Protocols[] ): Promise { - const _protocols = protocols ?? [Protocols.Relay]; + const _protocols = protocols ?? []; + + if (typeof protocols === "undefined") { + this.relay && _protocols.push(Protocols.Relay); + this.store && _protocols.push(Protocols.Store); + this.filter && _protocols.push(Protocols.Filter); + this.lightPush && _protocols.push(Protocols.LightPush); + } const codecs: string[] = []; if (_protocols.includes(Protocols.Relay)) { From a0162febc7cb8571bd03d0a058b33ce89df43947 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 21:23:59 +1100 Subject: [PATCH 10/12] chore: remove unnecessary ts-ignore --- packages/core/src/lib/waku.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 6d73436a80..ba1bdd6c9e 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -138,8 +138,6 @@ export class WakuNode implements Waku { codecs.push(FilterCodec); } - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: new Multiaddr is not backward compatible return this.libp2p.dialProtocol(peer, codecs); } From ab61eac84f1fe6db76c5ff25c16f8ff5d8be47c7 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Tue, 15 Nov 2022 21:59:31 +1100 Subject: [PATCH 11/12] chore(ci/tests): bump nwaku to v0.13.0 --- .github/workflows/ci.yml | 2 +- nwaku | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60ff4cdc93..0b029c3f32 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ on: pull_request: env: - NWAKU_VERSION: "v0.11" + NWAKU_VERSION: "v0.13.0" NODE_JS: "16" jobs: diff --git a/nwaku b/nwaku index fec1397483..9debd44e2a 160000 --- a/nwaku +++ b/nwaku @@ -1 +1 @@ -Subproject commit fec13974836149c2b81cab8d4178dee7bfc56db1 +Subproject commit 9debd44e2aebf07eaf96b4525a4497c69aaf4445 From 59992832fe7a7e18e4cf2b292744d207c2a476ab Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Wed, 16 Nov 2022 11:00:43 +1100 Subject: [PATCH 12/12] feat!: add Waku Message ephemeral support --- packages/core/CHANGELOG.md | 4 + .../core/src/lib/to_proto_message.spec.ts | 1 + packages/core/src/lib/to_proto_message.ts | 1 + .../lib/waku_message/topic_only_message.ts | 6 +- .../src/lib/waku_message/version_0.spec.ts | 19 +++ .../core/src/lib/waku_message/version_0.ts | 10 +- packages/core/src/proto/filter.ts | 9 ++ packages/core/src/proto/light_push.ts | 9 ++ packages/core/src/proto/message.proto | 1 + packages/core/src/proto/message.ts | 9 ++ packages/core/src/proto/store.ts | 9 ++ packages/interfaces/src/index.ts | 3 + packages/message-encryption/src/index.ts | 8 +- packages/tests/tests/store.node.spec.ts | 141 +++++++++++++++++- 14 files changed, 222 insertions(+), 8 deletions(-) diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index ed4f57729f..06b2a7dec3 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Waku Message `ephemeral` field to mark messages as do-not-store. + ## @waku/core [0.0.5](https://github.com/waku-org/js-waku/compare/@waku/core@0.0.4...@waku/core@0.0.5) (2022-11-11) ### Changed diff --git a/packages/core/src/lib/to_proto_message.spec.ts b/packages/core/src/lib/to_proto_message.spec.ts index 4bc7ca0fa7..2280c99f12 100644 --- a/packages/core/src/lib/to_proto_message.spec.ts +++ b/packages/core/src/lib/to_proto_message.spec.ts @@ -20,5 +20,6 @@ describe("to proto message", () => { expect(keys).to.contain("version"); expect(keys).to.contain("timestamp"); expect(keys).to.contain("rateLimitProof"); + expect(keys).to.contain("ephemeral"); }); }); diff --git a/packages/core/src/lib/to_proto_message.ts b/packages/core/src/lib/to_proto_message.ts index a2a60fd0c4..b554581214 100644 --- a/packages/core/src/lib/to_proto_message.ts +++ b/packages/core/src/lib/to_proto_message.ts @@ -8,6 +8,7 @@ const EmptyMessage: ProtoMessage = { version: undefined, timestamp: undefined, rateLimitProof: undefined, + ephemeral: undefined, }; export function toProtoMessage(wire: WakuMessageProto): ProtoMessage { diff --git a/packages/core/src/lib/waku_message/topic_only_message.ts b/packages/core/src/lib/waku_message/topic_only_message.ts index 9514847d21..d720fce1db 100644 --- a/packages/core/src/lib/waku_message/topic_only_message.ts +++ b/packages/core/src/lib/waku_message/topic_only_message.ts @@ -1,14 +1,15 @@ -import type { Decoder, Message, ProtoMessage } from "@waku/interfaces"; +import type { DecodedMessage, Decoder, ProtoMessage } from "@waku/interfaces"; import debug from "debug"; import * as proto from "../../proto/topic_only_message"; const log = debug("waku:message:topic-only"); -export class TopicOnlyMessage implements Message { +export class TopicOnlyMessage implements DecodedMessage { public payload: undefined; public rateLimitProof: undefined; public timestamp: undefined; + public ephemeral: undefined; constructor(private proto: proto.TopicOnlyMessage) {} @@ -29,6 +30,7 @@ export class TopicOnlyDecoder implements Decoder { rateLimitProof: undefined, timestamp: undefined, version: undefined, + ephemeral: undefined, }); } diff --git a/packages/core/src/lib/waku_message/version_0.spec.ts b/packages/core/src/lib/waku_message/version_0.spec.ts index 29961a655a..8601538ae8 100644 --- a/packages/core/src/lib/waku_message/version_0.spec.ts +++ b/packages/core/src/lib/waku_message/version_0.spec.ts @@ -17,6 +17,25 @@ describe("Waku Message version 0", function () { expect(result.contentTopic).to.eq(TestContentTopic); expect(result.version).to.eq(0); + expect(result.ephemeral).to.be.false; + expect(result.payload).to.deep.eq(payload); + expect(result.timestamp).to.not.be.undefined; + }) + ); + }); + + it("Ephemeral", async function () { + await fc.assert( + fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => { + const encoder = new EncoderV0(TestContentTopic, true); + const bytes = await encoder.toWire({ payload }); + const decoder = new DecoderV0(TestContentTopic); + const protoResult = await decoder.fromWireToProtoObj(bytes); + const result = (await decoder.fromProtoObj(protoResult!)) as MessageV0; + + expect(result.contentTopic).to.eq(TestContentTopic); + expect(result.version).to.eq(0); + expect(result.ephemeral).to.be.true; expect(result.payload).to.deep.eq(payload); expect(result.timestamp).to.not.be.undefined; }) diff --git a/packages/core/src/lib/waku_message/version_0.ts b/packages/core/src/lib/waku_message/version_0.ts index ea2c1a3cbe..89d630b706 100644 --- a/packages/core/src/lib/waku_message/version_0.ts +++ b/packages/core/src/lib/waku_message/version_0.ts @@ -26,6 +26,10 @@ export class MessageV0 implements DecodedMessage { return; } + get ephemeral(): boolean { + return Boolean(this.proto.ephemeral); + } + get payload(): Uint8Array | undefined { return this._rawPayload; } @@ -68,7 +72,7 @@ export class MessageV0 implements DecodedMessage { } export class EncoderV0 implements Encoder { - constructor(public contentTopic: string) {} + constructor(public contentTopic: string, public ephemeral: boolean = false) {} async toWire(message: Partial): Promise { return proto.WakuMessage.encode(await this.toProtoObj(message)); @@ -83,12 +87,13 @@ export class EncoderV0 implements Encoder { contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, + ephemeral: this.ephemeral, }; } } export class DecoderV0 implements Decoder { - constructor(public contentTopic: string) {} + constructor(public contentTopic: string, public ephemeral: boolean = false) {} fromWireToProtoObj(bytes: Uint8Array): Promise { const protoMessage = proto.WakuMessage.decode(bytes); @@ -99,6 +104,7 @@ export class DecoderV0 implements Decoder { version: protoMessage.version ?? undefined, timestamp: protoMessage.timestamp ?? undefined, rateLimitProof: protoMessage.rateLimitProof ?? undefined, + ephemeral: protoMessage.ephemeral ?? false, }); } diff --git a/packages/core/src/proto/filter.ts b/packages/core/src/proto/filter.ts index 61b6bd3e49..87b2f95cc3 100644 --- a/packages/core/src/proto/filter.ts +++ b/packages/core/src/proto/filter.ts @@ -501,6 +501,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -544,6 +545,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -578,6 +584,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/core/src/proto/light_push.ts b/packages/core/src/proto/light_push.ts index 329d853cf5..bb34340e81 100644 --- a/packages/core/src/proto/light_push.ts +++ b/packages/core/src/proto/light_push.ts @@ -425,6 +425,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -468,6 +469,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -502,6 +508,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/core/src/proto/message.proto b/packages/core/src/proto/message.proto index bd4c0dc85d..49b5618033 100644 --- a/packages/core/src/proto/message.proto +++ b/packages/core/src/proto/message.proto @@ -17,5 +17,6 @@ message WakuMessage { optional double timestamp_deprecated = 4; optional sint64 timestamp = 10; optional RateLimitProof rate_limit_proof = 21; + optional bool ephemeral = 31; } diff --git a/packages/core/src/proto/message.ts b/packages/core/src/proto/message.ts index 0c949ea7a6..f24100e98e 100644 --- a/packages/core/src/proto/message.ts +++ b/packages/core/src/proto/message.ts @@ -203,6 +203,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -246,6 +247,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -280,6 +286,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/core/src/proto/store.ts b/packages/core/src/proto/store.ts index 0e9324f885..558b545f96 100644 --- a/packages/core/src/proto/store.ts +++ b/packages/core/src/proto/store.ts @@ -743,6 +743,7 @@ export interface WakuMessage { timestampDeprecated?: number; timestamp?: bigint; rateLimitProof?: RateLimitProof; + ephemeral?: boolean; } export namespace WakuMessage { @@ -786,6 +787,11 @@ export namespace WakuMessage { RateLimitProof.codec().encode(obj.rateLimitProof, writer); } + if (obj.ephemeral != null) { + writer.uint32(248); + writer.bool(obj.ephemeral); + } + if (opts.lengthDelimited !== false) { writer.ldelim(); } @@ -820,6 +826,9 @@ export namespace WakuMessage { reader.uint32() ); break; + case 31: + obj.ephemeral = reader.bool(); + break; default: reader.skipType(tag & 7); break; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 5551483137..91258a3b4f 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -165,6 +165,7 @@ export interface ProtoMessage { version: number | undefined; timestamp: bigint | undefined; rateLimitProof: RateLimitProof | undefined; + ephemeral: boolean | undefined; } /** @@ -178,6 +179,7 @@ export interface Message { export interface Encoder { contentTopic: string; + ephemeral: boolean; toWire: (message: Message) => Promise; toProtoObj: (message: Message) => Promise; } @@ -187,6 +189,7 @@ export interface DecodedMessage { contentTopic: string | undefined; timestamp: Date | undefined; rateLimitProof: RateLimitProof | undefined; + ephemeral: boolean | undefined; } export interface Decoder { diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index ec84603f72..fc60c60a99 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -66,7 +66,8 @@ export class AsymEncoder implements Encoder { constructor( public contentTopic: string, private publicKey: Uint8Array, - private sigPrivKey?: Uint8Array + private sigPrivKey?: Uint8Array, + public ephemeral: boolean = false ) {} async toWire(message: Partial): Promise { @@ -94,6 +95,7 @@ export class AsymEncoder implements Encoder { contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, + ephemeral: this.ephemeral, }; } } @@ -102,7 +104,8 @@ export class SymEncoder implements Encoder { constructor( public contentTopic: string, private symKey: Uint8Array, - private sigPrivKey?: Uint8Array + private sigPrivKey?: Uint8Array, + public ephemeral: boolean = false ) {} async toWire(message: Partial): Promise { @@ -129,6 +132,7 @@ export class SymEncoder implements Encoder { contentTopic: this.contentTopic, timestamp: BigInt(timestamp.valueOf()) * OneMillion, rateLimitProof: message.rateLimitProof, + ephemeral: this.ephemeral, }; } } diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index b7ff8c5076..5f17d30616 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -3,7 +3,7 @@ import { PageDirection } from "@waku/core"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { createFullNode } from "@waku/create"; -import type { Message, WakuFull } from "@waku/interfaces"; +import { DecodedMessage, Message, WakuFull } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { AsymDecoder, @@ -343,7 +343,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku2, [Protocols.Store]); - const messages: Message[] = []; + const messages: DecodedMessage[] = []; log("Retrieve messages from store"); for await (const msgPromises of waku2.store.queryGenerator([ @@ -369,6 +369,143 @@ describe("Waku Store", () => { !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); }); + it.skip("Ephemeral support", async function () { + this.timeout(15_000); + + const asymText = "This message is encrypted for me using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + + const symText = + "This message is encrypted for me using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; + + const clearText = "This is a clear text message for everyone to read"; + + const storeReadableText = "This message is readable by the store"; + const storeUnreadableText = "This message is not readable by the store"; + + const timestamp = new Date(); + + const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; + const symMsg = { + payload: utf8ToBytes(symText), + timestamp: new Date(timestamp.valueOf() + 1), + }; + const clearMsg = { + payload: utf8ToBytes(clearText), + timestamp: new Date(timestamp.valueOf() + 2), + }; + + const storeReadableMsg = { + payload: utf8ToBytes(storeReadableText), + }; + const storeUnreadableMsg = { + payload: utf8ToBytes(storeUnreadableText), + }; + + const privateKey = generatePrivateKey(); + const symKey = generateSymmetricKey(); + const publicKey = getPublicKey(privateKey); + + const storeWithAsymEncoder = new AsymEncoder( + asymTopic, + publicKey, + undefined, + false + ); + const storeWithSymEncoder = new SymEncoder( + symTopic, + symKey, + undefined, + false + ); + + const dontStoreWithAsymEncoder = new AsymEncoder( + asymTopic, + publicKey, + undefined, + true + ); + const dontStoreWithSymEncoder = new SymEncoder( + symTopic, + symKey, + undefined, + true + ); + + const storeEncoder = new EncoderV0(TestContentTopic, false); + const storeUnreadableEncoder = new EncoderV0(TestContentTopic, true); + + const asymDecoder = new AsymDecoder(asymTopic, privateKey); + const symDecoder = new SymDecoder(symTopic, symKey); + + const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ + createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }).then((waku) => waku.start().then(() => waku)), + createFullNode({ + staticNoiseKey: NOISE_KEY_2, + }).then((waku) => waku.start().then(() => waku)), + nwaku.getMultiaddrWithId(), + ]); + + log("Waku nodes created"); + + await Promise.all([ + waku1.dial(nimWakuMultiaddr), + waku2.dial(nimWakuMultiaddr), + ]); + + log("Waku nodes connected to nwaku"); + + await waitForRemotePeer(waku1, [Protocols.LightPush]); + + log("Sending messages using light push"); + await Promise.all([ + waku1.lightPush.push(storeWithAsymEncoder, asymMsg), + waku1.lightPush.push(storeWithSymEncoder, symMsg), + waku1.lightPush.push(dontStoreWithAsymEncoder, asymMsg), + waku1.lightPush.push(dontStoreWithSymEncoder, symMsg), + waku1.lightPush.push(TestEncoder, clearMsg), + waku1.lightPush.push(storeEncoder, storeReadableMsg), + waku1.lightPush.push(storeUnreadableEncoder, storeUnreadableMsg), + ]); + + await waitForRemotePeer(waku2, [Protocols.Store]); + + const messages: DecodedMessage[] = []; + log("Retrieve messages from store"); + + for await (const msgPromises of waku2.store.queryGenerator([ + asymDecoder, + symDecoder, + TestDecoder, + ])) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } + } + } + + // Messages are ordered from oldest to latest within a page (1 page query) + expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); + expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); + expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); + expect(bytesToUtf8(messages[3].payload!)).to.eq(storeReadableText); + expect(messages?.length).eq(4); + + // check for ephemeral + expect(messages[0].ephemeral).to.be.false; + expect(messages[1].ephemeral).to.be.false; + expect(messages[2].ephemeral).to.be.false; + expect(messages[3].ephemeral).to.be.false; + + !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); + !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + it("Ordered callback, using start and end time", async function () { this.timeout(20000);