Merge branch 'master' into chore-update-docs-link

This commit is contained in:
Sasha 2023-07-25 21:53:46 +02:00 committed by GitHub
commit 1feea5c24b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
54 changed files with 5311 additions and 13422 deletions

View File

@ -49,7 +49,10 @@
"error", "error",
{ "ignoreDeclarationSort": true, "ignoreCase": true } { "ignoreDeclarationSort": true, "ignoreCase": true }
], ],
"no-console": "error" "no-console": "error",
"@typescript-eslint/no-floating-promises": "error",
"@typescript-eslint/await-thenable": "error",
"@typescript-eslint/no-misused-promises": "error"
}, },
"overrides": [ "overrides": [
{ {

View File

@ -9,7 +9,7 @@ jobs:
name: Add issue to project name: Add issue to project
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/add-to-project@v0.3.0 - uses: actions/add-to-project@v0.5.0
with: with:
project-url: https://github.com/orgs/waku-org/projects/2 project-url: https://github.com/orgs/waku-org/projects/2
github-token: ${{ secrets.ADD_TO_PROJECT_PAT }} github-token: ${{ secrets.ADD_TO_PROJECT_PAT }}

17400
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -46,7 +46,16 @@
"lint-staged": "^13.2.2", "lint-staged": "^13.2.2",
"size-limit": "^8.1.2", "size-limit": "^8.1.2",
"typedoc": "^0.23.26", "typedoc": "^0.23.26",
"typedoc-plugin-resolve-crossmodule-references": "^0.3.3" "typedoc-plugin-resolve-crossmodule-references": "^0.3.3",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.62.0",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"prettier": "^2.8.8"
}, },
"lint-staged": { "lint-staged": {
"*.{ts,js}": [ "*.{ts,js}": [

View File

@ -0,0 +1,24 @@
module.exports = {
root: true,
parserOptions: {
ecmaVersion: 2021,
sourceType: "module",
},
env: {
browser: true,
es2021: true,
node: true,
},
extends: ["eslint:recommended"],
rules: {
indent: ["error", 2],
"linebreak-style": ["error", "unix"],
quotes: ["error", "double"],
semi: ["error", "always"],
"no-unused-vars": [
"error",
{ vars: "all", args: "after-used", ignoreRestSiblings: false },
],
"no-console": "warn",
},
};

View File

@ -21,14 +21,5 @@
"bugs": { "bugs": {
"url": "https://github.com/waku-org/js-waku/issues" "url": "https://github.com/waku-org/js-waku/issues"
}, },
"homepage": "https://github.com/waku-org/js-waku#readme", "homepage": "https://github.com/waku-org/js-waku#readme"
"devDependencies": {
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"prettier": "^2.8.8"
}
} }

View File

@ -85,11 +85,11 @@
"uuid": "^9.0.0" "uuid": "^9.0.0"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-connection": "^3.0.8", "@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-libp2p": "^1.1.2", "@libp2p/interface-libp2p": "^3.2.0",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interface-registrar": "^2.0.8", "@libp2p/interface-registrar": "^2.0.12",
"@multiformats/multiaddr": "^12.0.0", "@multiformats/multiaddr": "^12.0.0",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
@ -98,17 +98,9 @@
"@types/debug": "^4.1.7", "@types/debug": "^4.1.7",
"@types/mocha": "^10.0.1", "@types/mocha": "^10.0.1",
"@types/uuid": "^9.0.1", "@types/uuid": "^9.0.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"fast-check": "^3.8.1", "fast-check": "^3.8.1",
"ignore-loader": "^0.1.2", "ignore-loader": "^0.1.2",
"isomorphic-fetch": "^3.0.0", "isomorphic-fetch": "^3.0.0",
@ -128,7 +120,7 @@
}, },
"peerDependencies": { "peerDependencies": {
"@multiformats/multiaddr": "^12.0.0", "@multiformats/multiaddr": "^12.0.0",
"libp2p": "^0.42.2" "libp2p": "^0.45.9"
}, },
"peerDependenciesMeta": { "peerDependenciesMeta": {
"@multiformats/multiaddr": { "@multiformats/multiaddr": {

View File

@ -1,6 +1,8 @@
import type { Connection, Stream } from "@libp2p/interface-connection"; import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer, PeerStore } from "@libp2p/interface-peer-store"; import { Peer, PeerStore } from "@libp2p/interface-peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import { import {
getPeersForProtocol, getPeersForProtocol,
selectConnection, selectConnection,
@ -11,19 +13,29 @@ import {
* A class with predefined helpers, to be used as a base to implement Waku * A class with predefined helpers, to be used as a base to implement Waku
* Protocols. * Protocols.
*/ */
export class BaseProtocol { export class BaseProtocol implements IBaseProtocol {
constructor( public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public multicodec: string, public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
public peerStore: PeerStore,
protected getConnections: (peerId?: PeerId) => Connection[] constructor(public multicodec: string, private components: Libp2pComponents) {
) {} this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
components.events
);
}
public get peerStore(): PeerStore {
return this.components.peerStore;
}
/** /**
* Returns known peers from the address book (`libp2p.peerStore`) that support * Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these * the class protocol. Waku may or may not be currently connected to these
* peers. * peers.
*/ */
async peers(): Promise<Peer[]> { public async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]); return getPeersForProtocol(this.peerStore, [this.multicodec]);
} }
@ -36,7 +48,9 @@ export class BaseProtocol {
return peer; return peer;
} }
protected async newStream(peer: Peer): Promise<Stream> { protected async newStream(peer: Peer): Promise<Stream> {
const connections = this.getConnections(peer.id); const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections); const connection = selectConnection(connections);
if (!connection) { if (!connection) {
throw new Error("Failed to get a connection to the peer"); throw new Error("Failed to get a connection to the peer");

View File

@ -1,9 +1,7 @@
import type { Connection } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info"; import type { PeerInfo } from "@libp2p/interface-peer-info";
import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces"; import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces";
import { Tags } from "@waku/interfaces"; import { Libp2p, Tags } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import { KeepAliveManager, KeepAliveOptions } from "./keep_alive_manager.js"; import { KeepAliveManager, KeepAliveOptions } from "./keep_alive_manager.js";
@ -18,7 +16,7 @@ export class ConnectionManager {
private static instances = new Map<string, ConnectionManager>(); private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager; private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions; private options: ConnectionManagerOptions;
private libp2pComponents: Libp2p; private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map(); private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map(); private dialErrorsForPeer: Map<string, any> = new Map();
@ -47,12 +45,12 @@ export class ConnectionManager {
} }
private constructor( private constructor(
libp2pComponents: Libp2p, libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions, keepAliveOptions: KeepAliveOptions,
relay?: IRelay, relay?: IRelay,
options?: Partial<ConnectionManagerOptions> options?: Partial<ConnectionManagerOptions>
) { ) {
this.libp2pComponents = libp2pComponents; this.libp2p = libp2p;
this.options = { this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER, maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
@ -69,17 +67,17 @@ export class ConnectionManager {
// libp2p emits `peer:discovery` events during its initialization // libp2p emits `peer:discovery` events during its initialization
// which means that before the ConnectionManager is initialized, some peers may have been discovered // which means that before the ConnectionManager is initialized, some peers may have been discovered
// we will dial the peers in peerStore ONCE before we start to listen to the `peer:discovery` events within the ConnectionManager // we will dial the peers in peerStore ONCE before we start to listen to the `peer:discovery` events within the ConnectionManager
this.dialPeerStorePeers(); this.dialPeerStorePeers().catch((error) =>
log(`Unexpected error while dialing peer store peers`, error)
);
} }
private async dialPeerStorePeers(): Promise<void> { private async dialPeerStorePeers(): Promise<void> {
const peerInfos = await this.libp2pComponents.peerStore.all(); const peerInfos = await this.libp2p.peerStore.all();
const dialPromises = []; const dialPromises = [];
for (const peerInfo of peerInfos) { for (const peerInfo of peerInfos) {
if ( if (
this.libp2pComponents this.libp2p.getConnections().find((c) => c.remotePeer === peerInfo.id)
.getConnections()
.find((c) => c.remotePeer === peerInfo.id)
) )
continue; continue;
@ -101,15 +99,15 @@ export class ConnectionManager {
stop(): void { stop(): void {
this.keepAliveManager.stopAll(); this.keepAliveManager.stopAll();
this.libp2pComponents.removeEventListener( this.libp2p.removeEventListener(
"peer:connect", "peer:connect",
this.onEventHandlers["peer:connect"] this.onEventHandlers["peer:connect"]
); );
this.libp2pComponents.removeEventListener( this.libp2p.removeEventListener(
"peer:disconnect", "peer:disconnect",
this.onEventHandlers["peer:disconnect"] this.onEventHandlers["peer:disconnect"]
); );
this.libp2pComponents.removeEventListener( this.libp2p.removeEventListener(
"peer:discovery", "peer:discovery",
this.onEventHandlers["peer:discovery"] this.onEventHandlers["peer:discovery"]
); );
@ -121,12 +119,12 @@ export class ConnectionManager {
while (dialAttempt <= this.options.maxDialAttemptsForPeer) { while (dialAttempt <= this.options.maxDialAttemptsForPeer) {
try { try {
log(`Dialing peer ${peerId.toString()}`); log(`Dialing peer ${peerId.toString()}`);
await this.libp2pComponents.dial(peerId); await this.libp2p.dial(peerId);
const tags = await this.getTagNamesForPeer(peerId); const tags = await this.getTagNamesForPeer(peerId);
// add tag to connection describing discovery mechanism // add tag to connection describing discovery mechanism
// don't add duplicate tags // don't add duplicate tags
this.libp2pComponents this.libp2p
.getConnections(peerId) .getConnections(peerId)
.forEach( .forEach(
(conn) => (conn.tags = Array.from(new Set([...conn.tags, ...tags]))) (conn) => (conn.tags = Array.from(new Set([...conn.tags, ...tags])))
@ -157,7 +155,7 @@ export class ConnectionManager {
}` }`
); );
this.dialErrorsForPeer.delete(peerId.toString()); this.dialErrorsForPeer.delete(peerId.toString());
return await this.libp2pComponents.peerStore.delete(peerId); return await this.libp2p.peerStore.delete(peerId);
} catch (error) { } catch (error) {
throw `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`; throw `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`;
} finally { } finally {
@ -168,7 +166,7 @@ export class ConnectionManager {
async dropConnection(peerId: PeerId): Promise<void> { async dropConnection(peerId: PeerId): Promise<void> {
try { try {
await this.libp2pComponents.hangUp(peerId); await this.libp2p.hangUp(peerId);
log(`Dropped connection with peer ${peerId.toString()}`); log(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) { } catch (error) {
log( log(
@ -177,7 +175,7 @@ export class ConnectionManager {
} }
} }
private async processDialQueue(): Promise<void> { private processDialQueue(): void {
if ( if (
this.pendingPeerDialQueue.length > 0 && this.pendingPeerDialQueue.length > 0 &&
this.currentActiveDialCount < this.options.maxParallelDials this.currentActiveDialCount < this.options.maxParallelDials
@ -191,14 +189,14 @@ export class ConnectionManager {
} }
private startPeerDiscoveryListener(): void { private startPeerDiscoveryListener(): void {
this.libp2pComponents.peerStore.addEventListener( this.libp2p.addEventListener(
"peer", "peer:discovery",
this.onEventHandlers["peer:discovery"] this.onEventHandlers["peer:discovery"]
); );
} }
private startPeerConnectionListener(): void { private startPeerConnectionListener(): void {
this.libp2pComponents.addEventListener( this.libp2p.addEventListener(
"peer:connect", "peer:connect",
this.onEventHandlers["peer:connect"] this.onEventHandlers["peer:connect"]
); );
@ -217,7 +215,7 @@ export class ConnectionManager {
* >this event will **only** be triggered when the last connection is closed. * >this event will **only** be triggered when the last connection is closed.
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100 * @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
*/ */
this.libp2pComponents.addEventListener( this.libp2p.addEventListener(
"peer:disconnect", "peer:disconnect",
this.onEventHandlers["peer:disconnect"] this.onEventHandlers["peer:disconnect"]
); );
@ -237,41 +235,44 @@ export class ConnectionManager {
} }
private onEventHandlers = { private onEventHandlers = {
"peer:discovery": async (evt: CustomEvent<PeerInfo>): Promise<void> => { "peer:discovery": (evt: CustomEvent<PeerInfo>): void => {
const { id: peerId } = evt.detail; void (async () => {
const { id: peerId } = evt.detail;
this.attemptDial(peerId).catch((err) => try {
log(`Error dialing peer ${peerId.toString()} : ${err}`) await this.attemptDial(peerId);
); } catch (error) {
}, log(`Error dialing peer ${peerId.toString()} : ${error}`);
"peer:connect": async (evt: CustomEvent<Connection>): Promise<void> => {
const { remotePeer: peerId } = evt.detail;
this.keepAliveManager.start(
peerId,
this.libp2pComponents.ping.bind(this)
);
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);
if (isBootstrap) {
const bootstrapConnections = this.libp2pComponents
.getConnections()
.filter((conn) => conn.tags.includes(Tags.BOOTSTRAP));
// If we have too many bootstrap connections, drop one
if (
bootstrapConnections.length > this.options.maxBootstrapPeersAllowed
) {
await this.dropConnection(peerId);
} }
} })();
},
"peer:connect": (evt: CustomEvent<PeerId>): void => {
void (async () => {
const peerId = evt.detail;
this.keepAliveManager.start(peerId, this.libp2p.services.ping);
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);
if (isBootstrap) {
const bootstrapConnections = this.libp2p
.getConnections()
.filter((conn) => conn.tags.includes(Tags.BOOTSTRAP));
// If we have too many bootstrap connections, drop one
if (
bootstrapConnections.length > this.options.maxBootstrapPeersAllowed
) {
await this.dropConnection(peerId);
}
}
})();
}, },
"peer:disconnect": () => { "peer:disconnect": () => {
return (evt: CustomEvent<Connection>): void => { return (evt: CustomEvent<PeerId>): void => {
this.keepAliveManager.stop(evt.detail.remotePeer); this.keepAliveManager.stop(evt.detail);
}; };
}, },
}; };
@ -282,19 +283,19 @@ export class ConnectionManager {
* 2. If the peer is not a bootstrap peer * 2. If the peer is not a bootstrap peer
*/ */
private async shouldDialPeer(peerId: PeerId): Promise<boolean> { private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
const isConnected = this.libp2pComponents.getConnections(peerId).length > 0; const isConnected = this.libp2p.getConnections(peerId).length > 0;
if (isConnected) return false; if (isConnected) return false;
const isBootstrap = (await this.getTagNamesForPeer(peerId)).some( const tagNames = await this.getTagNamesForPeer(peerId);
(tagName) => tagName === Tags.BOOTSTRAP
); const isBootstrap = tagNames.some((tagName) => tagName === Tags.BOOTSTRAP);
if (isBootstrap) { if (isBootstrap) {
const currentBootstrapConnections = this.libp2pComponents const currentBootstrapConnections = this.libp2p
.getConnections() .getConnections()
.filter((conn) => { .filter((conn) => {
conn.tags.find((name) => name === Tags.BOOTSTRAP); return conn.tags.find((name) => name === Tags.BOOTSTRAP);
}).length; }).length;
if (currentBootstrapConnections < this.options.maxBootstrapPeersAllowed) if (currentBootstrapConnections < this.options.maxBootstrapPeersAllowed)
return true; return true;
@ -309,9 +310,12 @@ export class ConnectionManager {
* Fetches the tag names for a given peer * Fetches the tag names for a given peer
*/ */
private async getTagNamesForPeer(peerId: PeerId): Promise<string[]> { private async getTagNamesForPeer(peerId: PeerId): Promise<string[]> {
const tags = (await this.libp2pComponents.peerStore.getTags(peerId)).map( try {
(tag) => tag.name const peer = await this.libp2p.peerStore.get(peerId);
); return Array.from(peer.tags.keys());
return tags; } catch (error) {
log(`Failed to get peer ${peerId}, error: ${error}`);
return [];
}
} }
} }

View File

@ -1,4 +1,3 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type { import type {
@ -9,6 +8,7 @@ import type {
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IFilter, IFilter,
Libp2p,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -50,11 +50,11 @@ class Filter extends BaseProtocol implements IFilter {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private subscriptions: Map<RequestID, unknown>; private subscriptions: Map<RequestID, unknown>;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p.peerStore, libp2p.getConnections.bind(libp2p)); super(FilterCodec, libp2p.components);
this.options = options ?? {}; this.options = options ?? {};
this.subscriptions = new Map(); this.subscriptions = new Map();
this.libp2p libp2p
.handle(this.multicodec, this.onRequest.bind(this)) .handle(this.multicodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e)); .catch((e) => log("Failed to register filter protocol", e));
} }
@ -200,7 +200,7 @@ class Filter extends BaseProtocol implements IFilter {
// We don't want to wait for decoding failure, just attempt to decode // We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works // all messages and do the call back on the one that works
// noinspection ES6MissingAwait // noinspection ES6MissingAwait
decoders.forEach(async (dec: IDecoder<T>) => { for (const dec of decoders) {
if (didDecodeMsg) return; if (didDecodeMsg) return;
const decoded = await dec.fromProtoObj( const decoded = await dec.fromProtoObj(
pubSubTopic, pubSubTopic,
@ -208,13 +208,13 @@ class Filter extends BaseProtocol implements IFilter {
); );
if (!decoded) { if (!decoded) {
log("Not able to decode message"); log("Not able to decode message");
return; continue;
} }
// This is just to prevent more decoding attempt // This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises // TODO: Could be better if we were to abort promises
didDecodeMsg = Boolean(decoded); didDecodeMsg = Boolean(decoded);
await callback(decoded); await callback(decoded);
}); }
} }
} }

View File

@ -1,5 +1,4 @@
import { Stream } from "@libp2p/interface-connection"; import { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { IncomingStreamData } from "@libp2p/interface-registrar";
@ -12,6 +11,7 @@ import type {
IFilterV2, IFilterV2,
IProtoMessage, IProtoMessage,
IReceiver, IReceiver,
Libp2p,
PeerIdStr, PeerIdStr,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
@ -245,18 +245,12 @@ class FilterV2 extends BaseProtocol implements IReceiver {
return subscription; return subscription;
} }
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super( super(FilterV2Codecs.SUBSCRIBE, libp2p.components);
FilterV2Codecs.SUBSCRIBE,
libp2p.peerStore,
libp2p.getConnections.bind(libp2p)
);
this.libp2p libp2p.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this)).catch((e) => {
.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this)) log("Failed to register ", FilterV2Codecs.PUSH, e);
.catch((e) => { });
log("Failed to register ", FilterV2Codecs.PUSH, e);
});
this.activeSubscriptions = new Map(); this.activeSubscriptions = new Map();
@ -312,7 +306,7 @@ class FilterV2 extends BaseProtocol implements IReceiver {
): Promise<Unsubscribe> { ): Promise<Unsubscribe> {
const subscription = await this.createSubscription(undefined, opts?.peerId); const subscription = await this.createSubscription(undefined, opts?.peerId);
subscription.subscribe(decoders, callback); await subscription.subscribe(decoders, callback);
const contentTopics = Array.from( const contentTopics = Array.from(
groupByContentTopic( groupByContentTopic(
@ -394,20 +388,19 @@ async function pushMessage<T extends IDecodedMessage>(
// We don't want to wait for decoding failure, just attempt to decode // We don't want to wait for decoding failure, just attempt to decode
// all messages and do the call back on the one that works // all messages and do the call back on the one that works
// noinspection ES6MissingAwait // noinspection ES6MissingAwait
decoders.forEach(async (dec: IDecoder<T>) => { for (const dec of decoders) {
if (didDecodeMsg) return; if (didDecodeMsg) break;
const decoded = await dec.fromProtoObj( const decoded = await dec.fromProtoObj(
pubSubTopic, pubSubTopic,
message as IProtoMessage message as IProtoMessage
); );
// const decoded = await dec.fromProtoObj(pubSubTopic, message);
if (!decoded) { if (!decoded) {
log("Not able to decode message"); log("Not able to decode message");
return; continue;
} }
// This is just to prevent more decoding attempt // This is just to prevent more decoding attempt
// TODO: Could be better if we were to abort promises // TODO: Could be better if we were to abort promises
didDecodeMsg = Boolean(decoded); didDecodeMsg = Boolean(decoded);
await callback(decoded); await callback(decoded);
}); }
} }

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { IRelay } from "@waku/interfaces"; import type { IRelay } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import type { Libp2p } from "libp2p"; import type { PingService } from "libp2p/ping";
import { createEncoder } from "../index.js"; import { createEncoder } from "../index.js";
@ -26,7 +26,7 @@ export class KeepAliveManager {
this.relay = relay; this.relay = relay;
} }
public start(peerId: PeerId, libp2pPing: Libp2p["ping"]): void { public start(peerId: PeerId, libp2pPing: PingService): void {
// Just in case a timer already exist for this peer // Just in case a timer already exist for this peer
this.stop(peerId); this.stop(peerId);
@ -37,7 +37,7 @@ export class KeepAliveManager {
if (pingPeriodSecs !== 0) { if (pingPeriodSecs !== 0) {
const interval = setInterval(() => { const interval = setInterval(() => {
libp2pPing(peerId).catch((e) => { libp2pPing.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e); log(`Ping failed (${peerIdStr})`, e);
}); });
}, pingPeriodSecs * 1000); }, pingPeriodSecs * 1000);

View File

@ -1,9 +1,9 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import { import {
IEncoder, IEncoder,
ILightPush, ILightPush,
IMessage, IMessage,
Libp2p,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
SendError, SendError,
@ -33,8 +33,8 @@ export { PushResponse };
class LightPush extends BaseProtocol implements ILightPush { class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.peerStore, libp2p.getConnections.bind(libp2p)); super(LightPushCodec, libp2p.components);
this.options = options || {}; this.options = options || {};
} }

View File

@ -1,5 +1,4 @@
import type { Stream } from "@libp2p/interface-connection"; import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { import {
@ -7,6 +6,7 @@ import {
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IStore, IStore,
Libp2p,
ProtocolCreateOptions, ProtocolCreateOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto"; import { proto_store as proto } from "@waku/proto";
@ -81,8 +81,8 @@ export interface QueryOptions {
class Store extends BaseProtocol implements IStore { class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.peerStore, libp2p.getConnections.bind(libp2p)); super(StoreCodec, libp2p.components);
this.options = options ?? {}; this.options = options ?? {};
} }

View File

@ -1,5 +1,5 @@
import type { PeerProtocolsChangeData } from "@libp2p/interface-peer-store"; import type { IdentifyResult } from "@libp2p/interface-libp2p";
import type { IRelay, PointToPointProtocol, Waku } from "@waku/interfaces"; import type { IBaseProtocol, IRelay, Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import { pEvent } from "p-event"; import { pEvent } from "p-event";
@ -74,9 +74,7 @@ export async function waitForRemotePeer(
/** /**
* Wait for a peer with the given protocol to be connected. * Wait for a peer with the given protocol to be connected.
*/ */
async function waitForConnectedPeer( async function waitForConnectedPeer(protocol: IBaseProtocol): Promise<void> {
protocol: PointToPointProtocol
): Promise<void> {
const codec = protocol.multicodec; const codec = protocol.multicodec;
const peers = await protocol.peers(); const peers = await protocol.peers();
@ -86,14 +84,14 @@ async function waitForConnectedPeer(
} }
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => { const cb = (evt: CustomEvent<IdentifyResult>): void => {
if (evt.detail.protocols.includes(codec)) { if (evt.detail?.protocols?.includes(codec)) {
log("Resolving for", codec, evt.detail.protocols); log("Resolving for", codec, evt.detail.protocols);
protocol.peerStore.removeEventListener("change:protocols", cb); protocol.removeLibp2pEventListener("peer:identify", cb);
resolve(); resolve();
} }
}; };
protocol.peerStore.addEventListener("change:protocols", cb); protocol.addLibp2pEventListener("peer:identify", cb);
}); });
} }

View File

@ -1,5 +1,4 @@
import type { Stream } from "@libp2p/interface-connection"; import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import { isPeerId, PeerId } from "@libp2p/interface-peer-id"; import { isPeerId, PeerId } from "@libp2p/interface-peer-id";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import type { import type {
@ -8,6 +7,7 @@ import type {
ILightPush, ILightPush,
IRelay, IRelay,
IStore, IStore,
Libp2p,
Waku, Waku,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";

View File

@ -51,8 +51,8 @@
"node": ">=16" "node": ">=16"
}, },
"dependencies": { "dependencies": {
"@libp2p/interface-peer-discovery": "^1.0.5", "@libp2p/interface-peer-discovery": "^2.0.0",
"@libp2p/interfaces": "^3.3.1", "@libp2p/interfaces": "^3.3.2",
"@waku/enr": "0.0.14", "@waku/enr": "0.0.14",
"@waku/utils": "0.0.8", "@waku/utils": "0.0.8",
"debug": "^4.3.4", "debug": "^4.3.4",
@ -61,27 +61,19 @@
"uint8arrays": "^4.0.3" "uint8arrays": "^4.0.3"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-peer-info": "^1.0.8", "@libp2p/interface-peer-info": "^1.0.10",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/peer-id": "^2.0.3", "@libp2p/peer-id": "^2.0.4",
"@libp2p/peer-id-factory": "^2.0.3", "@libp2p/peer-id-factory": "^2.0.4",
"@multiformats/multiaddr": "^12.0.0", "@multiformats/multiaddr": "^12.0.0",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@types/chai": "^4.3.4", "@types/chai": "^4.3.4",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@waku/interfaces": "0.0.15", "@waku/interfaces": "0.0.15",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"karma": "^6.4.1", "karma": "^6.4.1",
"karma-chrome-launcher": "^3.1.1", "karma-chrome-launcher": "^3.1.1",
"karma-mocha": "^2.0.1", "karma-mocha": "^2.0.1",

View File

@ -2,7 +2,7 @@ import type {
PeerDiscovery, PeerDiscovery,
PeerDiscoveryEvents, PeerDiscoveryEvents,
} from "@libp2p/interface-peer-discovery"; } from "@libp2p/interface-peer-discovery";
import { symbol } from "@libp2p/interface-peer-discovery"; import { peerDiscovery as symbol } from "@libp2p/interface-peer-discovery";
import type { PeerInfo } from "@libp2p/interface-peer-info"; import type { PeerInfo } from "@libp2p/interface-peer-info";
import type { PeerStore } from "@libp2p/interface-peer-store"; import type { PeerStore } from "@libp2p/interface-peer-store";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
@ -97,30 +97,47 @@ export class PeerDiscoveryDns
); );
} }
for await (const peer of this.nextPeer()) { for await (const peerEnr of this.nextPeer()) {
if (!this._started) return; if (!this._started) {
return;
}
const peerInfo = peer.peerInfo; const peerInfo = peerEnr.peerInfo;
if (!peerInfo) continue;
if ( if (!peerInfo) {
(await this._components.peerStore.getTags(peerInfo.id)).find(
({ name }) => name === DEFAULT_BOOTSTRAP_TAG_NAME
)
)
continue; continue;
}
await this._components.peerStore.tagPeer( const tagsToUpdate = {
peerInfo.id, tags: {
DEFAULT_BOOTSTRAP_TAG_NAME, [DEFAULT_BOOTSTRAP_TAG_NAME]: {
{ value: this._options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE,
value: this._options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE, ttl: this._options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL,
ttl: this._options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL, },
},
};
let isPeerChanged = false;
const isPeerExists = await this._components.peerStore.has(peerInfo.id);
if (isPeerExists) {
const peer = await this._components.peerStore.get(peerInfo.id);
const hasBootstrapTag = peer.tags.has(DEFAULT_BOOTSTRAP_TAG_NAME);
if (!hasBootstrapTag) {
isPeerChanged = true;
await this._components.peerStore.merge(peerInfo.id, tagsToUpdate);
} }
); } else {
this.dispatchEvent( isPeerChanged = true;
new CustomEvent<PeerInfo>("peer", { detail: peerInfo }) await this._components.peerStore.save(peerInfo.id, tagsToUpdate);
); }
if (isPeerChanged) {
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { detail: peerInfo })
);
}
} }
} }

View File

@ -53,7 +53,7 @@
"dependencies": { "dependencies": {
"@ethersproject/rlp": "^5.7.0", "@ethersproject/rlp": "^5.7.0",
"@libp2p/crypto": "^1.0.17", "@libp2p/crypto": "^1.0.17",
"@libp2p/peer-id": "^2.0.3", "@libp2p/peer-id": "^2.0.4",
"@multiformats/multiaddr": "^12.0.0", "@multiformats/multiaddr": "^12.0.0",
"@noble/secp256k1": "^1.7.1", "@noble/secp256k1": "^1.7.1",
"@waku/utils": "0.0.8", "@waku/utils": "0.0.8",
@ -61,26 +61,18 @@
"js-sha3": "^0.8.0" "js-sha3": "^0.8.0"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-info": "^1.0.8", "@libp2p/interface-peer-info": "^1.0.10",
"@libp2p/peer-id-factory": "^2.0.3", "@libp2p/peer-id-factory": "^2.0.4",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@types/chai": "^4.3.4", "@types/chai": "^4.3.4",
"@types/mocha": "^10.0.1", "@types/mocha": "^10.0.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@waku/interfaces": "0.0.15", "@waku/interfaces": "0.0.15",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"karma": "^6.4.1", "karma": "^6.4.1",
"karma-chrome-launcher": "^3.1.1", "karma-chrome-launcher": "^3.1.1",
"karma-mocha": "^2.0.1", "karma-mocha": "^2.0.1",

View File

@ -129,11 +129,11 @@ describe("ENR", function () {
} }
}); });
it("should throw error - no public key", () => { it("should throw error - no public key", async () => {
try { try {
const txt = const txt =
"enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2d11eu6dCsxoQIB_c-jQMOXsbjWkbN-kj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk"; "enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2d11eu6dCsxoQIB_c-jQMOXsbjWkbN-kj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk";
EnrDecoder.fromString(txt); await EnrDecoder.fromString(txt);
assert.fail("Expect error here"); assert.fail("Expect error here");
} catch (err: unknown) { } catch (err: unknown) {
const e = err as Error; const e = err as Error;
@ -306,7 +306,7 @@ describe("ENR", function () {
}); });
}); });
describe("Location multiaddr", async () => { describe("Location multiaddr", () => {
const ip4 = "127.0.0.1"; const ip4 = "127.0.0.1";
const ip6 = "::1"; const ip6 = "::1";
const tcp = 8080; const tcp = 8080;
@ -414,7 +414,7 @@ describe("ENR", function () {
}); });
}); });
describe("waku2 key round trip", async () => { describe("waku2 key round trip", () => {
let peerId; let peerId;
let enr: ENR; let enr: ENR;
let waku2Protocols: Waku2; let waku2Protocols: Waku2;

View File

@ -47,27 +47,20 @@
"node": ">=16" "node": ">=16"
}, },
"devDependencies": { "devDependencies": {
"@chainsafe/libp2p-gossipsub": "^6.1.0", "@chainsafe/libp2p-gossipsub": "^9.1.0",
"@libp2p/interface-connection": "^3.0.8", "@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-connection-manager": "^1.3.7", "@libp2p/interface-connection-manager": "^3.0.1",
"@libp2p/interface-libp2p": "^1.1.2", "@libp2p/interface-libp2p": "^3.2.0",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-info": "^1.0.8", "@libp2p/interface-peer-info": "^1.0.10",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interface-registrar": "^2.0.8", "@libp2p/interface-registrar": "^2.0.12",
"@multiformats/multiaddr": "^12.0.0", "@multiformats/multiaddr": "^12.0.0",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"prettier": "^2.8.8", "prettier": "^2.8.8",
"typescript": "^5.0.4" "typescript": "^5.0.4",
"libp2p": "^0.45.9"
}, },
"typedoc": { "typedoc": {
"entryPoint": "./src/index.ts" "entryPoint": "./src/index.ts"

View File

@ -2,14 +2,14 @@ import type { PeerId } from "@libp2p/interface-peer-id";
import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic } from "./misc.js"; import type { ContentTopic } from "./misc.js";
import type { Callback, PointToPointProtocol } from "./protocols.js"; import type { Callback, IBaseProtocol } from "./protocols.js";
import type { IReceiver } from "./receiver.js"; import type { IReceiver } from "./receiver.js";
export type ContentFilter = { export type ContentFilter = {
contentTopic: string; contentTopic: string;
}; };
export type IFilter = IReceiver & PointToPointProtocol; export type IFilter = IReceiver & IBaseProtocol;
export interface IFilterV2Subscription { export interface IFilterV2Subscription {
subscribe<T extends IDecodedMessage>( subscribe<T extends IDecodedMessage>(
@ -25,7 +25,7 @@ export interface IFilterV2Subscription {
} }
export type IFilterV2 = IReceiver & export type IFilterV2 = IReceiver &
PointToPointProtocol & { IBaseProtocol & {
createSubscription( createSubscription(
pubSubTopic?: string, pubSubTopic?: string,
peerId?: PeerId peerId?: PeerId

View File

@ -11,3 +11,4 @@ export * from "./connection_manager.js";
export * from "./sender.js"; export * from "./sender.js";
export * from "./receiver.js"; export * from "./receiver.js";
export * from "./misc.js"; export * from "./misc.js";
export * from "./libp2p.js";

View File

@ -0,0 +1,21 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { Libp2p as BaseLibp2p } from "@libp2p/interface-libp2p";
import type { Libp2pInit } from "libp2p";
import type { identifyService } from "libp2p/identify";
import type { PingService } from "libp2p/ping";
export type Libp2pServices = {
ping: PingService;
pubsub?: GossipSub;
identify: ReturnType<ReturnType<typeof identifyService>>;
};
// TODO: Get libp2p to export this.
export type Libp2pComponents = Parameters<
Exclude<Libp2pInit["metrics"], undefined>
>[0];
// thought components are not defined on the Libp2p interface they are present on Libp2pNode class
export type Libp2p = BaseLibp2p<Libp2pServices> & {
components: Libp2pComponents;
};

View File

@ -1,4 +1,4 @@
import type { PointToPointProtocol } from "./protocols.js"; import type { IBaseProtocol } from "./protocols.js";
import type { ISender } from "./sender.js"; import type { ISender } from "./sender.js";
export type ILightPush = ISender & PointToPointProtocol; export type ILightPush = ISender & IBaseProtocol;

View File

@ -3,9 +3,9 @@ import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerStore } from "@libp2p/interface-peer-store"; import type { PeerStore } from "@libp2p/interface-peer-store";
import { IEnr } from "./enr.js"; import { IEnr } from "./enr.js";
import { PointToPointProtocol } from "./protocols.js"; import { IBaseProtocol } from "./protocols.js";
export interface IPeerExchange extends PointToPointProtocol { export interface IPeerExchange extends IBaseProtocol {
query(params: PeerExchangeQueryParams): Promise<PeerInfo[] | undefined>; query(params: PeerExchangeQueryParams): Promise<PeerInfo[] | undefined>;
} }

View File

@ -1,3 +1,4 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import type { Libp2pOptions } from "libp2p"; import type { Libp2pOptions } from "libp2p";
@ -11,10 +12,12 @@ export enum Protocols {
Filter = "filter", Filter = "filter",
} }
export interface PointToPointProtocol { export interface IBaseProtocol {
multicodec: string; multicodec: string;
peerStore: PeerStore; peerStore: PeerStore;
peers: () => Promise<Peer[]>; peers: () => Promise<Peer[]>;
addLibp2pEventListener: Libp2p["addEventListener"];
removeLibp2pEventListener: Libp2p["removeEventListener"];
} }
export type ProtocolCreateOptions = { export type ProtocolCreateOptions = {

View File

@ -1,5 +1,5 @@
import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IDecodedMessage, IDecoder } from "./message.js";
import type { PointToPointProtocol, ProtocolOptions } from "./protocols.js"; import type { IBaseProtocol, ProtocolOptions } from "./protocols.js";
export enum PageDirection { export enum PageDirection {
BACKWARD = "backward", BACKWARD = "backward",
@ -45,7 +45,7 @@ export type StoreQueryOptions = {
cursor?: Cursor; cursor?: Cursor;
} & ProtocolOptions; } & ProtocolOptions;
export interface IStore extends PointToPointProtocol { export interface IStore extends IBaseProtocol {
queryOrderedCallback: <T extends IDecodedMessage>( queryOrderedCallback: <T extends IDecodedMessage>(
decoders: IDecoder<T>[], decoders: IDecoder<T>[],
callback: (message: T) => Promise<void | boolean> | boolean | void, callback: (message: T) => Promise<void | boolean> | boolean | void,

View File

@ -1,9 +1,9 @@
import type { Stream } from "@libp2p/interface-connection"; import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Multiaddr } from "@multiformats/multiaddr"; import type { Multiaddr } from "@multiformats/multiaddr";
import type { IFilter, IFilterV2 } from "./filter.js"; import type { IFilter, IFilterV2 } from "./filter.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js"; import type { ILightPush } from "./light_push.js";
import { Protocols } from "./protocols.js"; import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js"; import type { IRelay } from "./relay.js";

View File

@ -80,28 +80,20 @@
"js-sha3": "^0.8.0" "js-sha3": "^0.8.0"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-connection": "^3.0.8", "@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-connection-manager": "^1.3.7", "@libp2p/interface-connection-manager": "^3.0.1",
"@libp2p/interface-libp2p": "^1.1.2", "@libp2p/interface-libp2p": "^3.2.0",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interface-registrar": "^2.0.8", "@libp2p/interface-registrar": "^2.0.12",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@types/chai": "^4.3.4", "@types/chai": "^4.3.4",
"@types/mocha": "^10.0.1", "@types/mocha": "^10.0.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"fast-check": "^3.8.1", "fast-check": "^3.8.1",
"karma": "^6.4.1", "karma": "^6.4.1",
"karma-chrome-launcher": "^3.1.1", "karma-chrome-launcher": "^3.1.1",

View File

@ -127,7 +127,7 @@ export async function encrypt(
): Promise<Uint8Array> { ): Promise<Uint8Array> {
const ephemPrivateKey = randomBytes(32); const ephemPrivateKey = randomBytes(32);
const sharedPx = await derive(ephemPrivateKey, publicKeyTo); const sharedPx = derive(ephemPrivateKey, publicKeyTo);
const hash = await kdf(sharedPx, 32); const hash = await kdf(sharedPx, 32);

View File

@ -148,7 +148,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
return; return;
} }
const res = await postCipher(payload); const res = postCipher(payload);
if (!res) { if (!res) {
log(`Failed to decode payload for contentTopic ${this.contentTopic}`); log(`Failed to decode payload for contentTopic ${this.contentTopic}`);

View File

@ -138,7 +138,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
return; return;
} }
const res = await postCipher(payload); const res = postCipher(payload);
if (!res) { if (!res) {
log(`Failed to decode payload for contentTopic ${this.contentTopic}`); log(`Failed to decode payload for contentTopic ${this.contentTopic}`);

View File

@ -62,18 +62,10 @@
"@types/chai": "^4.3.4", "@types/chai": "^4.3.4",
"@types/debug": "^4.1.7", "@types/debug": "^4.1.7",
"@types/mocha": "^10.0.1", "@types/mocha": "^10.0.1",
"@typescript-eslint/eslint-plugin": "^5.54.1",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@waku/interfaces": "0.0.15", "@waku/interfaces": "0.0.15",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.28.0", "cspell": "^6.28.0",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"fast-check": "^3.7.0", "fast-check": "^3.7.0",
"ignore-loader": "^0.1.2", "ignore-loader": "^0.1.2",
"isomorphic-fetch": "^3.0.0", "isomorphic-fetch": "^3.0.0",

View File

@ -48,38 +48,31 @@
"node": ">=16" "node": ">=16"
}, },
"dependencies": { "dependencies": {
"@libp2p/interface-peer-discovery": "^1.0.5", "@libp2p/interface-peer-discovery": "^2.0.0",
"@libp2p/interfaces": "^3.3.1", "@libp2p/interfaces": "^3.3.2",
"@waku/core": "0.0.20", "@waku/core": "0.0.20",
"@waku/enr": "0.0.14", "@waku/enr": "0.0.14",
"@waku/proto": "0.0.5", "@waku/proto": "0.0.5",
"@waku/utils": "0.0.8", "@waku/utils": "0.0.8",
"@waku/interfaces": "0.0.15",
"debug": "^4.3.4", "debug": "^4.3.4",
"it-all": "^3.0.2", "it-all": "^3.0.2",
"it-length-prefixed": "^9.0.1", "it-length-prefixed": "^9.0.1",
"it-pipe": "^3.0.1" "it-pipe": "^3.0.1"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-connection-manager": "^1.3.7", "@libp2p/interface-connection-manager": "^3.0.1",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-libp2p": "^3.2.0",
"@libp2p/interface-peer-info": "^1.0.8", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-info": "^1.0.10",
"@libp2p/interface-registrar": "^2.0.8", "@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interface-registrar": "^2.0.12",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@waku/interfaces": "0.0.15",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"prettier": "^2.8.8", "prettier": "^2.8.8",
"rollup": "^3.21.3", "rollup": "^3.21.3",

View File

@ -1,9 +1,8 @@
import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerStore } from "@libp2p/interface-peer-store";
import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { EnrDecoder } from "@waku/enr"; import { EnrDecoder } from "@waku/enr";
import type { import type {
IPeerExchange, IPeerExchange,
Libp2pComponents,
PeerExchangeQueryParams, PeerExchangeQueryParams,
PeerInfo, PeerInfo,
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -20,11 +19,6 @@ export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
const log = debug("waku:peer-exchange"); const log = debug("waku:peer-exchange");
export interface PeerExchangeComponents {
peerStore: PeerStore;
connectionManager: ConnectionManager;
}
/** /**
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/) * Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/ */
@ -34,14 +28,8 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
/** /**
* @param components - libp2p components * @param components - libp2p components
*/ */
constructor(public components: PeerExchangeComponents) { constructor(components: Libp2pComponents) {
super( super(PeerExchangeCodec, components);
PeerExchangeCodec,
components.peerStore,
components.connectionManager.getConnections.bind(
components.connectionManager
)
);
this.multicodec = PeerExchangeCodec; this.multicodec = PeerExchangeCodec;
} }
@ -102,8 +90,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* @returns A function that creates a new peer exchange protocol * @returns A function that creates a new peer exchange protocol
*/ */
export function wakuPeerExchange(): ( export function wakuPeerExchange(): (
components: PeerExchangeComponents components: Libp2pComponents
) => WakuPeerExchange { ) => WakuPeerExchange {
return (components: PeerExchangeComponents) => return (components: Libp2pComponents) => new WakuPeerExchange(components);
new WakuPeerExchange(components);
} }

View File

@ -1,19 +1,16 @@
import type { PeerUpdate } from "@libp2p/interface-libp2p";
import type { import type {
PeerDiscovery, PeerDiscovery,
PeerDiscoveryEvents, PeerDiscoveryEvents,
} from "@libp2p/interface-peer-discovery"; } from "@libp2p/interface-peer-discovery";
import { symbol } from "@libp2p/interface-peer-discovery"; import { peerDiscovery as symbol } from "@libp2p/interface-peer-discovery";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info"; import type { PeerInfo } from "@libp2p/interface-peer-info";
import type { PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import type { Libp2pComponents } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import { import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
PeerExchangeCodec,
PeerExchangeComponents,
WakuPeerExchange,
} from "./waku_peer_exchange.js";
const log = debug("waku:peer-exchange-discovery"); const log = debug("waku:peer-exchange-discovery");
@ -56,17 +53,19 @@ export class PeerExchangeDiscovery
extends EventEmitter<PeerDiscoveryEvents> extends EventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery implements PeerDiscovery
{ {
private readonly components: PeerExchangeComponents; private readonly components: Libp2pComponents;
private readonly peerExchange: WakuPeerExchange; private readonly peerExchange: WakuPeerExchange;
private readonly options: Options; private readonly options: Options;
private isStarted: boolean; private isStarted: boolean;
private queryingPeers: Set<string> = new Set(); private queryingPeers: Set<string> = new Set();
private queryAttempts: Map<string, number> = new Map(); private queryAttempts: Map<string, number> = new Map();
private readonly eventHandler = async ( private readonly handleDiscoveredPeer = (
event: CustomEvent<PeerProtocolsChangeData> event: CustomEvent<PeerUpdate>
): Promise<void> => { ): void => {
const { protocols, peerId } = event.detail; const {
peer: { protocols, id: peerId },
} = event.detail;
if ( if (
!protocols.includes(PeerExchangeCodec) || !protocols.includes(PeerExchangeCodec) ||
this.queryingPeers.has(peerId.toString()) this.queryingPeers.has(peerId.toString())
@ -79,7 +78,7 @@ export class PeerExchangeDiscovery
); );
}; };
constructor(components: PeerExchangeComponents, options: Options = {}) { constructor(components: Libp2pComponents, options: Options = {}) {
super(); super();
this.components = components; this.components = components;
this.peerExchange = new WakuPeerExchange(components); this.peerExchange = new WakuPeerExchange(components);
@ -97,9 +96,10 @@ export class PeerExchangeDiscovery
log("Starting peer exchange node discovery, discovering peers"); log("Starting peer exchange node discovery, discovering peers");
this.components.peerStore.addEventListener( // might be better to use "peer:identify" or "peer:update"
"change:protocols", this.components.events.addEventListener(
this.eventHandler "peer:update",
this.handleDiscoveredPeer
); );
} }
@ -111,9 +111,9 @@ export class PeerExchangeDiscovery
log("Stopping peer exchange node discovery"); log("Stopping peer exchange node discovery");
this.isStarted = false; this.isStarted = false;
this.queryingPeers.clear(); this.queryingPeers.clear();
this.components.peerStore.removeEventListener( this.components.events.removeEventListener(
"change:protocols", "peer:update",
this.eventHandler this.handleDiscoveredPeer
); );
} }
@ -143,9 +143,11 @@ export class PeerExchangeDiscovery
return; return;
} }
setTimeout(async () => { setTimeout(() => {
this.queryAttempts.set(peerIdStr, currentAttempt + 1); this.queryAttempts.set(peerIdStr, currentAttempt + 1);
await this.startRecurringQueries(peerId); this.startRecurringQueries(peerId).catch((error) => {
log(`Error in startRecurringQueries: ${error}`);
});
}, queryInterval * currentAttempt); }, queryInterval * currentAttempt);
}; };
@ -168,33 +170,31 @@ export class PeerExchangeDiscovery
} }
const { peerId, peerInfo } = ENR; const { peerId, peerInfo } = ENR;
if (!peerId || !peerInfo) {
if (!peerId || !peerInfo) continue;
const { multiaddrs } = peerInfo;
if (
(await this.components.peerStore.getTags(peerId)).find(
({ name }) => name === DEFAULT_PEER_EXCHANGE_TAG_NAME
)
)
continue; continue;
}
await this.components.peerStore.tagPeer( const hasPeer = await this.components.peerStore.has(peerId);
peerId, if (hasPeer) {
DEFAULT_PEER_EXCHANGE_TAG_NAME, continue;
{ }
value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE,
ttl: this.options.tagTTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL, // update the tags for the peer
} await this.components.peerStore.save(peerId, {
); tags: {
[DEFAULT_PEER_EXCHANGE_TAG_NAME]: {
value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE,
ttl: this.options.tagTTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL,
},
},
});
this.dispatchEvent( this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { new CustomEvent<PeerInfo>("peer", {
detail: { detail: {
id: peerId, id: peerId,
multiaddrs,
protocols: [], protocols: [],
multiaddrs: peerInfo.multiaddrs,
}, },
}) })
); );
@ -209,8 +209,8 @@ export class PeerExchangeDiscovery
} }
export function wakuPeerExchangeDiscovery(): ( export function wakuPeerExchangeDiscovery(): (
components: PeerExchangeComponents components: Libp2pComponents
) => PeerExchangeDiscovery { ) => PeerExchangeDiscovery {
return (components: PeerExchangeComponents) => return (components: Libp2pComponents) =>
new PeerExchangeDiscovery(components); new PeerExchangeDiscovery(components);
} }

View File

@ -50,8 +50,6 @@
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",

View File

@ -49,7 +49,7 @@
"node": ">=16" "node": ">=16"
}, },
"dependencies": { "dependencies": {
"@chainsafe/libp2p-gossipsub": "^6.1.0", "@chainsafe/libp2p-gossipsub": "^9.1.0",
"@noble/hashes": "^1.3.0", "@noble/hashes": "^1.3.0",
"@waku/core": "0.0.20", "@waku/core": "0.0.20",
"@waku/interfaces": "0.0.15", "@waku/interfaces": "0.0.15",
@ -60,16 +60,11 @@
"fast-check": "^3.8.1" "fast-check": "^3.8.1"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-pubsub": "^4.0.1",
"@rollup/plugin-commonjs": "^24.1.0", "@rollup/plugin-commonjs": "^24.1.0",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.8.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.8",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"rollup": "^3.15.0", "rollup": "^3.15.0",
"ts-loader": "^9.4.2", "ts-loader": "^9.4.2",
"ts-node": "^10.9.1", "ts-node": "^10.9.1",

View File

@ -6,7 +6,6 @@ import {
} from "@chainsafe/libp2p-gossipsub"; } from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PubSub } from "@libp2p/interface-pubsub"; import type { PubSub } from "@libp2p/interface-pubsub";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { DefaultPubSubTopic } from "@waku/core"; import { DefaultPubSubTopic } from "@waku/core";
@ -19,6 +18,7 @@ import {
IEncoder, IEncoder,
IMessage, IMessage,
IRelay, IRelay,
Libp2p,
ProtocolCreateOptions, ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
SendError, SendError,
@ -59,13 +59,13 @@ class Relay implements IRelay {
private observers: Map<ContentTopic, Set<unknown>>; private observers: Map<ContentTopic, Set<unknown>>;
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) { constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) {
if (!this.isRelayPubSub(libp2p.pubsub)) { if (!this.isRelayPubSub(libp2p.services.pubsub)) {
throw Error( throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}` `Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
); );
} }
this.gossipSub = libp2p.pubsub as GossipSub; this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
if (this.gossipSub.isStarted()) { if (this.gossipSub.isStarted()) {
@ -193,18 +193,26 @@ class Relay implements IRelay {
return; return;
} }
await Promise.all( await Promise.all(
Array.from(observers).map(async ({ decoder, callback }) => { Array.from(observers).map(({ decoder, callback }) => {
const protoMsg = await decoder.fromWireToProtoObj(bytes); return (async () => {
if (!protoMsg) { try {
log("Internal error: message previously decoded failed on 2nd pass."); const protoMsg = await decoder.fromWireToProtoObj(bytes);
return; if (!protoMsg) {
} log(
const msg = await decoder.fromProtoObj(pubSubTopic, protoMsg); "Internal error: message previously decoded failed on 2nd pass."
if (msg) { );
callback(msg); return;
} else { }
log("Failed to decode messages on", topicOnlyMsg.contentTopic); const msg = await decoder.fromProtoObj(pubSubTopic, protoMsg);
} if (msg) {
await callback(msg);
} else {
log("Failed to decode messages on", topicOnlyMsg.contentTopic);
}
} catch (error) {
log("Error while decoding message:", error);
}
})();
}) })
); );
} }
@ -217,7 +225,7 @@ class Relay implements IRelay {
private gossipSubSubscribe(pubSubTopic: string): void { private gossipSubSubscribe(pubSubTopic: string): void {
this.gossipSub.addEventListener( this.gossipSub.addEventListener(
"gossipsub:message", "gossipsub:message",
async (event: CustomEvent<GossipsubMessage>) => { (event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubSubTopic) return; if (event.detail.msg.topic !== pubSubTopic) return;
log(`Message received on ${pubSubTopic}`); log(`Message received on ${pubSubTopic}`);
@ -232,7 +240,7 @@ class Relay implements IRelay {
this.gossipSub.subscribe(pubSubTopic); this.gossipSub.subscribe(pubSubTopic);
} }
private isRelayPubSub(pubsub: PubSub): boolean { private isRelayPubSub(pubsub: PubSub | undefined): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) || false; return pubsub?.multicodecs?.includes(Relay.multicodec) || false;
} }
} }

View File

@ -48,44 +48,37 @@
"node": ">=16" "node": ">=16"
}, },
"dependencies": { "dependencies": {
"@chainsafe/libp2p-noise": "^11.0.0", "@chainsafe/libp2p-noise": "^12.0.1",
"@libp2p/mplex": "^7.1.1", "@libp2p/mplex": "^8.0.4",
"@libp2p/websockets": "^5.0.3", "@libp2p/websockets": "^6.0.3",
"@waku/utils": "0.0.8", "@waku/utils": "0.0.8",
"@waku/relay": "0.0.3", "@waku/relay": "0.0.3",
"@waku/core": "0.0.20", "@waku/core": "0.0.20",
"@waku/interfaces": "0.0.15", "@waku/interfaces": "0.0.15",
"@waku/dns-discovery": "0.0.14", "@waku/dns-discovery": "0.0.14",
"libp2p": "^0.42.2" "libp2p": "^0.45.9"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-address-manager": "^2.0.4", "@libp2p/interface-address-manager": "^3.0.1",
"@libp2p/interface-connection": "^3.0.8", "@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-connection-manager": "^1.3.7", "@libp2p/interface-connection-manager": "^3.0.1",
"@libp2p/interface-content-routing": "^2.1.1", "@libp2p/interface-content-routing": "^2.1.1",
"@libp2p/interface-dht": "^2.0.1", "@libp2p/interface-dht": "^2.0.3",
"@libp2p/interface-libp2p": "^1.1.2", "@libp2p/interface-libp2p": "^3.2.0",
"@libp2p/interface-metrics": "^4.0.7", "@libp2p/interface-metrics": "^4.0.8",
"@libp2p/interface-peer-discovery": "^1.0.5", "@libp2p/interface-peer-discovery": "^2.0.0",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-routing": "^1.0.8", "@libp2p/interface-peer-routing": "^1.1.1",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-store": "^2.0.4",
"@libp2p/interface-pubsub": "^3.0.6", "@libp2p/interface-pubsub": "^4.0.1",
"@libp2p/interface-registrar": "^2.0.8", "@libp2p/interface-registrar": "^2.0.12",
"@libp2p/interface-transport": "^2.1.1", "@libp2p/interface-transport": "^4.0.3",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@chainsafe/libp2p-gossipsub": "^9.1.0",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"interface-datastore": "^7.0.4", "interface-datastore": "^7.0.4",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"prettier": "^2.8.8", "prettier": "^2.8.8",

View File

@ -1,6 +1,5 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import { noise } from "@chainsafe/libp2p-noise"; import { noise } from "@chainsafe/libp2p-noise";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerDiscovery } from "@libp2p/interface-peer-discovery"; import type { PeerDiscovery } from "@libp2p/interface-peer-discovery";
import { mplex } from "@libp2p/mplex"; import { mplex } from "@libp2p/mplex";
import { webSockets } from "@libp2p/websockets"; import { webSockets } from "@libp2p/websockets";
@ -19,14 +18,16 @@ import type {
FullNode, FullNode,
IFilter, IFilter,
IFilterV2, IFilterV2,
Libp2p,
Libp2pComponents,
LightNode, LightNode,
ProtocolCreateOptions, ProtocolCreateOptions,
RelayNode, RelayNode,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { createLibp2p, Libp2pOptions } from "libp2p"; import { createLibp2p, Libp2pOptions } from "libp2p";
import { identifyService } from "libp2p/identify";
import type { Libp2pComponents } from "./libp2p_components.js"; import { pingService } from "libp2p/ping";
const DEFAULT_NODE_REQUIREMENTS = { const DEFAULT_NODE_REQUIREMENTS = {
lightPush: 1, lightPush: 1,
@ -174,25 +175,31 @@ export function defaultPeerDiscovery(): (
return wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS); return wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS);
} }
type PubsubService = {
pubsub?: (components: Libp2pComponents) => GossipSub;
};
export async function defaultLibp2p( export async function defaultLibp2p(
wakuGossipSub?: (components: Libp2pComponents) => GossipSub, wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<Libp2pOptions>, options?: Partial<Libp2pOptions>,
userAgent?: string userAgent?: string
): Promise<Libp2p> { ): Promise<Libp2p> {
const libp2pOpts = Object.assign( const pubsubService: PubsubService = wakuGossipSub
{ ? { pubsub: wakuGossipSub }
transports: [webSockets({ filter: filterAll })], : {};
streamMuxers: [mplex()],
connectionEncryption: [noise()],
identify: {
host: {
agentVersion: userAgent ?? DefaultUserAgent,
},
},
} as Libp2pOptions,
wakuGossipSub ? { pubsub: wakuGossipSub } : {},
options ?? {}
);
return createLibp2p(libp2pOpts); return createLibp2p({
transports: [webSockets({ filter: filterAll })],
streamMuxers: [mplex()],
connectionEncryption: [noise()],
...options,
services: {
identify: identifyService({
agentVersion: userAgent ?? DefaultUserAgent,
}),
ping: pingService(),
...pubsubService,
...options?.services,
},
}) as any as Libp2p; // TODO: make libp2p include it;
} }

View File

@ -1,39 +0,0 @@
import type { AddressManager } from "@libp2p/interface-address-manager";
import type {
ConnectionGater,
ConnectionProtector,
} from "@libp2p/interface-connection";
import type {
ConnectionManager,
Dialer,
} from "@libp2p/interface-connection-manager";
import type { ContentRouting } from "@libp2p/interface-content-routing";
import type { DualDHT } from "@libp2p/interface-dht";
import type { Metrics } from "@libp2p/interface-metrics";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerRouting } from "@libp2p/interface-peer-routing";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Registrar } from "@libp2p/interface-registrar";
import type { TransportManager, Upgrader } from "@libp2p/interface-transport";
import type { Datastore } from "interface-datastore";
// TODO: Get libp2p to export this.
export interface Libp2pComponents {
peerId: PeerId;
addressManager: AddressManager;
peerStore: PeerStore;
upgrader: Upgrader;
registrar: Registrar;
connectionManager: ConnectionManager;
transportManager: TransportManager;
connectionGater: ConnectionGater;
contentRouting: ContentRouting;
peerRouting: PeerRouting;
datastore: Datastore;
connectionProtector?: ConnectionProtector;
dialer: Dialer;
metrics?: Metrics;
dht?: DualDHT;
pubsub?: PubSub;
}

View File

@ -49,7 +49,7 @@
"node": ">=16" "node": ">=16"
}, },
"dependencies": { "dependencies": {
"@libp2p/peer-id": "^2.0.3", "@libp2p/peer-id": "^2.0.4",
"@waku/core": "*", "@waku/core": "*",
"@waku/enr": "*", "@waku/enr": "*",
"@waku/interfaces": "*", "@waku/interfaces": "*",
@ -59,35 +59,33 @@
"dockerode": "^3.3.5", "dockerode": "^3.3.5",
"p-timeout": "^6.1.0", "p-timeout": "^6.1.0",
"portfinder": "^1.0.32", "portfinder": "^1.0.32",
"sinon": "^15.2.0",
"tail": "^2.2.6" "tail": "^2.2.6"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/bootstrap": "^6.0.3", "@libp2p/bootstrap": "^8.0.0",
"@libp2p/components": "^3.1.1",
"@libp2p/interface-peer-discovery-compliance-tests": "^2.0.8", "@libp2p/interface-peer-discovery-compliance-tests": "^2.0.8",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@types/chai": "^4.3.4", "@types/chai": "^4.3.4",
"@types/dockerode": "^3.3.17", "@types/dockerode": "^3.3.17",
"@types/mocha": "^10.0.1", "@types/mocha": "^10.0.1",
"@types/tail": "^2.2.1", "@types/tail": "^2.2.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/sdk": "*", "@waku/sdk": "*",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.62.0",
"@waku/dns-discovery": "*", "@waku/dns-discovery": "*",
"@waku/message-encryption": "*", "@waku/message-encryption": "*",
"@waku/peer-exchange": "*", "@waku/peer-exchange": "*",
"@waku/sdk": "*",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"debug": "^4.3.4", "debug": "^4.3.4",
"eslint": "^8.41.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"mocha": "^10.2.0", "mocha": "^10.2.0",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"prettier": "^2.8.8", "prettier": "^2.8.8",
"typescript": "^5.0.4" "typescript": "^5.0.4",
"interface-datastore": "^8.2.3",
"libp2p": "^0.45.9",
"datastore-core": "^9.2.0"
} }
} }

View File

@ -170,7 +170,7 @@ export default class Dockerode {
await this.container.stop(); await this.container.stop();
await this.container.remove(); await this.container.remove();
this.containerId = undefined; delete this.containerId;
} }
private async confirmImageExistsOrPull(): Promise<void> { private async confirmImageExistsOrPull(): Promise<void> {

View File

@ -165,7 +165,8 @@ export class NimGoNode {
} }
public async stop(): Promise<void> { public async stop(): Promise<void> {
this.docker?.stop(); await this.docker?.container?.stop();
delete this.docker;
} }
async waitForLog(msg: string, timeout: number): Promise<void> { async waitForLog(msg: string, timeout: number): Promise<void> {

View File

@ -0,0 +1,196 @@
import { CustomEvent } from "@libp2p/interfaces/events";
import { ConnectionManager, KeepAliveOptions } from "@waku/core";
import { LightNode, Tags } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import sinon, { SinonSpy, SinonStub } from "sinon";
import { delay } from "../dist/delay.js";
const KEEP_ALIVE_OPTIONS: KeepAliveOptions = {
pingKeepAlive: 0,
relayKeepAlive: 5 * 1000,
};
const TEST_TIMEOUT = 10_000;
const DELAY_MS = 1_000;
describe("ConnectionManager", function () {
let connectionManager: ConnectionManager | undefined;
let waku: LightNode;
let peerId: string;
let getConnectionsStub: SinonStub;
let getTagNamesForPeerStub: SinonStub;
let dialPeerStub: SinonStub;
beforeEach(async function () {
waku = await createLightNode();
peerId = Math.random().toString(36).substring(7);
connectionManager = ConnectionManager.create(
peerId,
waku.libp2p,
KEEP_ALIVE_OPTIONS
);
});
afterEach(async () => {
await waku.stop();
sinon.restore();
});
describe("attemptDial method", function () {
let attemptDialSpy: SinonSpy;
beforeEach(function () {
attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial");
});
afterEach(function () {
attemptDialSpy.restore();
});
it("should be called on all `peer:discovery` events", async function () {
this.timeout(TEST_TIMEOUT);
const totalPeerIds = 1;
for (let i = 1; i <= totalPeerIds; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: `peer-id-${i}` })
);
}
expect(attemptDialSpy.callCount).to.equal(
totalPeerIds,
"attemptDial should be called once for each peer:discovery event"
);
});
});
describe("dialPeer method", function () {
beforeEach(function () {
getConnectionsStub = sinon.stub(
(connectionManager as any).libp2p,
"getConnections"
);
getTagNamesForPeerStub = sinon.stub(
connectionManager as any,
"getTagNamesForPeer"
);
dialPeerStub = sinon.stub(connectionManager as any, "dialPeer");
});
afterEach(function () {
dialPeerStub.restore();
getTagNamesForPeerStub.restore();
getConnectionsStub.restore();
});
describe("For bootstrap peers", function () {
it("should be called for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
);
// wait for the async function calls within attemptDial to finish
await delay(DELAY_MS);
// check that dialPeer was called once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should be called for bootstrap peers"
);
});
it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// emit first peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
);
// simulate that the peer is connected
getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]);
// emit multiple peer:discovery events
const totalBootstrapPeers = 5;
for (let i = 1; i <= totalBootstrapPeers; i++) {
await delay(500);
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: `bootstrap-peer-id-${i}`,
})
);
}
// check that dialPeer was called only once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should not be called more than once for bootstrap peers"
);
});
});
describe("For peer-exchange peers", function () {
it("should be called for peers with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "px-peer" })
);
// wait for the async function calls within attemptDial to finish
await delay(DELAY_MS);
// check that dialPeer was called once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should be called for peers with PEER_EXCHANGE tags"
);
});
it("should be called for every peer with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
// emit multiple peer:discovery events
const totalPxPeers = 5;
for (let i = 0; i < totalPxPeers; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: `px-peer-id-${i}` })
);
await delay(500);
}
// check that dialPeer was called for each peer with PEER_EXCHANGE tags
expect(dialPeerStub.callCount).to.equal(totalPxPeers);
});
});
});
});

