From b2c7e4185f2008c1b79af13655ea749b174d1bb8 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 19 Sep 2022 13:50:29 +1000 Subject: [PATCH] feat: make message encoding more generic --- .cspell.json | 2 + .size-limit.cjs | 6 +- CHANGELOG.md | 7 + package.json | 8 + rollup.config.js | 2 + src/index.ts | 3 - src/lib/group_by.ts | 14 + src/lib/interfaces.ts | 33 +- src/lib/push_or_init_map.spec.ts | 27 ++ src/lib/push_or_init_map.ts | 13 + src/lib/waku.node.spec.ts | 32 +- src/lib/waku.ts | 42 +-- src/lib/waku_filter/index.node.spec.ts | 72 ++--- src/lib/waku_filter/index.ts | 114 ++++--- src/lib/waku_light_push/index.node.spec.ts | 36 ++- src/lib/waku_light_push/index.ts | 23 +- src/lib/waku_light_push/push_rpc.ts | 8 +- src/lib/waku_message/index.node.spec.ts | 191 ------------ src/lib/waku_message/index.spec.ts | 138 --------- src/lib/waku_message/index.ts | 312 ------------------- src/lib/waku_message/version_0.spec.ts | 25 ++ src/lib/waku_message/version_0.ts | 105 +++++++ src/lib/waku_message/version_1.spec.ts | 181 +++++++++-- src/lib/waku_message/version_1.ts | 340 ++++++++++++++++----- src/lib/waku_relay/index.node.spec.ts | 249 +++++++-------- src/lib/waku_relay/index.ts | 192 ++++-------- src/lib/waku_store/index.node.spec.ts | 197 ++++++------ src/lib/waku_store/index.ts | 97 ++---- typedoc.json | 4 +- 29 files changed, 1085 insertions(+), 1388 deletions(-) create mode 100644 src/lib/group_by.ts create mode 100644 src/lib/push_or_init_map.spec.ts create mode 100644 src/lib/push_or_init_map.ts delete mode 100644 src/lib/waku_message/index.node.spec.ts delete mode 100644 src/lib/waku_message/index.spec.ts delete mode 100644 src/lib/waku_message/index.ts create mode 100644 src/lib/waku_message/version_0.spec.ts create mode 100644 src/lib/waku_message/version_0.ts diff --git a/.cspell.json b/.cspell.json index a2a890fc94..754e3a733f 100644 --- a/.cspell.json +++ b/.cspell.json @@ -3,6 +3,8 @@ "$schema": "https://raw.githubusercontent.com/streetsidesoftware/cspell/master/cspell.schema.json", "language": "en", "words": [ + "abortable", + "asym", "backoff", "backoffs", "bitjson", diff --git a/.size-limit.cjs b/.size-limit.cjs index 370ec75ddc..65ef6dba29 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -10,12 +10,14 @@ module.exports = [ import: { "./bundle/lib/create_waku.js": "{ createLightNode }", "./bundle/lib/wait_for_remote_peer.js": "{ waitForRemotePeer }", + "./bundle/lib/waku_message/version_0.js": + "{ MessageV0, DecoderV0, EncoderV0 }", }, }, { name: "Asymmetric, symmetric encryption and signature", - path: "bundle/index.js", - import: "{ WakuMessage }", + path: "bundle/lib/waku_message/version_1.js", + import: "{ MessageV1, AsymEncoder, AsymDecoder, SymEncoder, SymDecoder }", }, { name: "DNS discovery", diff --git a/CHANGELOG.md b/CHANGELOG.md index 36f3ddd670..6b1b7d6474 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `queryCallbackOnPromise`'s return value has been simplified to `Promise`. - doc: clarified behaviour of `WakuStore` query functions. +- Waku message encoding and decoding is more generic, to enable upcoming feature such as [RLN](https://rfc.vac.dev/spec/17/) & [Noise](https://rfc.vac.dev/spec/43/); + it also enables separating the `version_1` module out to reduce bundle size and improve cross-platform compatibility when not used. +- Due to the change above, all APIs that handle messages have changed to receive a `Decoder` or `Encoder`. + +### Deleted + +- `WakuMessage` class in favour of the `Message`, `Encoder`, `Decoder` interfaces and `EncoderV0`, `AsymEncoder`, `SymEncoder` (and related decoders). ## [0.28.0] - 2022-09-16 diff --git a/package.json b/package.json index 4442183a0c..5c5b5b20fa 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,14 @@ "./lib/wait_for_remote_peer": { "types": "./dist/lib/wait_for_remote_peer.d.ts", "import": "./dist/lib/wait_for_remote_peer.js" + }, + "./lib/waku_message/version_0": { + "types": "./dist/lib/waku_message/version_0.d.ts", + "import": "./dist/lib/waku_message/version_0.js" + }, + "./lib/waku_message/version_1": { + "types": "./dist/lib/waku_message/version_1.d.ts", + "import": "./dist/lib/waku_message/version_1.js" } }, "typesVersions": { diff --git a/rollup.config.js b/rollup.config.js index bbb2da0822..cb58ac9f1c 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -10,6 +10,8 @@ export default { "lib/peer_discovery_static_list": "dist/lib/peer_discovery_static_list.js", "lib/predefined_bootstrap_nodes": "dist/lib/predefined_bootstrap_nodes.js", "lib/wait_for_remote_peer": "dist/lib/wait_for_remote_peer.js", + "lib/waku_message/version_0": "dist/lib/waku_message/version_0.js", + "lib/waku_message/version_1": "dist/lib/waku_message/version_1.js", }, output: { dir: "bundle", diff --git a/src/index.ts b/src/index.ts index 60078fbb9b..b1f3f85403 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,9 +15,6 @@ export * as proto_message from "./proto/message"; export * as waku from "./lib/waku"; export { WakuNode, Protocols } from "./lib/waku"; -export * as waku_message from "./lib/waku_message"; -export { WakuMessage } from "./lib/waku_message"; - export * as waku_filter from "./lib/waku_filter"; export { WakuFilter } from "./lib/waku_filter"; diff --git a/src/lib/group_by.ts b/src/lib/group_by.ts new file mode 100644 index 0000000000..e270ae6b71 --- /dev/null +++ b/src/lib/group_by.ts @@ -0,0 +1,14 @@ +export function groupByContentTopic( + values: T[] +): Map> { + const groupedDecoders = new Map(); + values.forEach((value) => { + let decs = groupedDecoders.get(value.contentTopic); + if (!decs) { + groupedDecoders.set(value.contentTopic, []); + decs = groupedDecoders.get(value.contentTopic); + } + decs.push(value); + }); + return groupedDecoders; +} diff --git a/src/lib/interfaces.ts b/src/lib/interfaces.ts index 1f315dc0fe..4df89f1b47 100644 --- a/src/lib/interfaces.ts +++ b/src/lib/interfaces.ts @@ -6,7 +6,6 @@ import type { Libp2p } from "libp2p"; import type { Protocols } from "./waku"; import type { WakuFilter } from "./waku_filter"; import type { WakuLightPush } from "./waku_light_push"; -import type { DecryptionMethod } from "./waku_message"; import type { WakuRelay } from "./waku_relay"; import type { WakuStore } from "./waku_store"; @@ -29,13 +28,6 @@ export interface Waku { stop(): Promise; isStarted(): boolean; - - addDecryptionKey( - key: Uint8Array | string, - options?: { method?: DecryptionMethod; contentTopics?: string[] } - ): void; - - deleteDecryptionKey(key: Uint8Array | string): void; } export interface WakuLight extends Waku { @@ -58,3 +50,28 @@ export interface WakuFull extends Waku { filter: WakuFilter; lightPush: WakuLightPush; } + +export interface ProtoMessage { + payload?: Uint8Array; + contentTopic?: string; + version?: number; + timestamp?: bigint; +} + +export interface Message { + payload?: Uint8Array; + contentTopic?: string; + timestamp?: Date; +} + +export interface Encoder { + contentTopic: string; + encode: (message: Message) => Promise; + encodeProto: (message: Message) => Promise; +} + +export interface Decoder { + contentTopic: string; + decodeProto: (bytes: Uint8Array) => Promise; + decode: (proto: ProtoMessage) => Promise; +} diff --git a/src/lib/push_or_init_map.spec.ts b/src/lib/push_or_init_map.spec.ts new file mode 100644 index 0000000000..e6ffd1456e --- /dev/null +++ b/src/lib/push_or_init_map.spec.ts @@ -0,0 +1,27 @@ +import { expect } from "chai"; + +import { pushOrInitMapSet } from "./push_or_init_map"; + +describe("pushOrInitMapSet", () => { + it("Init the array if not present", () => { + const map = new Map(); + const key = "key"; + const value = "value"; + + pushOrInitMapSet(map, key, value); + + expect(map.get(key)).to.deep.eq(new Set([value])); + }); + + it("Push to array if already present", () => { + const map = new Map(); + const key = "key"; + const value1 = "value1"; + const value2 = "value2"; + + pushOrInitMapSet(map, key, value1); + pushOrInitMapSet(map, key, value2); + + expect(map.get(key)).to.deep.eq(new Set([value1, value2])); + }); +}); diff --git a/src/lib/push_or_init_map.ts b/src/lib/push_or_init_map.ts new file mode 100644 index 0000000000..1f5d4edcfd --- /dev/null +++ b/src/lib/push_or_init_map.ts @@ -0,0 +1,13 @@ +export function pushOrInitMapSet( + map: Map>, + key: K, + newValue: V +): void { + let arr = map.get(key); + if (typeof arr === "undefined") { + map.set(key, new Set()); + arr = map.get(key) as Set; + } + + arr.add(newValue); +} diff --git a/src/lib/waku.node.spec.ts b/src/lib/waku.node.spec.ts index 9913cfc3fb..a0ec3f1c25 100644 --- a/src/lib/waku.node.spec.ts +++ b/src/lib/waku.node.spec.ts @@ -10,11 +10,12 @@ import { import { createLightNode, createPrivacyNode } from "./create_waku"; import { generateSymmetricKey } from "./crypto"; -import type { Waku, WakuLight, WakuPrivacy } from "./interfaces"; +import type { Message, Waku, WakuLight, WakuPrivacy } from "./interfaces"; import { PeerDiscoveryStaticPeers } from "./peer_discovery_static_list"; +import { bytesToUtf8, utf8ToBytes } from "./utils"; import { waitForRemotePeer } from "./wait_for_remote_peer"; import { Protocols } from "./waku"; -import { WakuMessage } from "./waku_message"; +import { SymDecoder, SymEncoder } from "./waku_message/version_1.js"; const TestContentTopic = "/test/1/waku/utf8"; @@ -158,31 +159,26 @@ describe("Decryption Keys", () => { this.timeout(10000); const symKey = generateSymmetricKey(); + const decoder = new SymDecoder(TestContentTopic, symKey); - waku2.addDecryptionKey(symKey); - + const encoder = new SymEncoder(TestContentTopic, symKey); const messageText = "Message is encrypted"; const messageTimestamp = new Date("1995-12-17T03:24:00"); - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic, - { - timestamp: messageTimestamp, - symKey, - } - ); + const message = { + payload: utf8ToBytes(messageText), + timestamp: messageTimestamp, + }; - const receivedMsgPromise: Promise = new Promise((resolve) => { - waku2.relay.addObserver(resolve); + const receivedMsgPromise: Promise = new Promise((resolve) => { + waku2.relay.addObserver(decoder, resolve); }); - await waku1.relay.send(message); + await waku1.relay.send(encoder, message); const receivedMsg = await receivedMsgPromise; - expect(receivedMsg.contentTopic).to.eq(message.contentTopic); - expect(receivedMsg.version).to.eq(message.version); - expect(receivedMsg.payloadAsUtf8).to.eq(messageText); + expect(receivedMsg.contentTopic).to.eq(TestContentTopic); + expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText); expect(receivedMsg.timestamp?.valueOf()).to.eq(messageTimestamp.valueOf()); }); }); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index dcfb81e2ee..dfb14aa93b 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -10,7 +10,7 @@ import type { Libp2p } from "libp2p"; import { Waku } from "./interfaces"; import { FilterCodec, WakuFilter } from "./waku_filter"; import { LightPushCodec, WakuLightPush } from "./waku_light_push"; -import { DecryptionMethod, WakuMessage } from "./waku_message"; +import { EncoderV0 } from "./waku_message/version_0"; import { WakuRelay } from "./waku_relay"; import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants"; import * as relayConstants from "./waku_relay/constants"; @@ -43,7 +43,6 @@ export interface WakuOptions { * @default {@link DefaultRelayKeepAliveValueSecs} */ relayKeepAlive?: number; - decryptionKeys?: Array; } export class WakuNode implements Waku { @@ -110,10 +109,6 @@ export class WakuNode implements Waku { libp2p.connectionManager.addEventListener("peer:disconnect", (evt) => { this.stopKeepAlive(evt.detail.remotePeer); }); - - options?.decryptionKeys?.forEach((key) => { - this.addDecryptionKey(key); - }); } /** @@ -183,34 +178,6 @@ export class WakuNode implements Waku { return this.libp2p.isStarted(); } - /** - * Register a decryption key to attempt decryption of messages received via - * { @link WakuRelay } and { @link WakuStore }. This can either be a private key for - * asymmetric encryption or a symmetric key. - * - * Strings must be in hex format. - */ - addDecryptionKey( - key: Uint8Array | string, - options?: { method?: DecryptionMethod; contentTopics?: string[] } - ): void { - if (this.relay) this.relay.addDecryptionKey(key, options); - if (this.store) this.store.addDecryptionKey(key, options); - if (this.filter) this.filter.addDecryptionKey(key, options); - } - - /** - * Delete a decryption key that was used to attempt decryption of messages - * received via { @link WakuRelay } or { @link WakuStore }. - * - * Strings must be in hex format. - */ - deleteDecryptionKey(key: Uint8Array | string): void { - if (this.relay) this.relay.deleteDecryptionKey(key); - if (this.store) this.store.deleteDecryptionKey(key); - if (this.filter) this.filter.deleteDecryptionKey(key); - } - /** * Return the local multiaddr with peer id on which libp2p is listening. * @@ -246,11 +213,12 @@ export class WakuNode implements Waku { const relay = this.relay; if (relay && relayPeriodSecs !== 0) { + const encoder = new EncoderV0(RelayPingContentTopic); this.relayKeepAliveTimers[peerIdStr] = setInterval(() => { log("Sending Waku Relay ping message"); - WakuMessage.fromBytes(new Uint8Array(), RelayPingContentTopic).then( - (wakuMsg) => relay.send(wakuMsg) - ); + relay + .send(encoder, { payload: new Uint8Array() }) + .catch((e) => log("Failed to send relay ping", e)); }, relayPeriodSecs * 1000); } } diff --git a/src/lib/waku_filter/index.node.spec.ts b/src/lib/waku_filter/index.node.spec.ts index da0dfdefdf..eea4a72716 100644 --- a/src/lib/waku_filter/index.node.spec.ts +++ b/src/lib/waku_filter/index.node.spec.ts @@ -4,10 +4,11 @@ import debug from "debug"; import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils"; import { delay } from "../../test_utils/delay"; import { createFullNode } from "../create_waku"; -import type { WakuFull } from "../interfaces"; +import type { Message, WakuFull } from "../interfaces"; +import { bytesToUtf8, utf8ToBytes } from "../utils"; import { waitForRemotePeer } from "../wait_for_remote_peer"; import { Protocols } from "../waku"; -import { WakuMessage } from "../waku_message"; +import { DecoderV0, EncoderV0 } from "../waku_message/version_0"; const log = debug("waku:test"); @@ -40,22 +41,25 @@ describe("Waku Filter", () => { let messageCount = 0; const messageText = "Filtering works!"; - const callback = (msg: WakuMessage): void => { + const message = { payload: utf8ToBytes(messageText) }; + + const callback = (msg: Message): void => { log("Got a message"); messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); - expect(msg.payloadAsUtf8).to.eq(messageText); + expect(bytesToUtf8(msg.payload!)).to.eq(messageText); }; - await waku.filter.subscribe(callback, [TestContentTopic]); + + const decoder = new DecoderV0(TestContentTopic); + + await waku.filter.subscribe([decoder], callback); // As the filter protocol does not cater for an ack of subscription // we cannot know whether the subscription happened. Something we want to // correct in future versions of the protocol. await delay(200); - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic - ); - await waku.lightPush.push(message); + + const encoder = new EncoderV0(TestContentTopic); + await waku.lightPush.push(encoder, message); while (messageCount === 0) { await delay(250); } @@ -66,21 +70,21 @@ describe("Waku Filter", () => { this.timeout(10000); let messageCount = 0; - const callback = (msg: WakuMessage): void => { + const callback = (msg: Message): void => { messageCount++; expect(msg.contentTopic).to.eq(TestContentTopic); }; - await waku.filter.subscribe(callback, [TestContentTopic]); + const decoder = new DecoderV0(TestContentTopic); + await waku.filter.subscribe([decoder], callback); + await delay(200); - await waku.lightPush.push( - await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic) - ); - await waku.lightPush.push( - await WakuMessage.fromUtf8String( - "Filtering still works!", - TestContentTopic - ) - ); + const encoder = new EncoderV0(TestContentTopic); + await waku.lightPush.push(encoder, { + payload: utf8ToBytes("Filtering works!"), + }); + await waku.lightPush.push(encoder, { + payload: utf8ToBytes("Filtering still works!"), + }); while (messageCount < 2) { await delay(250); } @@ -92,25 +96,21 @@ describe("Waku Filter", () => { const callback = (): void => { messageCount++; }; - const unsubscribe = await waku.filter.subscribe(callback, [ - TestContentTopic, - ]); + const decoder = new DecoderV0(TestContentTopic); + const unsubscribe = await waku.filter.subscribe([decoder], callback); + + const encoder = new EncoderV0(TestContentTopic); + await delay(200); - await waku.lightPush.push( - await WakuMessage.fromUtf8String( - "This should be received", - TestContentTopic - ) - ); + await waku.lightPush.push(encoder, { + payload: utf8ToBytes("This should be received"), + }); await delay(100); await unsubscribe(); await delay(200); - await waku.lightPush.push( - await WakuMessage.fromUtf8String( - "This should not be received", - TestContentTopic - ) - ); + await waku.lightPush.push(encoder, { + payload: utf8ToBytes("This should not be received"), + }); await delay(100); expect(messageCount).to.eq(1); }); diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index e2fa36ca06..094c7d7e02 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -10,14 +10,14 @@ import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; +import { groupByContentTopic } from "../group_by"; +import { Decoder, Message } from "../interfaces"; import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectPeerForProtocol, selectRandomPeer, } from "../select_peer"; -import { hexToBytes } from "../utils"; -import { DecryptionMethod, WakuMessage } from "../waku_message"; import { ContentFilter, FilterRPC } from "./filter_rpc"; export { ContentFilter }; @@ -49,7 +49,7 @@ export type FilterSubscriptionOpts = { peerId?: PeerId; }; -export type FilterCallback = (msg: WakuMessage) => void | Promise; +export type FilterCallback = (msg: Message) => void | Promise; export type UnsubscribeFunction = () => Promise; @@ -63,14 +63,14 @@ export type UnsubscribeFunction = () => Promise; export class WakuFilter { pubSubTopic: string; private subscriptions: Map; - public decryptionKeys: Map< - Uint8Array, - { method?: DecryptionMethod; contentTopics?: string[] } + public decoders: Map< + string, // content topic + Set >; constructor(public libp2p: Libp2p, options?: CreateOptions) { this.subscriptions = new Map(); - this.decryptionKeys = new Map(); + this.decoders = new Map(); this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; this.libp2p .handle(FilterCodec, this.onRequest.bind(this)) @@ -78,17 +78,21 @@ export class WakuFilter { } /** - * @param contentTopics Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter. + * @param decoders Array of Decoders to use to decode messages, it also specifies the content topics. * @param callback A function that will be called on each message returned by the filter. * @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to. * @returns Unsubscribe function that can be used to end the subscription. */ async subscribe( + decoders: Decoder[], callback: FilterCallback, - contentTopics: string[], opts?: FilterSubscriptionOpts ): Promise { const topic = opts?.pubsubTopic ?? this.pubSubTopic; + + const groupedDecoders = groupByContentTopic(decoders); + const contentTopics = Array.from(groupedDecoders.keys()); + const contentFilters = contentTopics.map((contentTopic) => ({ contentTopic, })); @@ -130,11 +134,13 @@ export class WakuFilter { throw e; } + this.addDecoders(groupedDecoders); this.addCallback(requestId, callback); return async () => { await this.unsubscribe(topic, contentFilters, requestId, peer); - this.removeCallback(requestId); + this.deleteDecoders(groupedDecoders); + this.deleteCallback(requestId); }; } @@ -171,23 +177,35 @@ export class WakuFilter { return; } - const decryptionParams = Array.from(this.decryptionKeys).map( - ([key, { method, contentTopics }]) => { - return { - key, - method, - contentTopics, - }; + for (const protoMessage of messages) { + const contentTopic = protoMessage.contentTopic; + if (!contentTopic) { + log("Message has no content topic, skipping"); + return; } - ); - for (const message of messages) { - const decoded = await WakuMessage.decodeProto(message, decryptionParams); - if (!decoded) { - log("Not able to decode message"); - continue; + const decoders = this.decoders.get(contentTopic); + if (!decoders) { + log("No decoder for", contentTopic); + return; } - callback(decoded); + + let msg: Message | undefined; + // We don't want to wait for decoding failure, just attempt to decode + // all messages and do the call back on the one that works + // noinspection ES6MissingAwait + decoders.forEach(async (dec) => { + if (msg) return; + const decoded = await dec.decode(protoMessage); + if (!decoded) { + log("Not able to decode message"); + return; + } + // This is just to prevent more decoding attempt + // TODO: Could be better if we were to abort promises + msg = decoded; + await callback(decoded); + }); } } @@ -195,10 +213,32 @@ export class WakuFilter { this.subscriptions.set(requestId, callback); } - private removeCallback(requestId: string): void { + private deleteCallback(requestId: string): void { this.subscriptions.delete(requestId); } + private addDecoders(decoders: Map>): void { + decoders.forEach((decoders, contentTopic) => { + const currDecs = this.decoders.get(contentTopic); + if (!currDecs) { + this.decoders.set(contentTopic, new Set(decoders)); + } else { + this.decoders.set(contentTopic, new Set([...currDecs, ...decoders])); + } + }); + } + + private deleteDecoders(decoders: Map>): void { + decoders.forEach((decoders, contentTopic) => { + const currDecs = this.decoders.get(contentTopic); + if (currDecs) { + decoders.forEach((dec) => { + currDecs.delete(dec); + }); + } + }); + } + private async unsubscribe( topic: string, contentFilters: ContentFilter[], @@ -243,30 +283,6 @@ export class WakuFilter { return res.peer; } - /** - * Register a decryption key to attempt decryption of messages received in any - * subsequent { @link subscribe } call. This can either be a private key for - * asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to - * decrypt messages using both methods. - * - * Strings must be in hex format. - */ - addDecryptionKey( - key: Uint8Array | string, - options?: { method?: DecryptionMethod; contentTopics?: string[] } - ): void { - this.decryptionKeys.set(hexToBytes(key), options ?? {}); - } - - /** - * Delete a decryption key so that it cannot be used in future { @link subscribe } calls - * - * Strings must be in hex format. - */ - deleteDecryptionKey(key: Uint8Array | string): void { - this.decryptionKeys.delete(hexToBytes(key)); - } - async peers(): Promise { return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]); } diff --git a/src/lib/waku_light_push/index.node.spec.ts b/src/lib/waku_light_push/index.node.spec.ts index f0c16013f9..9cb96e6f6a 100644 --- a/src/lib/waku_light_push/index.node.spec.ts +++ b/src/lib/waku_light_push/index.node.spec.ts @@ -10,10 +10,10 @@ import { import { delay } from "../../test_utils/delay"; import { createFullNode } from "../create_waku"; import type { WakuFull } from "../interfaces"; -import { bytesToUtf8 } from "../utils"; +import { bytesToUtf8, utf8ToBytes } from "../utils"; import { waitForRemotePeer } from "../wait_for_remote_peer"; import { Protocols } from "../waku"; -import { WakuMessage } from "../waku_message"; +import { EncoderV0 } from "../waku_message/version_0"; const log = debug("waku:test:lightpush"); @@ -42,12 +42,11 @@ describe("Waku Light Push [node only]", () => { await waitForRemotePeer(waku, [Protocols.LightPush]); const messageText = "Light Push works!"; - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic - ); + const encoder = new EncoderV0(TestContentTopic); - const pushResponse = await waku.lightPush.push(message); + const pushResponse = await waku.lightPush.push(encoder, { + payload: utf8ToBytes(messageText), + }); expect(pushResponse?.isSuccess).to.be.true; let msgs: MessageRpcResponse[] = []; @@ -57,8 +56,7 @@ describe("Waku Light Push [node only]", () => { msgs = await nwaku.messages(); } - expect(msgs[0].contentTopic).to.equal(message.contentTopic); - expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].contentTopic).to.equal(TestContentTopic); expect(bytesToUtf8(new Uint8Array(msgs[0].payload))).to.equal(messageText); }); @@ -81,16 +79,17 @@ describe("Waku Light Push [node only]", () => { const nimPeerId = await nwaku.getPeerId(); const messageText = "Light Push works!"; - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic - ); + const encoder = new EncoderV0(TestContentTopic); log("Send message via lightpush"); - const pushResponse = await waku.lightPush.push(message, { - peerId: nimPeerId, - pubSubTopic: customPubSubTopic, - }); + const pushResponse = await waku.lightPush.push( + encoder, + { payload: utf8ToBytes(messageText) }, + { + peerId: nimPeerId, + pubSubTopic: customPubSubTopic, + } + ); log("Ack received", pushResponse); expect(pushResponse?.isSuccess).to.be.true; @@ -102,8 +101,7 @@ describe("Waku Light Push [node only]", () => { msgs = await nwaku.messages(customPubSubTopic); } - expect(msgs[0].contentTopic).to.equal(message.contentTopic); - expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].contentTopic).to.equal(TestContentTopic); expect(bytesToUtf8(new Uint8Array(msgs[0].payload))!).to.equal(messageText); }); }); diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 0c3db3bf20..93ae0a6157 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -9,13 +9,13 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; +import { Encoder, Message } from "../interfaces"; import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectPeerForProtocol, selectRandomPeer, } from "../select_peer"; -import { WakuMessage } from "../waku_message"; import { PushRPC } from "./push_rpc"; @@ -52,9 +52,12 @@ export class WakuLightPush { } async push( - message: WakuMessage, + encoder: Encoder, + message: Message, opts?: PushOptions - ): Promise { + ): Promise { + const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; + const res = await selectPeerForProtocol( this.libp2p.peerStore, [LightPushCodec], @@ -73,10 +76,12 @@ export class WakuLightPush { const stream = await connection.newStream(LightPushCodec); try { - const pubSubTopic = opts?.pubSubTopic - ? opts.pubSubTopic - : this.pubSubTopic; - const query = PushRPC.createRequest(message, pubSubTopic); + const protoMessage = await encoder.encodeProto(message); + if (!protoMessage) { + log("Failed to encode to protoMessage, aborting push"); + return; + } + const query = PushRPC.createRequest(protoMessage, pubSubTopic); const res = await pipe( [query.encode()], lp.encode(), @@ -94,7 +99,7 @@ export class WakuLightPush { if (!response) { log("No response in PushRPC"); - return null; + return; } return response; @@ -104,7 +109,7 @@ export class WakuLightPush { } catch (err) { log("Failed to send waku light push request", err); } - return null; + return; } /** diff --git a/src/lib/waku_light_push/push_rpc.ts b/src/lib/waku_light_push/push_rpc.ts index e610043e9c..31ce6c94fb 100644 --- a/src/lib/waku_light_push/push_rpc.ts +++ b/src/lib/waku_light_push/push_rpc.ts @@ -2,16 +2,18 @@ import type { Uint8ArrayList } from "uint8arraylist"; import { v4 as uuid } from "uuid"; import * as proto from "../../proto/light_push"; -import { WakuMessage } from "../waku_message"; export class PushRPC { public constructor(public proto: proto.PushRPC) {} - static createRequest(message: WakuMessage, pubSubTopic: string): PushRPC { + static createRequest( + message: proto.WakuMessage, + pubSubTopic: string + ): PushRPC { return new PushRPC({ requestId: uuid(), request: { - message: message.proto, + message: message, pubSubTopic: pubSubTopic, }, response: undefined, diff --git a/src/lib/waku_message/index.node.spec.ts b/src/lib/waku_message/index.node.spec.ts deleted file mode 100644 index 8bb35a3502..0000000000 --- a/src/lib/waku_message/index.node.spec.ts +++ /dev/null @@ -1,191 +0,0 @@ -import { expect } from "chai"; -import debug from "debug"; - -import { - makeLogFileName, - MessageRpcQuery, - MessageRpcResponseHex, - NOISE_KEY_1, - Nwaku, -} from "../../test_utils"; -import { delay } from "../../test_utils/delay"; -import { createPrivacyNode } from "../create_waku"; -import { - generatePrivateKey, - generateSymmetricKey, - getPublicKey, -} from "../crypto"; -import type { WakuPrivacy } from "../interfaces"; -import { bytesToHex, bytesToUtf8, hexToBytes, utf8ToBytes } from "../utils"; -import { waitForRemotePeer } from "../wait_for_remote_peer"; -import { Protocols } from "../waku"; - -import { DecryptionMethod, WakuMessage } from "./index"; - -const log = debug("waku:test:message"); - -const TestContentTopic = "/test/1/waku-message/utf8"; - -describe("Waku Message [node only]", function () { - describe("Interop: nwaku", function () { - let waku: WakuPrivacy; - let nwaku: Nwaku; - - beforeEach(async function () { - this.timeout(30_000); - waku = await createPrivacyNode({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - - nwaku = new Nwaku(makeLogFileName(this)); - log("Starting nwaku node"); - await nwaku.start({ rpcPrivate: true }); - - log("Dialing to nwaku node"); - await waku.dial(await nwaku.getMultiaddrWithId()); - log("Wait for remote peer"); - await waitForRemotePeer(waku, [Protocols.Relay]); - log("Remote peer ready"); - // As this test uses the nwaku RPC API, we somehow often face - // Race conditions where the nwaku node does not have the js-waku - // Node in its relay mesh just yet. - await delay(500); - }); - - afterEach(async function () { - !!nwaku && nwaku.stop(); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Decrypts nwaku message [asymmetric, no signature]", async function () { - this.timeout(5000); - - const messageText = "Here is an encrypted message."; - const message: MessageRpcQuery = { - contentTopic: TestContentTopic, - payload: bytesToHex(utf8ToBytes(messageText)), - }; - - const privateKey = generatePrivateKey(); - - waku.relay.addDecryptionKey(privateKey, { - method: DecryptionMethod.Asymmetric, - }); - - const receivedMsgPromise: Promise = new Promise( - (resolve) => { - waku.relay.addObserver(resolve); - } - ); - - const publicKey = getPublicKey(privateKey); - log("Post message"); - const res = await nwaku.postAsymmetricMessage(message, publicKey); - expect(res).to.be.true; - - const receivedMsg = await receivedMsgPromise; - - expect(receivedMsg.contentTopic).to.eq(message.contentTopic); - expect(receivedMsg.version).to.eq(1); - expect(receivedMsg.payloadAsUtf8).to.eq(messageText); - }); - - it("Encrypts message for nwaku [asymmetric, no signature]", async function () { - this.timeout(5000); - - log("Ask nwaku to generate asymmetric key pair"); - const keyPair = await nwaku.getAsymmetricKeyPair(); - const privateKey = hexToBytes(keyPair.privateKey); - const publicKey = hexToBytes(keyPair.publicKey); - - const messageText = "This is a message I am going to encrypt"; - log("Encrypt message"); - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic, - { - encPublicKey: publicKey, - } - ); - - log("Send message over relay"); - await waku.relay.send(message); - - let msgs: MessageRpcResponseHex[] = []; - - while (msgs.length === 0) { - log("Wait for message to be seen by nwaku"); - await delay(200); - msgs = await nwaku.getAsymmetricMessages(privateKey); - } - - log("Check message content"); - expect(msgs[0].contentTopic).to.equal(message.contentTopic); - expect(bytesToUtf8(hexToBytes(msgs[0].payload))).to.equal(messageText); - }); - - it("Decrypts nwaku message [symmetric, no signature]", async function () { - this.timeout(5000); - - const messageText = "Here is a message encrypted in a symmetric manner."; - const message: MessageRpcQuery = { - contentTopic: TestContentTopic, - payload: bytesToHex(utf8ToBytes(messageText)), - }; - - log("Generate symmetric key"); - const symKey = generateSymmetricKey(); - - waku.relay.addDecryptionKey(symKey, { - method: DecryptionMethod.Symmetric, - }); - - const receivedMsgPromise: Promise = new Promise( - (resolve) => { - waku.relay.addObserver(resolve); - } - ); - - log("Post message using nwaku"); - await nwaku.postSymmetricMessage(message, symKey); - log("Wait for message to be received by js-waku"); - const receivedMsg = await receivedMsgPromise; - log("Message received by js-waku"); - - expect(receivedMsg.contentTopic).to.eq(message.contentTopic); - expect(receivedMsg.version).to.eq(1); - expect(receivedMsg.payloadAsUtf8).to.eq(messageText); - }); - - it("Encrypts message for nwaku [symmetric, no signature]", async function () { - this.timeout(5000); - - log("Getting symmetric key from nwaku"); - const symKey = await nwaku.getSymmetricKey(); - log("Encrypting message with js-waku"); - const messageText = - "This is a message I am going to encrypt with a symmetric key"; - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic, - { - symKey: symKey, - } - ); - log("Sending message over relay"); - await waku.relay.send(message); - - let msgs: MessageRpcResponseHex[] = []; - - while (msgs.length === 0) { - await delay(200); - log("Getting messages from nwaku"); - msgs = await nwaku.getSymmetricMessages(symKey); - } - - expect(msgs[0].contentTopic).to.equal(message.contentTopic); - expect(bytesToUtf8(hexToBytes(msgs[0].payload))).to.equal(messageText); - }); - }); -}); diff --git a/src/lib/waku_message/index.spec.ts b/src/lib/waku_message/index.spec.ts deleted file mode 100644 index bd5297f560..0000000000 --- a/src/lib/waku_message/index.spec.ts +++ /dev/null @@ -1,138 +0,0 @@ -import { expect } from "chai"; -import fc from "fast-check"; - -import { getPublicKey } from "../crypto"; - -import { WakuMessage } from "./index"; - -const TestContentTopic = "/test/1/waku-message/utf8"; - -describe("Waku Message: Browser & Node", function () { - it("Waku message round trip binary serialization [clear]", async function () { - await fc.assert( - fc.asyncProperty(fc.string(), async (s) => { - const msg = await WakuMessage.fromUtf8String(s, TestContentTopic); - const binary = msg.encode(); - const actual = await WakuMessage.decode(binary); - - expect(actual).to.deep.equal(msg); - }) - ); - }); - - it("Payload to utf-8", async function () { - await fc.assert( - fc.asyncProperty(fc.string(), async (s) => { - const msg = await WakuMessage.fromUtf8String(s, TestContentTopic); - const utf8 = msg.payloadAsUtf8; - - return utf8 === s; - }) - ); - }); - - it("Waku message round trip binary encryption [asymmetric, no signature]", async function () { - await fc.assert( - fc.asyncProperty( - fc.uint8Array({ minLength: 1 }), - fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), - async (payload, key) => { - const publicKey = getPublicKey(key); - - const msg = await WakuMessage.fromBytes(payload, TestContentTopic, { - encPublicKey: publicKey, - }); - - const wireBytes = msg.encode(); - const actual = await WakuMessage.decode(wireBytes, [{ key }]); - - expect(actual?.payload).to.deep.equal(payload); - } - ) - ); - }); - - it("Waku message round trip binary encryption [asymmetric, signature]", async function () { - this.timeout(4000); - - await fc.assert( - fc.asyncProperty( - fc.uint8Array({ minLength: 1 }), - fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), - fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), - async (payload, sigPrivKey, encPrivKey) => { - const sigPubKey = getPublicKey(sigPrivKey); - const encPubKey = getPublicKey(encPrivKey); - - const msg = await WakuMessage.fromBytes(payload, TestContentTopic, { - encPublicKey: encPubKey, - sigPrivKey: sigPrivKey, - }); - - const wireBytes = msg.encode(); - const actual = await WakuMessage.decode(wireBytes, [ - { key: encPrivKey }, - ]); - - expect(actual?.payload).to.deep.equal(payload); - expect(actual?.signaturePublicKey).to.deep.equal(sigPubKey); - } - ) - ); - }); - - it("Waku message round trip binary encryption [symmetric, no signature]", async function () { - await fc.assert( - fc.asyncProperty( - fc.uint8Array({ minLength: 1 }), - fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), - async (payload, key) => { - const msg = await WakuMessage.fromBytes(payload, TestContentTopic, { - symKey: key, - }); - - const wireBytes = msg.encode(); - const actual = await WakuMessage.decode(wireBytes, [{ key }]); - - expect(actual?.payload).to.deep.equal(payload); - } - ) - ); - }); - - it("Waku message round trip binary encryption [symmetric, signature]", async function () { - await fc.assert( - fc.asyncProperty( - fc.uint8Array({ minLength: 1 }), - fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), - fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), - async (payload, sigPrivKey, symKey) => { - const sigPubKey = getPublicKey(sigPrivKey); - - const msg = await WakuMessage.fromBytes(payload, TestContentTopic, { - symKey: symKey, - sigPrivKey: sigPrivKey, - }); - - const wireBytes = msg.encode(); - const actual = await WakuMessage.decode(wireBytes, [{ key: symKey }]); - - expect(actual?.payload).to.deep.equal(payload); - expect(actual?.signaturePublicKey).to.deep.equal(sigPubKey); - } - ) - ); - }); - - it("Waku message round trip utf-8 including emojis", async function () { - const messageText = "😁🤣🥧🤦👩‍🎓"; - const wakuMessage = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic - ); - - const decodedText = wakuMessage.payloadAsUtf8; - - expect(decodedText).to.eq(messageText); - }); -}); diff --git a/src/lib/waku_message/index.ts b/src/lib/waku_message/index.ts deleted file mode 100644 index fa6634059e..0000000000 --- a/src/lib/waku_message/index.ts +++ /dev/null @@ -1,312 +0,0 @@ -import debug from "debug"; - -import * as proto from "../../proto/message"; -import { bytesToUtf8, utf8ToBytes } from "../utils"; - -import * as version_1 from "./version_1"; - -const DefaultVersion = 0; -const log = debug("waku:message"); -const OneMillion = BigInt(1_000_000); - -export enum DecryptionMethod { - Asymmetric = "asymmetric", - Symmetric = "symmetric", -} - -export interface Options { - /** - * Timestamp to set on the message, defaults to now if not passed. - */ - timestamp?: Date; - /** - * Public Key to use to encrypt the messages using ECIES (Asymmetric Encryption). - * - * @throws if both `encPublicKey` and `symKey` are passed - */ - encPublicKey?: Uint8Array | string; - /** - * Key to use to encrypt the messages using AES (Symmetric Encryption). - * - * @throws if both `encPublicKey` and `symKey` are passed - */ - symKey?: Uint8Array | string; - /** - * Private key to use to sign the message, either `encPublicKey` or `symKey` must be provided as only - * encrypted messages are signed. - */ - sigPrivKey?: Uint8Array; -} - -export interface DecryptionParams { - key: Uint8Array; - method?: DecryptionMethod; - contentTopics?: string[]; -} - -export class WakuMessage { - private constructor( - public proto: proto.WakuMessage, - private _signaturePublicKey?: Uint8Array, - private _signature?: Uint8Array - ) {} - - /** - * Create Message with an utf-8 string as payload. - */ - static async fromUtf8String( - utf8: string, - contentTopic: string, - opts?: Options - ): Promise { - const payload = utf8ToBytes(utf8); - return WakuMessage.fromBytes(payload, contentTopic, opts); - } - - /** - * Create a Waku Message with the given payload. - * - * By default, the payload is kept clear (version 0). - * If `opts.encPublicKey` is passed, the payload is encrypted using - * asymmetric encryption (version 1). - * - * If `opts.sigPrivKey` is passed and version 1 is used, the payload is signed - * before encryption. - * - * @throws if both `opts.encPublicKey` and `opt.symKey` are passed - */ - static async fromBytes( - payload: Uint8Array, - contentTopic: string, - opts?: Options - ): Promise { - const { timestamp, encPublicKey, symKey, sigPrivKey } = Object.assign( - { timestamp: new Date() }, - opts ? opts : {} - ); - - let _payload = payload; - let version = DefaultVersion; - let sig; - - if (encPublicKey && symKey) { - throw "Pass either `encPublicKey` or `symKey`, not both."; - } - - if (encPublicKey) { - const enc = await version_1.clearEncode(_payload, sigPrivKey); - _payload = await version_1.encryptAsymmetric(enc.payload, encPublicKey); - sig = enc.sig; - version = 1; - } else if (symKey) { - const enc = await version_1.clearEncode(_payload, sigPrivKey); - _payload = await version_1.encryptSymmetric(enc.payload, symKey); - sig = enc.sig; - version = 1; - } - - return new WakuMessage( - { - payload: _payload, - timestampDeprecated: timestamp.valueOf() / 1000, - // milliseconds 10^-3 to nanoseconds 10^-9 - timestamp: BigInt(timestamp.valueOf()) * OneMillion, - version, - contentTopic, - }, - sig?.publicKey, - sig?.signature - ); - } - - /** - * Decode a byte array into Waku Message. - * - * @params bytes The message encoded using protobuf as defined in [14/WAKU2-MESSAGE](https://rfc.vac.dev/spec/14/). - * @params decryptionKeys If the payload is encrypted (version = 1), then the - * keys are used to attempt decryption of the message. The passed key can either - * be asymmetric private keys or symmetric keys, both method are tried for each - * key until the message is decrypted or combinations are run out. - */ - static async decode( - bytes: Uint8Array, - decryptionParams?: DecryptionParams[] - ): Promise { - const protoBuf = proto.WakuMessage.decode(bytes); - - return WakuMessage.decodeProto(protoBuf, decryptionParams); - } - - /** - * Decode and decrypt Waku Message Protobuf Object into Waku Message. - * - * @params protoBuf The message to decode and decrypt. - * @params decryptionParams If the payload is encrypted (version = 1), then the - * keys are used to attempt decryption of the message. The passed key can either - * be asymmetric private keys or symmetric keys, both method are tried for each - * key until the message is decrypted or combinations are run out. - */ - static async decodeProto( - protoBuf: proto.WakuMessage, - decryptionParams?: DecryptionParams[] - ): Promise { - const payload = protoBuf.payload; - - let signaturePublicKey; - let signature; - if (protoBuf.version === 1 && payload) { - if (decryptionParams === undefined) { - log("Payload is encrypted but no private keys have been provided."); - return; - } - - // Returns a bunch of `undefined` and hopefully one decrypted result - const allResults = await Promise.all( - decryptionParams.map(async ({ key, method, contentTopics }) => { - if ( - !contentTopics || - (protoBuf.contentTopic && - contentTopics.includes(protoBuf.contentTopic)) - ) { - switch (method) { - case DecryptionMethod.Asymmetric: - try { - return await version_1.decryptAsymmetric(payload, key); - } catch (e) { - log( - "Failed to decrypt message using asymmetric encryption despite decryption method being specified", - e - ); - return; - } - case DecryptionMethod.Symmetric: - try { - return await version_1.decryptSymmetric(payload, key); - } catch (e) { - log( - "Failed to decrypt message using asymmetric encryption despite decryption method being specified", - e - ); - return; - } - default: - try { - return await version_1.decryptSymmetric(payload, key); - } catch (e) { - log( - "Failed to decrypt message using symmetric encryption", - e - ); - try { - return await version_1.decryptAsymmetric(payload, key); - } catch (e) { - log( - "Failed to decrypt message using asymmetric encryption", - e - ); - return; - } - } - } - } else { - // No key available for this content topic - return; - } - }) - ); - - const isDefined = (dec: Uint8Array | undefined): dec is Uint8Array => { - return !!dec; - }; - - const decodedResults = allResults.filter(isDefined); - - if (decodedResults.length === 0) { - log("Failed to decrypt payload."); - return; - } - const dec = decodedResults[0]; - - const res = await version_1.clearDecode(dec); - if (!res) { - log("Failed to decode payload."); - return; - } - Object.assign(protoBuf, { payload: res.payload }); - signaturePublicKey = res.sig?.publicKey; - signature = res.sig?.signature; - } - - return new WakuMessage(protoBuf, signaturePublicKey, signature); - } - - encode(): Uint8Array { - return proto.WakuMessage.encode(this.proto); - } - - get payloadAsUtf8(): string { - if (!this.payload) { - return ""; - } - - try { - return bytesToUtf8(this.payload); - } catch (e) { - log("Could not decode byte as UTF-8", e); - return ""; - } - } - - get payload(): Uint8Array | undefined { - if (this.proto.payload) { - return new Uint8Array(this.proto.payload); - } - return; - } - - get contentTopic(): string | undefined { - return this.proto.contentTopic; - } - - get version(): number { - // https://github.com/status-im/js-waku/issues/921 - return this.proto.version ?? 0; - } - - get timestamp(): Date | undefined { - // In the case we receive a value that is bigger than JS's max number, - // we catch the error and return undefined. - try { - if (this.proto.timestamp) { - // nanoseconds 10^-9 to milliseconds 10^-3 - const timestamp = this.proto.timestamp / OneMillion; - return new Date(Number(timestamp)); - } - - if (this.proto.timestampDeprecated) { - return new Date(this.proto.timestampDeprecated * 1000); - } - } catch (e) { - return; - } - return; - } - - /** - * The public key used to sign the message. - * - * MAY be present if the message is version 1. - */ - get signaturePublicKey(): Uint8Array | undefined { - return this._signaturePublicKey; - } - - /** - * The signature of the message. - * - * MAY be present if the message is version 1. - */ - get signature(): Uint8Array | undefined { - return this._signature; - } -} diff --git a/src/lib/waku_message/version_0.spec.ts b/src/lib/waku_message/version_0.spec.ts new file mode 100644 index 0000000000..1eaa79550f --- /dev/null +++ b/src/lib/waku_message/version_0.spec.ts @@ -0,0 +1,25 @@ +import { expect } from "chai"; +import fc from "fast-check"; + +import { DecoderV0, EncoderV0, MessageV0 } from "./version_0"; + +const TestContentTopic = "/test/1/waku-message/utf8"; + +describe("Waku Message version 0", function () { + it("Round trip binary serialization", async function () { + await fc.assert( + fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => { + const encoder = new EncoderV0(TestContentTopic); + const bytes = await encoder.encode({ payload }); + const decoder = new DecoderV0(TestContentTopic); + const protoResult = await decoder.decodeProto(bytes); + const result = (await decoder.decode(protoResult!)) as MessageV0; + + expect(result.contentTopic).to.eq(TestContentTopic); + expect(result.version).to.eq(0); + expect(result.payload).to.deep.eq(payload); + expect(result.timestamp).to.not.be.undefined; + }) + ); + }); +}); diff --git a/src/lib/waku_message/version_0.ts b/src/lib/waku_message/version_0.ts new file mode 100644 index 0000000000..8ba75a0713 --- /dev/null +++ b/src/lib/waku_message/version_0.ts @@ -0,0 +1,105 @@ +import debug from "debug"; + +import * as proto from "../../proto/message"; +import { Decoder, Message, ProtoMessage } from "../interfaces"; +import { Encoder } from "../interfaces"; + +const log = debug("waku:message:version-0"); + +const OneMillion = BigInt(1_000_000); +export const Version = 0; + +export class MessageV0 implements Message { + constructor(private proto: proto.WakuMessage) {} + + get _rawPayload(): Uint8Array | undefined { + if (this.proto.payload) { + return new Uint8Array(this.proto.payload); + } + return; + } + + get payload(): Uint8Array | undefined { + return this._rawPayload; + } + + get contentTopic(): string | undefined { + return this.proto.contentTopic; + } + + get _rawTimestamp(): bigint | undefined { + return this.proto.timestamp; + } + + get timestamp(): Date | undefined { + // In the case we receive a value that is bigger than JS's max number, + // we catch the error and return undefined. + try { + if (this.proto.timestamp) { + // nanoseconds 10^-9 to milliseconds 10^-3 + const timestamp = this.proto.timestamp / OneMillion; + return new Date(Number(timestamp)); + } + + if (this.proto.timestampDeprecated) { + return new Date(this.proto.timestampDeprecated * 1000); + } + } catch (e) { + return; + } + return; + } + + get version(): number { + // https://github.com/status-im/js-waku/issues/921 + return this.proto.version ?? 0; + } +} + +export class EncoderV0 implements Encoder { + constructor(public contentTopic: string) {} + + async encode(message: Message): Promise { + return proto.WakuMessage.encode(await this.encodeProto(message)); + } + + async encodeProto(message: Message): Promise { + const timestamp = message.timestamp ?? new Date(); + + return { + payload: message.payload, + version: Version, + contentTopic: message.contentTopic ?? this.contentTopic, + timestamp: BigInt(timestamp.valueOf()) * OneMillion, + }; + } +} + +export class DecoderV0 implements Decoder { + constructor(public contentTopic: string) {} + + decodeProto(bytes: Uint8Array): Promise { + const protoMessage = proto.WakuMessage.decode(bytes); + log("Message decoded", protoMessage); + return Promise.resolve(protoMessage); + } + + async decode(proto: ProtoMessage): Promise { + // https://github.com/status-im/js-waku/issues/921 + if (proto.version === undefined) { + proto.version = 0; + } + + if (proto.version !== Version) { + log( + "Failed to decode due to incorrect version, expected:", + Version, + ", actual:", + proto.version + ); + return Promise.resolve(undefined); + } + + return new MessageV0(proto); + } +} diff --git a/src/lib/waku_message/version_1.spec.ts b/src/lib/waku_message/version_1.spec.ts index ddf756a19c..00574d5ae7 100644 --- a/src/lib/waku_message/version_1.spec.ts +++ b/src/lib/waku_message/version_1.spec.ts @@ -2,46 +2,140 @@ import { expect } from "chai"; import fc from "fast-check"; import { getPublicKey } from "../crypto"; -import { bytesToHex } from "../utils"; import { - clearDecode, - clearEncode, + AsymDecoder, + AsymEncoder, decryptAsymmetric, decryptSymmetric, encryptAsymmetric, encryptSymmetric, + postCipher, + preCipher, + SymDecoder, + SymEncoder, } from "./version_1"; -describe("Waku Message Version 1", function () { - it("Sign & Recover", function () { - fc.assert( +const TestContentTopic = "/test/1/waku-message/utf8"; + +describe("Waku Message version 1", function () { + it("Round trip binary encryption [asymmetric, no signature]", async function () { + await fc.assert( fc.asyncProperty( - fc.uint8Array(), - fc.uint8Array({ minLength: 32, maxLength: 32 }), - async (message, privKey) => { - const enc = await clearEncode(message, privKey); - const res = clearDecode(enc.payload); + fc.uint8Array({ minLength: 1 }), + fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), + async (payload, privateKey) => { + const publicKey = getPublicKey(privateKey); - const pubKey = getPublicKey(privKey); + const encoder = new AsymEncoder(TestContentTopic, publicKey); + const bytes = await encoder.encode({ payload }); - expect(res?.payload).deep.equal( - message, - "Payload was not encrypted then decrypted correctly" - ); - expect(res?.sig?.publicKey).deep.equal( - pubKey, - "signature Public key was not recovered from encrypted then decrypted signature" - ); - expect(enc?.sig?.publicKey).deep.equal( - pubKey, - "Incorrect signature public key was returned when signing the payload" - ); + const decoder = new AsymDecoder(TestContentTopic, privateKey); + const protoResult = await decoder.decodeProto(bytes!); + if (!protoResult) throw "Failed to proto decode"; + const result = await decoder.decode(protoResult); + if (!result) throw "Failed to decode"; + + expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.version).to.equal(1); + expect(result?.payload).to.deep.equal(payload); + expect(result.signature).to.be.undefined; + expect(result.signaturePublicKey).to.be.undefined; } ) ); }); + it("R trip binary encryption [asymmetric, signature]", async function () { + this.timeout(4000); + + await fc.assert( + fc.asyncProperty( + fc.uint8Array({ minLength: 1 }), + fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), + fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), + async (payload, alicePrivateKey, bobPrivateKey) => { + const alicePublicKey = getPublicKey(alicePrivateKey); + const bobPublicKey = getPublicKey(bobPrivateKey); + + const encoder = new AsymEncoder( + TestContentTopic, + bobPublicKey, + alicePrivateKey + ); + const bytes = await encoder.encode({ payload }); + + const decoder = new AsymDecoder(TestContentTopic, bobPrivateKey); + const protoResult = await decoder.decodeProto(bytes!); + if (!protoResult) throw "Failed to proto decode"; + const result = await decoder.decode(protoResult); + if (!result) throw "Failed to decode"; + + expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.version).to.equal(1); + expect(result?.payload).to.deep.equal(payload); + expect(result.signature).to.not.be.undefined; + expect(result.signaturePublicKey).to.deep.eq(alicePublicKey); + } + ) + ); + }); + + it("Round trip binary encryption [symmetric, no signature]", async function () { + await fc.assert( + fc.asyncProperty( + fc.uint8Array({ minLength: 1 }), + fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), + async (payload, symKey) => { + const encoder = new SymEncoder(TestContentTopic, symKey); + const bytes = await encoder.encode({ payload }); + + const decoder = new SymDecoder(TestContentTopic, symKey); + const protoResult = await decoder.decodeProto(bytes!); + if (!protoResult) throw "Failed to proto decode"; + const result = await decoder.decode(protoResult); + if (!result) throw "Failed to decode"; + + expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.version).to.equal(1); + expect(result?.payload).to.deep.equal(payload); + expect(result.signature).to.be.undefined; + expect(result.signaturePublicKey).to.be.undefined; + } + ) + ); + }); + + it("Round trip binary encryption [symmetric, signature]", async function () { + await fc.assert( + fc.asyncProperty( + fc.uint8Array({ minLength: 1 }), + fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), + fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }), + async (payload, sigPrivKey, symKey) => { + const sigPubKey = getPublicKey(sigPrivKey); + + const encoder = new SymEncoder(TestContentTopic, symKey, sigPrivKey); + const bytes = await encoder.encode({ payload }); + + const decoder = new SymDecoder(TestContentTopic, symKey); + const protoResult = await decoder.decodeProto(bytes!); + if (!protoResult) throw "Failed to proto decode"; + const result = await decoder.decode(protoResult); + if (!result) throw "Failed to decode"; + + expect(result.contentTopic).to.equal(TestContentTopic); + expect(result.version).to.equal(1); + expect(result?.payload).to.deep.equal(payload); + expect(result.signature).to.not.be.undefined; + expect(result.signaturePublicKey).to.deep.eq(sigPubKey); + } + ) + ); + }); +}); + +describe("Encryption helpers", () => { it("Asymmetric encrypt & decrypt", async function () { await fc.assert( fc.asyncProperty( @@ -74,14 +168,41 @@ describe("Waku Message Version 1", function () { ); }); - it("Clear encode and decode", async function () { + it("pre and post cipher", async function () { await fc.assert( - fc.asyncProperty(fc.uint8Array(), async (payload) => { - const enc = await clearEncode(payload); - const dec = clearDecode(enc.payload); - if (!dec?.payload) throw "payload missing"; - expect(bytesToHex(dec?.payload)).to.eq(bytesToHex(payload)); + fc.asyncProperty(fc.uint8Array(), async (message) => { + const enc = await preCipher(message); + const res = postCipher(enc); + + expect(res?.payload).deep.equal( + message, + "Payload was not encrypted then decrypted correctly" + ); }) ); }); + + it("Sign & Recover", async function () { + await fc.assert( + fc.asyncProperty( + fc.uint8Array(), + fc.uint8Array({ minLength: 32, maxLength: 32 }), + async (message, sigPrivKey) => { + const sigPubKey = getPublicKey(sigPrivKey); + + const enc = await preCipher(message, sigPrivKey); + const res = postCipher(enc); + + expect(res?.payload).deep.equal( + message, + "Payload was not encrypted then decrypted correctly" + ); + expect(res?.sig?.publicKey).deep.equal( + sigPubKey, + "signature Public key was not recovered from encrypted then decrypted signature" + ); + } + ) + ); + }); }); diff --git a/src/lib/waku_message/version_1.ts b/src/lib/waku_message/version_1.ts index 56b39ad795..99b8e2e1cb 100644 --- a/src/lib/waku_message/version_1.ts +++ b/src/lib/waku_message/version_1.ts @@ -1,100 +1,225 @@ import * as secp from "@noble/secp256k1"; +import debug from "debug"; +import * as proto from "../../proto/message"; import { keccak256, randomBytes, sign } from "../crypto"; +import { Decoder, Encoder, Message, ProtoMessage } from "../interfaces"; import { concat, hexToBytes } from "../utils"; import { Symmetric } from "./constants"; import * as ecies from "./ecies"; import * as symmetric from "./symmetric"; +import { DecoderV0, MessageV0 } from "./version_0"; + +const log = debug("waku:message:version-1"); const FlagsLength = 1; const FlagMask = 3; // 0011 const IsSignedMask = 4; // 0100 const PaddingTarget = 256; const SignatureLength = 65; +const OneMillion = BigInt(1_000_000); + +export const Version = 1; export type Signature = { signature: Uint8Array; publicKey: Uint8Array | undefined; }; -/** - * Encode the payload pre-encryption. - * - * @internal - * @param messagePayload: The payload to include in the message - * @param sigPrivKey: If set, a signature using this private key is added. - * @returns The encoded payload, ready for encryption using {@link encryptAsymmetric} - * or {@link encryptSymmetric}. - */ -export async function clearEncode( - messagePayload: Uint8Array, - sigPrivKey?: Uint8Array -): Promise<{ payload: Uint8Array; sig?: Signature }> { - let envelope = new Uint8Array([0]); // No flags - envelope = addPayloadSizeField(envelope, messagePayload); - envelope = concat([envelope, messagePayload]); +export class MessageV1 extends MessageV0 implements Message { + private readonly _decodedPayload: Uint8Array; - // Calculate padding: - let rawSize = - FlagsLength + - computeSizeOfPayloadSizeField(messagePayload) + - messagePayload.length; - - if (sigPrivKey) { - rawSize += SignatureLength; + constructor( + proto: proto.WakuMessage, + decodedPayload: Uint8Array, + public signature?: Uint8Array, + public signaturePublicKey?: Uint8Array + ) { + super(proto); + this._decodedPayload = decodedPayload; } - const remainder = rawSize % PaddingTarget; - const paddingSize = PaddingTarget - remainder; - const pad = randomBytes(paddingSize); - - if (!validateDataIntegrity(pad, paddingSize)) { - throw new Error("failed to generate random padding of size " + paddingSize); + get payload(): Uint8Array { + return this._decodedPayload; } - - envelope = concat([envelope, pad]); - let sig; - if (sigPrivKey) { - envelope[0] |= IsSignedMask; - const hash = keccak256(envelope); - const bytesSignature = await sign(hash, sigPrivKey); - envelope = concat([envelope, bytesSignature]); - sig = { - signature: bytesSignature, - publicKey: secp.getPublicKey(sigPrivKey, false), - }; - } - - return { payload: envelope, sig }; } -/** - * Decode a decrypted payload. - * - * @internal - */ -export function clearDecode( - message: Uint8Array -): { payload: Uint8Array; sig?: Signature } | undefined { - const sizeOfPayloadSizeField = getSizeOfPayloadSizeField(message); - if (sizeOfPayloadSizeField === 0) return; +export class AsymEncoder implements Encoder { + constructor( + public contentTopic: string, + private publicKey: Uint8Array, + private sigPrivKey?: Uint8Array + ) {} - const payloadSize = getPayloadSize(message, sizeOfPayloadSizeField); - const payloadStart = 1 + sizeOfPayloadSizeField; - const payload = message.slice(payloadStart, payloadStart + payloadSize); + async encode(message: Message): Promise { + const protoMessage = await this.encodeProto(message); + if (!protoMessage) return; - const isSigned = isMessageSigned(message); - - let sig; - if (isSigned) { - const signature = getSignature(message); - const hash = getHash(message, isSigned); - const publicKey = ecRecoverPubKey(hash, signature); - sig = { signature, publicKey }; + return proto.WakuMessage.encode(protoMessage); } - return { payload, sig }; + async encodeProto(message: Message): Promise { + const timestamp = message.timestamp ?? new Date(); + if (!message.payload) { + log("No payload to encrypt, skipping: ", message); + return; + } + const preparedPayload = await preCipher(message.payload, this.sigPrivKey); + + const payload = await encryptAsymmetric(preparedPayload, this.publicKey); + + return { + payload, + version: Version, + contentTopic: this.contentTopic, + timestamp: BigInt(timestamp.valueOf()) * OneMillion, + }; + } +} + +export class SymEncoder implements Encoder { + constructor( + public contentTopic: string, + private symKey: Uint8Array, + private sigPrivKey?: Uint8Array + ) {} + + async encode(message: Message): Promise { + const protoMessage = await this.encodeProto(message); + if (!protoMessage) return; + + return proto.WakuMessage.encode(protoMessage); + } + + async encodeProto(message: Message): Promise { + const timestamp = message.timestamp ?? new Date(); + if (!message.payload) { + log("No payload to encrypt, skipping: ", message); + return; + } + const preparedPayload = await preCipher(message.payload, this.sigPrivKey); + + const payload = await encryptSymmetric(preparedPayload, this.symKey); + return { + payload, + version: Version, + contentTopic: this.contentTopic, + timestamp: BigInt(timestamp.valueOf()) * OneMillion, + }; + } +} + +export class AsymDecoder extends DecoderV0 implements Decoder { + constructor(contentTopic: string, private privateKey: Uint8Array) { + super(contentTopic); + } + + async decode(protoMessage: ProtoMessage): Promise { + const cipherPayload = protoMessage.payload; + + if (protoMessage.version !== Version) { + log( + "Failed to decrypt due to incorrect version, expected:", + Version, + ", actual:", + protoMessage.version + ); + return; + } + + let payload; + if (!cipherPayload) { + log(`No payload to decrypt for contentTopic ${this.contentTopic}`); + return; + } + + try { + payload = await decryptAsymmetric(cipherPayload, this.privateKey); + } catch (e) { + log( + `Failed to decrypt message using asymmetric decryption for contentTopic: ${this.contentTopic}`, + e + ); + return; + } + + if (!payload) { + log(`Failed to decrypt payload for contentTopic ${this.contentTopic}`); + return; + } + + const res = await postCipher(payload); + + if (!res) { + log(`Failed to decode payload for contentTopic ${this.contentTopic}`); + return; + } + + log("Message decrypted", protoMessage); + return new MessageV1( + protoMessage, + res.payload, + res.sig?.signature, + res.sig?.publicKey + ); + } +} + +export class SymDecoder extends DecoderV0 implements Decoder { + constructor(contentTopic: string, private symKey: Uint8Array) { + super(contentTopic); + } + + async decode(protoMessage: ProtoMessage): Promise { + const cipherPayload = protoMessage.payload; + + if (protoMessage.version !== Version) { + log( + "Failed to decrypt due to incorrect version, expected:", + Version, + ", actual:", + protoMessage.version + ); + return; + } + + let payload; + if (!cipherPayload) { + log(`No payload to decrypt for contentTopic ${this.contentTopic}`); + return; + } + + try { + payload = await decryptSymmetric(cipherPayload, this.symKey); + } catch (e) { + log( + `Failed to decrypt message using asymmetric decryption for contentTopic: ${this.contentTopic}`, + e + ); + return; + } + + if (!payload) { + log(`Failed to decrypt payload for contentTopic ${this.contentTopic}`); + return; + } + + const res = await postCipher(payload); + + if (!res) { + log(`Failed to decode payload for contentTopic ${this.contentTopic}`); + return; + } + + log("Message decrypted", protoMessage); + return new MessageV1( + protoMessage, + res.payload, + res.sig?.signature, + res.sig?.publicKey + ); + } } function getSizeOfPayloadSizeField(message: Uint8Array): number { @@ -246,12 +371,77 @@ function ecRecoverPubKey( const recovery = recoveryDataView.getUint8(0); const _signature = secp.Signature.fromCompact(signature.slice(0, 64)); - return secp.recoverPublicKey( - messageHash, - _signature, - recovery, - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: compressed: false - false - ); + return secp.recoverPublicKey(messageHash, _signature, recovery, false); +} + +/** + * Prepare the payload pre-encryption. + * + * @internal + * @returns The encoded payload, ready for encryption using {@link encryptAsymmetric} + * or {@link encryptSymmetric}. + */ +export async function preCipher( + messagePayload: Uint8Array, + sigPrivKey?: Uint8Array +): Promise { + let envelope = new Uint8Array([0]); // No flags + envelope = addPayloadSizeField(envelope, messagePayload); + envelope = concat([envelope, messagePayload]); + + // Calculate padding: + let rawSize = + FlagsLength + + computeSizeOfPayloadSizeField(messagePayload) + + messagePayload.length; + + if (sigPrivKey) { + rawSize += SignatureLength; + } + + const remainder = rawSize % PaddingTarget; + const paddingSize = PaddingTarget - remainder; + const pad = randomBytes(paddingSize); + + if (!validateDataIntegrity(pad, paddingSize)) { + throw new Error("failed to generate random padding of size " + paddingSize); + } + + envelope = concat([envelope, pad]); + if (sigPrivKey) { + envelope[0] |= IsSignedMask; + const hash = keccak256(envelope); + const bytesSignature = await sign(hash, sigPrivKey); + envelope = concat([envelope, bytesSignature]); + } + + return envelope; +} + +/** + * Decode a decrypted payload. + * + * @internal + */ +export function postCipher( + message: Uint8Array +): { payload: Uint8Array; sig?: Signature } | undefined { + const sizeOfPayloadSizeField = getSizeOfPayloadSizeField(message); + if (sizeOfPayloadSizeField === 0) return; + + const payloadSize = getPayloadSize(message, sizeOfPayloadSizeField); + const payloadStart = 1 + sizeOfPayloadSizeField; + const payload = message.slice(payloadStart, payloadStart + payloadSize); + + const isSigned = isMessageSigned(message); + + let sig; + if (isSigned) { + const signature = getSignature(message); + const hash = getHash(message, isSigned); + const publicKey = ecRecoverPubKey(hash, signature); + sig = { signature, publicKey }; + } + + return { payload, sig }; } diff --git a/src/lib/waku_relay/index.node.spec.ts b/src/lib/waku_relay/index.node.spec.ts index 05e6a62146..ea3045e58c 100644 --- a/src/lib/waku_relay/index.node.spec.ts +++ b/src/lib/waku_relay/index.node.spec.ts @@ -18,15 +18,23 @@ import { generateSymmetricKey, getPublicKey, } from "../crypto"; -import type { WakuPrivacy } from "../interfaces"; +import type { Message, WakuPrivacy } from "../interfaces"; import { bytesToUtf8, utf8ToBytes } from "../utils"; import { waitForRemotePeer } from "../wait_for_remote_peer"; import { Protocols } from "../waku"; -import { DecryptionMethod, WakuMessage } from "../waku_message"; +import { DecoderV0, EncoderV0, MessageV0 } from "../waku_message/version_0.js"; +import { + AsymDecoder, + AsymEncoder, + SymDecoder, + SymEncoder, +} from "../waku_message/version_1.js"; const log = debug("waku:test"); const TestContentTopic = "/test/1/waku-relay/utf8"; +const TestEncoder = new EncoderV0(TestContentTopic); +const TestDecoder = new DecoderV0(TestContentTopic); describe("Waku Relay [node only]", () => { // Node needed as we don't have a way to connect 2 js waku @@ -100,27 +108,21 @@ describe("Waku Relay [node only]", () => { const messageText = "JS to JS communication works"; const messageTimestamp = new Date("1995-12-17T03:24:00"); - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic, - { - timestamp: messageTimestamp, - } - ); + const message = { + payload: utf8ToBytes(messageText), + timestamp: messageTimestamp, + }; - const receivedMsgPromise: Promise = new Promise( - (resolve) => { - waku2.relay.addObserver(resolve); - } - ); + const receivedMsgPromise: Promise = new Promise((resolve) => { + waku2.relay.addObserver(TestDecoder, resolve); + }); - await waku1.relay.send(message); + await waku1.relay.send(TestEncoder, message); const receivedMsg = await receivedMsgPromise; - expect(receivedMsg.contentTopic).to.eq(message.contentTopic); - expect(receivedMsg.version).to.eq(message.version); - expect(receivedMsg.payloadAsUtf8).to.eq(messageText); + expect(receivedMsg.contentTopic).to.eq(TestContentTopic); + expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText); expect(receivedMsg.timestamp?.valueOf()).to.eq( messageTimestamp.valueOf() ); @@ -131,108 +133,83 @@ describe("Waku Relay [node only]", () => { const fooMessageText = "Published on content topic foo"; const barMessageText = "Published on content topic bar"; - const fooMessage = await WakuMessage.fromUtf8String( - fooMessageText, - "foo" - ); - const barMessage = await WakuMessage.fromUtf8String( - barMessageText, - "bar" - ); - const receivedBarMsgPromise: Promise = new Promise( - (resolve) => { - waku2.relay.addObserver(resolve, ["bar"]); - } - ); + const fooContentTopic = "foo"; + const barContentTopic = "bar"; - const allMessages: WakuMessage[] = []; - waku2.relay.addObserver((wakuMsg) => { - allMessages.push(wakuMsg); + const fooEncoder = new EncoderV0(fooContentTopic); + const barEncoder = new EncoderV0(barContentTopic); + + const fooDecoder = new DecoderV0(fooContentTopic); + const barDecoder = new DecoderV0(barContentTopic); + + const fooMessages: Message[] = []; + waku2.relay.addObserver(fooDecoder, (msg) => { + fooMessages.push(msg); }); - await waku1.relay.send(fooMessage); - await waku1.relay.send(barMessage); + const barMessages: Message[] = []; + waku2.relay.addObserver(barDecoder, (msg) => { + barMessages.push(msg); + }); - const receivedBarMsg = await receivedBarMsgPromise; + await waku1.relay.send(barEncoder, { + payload: utf8ToBytes(barMessageText), + }); + await waku1.relay.send(fooEncoder, { + payload: utf8ToBytes(fooMessageText), + }); - expect(receivedBarMsg.contentTopic).to.eq(barMessage.contentTopic); - expect(receivedBarMsg.version).to.eq(barMessage.version); - expect(receivedBarMsg.payloadAsUtf8).to.eq(barMessageText); - expect(allMessages.length).to.eq(2); - expect(allMessages[0].contentTopic).to.eq(fooMessage.contentTopic); - expect(allMessages[0].version).to.eq(fooMessage.version); - expect(allMessages[0].payloadAsUtf8).to.eq(fooMessageText); - expect(allMessages[1].contentTopic).to.eq(barMessage.contentTopic); - expect(allMessages[1].version).to.eq(barMessage.version); - expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText); + await delay(200); + + expect(fooMessages[0].contentTopic).to.eq(fooContentTopic); + expect(bytesToUtf8(fooMessages[0].payload!)).to.eq(fooMessageText); + + expect(barMessages[0].contentTopic).to.eq(barContentTopic); + expect(bytesToUtf8(barMessages[0].payload!)).to.eq(barMessageText); + + expect(fooMessages.length).to.eq(1); + expect(barMessages.length).to.eq(1); }); it("Decrypt messages", async function () { this.timeout(10000); - const encryptedAsymmetricMessageText = - "This message is encrypted using asymmetric"; - const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto"; - const encryptedSymmetricMessageText = - "This message is encrypted using symmetric encryption"; - const encryptedSymmetricContentTopic = "/test/1/symmetric/proto"; + const asymText = "This message is encrypted using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + const symText = "This message is encrypted using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; const privateKey = generatePrivateKey(); const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); - const [encryptedAsymmetricMessage, encryptedSymmetricMessage] = - await Promise.all([ - WakuMessage.fromUtf8String( - encryptedAsymmetricMessageText, - encryptedAsymmetricContentTopic, - { - encPublicKey: publicKey, - } - ), - WakuMessage.fromUtf8String( - encryptedSymmetricMessageText, - encryptedSymmetricContentTopic, - { - symKey: symKey, - } - ), - ]); + const asymEncoder = new AsymEncoder(asymTopic, publicKey); + const symEncoder = new SymEncoder(symTopic, symKey); - waku2.addDecryptionKey(privateKey, { - contentTopics: [encryptedAsymmetricContentTopic], - method: DecryptionMethod.Asymmetric, - }); - waku2.addDecryptionKey(symKey, { - contentTopics: [encryptedSymmetricContentTopic], - method: DecryptionMethod.Symmetric, - }); + const asymDecoder = new AsymDecoder(asymTopic, privateKey); + const symDecoder = new SymDecoder(symTopic, symKey); - const msgs: WakuMessage[] = []; - waku2.relay.addObserver((wakuMsg) => { + const msgs: Message[] = []; + waku2.relay.addObserver(asymDecoder, (wakuMsg) => { + msgs.push(wakuMsg); + }); + waku2.relay.addObserver(symDecoder, (wakuMsg) => { msgs.push(wakuMsg); }); - await waku1.relay.send(encryptedAsymmetricMessage); + await waku1.relay.send(asymEncoder, { payload: utf8ToBytes(asymText) }); await delay(200); - await waku1.relay.send(encryptedSymmetricMessage); + await waku1.relay.send(symEncoder, { payload: utf8ToBytes(symText) }); while (msgs.length < 2) { await delay(200); } - expect(msgs.length).to.eq(2); - expect(msgs[0].contentTopic).to.eq( - encryptedAsymmetricMessage.contentTopic - ); - expect(msgs[0].version).to.eq(encryptedAsymmetricMessage.version); - expect(msgs[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - expect(msgs[1].contentTopic).to.eq( - encryptedSymmetricMessage.contentTopic - ); - expect(msgs[1].version).to.eq(encryptedSymmetricMessage.version); - expect(msgs[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); + expect(msgs[0].contentTopic).to.eq(asymTopic); + expect(bytesToUtf8(msgs[0].payload!)).to.eq(asymText); + expect(msgs[1].contentTopic).to.eq(symTopic); + expect(bytesToUtf8(msgs[1].payload!)).to.eq(symText); }); it("Delete observer", async function () { @@ -240,22 +217,23 @@ describe("Waku Relay [node only]", () => { const messageText = "Published on content topic with added then deleted observer"; - const message = await WakuMessage.fromUtf8String( - messageText, - "added-then-deleted-observer" - ); + + const contentTopic = "added-then-deleted-observer"; // The promise **fails** if we receive a message on this observer. - const receivedMsgPromise: Promise = new Promise( + const receivedMsgPromise: Promise = new Promise( (resolve, reject) => { - const deleteObserver = waku2.relay.addObserver(reject, [ - "added-then-deleted-observer", - ]); + const deleteObserver = waku2.relay.addObserver( + new DecoderV0(contentTopic), + reject + ); deleteObserver(); setTimeout(resolve, 500); } ); - await waku1.relay.send(message); + await waku1.relay.send(new EncoderV0(contentTopic), { + payload: utf8ToBytes(messageText), + }); await receivedMsgPromise; // If it does not throw then we are good. @@ -312,32 +290,30 @@ describe("Waku Relay [node only]", () => { ]); const messageText = "Communicating using a custom pubsub topic"; - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic - ); - const waku2ReceivedMsgPromise: Promise = new Promise( + const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - waku2.relay.addObserver(resolve); + waku2.relay.addObserver(TestDecoder, resolve); } ); // The promise **fails** if we receive a message on the default // pubsub topic. - const waku3NoMsgPromise: Promise = new Promise( + const waku3NoMsgPromise: Promise = new Promise( (resolve, reject) => { - waku3.relay.addObserver(reject); + waku3.relay.addObserver(TestDecoder, reject); setTimeout(resolve, 1000); } ); - await waku1.relay.send(message); + await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes(messageText), + }); const waku2ReceivedMsg = await waku2ReceivedMsgPromise; await waku3NoMsgPromise; - expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText); + expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); }); }); @@ -382,12 +358,7 @@ describe("Waku Relay [node only]", () => { this.timeout(30000); const messageText = "This is a message"; - const message = await WakuMessage.fromUtf8String( - messageText, - TestContentTopic - ); - await delay(1000); - await waku.relay.send(message); + await waku.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) }); let msgs: MessageRpcResponse[] = []; @@ -397,8 +368,8 @@ describe("Waku Relay [node only]", () => { msgs = await nwaku.messages(); } - expect(msgs[0].contentTopic).to.equal(message.contentTopic); - expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].contentTopic).to.equal(TestContentTopic); + expect(msgs[0].version).to.equal(0); expect(bytesToUtf8(new Uint8Array(msgs[0].payload))).to.equal( messageText ); @@ -408,24 +379,25 @@ describe("Waku Relay [node only]", () => { await delay(200); const messageText = "Here is another message."; - const message = { - payload: utf8ToBytes(messageText), - contentTopic: TestContentTopic, - }; - const receivedMsgPromise: Promise = new Promise( - (resolve) => { - waku.relay.addObserver(resolve); - } + const receivedMsgPromise: Promise = new Promise((resolve) => { + waku.relay.addObserver(TestDecoder, (msg) => + resolve(msg as unknown as MessageV0) + ); + }); + + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + contentTopic: TestContentTopic, + payload: utf8ToBytes(messageText), + }) ); - await nwaku.sendMessage(Nwaku.toMessageRpcQuery(message)); - const receivedMsg = await receivedMsgPromise; - expect(receivedMsg.contentTopic).to.eq(message.contentTopic); + expect(receivedMsg.contentTopic).to.eq(TestContentTopic); expect(receivedMsg.version).to.eq(0); - expect(receivedMsg.payloadAsUtf8).to.eq(messageText); + expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText); }); describe.skip("Two nodes connected to nwaku", function () { @@ -475,22 +447,19 @@ describe("Waku Relay [node only]", () => { expect(waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.be.false; const msgStr = "Hello there!"; - const message = await WakuMessage.fromUtf8String( - msgStr, - TestContentTopic - ); + const message = { payload: utf8ToBytes(msgStr) }; - const waku2ReceivedMsgPromise: Promise = new Promise( + const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - waku2.relay.addObserver(resolve); + waku2.relay.addObserver(TestDecoder, resolve); } ); - await waku1.relay.send(message); + await waku1.relay.send(TestEncoder, message); console.log("Waiting for message"); const waku2ReceivedMsg = await waku2ReceivedMsgPromise; - expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr); + expect(waku2ReceivedMsg.payload).to.eq(msgStr); }); }); }); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 0480bc23a4..d132e7e17b 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -8,16 +8,20 @@ import { TopicStr, } from "@chainsafe/libp2p-gossipsub/dist/src/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; +import { PublishResult } from "@libp2p/interface-pubsub"; import debug from "debug"; import { DefaultPubSubTopic } from "../constants"; -import { hexToBytes } from "../utils"; -import { DecryptionMethod, WakuMessage } from "../waku_message"; +import { Decoder, Encoder, Message } from "../interfaces"; +import { pushOrInitMapSet } from "../push_or_init_map"; +import { DecoderV0 } from "../waku_message/version_0"; import * as constants from "./constants"; const log = debug("waku:relay"); +export type Callback = (msg: Message) => void; + export type CreateOptions = { /** * The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}. @@ -33,7 +37,6 @@ export type CreateOptions = { * @default {@link DefaultPubSubTopic} */ pubSubTopic?: string; - decryptionKeys?: Array; } & GossipsubOpts; /** @@ -46,18 +49,11 @@ export class WakuRelay extends GossipSub { pubSubTopic: string; public static multicodec: string = constants.RelayCodecs[0]; - public decryptionKeys: Map< - Uint8Array, - { method?: DecryptionMethod; contentTopics?: string[] } - >; - /** * observers called when receiving new message. * Observers under key `""` are always called. */ - public observers: { - [contentTopic: string]: Set<(message: WakuMessage) => void>; - }; + public observers: Map>; constructor(options?: Partial) { options = Object.assign(options ?? {}, { @@ -68,14 +64,9 @@ export class WakuRelay extends GossipSub { super(options); this.multicodecs = constants.RelayCodecs; - this.observers = {}; - this.decryptionKeys = new Map(); + this.observers = new Map(); this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; - - options?.decryptionKeys?.forEach((key) => { - this.addDecryptionKey(key); - }); } /** @@ -92,102 +83,39 @@ export class WakuRelay extends GossipSub { /** * Send Waku message. - * - * @param {WakuMessage} message - * @returns {Promise} */ - public async send(message: WakuMessage): Promise { - const msg = message.encode(); - await this.publish(this.pubSubTopic, msg); + public async send( + encoder: Encoder, + message: Message + ): Promise { + const msg = await encoder.encode(message); + if (!msg) { + log("Failed to encode message, aborting publish"); + return { recipients: [] }; + } + return this.publish(this.pubSubTopic, msg); } /** - * Register a decryption key to attempt decryption of received messages. - * This can either be a private key for asymmetric encryption or a symmetric - * key. `WakuRelay` will attempt to decrypt messages using both methods. + * Add an observer and associated Decoder to process incoming messages on a given content topic. * - * Strings must be in hex format. - */ - addDecryptionKey( - key: Uint8Array | string, - options?: { method?: DecryptionMethod; contentTopics?: string[] } - ): void { - this.decryptionKeys.set(hexToBytes(key), options ?? {}); - } - - /** - * Delete a decryption key that was used to attempt decryption of received - * messages. - * - * Strings must be in hex format. - */ - deleteDecryptionKey(key: Uint8Array | string): void { - this.decryptionKeys.delete(hexToBytes(key)); - } - - /** - * Register an observer of new messages received via waku relay - * - * @param callback called when a new message is received via waku relay - * @param contentTopics Content Topics for which the callback with be called, - * all of them if undefined, [] or ["",..] is passed. * @returns Function to delete the observer */ - addObserver( - callback: (message: WakuMessage) => void, - contentTopics: string[] = [] - ): () => void { - if (contentTopics.length === 0) { - if (!this.observers[""]) { - this.observers[""] = new Set(); - } - this.observers[""].add(callback); - } else { - contentTopics.forEach((contentTopic) => { - if (!this.observers[contentTopic]) { - this.observers[contentTopic] = new Set(); - } - this.observers[contentTopic].add(callback); - }); - } + addObserver(decoder: Decoder, callback: Callback): () => void { + const observer = { + decoder, + callback, + }; + pushOrInitMapSet(this.observers, decoder.contentTopic, observer); return () => { - if (contentTopics.length === 0) { - if (this.observers[""]) { - this.observers[""].delete(callback); - } - } else { - contentTopics.forEach((contentTopic) => { - if (this.observers[contentTopic]) { - this.observers[contentTopic].delete(callback); - } - }); + const observers = this.observers.get(decoder.contentTopic); + if (observers) { + observers.delete(observer); } }; } - /** - * Remove an observer of new messages received via waku relay. - * Useful to ensure the same observer is not registered several time - * (e.g when loading React components) - */ - deleteObserver( - callback: (message: WakuMessage) => void, - contentTopics: string[] = [] - ): void { - if (contentTopics.length === 0) { - if (this.observers[""]) { - this.observers[""].delete(callback); - } - } else { - contentTopics.forEach((contentTopic) => { - if (this.observers[contentTopic]) { - this.observers[contentTopic].delete(callback); - } - }); - } - } - /** * Subscribe to a pubsub topic and start emitting Waku messages to observers. * @@ -196,43 +124,37 @@ export class WakuRelay extends GossipSub { subscribe(pubSubTopic: string): void { this.addEventListener( "gossipsub:message", - (event: CustomEvent) => { - if (event.detail.msg.topic === pubSubTopic) { - const decryptionParams = Array.from(this.decryptionKeys).map( - ([key, { method, contentTopics }]) => { - return { - key, - method, - contentTopics, - }; - } - ); + async (event: CustomEvent) => { + if (event.detail.msg.topic !== pubSubTopic) return; + log(`Message received on ${pubSubTopic}`); - log(`Message received on ${pubSubTopic}`); - WakuMessage.decode(event.detail.msg.data, decryptionParams) - .then((wakuMsg) => { - if (!wakuMsg) { - log("Failed to decode Waku Message"); - return; - } - - if (this.observers[""]) { - this.observers[""].forEach((callbackFn) => { - callbackFn(wakuMsg); - }); - } - if (wakuMsg.contentTopic) { - if (this.observers[wakuMsg.contentTopic]) { - this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { - callbackFn(wakuMsg); - }); - } - } - }) - .catch((e) => { - log("Failed to decode Waku Message", e); - }); + const decoderV0 = new DecoderV0(""); + // TODO: User might want to decide what decoder should be used (e.g. for RLN) + const protoMsg = await decoderV0.decodeProto(event.detail.msg.data); + if (!protoMsg) { + return; } + const contentTopic = protoMsg.contentTopic; + + if (typeof contentTopic === "undefined") { + log("Message does not have a content topic, skipping"); + return; + } + + const observers = this.observers.get(contentTopic); + if (!observers) { + return; + } + await Promise.all( + Array.from(observers).map(async ({ decoder, callback }) => { + const msg = await decoder.decode(protoMsg); + if (msg) { + callback(msg); + } else { + log("Failed to decode messages on", contentTopic); + } + }) + ); } ); diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 07db951213..8156d4469c 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -13,17 +13,25 @@ import { generateSymmetricKey, getPublicKey, } from "../crypto"; -import type { WakuFull } from "../interfaces"; -import { utf8ToBytes } from "../utils"; +import type { Message, WakuFull } from "../interfaces"; +import { bytesToUtf8, utf8ToBytes } from "../utils"; import { waitForRemotePeer } from "../wait_for_remote_peer"; import { Protocols } from "../waku"; -import { DecryptionMethod, WakuMessage } from "../waku_message"; +import { DecoderV0, EncoderV0 } from "../waku_message/version_0.js"; +import { + AsymDecoder, + AsymEncoder, + SymDecoder, + SymEncoder, +} from "../waku_message/version_1.js"; import { PageDirection } from "./history_rpc"; const log = debug("waku:test:store"); const TestContentTopic = "/test/1/waku-store/utf8"; +const TestEncoder = new EncoderV0(TestContentTopic); +const TestDecoder = new DecoderV0(TestContentTopic); describe("Waku Store", () => { let waku: WakuFull; @@ -62,9 +70,9 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: WakuMessage[] = []; + const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([])) { + for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -78,7 +86,7 @@ describe("Waku Store", () => { expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; + return bytesToUtf8(msg.payload!) === "Message 0"; }); expect(result).to.not.eq(-1); }); @@ -93,9 +101,9 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: WakuMessage[] = []; + const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([])) { + for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -133,8 +141,8 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: WakuMessage[] = []; - await waku.store.queryCallbackOnPromise([], async (msgPromise) => { + const messages: Message[] = []; + await waku.store.queryCallbackOnPromise(TestDecoder, async (msgPromise) => { const msg = await msgPromise; if (msg) { messages.push(msg); @@ -143,7 +151,7 @@ describe("Waku Store", () => { expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; + return bytesToUtf8(msg.payload!) === "Message 0"; }); expect(result).to.not.eq(-1); }); @@ -172,9 +180,9 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const desiredMsgs = 14; - const messages: WakuMessage[] = []; + const messages: Message[] = []; await waku.store.queryCallbackOnPromise( - [], + TestDecoder, async (msgPromise) => { const msg = await msgPromise; if (msg) { @@ -210,9 +218,9 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: WakuMessage[] = []; + const messages: Message[] = []; await waku.store.queryOrderedCallback( - [], + TestDecoder, async (msg) => { messages.push(msg); }, @@ -225,7 +233,7 @@ describe("Waku Store", () => { for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === `Message ${index}`; + return bytesToUtf8(msg.payload!) === `Message ${index}`; }) ).to.eq(index); } @@ -253,9 +261,9 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; + let messages: Message[] = []; await waku.store.queryOrderedCallback( - [], + TestDecoder, async (msg) => { messages.push(msg); }, @@ -270,7 +278,7 @@ describe("Waku Store", () => { for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === `Message ${index}`; + return bytesToUtf8(msg.payload!) === `Message ${index}`; }) ).to.eq(index); } @@ -279,60 +287,45 @@ describe("Waku Store", () => { it("Generator, with asymmetric & symmetric encrypted messages", async function () { this.timeout(15_000); - const encryptedAsymmetricMessageText = - "This message is encrypted for me using asymmetric"; - const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto"; - const encryptedSymmetricMessageText = + const asymText = "This message is encrypted for me using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + const symText = "This message is encrypted for me using symmetric encryption"; - const encryptedSymmetricContentTopic = "/test/1/symmetric/proto"; - const clearMessageText = - "This is a clear text message for everyone to read"; - const otherEncMessageText = + const symTopic = "/test/1/symmetric/proto"; + const clearText = "This is a clear text message for everyone to read"; + const otherText = "This message is not for and I must not be able to read it"; + const timestamp = new Date(); + + const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; + const symMsg = { + payload: utf8ToBytes(symText), + timestamp: new Date(timestamp.valueOf() + 1), + }; + const clearMsg = { + payload: utf8ToBytes(clearText), + timestamp: new Date(timestamp.valueOf() + 2), + }; + const otherMsg = { + payload: utf8ToBytes(otherText), + timestamp: new Date(timestamp.valueOf() + 3), + }; + const privateKey = generatePrivateKey(); const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); - const timestamp = new Date(); - const [ - encryptedAsymmetricMessage, - encryptedSymmetricMessage, - clearMessage, - otherEncMessage, - ] = await Promise.all([ - WakuMessage.fromUtf8String( - encryptedAsymmetricMessageText, - encryptedAsymmetricContentTopic, - { - encPublicKey: publicKey, - timestamp, - } - ), - WakuMessage.fromUtf8String( - encryptedSymmetricMessageText, - encryptedSymmetricContentTopic, - { - symKey: symKey, - timestamp: new Date(timestamp.valueOf() + 1), - } - ), - WakuMessage.fromUtf8String( - clearMessageText, - encryptedAsymmetricContentTopic, - { timestamp: new Date(timestamp.valueOf() + 2) } - ), - WakuMessage.fromUtf8String( - otherEncMessageText, - encryptedSymmetricContentTopic, - { - encPublicKey: getPublicKey(generatePrivateKey()), - timestamp: new Date(timestamp.valueOf() + 3), - } - ), - ]); + const asymEncoder = new AsymEncoder(asymTopic, publicKey); + const symEncoder = new SymEncoder(symTopic, symKey); - log("Messages have been encrypted"); + const otherEncoder = new AsymEncoder( + TestContentTopic, + getPublicKey(generatePrivateKey()) + ); + + const asymDecoder = new AsymDecoder(asymTopic, privateKey); + const symDecoder = new SymDecoder(symTopic, symKey); const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ createFullNode({ @@ -357,25 +350,18 @@ describe("Waku Store", () => { log("Sending messages using light push"); await Promise.all([ - waku1.lightPush.push(encryptedAsymmetricMessage), - waku1.lightPush.push(encryptedSymmetricMessage), - waku1.lightPush.push(otherEncMessage), - waku1.lightPush.push(clearMessage), + waku1.lightPush.push(asymEncoder, asymMsg), + waku1.lightPush.push(symEncoder, symMsg), + waku1.lightPush.push(otherEncoder, otherMsg), + waku1.lightPush.push(TestEncoder, clearMsg), ]); await waitForRemotePeer(waku2, [Protocols.Store]); - waku2.addDecryptionKey(symKey, { - contentTopics: [encryptedSymmetricContentTopic], - method: DecryptionMethod.Symmetric, - }); - - const messages: WakuMessage[] = []; + const messages: Message[] = []; log("Retrieve messages from store"); - for await (const msgPromises of waku2.store.queryGenerator([], { - decryptionParams: [{ key: privateKey }], - })) { + for await (const msgPromises of waku2.store.queryGenerator(asymDecoder)) { for (const promise of msgPromises) { const msg = await promise; if (msg) { @@ -384,25 +370,30 @@ describe("Waku Store", () => { } } - expect(messages?.length).eq(3); - if (!messages) throw "Length was tested"; - // Messages are ordered from oldest to latest within a page (1 page query) - expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2].payloadAsUtf8).to.eq(clearMessageText); - - for (const text of [ - encryptedAsymmetricMessageText, - encryptedSymmetricMessageText, - clearMessageText, - ]) { - expect( - messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === text; - }) - ).to.not.eq(-1); + for await (const msgPromises of waku2.store.queryGenerator(symDecoder)) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } + } } + for await (const msgPromises of waku2.store.queryGenerator(TestDecoder)) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } + } + } + + // Messages are ordered from oldest to latest within a page (1 page query) + expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); + expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); + expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); + expect(messages?.length).eq(3); + !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); }); @@ -450,9 +441,9 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); - const firstMessages: WakuMessage[] = []; + const firstMessages: Message[] = []; await waku.store.queryOrderedCallback( - [], + TestDecoder, (msg) => { if (msg) { firstMessages.push(msg); @@ -464,9 +455,9 @@ describe("Waku Store", () => { } ); - const bothMessages: WakuMessage[] = []; + const bothMessages: Message[] = []; await waku.store.queryOrderedCallback( - [], + TestDecoder, async (msg) => { bothMessages.push(msg); }, @@ -481,7 +472,7 @@ describe("Waku Store", () => { expect(firstMessages?.length).eq(1); - expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0"); + expect(bytesToUtf8(firstMessages[0].payload!)).eq("Message 0"); expect(bothMessages?.length).eq(2); }); @@ -531,9 +522,9 @@ describe("Waku Store, custom pubsub topic", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: WakuMessage[] = []; + const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([])) { + for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -547,7 +538,7 @@ describe("Waku Store, custom pubsub topic", () => { expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; + return bytesToUtf8(msg.payload!) === "Message 0"; }); expect(result).to.not.eq(-1); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 0785234521..158601fdf3 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -11,14 +11,9 @@ import { Uint8ArrayList } from "uint8arraylist"; import * as protoV2Beta4 from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4"; import { DefaultPubSubTopic, StoreCodecs } from "../constants"; +import { Decoder, Message } from "../interfaces"; import { selectConnection } from "../select_connection"; import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer"; -import { hexToBytes } from "../utils"; -import { - DecryptionMethod, - DecryptionParams, - WakuMessage, -} from "../waku_message"; import { HistoryRPC, PageDirection, Params } from "./history_rpc"; @@ -78,13 +73,6 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; - /** - * Keys that will be used to decrypt messages. - * - * It can be Asymmetric Private Keys and Symmetric Keys in the same array, - * all keys will be tried with both methods. - */ - decryptionParams?: DecryptionParams[]; } /** @@ -94,15 +82,9 @@ export interface QueryOptions { */ export class WakuStore { pubSubTopic: string; - public decryptionKeys: Map< - Uint8Array, - { method?: DecryptionMethod; contentTopics?: string[] } - >; constructor(public libp2p: Libp2p, options?: CreateOptions) { this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; - - this.decryptionKeys = new Map(); } /** @@ -122,14 +104,12 @@ export class WakuStore { * or if an error is encountered when processing the reply. */ async queryOrderedCallback( - contentTopics: string[], - callback: ( - message: WakuMessage - ) => Promise | boolean | void, + decoder: Decoder, + callback: (message: Message) => Promise | boolean | void, options?: QueryOptions ): Promise { const abort = false; - for await (const promises of this.queryGenerator(contentTopics, options)) { + for await (const promises of this.queryGenerator(decoder, options)) { if (abort) break; let messages = await Promise.all(promises); @@ -172,15 +152,15 @@ export class WakuStore { * or if an error is encountered when processing the reply. */ async queryCallbackOnPromise( - contentTopics: string[], + decoder: Decoder, callback: ( - message: Promise + message: Promise ) => Promise | boolean | void, options?: QueryOptions ): Promise { let abort = false; let promises: Promise[] = []; - for await (const page of this.queryGenerator(contentTopics, options)) { + for await (const page of this.queryGenerator(decoder, options)) { const _promises = page.map(async (msg) => { if (!abort) { abort = Boolean(await callback(msg)); @@ -209,9 +189,9 @@ export class WakuStore { * or if an error is encountered when processing the reply. */ async *queryGenerator( - contentTopics: string[], + decoder: Decoder, options?: QueryOptions - ): AsyncGenerator[]> { + ): AsyncGenerator[]> { let startTime, endTime; if (options?.timeFilter) { @@ -219,6 +199,8 @@ export class WakuStore { endTime = options.timeFilter.endTime; } + const contentTopic = decoder.contentTopic; + const queryOpts = Object.assign( { pubSubTopic: this.pubSubTopic, @@ -226,7 +208,7 @@ export class WakuStore { pageSize: DefaultPageSize, }, options, - { contentTopics, startTime, endTime } + { contentTopics: [contentTopic], startTime, endTime } ); log("Querying history with the following options", { @@ -250,57 +232,16 @@ export class WakuStore { if (!connection) throw "Failed to get a connection to the peer"; - let decryptionParams: DecryptionParams[] = []; - - this.decryptionKeys.forEach(({ method, contentTopics }, key) => { - decryptionParams.push({ - key, - method, - contentTopics, - }); - }); - - // Add the decryption keys passed to this function against the - // content topics also passed to this function. - if (options?.decryptionParams) { - decryptionParams = decryptionParams.concat(options.decryptionParams); - } - for await (const messages of paginate( connection, protocol, queryOpts, - decryptionParams + decoder )) { yield messages; } } - /** - * Register a decryption key to attempt decryption of messages received in any - * subsequent query call. This can either be a private key for - * asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to - * decrypt messages using both methods. - * - * Strings must be in hex format. - */ - addDecryptionKey( - key: Uint8Array | string, - options?: { method?: DecryptionMethod; contentTopics?: string[] } - ): void { - this.decryptionKeys.set(hexToBytes(key), options ?? {}); - } - - /**cursorV2Beta4 - * Delete a decryption key that was used to attempt decryption of messages - * received in subsequent query calls. - * - * Strings must be in hex format. - */ - deleteDecryptionKey(key: Uint8Array | string): void { - this.decryptionKeys.delete(hexToBytes(key)); - } - /** * Returns known peers from the address book (`libp2p.peerStore`) that support * store protocol. Waku may or may not be currently connected to these peers. @@ -319,8 +260,8 @@ async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decryptionParams: DecryptionParams[] -): AsyncGenerator[]> { + decoder: Decoder +): AsyncGenerator[]> { let cursor = undefined; while (true) { queryOpts = Object.assign(queryOpts, { cursor }); @@ -373,9 +314,7 @@ async function* paginate( log(`${response.messages.length} messages retrieved from store`); - yield response.messages.map((protoMsg) => - WakuMessage.decodeProto(protoMsg, decryptionParams) - ); + yield response.messages.map((protoMsg) => decoder.decode(protoMsg)); cursor = response.pagingInfo?.cursor; if (typeof cursor === "undefined") { @@ -401,7 +340,7 @@ async function* paginate( } export const isWakuMessageDefined = ( - msg: WakuMessage | undefined -): msg is WakuMessage => { + msg: Message | undefined +): msg is Message => { return !!msg; }; diff --git a/typedoc.json b/typedoc.json index 948a5a0b7e..a8daf63237 100644 --- a/typedoc.json +++ b/typedoc.json @@ -6,7 +6,9 @@ "./src/lib/peer_discovery_dns.ts", "./src/lib/peer_discovery_static_list.ts", "./src/lib/predefined_bootstrap_nodes.ts", - "./src/lib/wait_for_remote_peer.ts" + "./src/lib/wait_for_remote_peer.ts", + "./src/lib/waku_message/version_0.ts", + "./src/lib/waku_message/version_1.ts" ], "out": "build/docs", "exclude": ["**/*.spec.ts"],