diff --git a/packages/sdk/src/wait_for_remote_peer.ts b/packages/sdk/src/wait_for_remote_peer.ts index 44bb1d63fb..1dde3f2367 100644 --- a/packages/sdk/src/wait_for_remote_peer.ts +++ b/packages/sdk/src/wait_for_remote_peer.ts @@ -31,7 +31,8 @@ export async function waitForRemotePeer( protocols?: Protocols[], timeoutMs?: number ): Promise { - protocols = protocols ?? getEnabledProtocols(waku); + // if no protocols or empty array passed - try to derive from mounted + protocols = protocols?.length ? protocols : getEnabledProtocols(waku); const connections = waku.libp2p.getConnections(); if (!waku.isStarted()) { @@ -39,8 +40,7 @@ export async function waitForRemotePeer( } if (connections.length > 0 && !protocols.includes(Protocols.Relay)) { - const success = await waitForMetadata(waku.libp2p); - + const success = await waitForMetadata(waku, protocols); if (success) { return; } @@ -135,33 +135,55 @@ async function waitForConnectedPeer( /** * Waits for the metadata from the remote peer. */ -async function waitForMetadata(libp2p: Libp2p): Promise { - const connections = libp2p.getConnections(); - const metadataService = libp2p.services.metadata; +async function waitForMetadata( + waku: Waku, + protocols: Protocols[] +): Promise { + const connectedPeers = waku.libp2p.getPeers(); + const metadataService = waku.libp2p.services.metadata; + const enabledCodes = mapProtocolsToCodecs(protocols); - if (!connections.length || !metadataService) { + if (!connectedPeers.length || !metadataService) { log.info( - `Skipping waitForMetadata due to missing connections:${connections.length} or metadataService:${!!metadataService}` + `Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}` ); return false; } - try { - // confirm at least with one connected peer - await Promise.any( - connections - .map((c) => c.remotePeer) - .map((peer) => metadataService.confirmOrAttemptHandshake(peer)) - ); + for (const peerId of connectedPeers) { + try { + const peer = await waku.libp2p.peerStore.get(peerId); + const hasSomeCodes = peer.protocols.some((c) => enabledCodes.has(c)); - return true; - } catch (e) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") { - log.error("Connection closed. Some peers can be on different shard."); + if (hasSomeCodes) { + const response = + await metadataService.confirmOrAttemptHandshake(peerId); + + if (!response.error) { + peer.protocols.forEach((c) => { + if (enabledCodes.has(c)) { + enabledCodes.set(c, true); + } + }); + + const confirmedAllCodecs = Array.from(enabledCodes.values()).every( + (v) => v + ); + + if (confirmedAllCodecs) { + return true; + } + } + } + } catch (e) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") { + log.error("Connection closed. Some peers can be on different shard."); + } + + log.error(`Error while iterating through peers: ${e}`); + continue; } - - log.error(`Error waiting for metadata: ${e}`); } return false; @@ -216,3 +238,21 @@ function getEnabledProtocols(waku: Waku): Protocols[] { return protocols; } + +function mapProtocolsToCodecs(protocols: Protocols[]): Map { + const codecs: Map = new Map(); + + const protocolToCodec: Record = { + [Protocols.Filter]: FilterCodecs.SUBSCRIBE, + [Protocols.LightPush]: LightPushCodec, + [Protocols.Store]: StoreCodec + }; + + for (const protocol of protocols) { + if (protocolToCodec[protocol]) { + codecs.set(protocolToCodec[protocol], false); + } + } + + return codecs; +} diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index dbf75e456f..07035b382c 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -7,7 +7,7 @@ import { Waku } from "@waku/interfaces"; import { createLightNode, waitForRemotePeer } from "@waku/sdk"; -import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils"; +import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; @@ -52,13 +52,7 @@ export async function runMultipleNodes( for (const node of serviceNodes.nodes) { await waku.dial(await node.getMultiaddrWithId()); - await waitForRemotePeer( - waku, - [ - !customArgs?.filter ? undefined : Protocols.Filter, - !customArgs?.lightpush ? undefined : Protocols.LightPush - ].filter(isDefined) - ); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); await node.ensureSubscriptions( derivePubsubTopicsFromNetworkConfig(networkConfig) ); diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 25ad308d8e..d89ef201cc 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -110,11 +110,7 @@ describe("Waku Message Ephemeral field", function () { await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [ - Protocols.Filter, - Protocols.LightPush, - Protocols.Store - ]); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); }); it("Ephemeral messages are not stored", async function () { diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 0818e46467..8eb7db45d7 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -38,7 +38,7 @@ describe("Waku Filter: Peer Management: E2E", function () { [serviceNodes, waku] = await runMultipleNodes( this.ctx, DefaultTestShardInfo, - undefined, + { lightpush: true, filter: true }, undefined, 5 ); @@ -222,7 +222,7 @@ describe("Waku Filter: Peer Management: E2E", function () { const [serviceNodes, waku] = await runMultipleNodes( this.ctx, DefaultTestShardInfo, - undefined, + { lightpush: true, filter: true }, undefined, 2 ); diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index f5ac0200fd..7008b9bda8 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -27,7 +27,10 @@ const runTests = (strictCheckNodes: boolean): void => { beforeEachCustom(this, async () => { try { - [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, { + lightpush: true, + filter: true + }); } catch (error) { console.error(error); } diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index bbd49be6c2..2daf8b8417 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -30,7 +30,10 @@ const runTests = (strictCheckNodes: boolean): void => { let serviceNodes: ServiceNodesFleet; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo, { + lightpush: true, + filter: true + }); }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index de644d9cc6..732cc3f6b3 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -30,10 +30,14 @@ const runTests = (strictCheckNodes: boolean): void => { let serviceNodes: ServiceNodesFleet; beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes(this.ctx, { - contentTopics: [TestContentTopic], - clusterId: ClusterId - }); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + contentTopics: [TestContentTopic], + clusterId: ClusterId + }, + { filter: true, lightpush: true } + ); }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts index a0db807616..09f0febe49 100644 --- a/packages/tests/tests/health-manager/protocols.spec.ts +++ b/packages/tests/tests/health-manager/protocols.spec.ts @@ -36,7 +36,7 @@ describe("Health Manager", function () { [serviceNodes, waku] = await runMultipleNodes( this.ctx, TestShardInfo, - undefined, + { lightpush: true, filter: true }, undefined, num ); @@ -62,7 +62,7 @@ describe("Health Manager", function () { [serviceNodes, waku] = await runMultipleNodes( this.ctx, TestShardInfo, - undefined, + { filter: true, lightpush: true }, undefined, num ); diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 44c6dbee7c..3b332d245d 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -34,7 +34,7 @@ const runTests = (strictNodeCheck: boolean): void => { [serviceNodes, waku] = await runMultipleNodes( this.ctx, TestShardInfo, - undefined, + { lightpush: true, filter: true }, strictNodeCheck, numServiceNodes, true diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 776d503cb3..389cb936b4 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -23,7 +23,7 @@ describe("Waku Light Push: Connection Management: E2E", function () { [serviceNodes, waku] = await runMultipleNodes( this.ctx, DefaultTestShardInfo, - undefined, + { lightpush: true, filter: true }, undefined, 5 );