test: peer exchange tests (#1859)

* peer exchange tests

* adjust after test with nwaku 25
This commit is contained in:
Florin Barbu 2024-02-27 09:54:34 +02:00 committed by GitHub
parent 44dc47c2b1
commit df8c0d79f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 473 additions and 180 deletions

View File

@ -7,7 +7,7 @@ const log = new Logger("test:mocha-hook");
function withGracefulTimeout(
asyncOperation: () => Promise<void>,
doneCallback: (error?: unknown) => void,
timeoutDuration: number = MOCHA_HOOK_MAX_TIMEOUT
timeoutDuration = MOCHA_HOOK_MAX_TIMEOUT
): void {
let operationCompleted = false;
@ -45,24 +45,26 @@ function withGracefulTimeout(
export const beforeEachCustom = function (
suite: Suite,
cb: () => Promise<void>
cb: () => Promise<void>,
timeout = MOCHA_HOOK_MAX_TIMEOUT
): void {
const timeoutBefore = suite.timeout();
suite.timeout(MOCHA_HOOK_MAX_TIMEOUT);
suite.timeout(timeout);
suite.beforeEach((done) => {
withGracefulTimeout(cb, done);
withGracefulTimeout(cb, done, timeout);
});
suite.timeout(timeoutBefore); // restore timeout to the original value
};
export const afterEachCustom = function (
suite: Suite,
cb: () => Promise<void>
cb: () => Promise<void>,
timeout = MOCHA_HOOK_MAX_TIMEOUT
): void {
const timeoutBefore = suite.timeout();
suite.timeout(MOCHA_HOOK_MAX_TIMEOUT);
suite.timeout(timeout);
suite.afterEach((done) => {
withGracefulTimeout(cb, done);
withGracefulTimeout(cb, done, timeout);
});
suite.timeout(timeoutBefore); // restore timeout to the original value
};

View File

@ -0,0 +1,64 @@
import tests from "@libp2p/interface-compliance-tests/peer-discovery";
import type { LightNode } from "@waku/interfaces";
import { PeerExchangeCodec, PeerExchangeDiscovery } from "@waku/peer-exchange";
import { createLightNode } from "@waku/sdk";
import { singleShardInfoToPubsubTopic } from "@waku/utils";
import {
beforeEachCustom,
makeLogFileName,
ServiceNode,
tearDownNodes
} from "../../src/index.js";
const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })];
describe("Peer Exchange", function () {
describe("Compliance Test", function () {
this.timeout(100_000);
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
beforeEachCustom(this, async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
});
tests({
async setup() {
await nwaku1.start({
relay: true,
discv5Discovery: true,
peerExchange: true
});
const enr = (await nwaku1.info()).enrUri;
await nwaku2.start({
relay: true,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr
});
waku = await createLightNode();
await waku.start();
const nwaku2Ma = await nwaku2.getMultiaddrWithId();
// we do this because we want peer-exchange discovery to get initialised before we dial the peer which contains info about the other peer
setTimeout(() => {
void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
}, 1000);
return new PeerExchangeDiscovery(waku.libp2p.components, pubsubTopic);
},
teardown: async () => {
this.timeout(15000);
await tearDownNodes([nwaku1, nwaku2], waku);
}
});
});
});

View File

@ -0,0 +1,236 @@
import { bootstrap } from "@libp2p/bootstrap";
import type { PeerId } from "@libp2p/interface";
import type { LightNode, PeersByDiscoveryResult } from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { createLightNode, Tags } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import Sinon, { SinonSpy } from "sinon";
import {
afterEachCustom,
beforeEachCustom,
makeLogFileName,
ServiceNode,
tearDownNodes
} from "../../src/index.js";
export const log = new Logger("test:pe");
const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })];
describe("Peer Exchange", function () {
this.timeout(150_000);
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
let nwaku3: ServiceNode;
let dialPeerSpy: SinonSpy;
let nwaku1PeerId: PeerId;
beforeEachCustom(this, async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
await nwaku1.start({
pubsubTopic: pubsubTopic,
discv5Discovery: true,
peerExchange: true,
relay: true
});
await nwaku2.start({
pubsubTopic: pubsubTopic,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true
});
nwaku1PeerId = await nwaku1.getPeerId();
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku1, nwaku2, nwaku3], waku);
});
it("getPeersByDiscovery", async function () {
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery(pubsubTopic)
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
resolve();
}
}
})();
});
});
expect(dialPeerSpy.callCount).to.equal(1);
const peers_after = <PeersByDiscoveryResult>(
await waku.connectionManager.getPeersByDiscovery()
);
const discovered_peer_exchange = peers_after.DISCOVERED[Tags.PEER_EXCHANGE];
const discovered_bootstram = peers_after.DISCOVERED[Tags.BOOTSTRAP];
const connected_peer_exchange = peers_after.CONNECTED[Tags.PEER_EXCHANGE];
const connected_bootstram = peers_after.CONNECTED[Tags.BOOTSTRAP];
expect(discovered_peer_exchange.length).to.eq(1);
expect(discovered_peer_exchange[0].id.toString()).to.eq(
nwaku1PeerId.toString()
);
expect(discovered_peer_exchange[0].tags.has("peer-exchange")).to.be.true;
expect(discovered_bootstram.length).to.eq(1);
expect(connected_peer_exchange.length).to.eq(0);
expect(connected_bootstram.length).to.eq(1);
});
// will be skipped until https://github.com/waku-org/js-waku/issues/1860 is fixed
it.skip("new peer added after a peer was already found", async function () {
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery(pubsubTopic)
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
resolve();
}
}
})();
});
});
nwaku3 = new ServiceNode(makeLogFileName(this) + "3");
await nwaku3.start({
pubsubTopic: pubsubTopic,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true,
lightpush: true,
filter: true
});
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 2) {
resolve();
}
}
})();
});
});
});
// will be skipped until https://github.com/waku-org/js-waku/issues/1858 is fixed
it.skip("wrong wakuPeerExchangeDiscovery pubsub topic", async function () {
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery(["wrong"])
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
const timeoutId = setTimeout(() => {
resolve();
}, 40000);
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
clearTimeout(timeoutId);
resolve();
}
}
})();
});
});
expect(
pxPeersDiscovered.size,
"No peer should have been discovered"
).to.equal(0);
});
it("peerDiscovery without wakuPeerExchangeDiscovery", async function () {
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] })
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
const timeoutId = setTimeout(() => {
resolve();
}, 40000);
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
clearTimeout(timeoutId);
resolve();
}
}
})();
});
});
expect(
pxPeersDiscovered.size,
"No peer should have been discovered"
).to.equal(0);
});
});

