mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-05-02 10:53:11 +00:00
Merge pull request #225 from status-im/179-asymmetric-enc-store
This commit is contained in:
commit
3d219d005d
@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
- Keep alive feature that pings host regularly, reducing the chance of connections being dropped due to idle.
|
- Keep alive feature that pings host regularly, reducing the chance of connections being dropped due to idle.
|
||||||
Can be disabled or default frequency (10s) can be changed when calling `Waku.create`.
|
Can be disabled or default frequency (10s) can be changed when calling `Waku.create`.
|
||||||
- New `lib/utils` module for easy, dependency-less hex/bytes conversions.
|
- New `lib/utils` module for easy, dependency-less hex/bytes conversions.
|
||||||
|
- New `peers` and `randomPeer` methods on `WakuStore` and `WakuLightPush` to have a better idea of available peers;
|
||||||
|
Note that it does not check whether Waku node is currently connected to said peers.
|
||||||
|
- Enable passing decryption private keys to `WakuStore.queryHistory`.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- **Breaking**: Auto select peer if none provided for store and light push protocols.
|
- **Breaking**: Auto select peer if none provided for store and light push protocols.
|
||||||
|
|||||||
@ -5,14 +5,18 @@ import { Peer } from 'libp2p/src/peer-store';
|
|||||||
* Returns a pseudo-random peer that supports the given protocol.
|
* Returns a pseudo-random peer that supports the given protocol.
|
||||||
* Useful for protocols such as store and light push
|
* Useful for protocols such as store and light push
|
||||||
*/
|
*/
|
||||||
export function selectRandomPeer(
|
export function selectRandomPeer(peers: Peer[]): Peer | undefined {
|
||||||
libp2p: Libp2p,
|
|
||||||
protocol: string
|
|
||||||
): Peer | undefined {
|
|
||||||
const allPeers = Array.from(libp2p.peerStore.peers.values());
|
|
||||||
const size = allPeers.length;
|
|
||||||
const peers = allPeers.filter((peer) => peer.protocols.includes(protocol));
|
|
||||||
if (peers.length === 0) return;
|
if (peers.length === 0) return;
|
||||||
const index = Math.round(Math.random() * (size - 1));
|
|
||||||
return allPeers[index];
|
const index = Math.round(Math.random() * (peers.length - 1));
|
||||||
|
return peers[index];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of peers that supports the given protocol.
|
||||||
|
*/
|
||||||
|
export function getPeersForProtocol(libp2p: Libp2p, protocol: string): Peer[] {
|
||||||
|
return Array.from(libp2p.peerStore.peers.values()).filter((peer) =>
|
||||||
|
peer.protocols.includes(protocol)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,10 +2,11 @@ import concat from 'it-concat';
|
|||||||
import lp from 'it-length-prefixed';
|
import lp from 'it-length-prefixed';
|
||||||
import pipe from 'it-pipe';
|
import pipe from 'it-pipe';
|
||||||
import Libp2p from 'libp2p';
|
import Libp2p from 'libp2p';
|
||||||
|
import { Peer } from 'libp2p/src/peer-store';
|
||||||
import PeerId from 'peer-id';
|
import PeerId from 'peer-id';
|
||||||
|
|
||||||
import { PushResponse } from '../../proto/waku/v2/light_push';
|
import { PushResponse } from '../../proto/waku/v2/light_push';
|
||||||
import { selectRandomPeer } from '../select_peer';
|
import { getPeersForProtocol, selectRandomPeer } from '../select_peer';
|
||||||
import { WakuMessage } from '../waku_message';
|
import { WakuMessage } from '../waku_message';
|
||||||
import { DefaultPubsubTopic } from '../waku_relay';
|
import { DefaultPubsubTopic } from '../waku_relay';
|
||||||
|
|
||||||
@ -54,7 +55,7 @@ export class WakuLightPush {
|
|||||||
peer = this.libp2p.peerStore.get(opts.peerId);
|
peer = this.libp2p.peerStore.get(opts.peerId);
|
||||||
if (!peer) throw 'Peer is unknown';
|
if (!peer) throw 'Peer is unknown';
|
||||||
} else {
|
} else {
|
||||||
peer = selectRandomPeer(this.libp2p, LightPushCodec);
|
peer = this.randomPeer;
|
||||||
}
|
}
|
||||||
if (!peer) throw 'No peer available';
|
if (!peer) throw 'No peer available';
|
||||||
if (!peer.protocols.includes(LightPushCodec))
|
if (!peer.protocols.includes(LightPushCodec))
|
||||||
@ -93,4 +94,21 @@ export class WakuLightPush {
|
|||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns known peers from the address book (`libp2p.peerStore`) that support
|
||||||
|
* light push protocol. Waku may or may not be currently connected to these peers.
|
||||||
|
*/
|
||||||
|
get peers(): Peer[] {
|
||||||
|
return getPeersForProtocol(this.libp2p, LightPushCodec);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a random peer that supports light push protocol from the address
|
||||||
|
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
|
||||||
|
* this peer.
|
||||||
|
*/
|
||||||
|
get randomPeer(): Peer | undefined {
|
||||||
|
return selectRandomPeer(this.peers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,12 +1,22 @@
|
|||||||
import { expect } from 'chai';
|
import { expect } from 'chai';
|
||||||
|
import debug from 'debug';
|
||||||
import TCP from 'libp2p-tcp';
|
import TCP from 'libp2p-tcp';
|
||||||
|
|
||||||
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
|
import {
|
||||||
|
makeLogFileName,
|
||||||
|
NimWaku,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
NOISE_KEY_2,
|
||||||
|
} from '../../test_utils';
|
||||||
|
import { delay } from '../delay';
|
||||||
import { Waku } from '../waku';
|
import { Waku } from '../waku';
|
||||||
import { DefaultContentTopic, WakuMessage } from '../waku_message';
|
import { DefaultContentTopic, WakuMessage } from '../waku_message';
|
||||||
|
import { generatePrivateKey, getPublicKey } from '../waku_message/version_1';
|
||||||
|
|
||||||
import { Direction } from './history_rpc';
|
import { Direction } from './history_rpc';
|
||||||
|
|
||||||
|
const dbg = debug('waku:test:store');
|
||||||
|
|
||||||
describe('Waku Store', () => {
|
describe('Waku Store', () => {
|
||||||
let waku: Waku;
|
let waku: Waku;
|
||||||
let nimWaku: NimWaku;
|
let nimWaku: NimWaku;
|
||||||
@ -133,4 +143,87 @@ describe('Waku Store', () => {
|
|||||||
});
|
});
|
||||||
expect(result).to.not.eq(-1);
|
expect(result).to.not.eq(-1);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('Retrieves history with asymmetric encrypted messages', async function () {
|
||||||
|
this.timeout(10_000);
|
||||||
|
|
||||||
|
nimWaku = new NimWaku(makeLogFileName(this));
|
||||||
|
await nimWaku.start({ persistMessages: true, lightpush: true });
|
||||||
|
|
||||||
|
const encryptedMessageText = 'This message is encrypted for me';
|
||||||
|
const clearMessageText =
|
||||||
|
'This is a clear text message for everyone to read';
|
||||||
|
const otherEncMessageText =
|
||||||
|
'This message is not for and I must not be able to read it';
|
||||||
|
|
||||||
|
const privateKey = generatePrivateKey();
|
||||||
|
const publicKey = getPublicKey(privateKey);
|
||||||
|
|
||||||
|
const [encryptedMessage, clearMessage, otherEncMessage] = await Promise.all(
|
||||||
|
[
|
||||||
|
WakuMessage.fromUtf8String(encryptedMessageText, {
|
||||||
|
encPublicKey: publicKey,
|
||||||
|
}),
|
||||||
|
WakuMessage.fromUtf8String(clearMessageText),
|
||||||
|
WakuMessage.fromUtf8String(otherEncMessageText, {
|
||||||
|
encPublicKey: getPublicKey(generatePrivateKey()),
|
||||||
|
}),
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
dbg('Messages have been encrypted');
|
||||||
|
|
||||||
|
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
|
||||||
|
Waku.create({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
libp2p: { modules: { transport: [TCP] } },
|
||||||
|
}),
|
||||||
|
Waku.create({
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { modules: { transport: [TCP] } },
|
||||||
|
}),
|
||||||
|
nimWaku.getMultiaddrWithId(),
|
||||||
|
]);
|
||||||
|
|
||||||
|
dbg('Waku nodes created');
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
waku1.dial(nimWakuMultiaddr),
|
||||||
|
waku2.dial(nimWakuMultiaddr),
|
||||||
|
]);
|
||||||
|
|
||||||
|
dbg('Waku nodes connected to nim Waku');
|
||||||
|
|
||||||
|
let lightPushPeers = waku1.lightPush.peers;
|
||||||
|
while (lightPushPeers.length == 0) {
|
||||||
|
await delay(100);
|
||||||
|
lightPushPeers = waku1.lightPush.peers;
|
||||||
|
}
|
||||||
|
|
||||||
|
dbg('Sending messages using light push');
|
||||||
|
await Promise.all([
|
||||||
|
await waku1.lightPush.push(encryptedMessage),
|
||||||
|
waku1.lightPush.push(otherEncMessage),
|
||||||
|
waku1.lightPush.push(clearMessage),
|
||||||
|
]);
|
||||||
|
|
||||||
|
let storePeers = waku2.store.peers;
|
||||||
|
while (storePeers.length == 0) {
|
||||||
|
await delay(100);
|
||||||
|
storePeers = waku2.store.peers;
|
||||||
|
}
|
||||||
|
|
||||||
|
dbg('Retrieve messages from store');
|
||||||
|
const messages = await waku2.store.queryHistory({
|
||||||
|
contentTopics: [],
|
||||||
|
decryptionPrivateKeys: [privateKey],
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(messages?.length).eq(2);
|
||||||
|
if (!messages) throw 'Length was tested';
|
||||||
|
expect(messages[0].payloadAsUtf8).to.eq(clearMessageText);
|
||||||
|
expect(messages[1].payloadAsUtf8).to.eq(encryptedMessageText);
|
||||||
|
|
||||||
|
await Promise.all([waku1.stop(), waku2.stop()]);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -1,15 +1,19 @@
|
|||||||
|
import debug from 'debug';
|
||||||
import concat from 'it-concat';
|
import concat from 'it-concat';
|
||||||
import lp from 'it-length-prefixed';
|
import lp from 'it-length-prefixed';
|
||||||
import pipe from 'it-pipe';
|
import pipe from 'it-pipe';
|
||||||
import Libp2p from 'libp2p';
|
import Libp2p from 'libp2p';
|
||||||
|
import { Peer } from 'libp2p/src/peer-store';
|
||||||
import PeerId from 'peer-id';
|
import PeerId from 'peer-id';
|
||||||
|
|
||||||
import { selectRandomPeer } from '../select_peer';
|
import { getPeersForProtocol, selectRandomPeer } from '../select_peer';
|
||||||
import { WakuMessage } from '../waku_message';
|
import { WakuMessage } from '../waku_message';
|
||||||
import { DefaultPubsubTopic } from '../waku_relay';
|
import { DefaultPubsubTopic } from '../waku_relay';
|
||||||
|
|
||||||
import { Direction, HistoryRPC } from './history_rpc';
|
import { Direction, HistoryRPC } from './history_rpc';
|
||||||
|
|
||||||
|
const dbg = debug('waku:store');
|
||||||
|
|
||||||
export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
|
export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
|
||||||
|
|
||||||
export { Direction };
|
export { Direction };
|
||||||
@ -33,6 +37,7 @@ export interface QueryOptions {
|
|||||||
direction?: Direction;
|
direction?: Direction;
|
||||||
pageSize?: number;
|
pageSize?: number;
|
||||||
callback?: (messages: WakuMessage[]) => void;
|
callback?: (messages: WakuMessage[]) => void;
|
||||||
|
decryptionPrivateKeys?: Uint8Array[];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -70,13 +75,14 @@ export class WakuStore {
|
|||||||
},
|
},
|
||||||
options
|
options
|
||||||
);
|
);
|
||||||
|
dbg('Querying history with the following options', options);
|
||||||
|
|
||||||
let peer;
|
let peer;
|
||||||
if (opts.peerId) {
|
if (opts.peerId) {
|
||||||
peer = this.libp2p.peerStore.get(opts.peerId);
|
peer = this.libp2p.peerStore.get(opts.peerId);
|
||||||
if (!peer) throw 'Peer is unknown';
|
if (!peer) throw 'Peer is unknown';
|
||||||
} else {
|
} else {
|
||||||
peer = selectRandomPeer(this.libp2p, StoreCodec);
|
peer = this.randomPeer;
|
||||||
}
|
}
|
||||||
if (!peer) throw 'No peer available';
|
if (!peer) throw 'No peer available';
|
||||||
if (!peer.protocols.includes(StoreCodec))
|
if (!peer.protocols.includes(StoreCodec))
|
||||||
@ -114,10 +120,17 @@ export class WakuStore {
|
|||||||
return messages;
|
return messages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dbg(
|
||||||
|
`${response.messages.length} messages retrieved for pubsub topic ${opts.pubsubTopic}`
|
||||||
|
);
|
||||||
|
|
||||||
const pageMessages: WakuMessage[] = [];
|
const pageMessages: WakuMessage[] = [];
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
response.messages.map(async (protoMsg) => {
|
response.messages.map(async (protoMsg) => {
|
||||||
const msg = await WakuMessage.decodeProto(protoMsg);
|
const msg = await WakuMessage.decodeProto(
|
||||||
|
protoMsg,
|
||||||
|
opts.decryptionPrivateKeys
|
||||||
|
);
|
||||||
|
|
||||||
if (msg) {
|
if (msg) {
|
||||||
messages.push(msg);
|
messages.push(msg);
|
||||||
@ -164,4 +177,21 @@ export class WakuStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns known peers from the address book (`libp2p.peerStore`) that support
|
||||||
|
* store protocol. Waku may or may not be currently connected to these peers.
|
||||||
|
*/
|
||||||
|
get peers(): Peer[] {
|
||||||
|
return getPeersForProtocol(this.libp2p, StoreCodec);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a random peer that supports store protocol from the address
|
||||||
|
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
|
||||||
|
* this peer.
|
||||||
|
*/
|
||||||
|
get randomPeer(): Peer | undefined {
|
||||||
|
return selectRandomPeer(this.peers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user