feat(filter): enhancing protocol peer management with mutex locks (#2137)

* chore: improvements

* chore: add logs for subscription maintenance

* chore: update logging

* chore: trimming down BaseProtocolCore

* chore: track peers in a hashmap instead of array

* chore: peer mgmt responds to conenction/disconnection and improve logging

* feat: add mutex locks to tackle race conditions over shared state

* fix: build

* chore: some mutex lock-release improvements

* feat: peer manager

* chore: rm tests for remove internal util

* chore: update HealthManager updates

* chore: update tests

* rm: only

* fix: hasPeers management

* chore: add modularity to getting connected peers

* chore: improve logs & add debug

* chore: renewal doesnt disconnect, only removes

* chore: await for sequential operations

* chore: add TODO

* chore: minor improvements

* chore: fix rebase

* chore: update playright

* chore: remove additional arg

* chore: update interafce

* feat(peer-manager): unit tests

* chore: improve hasPeers()

* chore: update lockfile

* feat: Filter reacts to peer:disconnect event, add tests

* chore: fix lock

* chore: update playright

* chore: update protocol health for lightpush

* chore: remove .only

* chore: address comments and improvements

* fix: tsconfig
This commit is contained in:
Danish Arora 2024-10-11 03:17:12 +05:30 committed by GitHub
parent 75fcca4cd9
commit b2efce5ec2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 4787 additions and 3745 deletions

View File

@ -20,7 +20,7 @@ jobs:
timeout-minutes: 60 timeout-minutes: 60
runs-on: ubuntu-latest runs-on: ubuntu-latest
container: container:
image: mcr.microsoft.com/playwright:v1.46.0-jammy image: mcr.microsoft.com/playwright:v1.48.0-jammy
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- uses: actions/setup-node@v3 - uses: actions/setup-node@v3

7274
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +1,12 @@
import type { Libp2p } from "@libp2p/interface"; import type { Libp2p } from "@libp2p/interface";
import type { Peer, PeerStore, Stream } from "@libp2p/interface"; import type { Peer, Stream } from "@libp2p/interface";
import type { import type {
IBaseProtocolCore, IBaseProtocolCore,
Libp2pComponents, Libp2pComponents,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils"; import { Logger } from "@waku/utils";
import { import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
sortPeersByLatency
} from "@waku/utils/libp2p";
import { filterPeersByDiscovery } from "./filterPeers.js"; import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js"; import { StreamManager } from "./stream_manager/index.js";
@ -26,7 +22,7 @@ export class BaseProtocol implements IBaseProtocolCore {
protected constructor( protected constructor(
public multicodec: string, public multicodec: string,
private components: Libp2pComponents, protected components: Libp2pComponents,
private log: Logger, private log: Logger,
public readonly pubsubTopics: PubsubTopic[] public readonly pubsubTopics: PubsubTopic[]
) { ) {
@ -50,25 +46,22 @@ export class BaseProtocol implements IBaseProtocolCore {
return this.streamManager.getStream(peer); return this.streamManager.getStream(peer);
} }
public get peerStore(): PeerStore {
return this.components.peerStore;
}
/** /**
* Returns known peers from the address book (`libp2p.peerStore`) that support * Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these * the class protocol. Waku may or may not be currently connected to these
* peers. * peers.
*/ */
public async allPeers(): Promise<Peer[]> { public async allPeers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]); return getPeersForProtocol(this.components.peerStore, [this.multicodec]);
} }
public async connectedPeers(): Promise<Peer[]> { public async connectedPeers(): Promise<Peer[]> {
const peers = await this.allPeers(); const peers = await this.allPeers();
return peers.filter((peer) => { return peers.filter((peer) => {
return ( const connections = this.components.connectionManager.getConnections(
this.components.connectionManager.getConnections(peer.id).length > 0 peer.id
); );
return connections.length > 0;
}); });
} }
@ -77,8 +70,7 @@ export class BaseProtocol implements IBaseProtocolCore {
* *
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
* @returns A list of peers that support the protocol sorted by latency.
*/ */
public async getPeers( public async getPeers(
{ {
@ -88,29 +80,23 @@ export class BaseProtocol implements IBaseProtocolCore {
numPeers: number; numPeers: number;
maxBootstrapPeers: number; maxBootstrapPeers: number;
} = { } = {
maxBootstrapPeers: 1, maxBootstrapPeers: 0,
numPeers: 0 numPeers: 0
} }
): Promise<Peer[]> { ): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured) // Retrieve all connected peers that support the protocol & shard (if configured)
const connectedPeersForProtocolAndShard = const allAvailableConnectedPeers = await this.connectedPeers();
await getConnectedPeersForProtocolAndShard(
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec],
pubsubTopicsToShardInfo(this.pubsubTopics)
);
// Filter the peers based on discovery & number of peers requested // Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery( const filteredPeers = filterPeersByDiscovery(
connectedPeersForProtocolAndShard, allAvailableConnectedPeers,
numPeers, numPeers,
maxBootstrapPeers maxBootstrapPeers
); );
// Sort the peers by latency // Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency( const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore, this.components.peerStore,
filteredPeers filteredPeers
); );

View File

