make PeerManager use only ConnectionManager, move getPeers to ConnectionManager, remove not needed code

This commit is contained in:
Sasha 2024-10-20 22:54:36 +02:00
parent e897d5cd6f
commit e1813bc47b
No known key found for this signature in database
20 changed files with 73 additions and 683 deletions

View File

@ -15,11 +15,10 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";
export { ConnectionManager } from "./lib/connection_manager.js";
export { ConnectionManager } from "./lib/connection_manager/index.js";
export { getHealthManager } from "./lib/health_manager.js";
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager/index.js";
export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";

View File

@ -5,10 +5,8 @@ import type {
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";
import { getPeersForProtocol } from "@waku/utils/libp2p";
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";
/**
@ -23,7 +21,6 @@ export class BaseProtocol implements IBaseProtocolCore {
protected constructor(
public multicodec: string,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
@ -64,54 +61,4 @@ export class BaseProtocol implements IBaseProtocolCore {
return connections.length > 0;
});
}
/**
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();
// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);
// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.components.peerStore,
filteredPeers
);
if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}
if (sortedFilteredPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
);
}
return sortedFilteredPeers;
}
}

View File

@ -19,6 +19,7 @@ import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils";
import { Logger } from "@waku/utils";
import { KeepAliveManager } from "./keep_alive_manager.js";
import { getPeerPing } from "./utils.js";
const log = new Logger("connection-manager");
@ -180,6 +181,29 @@ export class ConnectionManager
);
}
public async getConnectedPeers(codec?: string): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();
if (peerIDs.length === 0) {
return [];
}
const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);
return peers
.filter((p) => !!p)
.filter((p) => (codec ? (p as Peer).protocols.includes(codec) : true))
.sort((left, right) => getPeerPing(left) - getPeerPing(right)) as Peer[];
}
private async dialPeerStorePeers(): Promise<void> {
const peerInfos = await this.libp2p.peerStore.all();
const dialPromises = [];

View File

@ -0,0 +1 @@
export { ConnectionManager } from "./connection_manager.js";

View File

@ -3,7 +3,7 @@ import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces";
import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { createEncoder } from "./message/version_0.js";
import { createEncoder } from "../message/version_0.js";
const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = new Logger("keep-alive");

View File

@ -0,0 +1,25 @@
import type { Peer } from "@libp2p/interface";
import { bytesToUtf8 } from "@waku/utils/bytes";
/**
* Reads peer's metadata and retrieves ping value.
* @param peer Peer or null
* @returns -1 if no ping attached, otherwise returns ping value
*/
export const getPeerPing = (peer: Peer | null): number => {
if (!peer) {
return -1;
}
try {
const bytes = peer.metadata.get("ping");
if (!bytes) {
return -1;
}
return Number(bytesToUtf8(bytes));
} catch (e) {
return -1;
}
};

View File

@ -40,7 +40,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics);
super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics);
libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {

View File

@ -1,144 +0,0 @@
import { Peer } from "@libp2p/interface";
import type { Tag } from "@libp2p/interface";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Tags } from "@waku/interfaces";
import { expect } from "chai";
import { filterPeersByDiscovery } from "./filterPeers.js";
describe("filterPeersByDiscovery function", function () {
it("should return all peers when numPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = filterPeersByDiscovery(mockPeers, 0, 10);
expect(result.length).to.deep.equal(mockPeers.length);
});
it("should return all non-bootstrap peers and no bootstrap peer when numPeers is 0 and maxBootstrapPeers is 0", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = filterPeersByDiscovery(mockPeers, 0, 0);
// result should have no bootstrap peers, and a total of 2 peers
expect(result.length).to.equal(2);
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.equal(0);
});
it("should return one bootstrap peer, and all non-boostrap peers, when numPeers is 0 & maxBootstrap is 1", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const peer5 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer5,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = filterPeersByDiscovery(mockPeers, 0, 1);
// result should have 1 bootstrap peers, and a total of 4 peers
expect(result.length).to.equal(4);
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.equal(1);
});
it("should return only bootstrap peers up to maxBootstrapPeers", async function () {
const peer1 = await createSecp256k1PeerId();
const peer2 = await createSecp256k1PeerId();
const peer3 = await createSecp256k1PeerId();
const peer4 = await createSecp256k1PeerId();
const peer5 = await createSecp256k1PeerId();
const mockPeers = [
{
id: peer1,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer2,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer3,
tags: new Map<string, Tag>([[Tags.BOOTSTRAP, { value: 100 }]])
},
{
id: peer4,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
},
{
id: peer5,
tags: new Map<string, Tag>([[Tags.PEER_EXCHANGE, { value: 100 }]])
}
] as unknown as Peer[];
const result = filterPeersByDiscovery(mockPeers, 5, 2);
// check that result has at least 2 bootstrap peers and no more than 5 peers
expect(result.length).to.be.at.least(2);
expect(result.length).to.be.at.most(5);
expect(result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length);
});
});

