feat: peer exchange discovery improvements (#2537)

* rename, mark stuff to remove

* rename type

* update exports from discovery package, improve PX implementation

* re-structure px discovery, comment tests

* implement UT, E2E Tests, clean code a bit, implement recurring PX discovery, move DNS E2E Tests to separate folder, remove not needed E2E tests

* fix discovery dialer e2e test

* mark as started

* fix dns tests

* attempt to fix e2e

* skip test

* update tests

* fix typo

* add catch all in stream manager, update tests

* update mock

* update test
This commit is contained in:
Sasha 2025-08-14 09:47:55 +02:00 committed by GitHub
parent c161b37d08
commit 95da57a870
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1094 additions and 853 deletions

View File

@ -24,6 +24,7 @@ export class StreamManager {
}
public async getStream(peerId: PeerId): Promise<Stream | undefined> {
try {
const peerIdStr = peerId.toString();
const scheduledStream = this.streamPool.get(peerIdStr);
@ -45,6 +46,10 @@ export class StreamManager {
this.lockStream(peerIdStr, stream);
return stream;
} catch (error) {
this.log.error(`Failed to getStream:`, error);
return;
}
}
private async createStream(

View File

@ -53,7 +53,6 @@ class MockDNS implements DnsClient {
public resolveTXT(fqdn: string): Promise<string[]> {
if (this.fqdnThrows.includes(fqdn)) {
this.hasThrown = true;
console.log("throwing");
throw "Mock DNS throws.";
}

View File

@ -1,20 +1,13 @@
// DNS Discovery
export { PeerDiscoveryDns, wakuDnsDiscovery } from "./dns/dns_discovery.js";
export { enrTree } from "./dns/constants.js";
export { DnsNodeDiscovery } from "./dns/dns.js";
// Peer Exchange Discovery
export {
wakuPeerExchange,
PeerExchangeCodec,
WakuPeerExchange
} from "./peer-exchange/waku_peer_exchange.js";
export {
wakuPeerExchangeDiscovery,
PeerExchangeDiscovery
} from "./peer-exchange/waku_peer_exchange_discovery.js";
PeerExchangeDiscovery,
PeerExchangeCodec
} from "./peer-exchange/index.js";
// Local Peer Cache Discovery
export {
LocalPeerCacheDiscovery,
wakuLocalPeerCacheDiscovery

View File

@ -0,0 +1,10 @@
import { Tags } from "@waku/interfaces";
// amount of peers available per specification
export const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 60;
export const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE;
export const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50;
export const DEFAULT_PEER_EXCHANGE_TAG_TTL = 30_000;
export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";

View File

@ -1,10 +1,5 @@
export {
wakuPeerExchange,
PeerExchangeCodec,
WakuPeerExchange
} from "./waku_peer_exchange.js";
export {
wakuPeerExchangeDiscovery,
PeerExchangeDiscovery,
Options
} from "./waku_peer_exchange_discovery.js";
PeerExchangeDiscovery
} from "./peer_exchange_discovery.js";
export { PeerExchangeCodec } from "./constants.js";

View File

@ -0,0 +1,321 @@
import { EnrDecoder } from "@waku/enr";
import { ProtocolError } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { PeerExchange } from "./peer_exchange.js";
import { PeerExchangeRPC } from "./rpc.js";
describe("PeerExchange", () => {
let peerExchange: PeerExchange;
let mockComponents: any;
let mockStreamManager: any;
let mockPeerStore: any;
let mockStream: any;
let mockPeerId: any;
beforeEach(() => {
mockPeerId = {
toString: () => "test-peer-id",
equals: (other: any) => other && other.toString() === "test-peer-id"
};
mockStream = {
sink: sinon.stub(),
source: (async function* () {
const data = new Uint8Array([0, 0, 0, 4, 1, 2, 3, 4]);
yield data;
})()
};
mockStreamManager = {
getStream: sinon.stub().resolves(mockStream)
};
mockPeerStore = {
has: sinon.stub().resolves(true)
};
mockComponents = {
peerStore: mockPeerStore,
events: {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub()
}
};
peerExchange = new PeerExchange(mockComponents as any);
(peerExchange as any).streamManager = mockStreamManager;
});
afterEach(() => {
sinon.restore();
});
describe("constructor", () => {
it("should initialize with libp2p components", () => {
const components = {
peerStore: {},
events: {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub()
}
} as any;
const instance = new PeerExchange(components);
expect(instance).to.be.instanceOf(PeerExchange);
});
});
describe("query", () => {
let queryParams: any;
beforeEach(() => {
queryParams = {
numPeers: 5,
peerId: mockPeerId
};
});
it("should successfully query peers and return peer infos", async () => {
const mockResponse = {
peerInfos: [
{ enr: new Uint8Array([1, 2, 3]) },
{ enr: new Uint8Array([4, 5, 6]) }
]
};
const mockRpcResponse = {
response: mockResponse
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
const mockEnr = { toString: () => "mock-enr" };
sinon.stub(EnrDecoder, "fromRLP").resolves(mockEnr as any);
const result = await peerExchange.query(queryParams);
expect(result.error).to.be.null;
expect(result.peerInfos).to.have.length(2);
expect(result.peerInfos![0]).to.have.property("ENR");
expect(result.peerInfos![1]).to.have.property("ENR");
});
it("should handle empty peer infos gracefully", async () => {
const mockResponse = {
peerInfos: []
};
const mockRpcResponse = {
response: mockResponse
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
const result = await peerExchange.query(queryParams);
expect(result.error).to.be.null;
expect(result.peerInfos).to.have.length(0);
});
it("should filter out undefined ENRs", async () => {
const mockResponse = {
peerInfos: [
{ enr: new Uint8Array([1, 2, 3]) },
{ enr: undefined },
{ enr: new Uint8Array([4, 5, 6]) }
]
};
const mockRpcResponse = {
response: mockResponse
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
const mockEnr = { toString: () => "mock-enr" };
sinon.stub(EnrDecoder, "fromRLP").resolves(mockEnr as any);
const result = await peerExchange.query(queryParams);
expect(result.error).to.be.null;
expect(result.peerInfos).to.have.length(2);
});
it("should return NO_PEER_AVAILABLE when peer is not in peer store", async () => {
mockPeerStore.has.resolves(false);
const result = await peerExchange.query(queryParams);
expect(result.error).to.equal(ProtocolError.NO_PEER_AVAILABLE);
expect(result.peerInfos).to.be.null;
});
it("should return NO_STREAM_AVAILABLE when stream creation fails", async () => {
mockStreamManager.getStream.returns(undefined);
const result = await peerExchange.query(queryParams);
expect(result.error).to.equal(ProtocolError.NO_STREAM_AVAILABLE);
expect(result.peerInfos).to.be.null;
});
it("should return EMPTY_PAYLOAD when response field is missing", async () => {
const mockRpcResponse = {
response: undefined
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
const result = await peerExchange.query(queryParams);
expect(result.error).to.equal(ProtocolError.EMPTY_PAYLOAD);
expect(result.peerInfos).to.be.null;
});
it("should return DECODE_FAILED when RPC decode fails", async () => {
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").throws(new Error("Decode failed"));
const result = await peerExchange.query(queryParams);
expect(result.error).to.equal(ProtocolError.DECODE_FAILED);
expect(result.peerInfos).to.be.null;
});
it("should return DECODE_FAILED when ENR decoding fails", async () => {
const mockResponse = {
peerInfos: [{ enr: new Uint8Array([1, 2, 3]) }]
};
const mockRpcResponse = {
response: mockResponse
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
sinon.stub(EnrDecoder, "fromRLP").rejects(new Error("ENR decode failed"));
const result = await peerExchange.query(queryParams);
expect(result.error).to.equal(ProtocolError.DECODE_FAILED);
expect(result.peerInfos).to.be.null;
});
it("should handle malformed response data", async () => {
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").throws(new Error("Malformed data"));
const result = await peerExchange.query(queryParams);
expect(result.error).to.equal(ProtocolError.DECODE_FAILED);
expect(result.peerInfos).to.be.null;
});
it("should handle large number of peers request", async () => {
const largeQueryParams = {
numPeers: 1000,
peerId: mockPeerId
};
const mockResponse = {
peerInfos: Array(1000).fill({ enr: new Uint8Array([1, 2, 3]) })
};
const mockRpcResponse = {
response: mockResponse
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
const mockEnr = { toString: () => "mock-enr" };
sinon.stub(EnrDecoder, "fromRLP").resolves(mockEnr as any);
const result = await peerExchange.query(largeQueryParams);
expect(result.error).to.be.null;
expect(result.peerInfos).to.have.length(1000);
});
it("should handle zero peers request", async () => {
const zeroQueryParams = {
numPeers: 0,
peerId: mockPeerId
};
const mockResponse = {
peerInfos: []
};
const mockRpcResponse = {
response: mockResponse
};
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any);
sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any);
const result = await peerExchange.query(zeroQueryParams);
expect(result.error).to.be.null;
expect(result.peerInfos).to.have.length(0);
});
it("should create RPC request with correct parameters", async () => {
const mockRpcQuery = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
const createRequestStub = sinon
.stub(PeerExchangeRPC, "createRequest")
.returns(mockRpcQuery as any);
sinon
.stub(PeerExchangeRPC, "decode")
.returns({ response: { peerInfos: [] } } as any);
await peerExchange.query(queryParams);
expect(createRequestStub.calledOnce).to.be.true;
expect(createRequestStub.firstCall.args[0]).to.deep.equal({
numPeers: BigInt(queryParams.numPeers)
});
});
it("should create PeerExchange instance with components", () => {
const instance = new PeerExchange(mockComponents as any);
expect(instance).to.be.instanceOf(PeerExchange);
});
});
});

View File

@ -14,16 +14,15 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { PeerExchangeCodec } from "./constants.js";
import { PeerExchangeRPC } from "./rpc.js";
export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
const log = new Logger("peer-exchange");
/**
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/
export class WakuPeerExchange implements IPeerExchange {
export class PeerExchange implements IPeerExchange {
private readonly streamManager: StreamManager;
/**
@ -45,8 +44,8 @@ export class WakuPeerExchange implements IPeerExchange {
numPeers: BigInt(numPeers)
});
const peer = await this.components.peerStore.get(peerId);
if (!peer) {
const hasPeer = await this.components.peerStore.has(peerId);
if (!hasPeer) {
return {
peerInfos: null,
error: ProtocolError.NO_PEER_AVAILABLE
@ -56,7 +55,9 @@ export class WakuPeerExchange implements IPeerExchange {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
log.error(
`Failed to get a stream for remote peer:${peerId?.toString?.()}`
);
return {
peerInfos: null,
error: ProtocolError.NO_STREAM_AVAILABLE
@ -110,13 +111,3 @@ export class WakuPeerExchange implements IPeerExchange {
}
}
}
/**
*
* @returns A function that creates a new peer exchange protocol
*/
export function wakuPeerExchange(): (
components: Libp2pComponents
) => WakuPeerExchange {
return (components: Libp2pComponents) => new WakuPeerExchange(components);
}

View File

@ -0,0 +1,386 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { peerDiscoverySymbol as symbol } from "@libp2p/interface";
import type {
IdentifyResult,
PeerDiscoveryEvents,
PeerId
} from "@libp2p/interface";
import {
type IPeerExchange,
type Libp2pComponents,
ProtocolError
} from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { PeerExchangeCodec } from "./constants.js";
import {
PeerExchangeDiscovery,
wakuPeerExchangeDiscovery
} from "./peer_exchange_discovery.js";
describe("PeerExchangeDiscovery", () => {
let peerExchangeDiscovery: PeerExchangeDiscovery;
let mockComponents: Libp2pComponents;
let mockEvents: TypedEventEmitter<PeerDiscoveryEvents>;
let mockConnectionManager: any;
let mockPeerStore: any;
let mockPeerId: PeerId;
let clock: sinon.SinonFakeTimers;
beforeEach(() => {
clock = sinon.useFakeTimers();
mockPeerId = {
toString: sinon.stub().returns("peer-id-1"),
toBytes: sinon.stub().returns(new Uint8Array([1, 2, 3]))
} as unknown as PeerId;
mockEvents = new TypedEventEmitter<PeerDiscoveryEvents>();
mockConnectionManager = {
getConnections: sinon.stub().returns([{ remotePeer: mockPeerId }])
};
mockPeerStore = {
get: sinon.stub().resolves({
id: mockPeerId,
protocols: [PeerExchangeCodec]
}),
merge: sinon.stub().resolves(undefined),
has: sinon.stub().resolves(true)
};
mockComponents = {
events: mockEvents,
connectionManager: mockConnectionManager,
peerStore: mockPeerStore
} as unknown as Libp2pComponents;
peerExchangeDiscovery = new PeerExchangeDiscovery(mockComponents, {});
});
afterEach(() => {
clock.restore();
sinon.restore();
});
describe("constructor", () => {
it("should initialize with default options", () => {
const discovery = new PeerExchangeDiscovery(mockComponents);
expect(discovery).to.be.instanceOf(PeerExchangeDiscovery);
expect(discovery[symbol]).to.be.true;
expect(discovery[Symbol.toStringTag]).to.equal("@waku/peer-exchange");
});
it("should initialize with custom TTL", () => {
const customTTL = 60000;
const discovery = new PeerExchangeDiscovery(mockComponents, {
TTL: customTTL
});
expect(discovery).to.be.instanceOf(PeerExchangeDiscovery);
});
});
describe("start", () => {
it("should start peer exchange discovery", () => {
const addEventListenerSpy = sinon.spy(mockEvents, "addEventListener");
peerExchangeDiscovery.start();
expect(addEventListenerSpy.called).to.be.true;
});
it("should not start if already started", () => {
const addEventListenerSpy = sinon.spy(mockEvents, "addEventListener");
peerExchangeDiscovery.start();
peerExchangeDiscovery.start();
expect(addEventListenerSpy.calledOnce).to.be.true;
});
});
describe("stop", () => {
it("should stop peer exchange discovery", () => {
const removeEventListenerSpy = sinon.spy(
mockEvents,
"removeEventListener"
);
peerExchangeDiscovery.start();
peerExchangeDiscovery.stop();
expect(removeEventListenerSpy.called).to.be.true;
});
it("should not stop if not started", () => {
const removeEventListenerSpy = sinon.spy(
mockEvents,
"removeEventListener"
);
peerExchangeDiscovery.stop();
expect(removeEventListenerSpy.called).to.be.false;
});
});
describe("handleDiscoveredPeer", () => {
beforeEach(() => {
peerExchangeDiscovery.start();
});
it("should handle peer identify event", async () => {
const runQuerySpy = sinon.spy(peerExchangeDiscovery as any, "runQuery");
const mockIdentifyResult: IdentifyResult = {
peerId: mockPeerId,
protocols: [PeerExchangeCodec],
listenAddrs: [],
connection: {} as any
};
const event = new CustomEvent<IdentifyResult>("peer:identify", {
detail: mockIdentifyResult
});
await peerExchangeDiscovery["handleDiscoveredPeer"](event);
expect(runQuerySpy.called).to.be.true;
});
it("should skip peers without peer exchange protocol", async () => {
const mockIdentifyResult: IdentifyResult = {
peerId: mockPeerId,
protocols: ["other-protocol"],
listenAddrs: [],
connection: {} as any
};
const event = new CustomEvent<IdentifyResult>("peer:identify", {
detail: mockIdentifyResult
});
await peerExchangeDiscovery["handleDiscoveredPeer"](event);
expect(mockPeerStore.get.called).to.be.false;
});
});
describe("handlePeriodicDiscovery", () => {
beforeEach(() => {
peerExchangeDiscovery.start();
});
it("should query peers that support peer exchange", async () => {
await peerExchangeDiscovery["handlePeriodicDiscovery"]();
expect(mockConnectionManager.getConnections.called).to.be.true;
expect(mockPeerStore.get.called).to.be.true;
});
it("should skip peers that don't support peer exchange", async () => {
mockPeerStore.get.resolves({
id: mockPeerId,
protocols: ["other-protocol"]
});
await peerExchangeDiscovery["handlePeriodicDiscovery"]();
expect(mockConnectionManager.getConnections.called).to.be.true;
expect(mockPeerStore.get.called).to.be.true;
});
it("should handle peer store errors gracefully", async () => {
mockPeerStore.get.rejects(new Error("Peer store error"));
await peerExchangeDiscovery["handlePeriodicDiscovery"]();
expect(mockConnectionManager.getConnections.called).to.be.true;
});
it("should skip peers that were recently queried", async () => {
const peerIdStr = mockPeerId.toString();
peerExchangeDiscovery["peerExpirationRecords"].set(
peerIdStr,
Date.now() + 10000
);
await peerExchangeDiscovery["handlePeriodicDiscovery"]();
expect(mockPeerStore.get.called).to.be.false;
});
});
describe("runQuery", () => {
beforeEach(() => {
peerExchangeDiscovery.start();
});
it("should query peer with peer exchange protocol", async () => {
const querySpy = sinon.spy(peerExchangeDiscovery as any, "query");
await peerExchangeDiscovery["runQuery"](mockPeerId, [PeerExchangeCodec]);
expect(querySpy.called).to.be.true;
});
it("should skip peers without peer exchange protocol", async () => {
const querySpy = sinon.spy(peerExchangeDiscovery as any, "query");
await peerExchangeDiscovery["runQuery"](mockPeerId, ["other-protocol"]);
expect(querySpy.called).to.be.false;
});
it("should skip already querying peers", async () => {
peerExchangeDiscovery["queryingPeers"].add(mockPeerId.toString());
const querySpy = sinon.spy(peerExchangeDiscovery as any, "query");
await peerExchangeDiscovery["runQuery"](mockPeerId, [PeerExchangeCodec]);
expect(querySpy.called).to.be.false;
});
it("should handle query errors gracefully", async () => {
const queryStub = sinon
.stub(peerExchangeDiscovery as any, "query")
.rejects(new Error("Query failed"));
await peerExchangeDiscovery["runQuery"](mockPeerId, [PeerExchangeCodec]);
expect(queryStub.called).to.be.true;
expect(peerExchangeDiscovery["queryingPeers"].has(mockPeerId.toString()))
.to.be.false;
});
});
describe("query", () => {
beforeEach(() => {
peerExchangeDiscovery.start();
});
it("should process successful peer exchange query", async () => {
const mockENR = {
peerInfo: {
id: mockPeerId,
multiaddrs: []
},
shardInfo: { clusterId: 1, shards: [1] }
};
const internalPeerExchange = (peerExchangeDiscovery as any)[
"peerExchange"
] as IPeerExchange;
sinon.stub(internalPeerExchange, "query").resolves({
peerInfos: [{ ENR: mockENR as any }],
error: null
});
const dispatchEventSpy = sinon.spy(
peerExchangeDiscovery,
"dispatchEvent"
);
await peerExchangeDiscovery["query"](mockPeerId);
expect(mockPeerStore.merge.called).to.be.true;
expect(dispatchEventSpy.called).to.be.true;
});
it("should handle query errors", async () => {
const internalPeerExchange = (peerExchangeDiscovery as any)[
"peerExchange"
] as IPeerExchange;
sinon.stub(internalPeerExchange, "query").resolves({
peerInfos: null,
error: ProtocolError.NO_PEER_AVAILABLE
});
await peerExchangeDiscovery["query"](mockPeerId);
expect(mockPeerStore.merge.called).to.be.false;
});
it("should skip peers without ENR", async () => {
const internalPeerExchange = (peerExchangeDiscovery as any)[
"peerExchange"
] as IPeerExchange;
sinon.stub(internalPeerExchange, "query").resolves({
peerInfos: [{ ENR: undefined }],
error: null
});
await peerExchangeDiscovery["query"](mockPeerId);
expect(mockPeerStore.merge.called).to.be.false;
});
it("should skip peers without peerInfo in ENR", async () => {
const internalPeerExchange = (peerExchangeDiscovery as any)[
"peerExchange"
] as IPeerExchange;
sinon.stub(internalPeerExchange, "query").resolves({
peerInfos: [{ ENR: { peerInfo: undefined } as any }],
error: null
});
await peerExchangeDiscovery["query"](mockPeerId);
expect(mockPeerStore.merge.called).to.be.false;
});
it("should handle ENR without shardInfo", async () => {
const mockENRWithoutShard = {
peerInfo: {
id: mockPeerId,
multiaddrs: []
}
};
const internalPeerExchange = (peerExchangeDiscovery as any)[
"peerExchange"
] as IPeerExchange;
sinon.stub(internalPeerExchange, "query").resolves({
peerInfos: [{ ENR: mockENRWithoutShard as any }],
error: null
});
await peerExchangeDiscovery["query"](mockPeerId);
expect(mockPeerStore.merge.called).to.be.true;
});
});
describe("continuous discovery interval", () => {
it("should start periodic discovery on start", () => {
const setIntervalSpy = sinon.spy(global, "setInterval");
peerExchangeDiscovery.start();
expect(setIntervalSpy.called).to.be.true;
});
it("should clear interval on stop", () => {
const clearIntervalSpy = sinon.spy(global, "clearInterval");
peerExchangeDiscovery.start();
peerExchangeDiscovery.stop();
expect(clearIntervalSpy.called).to.be.true;
});
});
describe("wakuPeerExchangeDiscovery factory", () => {
it("should create PeerExchangeDiscovery instance", () => {
const factory = wakuPeerExchangeDiscovery({ TTL: 60000 });
const discovery = factory(mockComponents);
expect(discovery).to.be.instanceOf(PeerExchangeDiscovery);
});
it("should create PeerExchangeDiscovery with default options", () => {
const factory = wakuPeerExchangeDiscovery();
const discovery = factory(mockComponents);
expect(discovery).to.be.instanceOf(PeerExchangeDiscovery);
});
});
});

View File

@ -0,0 +1,246 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { peerDiscoverySymbol as symbol } from "@libp2p/interface";
import type {
IdentifyResult,
PeerDiscovery,
PeerDiscoveryEvents,
PeerId,
PeerInfo
} from "@libp2p/interface";
import {
type IPeerExchange,
type Libp2pComponents,
type Libp2pEventHandler
} from "@waku/interfaces";
import { encodeRelayShard, Logger } from "@waku/utils";
import {
DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
DEFAULT_PEER_EXCHANGE_TAG_NAME,
DEFAULT_PEER_EXCHANGE_TAG_TTL,
DEFAULT_PEER_EXCHANGE_TAG_VALUE,
PeerExchangeCodec
} from "./constants.js";
import { PeerExchange } from "./peer_exchange.js";
const log = new Logger("peer-exchange-discovery");
interface PeerExchangeDiscoveryOptions {
/**
* Peer TTL in milliseconds.
* This is the time after which a peer will be considered stale and will be re-queried via peer exchange.
*
* @default 30_000
*/
TTL: number;
}
export class PeerExchangeDiscovery
extends TypedEventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery
{
private readonly components: Libp2pComponents;
private readonly peerExchange: IPeerExchange;
private readonly options: PeerExchangeDiscoveryOptions;
private isStarted: boolean = false;
private queryingPeers: Set<string> = new Set();
private peerExpirationRecords: Map<string, number> = new Map();
private continuousDiscoveryInterval: NodeJS.Timeout | null = null;
public constructor(
components: Libp2pComponents,
options: Partial<PeerExchangeDiscoveryOptions> = {}
) {
super();
this.components = components;
this.peerExchange = new PeerExchange(components);
this.options = {
...options,
TTL: options.TTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL
};
this.handleDiscoveredPeer = this.handleDiscoveredPeer.bind(this);
}
/**
* Start Peer Exchange.
* Subscribe to "peer:identify" events and handle them.
*/
public start(): void {
if (this.isStarted) {
return;
}
log.info("Starting peer exchange node discovery, discovering peers");
this.isStarted = true;
this.components.events.addEventListener(
"peer:identify",
this.handleDiscoveredPeer as Libp2pEventHandler<IdentifyResult>
);
this.continuousDiscoveryInterval = setInterval(() => {
void this.handlePeriodicDiscovery();
}, this.options.TTL);
}
/**
* Stop Peer Exchange.
* Unsubscribe from "peer:identify" events.
*/
public stop(): void {
if (!this.isStarted) {
return;
}
log.info("Stopping peer exchange node discovery");
this.isStarted = false;
this.queryingPeers.clear();
this.peerExpirationRecords.clear();
if (this.continuousDiscoveryInterval) {
clearInterval(this.continuousDiscoveryInterval);
}
this.components.events.removeEventListener(
"peer:identify",
this.handleDiscoveredPeer as Libp2pEventHandler<IdentifyResult>
);
}
public get [symbol](): true {
return true;
}
public get [Symbol.toStringTag](): string {
return "@waku/peer-exchange";
}
private async handleDiscoveredPeer(
event: CustomEvent<IdentifyResult>
): Promise<void> {
void this.runQuery(event.detail.peerId, event.detail.protocols);
}
private async handlePeriodicDiscovery(): Promise<void> {
const connections = this.components.connectionManager.getConnections();
await Promise.all(
connections.map(async (connection) => {
try {
const peerIdStr = connection.remotePeer.toString();
const shouldQuery = this.peerExpirationRecords.has(peerIdStr)
? this.peerExpirationRecords.get(peerIdStr)! <= Date.now()
: true;
if (!shouldQuery) {
return null;
}
const peer = await this.components.peerStore.get(
connection.remotePeer
);
return this.runQuery(connection.remotePeer, peer.protocols);
} catch (error) {
log.warn("Error getting peer info", error);
return null;
}
})
);
}
private async runQuery(peerId: PeerId, protocols: string[]): Promise<void> {
if (
!protocols.includes(PeerExchangeCodec) ||
this.queryingPeers.has(peerId.toString())
) {
log.info(
`Skipping peer ${peerId} as it is already querying or does not support peer exchange`
);
return;
}
try {
this.queryingPeers.add(peerId.toString());
await this.query(peerId);
} catch (error) {
log.error("Error querying peer", error);
}
this.peerExpirationRecords.set(
peerId.toString(),
Date.now() + this.options.TTL
);
this.queryingPeers.delete(peerId.toString());
}
private async query(peerId: PeerId): Promise<void> {
const peerIdStr = peerId.toString();
log.info(`Querying peer exchange for ${peerIdStr}`);
const { error, peerInfos } = await this.peerExchange.query({
numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
peerId
});
if (error) {
log.error(`Peer exchange query to ${peerIdStr} failed`, error);
return;
}
for (const { ENR } of peerInfos) {
if (!ENR) {
log.warn(`No ENR in peerInfo object from ${peerIdStr}, skipping`);
continue;
}
const { peerInfo, shardInfo } = ENR;
if (!peerInfo) {
log.warn(`No peerInfo in ENR from ${peerIdStr}, skipping`);
continue;
}
// merge is smart enough to overwrite only changed parts
await this.components.peerStore.merge(peerInfo.id, {
tags: {
[DEFAULT_PEER_EXCHANGE_TAG_NAME]: {
value: DEFAULT_PEER_EXCHANGE_TAG_VALUE
}
},
...(shardInfo && {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
}),
...(peerInfo.multiaddrs && {
multiaddrs: peerInfo.multiaddrs
})
});
log.info(`Discovered peer: ${peerInfo.id.toString()}`);
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", {
detail: {
id: peerInfo.id,
multiaddrs: peerInfo.multiaddrs
}
})
);
}
}
}
export function wakuPeerExchangeDiscovery(
options: Partial<PeerExchangeDiscoveryOptions> = {}
): (components: Libp2pComponents) => PeerExchangeDiscovery {
return (components: Libp2pComponents) =>
new PeerExchangeDiscovery(components, options);
}

View File

@ -1,317 +0,0 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { peerDiscoverySymbol as symbol } from "@libp2p/interface";
import type {
IdentifyResult,
PeerDiscovery,
PeerDiscoveryEvents,
PeerId,
PeerInfo
} from "@libp2p/interface";
import {
type Libp2pComponents,
type PeerExchangeQueryResult,
ShardInfo,
Tags
} from "@waku/interfaces";
import { decodeRelayShard, encodeRelayShard, Logger } from "@waku/utils";
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
const log = new Logger("peer-exchange-discovery");
const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10;
const DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS = 10 * 1000;
const DEFAULT_MAX_RETRIES = 3;
export interface Options {
/**
* Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap')
*/
tagName?: string;
/**
* The bootstrap peer tag will have this value (default: 50)
*/
tagValue?: number;
/**
* Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes)
*/
tagTTL?: number;
/**
* The interval between queries to a peer (default: 10 seconds)
* The interval will increase by a factor of an incrementing number (starting at 1)
* until it reaches the maximum attempts before backoff
*/
queryInterval?: number;
/**
* The number of attempts before the queries to a peer are aborted (default: 3)
*/
maxRetries?: number;
}
interface CustomDiscoveryEvent extends PeerDiscoveryEvents {
"waku:peer-exchange:started": CustomEvent<boolean>;
}
const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE;
const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50;
const DEFAULT_PEER_EXCHANGE_TAG_TTL = 100_000_000;
export class PeerExchangeDiscovery
extends TypedEventEmitter<CustomDiscoveryEvent>
implements PeerDiscovery
{
private readonly components: Libp2pComponents;
private readonly peerExchange: WakuPeerExchange;
private readonly options: Options;
private isStarted: boolean;
private queryingPeers: Set<string> = new Set();
private queryAttempts: Map<string, number> = new Map();
private readonly handleDiscoveredPeer = (
event: CustomEvent<IdentifyResult>
): void => {
const { protocols, peerId } = event.detail;
if (
!protocols.includes(PeerExchangeCodec) ||
this.queryingPeers.has(peerId.toString())
)
return;
this.queryingPeers.add(peerId.toString());
this.startRecurringQueries(peerId).catch((error) =>
log.error(`Error querying peer ${error}`)
);
};
public constructor(components: Libp2pComponents, options: Options = {}) {
super();
this.components = components;
this.peerExchange = new WakuPeerExchange(components);
this.options = options;
this.isStarted = false;
}
/**
* Start emitting events
*/
public start(): void {
if (this.isStarted) {
return;
}
this.dispatchEvent(
new CustomEvent("waku:peer-exchange:started", { detail: true })
);
log.info("Starting peer exchange node discovery, discovering peers");
// might be better to use "peer:identify" or "peer:update"
this.components.events.addEventListener(
"peer:identify",
this.handleDiscoveredPeer
);
}
/**
* Remove event listener
*/
public stop(): void {
if (!this.isStarted) return;
log.info("Stopping peer exchange node discovery");
this.isStarted = false;
this.queryingPeers.clear();
this.components.events.removeEventListener(
"peer:identify",
this.handleDiscoveredPeer
);
}
public get [symbol](): true {
return true;
}
public get [Symbol.toStringTag](): string {
return "@waku/peer-exchange";
}
private readonly startRecurringQueries = async (
peerId: PeerId
): Promise<void> => {
const peerIdStr = peerId.toString();
const {
queryInterval = DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS,
maxRetries = DEFAULT_MAX_RETRIES
} = this.options;
log.info(
`Querying peer: ${peerIdStr} (attempt ${
this.queryAttempts.get(peerIdStr) ?? 1
})`
);
await this.query(peerId);
const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1;
if (currentAttempt > maxRetries) {
this.abortQueriesForPeer(peerIdStr);
return;
}
setTimeout(() => {
this.queryAttempts.set(peerIdStr, currentAttempt + 1);
this.startRecurringQueries(peerId).catch((error) => {
log.error(`Error in startRecurringQueries: ${error}`);
});
}, queryInterval * currentAttempt);
};
private async query(peerId: PeerId): Promise<PeerExchangeQueryResult> {
const { error, peerInfos } = await this.peerExchange.query({
numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
peerId
});
if (error) {
log.error("Peer exchange query failed", error);
return { error, peerInfos: null };
}
for (const _peerInfo of peerInfos) {
const { ENR } = _peerInfo;
if (!ENR) {
log.warn("No ENR in peerInfo object, skipping");
continue;
}
const { peerId, peerInfo, shardInfo } = ENR;
if (!peerId || !peerInfo) {
continue;
}
const hasPeer = await this.components.peerStore.has(peerId);
if (hasPeer) {
const { hasMultiaddrDiff, hasShardDiff } = await this.checkPeerInfoDiff(
peerInfo,
shardInfo
);
if (hasMultiaddrDiff || hasShardDiff) {
log.info(
`Peer ${peerId.toString()} has updated multiaddrs or shardInfo, updating`
);
if (hasMultiaddrDiff) {
log.info(
`Peer ${peerId.toString()} has updated multiaddrs, updating`
);
await this.components.peerStore.patch(peerId, {
multiaddrs: peerInfo.multiaddrs
});
}
if (hasShardDiff && shardInfo) {
log.info(
`Peer ${peerId.toString()} has updated shardInfo, updating`
);
await this.components.peerStore.merge(peerId, {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
});
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", {
detail: {
id: peerId,
multiaddrs: peerInfo.multiaddrs
}
})
);
}
continue;
}
}
// update the tags for the peer
await this.components.peerStore.save(peerId, {
tags: {
[DEFAULT_PEER_EXCHANGE_TAG_NAME]: {
value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE,
ttl: this.options.tagTTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL
}
},
...(shardInfo && {
metadata: {
shardInfo: encodeRelayShard(shardInfo)
}
}),
...(peerInfo.multiaddrs && {
multiaddrs: peerInfo.multiaddrs
})
});
log.info(`Discovered peer: ${peerId.toString()}`);
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", {
detail: {
id: peerId,
multiaddrs: peerInfo.multiaddrs
}
})
);
}
return { error: null, peerInfos };
}
private abortQueriesForPeer(peerIdStr: string): void {
log.info(`Aborting queries for peer: ${peerIdStr}`);
this.queryingPeers.delete(peerIdStr);
this.queryAttempts.delete(peerIdStr);
}
private async checkPeerInfoDiff(
peerInfo: PeerInfo,
shardInfo?: ShardInfo
): Promise<{ hasMultiaddrDiff: boolean; hasShardDiff: boolean }> {
const { id: peerId } = peerInfo;
const peer = await this.components.peerStore.get(peerId);
const existingMultiaddrs = peer.addresses.map((a) =>
a.multiaddr.toString()
);
const newMultiaddrs = peerInfo.multiaddrs.map((ma) => ma.toString());
const hasMultiaddrDiff = existingMultiaddrs.some(
(ma) => !newMultiaddrs.includes(ma)
);
let hasShardDiff: boolean = false;
const existingShardInfoBytes = peer.metadata.get("shardInfo");
if (existingShardInfoBytes) {
const existingShardInfo = decodeRelayShard(existingShardInfoBytes);
if (existingShardInfo || shardInfo) {
hasShardDiff =
existingShardInfo.clusterId !== shardInfo?.clusterId ||
existingShardInfo.shards.some(
(shard) => !shardInfo?.shards.includes(shard)
);
}
}
return { hasMultiaddrDiff, hasShardDiff };
}
}
export function wakuPeerExchangeDiscovery(): (
components: Libp2pComponents
) => PeerExchangeDiscovery {
return (components: Libp2pComponents) =>
new PeerExchangeDiscovery(components);
}