View File

@ -8,7 +8,7 @@ import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { createLightNode, DefaultPubsubTopic } from "@waku/sdk";
import { expect } from "chai";
import { afterEachCustom, tearDownNodes } from "../src";
import { afterEachCustom, tearDownNodes } from "../../src";
describe("Peer Exchange", () => {
describe("Auto Discovery", function () {

View File

@ -0,0 +1,163 @@
import { bootstrap } from "@libp2p/bootstrap";
import type { PeerId } from "@libp2p/interface";
import { multiaddr } from "@multiformats/multiaddr";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { LightNode, PeerInfo } from "@waku/interfaces";
import {
PeerExchangeCodec,
WakuPeerExchange,
wakuPeerExchangeDiscovery
} from "@waku/peer-exchange";
import { createLightNode, Libp2pComponents } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
makeLogFileName,
ServiceNode,
tearDownNodes,
waitForRemotePeerWithCodec
} from "../../src/index.js";
export const log = new Logger("test:pe");
const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })];
describe("Peer Exchange Query", function () {
this.timeout(30_000);
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
let nwaku3: ServiceNode;
let nwaku1PeerId: PeerId;
let nwaku3MA: Multiaddr;
let nwaku3PeerId: PeerId;
let components: Libp2pComponents;
let peerExchange: WakuPeerExchange;
let numPeersToRequest: number;
let peerInfos: PeerInfo[];
beforeEachCustom(
this,
async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3");
await nwaku1.start({
pubsubTopic: pubsubTopic,
discv5Discovery: true,
peerExchange: true,
relay: true
});
nwaku1PeerId = await nwaku1.getPeerId();
await nwaku2.start({
pubsubTopic: pubsubTopic,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true
});
await nwaku3.start({
pubsubTopic: pubsubTopic,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku2.info()).enrUri,
relay: true
});
nwaku3MA = await nwaku3.getMultiaddrWithId();
nwaku3PeerId = await nwaku3.getPeerId();
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3MA.toString()] }),
wakuPeerExchangeDiscovery(pubsubTopic)
]
}
});
await waku.start();
await waku.libp2p.dialProtocol(nwaku3MA, PeerExchangeCodec);
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku3PeerId);
components = waku.libp2p.components as unknown as Libp2pComponents;
peerExchange = new WakuPeerExchange(components, pubsubTopic);
numPeersToRequest = 2;
// querying the connected peer
peerInfos = [];
while (peerInfos.length != numPeersToRequest) {
try {
peerInfos = (await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: numPeersToRequest
})) as PeerInfo[];
} catch (error) {
log.error("Error encountered, retrying...");
}
await delay(2000);
}
},
100000
);
afterEachCustom(this, async () => {
await tearDownNodes([nwaku1, nwaku2, nwaku3], waku);
});
it("connected peers and dial", async function () {
expect(peerInfos[0].ENR).to.not.be.null;
expect(peerInfos[0].ENR?.peerInfo?.multiaddrs).to.not.be.null;
const peerWsMA = peerInfos[0].ENR?.peerInfo?.multiaddrs[2];
const localPeerWsMAAsString = peerWsMA
?.toString()
.replace(/\/ip4\/[\d.]+\//, "/ip4/127.0.0.1/");
const localPeerWsMA = multiaddr(localPeerWsMAAsString);
let foundNodePeerId: PeerId | undefined = undefined;
const doesPeerIdExistInResponse = peerInfos.some(({ ENR }) => {
foundNodePeerId = ENR?.peerInfo?.id;
return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString();
});
if (!foundNodePeerId) {
throw new Error("Peer1 ID not found");
}
expect(doesPeerIdExistInResponse, "peer not found").to.be.equal(true);
await waku.libp2p.dialProtocol(localPeerWsMA, PeerExchangeCodec);
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, foundNodePeerId);
});
it("more peers than existing", async function () {
const peerInfo = await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: 5
});
expect(peerInfo?.length).to.be.eq(numPeersToRequest);
});
it("less peers than existing", async function () {
const peerInfo = await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: 1
});
expect(peerInfo?.length).to.be.eq(1);
});
it("non connected peers", async function () {
// querying the non connected peer
try {
await peerExchange.query({
peerId: nwaku1PeerId,
numPeers: numPeersToRequest
});
throw new Error("Query on not connected peer succeeded unexpectedly.");
} catch (error) {
if (!(error instanceof Error && error.message === "Not Found")) {
throw error;
}
}
});
});

