feat: make message encoding more generic

This commit is contained in:
fryorcraken.eth 2022-09-19 13:50:29 +10:00
parent 96e8d0ebd5
commit b2c7e4185f
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
29 changed files with 1085 additions and 1388 deletions

View File

@ -3,6 +3,8 @@
"$schema": "https://raw.githubusercontent.com/streetsidesoftware/cspell/master/cspell.schema.json",
"language": "en",
"words": [
"abortable",
"asym",
"backoff",
"backoffs",
"bitjson",

View File

@ -10,12 +10,14 @@ module.exports = [
import: {
"./bundle/lib/create_waku.js": "{ createLightNode }",
"./bundle/lib/wait_for_remote_peer.js": "{ waitForRemotePeer }",
"./bundle/lib/waku_message/version_0.js":
"{ MessageV0, DecoderV0, EncoderV0 }",
},
},
{
name: "Asymmetric, symmetric encryption and signature",
path: "bundle/index.js",
import: "{ WakuMessage }",
path: "bundle/lib/waku_message/version_1.js",
import: "{ MessageV1, AsymEncoder, AsymDecoder, SymEncoder, SymDecoder }",
},
{
name: "DNS discovery",

View File

@ -17,6 +17,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `queryCallbackOnPromise`'s return value has been simplified to `Promise<void>`.
- doc: clarified behaviour of `WakuStore` query functions.
- Waku message encoding and decoding is more generic, to enable upcoming feature such as [RLN](https://rfc.vac.dev/spec/17/) & [Noise](https://rfc.vac.dev/spec/43/);
it also enables separating the `version_1` module out to reduce bundle size and improve cross-platform compatibility when not used.
- Due to the change above, all APIs that handle messages have changed to receive a `Decoder` or `Encoder`.
### Deleted
- `WakuMessage` class in favour of the `Message`, `Encoder`, `Decoder` interfaces and `EncoderV0`, `AsymEncoder`, `SymEncoder` (and related decoders).
## [0.28.0] - 2022-09-16

View File

@ -31,6 +31,14 @@
"./lib/wait_for_remote_peer": {
"types": "./dist/lib/wait_for_remote_peer.d.ts",
"import": "./dist/lib/wait_for_remote_peer.js"
},
"./lib/waku_message/version_0": {
"types": "./dist/lib/waku_message/version_0.d.ts",
"import": "./dist/lib/waku_message/version_0.js"
},
"./lib/waku_message/version_1": {
"types": "./dist/lib/waku_message/version_1.d.ts",
"import": "./dist/lib/waku_message/version_1.js"
}
},
"typesVersions": {

View File

@ -10,6 +10,8 @@ export default {
"lib/peer_discovery_static_list": "dist/lib/peer_discovery_static_list.js",
"lib/predefined_bootstrap_nodes": "dist/lib/predefined_bootstrap_nodes.js",
"lib/wait_for_remote_peer": "dist/lib/wait_for_remote_peer.js",
"lib/waku_message/version_0": "dist/lib/waku_message/version_0.js",
"lib/waku_message/version_1": "dist/lib/waku_message/version_1.js",
},
output: {
dir: "bundle",

View File

@ -15,9 +15,6 @@ export * as proto_message from "./proto/message";
export * as waku from "./lib/waku";
export { WakuNode, Protocols } from "./lib/waku";
export * as waku_message from "./lib/waku_message";
export { WakuMessage } from "./lib/waku_message";
export * as waku_filter from "./lib/waku_filter";
export { WakuFilter } from "./lib/waku_filter";

14
src/lib/group_by.ts Normal file
View File

@ -0,0 +1,14 @@
export function groupByContentTopic<T extends { contentTopic: string }>(
values: T[]
): Map<string, Array<T>> {
const groupedDecoders = new Map();
values.forEach((value) => {
let decs = groupedDecoders.get(value.contentTopic);
if (!decs) {
groupedDecoders.set(value.contentTopic, []);
decs = groupedDecoders.get(value.contentTopic);
}
decs.push(value);
});
return groupedDecoders;
}

View File

@ -6,7 +6,6 @@ import type { Libp2p } from "libp2p";
import type { Protocols } from "./waku";
import type { WakuFilter } from "./waku_filter";
import type { WakuLightPush } from "./waku_light_push";
import type { DecryptionMethod } from "./waku_message";
import type { WakuRelay } from "./waku_relay";
import type { WakuStore } from "./waku_store";
@ -29,13 +28,6 @@ export interface Waku {
stop(): Promise<void>;
isStarted(): boolean;
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void;
deleteDecryptionKey(key: Uint8Array | string): void;
}
export interface WakuLight extends Waku {
@ -58,3 +50,28 @@ export interface WakuFull extends Waku {
filter: WakuFilter;
lightPush: WakuLightPush;
}
export interface ProtoMessage {
payload?: Uint8Array;
contentTopic?: string;
version?: number;
timestamp?: bigint;
}
export interface Message {
payload?: Uint8Array;
contentTopic?: string;
timestamp?: Date;
}
export interface Encoder {
contentTopic: string;
encode: (message: Message) => Promise<Uint8Array | undefined>;
encodeProto: (message: Message) => Promise<ProtoMessage | undefined>;
}
export interface Decoder {
contentTopic: string;
decodeProto: (bytes: Uint8Array) => Promise<ProtoMessage | undefined>;
decode: (proto: ProtoMessage) => Promise<Message | undefined>;
}

View File

@ -0,0 +1,27 @@
import { expect } from "chai";
import { pushOrInitMapSet } from "./push_or_init_map";
describe("pushOrInitMapSet", () => {
it("Init the array if not present", () => {
const map = new Map();
const key = "key";
const value = "value";
pushOrInitMapSet(map, key, value);
expect(map.get(key)).to.deep.eq(new Set([value]));
});
it("Push to array if already present", () => {
const map = new Map();
const key = "key";
const value1 = "value1";
const value2 = "value2";
pushOrInitMapSet(map, key, value1);
pushOrInitMapSet(map, key, value2);
expect(map.get(key)).to.deep.eq(new Set([value1, value2]));
});
});

View File

@ -0,0 +1,13 @@
export function pushOrInitMapSet<K, V>(
map: Map<K, Set<V>>,
key: K,
newValue: V
): void {
let arr = map.get(key);
if (typeof arr === "undefined") {
map.set(key, new Set());
arr = map.get(key) as Set<V>;
}
arr.add(newValue);
}

View File

@ -10,11 +10,12 @@ import {
import { createLightNode, createPrivacyNode } from "./create_waku";
import { generateSymmetricKey } from "./crypto";
import type { Waku, WakuLight, WakuPrivacy } from "./interfaces";
import type { Message, Waku, WakuLight, WakuPrivacy } from "./interfaces";
import { PeerDiscoveryStaticPeers } from "./peer_discovery_static_list";
import { bytesToUtf8, utf8ToBytes } from "./utils";
import { waitForRemotePeer } from "./wait_for_remote_peer";
import { Protocols } from "./waku";
import { WakuMessage } from "./waku_message";
import { SymDecoder, SymEncoder } from "./waku_message/version_1.js";
const TestContentTopic = "/test/1/waku/utf8";
@ -158,31 +159,26 @@ describe("Decryption Keys", () => {
this.timeout(10000);
const symKey = generateSymmetricKey();
const decoder = new SymDecoder(TestContentTopic, symKey);
waku2.addDecryptionKey(symKey);
const encoder = new SymEncoder(TestContentTopic, symKey);
const messageText = "Message is encrypted";
const messageTimestamp = new Date("1995-12-17T03:24:00");
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic,
{
timestamp: messageTimestamp,
symKey,
}
);
const message = {
payload: utf8ToBytes(messageText),
timestamp: messageTimestamp,
};
const receivedMsgPromise: Promise<WakuMessage> = new Promise((resolve) => {
waku2.relay.addObserver(resolve);
const receivedMsgPromise: Promise<Message> = new Promise((resolve) => {
waku2.relay.addObserver(decoder, resolve);
});
await waku1.relay.send(message);
await waku1.relay.send(encoder, message);
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
expect(receivedMsg.contentTopic).to.eq(TestContentTopic);
expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText);
expect(receivedMsg.timestamp?.valueOf()).to.eq(messageTimestamp.valueOf());
});
});

View File

@ -10,7 +10,7 @@ import type { Libp2p } from "libp2p";
import { Waku } from "./interfaces";
import { FilterCodec, WakuFilter } from "./waku_filter";
import { LightPushCodec, WakuLightPush } from "./waku_light_push";
import { DecryptionMethod, WakuMessage } from "./waku_message";
import { EncoderV0 } from "./waku_message/version_0";
import { WakuRelay } from "./waku_relay";
import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants";
import * as relayConstants from "./waku_relay/constants";
@ -43,7 +43,6 @@ export interface WakuOptions {
* @default {@link DefaultRelayKeepAliveValueSecs}
*/
relayKeepAlive?: number;
decryptionKeys?: Array<Uint8Array | string>;
}
export class WakuNode implements Waku {
@ -110,10 +109,6 @@ export class WakuNode implements Waku {
libp2p.connectionManager.addEventListener("peer:disconnect", (evt) => {
this.stopKeepAlive(evt.detail.remotePeer);
});
options?.decryptionKeys?.forEach((key) => {
this.addDecryptionKey(key);
});
}
/**
@ -183,34 +178,6 @@ export class WakuNode implements Waku {
return this.libp2p.isStarted();
}
/**
* Register a decryption key to attempt decryption of messages received via
* { @link WakuRelay } and { @link WakuStore }. This can either be a private key for
* asymmetric encryption or a symmetric key.
*
* Strings must be in hex format.
*/
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void {
if (this.relay) this.relay.addDecryptionKey(key, options);
if (this.store) this.store.addDecryptionKey(key, options);
if (this.filter) this.filter.addDecryptionKey(key, options);
}
/**
* Delete a decryption key that was used to attempt decryption of messages
* received via { @link WakuRelay } or { @link WakuStore }.
*
* Strings must be in hex format.
*/
deleteDecryptionKey(key: Uint8Array | string): void {
if (this.relay) this.relay.deleteDecryptionKey(key);
if (this.store) this.store.deleteDecryptionKey(key);
if (this.filter) this.filter.deleteDecryptionKey(key);
}
/**
* Return the local multiaddr with peer id on which libp2p is listening.
*
@ -246,11 +213,12 @@ export class WakuNode implements Waku {
const relay = this.relay;
if (relay && relayPeriodSecs !== 0) {
const encoder = new EncoderV0(RelayPingContentTopic);
this.relayKeepAliveTimers[peerIdStr] = setInterval(() => {
log("Sending Waku Relay ping message");
WakuMessage.fromBytes(new Uint8Array(), RelayPingContentTopic).then(
(wakuMsg) => relay.send(wakuMsg)
);
relay
.send(encoder, { payload: new Uint8Array() })
.catch((e) => log("Failed to send relay ping", e));
}, relayPeriodSecs * 1000);
}
}

View File

@ -4,10 +4,11 @@ import debug from "debug";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
import { delay } from "../../test_utils/delay";
import { createFullNode } from "../create_waku";
import type { WakuFull } from "../interfaces";
import type { Message, WakuFull } from "../interfaces";
import { bytesToUtf8, utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku";
import { WakuMessage } from "../waku_message";
import { DecoderV0, EncoderV0 } from "../waku_message/version_0";
const log = debug("waku:test");
@ -40,22 +41,25 @@ describe("Waku Filter", () => {
let messageCount = 0;
const messageText = "Filtering works!";
const callback = (msg: WakuMessage): void => {
const message = { payload: utf8ToBytes(messageText) };
const callback = (msg: Message): void => {
log("Got a message");
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
expect(msg.payloadAsUtf8).to.eq(messageText);
expect(bytesToUtf8(msg.payload!)).to.eq(messageText);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
const decoder = new DecoderV0(TestContentTopic);
await waku.filter.subscribe([decoder], callback);
// As the filter protocol does not cater for an ack of subscription
// we cannot know whether the subscription happened. Something we want to
// correct in future versions of the protocol.
await delay(200);
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
await waku.lightPush.push(message);
const encoder = new EncoderV0(TestContentTopic);
await waku.lightPush.push(encoder, message);
while (messageCount === 0) {
await delay(250);
}
@ -66,21 +70,21 @@ describe("Waku Filter", () => {
this.timeout(10000);
let messageCount = 0;
const callback = (msg: WakuMessage): void => {
const callback = (msg: Message): void => {
messageCount++;
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
const decoder = new DecoderV0(TestContentTopic);
await waku.filter.subscribe([decoder], callback);
await delay(200);
await waku.lightPush.push(
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
);
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"Filtering still works!",
TestContentTopic
)
);
const encoder = new EncoderV0(TestContentTopic);
await waku.lightPush.push(encoder, {
payload: utf8ToBytes("Filtering works!"),
});
await waku.lightPush.push(encoder, {
payload: utf8ToBytes("Filtering still works!"),
});
while (messageCount < 2) {
await delay(250);
}
@ -92,25 +96,21 @@ describe("Waku Filter", () => {
const callback = (): void => {
messageCount++;
};
const unsubscribe = await waku.filter.subscribe(callback, [
TestContentTopic,
]);
const decoder = new DecoderV0(TestContentTopic);
const unsubscribe = await waku.filter.subscribe([decoder], callback);
const encoder = new EncoderV0(TestContentTopic);
await delay(200);
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should be received",
TestContentTopic
)
);
await waku.lightPush.push(encoder, {
payload: utf8ToBytes("This should be received"),
});
await delay(100);
await unsubscribe();
await delay(200);
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should not be received",
TestContentTopic
)
);
await waku.lightPush.push(encoder, {
payload: utf8ToBytes("This should not be received"),
});
await delay(100);
expect(messageCount).to.eq(1);
});

View File

@ -10,14 +10,14 @@ import type { Libp2p } from "libp2p";
import { WakuMessage as WakuMessageProto } from "../../proto/message";
import { DefaultPubSubTopic } from "../constants";
import { groupByContentTopic } from "../group_by";
import { Decoder, Message } from "../interfaces";
import { selectConnection } from "../select_connection";
import {
getPeersForProtocol,
selectPeerForProtocol,
selectRandomPeer,
} from "../select_peer";
import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { ContentFilter, FilterRPC } from "./filter_rpc";
export { ContentFilter };
@ -49,7 +49,7 @@ export type FilterSubscriptionOpts = {
peerId?: PeerId;
};
export type FilterCallback = (msg: WakuMessage) => void | Promise<void>;
export type FilterCallback = (msg: Message) => void | Promise<void>;
export type UnsubscribeFunction = () => Promise<void>;
@ -63,14 +63,14 @@ export type UnsubscribeFunction = () => Promise<void>;
export class WakuFilter {
pubSubTopic: string;
private subscriptions: Map<string, FilterCallback>;
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
public decoders: Map<
string, // content topic
Set<Decoder>
>;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
this.subscriptions = new Map();
this.decryptionKeys = new Map();
this.decoders = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.libp2p
.handle(FilterCodec, this.onRequest.bind(this))
@ -78,17 +78,21 @@ export class WakuFilter {
}
/**
* @param contentTopics Array of ContentTopics to subscribe to. If empty, no messages will be returned from the filter.
* @param decoders Array of Decoders to use to decode messages, it also specifies the content topics.
* @param callback A function that will be called on each message returned by the filter.
* @param opts The FilterSubscriptionOpts used to narrow which messages are returned, and which peer to connect to.
* @returns Unsubscribe function that can be used to end the subscription.
*/
async subscribe(
decoders: Decoder[],
callback: FilterCallback,
contentTopics: string[],
opts?: FilterSubscriptionOpts
): Promise<UnsubscribeFunction> {
const topic = opts?.pubsubTopic ?? this.pubSubTopic;
const groupedDecoders = groupByContentTopic(decoders);
const contentTopics = Array.from(groupedDecoders.keys());
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
}));
@ -130,11 +134,13 @@ export class WakuFilter {
throw e;
}
this.addDecoders(groupedDecoders);
this.addCallback(requestId, callback);
return async () => {
await this.unsubscribe(topic, contentFilters, requestId, peer);
this.removeCallback(requestId);
this.deleteDecoders(groupedDecoders);
this.deleteCallback(requestId);
};
}
@ -171,23 +177,35 @@ export class WakuFilter {
return;
}
const decryptionParams = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
method,
contentTopics,
};
for (const protoMessage of messages) {
const contentTopic = protoMessage.contentTopic;
if (!contentTopic) {
log("Message has no content topic, skipping");
return;
}
);
for (const message of messages) {
const decoded = await WakuMessage.decodeProto(message, decryptionParams);
if (!decoded) {
log("Not able to decode message");
continue;
const decoders = this.decoders.get(contentTopic);
if (!decoders) {
log("No decoder for", contentTopic);
return;
}
callback(decoded);
let msg: Message | undefined;
// We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works
// noinspection ES6MissingAwait
decoders.forEach(async (dec) => {
if (msg) return;
const decoded = await dec.decode(protoMessage);
if (!decoded) {
log("Not able to decode message");
return;
}
// This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises
msg = decoded;
await callback(decoded);
});
}
}
@ -195,10 +213,32 @@ export class WakuFilter {
this.subscriptions.set(requestId, callback);
}
private removeCallback(requestId: string): void {
private deleteCallback(requestId: string): void {
this.subscriptions.delete(requestId);
}
private addDecoders(decoders: Map<string, Array<Decoder>>): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (!currDecs) {
this.decoders.set(contentTopic, new Set(decoders));
} else {
this.decoders.set(contentTopic, new Set([...currDecs, ...decoders]));
}
});
}
private deleteDecoders(decoders: Map<string, Array<Decoder>>): void {
decoders.forEach((decoders, contentTopic) => {
const currDecs = this.decoders.get(contentTopic);
if (currDecs) {
decoders.forEach((dec) => {
currDecs.delete(dec);
});
}
});
}
private async unsubscribe(
topic: string,
contentFilters: ContentFilter[],
@ -243,30 +283,6 @@ export class WakuFilter {
return res.peer;
}
/**
* Register a decryption key to attempt decryption of messages received in any
* subsequent { @link subscribe } call. This can either be a private key for
* asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to
* decrypt messages using both methods.
*
* Strings must be in hex format.
*/
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void {
this.decryptionKeys.set(hexToBytes(key), options ?? {});
}
/**
* Delete a decryption key so that it cannot be used in future { @link subscribe } calls
*
* Strings must be in hex format.
*/
deleteDecryptionKey(key: Uint8Array | string): void {
this.decryptionKeys.delete(hexToBytes(key));
}
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]);
}