View File

@ -94,7 +94,6 @@ describe.skip("DiscoveryDialer", function () {
});
});
// TODO(weboko): investigate why peer-exchange discovery is not working https://github.com/waku-org/js-waku/issues/2446
await waku.libp2p.peerStore.save(secondPeerId, {
multiaddrs: [maddrs[1]]
});

View File

@ -15,7 +15,7 @@ import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import { MemoryDatastore } from "datastore-core/memory";
import { delay } from "../src/index.js";
import { delay } from "../../src/index.js";
const maxQuantity = 3;

View File

@ -0,0 +1,58 @@
import { bootstrap } from "@libp2p/bootstrap";
import {
DnsNodeDiscovery,
enrTree,
wakuPeerExchangeDiscovery
} from "@waku/discovery";
import type { LightNode } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import { afterEachCustom, tearDownNodes } from "../../src/index.js";
describe("DNS Discovery", function () {
let waku: LightNode;
const predefinedNodes: string[] = [];
afterEachCustom(this, async () => {
await tearDownNodes([], waku);
});
it(`should discover peers other than used for bootstrapping`, async function () {
this.timeout(50_000);
const dns = await DnsNodeDiscovery.dnsOverHttp();
const dnsEnrs = [];
for await (const node of dns.getNextPeer([enrTree["SANDBOX"]])) {
dnsEnrs.push(node);
}
const dnsPeerMultiaddrs = dnsEnrs
.flatMap(
(enr) => enr.peerInfo?.multiaddrs.map((ma) => ma.toString()) ?? []
)
.filter((ma) => ma.includes("wss"));
const networkConfig = { clusterId: 2, numShardsInCluster: 0 };
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: dnsPeerMultiaddrs }),
wakuPeerExchangeDiscovery()
]
},
networkConfig
});
const foundPxPeer = await new Promise<boolean>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
const peerId = evt.detail.id.toString();
const isBootstrapNode = predefinedNodes.find((n) => n.includes(peerId));
if (!isBootstrapNode) {
resolve(true);
}
});
});
expect(foundPxPeer).to.be.true;
});
});