View File

@ -1,51 +0,0 @@
import { Peer } from "@libp2p/interface";
import { Tags } from "@waku/interfaces";
/**
* Retrieves a list of peers based on the specified criteria:
* 1. If numPeers is 0, return all peers
* 2. Bootstrap peers are prioritized
* 3. Non-bootstrap peers are randomly selected to fill up to numPeers
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned, irrespective of `maxBootstrapPeers`.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns An array of peers based on the specified criteria.
*/
export function filterPeersByDiscovery(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
): Peer[] {
// Collect the bootstrap peers up to the specified maximum
let bootstrapPeers = peers
.filter((peer) => peer.tags.has(Tags.BOOTSTRAP))
.slice(0, maxBootstrapPeers);
// If numPeers is less than the number of bootstrap peers, adjust the bootstrapPeers array
if (numPeers > 0 && numPeers < bootstrapPeers.length) {
bootstrapPeers = bootstrapPeers.slice(0, numPeers);
}
// Collect non-bootstrap peers
const nonBootstrapPeers = peers.filter(
(peer) => !peer.tags.has(Tags.BOOTSTRAP)
);
// If numPeers is 0, return all peers
if (numPeers === 0) {
return [...bootstrapPeers, ...nonBootstrapPeers];
}
// Initialize the list of selected peers with the bootstrap peers
const selectedPeers: Peer[] = [...bootstrapPeers];
// Fill up to numPeers with remaining random peers if needed
while (selectedPeers.length < numPeers && nonBootstrapPeers.length > 0) {
const randomIndex = Math.floor(Math.random() * nonBootstrapPeers.length);
const randomPeer = nonBootstrapPeers.splice(randomIndex, 1)[0];
selectedPeers.push(randomPeer);
}
return selectedPeers;
}

View File

@ -37,7 +37,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(LightPushCodec, libp2p.components, log, pubsubTopics);
super(LightPushCodec, libp2p.components, pubsubTopics);
}
private async preparePushMessage(

View File

@ -30,7 +30,7 @@ class Metadata extends BaseProtocol implements IMetadata {
public pubsubTopics: PubsubTopic[],
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, log, pubsubTopics);
super(MetadataCodec, libp2p.components, pubsubTopics);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);

View File

