From d68ee3fb7455254fff89bca4db6da60abc1bf4b0 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 12 Jul 2021 13:11:43 +1000 Subject: [PATCH 1/2] New `peers` and `randomPeer` methods to return available peers --- CHANGELOG.md | 2 ++ src/lib/select_peer.ts | 22 +++++++++++++--------- src/lib/waku_light_push/index.ts | 22 ++++++++++++++++++++-- src/lib/waku_store/index.ts | 22 ++++++++++++++++++++-- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dfdd696bcd..4fe694decb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Keep alive feature that pings host regularly, reducing the chance of connections being dropped due to idle. Can be disabled or default frequency (10s) can be changed when calling `Waku.create`. - New `lib/utils` module for easy, dependency-less hex/bytes conversions. +- New `peers` and `randomPeer` methods on `WakuStore` and `WakuLightPush` to have a better idea of available peers; + Note that it does not check whether Waku node is currently connected to said peers. ### Changed - **Breaking**: Auto select peer if none provided for store and light push protocols. diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts index 8e01850ec6..c509236bba 100644 --- a/src/lib/select_peer.ts +++ b/src/lib/select_peer.ts @@ -5,14 +5,18 @@ import { Peer } from 'libp2p/src/peer-store'; * Returns a pseudo-random peer that supports the given protocol. * Useful for protocols such as store and light push */ -export function selectRandomPeer( - libp2p: Libp2p, - protocol: string -): Peer | undefined { - const allPeers = Array.from(libp2p.peerStore.peers.values()); - const size = allPeers.length; - const peers = allPeers.filter((peer) => peer.protocols.includes(protocol)); +export function selectRandomPeer(peers: Peer[]): Peer | undefined { if (peers.length === 0) return; - const index = Math.round(Math.random() * (size - 1)); - return allPeers[index]; + + const index = Math.round(Math.random() * (peers.length - 1)); + return peers[index]; +} + +/** + * Returns the list of peers that supports the given protocol. + */ +export function getPeersForProtocol(libp2p: Libp2p, protocol: string): Peer[] { + return Array.from(libp2p.peerStore.peers.values()).filter((peer) => + peer.protocols.includes(protocol) + ); } diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 88551756de..f46155514c 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -2,10 +2,11 @@ import concat from 'it-concat'; import lp from 'it-length-prefixed'; import pipe from 'it-pipe'; import Libp2p from 'libp2p'; +import { Peer } from 'libp2p/src/peer-store'; import PeerId from 'peer-id'; import { PushResponse } from '../../proto/waku/v2/light_push'; -import { selectRandomPeer } from '../select_peer'; +import { getPeersForProtocol, selectRandomPeer } from '../select_peer'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; @@ -54,7 +55,7 @@ export class WakuLightPush { peer = this.libp2p.peerStore.get(opts.peerId); if (!peer) throw 'Peer is unknown'; } else { - peer = selectRandomPeer(this.libp2p, LightPushCodec); + peer = this.randomPeer; } if (!peer) throw 'No peer available'; if (!peer.protocols.includes(LightPushCodec)) @@ -93,4 +94,21 @@ export class WakuLightPush { } return null; } + + /** + * Returns known peers from the address book (`libp2p.peerStore`) that support + * light push protocol. Waku may or may not be currently connected to these peers. + */ + get peers(): Peer[] { + return getPeersForProtocol(this.libp2p, LightPushCodec); + } + + /** + * Returns a random peer that supports light push protocol from the address + * book (`libp2p.peerStore`). Waku may or may not be currently connected to + * this peer. + */ + get randomPeer(): Peer | undefined { + return selectRandomPeer(this.peers); + } } diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index c0cbe65bf1..242482cdbd 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -2,9 +2,10 @@ import concat from 'it-concat'; import lp from 'it-length-prefixed'; import pipe from 'it-pipe'; import Libp2p from 'libp2p'; +import { Peer } from 'libp2p/src/peer-store'; import PeerId from 'peer-id'; -import { selectRandomPeer } from '../select_peer'; +import { getPeersForProtocol, selectRandomPeer } from '../select_peer'; import { WakuMessage } from '../waku_message'; import { DefaultPubsubTopic } from '../waku_relay'; @@ -76,7 +77,7 @@ export class WakuStore { peer = this.libp2p.peerStore.get(opts.peerId); if (!peer) throw 'Peer is unknown'; } else { - peer = selectRandomPeer(this.libp2p, StoreCodec); + peer = this.randomPeer; } if (!peer) throw 'No peer available'; if (!peer.protocols.includes(StoreCodec)) @@ -164,4 +165,21 @@ export class WakuStore { } } } + + /** + * Returns known peers from the address book (`libp2p.peerStore`) that support + * store protocol. Waku may or may not be currently connected to these peers. + */ + get peers(): Peer[] { + return getPeersForProtocol(this.libp2p, StoreCodec); + } + + /** + * Returns a random peer that supports store protocol from the address + * book (`libp2p.peerStore`). Waku may or may not be currently connected to + * this peer. + */ + get randomPeer(): Peer | undefined { + return selectRandomPeer(this.peers); + } } From 0e9c482a197869ad0a93f73334071e9b84e70536 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Mon, 12 Jul 2021 13:13:00 +1000 Subject: [PATCH 2/2] Enable decryption of messages retrieve via `WakuStore.queryHistory` --- CHANGELOG.md | 1 + src/lib/waku_store/index.spec.ts | 95 +++++++++++++++++++++++++++++++- src/lib/waku_store/index.ts | 14 ++++- 3 files changed, 108 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4fe694decb..571d84a051 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - New `lib/utils` module for easy, dependency-less hex/bytes conversions. - New `peers` and `randomPeer` methods on `WakuStore` and `WakuLightPush` to have a better idea of available peers; Note that it does not check whether Waku node is currently connected to said peers. +- Enable passing decryption private keys to `WakuStore.queryHistory`. ### Changed - **Breaking**: Auto select peer if none provided for store and light push protocols. diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 2a5b935b5d..ff220a9026 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -1,12 +1,22 @@ import { expect } from 'chai'; +import debug from 'debug'; import TCP from 'libp2p-tcp'; -import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; +import { + makeLogFileName, + NimWaku, + NOISE_KEY_1, + NOISE_KEY_2, +} from '../../test_utils'; +import { delay } from '../delay'; import { Waku } from '../waku'; import { DefaultContentTopic, WakuMessage } from '../waku_message'; +import { generatePrivateKey, getPublicKey } from '../waku_message/version_1'; import { Direction } from './history_rpc'; +const dbg = debug('waku:test:store'); + describe('Waku Store', () => { let waku: Waku; let nimWaku: NimWaku; @@ -133,4 +143,87 @@ describe('Waku Store', () => { }); expect(result).to.not.eq(-1); }); + + it('Retrieves history with asymmetric encrypted messages', async function () { + this.timeout(10_000); + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true, lightpush: true }); + + const encryptedMessageText = 'This message is encrypted for me'; + const clearMessageText = + 'This is a clear text message for everyone to read'; + const otherEncMessageText = + 'This message is not for and I must not be able to read it'; + + const privateKey = generatePrivateKey(); + const publicKey = getPublicKey(privateKey); + + const [encryptedMessage, clearMessage, otherEncMessage] = await Promise.all( + [ + WakuMessage.fromUtf8String(encryptedMessageText, { + encPublicKey: publicKey, + }), + WakuMessage.fromUtf8String(clearMessageText), + WakuMessage.fromUtf8String(otherEncMessageText, { + encPublicKey: getPublicKey(generatePrivateKey()), + }), + ] + ); + + dbg('Messages have been encrypted'); + + const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ + Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }), + Waku.create({ + staticNoiseKey: NOISE_KEY_2, + libp2p: { modules: { transport: [TCP] } }, + }), + nimWaku.getMultiaddrWithId(), + ]); + + dbg('Waku nodes created'); + + await Promise.all([ + waku1.dial(nimWakuMultiaddr), + waku2.dial(nimWakuMultiaddr), + ]); + + dbg('Waku nodes connected to nim Waku'); + + let lightPushPeers = waku1.lightPush.peers; + while (lightPushPeers.length == 0) { + await delay(100); + lightPushPeers = waku1.lightPush.peers; + } + + dbg('Sending messages using light push'); + await Promise.all([ + await waku1.lightPush.push(encryptedMessage), + waku1.lightPush.push(otherEncMessage), + waku1.lightPush.push(clearMessage), + ]); + + let storePeers = waku2.store.peers; + while (storePeers.length == 0) { + await delay(100); + storePeers = waku2.store.peers; + } + + dbg('Retrieve messages from store'); + const messages = await waku2.store.queryHistory({ + contentTopics: [], + decryptionPrivateKeys: [privateKey], + }); + + expect(messages?.length).eq(2); + if (!messages) throw 'Length was tested'; + expect(messages[0].payloadAsUtf8).to.eq(clearMessageText); + expect(messages[1].payloadAsUtf8).to.eq(encryptedMessageText); + + await Promise.all([waku1.stop(), waku2.stop()]); + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 242482cdbd..c1b4762538 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -1,3 +1,4 @@ +import debug from 'debug'; import concat from 'it-concat'; import lp from 'it-length-prefixed'; import pipe from 'it-pipe'; @@ -11,6 +12,8 @@ import { DefaultPubsubTopic } from '../waku_relay'; import { Direction, HistoryRPC } from './history_rpc'; +const dbg = debug('waku:store'); + export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export { Direction }; @@ -34,6 +37,7 @@ export interface QueryOptions { direction?: Direction; pageSize?: number; callback?: (messages: WakuMessage[]) => void; + decryptionPrivateKeys?: Uint8Array[]; } /** @@ -71,6 +75,7 @@ export class WakuStore { }, options ); + dbg('Querying history with the following options', options); let peer; if (opts.peerId) { @@ -115,10 +120,17 @@ export class WakuStore { return messages; } + dbg( + `${response.messages.length} messages retrieved for pubsub topic ${opts.pubsubTopic}` + ); + const pageMessages: WakuMessage[] = []; await Promise.all( response.messages.map(async (protoMsg) => { - const msg = await WakuMessage.decodeProto(protoMsg); + const msg = await WakuMessage.decodeProto( + protoMsg, + opts.decryptionPrivateKeys + ); if (msg) { messages.push(msg);