Add version 1 support to waku relay, test decryption against nim-waku

This commit is contained in:
Franck Royer 2021-07-09 10:07:39 +10:00
parent acdc032253
commit f95d9aec3c
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 209 additions and 24 deletions

View File

@ -62,6 +62,7 @@
"rlnrelay",
"sandboxed",
"secio",
"seckey",
"secp",
"staticnode",
"statusim",

View File

@ -1,9 +1,23 @@
import { expect } from 'chai';
import debug from 'debug';
import fc from 'fast-check';
import TCP from 'libp2p-tcp';
import {
makeLogFileName,
NimWaku,
NOISE_KEY_1,
WakuRelayMessage,
} from '../../test_utils';
import { delay } from '../delay';
import { hexToBuf } from '../utils';
import { Waku } from '../waku';
import { getPublicKey } from './version_1';
import { WakuMessage } from './index';
import { DefaultContentTopic, WakuMessage } from './index';
const dbg = debug('waku:test:message');
describe('Waku Message', function () {
it('Waku message round trip binary serialization [clear]', async function () {
@ -42,7 +56,7 @@ describe('Waku Message', function () {
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, privKey);
const actual = await WakuMessage.decode(wireBytes, [privKey]);
expect(actual?.payload).to.deep.equal(payload);
}
@ -66,7 +80,7 @@ describe('Waku Message', function () {
});
const wireBytes = msg.encode();
const actual = await WakuMessage.decode(wireBytes, encPrivKey);
const actual = await WakuMessage.decode(wireBytes, [encPrivKey]);
expect(actual?.payload).to.deep.equal(payload);
expect(actual?.signaturePublicKey).to.deep.equal(sigPubKey);
@ -75,3 +89,63 @@ describe('Waku Message', function () {
);
});
});
describe('Interop: Nim', function () {
let waku: Waku;
let nimWaku: NimWaku;
beforeEach(async function () {
this.timeout(30_000);
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
libp2p: {
addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] },
modules: { transport: [TCP] },
},
});
const multiAddrWithId = waku.getLocalMultiaddrWithID();
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ staticnode: multiAddrWithId, rpcPrivate: true });
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
});
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
});
it('JS decrypts nim message [asymmetric, no signature]', async function () {
this.timeout(10000);
await delay(200);
const messageText = 'Here is an encrypted message.';
const message: WakuRelayMessage = {
contentTopic: DefaultContentTopic,
payload: Buffer.from(messageText, 'utf-8').toString('hex'),
};
const keyPair = await nimWaku.getAsymmetricKeyPair();
const privateKey = hexToBuf(keyPair.privateKey);
const publicKey = hexToBuf(keyPair.publicKey);
waku.relay.addDecryptionPrivateKey(privateKey);
const receivedMsgPromise: Promise<WakuMessage> = new Promise((resolve) => {
waku.relay.addObserver(resolve);
});
dbg('Post message');
await nimWaku.postAsymmetricMessage(message, publicKey);
const receivedMsg = await receivedMsgPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(1);
expect(receivedMsg.payloadAsUtf8).to.eq(messageText);
});
});

View File

