diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 3ebbc3c443..648dd5c3eb 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -1,5 +1,5 @@ import type { Libp2p } from "@libp2p/interface"; -import type { Peer, PeerStore, Stream } from "@libp2p/interface"; +import type { Peer, Stream } from "@libp2p/interface"; import type { IBaseProtocolCore, Libp2pComponents, @@ -46,18 +46,15 @@ export class BaseProtocol implements IBaseProtocolCore { return this.streamManager.getStream(peer); } - public get peerStore(): PeerStore { - return this.components.peerStore; - } - + //TODO: move to SDK /** * Returns known peers from the address book (`libp2p.peerStore`) that support * the class protocol. Waku may or may not be currently connected to these * peers. */ - public async allPeers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } + // public async allPeers(): Promise { + // return getPeersForProtocol(this.peerStore, [this.multicodec]); + // } public async connectedPeers(withOpenStreams = false): Promise { const peers = await this.allPeers(); @@ -91,7 +88,7 @@ export class BaseProtocol implements IBaseProtocolCore { const connectedPeersForProtocolAndShard = await getConnectedPeersForProtocolAndShard( this.components.connectionManager.getConnections(), - this.peerStore, + this.components.peerStore, [this.multicodec], pubsubTopicsToShardInfo(this.pubsubTopics) ); @@ -105,7 +102,7 @@ export class BaseProtocol implements IBaseProtocolCore { // Sort the peers by latency const sortedFilteredPeers = await sortPeersByLatency( - this.peerStore, + this.components.peerStore, filteredPeers ); diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index fe7282909e..3c681d3849 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -1,6 +1,6 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import type { Peer, PeerStore } from "@libp2p/interface"; +import type { Peer } from "@libp2p/interface"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 7aa95a3d17..0c79ef0326 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -14,7 +14,6 @@ interface Options { numPeersToUse?: number; maintainPeersInterval?: number; } -///TODO: update HealthManager const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index f1613a8021..dcdc024f1b 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -1,11 +1,14 @@ import { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager } from "@waku/core"; +import { ConnectionManager, getHealthManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; +import { IHealthManager } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { Mutex } from "async-mutex"; export class PeerManager { private peers: Map = new Map(); + private healthManager: IHealthManager; + private readMutex = new Mutex(); private writeMutex = new Mutex(); private writeLockHolder: string | null = null; @@ -14,7 +17,10 @@ export class PeerManager { private readonly connectionManager: ConnectionManager, private readonly core: BaseProtocol, private readonly log: Logger - ) {} + ) { + this.healthManager = getHealthManager(); + this.healthManager.updateProtocolHealth(this.core.multicodec, 0); + } public getWriteLockHolder(): string | null { return this.writeLockHolder; @@ -30,6 +36,10 @@ export class PeerManager { await this.connectionManager.attemptDial(peer.id); this.peers.set(peer.id.toString(), peer); this.log.info(`Added and dialed peer: ${peer.id.toString()}`); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.size + ); this.writeLockHolder = null; }); } @@ -39,6 +49,10 @@ export class PeerManager { this.writeLockHolder = `removePeer: ${peerId.toString()}`; this.peers.delete(peerId.toString()); this.log.info(`Removed peer: ${peerId.toString()}`); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.size + ); this.writeLockHolder = null; }); } diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 252340a5bf..dd0ba3ad5c 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -1,22 +1,13 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { LightPushCodec, waitForRemotePeer } from "@waku/core"; import { - ContentTopicInfo, createLightNode, Libp2pComponents, type LightNode, - Protocols, - ShardInfo, Tags, utf8ToBytes } from "@waku/sdk"; -import { - encodeRelayShard, - ensureShardingConfigured, - shardInfoToPubsubTopics -} from "@waku/utils"; -import { getConnectedPeersForProtocolAndShard } from "@waku/utils/libp2p"; +import { encodeRelayShard } from "@waku/utils"; import { expect } from "chai"; import fc from "fast-check"; import Sinon from "sinon"; @@ -24,414 +15,9 @@ import Sinon from "sinon"; import { afterEachCustom, beforeEachCustom, - DefaultTestShardInfo, - delay, - makeLogFileName, - ServiceNode, - tearDownNodes + DefaultTestShardInfo } from "../src/index.js"; -describe("getConnectedPeersForProtocolAndShard", function () { - let waku: LightNode; - let serviceNode1: ServiceNode; - let serviceNode2: ServiceNode; - const contentTopic = "/test/2/waku-light-push/utf8"; - const autoshardingClusterId = 6; - - beforeEachCustom(this, async () => { - serviceNode1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - serviceNode2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([serviceNode1, serviceNode2], waku); - }); - - it("same cluster, same shard: nodes connect", async function () { - this.timeout(15000); - - const shardInfo: ShardInfo = { - clusterId: 2, - shards: [2] - }; - - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true, - relay: true - }); - - const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo }); - await waku.start(); - await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); - await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo).shardInfo - ); - expect(peers.length).to.be.greaterThan(0); - }); - - it("same cluster, different shard: nodes don't connect", async function () { - this.timeout(15000); - - const shardInfo1: ShardInfo = { - clusterId: 2, - shards: [1] - }; - - const shardInfo2: ShardInfo = { - clusterId: 2, - shards: [2] - }; - - // Separate shard - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true, - relay: true - }); - - // Same shard - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, same shard: nodes don't connect", async function () { - this.timeout(15000); - - const shardInfo1: ShardInfo = { - clusterId: 2, - shards: [1] - }; - - const shardInfo2: ShardInfo = { - clusterId: 3, - shards: [1] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - shardInfo2 - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, different shard: nodes don't connect", async function () { - this.timeout(15000); - - const shardInfo1: ShardInfo = { - clusterId: 2, - shards: [1] - }; - - const shardInfo2: ShardInfo = { - clusterId: 3, - shards: [2] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); - const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - shardInfo2 - ); - expect(peers.length).to.be.equal(1); - }); - - it("same cluster, same shard: nodes connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo }); - await waku.start(); - await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); - await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo).shardInfo - ); - expect(peers.length).to.be.greaterThan(0); - }); - - it("same cluster, different shard: nodes connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo1: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - const shardInfo2: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: ["/test/5/waku-light-push/utf8"] - }; - - // Separate shard - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - // Same shard - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, same shard: nodes don't connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo1: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - const shardInfo2: ContentTopicInfo = { - clusterId: 2, - contentTopics: [contentTopic] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); - const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); - - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); - - it("different cluster, different shard: nodes don't connect (autosharding)", async function () { - this.timeout(15000); - - const shardInfo1: ContentTopicInfo = { - clusterId: autoshardingClusterId, - contentTopics: [contentTopic] - }; - - const shardInfo2: ContentTopicInfo = { - clusterId: 2, - contentTopics: ["/test/5/waku-light-push/utf8"] - }; - - // we start one node in a separate cluster - await serviceNode1.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - contentTopic: [contentTopic], - lightpush: true, - relay: true - }); - - // and another node in the same cluster cluster as our node - const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); - await serviceNode2.start({ - discv5Discovery: true, - peerExchange: true, - clusterId: shardInfo2.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true, - relay: true - }); - - const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); - const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); - - waku = await createLightNode({ networkConfig: shardInfo2 }); - await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); - await delay(500); - await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); - await waku.start(); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - const peers = await getConnectedPeersForProtocolAndShard( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - waku.libp2p.getProtocols(), - ensureShardingConfigured(shardInfo2).shardInfo - ); - expect(peers.length).to.be.equal(1); - }); -}); - describe("getPeers", function () { let peerStore: PeerStore; let connectionManager: Libp2pComponents["connectionManager"]; @@ -566,7 +152,8 @@ describe("getPeers", function () { for (const peer of allPeers) { connections.push({ status: "open", - remotePeer: peer.id + remotePeer: peer.id, + streams: [{ protocol: waku.lightPush.protocol.multicodec }] } as unknown as Connection); } return connections;