diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f51901a109..c7f7462ca4 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -14,6 +14,7 @@ export * as waku_filter from "./lib/filter/index.js"; export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; +export { LightPushCodec } from "./lib/light_push/index.js"; export { wakuLightPush } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 22d2ded86e..b1f4620cc2 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -10,7 +10,11 @@ import type { } from "@waku/interfaces"; import { DefaultPubsubTopic } from "@waku/interfaces"; import { shardInfoToPubsubTopics } from "@waku/utils"; -import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; +import { + getConnectedPeersForProtocol, + getPeersForProtocol, + selectPeerForProtocol +} from "@waku/utils/libp2p"; import { filterPeers } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; @@ -70,7 +74,7 @@ export class BaseProtocol implements IBaseProtocol { } /** - * Retrieves a list of peers based on the specified criteria. + * Retrieves a list of connected peers based on the specified criteria. * * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. @@ -88,10 +92,12 @@ export class BaseProtocol implements IBaseProtocol { numPeers: 0 } ): Promise { - // Retrieve all peers that support the protocol - const allPeersForProtocol = await getPeersForProtocol(this.peerStore, [ - this.multicodec - ]); + // Retrieve all connected peers that support the protocol + const allPeersForProtocol = await getConnectedPeersForProtocol( + this.components.connectionManager.getConnections(), + this.peerStore, + [this.multicodec] + ); // Filter the peers based on the specified criteria return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index ecb1f02bf5..5f82879f01 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -2,11 +2,16 @@ import type { PeerStore } from "@libp2p/interface/peer-store"; import type { Peer } from "@libp2p/interface/peer-store"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import { DefaultPubsubTopic, LightNode, Protocols } from "@waku/interfaces"; +import { LightPushCodec } from "@waku/core"; +import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { selectPeerForProtocol } from "@waku/utils/libp2p"; +import { + getConnectedPeersForProtocol, + selectPeerForProtocol +} from "@waku/utils/libp2p"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; import sinon from "sinon"; @@ -260,3 +265,37 @@ describe("selectPeerForProtocol", () => { ); }); }); + +describe("getConnectedPeersForProtocol", function () { + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ + filter: true, + lightpush: true, + relay: true + }); + waku = await createLightNode(); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter]); + }); + + afterEach(async function () { + this.timeout(10000); + await tearDownNodes(nwaku, waku); + }); + + it("returns all connected peers that support the protocol", async function () { + const peers = await getConnectedPeersForProtocol( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + [LightPushCodec] + ); + + expect(peers.length).to.eq(1); + }); +}); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index b5db2b73c2..883160cc89 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -66,6 +66,27 @@ export async function getPeersForProtocol( return peers; } +export async function getConnectedPeersForProtocol( + connections: Connection[], + peerStore: PeerStore, + protocols: string[] +): Promise { + const openConnections = connections.filter( + (connection) => connection.status === "open" + ); + + const peerPromises = openConnections.map(async (connection) => { + const peer = await peerStore.get(connection.remotePeer); + const supportsProtocol = peer.protocols.some((protocol) => + protocols.includes(protocol) + ); + return supportsProtocol ? peer : null; + }); + + const peersWithNulls = await Promise.all(peerPromises); + return peersWithNulls.filter((peer): peer is Peer => peer !== null); +} + /** * Returns a peer that supports the given protocol. * If peerId is provided, the peer with that id is returned.