chore: refactor peer-exchange according to nwaku 0.15.0 (#1193)

* merge with master: adhere acc to updated API

* bump go-waku to 0.5.2
This commit is contained in:
Danish Arora 2023-03-14 13:56:15 +05:30 committed by GitHub
parent a30b2bd747
commit a20b797c5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 4042 additions and 3083 deletions

View File

@ -98,7 +98,7 @@ jobs:
node_with_go_waku: node_with_go_waku:
runs-on: ubuntu-latest runs-on: ubuntu-latest
env: env:
GO_WAKU_VERSION: "0.5.1" GO_WAKU_VERSION: "0.5.2"
WAKU_SERVICE_NODE_DIR: ../../go-waku WAKU_SERVICE_NODE_DIR: ../../go-waku
WAKU_SERVICE_NODE_BIN: ./waku WAKU_SERVICE_NODE_BIN: ./waku
WAKU_SERVICE_NODE_PARAMS: "--min-relay-peers-to-publish=0" # Can be removed once https://github.com/status-im/nwaku/issues/1004 is done WAKU_SERVICE_NODE_PARAMS: "--min-relay-peers-to-publish=0" # Can be removed once https://github.com/status-im/nwaku/issues/1004 is done

6901
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +1,12 @@
import type { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerStore } from "@libp2p/interface-peer-store"; import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Registrar } from "@libp2p/interface-registrar";
import { IEnr } from "./enr.js"; import { IEnr } from "./enr.js";
import { PointToPointProtocol } from "./protocols.js"; import { PointToPointProtocol } from "./protocols.js";
export interface IPeerExchange extends PointToPointProtocol { export interface IPeerExchange extends PointToPointProtocol {
query( query(params: PeerExchangeQueryParams): Promise<PeerInfo[]>;
params: PeerExchangeQueryParams,
callback: (response: PeerExchangeResponse) => Promise<void> | void
): Promise<void>;
} }
export interface PeerExchangeQueryParams { export interface PeerExchangeQueryParams {
@ -29,5 +25,4 @@ export interface PeerInfo {
export interface PeerExchangeComponents { export interface PeerExchangeComponents {
connectionManager: ConnectionManager; connectionManager: ConnectionManager;
peerStore: PeerStore; peerStore: PeerStore;
registrar: Registrar;
} }

View File

@ -1,20 +1,17 @@
import type { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerStore } from "@libp2p/interface-peer-store"; import type { PeerStore } from "@libp2p/interface-peer-store";
import type {
IncomingStreamData,
Registrar,
} from "@libp2p/interface-registrar";
import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { EnrDecoder } from "@waku/enr"; import { EnrDecoder } from "@waku/enr";
import type { import type {
IPeerExchange, IPeerExchange,
PeerExchangeQueryParams, PeerExchangeQueryParams,
PeerExchangeResponse, PeerInfo,
} from "@waku/interfaces"; } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { PeerExchangeRPC } from "./rpc.js"; import { PeerExchangeRPC } from "./rpc.js";
@ -24,7 +21,6 @@ const log = debug("waku:peer-exchange");
export interface PeerExchangeComponents { export interface PeerExchangeComponents {
peerStore: PeerStore; peerStore: PeerStore;
registrar: Registrar;
connectionManager: ConnectionManager; connectionManager: ConnectionManager;
} }
@ -32,9 +28,7 @@ export interface PeerExchangeComponents {
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/) * Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/ */
export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
private callback: multicodec: string;
| ((response: PeerExchangeResponse) => Promise<void>)
| undefined;
/** /**
* @param components - libp2p components * @param components - libp2p components
@ -47,20 +41,13 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
components.connectionManager components.connectionManager
) )
); );
this.components.registrar this.multicodec = PeerExchangeCodec;
.handle(PeerExchangeCodec, this.handler.bind(this))
.catch((e) => log("Failed to register peer exchange protocol", e));
} }
/** /**
* Make a peer exchange query to a peer * Make a peer exchange query to a peer
*/ */
async query( async query(params: PeerExchangeQueryParams): Promise<PeerInfo[]> {
params: PeerExchangeQueryParams,
callback: (response: PeerExchangeResponse) => Promise<void>
): Promise<void> {
this.callback = callback;
const { numPeers } = params; const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({ const rpcQuery = PeerExchangeRPC.createRequest({
@ -71,22 +58,20 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
const stream = await this.newStream(peer); const stream = await this.newStream(peer);
await pipe( const res = await pipe(
[rpcQuery.encode()], [rpcQuery.encode()],
lp.encode(), lp.encode(),
stream, stream,
lp.decode(), lp.decode(),
async (source) => await all(source) async (source) => await all(source)
); );
}
/** try {
* Handle a peer exchange query response const bytes = new Uint8ArrayList();
*/ res.forEach((chunk) => {
private handler(streamData: IncomingStreamData): void { bytes.append(chunk);
const { stream } = streamData; });
pipe(stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const decoded = PeerExchangeRPC.decode(bytes).response; const decoded = PeerExchangeRPC.decode(bytes).response;
if (!decoded) { if (!decoded) {
@ -105,11 +90,11 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
}; };
}); });
if (!this.callback) throw new Error("Callback not set"); return peerInfos;
} catch (err) {
await this.callback({ peerInfos }); log("Failed to decode push reply", err);
throw new Error("Failed to decode push reply");
} }
}).catch((err) => log("Failed to handle peer exchange request", err));
} }
} }

View File

