Add version 1 support to WakuMessage

This commit is contained in:
Franck Royer 2021-07-07 11:23:56 +10:00
parent 2266f31d30
commit 34e6ac5247
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
12 changed files with 254 additions and 71 deletions

View File

@ -9,6 +9,7 @@
"bitauth",
"bufbuild",
"cimg",
"ciphertext",
"circleci",
"codecov",
"commitlint",
@ -19,6 +20,7 @@
"Dscore",
"ecies",
"editorconfig",
"ephem",
"esnext",
"ethersproject",
"execa",

View File

@ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Upgrade to `libp2p@0.31.7` and `libp2p-gossipsub@0.10.0` to avoid `TextEncoder` errors in ReactJS tests.
- Disable keep alive by default as latest nim-waku release does not support ping protocol.
- **Breaking**: Optional parameters for `WakuMessage.fromBytes` and `WakuMessage.fromUtf8String` are now passed in a single `Options` object.
- **Breaking**: `WakuMessage` static functions are now async to allow for encryption and decryption.
- **Breaking**: `WakuMessage` constructor is now private, `from*` and `decode*` function should be used.
- `WakuMessage` version 1 is partially supported, enabling asymmetrical encryption and signature of messages;
this can be done by passing keys to `WakuMessage.from*` and `WakuMessage.decode*` methods.
Note: this is not yet compatible with nim-waku.
### Fixed
- Disable `keepAlive` if set to `0`.

View File