View File

@ -10,10 +10,10 @@ import {
import { delay } from "../../test_utils/delay";
import { createFullNode } from "../create_waku";
import type { WakuFull } from "../interfaces";
import { bytesToUtf8 } from "../utils";
import { bytesToUtf8, utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku";
import { WakuMessage } from "../waku_message";
import { EncoderV0 } from "../waku_message/version_0";
const log = debug("waku:test:lightpush");
@ -42,12 +42,11 @@ describe("Waku Light Push [node only]", () => {
await waitForRemotePeer(waku, [Protocols.LightPush]);
const messageText = "Light Push works!";
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
const encoder = new EncoderV0(TestContentTopic);
const pushResponse = await waku.lightPush.push(message);
const pushResponse = await waku.lightPush.push(encoder, {
payload: utf8ToBytes(messageText),
});
expect(pushResponse?.isSuccess).to.be.true;
let msgs: MessageRpcResponse[] = [];
@ -57,8 +56,7 @@ describe("Waku Light Push [node only]", () => {
msgs = await nwaku.messages();
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
expect(bytesToUtf8(new Uint8Array(msgs[0].payload))).to.equal(messageText);
});
@ -81,16 +79,17 @@ describe("Waku Light Push [node only]", () => {
const nimPeerId = await nwaku.getPeerId();
const messageText = "Light Push works!";
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
const encoder = new EncoderV0(TestContentTopic);
log("Send message via lightpush");
const pushResponse = await waku.lightPush.push(message, {
peerId: nimPeerId,
pubSubTopic: customPubSubTopic,
});
const pushResponse = await waku.lightPush.push(
encoder,
{ payload: utf8ToBytes(messageText) },
{
peerId: nimPeerId,
pubSubTopic: customPubSubTopic,
}
);
log("Ack received", pushResponse);
expect(pushResponse?.isSuccess).to.be.true;
@ -102,8 +101,7 @@ describe("Waku Light Push [node only]", () => {
msgs = await nwaku.messages(customPubSubTopic);
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
expect(bytesToUtf8(new Uint8Array(msgs[0].payload))!).to.equal(messageText);
});
});

View File

@ -9,13 +9,13 @@ import { Uint8ArrayList } from "uint8arraylist";
import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants";
import { Encoder, Message } from "../interfaces";
import { selectConnection } from "../select_connection";
import {
getPeersForProtocol,
selectPeerForProtocol,
selectRandomPeer,
} from "../select_peer";
import { WakuMessage } from "../waku_message";
import { PushRPC } from "./push_rpc";
@ -52,9 +52,12 @@ export class WakuLightPush {
}
async push(
message: WakuMessage,
encoder: Encoder,
message: Message,
opts?: PushOptions
): Promise<PushResponse | null> {
): Promise<PushResponse | undefined> {
const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic;
const res = await selectPeerForProtocol(
this.libp2p.peerStore,
[LightPushCodec],
@ -73,10 +76,12 @@ export class WakuLightPush {
const stream = await connection.newStream(LightPushCodec);
try {
const pubSubTopic = opts?.pubSubTopic
? opts.pubSubTopic
: this.pubSubTopic;
const query = PushRPC.createRequest(message, pubSubTopic);
const protoMessage = await encoder.encodeProto(message);
if (!protoMessage) {
log("Failed to encode to protoMessage, aborting push");
return;
}
const query = PushRPC.createRequest(protoMessage, pubSubTopic);
const res = await pipe(
[query.encode()],
lp.encode(),
@ -94,7 +99,7 @@ export class WakuLightPush {
if (!response) {
log("No response in PushRPC");
return null;
return;
}
return response;
@ -104,7 +109,7 @@ export class WakuLightPush {
} catch (err) {
log("Failed to send waku light push request", err);
}
return null;
return;
}
/**

View File

@ -2,16 +2,18 @@ import type { Uint8ArrayList } from "uint8arraylist";
import { v4 as uuid } from "uuid";
import * as proto from "../../proto/light_push";
import { WakuMessage } from "../waku_message";
export class PushRPC {
public constructor(public proto: proto.PushRPC) {}
static createRequest(message: WakuMessage, pubSubTopic: string): PushRPC {
static createRequest(
message: proto.WakuMessage,
pubSubTopic: string
): PushRPC {
return new PushRPC({
requestId: uuid(),
request: {
message: message.proto,
message: message,
pubSubTopic: pubSubTopic,
},
response: undefined,

View File

@ -1,191 +0,0 @@
import { expect } from "chai";
import debug from "debug";
import {
makeLogFileName,
MessageRpcQuery,
MessageRpcResponseHex,
NOISE_KEY_1,
Nwaku,
} from "../../test_utils";
import { delay } from "../../test_utils/delay";
import { createPrivacyNode } from "../create_waku";
import {
generatePrivateKey,
generateSymmetricKey,
getPublicKey,
} from "../crypto";
import type { WakuPrivacy } from "../interfaces";
import { bytesToHex, bytesToUtf8, hexToBytes, utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku";
import { DecryptionMethod, WakuMessage } from "./index";
const log = debug("waku:test:message");
const TestContentTopic = "/test/1/waku-message/utf8";
describe("Waku Message [node only]", function () {
describe("Interop: nwaku", function () {
let waku: WakuPrivacy;
let nwaku: Nwaku;
beforeEach(async function () {
this.timeout(30_000);
waku = await createPrivacyNode({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
nwaku = new Nwaku(makeLogFileName(this));
log("Starting nwaku node");
await nwaku.start({ rpcPrivate: true });
log("Dialing to nwaku node");
await waku.dial(await nwaku.getMultiaddrWithId());
log("Wait for remote peer");
await waitForRemotePeer(waku, [Protocols.Relay]);
log("Remote peer ready");
// As this test uses the nwaku RPC API, we somehow often face
// Race conditions where the nwaku node does not have the js-waku
// Node in its relay mesh just yet.
await delay(500);
});
afterEach(async function () {
!!nwaku && nwaku.stop();
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Decrypts nwaku message [asymmetric, no signature]", async function () {
this.timeout(5000);
const messageText = "Here is an encrypted message.";
const message: MessageRpcQuery = {
contentTopic: TestContentTopic,
payload: bytesToHex(utf8ToBytes(messageText)),
};
const privateKey = generatePrivateKey();
waku.relay.addDecryptionKey(privateKey, {
method: DecryptionMethod.Asymmetric,
});
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
);
const publicKey = getPublicKey(privateKey);
log("Post message");
const res = await nwaku.postAsymmetricMessage(message, publicKey);
expect(res).to.be.true;
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(1);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
});
it("Encrypts message for nwaku [asymmetric, no signature]", async function () {
this.timeout(5000);
log("Ask nwaku to generate asymmetric key pair");
const keyPair = await nwaku.getAsymmetricKeyPair();
const privateKey = hexToBytes(keyPair.privateKey);
const publicKey = hexToBytes(keyPair.publicKey);
const messageText = "This is a message I am going to encrypt";
log("Encrypt message");
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic,
{
encPublicKey: publicKey,
}
);
log("Send message over relay");
await waku.relay.send(message);
let msgs: MessageRpcResponseHex[] = [];
while (msgs.length === 0) {
log("Wait for message to be seen by nwaku");
await delay(200);
msgs = await nwaku.getAsymmetricMessages(privateKey);
}
log("Check message content");
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(bytesToUtf8(hexToBytes(msgs[0].payload))).to.equal(messageText);
});
it("Decrypts nwaku message [symmetric, no signature]", async function () {
this.timeout(5000);
const messageText = "Here is a message encrypted in a symmetric manner.";
const message: MessageRpcQuery = {
contentTopic: TestContentTopic,
payload: bytesToHex(utf8ToBytes(messageText)),
};
log("Generate symmetric key");
const symKey = generateSymmetricKey();
waku.relay.addDecryptionKey(symKey, {
method: DecryptionMethod.Symmetric,
});
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
);
log("Post message using nwaku");
await nwaku.postSymmetricMessage(message, symKey);
log("Wait for message to be received by js-waku");
const receivedMsg = await receivedMsgPromise;
log("Message received by js-waku");
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(1);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
});
it("Encrypts message for nwaku [symmetric, no signature]", async function () {
this.timeout(5000);
log("Getting symmetric key from nwaku");
const symKey = await nwaku.getSymmetricKey();
log("Encrypting message with js-waku");
const messageText =
"This is a message I am going to encrypt with a symmetric key";
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic,
{
symKey: symKey,
}
);
log("Sending message over relay");
await waku.relay.send(message);
let msgs: MessageRpcResponseHex[] = [];
while (msgs.length === 0) {
await delay(200);
log("Getting messages from nwaku");
msgs = await nwaku.getSymmetricMessages(symKey);
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(bytesToUtf8(hexToBytes(msgs[0].payload))).to.equal(messageText);
});
});
});

View File

@ -1,138 +0,0 @@
import { expect } from "chai";
import fc from "fast-check";
import { getPublicKey } from "../crypto";
import { WakuMessage } from "./index";
const TestContentTopic = "/test/1/waku-message/utf8";
describe("Waku Message: Browser & Node", function () {
it("Waku message round trip binary serialization [clear]", async function () {
await fc.assert(
fc.asyncProperty(fc.string(), async (s) => {
const msg = await WakuMessage.fromUtf8String(s, TestContentTopic);
const binary = msg.encode();
const actual = await WakuMessage.decode(binary);
expect(actual).to.deep.equal(msg);
})
);
});
it("Payload to utf-8", async function () {
await fc.assert(
fc.asyncProperty(fc.string(), async (s) => {
const msg = await WakuMessage.fromUtf8String(s, TestContentTopic);
const utf8 = msg.payloadAsUtf8;
return utf8 === s;
})
);
});
it("Waku message round trip binary encryption [asymmetric, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, key) => {
const publicKey = getPublicKey(key);
const msg = await WakuMessage.fromBytes(payload, TestContentTopic, {
encPublicKey: publicKey,
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, [{ key }]);
expect(actual?.payload).to.deep.equal(payload);
}
)
);
});
it("Waku message round trip binary encryption [asymmetric, signature]", async function () {
this.timeout(4000);
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, sigPrivKey, encPrivKey) => {
const sigPubKey = getPublicKey(sigPrivKey);
const encPubKey = getPublicKey(encPrivKey);
const msg = await WakuMessage.fromBytes(payload, TestContentTopic, {
encPublicKey: encPubKey,
sigPrivKey: sigPrivKey,
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, [
{ key: encPrivKey },
]);
expect(actual?.payload).to.deep.equal(payload);
expect(actual?.signaturePublicKey).to.deep.equal(sigPubKey);
}
)
);
});
it("Waku message round trip binary encryption [symmetric, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, key) => {
const msg = await WakuMessage.fromBytes(payload, TestContentTopic, {
symKey: key,
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, [{ key }]);
expect(actual?.payload).to.deep.equal(payload);
}
)
);
});
it("Waku message round trip binary encryption [symmetric, signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, sigPrivKey, symKey) => {
const sigPubKey = getPublicKey(sigPrivKey);
const msg = await WakuMessage.fromBytes(payload, TestContentTopic, {
symKey: symKey,
sigPrivKey: sigPrivKey,
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, [{ key: symKey }]);
expect(actual?.payload).to.deep.equal(payload);
expect(actual?.signaturePublicKey).to.deep.equal(sigPubKey);
}
)
);
});
it("Waku message round trip utf-8 including emojis", async function () {
const messageText = "😁🤣🥧🤦👩‍🎓";
const wakuMessage = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
const decodedText = wakuMessage.payloadAsUtf8;
expect(decodedText).to.eq(messageText);
});
});

View File

@ -1,312 +0,0 @@
import debug from "debug";
import * as proto from "../../proto/message";
import { bytesToUtf8, utf8ToBytes } from "../utils";
import * as version_1 from "./version_1";
const DefaultVersion = 0;
const log = debug("waku:message");
const OneMillion = BigInt(1_000_000);
export enum DecryptionMethod {
Asymmetric = "asymmetric",
Symmetric = "symmetric",
}
export interface Options {
/**
* Timestamp to set on the message, defaults to now if not passed.
*/
timestamp?: Date;
/**
* Public Key to use to encrypt the messages using ECIES (Asymmetric Encryption).
*
* @throws if both `encPublicKey` and `symKey` are passed
*/
encPublicKey?: Uint8Array | string;
/**
* Key to use to encrypt the messages using AES (Symmetric Encryption).
*
* @throws if both `encPublicKey` and `symKey` are passed
*/
symKey?: Uint8Array | string;
/**
* Private key to use to sign the message, either `encPublicKey` or `symKey` must be provided as only
* encrypted messages are signed.
*/
sigPrivKey?: Uint8Array;
}
export interface DecryptionParams {
key: Uint8Array;
method?: DecryptionMethod;
contentTopics?: string[];
}
export class WakuMessage {
private constructor(
public proto: proto.WakuMessage,
private _signaturePublicKey?: Uint8Array,
private _signature?: Uint8Array
) {}
/**
* Create Message with an utf-8 string as payload.
*/
static async fromUtf8String(
utf8: string,
contentTopic: string,
opts?: Options
): Promise<WakuMessage> {
const payload = utf8ToBytes(utf8);
return WakuMessage.fromBytes(payload, contentTopic, opts);
}
/**
* Create a Waku Message with the given payload.
*
* By default, the payload is kept clear (version 0).
* If `opts.encPublicKey` is passed, the payload is encrypted using
* asymmetric encryption (version 1).
*
* If `opts.sigPrivKey` is passed and version 1 is used, the payload is signed
* before encryption.
*
* @throws if both `opts.encPublicKey` and `opt.symKey` are passed
*/
static async fromBytes(
payload: Uint8Array,
contentTopic: string,
opts?: Options
): Promise<WakuMessage> {
const { timestamp, encPublicKey, symKey, sigPrivKey } = Object.assign(
{ timestamp: new Date() },
opts ? opts : {}
);
let _payload = payload;
let version = DefaultVersion;
let sig;
if (encPublicKey && symKey) {
throw "Pass either `encPublicKey` or `symKey`, not both.";
}
if (encPublicKey) {
const enc = await version_1.clearEncode(_payload, sigPrivKey);
_payload = await version_1.encryptAsymmetric(enc.payload, encPublicKey);
sig = enc.sig;
version = 1;
} else if (symKey) {
const enc = await version_1.clearEncode(_payload, sigPrivKey);
_payload = await version_1.encryptSymmetric(enc.payload, symKey);
sig = enc.sig;
version = 1;
}
return new WakuMessage(
{
payload: _payload,
timestampDeprecated: timestamp.valueOf() / 1000,
// milliseconds 10^-3 to nanoseconds 10^-9
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
version,
contentTopic,
},
sig?.publicKey,
sig?.signature
);
}
/**
* Decode a byte array into Waku Message.
*
* @params bytes The message encoded using protobuf as defined in [14/WAKU2-MESSAGE](https://rfc.vac.dev/spec/14/).
* @params decryptionKeys If the payload is encrypted (version = 1), then the
* keys are used to attempt decryption of the message. The passed key can either
* be asymmetric private keys or symmetric keys, both method are tried for each
* key until the message is decrypted or combinations are run out.
*/
static async decode(
bytes: Uint8Array,
decryptionParams?: DecryptionParams[]
): Promise<WakuMessage | undefined> {
const protoBuf = proto.WakuMessage.decode(bytes);
return WakuMessage.decodeProto(protoBuf, decryptionParams);
}
/**
* Decode and decrypt Waku Message Protobuf Object into Waku Message.
*
* @params protoBuf The message to decode and decrypt.
* @params decryptionParams If the payload is encrypted (version = 1), then the
* keys are used to attempt decryption of the message. The passed key can either
* be asymmetric private keys or symmetric keys, both method are tried for each
* key until the message is decrypted or combinations are run out.
*/
static async decodeProto(
protoBuf: proto.WakuMessage,
decryptionParams?: DecryptionParams[]
): Promise<WakuMessage | undefined> {
const payload = protoBuf.payload;
let signaturePublicKey;
let signature;
if (protoBuf.version === 1 && payload) {
if (decryptionParams === undefined) {
log("Payload is encrypted but no private keys have been provided.");
return;
}
// Returns a bunch of `undefined` and hopefully one decrypted result
const allResults = await Promise.all(
decryptionParams.map(async ({ key, method, contentTopics }) => {
if (
!contentTopics ||
(protoBuf.contentTopic &&
contentTopics.includes(protoBuf.contentTopic))
) {
switch (method) {
case DecryptionMethod.Asymmetric:
try {
return await version_1.decryptAsymmetric(payload, key);
} catch (e) {
log(
"Failed to decrypt message using asymmetric encryption despite decryption method being specified",
e
);
return;
}
case DecryptionMethod.Symmetric:
try {
return await version_1.decryptSymmetric(payload, key);
} catch (e) {
log(
"Failed to decrypt message using asymmetric encryption despite decryption method being specified",
e
);
return;
}
default:
try {
return await version_1.decryptSymmetric(payload, key);
} catch (e) {
log(
"Failed to decrypt message using symmetric encryption",
e
);
try {
return await version_1.decryptAsymmetric(payload, key);
} catch (e) {
log(
"Failed to decrypt message using asymmetric encryption",
e
);
return;
}
}
}
} else {
// No key available for this content topic
return;
}
})
);
const isDefined = (dec: Uint8Array | undefined): dec is Uint8Array => {
return !!dec;
};
const decodedResults = allResults.filter(isDefined);
if (decodedResults.length === 0) {
log("Failed to decrypt payload.");
return;
}
const dec = decodedResults[0];
const res = await version_1.clearDecode(dec);
if (!res) {
log("Failed to decode payload.");
return;
}
Object.assign(protoBuf, { payload: res.payload });
signaturePublicKey = res.sig?.publicKey;
signature = res.sig?.signature;
}
return new WakuMessage(protoBuf, signaturePublicKey, signature);
}
encode(): Uint8Array {
return proto.WakuMessage.encode(this.proto);
}
get payloadAsUtf8(): string {
if (!this.payload) {
return "";
}
try {
return bytesToUtf8(this.payload);
} catch (e) {
log("Could not decode byte as UTF-8", e);
return "";
}
}
get payload(): Uint8Array | undefined {
if (this.proto.payload) {
return new Uint8Array(this.proto.payload);
}
return;
}
get contentTopic(): string | undefined {
return this.proto.contentTopic;
}
get version(): number {
// https://github.com/status-im/js-waku/issues/921
return this.proto.version ?? 0;
}
get timestamp(): Date | undefined {
// In the case we receive a value that is bigger than JS's max number,
// we catch the error and return undefined.
try {
if (this.proto.timestamp) {
// nanoseconds 10^-9 to milliseconds 10^-3
const timestamp = this.proto.timestamp / OneMillion;
return new Date(Number(timestamp));
}
if (this.proto.timestampDeprecated) {
return new Date(this.proto.timestampDeprecated * 1000);
}
} catch (e) {
return;
}
return;
}
/**
* The public key used to sign the message.
*
* MAY be present if the message is version 1.
*/
get signaturePublicKey(): Uint8Array | undefined {
return this._signaturePublicKey;
}
/**
* The signature of the message.
*
* MAY be present if the message is version 1.
*/
get signature(): Uint8Array | undefined {
return this._signature;
}
}

View File

@ -0,0 +1,25 @@
import { expect } from "chai";
import fc from "fast-check";
import { DecoderV0, EncoderV0, MessageV0 } from "./version_0";
const TestContentTopic = "/test/1/waku-message/utf8";
describe("Waku Message version 0", function () {
it("Round trip binary serialization", async function () {
await fc.assert(
fc.asyncProperty(fc.uint8Array({ minLength: 1 }), async (payload) => {
const encoder = new EncoderV0(TestContentTopic);
const bytes = await encoder.encode({ payload });
const decoder = new DecoderV0(TestContentTopic);
const protoResult = await decoder.decodeProto(bytes);
const result = (await decoder.decode(protoResult!)) as MessageV0;
expect(result.contentTopic).to.eq(TestContentTopic);
expect(result.version).to.eq(0);
expect(result.payload).to.deep.eq(payload);
expect(result.timestamp).to.not.be.undefined;
})
);
});
});

View File

@ -0,0 +1,105 @@
import debug from "debug";
import * as proto from "../../proto/message";
import { Decoder, Message, ProtoMessage } from "../interfaces";
import { Encoder } from "../interfaces";
const log = debug("waku:message:version-0");
const OneMillion = BigInt(1_000_000);
export const Version = 0;
export class MessageV0 implements Message {
constructor(private proto: proto.WakuMessage) {}
get _rawPayload(): Uint8Array | undefined {
if (this.proto.payload) {
return new Uint8Array(this.proto.payload);
}
return;
}
get payload(): Uint8Array | undefined {
return this._rawPayload;
}
get contentTopic(): string | undefined {
return this.proto.contentTopic;
}
get _rawTimestamp(): bigint | undefined {
return this.proto.timestamp;
}
get timestamp(): Date | undefined {
// In the case we receive a value that is bigger than JS's max number,
// we catch the error and return undefined.
try {
if (this.proto.timestamp) {
// nanoseconds 10^-9 to milliseconds 10^-3
const timestamp = this.proto.timestamp / OneMillion;
return new Date(Number(timestamp));
}
if (this.proto.timestampDeprecated) {
return new Date(this.proto.timestampDeprecated * 1000);
}
} catch (e) {
return;
}
return;
}
get version(): number {
// https://github.com/status-im/js-waku/issues/921
return this.proto.version ?? 0;
}
}
export class EncoderV0 implements Encoder {
constructor(public contentTopic: string) {}
async encode(message: Message): Promise<Uint8Array> {
return proto.WakuMessage.encode(await this.encodeProto(message));
}
async encodeProto(message: Message): Promise<ProtoMessage> {
const timestamp = message.timestamp ?? new Date();
return {
payload: message.payload,
version: Version,
contentTopic: message.contentTopic ?? this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
};
}
}
export class DecoderV0 implements Decoder {
constructor(public contentTopic: string) {}
decodeProto(bytes: Uint8Array): Promise<ProtoMessage | undefined> {
const protoMessage = proto.WakuMessage.decode(bytes);
log("Message decoded", protoMessage);
return Promise.resolve(protoMessage);
}
async decode(proto: ProtoMessage): Promise<Message | undefined> {
// https://github.com/status-im/js-waku/issues/921
if (proto.version === undefined) {
proto.version = 0;
}
if (proto.version !== Version) {
log(
"Failed to decode due to incorrect version, expected:",
Version,
", actual:",
proto.version
);
return Promise.resolve(undefined);
}
return new MessageV0(proto);
}
}

View File

@ -2,46 +2,140 @@ import { expect } from "chai";
import fc from "fast-check";
import { getPublicKey } from "../crypto";
import { bytesToHex } from "../utils";
import {
clearDecode,
clearEncode,
AsymDecoder,
AsymEncoder,
decryptAsymmetric,
decryptSymmetric,
encryptAsymmetric,
encryptSymmetric,
postCipher,
preCipher,
SymDecoder,
SymEncoder,
} from "./version_1";
describe("Waku Message Version 1", function () {
it("Sign & Recover", function () {
fc.assert(
const TestContentTopic = "/test/1/waku-message/utf8";
describe("Waku Message version 1", function () {
it("Round trip binary encryption [asymmetric, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array(),
fc.uint8Array({ minLength: 32, maxLength: 32 }),
async (message, privKey) => {
const enc = await clearEncode(message, privKey);
const res = clearDecode(enc.payload);
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, privateKey) => {
const publicKey = getPublicKey(privateKey);
const pubKey = getPublicKey(privKey);
const encoder = new AsymEncoder(TestContentTopic, publicKey);
const bytes = await encoder.encode({ payload });
expect(res?.payload).deep.equal(
message,
"Payload was not encrypted then decrypted correctly"
);
expect(res?.sig?.publicKey).deep.equal(
pubKey,
"signature Public key was not recovered from encrypted then decrypted signature"
);
expect(enc?.sig?.publicKey).deep.equal(
pubKey,
"Incorrect signature public key was returned when signing the payload"
);
const decoder = new AsymDecoder(TestContentTopic, privateKey);
const protoResult = await decoder.decodeProto(bytes!);
if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult);
if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic);
expect(result.version).to.equal(1);
expect(result?.payload).to.deep.equal(payload);
expect(result.signature).to.be.undefined;
expect(result.signaturePublicKey).to.be.undefined;
}
)
);
});
it("R trip binary encryption [asymmetric, signature]", async function () {
this.timeout(4000);
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, alicePrivateKey, bobPrivateKey) => {
const alicePublicKey = getPublicKey(alicePrivateKey);
const bobPublicKey = getPublicKey(bobPrivateKey);
const encoder = new AsymEncoder(
TestContentTopic,
bobPublicKey,
alicePrivateKey
);
const bytes = await encoder.encode({ payload });
const decoder = new AsymDecoder(TestContentTopic, bobPrivateKey);
const protoResult = await decoder.decodeProto(bytes!);
if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult);
if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic);
expect(result.version).to.equal(1);
expect(result?.payload).to.deep.equal(payload);
expect(result.signature).to.not.be.undefined;
expect(result.signaturePublicKey).to.deep.eq(alicePublicKey);
}
)
);
});
it("Round trip binary encryption [symmetric, no signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, symKey) => {
const encoder = new SymEncoder(TestContentTopic, symKey);
const bytes = await encoder.encode({ payload });
const decoder = new SymDecoder(TestContentTopic, symKey);
const protoResult = await decoder.decodeProto(bytes!);
if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult);
if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic);
expect(result.version).to.equal(1);
expect(result?.payload).to.deep.equal(payload);
expect(result.signature).to.be.undefined;
expect(result.signaturePublicKey).to.be.undefined;
}
)
);
});
it("Round trip binary encryption [symmetric, signature]", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
fc.uint8Array({ min: 1, minLength: 32, maxLength: 32 }),
async (payload, sigPrivKey, symKey) => {
const sigPubKey = getPublicKey(sigPrivKey);
const encoder = new SymEncoder(TestContentTopic, symKey, sigPrivKey);
const bytes = await encoder.encode({ payload });
const decoder = new SymDecoder(TestContentTopic, symKey);
const protoResult = await decoder.decodeProto(bytes!);
if (!protoResult) throw "Failed to proto decode";
const result = await decoder.decode(protoResult);
if (!result) throw "Failed to decode";
expect(result.contentTopic).to.equal(TestContentTopic);
expect(result.version).to.equal(1);
expect(result?.payload).to.deep.equal(payload);
expect(result.signature).to.not.be.undefined;
expect(result.signaturePublicKey).to.deep.eq(sigPubKey);
}
)
);
});
});
describe("Encryption helpers", () => {
it("Asymmetric encrypt & decrypt", async function () {
await fc.assert(
fc.asyncProperty(
@ -74,14 +168,41 @@ describe("Waku Message Version 1", function () {
);
});
it("Clear encode and decode", async function () {
it("pre and post cipher", async function () {
await fc.assert(
fc.asyncProperty(fc.uint8Array(), async (payload) => {
const enc = await clearEncode(payload);
const dec = clearDecode(enc.payload);
if (!dec?.payload) throw "payload missing";
expect(bytesToHex(dec?.payload)).to.eq(bytesToHex(payload));
fc.asyncProperty(fc.uint8Array(), async (message) => {
const enc = await preCipher(message);
const res = postCipher(enc);
expect(res?.payload).deep.equal(
message,
"Payload was not encrypted then decrypted correctly"
);
})
);
});
it("Sign & Recover", async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array(),
fc.uint8Array({ minLength: 32, maxLength: 32 }),
async (message, sigPrivKey) => {
const sigPubKey = getPublicKey(sigPrivKey);
const enc = await preCipher(message, sigPrivKey);
const res = postCipher(enc);
expect(res?.payload).deep.equal(
message,
"Payload was not encrypted then decrypted correctly"
);
expect(res?.sig?.publicKey).deep.equal(
sigPubKey,
"signature Public key was not recovered from encrypted then decrypted signature"
);
}
)
);
});
});

View File

@ -1,100 +1,225 @@
import * as secp from "@noble/secp256k1";
import debug from "debug";
import * as proto from "../../proto/message";
import { keccak256, randomBytes, sign } from "../crypto";
import { Decoder, Encoder, Message, ProtoMessage } from "../interfaces";
import { concat, hexToBytes } from "../utils";
import { Symmetric } from "./constants";
import * as ecies from "./ecies";
import * as symmetric from "./symmetric";
import { DecoderV0, MessageV0 } from "./version_0";
const log = debug("waku:message:version-1");
const FlagsLength = 1;
const FlagMask = 3; // 0011
const IsSignedMask = 4; // 0100
const PaddingTarget = 256;
const SignatureLength = 65;
const OneMillion = BigInt(1_000_000);
export const Version = 1;
export type Signature = {
signature: Uint8Array;
publicKey: Uint8Array | undefined;
};
/**
* Encode the payload pre-encryption.
*
* @internal
* @param messagePayload: The payload to include in the message
* @param sigPrivKey: If set, a signature using this private key is added.
* @returns The encoded payload, ready for encryption using {@link encryptAsymmetric}
* or {@link encryptSymmetric}.
*/
export async function clearEncode(
messagePayload: Uint8Array,
sigPrivKey?: Uint8Array
): Promise<{ payload: Uint8Array; sig?: Signature }> {
let envelope = new Uint8Array([0]); // No flags
envelope = addPayloadSizeField(envelope, messagePayload);
envelope = concat([envelope, messagePayload]);
export class MessageV1 extends MessageV0 implements Message {
private readonly _decodedPayload: Uint8Array;
// Calculate padding:
let rawSize =
FlagsLength +
computeSizeOfPayloadSizeField(messagePayload) +
messagePayload.length;
if (sigPrivKey) {
rawSize += SignatureLength;
constructor(
proto: proto.WakuMessage,
decodedPayload: Uint8Array,
public signature?: Uint8Array,
public signaturePublicKey?: Uint8Array
) {
super(proto);
this._decodedPayload = decodedPayload;
}
const remainder = rawSize % PaddingTarget;
const paddingSize = PaddingTarget - remainder;
const pad = randomBytes(paddingSize);
if (!validateDataIntegrity(pad, paddingSize)) {
throw new Error("failed to generate random padding of size " + paddingSize);
get payload(): Uint8Array {
return this._decodedPayload;
}
envelope = concat([envelope, pad]);
let sig;
if (sigPrivKey) {
envelope[0] |= IsSignedMask;
const hash = keccak256(envelope);
const bytesSignature = await sign(hash, sigPrivKey);
envelope = concat([envelope, bytesSignature]);
sig = {
signature: bytesSignature,
publicKey: secp.getPublicKey(sigPrivKey, false),
};
}
return { payload: envelope, sig };
}
/**
* Decode a decrypted payload.
*
* @internal
*/
export function clearDecode(
message: Uint8Array
): { payload: Uint8Array; sig?: Signature } | undefined {
const sizeOfPayloadSizeField = getSizeOfPayloadSizeField(message);
if (sizeOfPayloadSizeField === 0) return;
export class AsymEncoder implements Encoder {
constructor(
public contentTopic: string,
private publicKey: Uint8Array,
private sigPrivKey?: Uint8Array
) {}
const payloadSize = getPayloadSize(message, sizeOfPayloadSizeField);
const payloadStart = 1 + sizeOfPayloadSizeField;
const payload = message.slice(payloadStart, payloadStart + payloadSize);
async encode(message: Message): Promise<Uint8Array | undefined> {
const protoMessage = await this.encodeProto(message);
if (!protoMessage) return;
const isSigned = isMessageSigned(message);
let sig;
if (isSigned) {
const signature = getSignature(message);
const hash = getHash(message, isSigned);
const publicKey = ecRecoverPubKey(hash, signature);
sig = { signature, publicKey };
return proto.WakuMessage.encode(protoMessage);
}
return { payload, sig };
async encodeProto(message: Message): Promise<ProtoMessage | undefined> {
const timestamp = message.timestamp ?? new Date();
if (!message.payload) {
log("No payload to encrypt, skipping: ", message);
return;
}
const preparedPayload = await preCipher(message.payload, this.sigPrivKey);
const payload = await encryptAsymmetric(preparedPayload, this.publicKey);
return {
payload,
version: Version,
contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
};
}
}
export class SymEncoder implements Encoder {
constructor(
public contentTopic: string,
private symKey: Uint8Array,
private sigPrivKey?: Uint8Array
) {}
async encode(message: Message): Promise<Uint8Array | undefined> {
const protoMessage = await this.encodeProto(message);
if (!protoMessage) return;
return proto.WakuMessage.encode(protoMessage);
}
async encodeProto(message: Message): Promise<ProtoMessage | undefined> {
const timestamp = message.timestamp ?? new Date();
if (!message.payload) {
log("No payload to encrypt, skipping: ", message);
return;
}
const preparedPayload = await preCipher(message.payload, this.sigPrivKey);
const payload = await encryptSymmetric(preparedPayload, this.symKey);
return {
payload,
version: Version,
contentTopic: this.contentTopic,
timestamp: BigInt(timestamp.valueOf()) * OneMillion,
};
}
}
export class AsymDecoder extends DecoderV0 implements Decoder {
constructor(contentTopic: string, private privateKey: Uint8Array) {
super(contentTopic);
}
async decode(protoMessage: ProtoMessage): Promise<MessageV1 | undefined> {
const cipherPayload = protoMessage.payload;
if (protoMessage.version !== Version) {
log(
"Failed to decrypt due to incorrect version, expected:",
Version,
", actual:",
protoMessage.version
);
return;
}
let payload;
if (!cipherPayload) {
log(`No payload to decrypt for contentTopic ${this.contentTopic}`);
return;
}
try {
payload = await decryptAsymmetric(cipherPayload, this.privateKey);
} catch (e) {
log(
`Failed to decrypt message using asymmetric decryption for contentTopic: ${this.contentTopic}`,
e
);
return;
}
if (!payload) {
log(`Failed to decrypt payload for contentTopic ${this.contentTopic}`);
return;
}
const res = await postCipher(payload);
if (!res) {
log(`Failed to decode payload for contentTopic ${this.contentTopic}`);
return;
}
log("Message decrypted", protoMessage);
return new MessageV1(
protoMessage,
res.payload,
res.sig?.signature,
res.sig?.publicKey
);
}
}
export class SymDecoder extends DecoderV0 implements Decoder {
constructor(contentTopic: string, private symKey: Uint8Array) {
super(contentTopic);
}
async decode(protoMessage: ProtoMessage): Promise<MessageV1 | undefined> {
const cipherPayload = protoMessage.payload;
if (protoMessage.version !== Version) {
log(
"Failed to decrypt due to incorrect version, expected:",
Version,
", actual:",
protoMessage.version
);
return;
}
let payload;
if (!cipherPayload) {
log(`No payload to decrypt for contentTopic ${this.contentTopic}`);
return;
}
try {
payload = await decryptSymmetric(cipherPayload, this.symKey);
} catch (e) {
log(
`Failed to decrypt message using asymmetric decryption for contentTopic: ${this.contentTopic}`,
e
);
return;
}
if (!payload) {
log(`Failed to decrypt payload for contentTopic ${this.contentTopic}`);
return;
}
const res = await postCipher(payload);
if (!res) {
log(`Failed to decode payload for contentTopic ${this.contentTopic}`);
return;
}
log("Message decrypted", protoMessage);
return new MessageV1(
protoMessage,
res.payload,
res.sig?.signature,
res.sig?.publicKey
);
}
}
function getSizeOfPayloadSizeField(message: Uint8Array): number {
@ -246,12 +371,77 @@ function ecRecoverPubKey(
const recovery = recoveryDataView.getUint8(0);
const _signature = secp.Signature.fromCompact(signature.slice(0, 64));
return secp.recoverPublicKey(
messageHash,
_signature,
recovery,
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: compressed: false
false
);
return secp.recoverPublicKey(messageHash, _signature, recovery, false);
}
/**
* Prepare the payload pre-encryption.
*
* @internal
* @returns The encoded payload, ready for encryption using {@link encryptAsymmetric}
* or {@link encryptSymmetric}.
*/
export async function preCipher(
messagePayload: Uint8Array,
sigPrivKey?: Uint8Array
): Promise<Uint8Array> {
let envelope = new Uint8Array([0]); // No flags
envelope = addPayloadSizeField(envelope, messagePayload);
envelope = concat([envelope, messagePayload]);
// Calculate padding:
let rawSize =
FlagsLength +
computeSizeOfPayloadSizeField(messagePayload) +
messagePayload.length;
if (sigPrivKey) {
rawSize += SignatureLength;
}
const remainder = rawSize % PaddingTarget;
const paddingSize = PaddingTarget - remainder;
const pad = randomBytes(paddingSize);
if (!validateDataIntegrity(pad, paddingSize)) {
throw new Error("failed to generate random padding of size " + paddingSize);
}
envelope = concat([envelope, pad]);
if (sigPrivKey) {
envelope[0] |= IsSignedMask;
const hash = keccak256(envelope);
const bytesSignature = await sign(hash, sigPrivKey);
envelope = concat([envelope, bytesSignature]);
}
return envelope;
}
/**
* Decode a decrypted payload.
*
* @internal
*/
export function postCipher(
message: Uint8Array
): { payload: Uint8Array; sig?: Signature } | undefined {
const sizeOfPayloadSizeField = getSizeOfPayloadSizeField(message);
if (sizeOfPayloadSizeField === 0) return;
const payloadSize = getPayloadSize(message, sizeOfPayloadSizeField);
const payloadStart = 1 + sizeOfPayloadSizeField;
const payload = message.slice(payloadStart, payloadStart + payloadSize);
const isSigned = isMessageSigned(message);
let sig;
if (isSigned) {
const signature = getSignature(message);
const hash = getHash(message, isSigned);
const publicKey = ecRecoverPubKey(hash, signature);
sig = { signature, publicKey };
}
return { payload, sig };
}

View File

@ -18,15 +18,23 @@ import {
generateSymmetricKey,
getPublicKey,
} from "../crypto";
import type { WakuPrivacy } from "../interfaces";
import type { Message, WakuPrivacy } from "../interfaces";
import { bytesToUtf8, utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { DecoderV0, EncoderV0, MessageV0 } from "../waku_message/version_0.js";
import {
AsymDecoder,
AsymEncoder,
SymDecoder,
SymEncoder,
} from "../waku_message/version_1.js";
const log = debug("waku:test");
const TestContentTopic = "/test/1/waku-relay/utf8";
const TestEncoder = new EncoderV0(TestContentTopic);
const TestDecoder = new DecoderV0(TestContentTopic);
describe("Waku Relay [node only]", () => {
// Node needed as we don't have a way to connect 2 js waku
@ -100,27 +108,21 @@ describe("Waku Relay [node only]", () => {
const messageText = "JS to JS communication works";
const messageTimestamp = new Date("1995-12-17T03:24:00");
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic,
{
timestamp: messageTimestamp,
}
);
const message = {
payload: utf8ToBytes(messageText),
timestamp: messageTimestamp,
};
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve);
}
);
const receivedMsgPromise: Promise<Message> = new Promise((resolve) => {
waku2.relay.addObserver(TestDecoder, resolve);
});
await waku1.relay.send(message);
await waku1.relay.send(TestEncoder, message);
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
expect(receivedMsg.contentTopic).to.eq(TestContentTopic);
expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText);
expect(receivedMsg.timestamp?.valueOf()).to.eq(
messageTimestamp.valueOf()
);
@ -131,108 +133,83 @@ describe("Waku Relay [node only]", () => {
const fooMessageText = "Published on content topic foo";
const barMessageText = "Published on content topic bar";
const fooMessage = await WakuMessage.fromUtf8String(
fooMessageText,
"foo"
);
const barMessage = await WakuMessage.fromUtf8String(
barMessageText,
"bar"
);
const receivedBarMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve, ["bar"]);
}
);
const fooContentTopic = "foo";
const barContentTopic = "bar";
const allMessages: WakuMessage[] = [];
waku2.relay.addObserver((wakuMsg) => {
allMessages.push(wakuMsg);
const fooEncoder = new EncoderV0(fooContentTopic);
const barEncoder = new EncoderV0(barContentTopic);
const fooDecoder = new DecoderV0(fooContentTopic);
const barDecoder = new DecoderV0(barContentTopic);
const fooMessages: Message[] = [];
waku2.relay.addObserver(fooDecoder, (msg) => {
fooMessages.push(msg);
});
await waku1.relay.send(fooMessage);
await waku1.relay.send(barMessage);
const barMessages: Message[] = [];
waku2.relay.addObserver(barDecoder, (msg) => {
barMessages.push(msg);
});
const receivedBarMsg = await receivedBarMsgPromise;
await waku1.relay.send(barEncoder, {
payload: utf8ToBytes(barMessageText),
});
await waku1.relay.send(fooEncoder, {
payload: utf8ToBytes(fooMessageText),
});
expect(receivedBarMsg.contentTopic).to.eq(barMessage.contentTopic);
expect(receivedBarMsg.version).to.eq(barMessage.version);
expect(receivedBarMsg.payloadAsUtf8).to.eq(barMessageText);
expect(allMessages.length).to.eq(2);
expect(allMessages[0].contentTopic).to.eq(fooMessage.contentTopic);
expect(allMessages[0].version).to.eq(fooMessage.version);
expect(allMessages[0].payloadAsUtf8).to.eq(fooMessageText);
expect(allMessages[1].contentTopic).to.eq(barMessage.contentTopic);
expect(allMessages[1].version).to.eq(barMessage.version);
expect(allMessages[1].payloadAsUtf8).to.eq(barMessageText);
await delay(200);
expect(fooMessages[0].contentTopic).to.eq(fooContentTopic);
expect(bytesToUtf8(fooMessages[0].payload!)).to.eq(fooMessageText);
expect(barMessages[0].contentTopic).to.eq(barContentTopic);
expect(bytesToUtf8(barMessages[0].payload!)).to.eq(barMessageText);
expect(fooMessages.length).to.eq(1);
expect(barMessages.length).to.eq(1);
});
it("Decrypt messages", async function () {
this.timeout(10000);
const encryptedAsymmetricMessageText =
"This message is encrypted using asymmetric";
const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto";
const encryptedSymmetricMessageText =
"This message is encrypted using symmetric encryption";
const encryptedSymmetricContentTopic = "/test/1/symmetric/proto";
const asymText = "This message is encrypted using asymmetric";
const asymTopic = "/test/1/asymmetric/proto";
const symText = "This message is encrypted using symmetric encryption";
const symTopic = "/test/1/symmetric/proto";
const privateKey = generatePrivateKey();
const symKey = generateSymmetricKey();
const publicKey = getPublicKey(privateKey);
const [encryptedAsymmetricMessage, encryptedSymmetricMessage] =
await Promise.all([
WakuMessage.fromUtf8String(
encryptedAsymmetricMessageText,
encryptedAsymmetricContentTopic,
{
encPublicKey: publicKey,
}
),
WakuMessage.fromUtf8String(
encryptedSymmetricMessageText,
encryptedSymmetricContentTopic,
{
symKey: symKey,
}
),
]);
const asymEncoder = new AsymEncoder(asymTopic, publicKey);
const symEncoder = new SymEncoder(symTopic, symKey);
waku2.addDecryptionKey(privateKey, {
contentTopics: [encryptedAsymmetricContentTopic],
method: DecryptionMethod.Asymmetric,
});
waku2.addDecryptionKey(symKey, {
contentTopics: [encryptedSymmetricContentTopic],
method: DecryptionMethod.Symmetric,
});
const asymDecoder = new AsymDecoder(asymTopic, privateKey);
const symDecoder = new SymDecoder(symTopic, symKey);
const msgs: WakuMessage[] = [];
waku2.relay.addObserver((wakuMsg) => {
const msgs: Message[] = [];
waku2.relay.addObserver(asymDecoder, (wakuMsg) => {
msgs.push(wakuMsg);
});
waku2.relay.addObserver(symDecoder, (wakuMsg) => {
msgs.push(wakuMsg);
});
await waku1.relay.send(encryptedAsymmetricMessage);
await waku1.relay.send(asymEncoder, { payload: utf8ToBytes(asymText) });
await delay(200);
await waku1.relay.send(encryptedSymmetricMessage);
await waku1.relay.send(symEncoder, { payload: utf8ToBytes(symText) });
while (msgs.length < 2) {
await delay(200);
}
expect(msgs.length).to.eq(2);
expect(msgs[0].contentTopic).to.eq(
encryptedAsymmetricMessage.contentTopic
);
expect(msgs[0].version).to.eq(encryptedAsymmetricMessage.version);
expect(msgs[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(msgs[1].contentTopic).to.eq(
encryptedSymmetricMessage.contentTopic
);
expect(msgs[1].version).to.eq(encryptedSymmetricMessage.version);
expect(msgs[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(msgs[0].contentTopic).to.eq(asymTopic);
expect(bytesToUtf8(msgs[0].payload!)).to.eq(asymText);
expect(msgs[1].contentTopic).to.eq(symTopic);
expect(bytesToUtf8(msgs[1].payload!)).to.eq(symText);
});
it("Delete observer", async function () {
@ -240,22 +217,23 @@ describe("Waku Relay [node only]", () => {
const messageText =
"Published on content topic with added then deleted observer";
const message = await WakuMessage.fromUtf8String(
messageText,
"added-then-deleted-observer"
);
const contentTopic = "added-then-deleted-observer";
// The promise **fails** if we receive a message on this observer.
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
const receivedMsgPromise: Promise<Message> = new Promise(
(resolve, reject) => {
const deleteObserver = waku2.relay.addObserver(reject, [
"added-then-deleted-observer",
]);
const deleteObserver = waku2.relay.addObserver(
new DecoderV0(contentTopic),
reject
);
deleteObserver();
setTimeout(resolve, 500);
}
);
await waku1.relay.send(message);
await waku1.relay.send(new EncoderV0(contentTopic), {
payload: utf8ToBytes(messageText),
});
await receivedMsgPromise;
// If it does not throw then we are good.
@ -312,32 +290,30 @@ describe("Waku Relay [node only]", () => {
]);
const messageText = "Communicating using a custom pubsub topic";
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
const waku2ReceivedMsgPromise: Promise<Message> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve);
waku2.relay.addObserver(TestDecoder, resolve);
}
);
// The promise **fails** if we receive a message on the default
// pubsub topic.
const waku3NoMsgPromise: Promise<WakuMessage> = new Promise(
const waku3NoMsgPromise: Promise<Message> = new Promise(
(resolve, reject) => {
waku3.relay.addObserver(reject);
waku3.relay.addObserver(TestDecoder, reject);
setTimeout(resolve, 1000);
}
);
await waku1.relay.send(message);
await waku1.relay.send(TestEncoder, {
payload: utf8ToBytes(messageText),
});
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
await waku3NoMsgPromise;
expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText);
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
});
});
@ -382,12 +358,7 @@ describe("Waku Relay [node only]", () => {
this.timeout(30000);
const messageText = "This is a message";
const message = await WakuMessage.fromUtf8String(
messageText,
TestContentTopic
);
await delay(1000);
await waku.relay.send(message);
await waku.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) });
let msgs: MessageRpcResponse[] = [];
@ -397,8 +368,8 @@ describe("Waku Relay [node only]", () => {
msgs = await nwaku.messages();
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
expect(msgs[0].version).to.equal(0);
expect(bytesToUtf8(new Uint8Array(msgs[0].payload))).to.equal(
messageText
);
@ -408,24 +379,25 @@ describe("Waku Relay [node only]", () => {
await delay(200);
const messageText = "Here is another message.";
const message = {
payload: utf8ToBytes(messageText),
contentTopic: TestContentTopic,
};
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku.relay.addObserver(resolve);
}
const receivedMsgPromise: Promise<MessageV0> = new Promise((resolve) => {
waku.relay.addObserver(TestDecoder, (msg) =>
resolve(msg as unknown as MessageV0)
);
});
await nwaku.sendMessage(
Nwaku.toMessageRpcQuery({
contentTopic: TestContentTopic,
payload: utf8ToBytes(messageText),
})
);
await nwaku.sendMessage(Nwaku.toMessageRpcQuery(message));
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.contentTopic).to.eq(TestContentTopic);
expect(receivedMsg.version).to.eq(0);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText);
});
describe.skip("Two nodes connected to nwaku", function () {
@ -475,22 +447,19 @@ describe("Waku Relay [node only]", () => {
expect(waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.be.false;
const msgStr = "Hello there!";
const message = await WakuMessage.fromUtf8String(
msgStr,
TestContentTopic
);
const message = { payload: utf8ToBytes(msgStr) };
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
const waku2ReceivedMsgPromise: Promise<Message> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve);
waku2.relay.addObserver(TestDecoder, resolve);
}
);
await waku1.relay.send(message);
await waku1.relay.send(TestEncoder, message);
console.log("Waiting for message");
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(msgStr);
expect(waku2ReceivedMsg.payload).to.eq(msgStr);
});
});
});

View File

@ -8,16 +8,20 @@ import {
TopicStr,
} from "@chainsafe/libp2p-gossipsub/dist/src/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import { PublishResult } from "@libp2p/interface-pubsub";
import debug from "debug";
import { DefaultPubSubTopic } from "../constants";
import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { Decoder, Encoder, Message } from "../interfaces";
import { pushOrInitMapSet } from "../push_or_init_map";
import { DecoderV0 } from "../waku_message/version_0";
import * as constants from "./constants";
const log = debug("waku:relay");
export type Callback = (msg: Message) => void;
export type CreateOptions = {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
@ -33,7 +37,6 @@ export type CreateOptions = {
* @default {@link DefaultPubSubTopic}
*/
pubSubTopic?: string;
decryptionKeys?: Array<Uint8Array | string>;
} & GossipsubOpts;
/**
@ -46,18 +49,11 @@ export class WakuRelay extends GossipSub {
pubSubTopic: string;
public static multicodec: string = constants.RelayCodecs[0];
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
>;
/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
public observers: {
[contentTopic: string]: Set<(message: WakuMessage) => void>;
};
public observers: Map<string, Set<{ decoder: Decoder; callback: Callback }>>;
constructor(options?: Partial<CreateOptions>) {
options = Object.assign(options ?? {}, {
@ -68,14 +64,9 @@ export class WakuRelay extends GossipSub {
super(options);
this.multicodecs = constants.RelayCodecs;
this.observers = {};
this.decryptionKeys = new Map();
this.observers = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
options?.decryptionKeys?.forEach((key) => {
this.addDecryptionKey(key);
});
}
/**
@ -92,102 +83,39 @@ export class WakuRelay extends GossipSub {
/**
* Send Waku message.
*
* @param {WakuMessage} message
* @returns {Promise<void>}
*/
public async send(message: WakuMessage): Promise<void> {
const msg = message.encode();
await this.publish(this.pubSubTopic, msg);
public async send(
encoder: Encoder,
message: Message
): Promise<PublishResult> {
const msg = await encoder.encode(message);
if (!msg) {
log("Failed to encode message, aborting publish");
return { recipients: [] };
}
return this.publish(this.pubSubTopic, msg);
}
/**
* Register a decryption key to attempt decryption of received messages.
* This can either be a private key for asymmetric encryption or a symmetric
* key. `WakuRelay` will attempt to decrypt messages using both methods.
* Add an observer and associated Decoder to process incoming messages on a given content topic.
*
* Strings must be in hex format.
*/
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void {
this.decryptionKeys.set(hexToBytes(key), options ?? {});
}
/**
* Delete a decryption key that was used to attempt decryption of received
* messages.
*
* Strings must be in hex format.
*/
deleteDecryptionKey(key: Uint8Array | string): void {
this.decryptionKeys.delete(hexToBytes(key));
}
/**
* Register an observer of new messages received via waku relay
*
* @param callback called when a new message is received via waku relay
* @param contentTopics Content Topics for which the callback with be called,
* all of them if undefined, [] or ["",..] is passed.
* @returns Function to delete the observer
*/
addObserver(
callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): () => void {
if (contentTopics.length === 0) {
if (!this.observers[""]) {
this.observers[""] = new Set();
}
this.observers[""].add(callback);
} else {
contentTopics.forEach((contentTopic) => {
if (!this.observers[contentTopic]) {
this.observers[contentTopic] = new Set();
}
this.observers[contentTopic].add(callback);
});
}
addObserver(decoder: Decoder, callback: Callback): () => void {
const observer = {
decoder,
callback,
};
pushOrInitMapSet(this.observers, decoder.contentTopic, observer);
return () => {
if (contentTopics.length === 0) {
if (this.observers[""]) {
this.observers[""].delete(callback);
}
} else {
contentTopics.forEach((contentTopic) => {
if (this.observers[contentTopic]) {
this.observers[contentTopic].delete(callback);
}
});
const observers = this.observers.get(decoder.contentTopic);
if (observers) {
observers.delete(observer);
}
};
}
/**
* Remove an observer of new messages received via waku relay.
* Useful to ensure the same observer is not registered several time
* (e.g when loading React components)
*/
deleteObserver(
callback: (message: WakuMessage) => void,
contentTopics: string[] = []
): void {
if (contentTopics.length === 0) {
if (this.observers[""]) {
this.observers[""].delete(callback);
}
} else {
contentTopics.forEach((contentTopic) => {
if (this.observers[contentTopic]) {
this.observers[contentTopic].delete(callback);
}
});
}
}
/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
@ -196,43 +124,37 @@ export class WakuRelay extends GossipSub {
subscribe(pubSubTopic: string): void {
this.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic === pubSubTopic) {
const decryptionParams = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
key,
method,
contentTopics,
};
}
);
async (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return;
log(`Message received on ${pubSubTopic}`);
log(`Message received on ${pubSubTopic}`);
WakuMessage.decode(event.detail.msg.data, decryptionParams)
.then((wakuMsg) => {
if (!wakuMsg) {
log("Failed to decode Waku Message");
return;
}
if (this.observers[""]) {
this.observers[""].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
})
.catch((e) => {
log("Failed to decode Waku Message", e);
});
const decoderV0 = new DecoderV0("");
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
const protoMsg = await decoderV0.decodeProto(event.detail.msg.data);
if (!protoMsg) {
return;
}
const contentTopic = protoMsg.contentTopic;
if (typeof contentTopic === "undefined") {
log("Message does not have a content topic, skipping");
return;
}
const observers = this.observers.get(contentTopic);
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => {
const msg = await decoder.decode(protoMsg);
if (msg) {
callback(msg);
} else {
log("Failed to decode messages on", contentTopic);
}
})
);
}
);

View File

@ -13,17 +13,25 @@ import {
generateSymmetricKey,
getPublicKey,
} from "../crypto";
import type { WakuFull } from "../interfaces";
import { utf8ToBytes } from "../utils";
import type { Message, WakuFull } from "../interfaces";
import { bytesToUtf8, utf8ToBytes } from "../utils";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { Protocols } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { DecoderV0, EncoderV0 } from "../waku_message/version_0.js";
import {
AsymDecoder,
AsymEncoder,
SymDecoder,
SymEncoder,
} from "../waku_message/version_1.js";
import { PageDirection } from "./history_rpc";
const log = debug("waku:test:store");
const TestContentTopic = "/test/1/waku-store/utf8";
const TestEncoder = new EncoderV0(TestContentTopic);
const TestDecoder = new DecoderV0(TestContentTopic);
describe("Waku Store", () => {
let waku: WakuFull;
@ -62,9 +70,9 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: WakuMessage[] = [];
const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([])) {
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
@ -78,7 +86,7 @@ describe("Waku Store", () => {
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === "Message 0";
return bytesToUtf8(msg.payload!) === "Message 0";
});
expect(result).to.not.eq(-1);
});
@ -93,9 +101,9 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: WakuMessage[] = [];
const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([])) {
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
@ -133,8 +141,8 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: WakuMessage[] = [];
await waku.store.queryCallbackOnPromise([], async (msgPromise) => {
const messages: Message[] = [];
await waku.store.queryCallbackOnPromise(TestDecoder, async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
messages.push(msg);
@ -143,7 +151,7 @@ describe("Waku Store", () => {
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === "Message 0";
return bytesToUtf8(msg.payload!) === "Message 0";
});
expect(result).to.not.eq(-1);
});
@ -172,9 +180,9 @@ describe("Waku Store", () => {
await waitForRemotePeer(waku, [Protocols.Store]);
const desiredMsgs = 14;
const messages: WakuMessage[] = [];
const messages: Message[] = [];
await waku.store.queryCallbackOnPromise(
[],
TestDecoder,
async (msgPromise) => {
const msg = await msgPromise;
if (msg) {
@ -210,9 +218,9 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: WakuMessage[] = [];
const messages: Message[] = [];
await waku.store.queryOrderedCallback(
[],
TestDecoder,
async (msg) => {
messages.push(msg);
},
@ -225,7 +233,7 @@ describe("Waku Store", () => {
for (let index = 0; index < totalMsgs; index++) {
expect(
messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === `Message ${index}`;
return bytesToUtf8(msg.payload!) === `Message ${index}`;
})
).to.eq(index);
}
@ -253,9 +261,9 @@ describe("Waku Store", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
let messages: WakuMessage[] = [];
let messages: Message[] = [];
await waku.store.queryOrderedCallback(
[],
TestDecoder,
async (msg) => {
messages.push(msg);
},
@ -270,7 +278,7 @@ describe("Waku Store", () => {
for (let index = 0; index < totalMsgs; index++) {
expect(
messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === `Message ${index}`;
return bytesToUtf8(msg.payload!) === `Message ${index}`;
})
).to.eq(index);
}
@ -279,60 +287,45 @@ describe("Waku Store", () => {
it("Generator, with asymmetric & symmetric encrypted messages", async function () {
this.timeout(15_000);
const encryptedAsymmetricMessageText =
"This message is encrypted for me using asymmetric";
const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto";
const encryptedSymmetricMessageText =
const asymText = "This message is encrypted for me using asymmetric";
const asymTopic = "/test/1/asymmetric/proto";
const symText =
"This message is encrypted for me using symmetric encryption";
const encryptedSymmetricContentTopic = "/test/1/symmetric/proto";
const clearMessageText =
"This is a clear text message for everyone to read";
const otherEncMessageText =
const symTopic = "/test/1/symmetric/proto";
const clearText = "This is a clear text message for everyone to read";
const otherText =
"This message is not for and I must not be able to read it";
const timestamp = new Date();
const asymMsg = { payload: utf8ToBytes(asymText), timestamp };
const symMsg = {
payload: utf8ToBytes(symText),
timestamp: new Date(timestamp.valueOf() + 1),
};
const clearMsg = {
payload: utf8ToBytes(clearText),
timestamp: new Date(timestamp.valueOf() + 2),
};
const otherMsg = {
payload: utf8ToBytes(otherText),
timestamp: new Date(timestamp.valueOf() + 3),
};
const privateKey = generatePrivateKey();
const symKey = generateSymmetricKey();
const publicKey = getPublicKey(privateKey);
const timestamp = new Date();
const [
encryptedAsymmetricMessage,
encryptedSymmetricMessage,
clearMessage,
otherEncMessage,
] = await Promise.all([
WakuMessage.fromUtf8String(
encryptedAsymmetricMessageText,
encryptedAsymmetricContentTopic,
{
encPublicKey: publicKey,
timestamp,
}
),
WakuMessage.fromUtf8String(
encryptedSymmetricMessageText,
encryptedSymmetricContentTopic,
{
symKey: symKey,
timestamp: new Date(timestamp.valueOf() + 1),
}
),
WakuMessage.fromUtf8String(
clearMessageText,
encryptedAsymmetricContentTopic,
{ timestamp: new Date(timestamp.valueOf() + 2) }
),
WakuMessage.fromUtf8String(
otherEncMessageText,
encryptedSymmetricContentTopic,
{
encPublicKey: getPublicKey(generatePrivateKey()),
timestamp: new Date(timestamp.valueOf() + 3),
}
),
]);
const asymEncoder = new AsymEncoder(asymTopic, publicKey);
const symEncoder = new SymEncoder(symTopic, symKey);
log("Messages have been encrypted");
const otherEncoder = new AsymEncoder(
TestContentTopic,
getPublicKey(generatePrivateKey())
);
const asymDecoder = new AsymDecoder(asymTopic, privateKey);
const symDecoder = new SymDecoder(symTopic, symKey);
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
createFullNode({
@ -357,25 +350,18 @@ describe("Waku Store", () => {
log("Sending messages using light push");
await Promise.all([
waku1.lightPush.push(encryptedAsymmetricMessage),
waku1.lightPush.push(encryptedSymmetricMessage),
waku1.lightPush.push(otherEncMessage),
waku1.lightPush.push(clearMessage),
waku1.lightPush.push(asymEncoder, asymMsg),
waku1.lightPush.push(symEncoder, symMsg),
waku1.lightPush.push(otherEncoder, otherMsg),
waku1.lightPush.push(TestEncoder, clearMsg),
]);
await waitForRemotePeer(waku2, [Protocols.Store]);
waku2.addDecryptionKey(symKey, {
contentTopics: [encryptedSymmetricContentTopic],
method: DecryptionMethod.Symmetric,
});
const messages: WakuMessage[] = [];
const messages: Message[] = [];
log("Retrieve messages from store");
for await (const msgPromises of waku2.store.queryGenerator([], {
decryptionParams: [{ key: privateKey }],
})) {
for await (const msgPromises of waku2.store.queryGenerator(asymDecoder)) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
@ -384,25 +370,30 @@ describe("Waku Store", () => {
}
}
expect(messages?.length).eq(3);
if (!messages) throw "Length was tested";
// Messages are ordered from oldest to latest within a page (1 page query)
expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(clearMessageText);
for (const text of [
encryptedAsymmetricMessageText,
encryptedSymmetricMessageText,
clearMessageText,
]) {
expect(
messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === text;
})
).to.not.eq(-1);
for await (const msgPromises of waku2.store.queryGenerator(symDecoder)) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
for await (const msgPromises of waku2.store.queryGenerator(TestDecoder)) {
for (const promise of msgPromises) {
const msg = await promise;
if (msg) {
messages.push(msg);
}
}
}
// Messages are ordered from oldest to latest within a page (1 page query)
expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText);
expect(bytesToUtf8(messages[1].payload!)).to.eq(symText);
expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText);
expect(messages?.length).eq(3);
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
});
@ -450,9 +441,9 @@ describe("Waku Store", () => {
const nwakuPeerId = await nwaku.getPeerId();
const firstMessages: WakuMessage[] = [];
const firstMessages: Message[] = [];
await waku.store.queryOrderedCallback(
[],
TestDecoder,
(msg) => {
if (msg) {
firstMessages.push(msg);
@ -464,9 +455,9 @@ describe("Waku Store", () => {
}
);
const bothMessages: WakuMessage[] = [];
const bothMessages: Message[] = [];
await waku.store.queryOrderedCallback(
[],
TestDecoder,
async (msg) => {
bothMessages.push(msg);
},
@ -481,7 +472,7 @@ describe("Waku Store", () => {
expect(firstMessages?.length).eq(1);
expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0");
expect(bytesToUtf8(firstMessages[0].payload!)).eq("Message 0");
expect(bothMessages?.length).eq(2);
});
@ -531,9 +522,9 @@ describe("Waku Store, custom pubsub topic", () => {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const messages: WakuMessage[] = [];
const messages: Message[] = [];
let promises: Promise<void>[] = [];
for await (const msgPromises of waku.store.queryGenerator([])) {
for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) {
const _promises = msgPromises.map(async (promise) => {
const msg = await promise;
if (msg) {
@ -547,7 +538,7 @@ describe("Waku Store, custom pubsub topic", () => {
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === "Message 0";
return bytesToUtf8(msg.payload!) === "Message 0";
});
expect(result).to.not.eq(-1);
});

View File

@ -11,14 +11,9 @@ import { Uint8ArrayList } from "uint8arraylist";
import * as protoV2Beta4 from "../../proto/store_v2beta4";
import { HistoryResponse } from "../../proto/store_v2beta4";
import { DefaultPubSubTopic, StoreCodecs } from "../constants";
import { Decoder, Message } from "../interfaces";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer";
import { hexToBytes } from "../utils";
import {
DecryptionMethod,
DecryptionParams,
WakuMessage,
} from "../waku_message";
import { HistoryRPC, PageDirection, Params } from "./history_rpc";
@ -78,13 +73,6 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Keys that will be used to decrypt messages.
*
* It can be Asymmetric Private Keys and Symmetric Keys in the same array,
* all keys will be tried with both methods.
*/
decryptionParams?: DecryptionParams[];
}
/**
@ -94,15 +82,9 @@ export interface QueryOptions {
*/
export class WakuStore {
pubSubTopic: string;
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
>;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.decryptionKeys = new Map();
}
/**
@ -122,14 +104,12 @@ export class WakuStore {
* or if an error is encountered when processing the reply.
*/
async queryOrderedCallback(
contentTopics: string[],
callback: (
message: WakuMessage
) => Promise<void | boolean> | boolean | void,
decoder: Decoder,
callback: (message: Message) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
const abort = false;
for await (const promises of this.queryGenerator(contentTopics, options)) {
for await (const promises of this.queryGenerator(decoder, options)) {
if (abort) break;
let messages = await Promise.all(promises);
@ -172,15 +152,15 @@ export class WakuStore {
* or if an error is encountered when processing the reply.
*/
async queryCallbackOnPromise(
contentTopics: string[],
decoder: Decoder,
callback: (
message: Promise<WakuMessage | undefined>
message: Promise<Message | undefined>
) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
let abort = false;
let promises: Promise<void>[] = [];
for await (const page of this.queryGenerator(contentTopics, options)) {
for await (const page of this.queryGenerator(decoder, options)) {
const _promises = page.map(async (msg) => {
if (!abort) {
abort = Boolean(await callback(msg));
@ -209,9 +189,9 @@ export class WakuStore {
* or if an error is encountered when processing the reply.
*/
async *queryGenerator(
contentTopics: string[],
decoder: Decoder,
options?: QueryOptions
): AsyncGenerator<Promise<WakuMessage | undefined>[]> {
): AsyncGenerator<Promise<Message | undefined>[]> {
let startTime, endTime;
if (options?.timeFilter) {
@ -219,6 +199,8 @@ export class WakuStore {
endTime = options.timeFilter.endTime;
}
const contentTopic = decoder.contentTopic;
const queryOpts = Object.assign(
{
pubSubTopic: this.pubSubTopic,
@ -226,7 +208,7 @@ export class WakuStore {
pageSize: DefaultPageSize,
},
options,
{ contentTopics, startTime, endTime }
{ contentTopics: [contentTopic], startTime, endTime }
);
log("Querying history with the following options", {
@ -250,57 +232,16 @@ export class WakuStore {
if (!connection) throw "Failed to get a connection to the peer";
let decryptionParams: DecryptionParams[] = [];
this.decryptionKeys.forEach(({ method, contentTopics }, key) => {
decryptionParams.push({
key,
method,
contentTopics,
});
});
// Add the decryption keys passed to this function against the
// content topics also passed to this function.
if (options?.decryptionParams) {
decryptionParams = decryptionParams.concat(options.decryptionParams);
}
for await (const messages of paginate(
connection,
protocol,
queryOpts,
decryptionParams
decoder
)) {
yield messages;
}
}
/**
* Register a decryption key to attempt decryption of messages received in any
* subsequent query call. This can either be a private key for
* asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to
* decrypt messages using both methods.
*
* Strings must be in hex format.
*/
addDecryptionKey(
key: Uint8Array | string,
options?: { method?: DecryptionMethod; contentTopics?: string[] }
): void {
this.decryptionKeys.set(hexToBytes(key), options ?? {});
}
/**cursorV2Beta4
* Delete a decryption key that was used to attempt decryption of messages
* received in subsequent query calls.
*
* Strings must be in hex format.
*/
deleteDecryptionKey(key: Uint8Array | string): void {
this.decryptionKeys.delete(hexToBytes(key));
}
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* store protocol. Waku may or may not be currently connected to these peers.
@ -319,8 +260,8 @@ async function* paginate(
connection: Connection,
protocol: string,
queryOpts: Params,
decryptionParams: DecryptionParams[]
): AsyncGenerator<Promise<WakuMessage | undefined>[]> {
decoder: Decoder
): AsyncGenerator<Promise<Message | undefined>[]> {
let cursor = undefined;
while (true) {
queryOpts = Object.assign(queryOpts, { cursor });
@ -373,9 +314,7 @@ async function* paginate(
log(`${response.messages.length} messages retrieved from store`);
yield response.messages.map((protoMsg) =>
WakuMessage.decodeProto(protoMsg, decryptionParams)
);
yield response.messages.map((protoMsg) => decoder.decode(protoMsg));
cursor = response.pagingInfo?.cursor;
if (typeof cursor === "undefined") {
@ -401,7 +340,7 @@ async function* paginate(
}
export const isWakuMessageDefined = (
msg: WakuMessage | undefined
): msg is WakuMessage => {
msg: Message | undefined
): msg is Message => {
return !!msg;
};

View File

@ -6,7 +6,9 @@
"./src/lib/peer_discovery_dns.ts",
"./src/lib/peer_discovery_static_list.ts",
"./src/lib/predefined_bootstrap_nodes.ts",
"./src/lib/wait_for_remote_peer.ts"
"./src/lib/wait_for_remote_peer.ts",
"./src/lib/waku_message/version_0.ts",
"./src/lib/waku_message/version_1.ts"
],
"out": "build/docs",
"exclude": ["**/*.spec.ts"],