@ -149,14 +149,11 @@ export class PeerExchangeDiscovery
}, queryInterval * currentAttempt); }, queryInterval * currentAttempt);
}; };
private query(peerId: PeerId): Promise<void> { private async query(peerId: PeerId): Promise<void> {
return this.peerExchange.query( const peerInfos = await this.peerExchange.query({
{
numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
peerId, peerId,
}, });
async (response) => {
const { peerInfos } = response;
for (const _peerInfo of peerInfos) { for (const _peerInfo of peerInfos) {
const { ENR } = _peerInfo; const { ENR } = _peerInfo;
@ -165,25 +162,21 @@ export class PeerExchangeDiscovery
continue; continue;
} }
const peerInfo = ENR.peerInfo; const { peerId, peerInfo } = ENR;
if (!peerId || !peerInfo) continue;
const { multiaddrs } = peerInfo;
if ( if (
!peerInfo || (await this.components.peerStore.getTags(peerId)).find(
!peerInfo.id ||
!peerInfo.multiaddrs ||
!peerInfo.multiaddrs.length
)
continue;
if (
(await this.components.peerStore.getTags(peerInfo.id)).find(
({ name }) => name === DEFAULT_PEER_EXCHANGE_TAG_NAME ({ name }) => name === DEFAULT_PEER_EXCHANGE_TAG_NAME
) )
) )
continue; continue;
await this.components.peerStore.tagPeer( await this.components.peerStore.tagPeer(
peerInfo.id, peerId,
DEFAULT_PEER_EXCHANGE_TAG_NAME, DEFAULT_PEER_EXCHANGE_TAG_NAME,
{ {
value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE, value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE,
@ -193,13 +186,15 @@ export class PeerExchangeDiscovery
this.dispatchEvent( this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { new CustomEvent<PeerInfo>("peer", {
detail: peerInfo, detail: {
id: peerId,
multiaddrs,
protocols: [],
},
}) })
); );
} }
} }
);
}
private abortQueriesForPeer(peerIdStr: string): void { private abortQueriesForPeer(peerIdStr: string): void {
log(`Aborting queries for peer: ${peerIdStr}`); log(`Aborting queries for peer: ${peerIdStr}`);

View File

@ -4,7 +4,7 @@ import {
getPredefinedBootstrapNodes, getPredefinedBootstrapNodes,
} from "@waku/core/lib/predefined_bootstrap_nodes"; } from "@waku/core/lib/predefined_bootstrap_nodes";
import { createLightNode } from "@waku/create"; import { createLightNode } from "@waku/create";
import type { LightNode, PeerExchangeResponse } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces";
import { import {
PeerExchangeCodec, PeerExchangeCodec,
WakuPeerExchange, WakuPeerExchange,
@ -18,19 +18,17 @@ import { Nwaku } from "../src/nwaku.js";
describe("Peer Exchange", () => { describe("Peer Exchange", () => {
let waku: LightNode; let waku: LightNode;
before(async function () {
// skipping in CI as this test demonstrates Peer Exchange working with the test fleet
// but not with locally run nwaku nodes
if (process.env.CI) {
this.skip();
}
});
afterEach(async function () { afterEach(async function () {
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
}); });
it("Auto discovery", async function () { it("Auto discovery", async function () {
// skipping in CI as this test demonstrates Peer Exchange working with the test fleet
// but not with locally run nwaku nodes
if (process.env.CI) {
this.skip();
}
this.timeout(50_000); this.timeout(50_000);
waku = await createLightNode({ waku = await createLightNode({
@ -77,7 +75,7 @@ describe("Peer Exchange", () => {
}); });
it("nwaku interop", async function () { it("nwaku interop", async function () {
this.timeout(25_000); this.timeout(15_000);
await nwaku1.start({ await nwaku1.start({
discv5Discovery: true, discv5Discovery: true,
@ -115,45 +113,28 @@ describe("Peer Exchange", () => {
// @ts-ignore // @ts-ignore
connectionManager: waku.libp2p.connectionManager, connectionManager: waku.libp2p.connectionManager,
peerStore: waku.libp2p.peerStore, peerStore: waku.libp2p.peerStore,
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
registrar: waku.libp2p.registrar,
}); });
let receivedCallback = false;
const numPeersToRequest = 1; const numPeersToRequest = 1;
const callback = async (
response: PeerExchangeResponse const peerInfos = await peerExchange.query({
): Promise<void> => { numPeers: numPeersToRequest,
const doesMultiaddrExist = response.peerInfos.find( });
expect(peerInfos.length).to.be.greaterThan(0);
expect(peerInfos.length).to.be.lessThanOrEqual(numPeersToRequest);
expect(peerInfos[0].ENR).to.not.be.null;
const doesMultiaddrExist =
peerInfos.find(
(peerInfo) => (peerInfo) =>
peerInfo.ENR?.getFullMultiaddrs()?.find((multiaddr) => peerInfo.ENR?.getFullMultiaddrs()?.find((multiaddr) =>
multiaddr.equals(nwaku1Ma) multiaddr.equals(nwaku1Ma)
) !== undefined ) !== undefined
); ) !== undefined;
expect(response.peerInfos.length).to.be.greaterThan(0);
expect(response.peerInfos.length).to.be.lessThanOrEqual(
numPeersToRequest
);
expect(response.peerInfos[0].ENR).to.not.be.null;
expect(doesMultiaddrExist).to.be.equal(true); expect(doesMultiaddrExist).to.be.equal(true);
expect(waku.libp2p.peerStore.has(await nwaku2.getPeerId())).to.be.true; expect(waku.libp2p.peerStore.has(await nwaku2.getPeerId())).to.be.true;
receivedCallback = true;
};
await peerExchange.query(
{
numPeers: numPeersToRequest,
},
callback
);
expect(receivedCallback).to.be.true;
}); });
}); });
}); });