View File

@ -36,6 +36,11 @@ describe("Peer Exchange", function () {
peerExchange: true,
discv5BootstrapNode: enr
});
const waitForNodesToMountPeerExchange = new Promise((resolve) =>
setTimeout(resolve, 10_000)
);
await waitForNodesToMountPeerExchange;
});
tests({
@ -43,22 +48,16 @@ describe("Peer Exchange", function () {
waku = await createLightNode({
networkConfig: DefaultTestNetworkConfig
});
await waku.start();
const nwaku2Ma = await nwaku2.getMultiaddrWithId();
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
const peerExchange = new PeerExchangeDiscovery(waku.libp2p.components);
peerExchange.addEventListener("waku:peer-exchange:started", (event) => {
if (event.detail === true) {
void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
}
});
await waku.libp2p.dialProtocol(nwaku1Ma, PeerExchangeCodec);
return peerExchange;
},
teardown: async () => {
this.timeout(15000);
this.timeout(15_000);
await tearDownNodes([nwaku1, nwaku2], waku);
}
});

View File

@ -1,127 +0,0 @@
import { generateKeyPair } from "@libp2p/crypto/keys";
import { type PeerId } from "@libp2p/interface";
import { peerIdFromPrivateKey } from "@libp2p/peer-id";
import { multiaddr } from "@multiformats/multiaddr";
import { PeerExchangeDiscovery } from "@waku/discovery";
import { IEnr, LightNode } from "@waku/interfaces";
import { createLightNode, ShardInfo } from "@waku/sdk";
import { decodeRelayShard } from "@waku/utils";
import { expect } from "chai";
import Sinon from "sinon";
describe("Peer Exchange Continuous Discovery", () => {
let peerExchangeDiscovery: PeerExchangeDiscovery;
let queryStub: Sinon.SinonStub;
let peerId: PeerId;
let randomPeerId: PeerId;
let waku: LightNode;
const shardInfo: ShardInfo = {
clusterId: 2,
shards: [1, 2]
};
const multiaddrs = [multiaddr("/ip4/127.0.0.1/udp/1234")];
beforeEach(async () => {
waku = await createLightNode();
peerExchangeDiscovery = new PeerExchangeDiscovery(waku.libp2p.components);
queryStub = Sinon.stub(
(peerExchangeDiscovery as any).peerExchange,
"query" as any
);
await discoverPeerOnce();
});
it("Should update multiaddrs", async () => {
const newMultiaddrs = [multiaddr("/ip4/192.168.1.1/udp/1234")];
const newPeerInfo = {
ENR: {
peerId,
shardInfo,
peerInfo: {
multiaddrs: newMultiaddrs,
id: peerId
}
} as IEnr
};
queryStub.resolves({ error: null, peerInfos: [newPeerInfo] });
const newResult = await (peerExchangeDiscovery as any).query(randomPeerId);
expect(newResult.error).to.be.null;
const newPeers = await waku.libp2p.peerStore.all();
expect(newPeers.length).to.equal(1);
const newPeer = newPeers[0];
expect(newPeer.addresses.length).to.equal(1);
expect(newPeer.addresses[0].multiaddr.toString()).to.equal(
newMultiaddrs[0].toString()
);
});
it("Should update shard info", async () => {
const newShardInfo: ShardInfo = {
clusterId: 2,
shards: [1, 2, 3]
};
const newPeerInfo = {
ENR: {
peerId,
shardInfo: newShardInfo,
peerInfo: {
multiaddrs: multiaddrs,
id: peerId
}
} as IEnr
};
queryStub.resolves({ error: null, peerInfos: [newPeerInfo] });
const newResult = await (peerExchangeDiscovery as any).query(randomPeerId);
expect(newResult.error).to.be.null;
const newPeers = await waku.libp2p.peerStore.all();
expect(newPeers.length).to.equal(1);
const newPeer = newPeers[0];
expect(newPeer.addresses.length).to.equal(1);
expect(newPeer.addresses[0].multiaddr.toString()).to.equal(
multiaddrs[0].toString()
);
const _shardInfo = decodeRelayShard(newPeer.metadata.get("shardInfo")!);
expect(_shardInfo).to.deep.equal(newShardInfo);
});
async function discoverPeerOnce(): Promise<void> {
const privateKey = await generateKeyPair("secp256k1");
peerId = peerIdFromPrivateKey(privateKey);
const enr: IEnr = {
peerId,
shardInfo,
peerInfo: {
multiaddrs: multiaddrs,
id: peerId
}
} as IEnr;
const peerInfo = {
ENR: enr
};
queryStub.resolves({ error: null, peerInfos: [peerInfo] });
const privateKeyRandom = await generateKeyPair("secp256k1");
randomPeerId = peerIdFromPrivateKey(privateKeyRandom);
const result = await (peerExchangeDiscovery as any).query(randomPeerId);
expect(result.error).to.be.null;
const peers = await waku.libp2p.peerStore.all();
expect(peers.length).to.equal(1);
const peer = peers[0];
expect(peer.addresses.length).to.equal(1);
expect(peer.addresses[0].multiaddr.toString()).to.equal(
multiaddrs[0].toString()
);
const _shardInfo = decodeRelayShard(peer.metadata.get("shardInfo")!);
expect(_shardInfo).to.deep.equal(shardInfo);
}
});

View File

@ -1,64 +0,0 @@
import { bootstrap } from "@libp2p/bootstrap";
import {
DnsNodeDiscovery,
enrTree,
wakuPeerExchangeDiscovery
} from "@waku/discovery";
import type { LightNode } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import { afterEachCustom, tearDownNodes } from "../../src/index.js";
describe("Peer Exchange", () => {
describe("Auto Discovery", function () {
let waku: LightNode;
const predefinedNodes: string[] = [];
afterEachCustom(this, async () => {
await tearDownNodes([], waku);
});
it(`should discover peers other than used for bootstrapping`, async function () {
this.timeout(50_000);
const dns = await DnsNodeDiscovery.dnsOverHttp();
const dnsEnrs = [];
for await (const node of dns.getNextPeer([enrTree["SANDBOX"]])) {
dnsEnrs.push(node);
}
const dnsPeerMultiaddrs = dnsEnrs
.flatMap(
(enr) => enr.peerInfo?.multiaddrs.map((ma) => ma.toString()) ?? []
)
.filter((ma) => ma.includes("wss"));
const networkConfig = { clusterId: 2, numShardsInCluster: 0 };
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: dnsPeerMultiaddrs }),
wakuPeerExchangeDiscovery()
]
},
networkConfig
});
await waku.start();
const foundPxPeer = await new Promise<boolean>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
const peerId = evt.detail.id.toString();
const isBootstrapNode = predefinedNodes.find((n) =>
n.includes(peerId)
);
if (!isBootstrapNode) {
resolve(true);
}
});
});
expect(foundPxPeer).to.be.true;
});
});
});