@ -1,6 +1,7 @@
// Ensure that this class matches the proto interface while
import { Buffer } from 'buffer';
import debug from 'debug';
import { Reader } from 'protobufjs/minimal';
// Protecting the user from protobuf oddities
@ -10,6 +11,7 @@ import * as version_1 from './version_1';
export const DefaultContentTopic = '/waku/2/default-content/proto';
const DefaultVersion = 0;
const dbg = debug('waku:message');
export interface Options {
contentTopic?: string;
@ -84,11 +86,11 @@ export class WakuMessage {
*/
static async decode(
bytes: Uint8Array,
decPrivateKey?: Uint8Array
decPrivateKeys?: Uint8Array[]
): Promise<WakuMessage | undefined> {
const protoBuf = proto.WakuMessage.decode(Reader.create(bytes));
return WakuMessage.decodeProto(protoBuf, decPrivateKey);
return WakuMessage.decodeProto(protoBuf, decPrivateKeys);
}
/**
@ -98,19 +100,52 @@ export class WakuMessage {
*/
static async decodeProto(
protoBuf: proto.WakuMessage,
decPrivateKey?: Uint8Array
decPrivateKeys?: Uint8Array[]
): Promise<WakuMessage | undefined> {
if (protoBuf.payload === undefined) {
dbg('Payload is undefined');
return;
}
const payload = protoBuf.payload;
let signaturePublicKey;
let signature;
if (protoBuf.version === 1 && protoBuf.payload) {
if (!decPrivateKey) return;
if (decPrivateKeys === undefined) {
dbg('Payload is encrypted but no private keys have been provided.');
const dec = await version_1.decryptAsymmetric(
protoBuf.payload,
decPrivateKey
return;
}
// Returns a bunch of `undefined` and hopefully one decrypted result
const allResults = await Promise.all(
decPrivateKeys.map(async (privateKey) => {
try {
return await version_1.decryptAsymmetric(payload, privateKey);
} catch (e) {
dbg('Failed to decrypt asymmetric message', e);
return;
}
})
);
const isDefined = (dec: Uint8Array | undefined): dec is Uint8Array => {
return !!dec;
};
const decodedResults = allResults.filter(isDefined);
if (decodedResults.length === 0) {
dbg('Failed to decrypt payload.');
return;
}
const dec = decodedResults[0];
const res = await version_1.clearDecode(dec);
if (!res) return;
if (!res) {
dbg('Failed to decode payload.');
return;
}
Object.assign(protoBuf, { payload: res.payload });
signaturePublicKey = res.sig?.publicKey;
signature = res.sig?.signature;

View File

@ -1,3 +1,4 @@
import debug from 'debug';
import Libp2p from 'libp2p';
import Gossipsub from 'libp2p-gossipsub';
import { AddrInfo, MessageIdFunction } from 'libp2p-gossipsub/src/interfaces';
@ -24,6 +25,8 @@ import { DefaultPubsubTopic, RelayCodec } from './constants';
import { getRelayPeers } from './get_relay_peers';
import { RelayHeartbeat } from './relay_heartbeat';
const dbg = debug('waku:relay');
export { RelayCodec, DefaultPubsubTopic };
/**
@ -60,9 +63,15 @@ export interface GossipOptions {
export class WakuRelay extends Gossipsub {
heartbeat: RelayHeartbeat;
pubsubTopic: string;
/**
* Decryption private keys to use to attempt decryption of incoming messages.
*/
public decPrivateKeys: Set<Uint8Array>;
/**
* observers called when receiving new message.
* Observers under key "" are always called.
* Observers under key `""` are always called.
*/
public observers: {
[contentTopic: string]: Set<(message: WakuMessage) => void>;
@ -82,6 +91,7 @@ export class WakuRelay extends Gossipsub {
this.heartbeat = new RelayHeartbeat(this);
this.observers = {};
this.decPrivateKeys = new Set();
const multicodecs = [constants.RelayCodec];
@ -113,12 +123,29 @@ export class WakuRelay extends Gossipsub {
await super.publish(this.pubsubTopic, Buffer.from(msg));
}
/**
* Register a decryption private key to attempt decryption of messages of
* the given content topic.
*/
addDecryptionPrivateKey(privateKey: Uint8Array): void {
this.decPrivateKeys.add(privateKey);
}
/**
* Delete a decryption private key to attempt decryption of messages of
* the given content topic.
*/
deleteDecryptionPrivateKey(privateKey: Uint8Array): void {
this.decPrivateKeys.delete(privateKey);
}
/**
* Register an observer of new messages received via waku relay
*
* @param callback called when a new message is received via waku relay
* @param contentTopics Content Topics for which the callback with be called,
* all of them if undefined, [] or ["",..] is passed.
* @param decPrivateKeys Private keys used to decrypt incoming Waku Messages.
* @returns {void}
*/
addObserver(
@ -181,22 +208,30 @@ export class WakuRelay extends Gossipsub {
*/
subscribe(pubsubTopic: string): void {
this.on(pubsubTopic, (event) => {
WakuMessage.decode(event.data).then((wakuMsg) => {
if (!wakuMsg) return;
dbg(`Message received on ${pubsubTopic}`);
WakuMessage.decode(event.data, Array.from(this.decPrivateKeys))
.then((wakuMsg) => {
if (!wakuMsg) {
dbg('Failed to decode Waku Message');
return;
}
if (this.observers['']) {
this.observers[''].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
if (this.observers['']) {
this.observers[''].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
});
if (wakuMsg.contentTopic) {
if (this.observers[wakuMsg.contentTopic]) {
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
callbackFn(wakuMsg);
});
}
}
})
.catch((e) => {
dbg('Failed to decode Waku Message', e);
});
});
super.subscribe(pubsubTopic);

View File

@ -41,6 +41,7 @@ export interface Args {
persistMessages?: boolean;
lightpush?: boolean;
topics?: string;
rpcPrivate?: boolean;
}
export enum LogLevel {
@ -53,6 +54,16 @@ export enum LogLevel {
Fatal = 'fatal',
}
export interface KeyPair {
privateKey: string;
publicKey: string;
}
export interface WakuRelayMessage {
payload: string;
contentTopic?: string;
}
export class NimWaku {
private process?: ChildProcess;
private pid?: number;
@ -196,6 +207,35 @@ export class NimWaku {
return msgs.filter(isDefined);
}
async getAsymmetricKeyPair(): Promise<KeyPair> {
this.checkProcess();
const { seckey, pubkey } = await this.rpcCall<{
seckey: string;
pubkey: string;
}>('get_waku_v2_private_v1_asymmetric_keypair', []);
return { privateKey: seckey, publicKey: pubkey };
}
async postAsymmetricMessage(
message: WakuRelayMessage,
publicKey: Uint8Array,
pubsubTopic?: string
): Promise<boolean> {
this.checkProcess();
if (!message.payload) {
throw 'Attempting to send empty message';
}
return this.rpcCall<boolean>('post_waku_v2_private_v1_asymmetric_message', [
pubsubTopic ? pubsubTopic : DefaultPubsubTopic,
message,
'0x' + bufToHex(publicKey),
]);
}
async getPeerId(): Promise<PeerId> {
return await this.setPeerId().then((res) => res.peerId);
}