View File

@ -1,6 +1,5 @@
import { Components } from "@libp2p/components";
import tests from "@libp2p/interface-peer-discovery-compliance-tests"; import tests from "@libp2p/interface-peer-discovery-compliance-tests";
import { Peer } from "@libp2p/interface-peer-store"; import { EventEmitter } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { PersistentPeerStore } from "@libp2p/peer-store"; import { PersistentPeerStore } from "@libp2p/peer-store";
import { import {
@ -9,25 +8,28 @@ import {
PeerDiscoveryDns, PeerDiscoveryDns,
wakuDnsDiscovery, wakuDnsDiscovery,
} from "@waku/dns-discovery"; } from "@waku/dns-discovery";
import { Libp2pComponents } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
import { MemoryDatastore } from "datastore-core"; import { MemoryDatastore } from "datastore-core/memory";
import { Datastore } from "interface-datastore";
import { delay } from "../src/delay.js"; import { delay } from "../src/delay.js";
const maxQuantity = 3; const maxQuantity = 3;
describe("DNS Discovery: Compliance Test", async function () { describe("DNS Discovery: Compliance Test", function () {
this.timeout(10000); this.timeout(10000);
tests({ tests({
async setup() { async setup() {
// create libp2p mock peerStore // create libp2p mock peerStore
const components = new Components({ const components = {
peerStore: new PersistentPeerStore({ peerStore: new PersistentPeerStore({
events: new EventEmitter(),
peerId: await createSecp256k1PeerId(), peerId: await createSecp256k1PeerId(),
datastore: new MemoryDatastore(), datastore: new MemoryDatastore() as any as Datastore,
}), }),
}); } as unknown as Libp2pComponents;
return new PeerDiscoveryDns(components, { return new PeerDiscoveryDns(components, {
enrUrls: [enrTree["PROD"]], enrUrls: [enrTree["PROD"]],
@ -69,22 +71,16 @@ describe("DNS Node Discovery [live data]", function () {
await waku.start(); await waku.start();
const allPeers = await waku.libp2p.peerStore.all(); const allPeers = await waku.libp2p.peerStore.all();
let dnsPeers = 0;
const dnsPeers: Peer[] = [];
for (const peer of allPeers) { for (const peer of allPeers) {
const tags = await waku.libp2p.peerStore.getTags(peer.id); const hasTag = peer.tags.has("bootstrap");
let hasTag = false; if (hasTag) {
for (const tag of tags) { dnsPeers += 1;
hasTag = tag.name === "bootstrap";
if (hasTag) {
dnsPeers.push(peer);
break;
}
} }
expect(hasTag).to.be.eq(true); expect(hasTag).to.be.eq(true);
} }
expect(dnsPeers.length).to.eq(maxQuantity); expect(dnsPeers).to.eq(maxQuantity);
}); });
it(`should retrieve ${maxQuantity} multiaddrs for test.waku.nodes.status.im`, async function () { it(`should retrieve ${maxQuantity} multiaddrs for test.waku.nodes.status.im`, async function () {

View File

@ -57,8 +57,8 @@ describe("Waku Light Push [node only]", () => {
afterEach(async function () { afterEach(async function () {
try { try {
nwaku?.stop(); await nwaku?.stop();
waku?.stop(); await waku?.stop();
} catch (e) { } catch (e) {
console.error("Failed to stop nodes: ", e); console.error("Failed to stop nodes: ", e);
} }
@ -124,8 +124,8 @@ describe("Waku Light Push [node only] - custom pubsub topic", () => {
afterEach(async function () { afterEach(async function () {
try { try {
nwaku?.stop(); await nwaku?.stop();
waku?.stop(); await waku?.stop();
} catch (e) { } catch (e) {
console.error("Failed to stop nodes: ", e); console.error("Failed to stop nodes: ", e);
} }

View File

@ -19,49 +19,49 @@ import { makeLogFileName } from "../src/log_file.js";
import { NimGoNode } from "../src/node/node.js"; import { NimGoNode } from "../src/node/node.js";
describe("Peer Exchange", () => { describe("Peer Exchange", () => {
let waku: LightNode; describe("Auto Discovery", function () {
let waku: LightNode;
afterEach(async function () { afterEach(async function () {
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); await waku?.stop();
});
it("Auto discovery", async function () {
// skipping in CI as this test demonstrates Peer Exchange working with the test fleet
// but not with locally run nwaku nodes
if (process.env.CI) {
this.skip();
}
this.timeout(50_000);
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: getPredefinedBootstrapNodes(Fleet.Test, 3) }),
wakuPeerExchangeDiscovery(),
],
},
}); });
await waku.start(); it("connection with fleet nodes", async function () {
// skipping in CI as this test demonstrates Peer Exchange working with the test fleet
// but not with locally run nwaku nodes
if (process.env.CI) {
this.skip();
}
const foundPxPeer = await new Promise<boolean>((resolve) => { this.timeout(50_000);
const testNodes = getPredefinedBootstrapNodes(Fleet.Test, 3);
waku.libp2p.addEventListener("peer:discovery", (evt) => { waku = await createLightNode({
const { multiaddrs } = evt.detail; libp2p: {
multiaddrs.forEach((ma) => { peerDiscovery: [
const isBootstrapNode = testNodes.find((n) => n === ma.toString()); bootstrap({ list: getPredefinedBootstrapNodes(Fleet.Test, 3) }),
wakuPeerExchangeDiscovery(),
],
},
});
await waku.start();
const foundPxPeer = await new Promise<boolean>((resolve) => {
const testNodes = getPredefinedBootstrapNodes(Fleet.Test, 3);
waku.libp2p.addEventListener("peer:discovery", (evt) => {
const peerId = evt.detail.id.toString();
const isBootstrapNode = testNodes.find((n) => n.includes(peerId));
if (!isBootstrapNode) { if (!isBootstrapNode) {
resolve(true); resolve(true);
} }
}); });
}); });
});
expect(foundPxPeer).to.be.true; expect(foundPxPeer).to.be.true;
});
}); });
describe("Locally run nodes", () => { describe("Locally Run Nodes", () => {
let waku: LightNode; let waku: LightNode;
let nwaku1: NimGoNode; let nwaku1: NimGoNode;
let nwaku2: NimGoNode; let nwaku2: NimGoNode;
@ -72,9 +72,10 @@ describe("Peer Exchange", () => {
}); });
afterEach(async function () { afterEach(async function () {
!!nwaku1 && nwaku1.stop(); this.timeout(10_000);
!!nwaku2 && nwaku2.stop(); await nwaku1?.stop();
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); await nwaku2?.stop();
await waku?.stop();
}); });
it("nwaku interop", async function () { it("nwaku interop", async function () {
@ -102,23 +103,7 @@ describe("Peer Exchange", () => {
await waku.start(); await waku.start();
await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
await new Promise<void>((resolve) => { const components = waku.libp2p.components as unknown as Libp2pComponents;
waku.libp2p.peerStore.addEventListener("change:protocols", (evt) => {
if (evt.detail.protocols.includes(PeerExchangeCodec)) {
resolve();
}
});
});
// the forced type casting is done in ref to https://github.com/libp2p/js-libp2p-interfaces/issues/338#issuecomment-1431643645
const { connectionManager, registrar, peerStore } =
waku.libp2p as unknown as Libp2pComponents;
const components = {
connectionManager: connectionManager,
registrar: registrar,
peerStore: peerStore,
};
const peerExchange = new WakuPeerExchange(components); const peerExchange = new WakuPeerExchange(components);
const numPeersToRequest = 1; const numPeersToRequest = 1;
@ -142,11 +127,12 @@ describe("Peer Exchange", () => {
expect(doesPeerIdExistInResponse).to.be.equal(true); expect(doesPeerIdExistInResponse).to.be.equal(true);
expect(waku.libp2p.peerStore.has(await nwaku2.getPeerId())).to.be.true; expect(await waku.libp2p.peerStore.has(await nwaku2.getPeerId())).to.be
.true;
}); });
}); });
describe("compliance test", async function () { describe("Compliance Test", function () {
this.timeout(55_000); this.timeout(55_000);
let waku: LightNode; let waku: LightNode;
@ -175,35 +161,26 @@ describe("Peer Exchange", () => {
discv5BootstrapNode: enr, discv5BootstrapNode: enr,
}); });
waku = await createLightNode(); waku = await createLightNode({
libp2p: {
peerDiscovery: [wakuPeerExchangeDiscovery()],
},
});
const peerExchange = waku.libp2p.components["components"][
"peer-discovery-0"
] as PeerExchangeDiscovery;
await waku.start(); await waku.start();
const nwaku2Ma = await nwaku2.getMultiaddrWithId(); const nwaku2Ma = await nwaku2.getMultiaddrWithId();
await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
await new Promise<void>((resolve) => {
waku.libp2p.peerStore.addEventListener("change:protocols", (evt) => {
if (evt.detail.protocols.includes(PeerExchangeCodec)) {
resolve();
}
});
});
// the forced type casting is done in ref to https://github.com/libp2p/js-libp2p-interfaces/issues/338#issuecomment-1431643645 return peerExchange;
const { connectionManager, registrar, peerStore } =
waku.libp2p as unknown as Libp2pComponents;
const components = {
connectionManager: connectionManager,
registrar: registrar,
peerStore: peerStore,
};
return new PeerExchangeDiscovery(components);
}, },
teardown: async () => { teardown: async () => {
!!nwaku1 && (await nwaku1.stop()); await nwaku1?.stop();
!!nwaku2 && (await nwaku2.stop()); await nwaku2?.stop();
!!waku && (await waku.stop()); await waku?.stop();
}, },
}); });
}); });

View File

@ -67,10 +67,9 @@ describe("Waku Relay [node only]", () => {
}).then((waku) => waku.start().then(() => waku)), }).then((waku) => waku.start().then(() => waku)),
]); ]);
log("Instances started, adding waku2 to waku1's address book"); log("Instances started, adding waku2 to waku1's address book");
await waku1.libp2p.peerStore.addressBook.set( await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
waku2.libp2p.peerId, multiaddrs: waku2.libp2p.getMultiaddrs(),
waku2.libp2p.getMultiaddrs() });
);
await waku1.dial(waku2.libp2p.peerId); await waku1.dial(waku2.libp2p.peerId);
log("Wait for mutual pubsub subscription"); log("Wait for mutual pubsub subscription");
@ -90,11 +89,11 @@ describe("Waku Relay [node only]", () => {
it("Subscribe", async function () { it("Subscribe", async function () {
log("Getting subscribers"); log("Getting subscribers");
const subscribers1 = waku1.libp2p.pubsub const subscribers1 = waku1.libp2p.services
.getSubscribers(DefaultPubSubTopic) .pubsub!.getSubscribers(DefaultPubSubTopic)
.map((p) => p.toString()); .map((p) => p.toString());
const subscribers2 = waku2.libp2p.pubsub const subscribers2 = waku2.libp2p.services
.getSubscribers(DefaultPubSubTopic) .pubsub!.getSubscribers(DefaultPubSubTopic)
.map((p) => p.toString()); .map((p) => p.toString());
log("Asserting mutual subscription"); log("Asserting mutual subscription");
@ -121,7 +120,7 @@ describe("Waku Relay [node only]", () => {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise( const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.subscribe([TestDecoder], resolve); void waku2.relay.subscribe([TestDecoder], resolve);
} }
); );
@ -152,12 +151,12 @@ describe("Waku Relay [node only]", () => {
const barDecoder = createDecoder(barContentTopic); const barDecoder = createDecoder(barContentTopic);
const fooMessages: DecodedMessage[] = []; const fooMessages: DecodedMessage[] = [];
waku2.relay.subscribe([fooDecoder], (msg) => { void waku2.relay.subscribe([fooDecoder], (msg) => {
fooMessages.push(msg); fooMessages.push(msg);
}); });
const barMessages: DecodedMessage[] = []; const barMessages: DecodedMessage[] = [];
waku2.relay.subscribe([barDecoder], (msg) => { void waku2.relay.subscribe([barDecoder], (msg) => {
barMessages.push(msg); barMessages.push(msg);
}); });
@ -207,10 +206,10 @@ describe("Waku Relay [node only]", () => {
const symDecoder = createSymDecoder(symTopic, symKey); const symDecoder = createSymDecoder(symTopic, symKey);
const msgs: DecodedMessage[] = []; const msgs: DecodedMessage[] = [];
waku2.relay.subscribe([eciesDecoder], (wakuMsg) => { void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => {
msgs.push(wakuMsg); msgs.push(wakuMsg);
}); });
waku2.relay.subscribe([symDecoder], (wakuMsg) => { void waku2.relay.subscribe([symDecoder], (wakuMsg) => {
msgs.push(wakuMsg); msgs.push(wakuMsg);
}); });
@ -291,14 +290,12 @@ describe("Waku Relay [node only]", () => {
}).then((waku) => waku.start().then(() => waku)), }).then((waku) => waku.start().then(() => waku)),
]); ]);
await waku1.libp2p.peerStore.addressBook.set( await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
waku2.libp2p.peerId, multiaddrs: waku2.libp2p.getMultiaddrs(),
waku2.libp2p.getMultiaddrs() });
); await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
await waku3.libp2p.peerStore.addressBook.set( multiaddrs: waku2.libp2p.getMultiaddrs(),
waku2.libp2p.peerId, });
waku2.libp2p.getMultiaddrs()
);
await Promise.all([ await Promise.all([
waku1.dial(waku2.libp2p.peerId), waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId), waku3.dial(waku2.libp2p.peerId),
@ -313,7 +310,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise( const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.subscribe([TestDecoder], resolve); void waku2.relay.subscribe([TestDecoder], resolve);
} }
); );
@ -321,7 +318,7 @@ describe("Waku Relay [node only]", () => {
// pubsub topic. // pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise( const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => { (resolve, reject) => {
waku3.relay.subscribe([TestDecoder], reject); void waku3.relay.subscribe([TestDecoder], reject);
setTimeout(resolve, 1000); setTimeout(resolve, 1000);
} }
); );
@ -356,10 +353,9 @@ describe("Waku Relay [node only]", () => {
}).then((waku) => waku.start().then(() => waku)), }).then((waku) => waku.start().then(() => waku)),
]); ]);
await waku1.libp2p.peerStore.addressBook.set( await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
waku2.libp2p.peerId, multiaddrs: waku2.libp2p.getMultiaddrs(),
waku2.libp2p.getMultiaddrs() });
);
await Promise.all([waku1.dial(waku2.libp2p.peerId)]); await Promise.all([waku1.dial(waku2.libp2p.peerId)]);
await Promise.all([ await Promise.all([
@ -369,7 +365,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise( const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.subscribe([TestDecoder], () => void waku2.relay.subscribe([TestDecoder], () =>
resolve({ resolve({
payload: new Uint8Array([]), payload: new Uint8Array([]),
} as DecodedMessage) } as DecodedMessage)
@ -428,7 +424,8 @@ describe("Waku Relay [node only]", () => {
while (subscribers.length === 0) { while (subscribers.length === 0) {
await delay(200); await delay(200);
subscribers = waku.libp2p.pubsub.getSubscribers(DefaultPubSubTopic); subscribers =
waku.libp2p.services.pubsub!.getSubscribers(DefaultPubSubTopic);
} }
const nimPeerId = await nwaku.getPeerId(); const nimPeerId = await nwaku.getPeerId();
@ -463,7 +460,7 @@ describe("Waku Relay [node only]", () => {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise( const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) => void waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) =>
resolve(msg) resolve(msg)
); );
} }
@ -535,7 +532,7 @@ describe("Waku Relay [node only]", () => {
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise( const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.subscribe(TestDecoder, resolve); void waku2.relay.subscribe(TestDecoder, resolve);
} }
); );

View File

@ -90,7 +90,8 @@ describe("Wait for remote peer", function () {
done(); done();
} }
); );
}); })
.catch((e) => done(e));
}); });
it("Store - dialed first", async function () { it("Store - dialed first", async function () {

View File

@ -63,7 +63,7 @@ describe("Waku Dial [node only]", function () {
let nwaku: NimGoNode; let nwaku: NimGoNode;
afterEach(async function () { afterEach(async function () {
!!nwaku && nwaku.stop(); !!nwaku && (await nwaku.stop());
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
}); });
@ -83,7 +83,7 @@ describe("Waku Dial [node only]", function () {
const connectedPeerID: PeerId = await new Promise((resolve) => { const connectedPeerID: PeerId = await new Promise((resolve) => {
waku.libp2p.addEventListener("peer:connect", (evt) => { waku.libp2p.addEventListener("peer:connect", (evt) => {
resolve(evt.detail.remotePeer); resolve(evt.detail);
}); });
}); });
@ -108,7 +108,7 @@ describe("Waku Dial [node only]", function () {
const connectedPeerID: PeerId = await new Promise((resolve) => { const connectedPeerID: PeerId = await new Promise((resolve) => {
waku.libp2p.addEventListener("peer:connect", (evt) => { waku.libp2p.addEventListener("peer:connect", (evt) => {
resolve(evt.detail.remotePeer); resolve(evt.detail);
}); });
}); });
@ -139,10 +139,9 @@ describe("Decryption Keys", () => {
}).then((waku) => waku.start().then(() => waku)), }).then((waku) => waku.start().then(() => waku)),
]); ]);
await waku1.libp2p.peerStore.addressBook.set( await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
waku2.libp2p.peerId, multiaddrs: waku2.libp2p.getMultiaddrs(),
waku2.libp2p.getMultiaddrs() });
);
await waku1.dial(waku2.libp2p.peerId); await waku1.dial(waku2.libp2p.peerId);
await Promise.all([ await Promise.all([
@ -175,7 +174,7 @@ describe("Decryption Keys", () => {
const receivedMsgPromise: Promise<DecodedMessage> = new Promise( const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => { (resolve) => {
waku2.relay.subscribe([decoder], resolve); void waku2.relay.subscribe([decoder], resolve);
} }
); );
@ -214,22 +213,21 @@ describe("User Agent", () => {
}).then((waku) => waku.start().then(() => waku)), }).then((waku) => waku.start().then(() => waku)),
]); ]);
await waku1.libp2p.peerStore.addressBook.set( await waku1.libp2p.peerStore.save(waku2.libp2p.peerId, {
waku2.libp2p.peerId, multiaddrs: waku2.libp2p.getMultiaddrs(),
waku2.libp2p.getMultiaddrs() });
);
await waku1.dial(waku2.libp2p.peerId); await waku1.dial(waku2.libp2p.peerId);
await waitForRemotePeer(waku1); await waitForRemotePeer(waku1);
const [waku1PeerInfo, waku2PeerInfo] = await Promise.all([ const [waku1PeerInfo, waku2PeerInfo] = await Promise.all([
waku2.libp2p.peerStore.metadataBook.get(waku1.libp2p.peerId), waku2.libp2p.peerStore.get(waku1.libp2p.peerId),
waku1.libp2p.peerStore.metadataBook.get(waku2.libp2p.peerId), waku1.libp2p.peerStore.get(waku2.libp2p.peerId),
]); ]);
expect(bytesToUtf8(waku1PeerInfo.get("AgentVersion")!)).to.eq( expect(bytesToUtf8(waku1PeerInfo.metadata.get("AgentVersion")!)).to.eq(
waku1UserAgent waku1UserAgent
); );
expect(bytesToUtf8(waku2PeerInfo.get("AgentVersion")!)).to.eq( expect(bytesToUtf8(waku2PeerInfo.metadata.get("AgentVersion")!)).to.eq(
DefaultUserAgent DefaultUserAgent
); );
}); });

View File

@ -69,23 +69,15 @@
"uint8arrays": "^4.0.3" "uint8arrays": "^4.0.3"
}, },
"devDependencies": { "devDependencies": {
"@libp2p/interface-connection": "^3.0.8", "@libp2p/interface-connection": "^5.1.1",
"@libp2p/interface-peer-id": "^2.0.1", "@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-store": "^1.2.8", "@libp2p/interface-peer-store": "^2.0.4",
"@rollup/plugin-commonjs": "^24.0.1", "@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.0.2", "@rollup/plugin-node-resolve": "^15.0.2",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^5.59.8",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"@waku/interfaces": "0.0.15", "@waku/interfaces": "0.0.15",
"cspell": "^6.31.1", "cspell": "^6.31.1",
"eslint": "^8.35.0",
"eslint-config-prettier": "^8.6.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^5.0.4",
"eslint-plugin-import": "^2.27.5",
"eslint-plugin-prettier": "^4.2.1",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"prettier": "^2.8.8", "prettier": "^2.8.8",
"rollup": "^3.21.3", "rollup": "^3.21.3",