diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index d59ddce86b..9c1bc031b4 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -8,6 +8,7 @@ import type { import { Logger, pubsubTopicsToShardInfo } from "@waku/utils"; import { getConnectedPeersForProtocolAndShard, + getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; @@ -25,7 +26,7 @@ export class BaseProtocol implements IBaseProtocolCore { protected constructor( public multicodec: string, - private components: Libp2pComponents, + protected components: Libp2pComponents, private log: Logger, public readonly pubsubTopics: PubsubTopic[] ) { @@ -55,21 +56,21 @@ export class BaseProtocol implements IBaseProtocolCore { * the class protocol. Waku may or may not be currently connected to these * peers. */ - // public async allPeers(): Promise { - // return getPeersForProtocol(this.peerStore, [this.multicodec]); - // } + public async allPeers(): Promise { + return getPeersForProtocol(this.components.peerStore, [this.multicodec]); + } - // public async connectedPeers(): Promise { - // const peers = await this.allPeers(); - // return peers.filter((peer) => { - // const connections = this.components.connectionManager.getConnections( - // peer.id - // ); - // return connections.some((c) => - // c.streams.some((s) => s.protocol === this.multicodec) - // ); - // }); - // } + public async connectedPeers(): Promise { + const peers = await this.allPeers(); + return peers.filter((peer) => { + const connections = this.components.connectionManager.getConnections( + peer.id + ); + return connections.some((c) => + c.streams.some((s) => s.protocol === this.multicodec) + ); + }); + } /** * Retrieves a list of connected peers that support the protocol. The list is sorted by latency. diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 3dbf7ed8a2..1744450c7b 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -45,7 +45,7 @@ class Metadata extends BaseProtocol implements IMetadata { pubsubTopicsToShardInfo(this.pubsubTopics) ); - const peer = await this.peerStore.get(peerId); + const peer = await this.libp2pComponents.peerStore.get(peerId); if (!peer) { return { shardInfo: null, diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 43d337728d..6030b86f76 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { numPeers: BigInt(numPeers) }); - const peer = await this.peerStore.get(peerId); + const peer = await this.components.peerStore.get(peerId); if (!peer) { return { peerInfos: null, diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 341f0bf1db..4ec4c83090 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -16,6 +16,8 @@ export enum Protocols { export type IBaseProtocolCore = { multicodec: string; + allPeers: () => Promise; + connectedPeers: () => Promise; addLibp2pEventListener: Libp2p["addEventListener"]; removeLibp2pEventListener: Libp2p["removeEventListener"]; };