@ -32,7 +32,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(StoreCodec, libp2p.components, log, pubsubTopics);
super(StoreCodec, libp2p.components, pubsubTopics);
}
public async *queryPerPage<T extends IDecodedMessage>(

View File

@ -32,7 +32,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
components: Libp2pComponents,
pubsubTopics: PubsubTopic[]
) {
super(PeerExchangeCodec, components, log, pubsubTopics);
super(PeerExchangeCodec, components, pubsubTopics);
}
/**

View File

@ -86,6 +86,7 @@ export interface IConnectionStateEvents {
export interface IConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
pubsubTopics: PubsubTopic[];
getConnectedPeers(codec?: string): Promise<Peer[]>;
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;

View File

@ -33,7 +33,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
this.peerManager = new PeerManager(connectionManager, core);
this.peerManager = new PeerManager(connectionManager);
this.log.info(
`Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms`

View File

@ -36,8 +36,8 @@ export class LightPush implements ILightPush {
public readonly protocol: LightPushCore;
public constructor(
connectionManager: ConnectionManager,
private libp2p: Libp2p,
private connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
@ -147,26 +147,9 @@ export class LightPush implements ILightPush {
}
private async getConnectedPeers(): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();
if (peerIDs.length === 0) {
return [];
}
const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);
return peers
.filter((p) => !!p)
.filter((p) => (p as Peer).protocols.includes(LightPushCodec))
.slice(0, this.numPeersToUse) as Peer[];
const peers =
await this.connectionManager.getConnectedPeers(LightPushCodec);
return peers.slice(0, this.numPeersToUse);
}
}

View File

@ -1,7 +1,5 @@
import { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IHealthManager } from "@waku/interfaces";
import { ConnectionManager } from "@waku/core";
import { Logger } from "@waku/utils";
import { Mutex } from "async-mutex";
@ -9,19 +7,12 @@ const log = new Logger("peer-manager");
export class PeerManager {
private peers: Map<string, Peer> = new Map();
private healthManager: IHealthManager;
private readMutex = new Mutex();
private writeMutex = new Mutex();
private writeLockHolder: string | null = null;
public constructor(
private readonly connectionManager: ConnectionManager,
private readonly core: BaseProtocol
) {
this.healthManager = getHealthManager();
this.healthManager.updateProtocolHealth(this.core.multicodec, 0);
}
public constructor(private readonly connectionManager: ConnectionManager) {}
public getWriteLockHolder(): string | null {
return this.writeLockHolder;
@ -37,10 +28,6 @@ export class PeerManager {
await this.connectionManager.attemptDial(peer.id);
this.peers.set(peer.id.toString(), peer);
log.info(`Added and dialed peer: ${peer.id.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null;
});
}
@ -50,10 +37,6 @@ export class PeerManager {
this.writeLockHolder = `removePeer: ${peerId.toString()}`;
this.peers.delete(peerId.toString());
log.info(`Removed peer: ${peerId.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null;
});
}
@ -92,7 +75,7 @@ export class PeerManager {
* @param numPeers The number of peers to find.
*/
public async findPeers(numPeers: number): Promise<Peer[]> {
const connectedPeers = await this.core.getPeers();
const connectedPeers = await this.connectionManager.getConnectedPeers();
return this.readMutex.runExclusive(async () => {
const newPeers = connectedPeers

View File

@ -58,12 +58,11 @@ export class Store extends BaseProtocolSDK implements IStore {
...options
};
const peer = (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
})
)[0];
const peers = await this.connectionManager.getConnectedPeers(
this.core.multicodec
);
const peer = peers[0];
if (!peer) {
log.error("No peers available to query");
throw new Error("No peers available to query");

View File

@ -1,339 +0,0 @@
import type { Connection, Peer, PeerStore } from "@libp2p/interface";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createLightNode,
Libp2pComponents,
type LightNode,
Tags,
utf8ToBytes
} from "@waku/sdk";
import { encodeRelayShard } from "@waku/utils";
import { expect } from "chai";
import fc from "fast-check";
import Sinon from "sinon";
import {
afterEachCustom,
beforeEachCustom,
DefaultTestShardInfo
} from "../src/index.js";
describe("getPeers", function () {
let peerStore: PeerStore;
let connectionManager: Libp2pComponents["connectionManager"];
let waku: LightNode;
const lowPingBytes = utf8ToBytes("50");
const midPingBytes = utf8ToBytes("100");
const highPingBytes = utf8ToBytes("200");
let lowPingBootstrapPeer: Peer,
lowPingNonBootstrapPeer: Peer,
midPingBootstrapPeer: Peer,
midPingNonBootstrapPeer: Peer,
highPingBootstrapPeer: Peer,
highPingNonBootstrapPeer: Peer,
differentCodecPeer: Peer,
anotherDifferentCodecPeer: Peer;
let bootstrapPeers: Peer[];
let nonBootstrapPeers: Peer[];
let allPeers: Peer[];
beforeEachCustom(this, async () => {
waku = await createLightNode({ networkConfig: DefaultTestShardInfo });
peerStore = waku.libp2p.peerStore;
connectionManager = waku.libp2p.components.connectionManager;
const [
lowPingBootstrapPeerId,
lowPingNonBootstrapPeerId,
midPingBootstrapPeerId,
midPingNonBootstrapPeerId,
highPingBootstrapPeerId,
highPingNonBootstrapPeerId,
differentCodecPeerId,
anotherDifferentCodecPeerId
] = await Promise.all([
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId()
]);
lowPingBootstrapPeer = {
id: lowPingBootstrapPeerId,
protocols: [waku.lightPush.protocol.multicodec],
metadata: new Map().set("ping", lowPingBytes),
tags: new Map().set(Tags.BOOTSTRAP, {})
} as Peer;
lowPingNonBootstrapPeer = {
id: lowPingNonBootstrapPeerId,
protocols: [waku.lightPush.protocol.multicodec],
metadata: new Map().set("ping", lowPingBytes),
tags: new Map().set(Tags.PEER_EXCHANGE, {})
} as Peer;
midPingBootstrapPeer = {
id: midPingBootstrapPeerId,
protocols: [waku.lightPush.protocol.multicodec],
metadata: new Map().set("ping", midPingBytes),
tags: new Map().set(Tags.BOOTSTRAP, {})
} as Peer;
midPingNonBootstrapPeer = {
id: midPingNonBootstrapPeerId,
protocols: [waku.lightPush.protocol.multicodec],
metadata: new Map().set("ping", midPingBytes),
tags: new Map().set(Tags.PEER_EXCHANGE, {})
} as Peer;
highPingBootstrapPeer = {
id: highPingBootstrapPeerId,
protocols: [waku.lightPush.protocol.multicodec],
metadata: new Map().set("ping", highPingBytes),
tags: new Map().set(Tags.BOOTSTRAP, {})
} as Peer;
highPingNonBootstrapPeer = {
id: highPingNonBootstrapPeerId,
protocols: [waku.lightPush.protocol.multicodec],
metadata: new Map().set("ping", highPingBytes),
tags: new Map().set(Tags.PEER_EXCHANGE, {})
} as Peer;
differentCodecPeer = {
id: differentCodecPeerId,
protocols: ["different/1"],
metadata: new Map().set("ping", lowPingBytes),
tags: new Map().set(Tags.BOOTSTRAP, {})
} as Peer;
anotherDifferentCodecPeer = {
id: anotherDifferentCodecPeerId,
protocols: ["different/2"],
metadata: new Map().set("ping", lowPingBytes),
tags: new Map().set(Tags.BOOTSTRAP, {})
} as Peer;
bootstrapPeers = [
lowPingBootstrapPeer,
midPingBootstrapPeer,
highPingBootstrapPeer
];
nonBootstrapPeers = [
lowPingNonBootstrapPeer,
midPingNonBootstrapPeer,
highPingNonBootstrapPeer
];
allPeers = [
...bootstrapPeers,
...nonBootstrapPeers,
differentCodecPeer,
anotherDifferentCodecPeer
];
allPeers.forEach((peer) => {
peer.metadata.set("shardInfo", encodeRelayShard(DefaultTestShardInfo));
});
Sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return allPeers.find((peer) => peer.id.equals(peerId))!;
});
Sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of allPeers) {
callback(peer);
}
});
// assume all peers have an opened connection
Sinon.stub(connectionManager, "getConnections").callsFake(() => {
const connections: Connection[] = [];
for (const peer of allPeers) {
connections.push({
status: "open",
remotePeer: peer.id,
streams: [{ protocol: waku.lightPush.protocol.multicodec }]
} as unknown as Connection);
}
return connections;
});
});
afterEachCustom(this, async () => {
Sinon.restore();
});
describe("getPeers with varying maxBootstrapPeers", function () {
const maxBootstrapPeersValues = [1, 2, 3, 4, 5, 6, 7];
maxBootstrapPeersValues.forEach((maxBootstrapPeers) => {
describe(`maxBootstrapPeers=${maxBootstrapPeers}`, function () {
it(`numPeers=1 -- returns one bootstrap peer `, async function () {
const result = (await (waku.lightPush.protocol as any).getPeers({
numPeers: 1,
maxBootstrapPeers
})) as Peer[];
// Should only have 1 peer
expect(result).to.have.lengthOf(1);
// The peer should be a bootstrap peer
expect(result[0].tags.has(Tags.BOOTSTRAP)).to.be.true;
// Peer should be of the same protocol
expect(
result[0].protocols.includes(waku.lightPush.protocol.multicodec)
).to.be.true;
// Peer should have the lowest ping
expect(result[0].metadata.get("ping")).to.equal(lowPingBytes);
});
it(`numPeers=2 -- returns total 2 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () {
const result = (await (waku.lightPush.protocol as any).getPeers({
numPeers: 2,
maxBootstrapPeers
})) as Peer[];
// Should only have 2 peers
expect(result).to.have.lengthOf(2);
// Should only have ${maxBootstrapPeers} bootstrap peers
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.be.lessThanOrEqual(maxBootstrapPeers);
// Should return peers with the same protocol
expect(
result.every((peer: Peer) =>
peer.protocols.includes(waku.lightPush.protocol.multicodec)
)
).to.be.true;
// All peers should be sorted by latency
// 0th index should be the lowest ping of all peers returned
expect(result[0].metadata.get("ping")).to.equal(lowPingBytes);
});
it(`numPeers=3 -- returns total 3 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () {
const result = (await (waku.lightPush.protocol as any).getPeers({
numPeers: 3,
maxBootstrapPeers
})) as Peer[];
// Should only have 3 peers
expect(result).to.have.lengthOf(3);
// Should only have ${maxBootstrapPeers} bootstrap peers
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.be.lessThanOrEqual(maxBootstrapPeers);
// Should return peers with the same protocol
expect(
result.every((peer: Peer) =>
peer.protocols.includes(waku.lightPush.protocol.multicodec)
)
).to.be.true;
// All peers should be sorted by latency
// 0th index should be the lowest ping of all peers returned
expect(result[0].metadata.get("ping")).to.equal(lowPingBytes);
});
it(`numPeers=4 -- returns total 4 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () {
const result = (await (waku.lightPush.protocol as any).getPeers({
numPeers: 4,
maxBootstrapPeers
})) as Peer[];
// Should only have 4 peers
expect(result).to.have.lengthOf(4);
// Should only have ${maxBootstrapPeers} bootstrap peers
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.be.lessThanOrEqual(maxBootstrapPeers);
// Should return peers with the same protocol
expect(
result.every((peer: Peer) =>
peer.protocols.includes(waku.lightPush.protocol.multicodec)
)
).to.be.true;
// All peers should be sorted by latency
// 0th index should be the lowest ping of all peers returned
expect(result[0].metadata.get("ping")).to.equal(lowPingBytes);
});
it(`numPeers=0 -- returns all peers including all non-bootstrap with maxBootstrapPeers: ${maxBootstrapPeers}`, async function () {
const result = (await (waku.lightPush.protocol as any).getPeers({
numPeers: 0,
maxBootstrapPeers
})) as Peer[];
// Should have all non-bootstrap peers + ${maxBootstrapPeers} bootstrap peers
// Unless bootstrapPeers.length < maxBootstrapPeers
// Then it should be all non-bootstrap peers + bootstrapPeers.length
if (maxBootstrapPeers > bootstrapPeers.length) {
expect(result).to.have.lengthOf(
nonBootstrapPeers.length + bootstrapPeers.length
);
} else {
expect(result).to.have.lengthOf(
nonBootstrapPeers.length + maxBootstrapPeers
);
}
// All peers should be bootstrap peers
expect(
result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length
).to.be.lessThanOrEqual(maxBootstrapPeers);
// Peers should be of the same protocol
expect(
result.every((peer: Peer) =>
peer.protocols.includes(waku.lightPush.protocol.multicodec)
)
).to.be.true;
// All peers returned should be sorted by latency
// 0th index should be the lowest ping of all peers returned
expect(result[0].metadata.get("ping")).to.equal(lowPingBytes);
});
});
});
});
describe("getPeers property-based tests", function () {
it("should return the correct number of peers based on numPeers and maxBootstrapPeers", async function () {
await fc.assert(
fc.asyncProperty(
//max bootstrap peers
fc.integer({ min: 1, max: 100 }),
//numPeers
fc.integer({ min: 0, max: 100 }),
async (maxBootstrapPeers, numPeers) => {
const result = (await (waku.lightPush.protocol as any).getPeers({
numPeers,
maxBootstrapPeers
})) as Peer[];
if (numPeers === 0) {
// Expect all peers when numPeers is 0
expect(result.length).to.be.greaterThanOrEqual(1);
} else {
// Expect up to numPeers peers
expect(result.length).to.be.lessThanOrEqual(numPeers);
}
}
),
{
verbose: true
}
);
});
});
});

View File

@ -1,7 +1,5 @@
import type { Peer, PeerStore } from "@libp2p/interface";
import { bytesToUtf8 } from "../bytes/index.js";
/**
* Returns a pseudo-random peer that supports the given protocol.
* Useful for protocols such as store and light push
@ -13,42 +11,6 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined {
return peers[index];
}
/**
* Function to sort peers by latency from lowest to highest
* @param peerStore - The Libp2p PeerStore
* @param peers - The list of peers to choose from
* @returns Sorted array of peers by latency
*/
export async function sortPeersByLatency(
peerStore: PeerStore,
peers: Peer[]
): Promise<Peer[]> {
if (peers.length === 0) return [];
const results = await Promise.all(
peers.map(async (peer) => {
try {
const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping");
if (!pingBytes) return { peer, ping: Infinity };
const ping = Number(bytesToUtf8(pingBytes));
return { peer, ping };
} catch (error) {
return { peer, ping: Infinity };
}
})
);
// filter out null values
const validResults = results.filter(
(result): result is { peer: Peer; ping: number } => result !== null
);
return validResults
.sort((a, b) => a.ping - b.ping)
.map((result) => result.peer);
}
/**
* Returns the list of peers that supports the given protocol.
*/