View File

@ -1,172 +0,0 @@
import type { PeerId } from "@libp2p/interface";
import tests from "@libp2p/interface-compliance-tests/peer-discovery";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { LightNode, PeerInfo } from "@waku/interfaces";
import {
PeerExchangeCodec,
PeerExchangeDiscovery,
WakuPeerExchange
} from "@waku/peer-exchange";
import {
createLightNode,
DEFAULT_CLUSTER_ID,
DefaultPubsubTopic,
Libp2pComponents
} from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
makeLogFileName,
ServiceNode,
tearDownNodes,
waitForRemotePeerWithCodec
} from "../src/index.js";
describe("Peer Exchange", function () {
describe("Locally Run Nodes", () => {
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
beforeEachCustom(this, async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
});
afterEachCustom(this, async () => {
await tearDownNodes([nwaku1, nwaku2], waku);
});
it.skip("nwaku interop", async function () {
this.timeout(100_000);
await nwaku1.start({
relay: true,
discv5Discovery: true,
peerExchange: true,
clusterId: DEFAULT_CLUSTER_ID
});
const enr = (await nwaku1.info()).enrUri;
await nwaku2.start({
relay: true,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr,
clusterId: DEFAULT_CLUSTER_ID
});
const nwaku1PeerId = await nwaku1.getPeerId();
const nwaku2PeerId = await nwaku2.getPeerId();
const nwaku2Ma = await nwaku2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: { shards: [0] } });
await waku.start();
await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku2PeerId);
const components = waku.libp2p.components as unknown as Libp2pComponents;
const peerExchange = new WakuPeerExchange(components, [
DefaultPubsubTopic
]);
const numPeersToRequest = 1;
let peerInfos: PeerInfo[] = [];
while (peerInfos.length <= 0) {
peerInfos = (await peerExchange.query({
peerId: nwaku2PeerId,
numPeers: numPeersToRequest
})) as PeerInfo[];
await delay(3000);
}
expect(peerInfos.length).to.be.greaterThan(0);
expect(peerInfos.length).to.be.lessThanOrEqual(numPeersToRequest);
expect(peerInfos[0].ENR).to.not.be.null;
expect(peerInfos[0].ENR?.peerInfo?.multiaddrs).to.not.be.null;
let foundNodeMultiaddrs: Multiaddr[] = [];
let foundNodePeerId: PeerId | undefined = undefined;
const doesPeerIdExistInResponse = peerInfos.some(({ ENR }) => {
foundNodeMultiaddrs = ENR?.peerInfo?.multiaddrs ?? [];
foundNodePeerId = ENR?.peerInfo?.id;
return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString();
});
if (!foundNodePeerId) {
throw new Error("Peer ID not found");
}
if (!foundNodePeerId) {
throw new Error("Peer ID not found");
}
expect(doesPeerIdExistInResponse).to.be.equal(true);
await waku.libp2p.dialProtocol(foundNodeMultiaddrs, PeerExchangeCodec);
await waitForRemotePeerWithCodec(
waku,
PeerExchangeCodec,
foundNodePeerId
);
expect(await waku.libp2p.peerStore.has(nwaku1PeerId)).to.eq(true);
expect(waku.libp2p.getConnections()).has.length(2);
});
});
describe("Compliance Test", function () {
this.timeout(100_000);
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
beforeEachCustom(this, async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
});
tests({
async setup() {
await nwaku1.start({
relay: true,
discv5Discovery: true,
peerExchange: true
});
const enr = (await nwaku1.info()).enrUri;
await nwaku2.start({
relay: true,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: enr
});
waku = await createLightNode();
await waku.start();
const nwaku2Ma = await nwaku2.getMultiaddrWithId();
// we do this because we want peer-exchange discovery to get initialised before we dial the peer which contains info about the other peer
setTimeout(() => {
void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
}, 1000);
return new PeerExchangeDiscovery(waku.libp2p.components, [
DefaultPubsubTopic
]);
},
teardown: async () => {
this.timeout(15000);
await tearDownNodes([nwaku1, nwaku2], waku);
}
});
});
});