View File

@ -13,6 +13,7 @@ import {
DefaultTestClusterId,
DefaultTestNetworkConfig,
DefaultTestShardInfo,
delay,
makeLogFileName,
ServiceNode,
tearDownNodes
@ -22,6 +23,7 @@ export const log = new Logger("test:pe");
describe("Peer Exchange", function () {
this.timeout(150_000);
let ctx: Mocha.Context;
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
@ -29,8 +31,9 @@ describe("Peer Exchange", function () {
let dialPeerSpy: SinonSpy;
beforeEachCustom(this, async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
ctx = this.ctx;
nwaku1 = new ServiceNode(makeLogFileName(ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(ctx) + "2");
await nwaku1.start({
clusterId: DefaultTestClusterId,
shard: DefaultTestShardInfo.shards,
@ -46,6 +49,8 @@ describe("Peer Exchange", function () {
discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true
});
await delay(10_000); // wait for peer exchange to finish, nwaku takes ~10s
});
afterEachCustom(this, async () => {
@ -62,7 +67,6 @@ describe("Peer Exchange", function () {
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).libp2p, "dial");
const pxPeersDiscovered = new Set<PeerId>();
@ -87,104 +91,49 @@ describe("Peer Exchange", function () {
expect(pxPeersDiscovered.size).to.equal(1);
});
// will be skipped until https://github.com/waku-org/js-waku/issues/1860 is fixed
it.skip("new peer added after a peer was already found", async function () {
it("new peer added after a peer was already found", async function () {
waku = await createLightNode({
networkConfig: DefaultTestNetworkConfig,
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery()
]
peerDiscovery: [wakuPeerExchangeDiscovery({ TTL: 1_000 })]
}
});
await waku.start();
await waku.dial(await nwaku2.getMultiaddrWithId());
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
resolve();
}
}
})();
});
});
nwaku3 = new ServiceNode(makeLogFileName(this) + "3");
nwaku3 = new ServiceNode(makeLogFileName(ctx) + "3");
await nwaku3.start({
clusterId: DefaultTestClusterId,
shard: DefaultTestShardInfo.shards,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true,
lightpush: true,
filter: true
relay: true
});
await new Promise<void>((resolve) => {
const nwaku3PeerId = (await nwaku3.getPeerId()).toString();
const pxPeersDiscoveredPromise = new Promise<boolean>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
setTimeout(() => {
resolve(false);
}, 10_000);
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 2) {
resolve();
}
}
})();
});
});
});
// will be skipped until https://github.com/waku-org/js-waku/issues/1858 is fixed
it.skip("wrong wakuPeerExchangeDiscovery pubsub topic", async function () {
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery()
]
}
});
await waku.start();
dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
const timeoutId = setTimeout(() => {
resolve();
}, 40000);
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
const peer = await waku.libp2p.peerStore.get(peerId);
const tags = Array.from(peer.tags.keys());
if (tags.includes(Tags.PEER_EXCHANGE)) {
pxPeersDiscovered.add(peerId);
if (pxPeersDiscovered.size === 1) {
clearTimeout(timeoutId);
resolve();
}
if (
tags.includes(Tags.PEER_EXCHANGE) &&
peerId.toString() === nwaku3PeerId
) {
resolve(true);
}
})();
});
});
expect(
pxPeersDiscovered.size,
"No peer should have been discovered"
).to.equal(0);
const pxPeersDiscovered = await pxPeersDiscoveredPromise;
expect(pxPeersDiscovered, "Peer was found").to.be.true;
});
it("peerDiscovery without wakuPeerExchangeDiscovery", async function () {

View File

@ -1,200 +0,0 @@
import { bootstrap } from "@libp2p/bootstrap";
import type { PeerId } from "@libp2p/interface";
import { multiaddr } from "@multiformats/multiaddr";
import type { Multiaddr } from "@multiformats/multiaddr";
import {
PeerExchangeCodec,
WakuPeerExchange,
wakuPeerExchangeDiscovery
} from "@waku/discovery";
import type { LightNode, PeerExchangeQueryResult } from "@waku/interfaces";
import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk";
import { Logger } from "@waku/utils";
import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
delay,
makeLogFileName,
ServiceNode,
tearDownNodes,
waitForRemotePeerWithCodec
} from "../../src/index.js";
export const log = new Logger("test:pe");
const ShardInfo = { clusterId: 0, shards: [2] };
describe("Peer Exchange Query", function () {
this.timeout(30_000);
let waku: LightNode;
let nwaku1: ServiceNode;
let nwaku2: ServiceNode;
let nwaku3: ServiceNode;
let nwaku1PeerId: PeerId;
let nwaku3MA: Multiaddr;
let nwaku3PeerId: PeerId;
let components: Libp2pComponents;
let peerExchange: WakuPeerExchange;
let numPeersToRequest: number;
let queryResult: PeerExchangeQueryResult;
beforeEachCustom(
this,
async () => {
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3");
await nwaku1.start({
shard: ShardInfo.shards,
clusterId: ShardInfo.clusterId,
discv5Discovery: true,
peerExchange: true,
relay: true
});
nwaku1PeerId = await nwaku1.getPeerId();
await nwaku2.start({
shard: ShardInfo.shards,
clusterId: ShardInfo.clusterId,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku1.info()).enrUri,
relay: true
});
await nwaku3.start({
shard: ShardInfo.shards,
clusterId: ShardInfo.clusterId,
discv5Discovery: true,
peerExchange: true,
discv5BootstrapNode: (await nwaku2.info()).enrUri,
relay: true
});
nwaku3MA = await nwaku3.getMultiaddrWithId();
nwaku3PeerId = await nwaku3.getPeerId();
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3MA.toString()] }),
wakuPeerExchangeDiscovery()
]
}
});
await waku.start();
await waku.libp2p.dialProtocol(nwaku3MA, PeerExchangeCodec);
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku3PeerId);
components = waku.libp2p.components as unknown as Libp2pComponents;
peerExchange = new WakuPeerExchange(components);
numPeersToRequest = 2;
const startTime = Date.now();
while (true) {
if (Date.now() - startTime > 100_000) {
log.error("Timeout reached, exiting the loop.");
break;
}
await delay(2000);
try {
queryResult = await Promise.race([
peerExchange.query({
peerId: nwaku3PeerId,
numPeers: numPeersToRequest
}),
new Promise<PeerExchangeQueryResult>((resolve) =>
setTimeout(
() =>
resolve({
peerInfos: null,
error: ProtocolError.GENERIC_FAIL
}),
5000
)
)
]);
const hasErrors = queryResult?.error !== null;
const hasPeerInfos =
queryResult?.peerInfos &&
queryResult.peerInfos.length === numPeersToRequest;
if (hasErrors) {
log.error("Error encountered, retrying...", queryResult.error);
continue;
}
if (!hasPeerInfos) {
log.warn(
"Peer info not available or does not match the requested number of peers, retrying..."
);
continue;
}
break;
} catch (error) {
log.warn("Error encountered, retrying...", error);
}
}
},
120_000
);
afterEachCustom(this, async () => {
await tearDownNodes([nwaku1, nwaku2, nwaku3], waku);
});
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("connected peers and dial", async function () {
expect(queryResult.error).to.be.null;
expect(queryResult.peerInfos?.[0].ENR).to.not.be.null;
expect(queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs).to.not.be.null;
const peerWsMA = queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs[2];
const localPeerWsMAAsString = peerWsMA
?.toString()
.replace(/\/ip4\/[\d.]+\//, "/ip4/127.0.0.1/");
const localPeerWsMA = multiaddr(localPeerWsMAAsString);
let foundNodePeerId: PeerId | undefined = undefined;
const doesPeerIdExistInResponse = queryResult.peerInfos?.some(({ ENR }) => {
foundNodePeerId = ENR?.peerInfo?.id;
return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString();
});
if (!foundNodePeerId) {
throw new Error("Peer1 ID not found");
}
expect(doesPeerIdExistInResponse, "peer not found").to.be.equal(true);
await waku.libp2p.dialProtocol(localPeerWsMA, PeerExchangeCodec);
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, foundNodePeerId);
});
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("more peers than existing", async function () {
const result = await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: 5
});
expect(result.error).to.be.null;
expect(result.peerInfos?.length).to.be.eq(numPeersToRequest);
});
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("less peers than existing", async function () {
const result = await peerExchange.query({
peerId: nwaku3PeerId,
numPeers: 1
});
expect(result.error).to.be.null;
expect(result.peerInfos?.length).to.be.eq(1);
});
// slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911
it.skip("non connected peers", async function () {
// querying the non connected peer
const result = await peerExchange.query({
peerId: nwaku1PeerId,
numPeers: numPeersToRequest
});
expect(result.error).to.be.eq(ProtocolError.NO_PEER_AVAILABLE);
expect(result.peerInfos).to.be.null;
});
});

View File

@ -164,18 +164,15 @@ describe("Static Sharding: Peer Management", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery()
wakuPeerExchangeDiscovery({ TTL: 1000 })
]
}
});
dialPeerSpy = Sinon.spy((waku as any).libp2p, "dial");
await waku.start();
const pxPeersDiscovered = new Set<PeerId>();
await new Promise<void>((resolve) => {
const pxPeersDiscoveredPromise = new Promise<void>((resolve) => {
waku.libp2p.addEventListener("peer:discovery", (evt) => {
return void (async () => {
const peerId = evt.detail.id;
@ -191,7 +188,8 @@ describe("Static Sharding: Peer Management", function () {
});
});
await delay(1000);
await delay(10_000);
await pxPeersDiscoveredPromise;
expect(dialPeerSpy.callCount).to.equal(3);
});
});