@ -301,8 +301,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
() => { () => {
log.info("Receiving pipe closed."); log.info("Receiving pipe closed.");
}, },
(e) => { async (e) => {
log.error("Error with receiving pipe", e); log.error(
`Error with receiving pipe on peer:${connection.remotePeer.toString()} -- stream:${stream.id} -- protocol:${stream.protocol}: `,
e
);
} }
); );
} catch (e) { } catch (e) {

View File

@ -45,7 +45,7 @@ class Metadata extends BaseProtocol implements IMetadata {
pubsubTopicsToShardInfo(this.pubsubTopics) pubsubTopicsToShardInfo(this.pubsubTopics)
); );
const peer = await this.peerStore.get(peerId); const peer = await this.libp2pComponents.peerStore.get(peerId);
if (!peer) { if (!peer) {
return { return {
shardInfo: null, shardInfo: null,

View File

@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers) numPeers: BigInt(numPeers)
}); });
const peer = await this.peerStore.get(peerId); const peer = await this.components.peerStore.get(peerId);
if (!peer) { if (!peer) {
return { return {
peerInfos: null, peerInfos: null,

View File

@ -1,6 +1,6 @@
import type { Libp2p } from "@libp2p/interface"; import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface"; import type { Peer } from "@libp2p/interface";
import type { CreateLibp2pOptions } from "./libp2p.js"; import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js"; import type { IDecodedMessage } from "./message.js";
@ -16,7 +16,6 @@ export enum Protocols {
export type IBaseProtocolCore = { export type IBaseProtocolCore = {
multicodec: string; multicodec: string;
peerStore: PeerStore;
allPeers: () => Promise<Peer[]>; allPeers: () => Promise<Peer[]>;
connectedPeers: () => Promise<Peer[]>; connectedPeers: () => Promise<Peer[]>;
addLibp2pEventListener: Libp2p["addEventListener"]; addLibp2pEventListener: Libp2p["addEventListener"];
@ -25,7 +24,7 @@ export type IBaseProtocolCore = {
export type IBaseProtocolSDK = { export type IBaseProtocolSDK = {
readonly connectedPeers: Peer[]; readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>; renewPeer: (peerToDisconnect: PeerId) => Promise<Peer | undefined>;
readonly numPeersToUse: number; readonly numPeersToUse: number;
}; };
@ -36,10 +35,6 @@ export type NetworkConfig = StaticSharding | AutoSharding;
* Options for using LightPush and Filter * Options for using LightPush and Filter
*/ */
export type ProtocolUseOptions = { export type ProtocolUseOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/** /**
* Optional flag to force using all available peers * Optional flag to force using all available peers
*/ */
@ -48,14 +43,6 @@ export type ProtocolUseOptions = {
* Optional maximum number of attempts for exponential backoff * Optional maximum number of attempts for exponential backoff
*/ */
maxAttempts?: number; maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
}; };
export type ProtocolCreateOptions = { export type ProtocolCreateOptions = {

View File

@ -61,7 +61,7 @@
}, },
"dependencies": { "dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0", "@chainsafe/libp2p-noise": "^15.1.0",
"@libp2p/bootstrap": "^10.1.2", "@libp2p/bootstrap": "^10",
"@libp2p/identify": "^2.1.2", "@libp2p/identify": "^2.1.2",
"@libp2p/mplex": "^10.1.2", "@libp2p/mplex": "^10.1.2",
"@libp2p/ping": "^1.1.2", "@libp2p/ping": "^1.1.2",
@ -73,23 +73,24 @@
"@waku/proto": "^0.0.8", "@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20", "@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16", "@waku/message-hash": "0.1.16",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1" "libp2p": "^1.8.1"
}, },
"devDependencies": { "devDependencies": {
"@types/mocha": "^10.0.6",
"@types/chai": "^4.3.11", "@types/chai": "^4.3.11",
"@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0", "@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3", "@rollup/plugin-node-resolve": "^15.2.3",
"@rollup/plugin-replace": "^5.0.5", "@rollup/plugin-replace": "^5.0.5",
"@types/mocha": "^10.0.9",
"@waku/build-utils": "*", "@waku/build-utils": "*",
"mocha": "^10.3.0", "chai": "^5.1.1",
"sinon": "^18.0.0",
"chai": "^4.3.10",
"cspell": "^8.6.1", "cspell": "^8.6.1",
"interface-datastore": "^8.2.10", "interface-datastore": "^8.2.10",
"mocha": "^10.7.3",
"npm-run-all": "^4.1.5", "npm-run-all": "^4.1.5",
"rollup": "^4.12.0" "rollup": "^4.12.0",
"sinon": "^19.0.2"
}, },
"peerDependencies": { "peerDependencies": {
"@libp2p/bootstrap": "^10" "@libp2p/bootstrap": "^10"

View File

@ -1,36 +1,27 @@
import type { Peer, PeerId } from "@libp2p/interface"; import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, getHealthManager } from "@waku/core"; import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
IBaseProtocolSDK, import { Logger } from "@waku/utils";
IHealthManager,
ProtocolUseOptions import { PeerManager } from "./peer_manager.js";
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
interface Options { interface Options {
numPeersToUse?: number; numPeersToUse?: number;
maintainPeersInterval?: number; maintainPeersInterval?: number;
} }
const RENEW_TIME_LOCK_DURATION = 30 * 1000; const DEFAULT_NUM_PEERS_TO_USE = 2;
export const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
export class BaseProtocolSDK implements IBaseProtocolSDK { export class BaseProtocolSDK implements IBaseProtocolSDK {
protected healthManager: IHealthManager; private peerManager: PeerManager;
public readonly numPeersToUse: number; public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType< private maintainPeersIntervalId: ReturnType<
typeof window.setInterval typeof window.setInterval
> | null = null; > | null = null;
private log: Logger; private log: Logger;
private maintainPeersLock = false;
private readonly renewPeersLocker = new RenewPeerLocker(
RENEW_TIME_LOCK_DURATION
);
public constructor( public constructor(
protected core: BaseProtocol, protected core: BaseProtocol,
protected connectionManager: ConnectionManager, protected connectionManager: ConnectionManager,
@ -38,17 +29,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
) { ) {
this.log = new Logger(`sdk:${core.multicodec}`); this.log = new Logger(`sdk:${core.multicodec}`);
this.healthManager = getHealthManager(); this.peerManager = new PeerManager(connectionManager, core, this.log);
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval = const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
this.log.info(
`Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms`
);
void this.startMaintainPeersInterval(maintainPeersInterval); void this.startMaintainPeersInterval(maintainPeersInterval);
} }
public get connectedPeers(): Peer[] { public get connectedPeers(): Peer[] {
return this.peers; return this.peerManager.getPeers();
} }
/** /**
@ -56,27 +50,23 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param peerToDisconnect The peer to disconnect from. * @param peerToDisconnect The peer to disconnect from.
* @returns The new peer that was found and connected to. * @returns The new peer that was found and connected to.
*/ */
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> { public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
this.log.info(`Renewing peer ${peerToDisconnect}`); this.log.info(`Attempting to renew peer ${peerToDisconnect}`);
const peer = (await this.findAndAddPeers(1))[0]; const newPeer = await this.peerManager.findPeers(1);
if (!peer) { if (newPeer.length === 0) {
throw Error("Failed to find a new peer to replace the disconnected one."); this.log.error(
"Failed to find a new peer to replace the disconnected one"
);
return undefined;
} }
const updatedPeers = this.peers.filter( await this.peerManager.removePeer(peerToDisconnect);
(peer) => !peer.id.equals(peerToDisconnect) await this.peerManager.addPeer(newPeer[0]);
);
this.updatePeers(updatedPeers);
await this.connectionManager.dropConnection(peerToDisconnect);
this.log.info( this.log.info(`Successfully renewed peer. New peer: ${newPeer[0].id}`);
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
this.renewPeersLocker.lock(peerToDisconnect); return newPeer[0];
return peer;
} }
/** /**
@ -87,75 +77,76 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
clearInterval(this.maintainPeersIntervalId); clearInterval(this.maintainPeersIntervalId);
this.maintainPeersIntervalId = null; this.maintainPeersIntervalId = null;
this.log.info("Maintain peers interval stopped"); this.log.info("Maintain peers interval stopped");
} else {
this.log.info("Maintain peers interval was not running");
} }
} }
/** /**
* Checks if there are peers to send a message to. * Checks if there are sufficient peers to send a message to.
* If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`. * If `forceUseAllPeers` is `false` (default), returns `true` if there are any connected peers.
* If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager. * If `forceUseAllPeers` is `true`, attempts to connect to `numPeersToUse` peers.
* If `autoRetry` is `false`, returns `false` if no peers are found.
* If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff.
* Returns `true` if peers are found, `false` otherwise.
* @param options Optional options object * @param options Optional options object
* @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) * @param options.forceUseAllPeers Optional flag to force connecting to `numPeersToUse` peers (default: false)
* @param options.forceUseAllPeers Optional flag to force using all available peers (default: false) * @param options.maxAttempts Optional maximum number of attempts to reach the required number of peers (default: 3)
* @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10) * @returns `true` if the required number of peers are connected, `false` otherwise
* @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3)
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
*/ */
protected hasPeers = async ( protected async hasPeers(
options: Partial<ProtocolUseOptions> = {} options: Partial<ProtocolUseOptions> = {}
): Promise<boolean> => { ): Promise<boolean> {
const { const { forceUseAllPeers = false, maxAttempts = 3 } = options;
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; this.log.info(
`Checking for peers. forceUseAllPeers: ${forceUseAllPeers}, maxAttempts: ${maxAttempts}`
let attempts = 0; );
while (attempts < maxAttempts) {
attempts++; for (let attempts = 0; attempts < maxAttempts; attempts++) {
if (await this.maintainPeers()) { this.log.info(
if (this.peers.length < this.numPeersToUse) { `Attempt ${attempts + 1}/${maxAttempts} to reach required number of peers`
this.log.warn( );
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` await this.maintainPeers();
if (!forceUseAllPeers && this.connectedPeers.length > 0) {
this.log.info(
`At least one peer connected (${this.connectedPeers.length}), not forcing use of all peers`
); );
}
return true; return true;
} }
if (!autoRetry) return false;
const delayMs = Math.min( if (this.connectedPeers.length >= this.numPeersToUse) {
initialDelay * Math.pow(2, attempts - 1), this.log.info(
maxDelay `Required number of peers (${this.numPeersToUse}) reached`
); );
await delay(delayMs); return true;
} }
this.log.error("Failed to find peers to send message to"); this.log.warn(
`Found only ${this.connectedPeers.length}/${this.numPeersToUse} required peers. Retrying...`
);
}
this.log.error(
`Failed to find required number of peers (${this.numPeersToUse}) after ${maxAttempts} attempts`
);
return false; return false;
}; }
/** /**
* Starts an interval to maintain the peers list to `numPeersToUse`. * Starts an interval to maintain the peers list to `numPeersToUse`.
* @param interval The interval in milliseconds to maintain the peers. * @param interval The interval in milliseconds to maintain the peers.
*/ */
private async startMaintainPeersInterval(interval: number): Promise<void> { private async startMaintainPeersInterval(interval: number): Promise<void> {
this.log.info("Starting maintain peers interval"); this.log.info(
`Starting maintain peers interval with ${interval}ms interval`
);
try { try {
await this.maintainPeers();
this.maintainPeersIntervalId = setInterval(() => { this.maintainPeersIntervalId = setInterval(() => {
this.log.info("Running scheduled peer maintenance");
this.maintainPeers().catch((error) => { this.maintainPeers().catch((error) => {
this.log.error("Error during maintain peers interval:", error); this.log.error("Error during scheduled peer maintenance:", error);
}); });
}, interval); }, interval);
this.log.info( this.log.info("Maintain peers interval started successfully");
`Maintain peers interval started with interval ${interval}ms`
);
} catch (error) { } catch (error) {
this.log.error("Error starting maintain peers interval:", error); this.log.error("Error starting maintain peers interval:", error);
throw error; throw error;
@ -165,122 +156,36 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
/** /**
* Maintains the peers list to `numPeersToUse`. * Maintains the peers list to `numPeersToUse`.
*/ */
private async maintainPeers(): Promise<boolean> { private async maintainPeers(): Promise<void> {
if (this.maintainPeersLock) { try {
return false; const currentPeerCount = await this.peerManager.getPeerCount();
const numPeersToAdd = this.numPeersToUse - currentPeerCount;
this.log.info(
`Current peer count: ${currentPeerCount}, target: ${this.numPeersToUse}`
);
if (numPeersToAdd === 0) {
this.log.info("Peer count is at target, no maintenance required");
return;
} }
this.maintainPeersLock = true;
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
try {
const numPeersToAdd = this.numPeersToUse - this.peers.length;
if (numPeersToAdd > 0) { if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd); this.log.info(`Attempting to add ${numPeersToAdd} peer(s)`);
} await this.peerManager.findAndAddPeers(numPeersToAdd);
} else {
this.log.info( this.log.info(
`Peer maintenance completed, current count: ${this.peers.length}` `Attempting to remove ${Math.abs(numPeersToAdd)} excess peer(s)`
); );
this.renewPeersLocker.cleanUnlocked(); await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd));
} finally {
this.maintainPeersLock = false;
}
return true;
} }
/** const finalPeerCount = await this.peerManager.getPeerCount();
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);
await Promise.all(dials);
const updatedPeers = [...this.peers, ...additionalPeers];
this.updatePeers(updatedPeers);
this.log.info( this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` `Peer maintenance completed. Initial count: ${currentPeerCount}, Final count: ${finalPeerCount}`
); );
return additionalPeers;
} catch (error) { } catch (error) {
this.log.error("Error finding and adding new peers:", error); this.log.error("Error during peer maintenance", { error });
throw error;
} }
} }
/**
* Finds additional peers.
* Attempts to find peers without using bootstrap peers first,
* If no peers are found,
* tries with bootstrap peers.
* @param numPeers The number of peers to find.
*/
private async findAdditionalPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding ${numPeers} additional peers`);
try {
let newPeers = await this.core.allPeers();
if (newPeers.length === 0) {
this.log.warn("No new peers found.");
}
newPeers = newPeers
.filter(
(peer) => this.peers.some((p) => p.id.equals(peer.id)) === false
)
.filter((peer) => !this.renewPeersLocker.isLocked(peer.id))
.slice(0, numPeers);
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);
throw error;
}
}
private updatePeers(peers: Peer[]): void {
this.peers = peers;
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.length
);
}
}
class RenewPeerLocker {
private readonly peers: Map<string, number> = new Map();
public constructor(private lockDuration: number) {}
public lock(id: PeerId): void {
this.peers.set(id.toString(), Date.now());
}
public isLocked(id: PeerId): boolean {
const time = this.peers.get(id.toString());
if (time && !this.isTimeUnlocked(time)) {
return true;
}
return false;
}
public cleanUnlocked(): void {
Object.entries(this.peers).forEach(([id, lock]) => {
if (this.isTimeUnlocked(lock)) {
this.peers.delete(id.toString());
}
});
}
private isTimeUnlocked(time: number): boolean {
return Date.now() - time >= this.lockDuration;
}
} }

View File

@ -1,4 +1,4 @@
export const DEFAULT_KEEP_ALIVE = 30 * 1000; export const DEFAULT_KEEP_ALIVE = 10_000;
export const DEFAULT_SUBSCRIBE_OPTIONS = { export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE keepAlive: DEFAULT_KEEP_ALIVE

View File

@ -51,9 +51,9 @@ class Filter extends BaseProtocolSDK implements IFilter {
); );
return; return;
} }
await subscription.processIncomingMessage(wakuMessage, peerIdStr); await subscription.processIncomingMessage(wakuMessage, peerIdStr);
}, },
connectionManager.configuredPubsubTopics, connectionManager.configuredPubsubTopics,
libp2p libp2p
), ),

View File

@ -41,7 +41,9 @@ export class SubscriptionManager implements ISubscription {
private readonly protocol: FilterCore, private readonly protocol: FilterCore,
private readonly connectionManager: ConnectionManager, private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[], private readonly getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer> private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>
) { ) {
this.pubsubTopic = pubsubTopic; this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map(); this.subscriptionCallbacks = new Map();
@ -51,7 +53,8 @@ export class SubscriptionManager implements ISubscription {
this.getPeers.bind(this), this.getPeers.bind(this),
this.renewPeer.bind(this), this.renewPeer.bind(this),
() => Array.from(this.subscriptionCallbacks.keys()), () => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol) this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol)
); );
} }
@ -142,6 +145,7 @@ export class SubscriptionManager implements ISubscription {
} }
public async ping(peerId?: PeerId): Promise<SDKProtocolResult> { public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
log.info("Sending keep-alive ping");
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id); const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);
const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
@ -251,11 +255,13 @@ export class SubscriptionManager implements ISubscription {
} }
private startSubscriptionsMaintenance(interval: number): void { private startSubscriptionsMaintenance(interval: number): void {
log.info("Starting subscriptions maintenance");
this.startKeepAlivePings(interval); this.startKeepAlivePings(interval);
this.startConnectionListener(); this.startConnectionListener();
} }
private stopSubscriptionsMaintenance(): void { private stopSubscriptionsMaintenance(): void {
log.info("Stopping subscriptions maintenance");
this.stopKeepAlivePings(); this.stopKeepAlivePings();
this.stopConnectionListener(); this.stopConnectionListener();
} }
@ -299,9 +305,9 @@ export class SubscriptionManager implements ISubscription {
} }
this.keepAliveTimer = setInterval(() => { this.keepAliveTimer = setInterval(() => {
void this.ping().catch((error) => { void this.ping()
log.error("Error in keep-alive ping cycle:", error); .then(() => log.info("Keep-alive ping successful"))
}); .catch((error) => log.error("Error in keep-alive ping cycle:", error));
}, interval) as unknown as number; }, interval) as unknown as number;
} }

View File

@ -1,5 +1,10 @@
import type { Peer, PeerId } from "@libp2p/interface"; import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCodec, LightPushCore } from "@waku/core"; import {
ConnectionManager,
getHealthManager,
LightPushCodec,
LightPushCore
} from "@waku/core";
import { import {
Failure, Failure,
type IEncoder, type IEncoder,
@ -113,7 +118,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
} }
} }
this.healthManager.updateProtocolHealth(LightPushCodec, successes.length); getHealthManager().updateProtocolHealth(
this.protocol.multicodec,
successes.length
);
return { return {
successes, successes,

View File

@ -0,0 +1,148 @@
import { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCodec } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { Logger } from "@waku/utils";
import { expect } from "chai";
import sinon from "sinon";
import { PeerManager } from "./peer_manager.js";
describe("PeerManager", () => {
let peerManager: PeerManager;
let mockConnectionManager: sinon.SinonStubbedInstance<ConnectionManager>;
let mockCore: sinon.SinonStubbedInstance<BaseProtocol>;
let mockLogger: any;
beforeEach(() => {
mockConnectionManager = sinon.createStubInstance(ConnectionManager);
mockCore = sinon.createStubInstance(BaseProtocol);
mockLogger = {
info: sinon.stub(),
warn: sinon.stub(),
error: sinon.stub(),
debug: sinon.stub(),
extend: sinon.stub().returns({
info: sinon.stub(),
warn: sinon.stub(),
error: sinon.stub(),
debug: sinon.stub()
})
};
mockCore.multicodec = LightPushCodec;
peerManager = new PeerManager(
mockConnectionManager as any,
mockCore as any,
mockLogger as Logger
);
});
afterEach(() => {
sinon.restore();
});
const createMockPeer = (id: string): Peer =>
({
id: {
toString: () => id
} as PeerId
}) as Peer;
describe("addPeer", () => {
it("should add a peer", async () => {
const peer = createMockPeer("peer1");
await peerManager.addPeer(peer);
expect(mockConnectionManager.attemptDial.calledWith(peer.id)).to.be.true;
expect(
mockLogger.info.calledWith(sinon.match(/Added and dialed peer: peer1/))
).to.be.true;
expect(await peerManager.getPeerCount()).to.equal(1);
});
});
describe("removePeer", () => {
it("should remove a peer", async () => {
const peer = createMockPeer("peer1");
await peerManager.addPeer(peer);
await peerManager.removePeer(peer.id);
expect(mockLogger.info.calledWith(sinon.match(/Removed peer: peer1/))).to
.be.true;
expect(await peerManager.getPeerCount()).to.equal(0);
});
});
describe("getPeerCount", () => {
it("should return the correct number of peers", async () => {
await peerManager.addPeer(createMockPeer("peer1"));
await peerManager.addPeer(createMockPeer("peer2"));
const count = await peerManager.getPeerCount();
expect(count).to.equal(2);
});
});
describe("hasPeers", () => {
it("should return true when peers exist", async () => {
await peerManager.addPeer(createMockPeer("peer1"));
const result = await peerManager.hasPeers();
expect(result).to.be.true;
});
it("should return false when no peers exist", async () => {
const result = await peerManager.hasPeers();
expect(result).to.be.false;
});
});
describe("removeExcessPeers", () => {
it("should remove the specified number of excess peers", async () => {
await peerManager.addPeer(createMockPeer("peer1"));
await peerManager.addPeer(createMockPeer("peer2"));
await peerManager.addPeer(createMockPeer("peer3"));
await peerManager.removeExcessPeers(2);
const count = await peerManager.getPeerCount();
expect(count).to.equal(1);
expect(mockLogger.info.calledWith(`Removing 2 excess peer(s)`)).to.be
.true;
});
});
describe("findAndAddPeers", () => {
it("should find and add new peers", async () => {
const newPeers = [createMockPeer("peer1"), createMockPeer("peer2")];
mockCore.getPeers.resolves(newPeers);
const addedPeers = await peerManager.findAndAddPeers(2);
expect(addedPeers).to.have.lengthOf(2);
expect(mockConnectionManager.attemptDial.callCount).to.equal(2);
});
it("should not add existing peers", async () => {
const existingPeer = createMockPeer("existing");
await peerManager.addPeer(existingPeer);
const newPeers = [existingPeer, createMockPeer("new")];
mockCore.getPeers.resolves(newPeers);
const addedPeers = await peerManager.findAndAddPeers(2);
expect(addedPeers).to.have.lengthOf(1);
expect(mockConnectionManager.attemptDial.callCount).to.equal(2); // Once for existing, once for new
});
it("should log when no additional peers are found", async () => {
mockCore.getPeers.resolves([]);
await peerManager.findAndAddPeers(2);
expect(mockLogger.warn.calledWith("No additional peers found")).to.be
.true;
});
});
});

View File

@ -0,0 +1,113 @@
import { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IHealthManager } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Mutex } from "async-mutex";
export class PeerManager {
private peers: Map<string, Peer> = new Map();
private healthManager: IHealthManager;
private readMutex = new Mutex();
private writeMutex = new Mutex();
private writeLockHolder: string | null = null;
public constructor(
private readonly connectionManager: ConnectionManager,
private readonly core: BaseProtocol,
private readonly log: Logger
) {
this.healthManager = getHealthManager();
this.healthManager.updateProtocolHealth(this.core.multicodec, 0);
}
public getWriteLockHolder(): string | null {
return this.writeLockHolder;
}
public getPeers(): Peer[] {
return Array.from(this.peers.values());
}
public async addPeer(peer: Peer): Promise<void> {
return this.writeMutex.runExclusive(async () => {
this.writeLockHolder = `addPeer: ${peer.id.toString()}`;
await this.connectionManager.attemptDial(peer.id);
this.peers.set(peer.id.toString(), peer);
this.log.info(`Added and dialed peer: ${peer.id.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null;
});
}
public async removePeer(peerId: PeerId): Promise<void> {
return this.writeMutex.runExclusive(() => {
this.writeLockHolder = `removePeer: ${peerId.toString()}`;
this.peers.delete(peerId.toString());
this.log.info(`Removed peer: ${peerId.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null;
});
}
public async getPeerCount(): Promise<number> {
return this.readMutex.runExclusive(() => this.peers.size);
}
public async hasPeers(): Promise<boolean> {
return this.readMutex.runExclusive(() => this.peers.size > 0);
}
public async removeExcessPeers(excessPeers: number): Promise<void> {
this.log.info(`Removing ${excessPeers} excess peer(s)`);
const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers);
for (const peer of peersToRemove) {
await this.removePeer(peer.id);
}
}
/**
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
public async findAndAddPeers(numPeers: number): Promise<Peer[]> {
const additionalPeers = await this.findPeers(numPeers);
if (additionalPeers.length === 0) {
this.log.warn("No additional peers found");
return [];
}
return this.addMultiplePeers(additionalPeers);
}
/**
* Finds additional peers.
* @param numPeers The number of peers to find.
*/
public async findPeers(numPeers: number): Promise<Peer[]> {
const connectedPeers = await this.core.getPeers();
return this.readMutex.runExclusive(async () => {
const newPeers = connectedPeers
.filter((peer) => !this.peers.has(peer.id.toString()))
.slice(0, numPeers);
return newPeers;
});
}
public async addMultiplePeers(peers: Peer[]): Promise<Peer[]> {
const addedPeers: Peer[] = [];
for (const peer of peers) {
await this.addPeer(peer);
addedPeers.push(peer);
}
return addedPeers;
}
}

View File

@ -2,6 +2,7 @@ import type { Peer, PeerId } from "@libp2p/interface";
import { import {
ContentTopic, ContentTopic,
CoreProtocolResult, CoreProtocolResult,
Libp2p,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -18,13 +19,14 @@ export class ReliabilityMonitorManager {
public static createReceiverMonitor( public static createReceiverMonitor(
pubsubTopic: PubsubTopic, pubsubTopic: PubsubTopic,
getPeers: () => Peer[], getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>, renewPeer: (peerId: PeerId) => Promise<Peer | undefined>,
getContentTopics: () => ContentTopic[], getContentTopics: () => ContentTopic[],
protocolSubscribe: ( protocolSubscribe: (
pubsubTopic: PubsubTopic, pubsubTopic: PubsubTopic,
peer: Peer, peer: Peer,
contentTopics: ContentTopic[] contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult> ) => Promise<CoreProtocolResult>,
addLibp2pEventListener: Libp2p["addEventListener"]
): ReceiverReliabilityMonitor { ): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
@ -35,14 +37,15 @@ export class ReliabilityMonitorManager {
getPeers, getPeers,
renewPeer, renewPeer,
getContentTopics, getContentTopics,
protocolSubscribe protocolSubscribe,
addLibp2pEventListener
); );
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor; return monitor;
} }
public static createSenderMonitor( public static createSenderMonitor(
renewPeer: (peerId: PeerId) => Promise<Peer> renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
): SenderReliabilityMonitor { ): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) { if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(

View File

@ -3,6 +3,7 @@ import {
ContentTopic, ContentTopic,
CoreProtocolResult, CoreProtocolResult,
IProtoMessage, IProtoMessage,
Libp2p,
PeerIdStr, PeerIdStr,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -32,13 +33,14 @@ export class ReceiverReliabilityMonitor {
public constructor( public constructor(
private readonly pubsubTopic: PubsubTopic, private readonly pubsubTopic: PubsubTopic,
private getPeers: () => Peer[], private getPeers: () => Peer[],
private renewPeer: (peerId: PeerId) => Promise<Peer>, private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>,
private getContentTopics: () => ContentTopic[], private getContentTopics: () => ContentTopic[],
private protocolSubscribe: ( private protocolSubscribe: (
pubsubTopic: PubsubTopic, pubsubTopic: PubsubTopic,
peer: Peer, peer: Peer,
contentTopics: ContentTopic[] contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult> ) => Promise<CoreProtocolResult>,
private addLibp2pEventListener: Libp2p["addEventListener"]
) { ) {
const allPeerIdStr = this.getPeers().map((p) => p.id.toString()); const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
@ -49,6 +51,13 @@ export class ReceiverReliabilityMonitor {
} }
}; };
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0)); allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
this.addLibp2pEventListener("peer:disconnect", (evt) => {
const peerId = evt.detail;
if (this.getPeers().some((p) => p.id.equals(peerId))) {
void this.renewAndSubscribePeer(peerId);
}
});
} }
public setMaxMissedMessagesThreshold(value: number | undefined): void { public setMaxMissedMessagesThreshold(value: number | undefined): void {
@ -163,15 +172,21 @@ export class ReceiverReliabilityMonitor {
private async renewAndSubscribePeer( private async renewAndSubscribePeer(
peerId: PeerId peerId: PeerId
): Promise<Peer | undefined> { ): Promise<Peer | undefined> {
const peerIdStr = peerId.toString();
try { try {
if (this.peerRenewalLocks.has(peerId.toString())) { if (this.peerRenewalLocks.has(peerIdStr)) {
log.info(`Peer ${peerId.toString()} is already being renewed.`); log.info(`Peer ${peerIdStr} is already being renewed.`);
return; return;
} }
this.peerRenewalLocks.add(peerId.toString()); this.peerRenewalLocks.add(peerIdStr);
const newPeer = await this.renewPeer(peerId); const newPeer = await this.renewPeer(peerId);
if (!newPeer) {
log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`);
return;
}
await this.protocolSubscribe( await this.protocolSubscribe(
this.pubsubTopic, this.pubsubTopic,
newPeer, newPeer,
@ -181,16 +196,16 @@ export class ReceiverReliabilityMonitor {
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set(); this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0); this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
this.peerFailures.delete(peerId.toString()); this.peerFailures.delete(peerIdStr);
this.missedMessagesByPeer.delete(peerId.toString()); this.missedMessagesByPeer.delete(peerIdStr);
delete this.receivedMessagesHashes.nodes[peerId.toString()]; delete this.receivedMessagesHashes.nodes[peerIdStr];
return newPeer; return newPeer;
} catch (error) { } catch (error) {
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`); log.error(`Failed to renew peer ${peerIdStr}: ${error}.`);
return; return;
} finally { } finally {
this.peerRenewalLocks.delete(peerId.toString()); this.peerRenewalLocks.delete(peerIdStr);
} }
} }

View File

@ -11,7 +11,9 @@ export class SenderReliabilityMonitor {
private readonly maxAttemptsBeforeRenewal = private readonly maxAttemptsBeforeRenewal =
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;
public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {} public constructor(
private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
) {}
public async attemptRetriesOrRenew( public async attemptRetriesOrRenew(
peerId: PeerId, peerId: PeerId,
@ -42,6 +44,7 @@ export class SenderReliabilityMonitor {
} else { } else {
try { try {
const newPeer = await this.renewPeer(peerId); const newPeer = await this.renewPeer(peerId);
if (newPeer) {
log.info( log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
); );
@ -49,6 +52,11 @@ export class SenderReliabilityMonitor {
this.attempts.delete(peerIdStr); this.attempts.delete(peerIdStr);
this.attempts.set(newPeer.id.toString(), 0); this.attempts.set(newPeer.id.toString(), 0);
await protocolSend(); await protocolSend();
} else {
log.error(
`Failed to renew peer ${peerId.toString()}: New peer is undefined`
);
}
} catch (error) { } catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
} }

View File

@ -272,4 +272,52 @@ describe("Waku Filter: Peer Management: E2E", function () {
expect(waku.filter.connectedPeers.length).to.equal(2); expect(waku.filter.connectedPeers.length).to.equal(2);
}); });
it("Renews peer for Filter on peer:disconnect event", async function () {
this.timeout(30000);
const messages: DecodedMessage[] = [];
const { error, subscription } = await waku.filter.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);
if (error) {
throw error;
}
const initialPeers = waku.filter.connectedPeers;
expect(initialPeers.length).to.equal(waku.filter.numPeersToUse);
const peerToDisconnect = initialPeers[0];
await waku.connectionManager.dropConnection(peerToDisconnect.id);
await delay(5000);
expect(waku.filter.connectedPeers.length).to.equal(
waku.filter.numPeersToUse
);
const stillConnected = waku.filter.connectedPeers.some((peer) =>
peer.id.equals(peerToDisconnect.id)
);
expect(stillConnected).to.be.false;
await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello after disconnect")
});
await delay(2000);
expect(messages.length).to.equal(1);
expect(new TextDecoder().decode(messages[0].payload)).to.equal(
"Hello after disconnect"
);
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
});
}); });

View File

@ -1,22 +1,13 @@
import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import type { Connection, Peer, PeerStore } from "@libp2p/interface";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { LightPushCodec } from "@waku/core";
import { import {
ContentTopicInfo,
createLightNode, createLightNode,
Libp2pComponents, Libp2pComponents,
type LightNode, type LightNode,
Protocols,
ShardInfo,
Tags, Tags,
utf8ToBytes utf8ToBytes
} from "@waku/sdk"; } from "@waku/sdk";
import { import { encodeRelayShard } from "@waku/utils";
encodeRelayShard,
ensureShardingConfigured,
shardInfoToPubsubTopics
} from "@waku/utils";
import { getConnectedPeersForProtocolAndShard } from "@waku/utils/libp2p";
import { expect } from "chai"; import { expect } from "chai";
import fc from "fast-check"; import fc from "fast-check";
import Sinon from "sinon"; import Sinon from "sinon";
@ -24,414 +15,9 @@ import Sinon from "sinon";
import { import {
afterEachCustom, afterEachCustom,
beforeEachCustom, beforeEachCustom,
DefaultTestShardInfo, DefaultTestShardInfo
delay,
makeLogFileName,
ServiceNode,
tearDownNodes
} from "../src/index.js"; } from "../src/index.js";
describe("getConnectedPeersForProtocolAndShard", function () {
let waku: LightNode;
let serviceNode1: ServiceNode;
let serviceNode2: ServiceNode;
const contentTopic = "/test/2/waku-light-push/utf8";
const autoshardingClusterId = 6;
beforeEachCustom(this, async () => {
serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
serviceNode2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
});
afterEachCustom(this, async () => {
await tearDownNodes([serviceNode1, serviceNode2], waku);
});
it("same cluster, same shard: nodes connect", async function () {
this.timeout(15000);
const shardInfo: ShardInfo = {
clusterId: 2,
shards: [2]
};
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true,
relay: true
});
const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo).shardInfo
);
expect(peers.length).to.be.greaterThan(0);
});
it("same cluster, different shard: nodes don't connect", async function () {
this.timeout(15000);
const shardInfo1: ShardInfo = {
clusterId: 2,
shards: [1]
};
const shardInfo2: ShardInfo = {
clusterId: 2,
shards: [2]
};
// Separate shard
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});
// Same shard
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, same shard: nodes don't connect", async function () {
this.timeout(15000);
const shardInfo1: ShardInfo = {
clusterId: 2,
shards: [1]
};
const shardInfo2: ShardInfo = {
clusterId: 3,
shards: [1]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, different shard: nodes don't connect", async function () {
this.timeout(15000);
const shardInfo1: ShardInfo = {
clusterId: 2,
shards: [1]
};
const shardInfo2: ShardInfo = {
clusterId: 3,
shards: [2]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2");
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
await waku.start();
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});
it("same cluster, same shard: nodes connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo).shardInfo
);
expect(peers.length).to.be.greaterThan(0);
});
it("same cluster, different shard: nodes connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo1: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
const shardInfo2: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: ["/test/5/waku-light-push/utf8"]
};
// Separate shard
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
// Same shard
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, same shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo1: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: [contentTopic]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
it("different cluster, different shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);
const shardInfo1: ContentTopicInfo = {
clusterId: autoshardingClusterId,
contentTopics: [contentTopic]
};
const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: ["/test/5/waku-light-push/utf8"]
};
// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
contentTopic: [contentTopic],
lightpush: true,
relay: true
});
// and another node in the same cluster cluster as our node
const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2");
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});
const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
await waku.start();
await waku.waitForPeers([Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
ensureShardingConfigured(shardInfo2).shardInfo
);
expect(peers.length).to.be.equal(1);
});
});
describe("getPeers", function () { describe("getPeers", function () {
let peerStore: PeerStore; let peerStore: PeerStore;
let connectionManager: Libp2pComponents["connectionManager"]; let connectionManager: Libp2pComponents["connectionManager"];
@ -566,7 +152,8 @@ describe("getPeers", function () {
for (const peer of allPeers) { for (const peer of allPeers) {
connections.push({ connections.push({
status: "open", status: "open",
remotePeer: peer.id remotePeer: peer.id,
streams: [{ protocol: waku.lightPush.protocol.multicodec }]
} as unknown as Connection); } as unknown as Connection);
} }
return connections; return connections;

View File

@ -72,6 +72,7 @@ const runTests = (strictNodeCheck: boolean): void => {
const pushResponse = await waku.lightPush.send(TestEncoder, { const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i)) payload: utf8ToBytes(generateMessageText(i))
}); });
expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(pushResponse.successes.length).to.eq(numServiceNodes);
} }

View File

@ -47,12 +47,19 @@ describe("Waku Light Push: Connection Management: E2E", function () {
expect(failures?.length || 0).to.equal(0); expect(failures?.length || 0).to.equal(0);
}); });
it("should push to available amount of connection if less than required", async function () { // skipped because of https://github.com/waku-org/js-waku/pull/2155#discussion_r1787452696
const connections = waku.libp2p.getConnections(); it.skip("Failed peers are renewed", async function () {
await Promise.all( // send a lightpush request -- should have all successes
connections const response1 = await waku.lightPush.send(
.slice(0, connections.length - 1) encoder,
.map((c) => waku.connectionManager.dropConnection(c.remotePeer)) {
payload: utf8ToBytes("Hello_World")
},
{ forceUseAllPeers: true }
);
expect(response1.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
); );
const { successes, failures } = await waku.lightPush.send(encoder, { const { successes, failures } = await waku.lightPush.send(encoder, {

View File

@ -1,8 +1,6 @@
import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import type { Peer, PeerStore } from "@libp2p/interface";
import { ShardInfo } from "@waku/interfaces";
import { bytesToUtf8 } from "../bytes/index.js"; import { bytesToUtf8 } from "../bytes/index.js";
import { decodeRelayShard } from "../common/relay_shard_codec.js";
/** /**
* Returns a pseudo-random peer that supports the given protocol. * Returns a pseudo-random peer that supports the given protocol.
@ -69,39 +67,3 @@ export async function getPeersForProtocol(
}); });
return peers; return peers;
} }
export async function getConnectedPeersForProtocolAndShard(
connections: Connection[],
peerStore: PeerStore,
protocols: string[],
shardInfo?: ShardInfo
): Promise<Peer[]> {
const openConnections = connections.filter(
(connection) => connection.status === "open"
);
const peerPromises = openConnections.map(async (connection) => {
const peer = await peerStore.get(connection.remotePeer);
const supportsProtocol = protocols.some((protocol) =>
peer.protocols.includes(protocol)
);
if (supportsProtocol) {
if (shardInfo) {
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
const peerShardInfo =
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);
if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) {
return peer;
}
} else {
return peer;
}
}
return null;
});
const peersWithNulls = await Promise.all(peerPromises);
return peersWithNulls.filter((peer): peer is Peer => peer !== null);
}