@ -33,7 +33,7 @@ describe('Waku Light Push', () => {
});
const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(message);
expect(pushResponse?.isSuccess).to.be.true;
@ -73,7 +73,7 @@ describe('Waku Light Push', () => {
const nimPeerId = await nimWaku.getPeerId();
const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(message, {
peerId: nimPeerId,

View File

@ -1,29 +1,77 @@
import { expect } from 'chai';
import fc from 'fast-check';
import { getPublicKey } from './version_1';
import { WakuMessage } from './index';
describe('Waku Message', function () {
it('Waku message round trip binary serialization', function () {
fc.assert(
fc.property(fc.string(), (s) => {
const msg = WakuMessage.fromUtf8String(s);
it('Waku message round trip binary serialization [clear]', async function () {
await fc.assert(
fc.asyncProperty(fc.string(), async (s) => {
const msg = await WakuMessage.fromUtf8String(s);
const binary = msg.encode();
const actual = WakuMessage.decode(binary);
const actual = await WakuMessage.decode(binary);
expect(actual).to.deep.equal(msg);
})
);
});
it('Payload to utf-8', function () {
fc.assert(
fc.property(fc.string(), (s) => {
const msg = WakuMessage.fromUtf8String(s);
it('Payload to utf-8', async function () {
await fc.assert(
fc.asyncProperty(fc.string(), async (s) => {
const msg = await WakuMessage.fromUtf8String(s);
const utf8 = msg.payloadAsUtf8;
return utf8 === s;
})
);
});
it('Waku message round trip binary encryption [asymmetric, no signature]', async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ minLength: 32, maxLength: 32 }),
async (payload, privKey) => {
const publicKey = getPublicKey(privKey);
const msg = await WakuMessage.fromBytes(payload, {
encPublicKey: publicKey,
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, privKey);
expect(actual?.payload).to.deep.equal(payload);
}
)
);
});
it('Waku message round trip binary encryption [asymmetric, signature]', async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ minLength: 32, maxLength: 32 }),
fc.uint8Array({ minLength: 32, maxLength: 32 }),
async (payload, sigPrivKey, encPrivKey) => {
const sigPubKey = getPublicKey(sigPrivKey);
const encPubKey = getPublicKey(encPrivKey);
const msg = await WakuMessage.fromBytes(payload, {
encPublicKey: encPubKey,
sigPrivKey: sigPrivKey,
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, encPrivKey);
expect(actual?.payload).to.deep.equal(payload);
expect(actual?.signaturePublicKey).to.deep.equal(sigPubKey);
}
)
);
});
});

View File

@ -1,47 +1,122 @@
// Ensure that this class matches the proto interface while
import { Buffer } from 'buffer';
import { Reader } from 'protobufjs/minimal';
// Protecting the user from protobuf oddities
import * as proto from '../../proto/waku/v2/message';
import * as version_1 from './version_1';
export const DefaultContentTopic = '/waku/2/default-content/proto';
const DefaultVersion = 0;
export interface Options {
contentTopic?: string;
timestamp?: Date;
encPublicKey?: Uint8Array;
sigPrivKey?: Uint8Array;
}
export class WakuMessage {
public constructor(public proto: proto.WakuMessage) {}
private constructor(
public proto: proto.WakuMessage,
private _signaturePublicKey?: Uint8Array,
private _signature?: Uint8Array
) {}
/**
* Create Message with a utf-8 string as payload.
*/
static fromUtf8String(utf8: string, opts?: Options): WakuMessage {
static async fromUtf8String(
utf8: string,
opts?: Options
): Promise<WakuMessage> {
const payload = Buffer.from(utf8, 'utf-8');
return WakuMessage.fromBytes(payload, opts);
}
/**
* Create Message with a byte array as payload.
* Create a Waku Message with the given payload.
*
* By default, the payload is kept clear (version 0).
* If `opts.encPublicKey` is passed, the payload is encrypted using
* asymmetric encryption (version 1).
*
* If `opts.sigPrivKey` is passed and version 1 is used, the payload is signed
* before encryption.
*/
static fromBytes(payload: Uint8Array, opts?: Options): WakuMessage {
const { timestamp, contentTopic } = Object.assign(
static async fromBytes(
payload: Uint8Array,
opts?: Options
): Promise<WakuMessage> {
const { timestamp, contentTopic, encPublicKey, sigPrivKey } = Object.assign(
{ timestamp: new Date(), contentTopic: DefaultContentTopic },
opts ? opts : {}
);
return new WakuMessage({
payload,
timestamp: timestamp.valueOf() / 1000,
version: DefaultVersion,
contentTopic,
});
let _payload = payload;
let version = DefaultVersion;
let sig;
if (encPublicKey) {
const enc = version_1.clearEncode(_payload, sigPrivKey);
_payload = await version_1.encryptAsymmetric(enc.payload, encPublicKey);
sig = enc.sig;
version = 1;
}
return new WakuMessage(
{
payload: _payload,
timestamp: timestamp.valueOf() / 1000,
version,
contentTopic,
},
sig?.publicKey,
sig?.signature
);
}
static decode(bytes: Uint8Array): WakuMessage {
const wakuMsg = proto.WakuMessage.decode(Reader.create(bytes));
return new WakuMessage(wakuMsg);
/**
* Decode a byte array into Waku Message.
*
* If the payload is encrypted, then `decPrivateKey` is used for decryption.
*/
static async decode(
bytes: Uint8Array,
decPrivateKey?: Uint8Array
): Promise<WakuMessage | undefined> {
const protoBuf = proto.WakuMessage.decode(Reader.create(bytes));
return WakuMessage.decodeProto(protoBuf, decPrivateKey);
}
/**
* Decode a Waku Message Protobuf Object into Waku Message.
*
* If the payload is encrypted, then `decPrivateKey` is used for decryption.
*/
static async decodeProto(
protoBuf: proto.WakuMessage,
decPrivateKey?: Uint8Array
): Promise<WakuMessage | undefined> {
let signaturePublicKey;
let signature;
if (protoBuf.version === 1 && protoBuf.payload) {
if (!decPrivateKey) return;
const dec = await version_1.decryptAsymmetric(
protoBuf.payload,
decPrivateKey
);
const res = await version_1.clearDecode(dec);
if (!res) return;
Object.assign(protoBuf, { payload: res.payload });
signaturePublicKey = res.sig?.publicKey;
signature = res.sig?.signature;
}
return new WakuMessage(protoBuf, signaturePublicKey, signature);
}
encode(): Uint8Array {
@ -78,4 +153,22 @@ export class WakuMessage {
}
return;
}
/**
* The public key used to sign the message.
*
* MAY be present if the message is version 1.
*/
get signaturePublicKey(): Uint8Array | undefined {
return this._signaturePublicKey;
}
/**
* The signature of the message.
*
* MAY be present if the message is version 1.
*/
get signature(): Uint8Array | undefined {
return this._signature;
}
}

View File

@ -17,12 +17,22 @@ describe('Waku Message Version 1', function () {
fc.uint8Array({ minLength: 32, maxLength: 32 }),
(message, privKey) => {
const enc = clearEncode(message, privKey);
const res = clearDecode(enc);
const res = clearDecode(enc.payload);
const pubKey = getPublicKey(privKey);
expect(res?.payload).deep.equal(message);
expect(res?.sig?.publicKey).deep.equal(pubKey);
expect(res?.payload).deep.equal(
message,
'Payload was not encrypted then decrypted correctly'
);
expect(res?.sig?.publicKey).deep.equal(
pubKey,
'signature Public key was not recovered from encrypted then decrypted signature'
);
expect(enc?.sig?.publicKey).deep.equal(
pubKey,
'Incorrect signature public key was returned when signing the payload'
);
}
)
);
@ -31,7 +41,7 @@ describe('Waku Message Version 1', function () {
it('Asymmetric encrypt & Decrypt', async function () {
await fc.assert(
fc.asyncProperty(
fc.uint8Array({ minLength: 2 }),
fc.uint8Array({ minLength: 1 }),
fc.uint8Array({ minLength: 32, maxLength: 32 }),
async (message, privKey) => {
const publicKey = getPublicKey(privKey);

View File

@ -25,7 +25,7 @@ const SignatureLength = 65;
export function clearEncode(
messagePayload: Uint8Array,
sigPrivKey?: Uint8Array
): Uint8Array {
): { payload: Uint8Array; sig?: Signature } {
let envelope = Buffer.from([0]); // No flags
envelope = addPayloadSizeField(envelope, messagePayload);
envelope = Buffer.concat([envelope, messagePayload]);
@ -50,22 +50,24 @@ export function clearEncode(
envelope = Buffer.concat([envelope, pad]);
let sig;
if (sigPrivKey) {
envelope[0] |= IsSignedMask;
const hash = keccak256(envelope);
const s = secp256k1.ecdsaSign(hexToBuf(hash), sigPrivKey);
envelope = Buffer.concat([envelope, s.signature, Buffer.from([s.recid])]);
sig = {
signature: Buffer.from(s.signature),
publicKey: secp256k1.publicKeyCreate(sigPrivKey, false),
};
}
return envelope;
return { payload: envelope, sig };
}
export type DecodeResult = {
payload: Uint8Array;
sig?: {
signature: Uint8Array;
publicKey: Uint8Array;
};
export type Signature = {
signature: Uint8Array;
publicKey: Uint8Array;
};
/**
@ -75,7 +77,7 @@ export type DecodeResult = {
*/
export function clearDecode(
message: Uint8Array | Buffer
): DecodeResult | undefined {
): { payload: Uint8Array; sig?: Signature } | undefined {
const buf = Buffer.from(message);
let start = 1;
let sig;

View File

@ -79,7 +79,7 @@ describe('Waku Relay', () => {
const messageText = 'JS to JS communication works';
const messageTimestamp = new Date('1995-12-17T03:24:00');
const message = WakuMessage.fromUtf8String(messageText, {
const message = await WakuMessage.fromUtf8String(messageText, {
timestamp: messageTimestamp,
});
@ -106,10 +106,10 @@ describe('Waku Relay', () => {
const fooMessageText = 'Published on content topic foo';
const barMessageText = 'Published on content topic bar';
const fooMessage = WakuMessage.fromUtf8String(fooMessageText, {
const fooMessage = await WakuMessage.fromUtf8String(fooMessageText, {
contentTopic: 'foo',
});
const barMessage = WakuMessage.fromUtf8String(barMessageText, {
const barMessage = await WakuMessage.fromUtf8String(barMessageText, {
contentTopic: 'bar',
});
@ -146,7 +146,7 @@ describe('Waku Relay', () => {
const messageText =
'Published on content topic with added then deleted observer';
const message = WakuMessage.fromUtf8String(messageText, {
const message = await WakuMessage.fromUtf8String(messageText, {
contentTopic: 'added-then-deleted-observer',
});
@ -205,7 +205,7 @@ describe('Waku Relay', () => {
]);
const messageText = 'Communicating using a custom pubsub topic';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
@ -276,7 +276,7 @@ describe('Waku Relay', () => {
this.timeout(5000);
const messageText = 'This is a message';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
await waku.relay.send(message);
@ -295,7 +295,7 @@ describe('Waku Relay', () => {
it('Nim publishes to js', async function () {
this.timeout(5000);
const messageText = 'Here is another message.';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
@ -361,7 +361,7 @@ describe('Waku Relay', () => {
this.timeout(30000);
const messageText = 'This is a message';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
await delay(1000);
await waku.relay.send(message);
@ -382,7 +382,7 @@ describe('Waku Relay', () => {
await delay(200);
const messageText = 'Here is another message.';
const message = WakuMessage.fromUtf8String(messageText);
const message = await WakuMessage.fromUtf8String(messageText);
const receivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
@ -464,7 +464,7 @@ describe('Waku Relay', () => {
).to.be.false;
const msgStr = 'Hello there!';
const message = WakuMessage.fromUtf8String(msgStr);
const message = await WakuMessage.fromUtf8String(msgStr);
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {

View File

@ -185,19 +185,22 @@ export class WakuRelay extends Gossipsub {
*/
subscribe(pubsubTopic: string): void {
this.on(pubsubTopic, (event) => {
const wakuMsg = WakuMessage.decode(event.data);
if (this.observers['']) {
this.observers[''].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
WakuMessage.decode(event.data).then((wakuMsg) => {
if (!wakuMsg) return;
if (this.observers['']) {
this.observers[''].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
});
});
super.subscribe(pubsubTopic);

View File

@ -24,7 +24,9 @@ describe('Waku Store', () => {
for (let i = 0; i < 2; i++) {
expect(
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`))
await nimWaku.sendMessage(
await WakuMessage.fromUtf8String(`Message ${i}`)
)
).to.be.true;
}
@ -58,7 +60,9 @@ describe('Waku Store', () => {
for (let i = 0; i < 15; i++) {
expect(
await nimWaku.sendMessage(WakuMessage.fromUtf8String(`Message ${i}`))
await nimWaku.sendMessage(
await WakuMessage.fromUtf8String(`Message ${i}`)
)
).to.be.true;
}
@ -98,7 +102,7 @@ describe('Waku Store', () => {
for (let i = 0; i < 2; i++) {
expect(
await nimWaku.sendMessage(
WakuMessage.fromUtf8String(`Message ${i}`),
await WakuMessage.fromUtf8String(`Message ${i}`),
customPubSubTopic
)
).to.be.true;

View File

@ -114,19 +114,24 @@ export class WakuStore {
return messages;
}
const pageMessages = response.messages.map((protoMsg) => {
return new WakuMessage(protoMsg);
});
const pageMessages: WakuMessage[] = [];
await Promise.all(
response.messages.map(async (protoMsg) => {
const msg = await WakuMessage.decodeProto(protoMsg);
if (msg) {
messages.push(msg);
pageMessages.push(msg);
}
})
);
if (opts.callback) {
// TODO: Test the callback feature
// TODO: Change callback to take individual messages
opts.callback(pageMessages);
}
pageMessages.forEach((wakuMessage) => {
messages.push(wakuMessage);
});
const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if (

View File

@ -180,9 +180,20 @@ export class NimWaku {
async messages(): Promise<WakuMessage[]> {
this.checkProcess();
return this.rpcCall<proto.WakuMessage[]>('get_waku_v2_relay_v1_messages', [
DefaultPubsubTopic,
]).then((msgs) => msgs.map((protoMsg) => new WakuMessage(protoMsg)));
const isDefined = (msg: WakuMessage | undefined): msg is WakuMessage => {
return !!msg;
};
const protoMsgs = await this.rpcCall<proto.WakuMessage[]>(
'get_waku_v2_relay_v1_messages',
[DefaultPubsubTopic]
);
const msgs = await Promise.all(
protoMsgs.map(async (protoMsg) => await WakuMessage.decodeProto(protoMsg))
);
return msgs.filter(isDefined);
}
async getPeerId(): Promise<PeerId> {