diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index f80962019a..8930843227 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -45,6 +45,10 @@ export async function runNodes( const { context, networkConfig, createNode, protocols } = options; const nwaku = new ServiceNode(makeLogFileName(context)); + // Mesh peer: gives `nwaku` at least one gossipsub mesh peer so that REST + // publishes (e.g. used by store tests to seed messages) don't fail with + // NoPeersToPublish. Relay-only — no store/filter/lightpush needed. + const nwakuPeer = new ServiceNode(makeLogFileName(context) + "-peer"); const nwakuArgs: Args = { filter: true, @@ -94,6 +98,25 @@ export async function runNodes( await nwaku.start(nwakuArgs, { retries: 3 }); + // Spin up a relay-only peer subscribed to the same shards/topics and chain + // it to `nwaku` via --staticnode. Once the mesh forms, REST publishes on + // `nwaku` have at least one mesh peer and succeed. Bind its lifecycle to + // `nwaku` so existing tearDownNodes(nwaku, ...) calls stop both. + const nwakuPeerArgs: Args = { + relay: true, + clusterId: networkConfig.clusterId, + staticnode: await nwaku.getExternalMultiaddr() + }; + if (isAutoSharding(networkConfig)) { + nwakuPeerArgs.numShardsInNetwork = networkConfig.numShardsInCluster; + nwakuPeerArgs.contentTopic = options.contentTopics ?? 
[]; + } else if (isStaticSharding(networkConfig) && options.relayShards) { + nwakuPeerArgs.shard = options.relayShards; + nwakuPeerArgs.numShardsInNetwork = 0; + } + await nwakuPeer.start(nwakuPeerArgs, { retries: 3 }); + nwaku.setCompanion(nwakuPeer); + log.info("Starting js waku node with :", JSON.stringify(jswakuArgs)); let waku: WakuNode | undefined; try { @@ -110,7 +133,15 @@ export async function runNodes( await waku.dial(await nwaku.getMultiaddrWithId()); await waku.waitForPeers(protocols); - await nwaku.ensureSubscriptions(routingInfos.map((r) => r.pubsubTopic)); + const pubsubTopics = routingInfos.map((r) => r.pubsubTopic); + await nwaku.ensureSubscriptions(pubsubTopics); + // Peer must also be subscribed for the gossipsub mesh on `pubsubTopics` + // to form between the two nwaku nodes. + await nwakuPeer.ensureSubscriptions(pubsubTopics); + // Give gossipsub a heartbeat (~1s) to GRAFT both nodes into the mesh + // before any test publishes. Without this, the first publish can still + // race the mesh and fail with NoPeersToPublish. + await nwaku.waitForMeshPeers(pubsubTopics, { timeoutMs: 5000 }); return [nwaku, waku as T]; } else { diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index 7e089c804d..476471ff31 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -60,6 +60,9 @@ export class ServiceNode { private readonly logPath: string; private restPort?: number; private args?: Args; + // Optional companion node started by runNodes() to give the primary nwaku + // a gossipsub mesh peer. Its lifecycle is bound to this node's stop(). 
+  private companion?: ServiceNode;
 
   public readonly version: NwakuVersion | undefined;
 
@@ -206,12 +209,84 @@ export class ServiceNode {
   public async stop(): Promise {
     await this.docker?.stop();
     delete this.docker;
+    if (this.companion) {
+      await this.companion.stop();
+      delete this.companion;
+    }
+  }
+
+  /**
+   * Used by runNodes() to attach a relay-only mesh peer to this node, so
+   * that REST `/relay/v1/auto/messages` publishes on this node have at least
+   * one gossipsub mesh peer and don't fail with NoPeersToPublish.
+   */
+  public setCompanion(node: ServiceNode): void {
+    this.companion = node;
   }
 
   public async waitForLog(msg: string, timeout: number): Promise {
     return waitForLine(this.logPath, msg, timeout);
   }
 
+  /**
+   * Polls /admin/v1/peers/mesh until every shard corresponding to the given
+   * pubsub topics has at least one mesh peer. Used by runNodes() to ensure
+   * the gossipsub mesh is grafted before tests publish.
+   *
+   * Shards are taken from the trailing segment of topics shaped like
+   * `/waku/2/rs/<cluster>/<shard>` (assumed static-sharding format). If no
+   * shard can be derived, falls back to waiting for a mesh peer on any shard.
+   *
+   * Best-effort: on timeout it logs a warning and resolves; it does not throw.
+   *
+   * @param pubsubTopics - Topics whose shards must each have a mesh peer.
+   * @param options - `timeoutMs` (default 5000) bounds the total wait;
+   *   `intervalMs` (default 200) is the delay between polls.
+   */
+  public async waitForMeshPeers(
+    pubsubTopics: PubsubTopic[],
+    options: { timeoutMs?: number; intervalMs?: number } = {}
+  ): Promise<void> {
+    const timeoutMs = options.timeoutMs ?? 5000;
+    const intervalMs = options.intervalMs ?? 200;
+    const deadline = Date.now() + timeoutMs;
+
+    // Shards that must be grafted before we consider the mesh ready.
+    const requiredShards = new Set<number>();
+    for (const topic of pubsubTopics) {
+      const match = /^\/waku\/2\/rs\/\d+\/(\d+)$/.exec(String(topic));
+      if (match) requiredShards.add(Number(match[1]));
+    }
+
+    while (Date.now() < deadline) {
+      const mesh = await this.restCall<{ shard: number; peers: unknown[] }[]>(
+        "/admin/v1/peers/mesh",
+        "GET",
+        undefined,
+        async (response) => {
+          if (response.status !== 200) return [];
+          const data = await response.json();
+          return Array.isArray(data) ? data : [];
+        }
+      );
+      const meshedShards = new Set(
+        mesh
+          .filter((ps) => (ps.peers?.length ?? 0) > 0)
+          .map((ps) => Number(ps.shard))
+      );
+      // With no derivable shards, keep the looser "any shard" readiness check.
+      const ready =
+        requiredShards.size > 0
+          ? Array.from(requiredShards).every((s) => meshedShards.has(s))
+          : meshedShards.size > 0;
+      if (ready) return;
+      await delay(intervalMs);
+    }
+    log.warn(
+      `Timed out after ${timeoutMs}ms waiting for gossipsub mesh peers on ${pubsubTopics.join(", ")}`
+    );
+  }
+
   /**
    * Calls nwaku REST API "/admin/v1/peers" to check for known peers. Be aware that it doesn't recognize js-waku as a node
    * @throws