Merge pull request #927 from status-im/todos

This commit is contained in:
fryorcraken.eth 2022-09-07 13:28:30 +10:00 committed by GitHub
commit d9d237b7ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 84 additions and 97 deletions

View File

@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- Simple connection management that selects the most recent connection for store, light push and filter requests.
### Changed
- **breaking**: `DecryptionParams` may be passed when using `queryHistory` instead of just keys.
## [0.25.0] - 2022-09-5 ## [0.25.0] - 2022-09-5
### Changed ### Changed

44
package-lock.json generated
View File

@ -981,11 +981,11 @@
} }
}, },
"node_modules/@libp2p/interface-connection-manager": { "node_modules/@libp2p/interface-connection-manager": {
"version": "1.0.2", "version": "1.0.3",
"resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.2.tgz", "resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.3.tgz",
"integrity": "sha512-92gM7sZhVidD+vsQbc+LbI4MMvxgRjFy9kUrrsOosbtCt0nl68rIeRFKRpfX92/4QY40tL41VXT69ijCUskEwg==", "integrity": "sha512-zDDzAKbtCkqR/3AmZ3DAoK1bt+5vhyUruV8654R9IT5PI7IBBgFnYzvkWHDI/UDvhwT27ubofPagp0m25gQZvg==",
"dependencies": { "dependencies": {
"@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-peer-id": "^1.0.0", "@libp2p/interface-peer-id": "^1.0.0",
"@libp2p/interfaces": "^3.0.0" "@libp2p/interfaces": "^3.0.0"
}, },
@ -994,21 +994,6 @@
"npm": ">=7.0.0" "npm": ">=7.0.0"
} }
}, },
"node_modules/@libp2p/interface-connection-manager/node_modules/@libp2p/interface-connection": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/@libp2p/interface-connection/-/interface-connection-2.1.1.tgz",
"integrity": "sha512-gjugaMsZvfo3r4tCc/yPifVQsfLogmEmJtW+eXMNiNDna3ZfmwWD9Z+KyEwuVsXKs0C4GESXei2y4SJSCEfkbA==",
"dependencies": {
"@libp2p/interface-peer-id": "^1.0.0",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^10.2.0",
"it-stream-types": "^1.0.4"
},
"engines": {
"node": ">=16.0.0",
"npm": ">=7.0.0"
}
},
"node_modules/@libp2p/interface-content-routing": { "node_modules/@libp2p/interface-content-routing": {
"version": "1.0.2", "version": "1.0.2",
"resolved": "https://registry.npmjs.org/@libp2p/interface-content-routing/-/interface-content-routing-1.0.2.tgz", "resolved": "https://registry.npmjs.org/@libp2p/interface-content-routing/-/interface-content-routing-1.0.2.tgz",
@ -12929,26 +12914,13 @@
} }
}, },
"@libp2p/interface-connection-manager": { "@libp2p/interface-connection-manager": {
"version": "1.0.2", "version": "1.0.3",
"resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.2.tgz", "resolved": "https://registry.npmjs.org/@libp2p/interface-connection-manager/-/interface-connection-manager-1.0.3.tgz",
"integrity": "sha512-92gM7sZhVidD+vsQbc+LbI4MMvxgRjFy9kUrrsOosbtCt0nl68rIeRFKRpfX92/4QY40tL41VXT69ijCUskEwg==", "integrity": "sha512-zDDzAKbtCkqR/3AmZ3DAoK1bt+5vhyUruV8654R9IT5PI7IBBgFnYzvkWHDI/UDvhwT27ubofPagp0m25gQZvg==",
"requires": { "requires": {
"@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-peer-id": "^1.0.0", "@libp2p/interface-peer-id": "^1.0.0",
"@libp2p/interfaces": "^3.0.0" "@libp2p/interfaces": "^3.0.0"
},
"dependencies": {
"@libp2p/interface-connection": {
"version": "2.1.1",
"resolved": "https://registry.npmjs.org/@libp2p/interface-connection/-/interface-connection-2.1.1.tgz",
"integrity": "sha512-gjugaMsZvfo3r4tCc/yPifVQsfLogmEmJtW+eXMNiNDna3ZfmwWD9Z+KyEwuVsXKs0C4GESXei2y4SJSCEfkbA==",
"requires": {
"@libp2p/interface-peer-id": "^1.0.0",
"@libp2p/interfaces": "^3.0.0",
"@multiformats/multiaddr": "^10.2.0",
"it-stream-types": "^1.0.4"
}
}
} }
}, },
"@libp2p/interface-content-routing": { "@libp2p/interface-content-routing": {

View File

@ -10,16 +10,6 @@ export const ERR_TYPE_NOT_IMPLEMENTED = "Keypair type not implemented";
export * from "./types"; export * from "./types";
export * from "./secp256k1"; export * from "./secp256k1";
// TODO: Check if @libp2p/crypto methods can be used instead.
export async function generateKeypair(type: KeypairType): Promise<IKeypair> {
switch (type) {
case KeypairType.secp256k1:
return await Secp256k1Keypair.generate();
default:
throw new Error(ERR_TYPE_NOT_IMPLEMENTED);
}
}
export function createKeypair( export function createKeypair(
type: KeypairType, type: KeypairType,
privateKey?: Uint8Array, privateKey?: Uint8Array,

View File

@ -0,0 +1,24 @@
import { Connection } from "@libp2p/interface-connection";
export function selectConnection(
connections: Connection[]
): Connection | undefined {
if (!connections.length) return;
if (connections.length === 1) return connections[0];
let latestConnection: Connection | undefined;
connections.forEach((connection) => {
if (connection.stat.status === "OPEN") {
if (!latestConnection) {
latestConnection = connection;
} else if (
connection.stat.timeline.open > latestConnection.stat.timeline.open
) {
latestConnection = connection;
}
}
});
return latestConnection;
}

View File

@ -10,6 +10,7 @@ import type { Libp2p } from "libp2p";
import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { WakuMessage as WakuMessageProto } from "../../proto/message";
import { DefaultPubSubTopic } from "../constants"; import { DefaultPubSubTopic } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils"; import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message"; import { DecryptionMethod, WakuMessage } from "../waku_message";
@ -166,7 +167,7 @@ export class WakuFilter {
return; return;
} }
const decryptionKeys = Array.from(this.decryptionKeys).map( const decryptionParams = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => { ([key, { method, contentTopics }]) => {
return { return {
key, key,
@ -177,7 +178,7 @@ export class WakuFilter {
); );
for (const message of messages) { for (const message of messages) {
const decoded = await WakuMessage.decodeProto(message, decryptionKeys); const decoded = await WakuMessage.decodeProto(message, decryptionParams);
if (!decoded) { if (!decoded) {
log("Not able to decode message"); log("Not able to decode message");
continue; continue;
@ -216,17 +217,14 @@ export class WakuFilter {
} }
} }
// Should be able to remove any at next libp2p release >0.37.3
private async newStream(peer: Peer): Promise<Stream> { private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.connectionManager.getConnections(peer.id); const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) { const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer"); throw new Error("Failed to get a connection to the peer");
} }
// TODO: Appropriate connection selection return connection.newStream(FilterCodec);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: tsc is confused by the @libp2p/interface-connection type to use
return connections[0].newStream(FilterCodec);
} }
private async getPeer(peerId?: PeerId): Promise<Peer> { private async getPeer(peerId?: PeerId): Promise<Peer> {

View File

@ -8,6 +8,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import { PushResponse } from "../../proto/light_push"; import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants"; import { DefaultPubSubTopic } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { WakuMessage } from "../waku_message"; import { WakuMessage } from "../waku_message";
@ -59,10 +60,11 @@ export class WakuLightPush {
throw "Peer does not register waku light push protocol"; throw "Peer does not register waku light push protocol";
const connections = this.libp2p.connectionManager.getConnections(peer.id); const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) throw "Failed to get a connection to the peer"; const connection = selectConnection(connections);
// TODO: Appropriate connection management if (!connection) throw "Failed to get a connection to the peer";
const stream = await connections[0].newStream(LightPushCodec);
const stream = await connection.newStream(LightPushCodec);
try { try {
const pubSubTopic = opts?.pubSubTopic const pubSubTopic = opts?.pubSubTopic
? opts.pubSubTopic ? opts.pubSubTopic

View File

@ -38,7 +38,6 @@ export interface Options {
sigPrivKey?: Uint8Array; sigPrivKey?: Uint8Array;
} }
// TODO: Use this in Options
export interface DecryptionParams { export interface DecryptionParams {
key: Uint8Array; key: Uint8Array;
method?: DecryptionMethod; method?: DecryptionMethod;
@ -270,8 +269,7 @@ export class WakuMessage {
} }
get version(): number { get version(): number {
// TODO: absent value should be replaced by default // https://github.com/status-im/js-waku/issues/921
// value of the type by the protobuf decoder
return this.proto.version ?? 0; return this.proto.version ?? 0;
} }

View File

@ -184,7 +184,7 @@ export class WakuRelay extends GossipSub {
"gossipsub:message", "gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => { (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic === pubSubTopic) { if (event.detail.msg.topic === pubSubTopic) {
const decryptionKeys = Array.from(this.decryptionKeys).map( const decryptionParams = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => { ([key, { method, contentTopics }]) => {
return { return {
key, key,
@ -195,7 +195,7 @@ export class WakuRelay extends GossipSub {
); );
dbg(`Message received on ${pubSubTopic}`); dbg(`Message received on ${pubSubTopic}`);
WakuMessage.decode(event.detail.msg.data, decryptionKeys) WakuMessage.decode(event.detail.msg.data, decryptionParams)
.then((wakuMsg) => { .then((wakuMsg) => {
if (!wakuMsg) { if (!wakuMsg) {
dbg("Failed to decode Waku Message"); dbg("Failed to decode Waku Message");

View File

@ -228,10 +228,8 @@ describe("Waku Store", () => {
nwaku = new Nwaku(makeLogFileName(this)); nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true, store: true, lightpush: true }); await nwaku.start({ persistMessages: true, store: true, lightpush: true });
const encryptedAsymmetricMessageText = const encryptedAsymmetricMessageText = "asymmetric encryption";
"This message is encrypted for me using asymmetric"; const encryptedSymmetricMessageText = "symmetric encryption";
const encryptedSymmetricMessageText =
"This message is encrypted for me using symmetric encryption";
const clearMessageText = const clearMessageText =
"This is a clear text message for everyone to read"; "This is a clear text message for everyone to read";
const otherEncMessageText = const otherEncMessageText =
@ -304,14 +302,12 @@ describe("Waku Store", () => {
dbg("Retrieve messages from store"); dbg("Retrieve messages from store");
const messages = await waku2.store.queryHistory([], { const messages = await waku2.store.queryHistory([], {
decryptionKeys: [privateKey], decryptionParams: [{ key: privateKey }],
}); });
expect(messages?.length).eq(3); expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText);
if (!messages) throw "Length was tested"; expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[0].payloadAsUtf8).to.eq(clearMessageText); expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText);
expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText);
!!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e));
@ -411,7 +407,7 @@ describe("Waku Store", () => {
dbg("Retrieve messages from store"); dbg("Retrieve messages from store");
const messages = await waku2.store.queryHistory([], { const messages = await waku2.store.queryHistory([], {
decryptionKeys: [privateKey], decryptionParams: [{ key: privateKey }],
}); });
expect(messages?.length).eq(3); expect(messages?.length).eq(3);

View File

@ -10,9 +10,14 @@ import { Uint8ArrayList } from "uint8arraylist";
import * as protoV2Beta4 from "../../proto/store_v2beta4"; import * as protoV2Beta4 from "../../proto/store_v2beta4";
import { HistoryResponse } from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4";
import { DefaultPubSubTopic, StoreCodecs } from "../constants"; import { DefaultPubSubTopic, StoreCodecs } from "../constants";
import { selectConnection } from "../select_connection";
import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; import { getPeersForProtocol, selectRandomPeer } from "../select_peer";
import { hexToBytes } from "../utils"; import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message"; import {
DecryptionMethod,
DecryptionParams,
WakuMessage,
} from "../waku_message";
import { HistoryRPC, PageDirection } from "./history_rpc"; import { HistoryRPC, PageDirection } from "./history_rpc";
@ -90,7 +95,7 @@ export interface QueryOptions {
* It can be Asymmetric Private Keys and Symmetric Keys in the same array, * It can be Asymmetric Private Keys and Symmetric Keys in the same array,
* all keys will be tried with both methods. * all keys will be tried with both methods.
*/ */
decryptionKeys?: Array<Uint8Array | string>; decryptionParams?: DecryptionParams[];
} }
/** /**
@ -171,36 +176,30 @@ export class WakuStore {
Object.assign(opts, { storeCodec }); Object.assign(opts, { storeCodec });
const connections = this.libp2p.connectionManager.getConnections(peer.id); const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections || !connections.length) const connection = selectConnection(connections);
throw "Failed to get a connection to the peer";
const decryptionKeys = Array.from(this.decryptionKeys).map( if (!connection) throw "Failed to get a connection to the peer";
([key, { method, contentTopics }]) => {
return { let decryptionParams: DecryptionParams[] = [];
key,
method, this.decryptionKeys.forEach(({ method, contentTopics }, key) => {
contentTopics, decryptionParams.push({
}; key,
} method,
); contentTopics,
});
});
// Add the decryption keys passed to this function against the // Add the decryption keys passed to this function against the
// content topics also passed to this function. // content topics also passed to this function.
if (opts.decryptionKeys) { if (opts.decryptionParams) {
opts.decryptionKeys.forEach((key) => { decryptionParams = decryptionParams.concat(opts.decryptionParams);
decryptionKeys.push({
key: hexToBytes(key),
contentTopics: contentTopics.length ? contentTopics : undefined,
method: undefined,
});
});
} }
const messages: WakuMessage[] = []; const messages: WakuMessage[] = [];
let cursor = undefined; let cursor = undefined;
while (true) { while (true) {
// TODO: Some connection selection logic? const stream = await connection.newStream(storeCodec);
const stream = await connections[0].newStream(storeCodec);
const queryOpts = Object.assign(opts, { cursor }); const queryOpts = Object.assign(opts, { cursor });
const historyRpcQuery = HistoryRPC.createQuery(queryOpts); const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
dbg("Querying store peer", connections[0].remoteAddr.toString()); dbg("Querying store peer", connections[0].remoteAddr.toString());
@ -244,7 +243,7 @@ export class WakuStore {
const pageMessages: WakuMessage[] = []; const pageMessages: WakuMessage[] = [];
await Promise.all( await Promise.all(
response.messages.map(async (protoMsg) => { response.messages.map(async (protoMsg) => {
const msg = await WakuMessage.decodeProto(protoMsg, decryptionKeys); const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams);
if (msg) { if (msg) {
messages.push(msg); messages.push(msg);