From ada265731acfeddc2bfe2e8e963bc2be37f13900 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Tue, 12 Aug 2025 23:25:23 +0200 Subject: [PATCH] fix: improve error handling for stream manager (#2546) * fix: improve error handling for stream manager * fix browser tests * update logs for stream manager * fix tests * fix playwright --- package-lock.json | 41 ++++------------ packages/browser-tests/tests/headless.spec.ts | 2 +- packages/browser-tests/tests/test-config.ts | 35 ++++++-------- packages/core/src/lib/filter/filter.ts | 47 ++++++++++++------- .../core/src/lib/light_push/light_push.ts | 11 ++--- packages/core/src/lib/metadata/metadata.ts | 9 ++-- packages/core/src/lib/store/store.spec.ts | 2 +- packages/core/src/lib/store/store.ts | 11 +++-- .../lib/stream_manager/stream_manager.spec.ts | 27 +++++------ .../src/lib/stream_manager/stream_manager.ts | 44 ++++++++++++----- packages/discovery/src/dns/dns.spec.ts | 18 ++++--- .../src/peer-exchange/waku_peer_exchange.ts | 9 ++-- 12 files changed, 129 insertions(+), 127 deletions(-) diff --git a/package-lock.json b/package-lock.json index 54c698b822..fa6c8bc191 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7649,7 +7649,7 @@ "version": "7.7.0", "resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.7.0.tgz", "integrity": "sha512-k107IF4+Xr7UHjwDc7Cfd6PRQfbdkiRabXGRjo07b4WyPahFBZCZ1sE+BNxYIJPPg73UkfOsVOLwqVc/6ETrIA==", - "dev": true, + "devOptional": true, "license": "MIT" }, "node_modules/@types/send": { @@ -7871,7 +7871,7 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/scope-manager/-/scope-manager-6.21.0.tgz", "integrity": "sha512-OwLUIWZJry80O99zvqXVEioyniJMa+d2GrqpUTqi5/v5D5rOrppJVBPa0yKCblcigC0/aYAzxxqQ1B+DS2RYsg==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@typescript-eslint/types": "6.21.0", @@ -7933,7 +7933,7 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/types/-/types-6.21.0.tgz", "integrity": "sha512-1kFmZ1rOm5epu9NZEZm1kckCDGj5UJEf7P1kliH4LKu/RkwpsfqqGmY2OOcUs18lSlQBKLDYBOGxRVtrMN5lpg==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": "^16.0.0 || >=18.0.0" @@ -7947,7 +7947,7 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-6.21.0.tgz", "integrity": "sha512-6npJTkZcO+y2/kr+z0hc4HwNfrrP4kNYh57ek7yCNlrBjWQ1Y0OS7jiZTkgumrvkX5HkEKXFZkkdFNkaW2wmUQ==", - "dev": true, + "devOptional": true, "license": "BSD-2-Clause", "dependencies": { "@typescript-eslint/types": "6.21.0", @@ -7976,7 +7976,7 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-6.21.0.tgz", "integrity": "sha512-NfWVaC8HP9T8cbKQxHcsJBY5YE1O33+jpMwN45qzWWaPDZgLIbo12toGMWnmhvCpd3sIxkpDw3Wv1B3dYrbDQQ==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@eslint-community/eslint-utils": "^4.4.0", @@ -8002,7 +8002,7 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-6.21.0.tgz", "integrity": "sha512-JJtkDduxLi9bivAB+cYOVMtbkqdPOhZ+ZI5LC47MIRrDV4Yn2o+ZnW10Nkmr28xRpSpdJ6Sm42Hjf2+REYXm0A==", - "dev": true, + "devOptional": true, "license": "MIT", "dependencies": { "@typescript-eslint/types": "6.21.0", @@ -9127,31 +9127,6 @@ "typescript": ">=4.8.4 <5.9.0" } }, - "node_modules/aegir/node_modules/@typescript-eslint/utils": { - "version": "8.34.1", - "resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-8.34.1.tgz", - "integrity": "sha512-mqOwUdZ3KjtGk7xJJnLbHxTuWVn3GO2WZZuM+Slhkun4+qthLdXx32C8xIXbO1kfCECb3jIs3eoxK3eryk7aoQ==", - "license": "MIT", - "optional": true, - "peer": true, - "dependencies": { - "@eslint-community/eslint-utils": "^4.7.0", - "@typescript-eslint/scope-manager": "8.34.1", - "@typescript-eslint/types": "8.34.1", - "@typescript-eslint/typescript-estree": "8.34.1" - }, - "engines": { - "node": "^18.18.0 || ^20.9.0 || >=21.1.0" - }, - "funding": { - "type": "opencollective", - "url": "https://opencollective.com/typescript-eslint" - }, - "peerDependencies": { - "eslint": "^8.57.0 || ^9.0.0", - "typescript": ">=4.8.4 <5.9.0" - } - }, "node_modules/aegir/node_modules/@typescript-eslint/visitor-keys": { "version": "8.34.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-8.34.1.tgz", @@ -22612,7 +22587,7 @@ "version": "9.0.3", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.3.tgz", "integrity": "sha512-RHiac9mvaRw0x3AYRgDC1CxAP7HTcNrrECeA8YYJeWnpo+2Q5CegtZjaotWTWxDG3UeGA1coE05iH1mPjT/2mg==", - "dev": true, + "devOptional": true, "license": "ISC", "dependencies": { "brace-expansion": "^2.0.1" @@ -34122,7 +34097,7 @@ "version": "1.4.3", "resolved": "https://registry.npmjs.org/ts-api-utils/-/ts-api-utils-1.4.3.tgz", "integrity": "sha512-i3eMG77UTMD0hZhgRS562pv83RC6ukSAC2GMNWc+9dieh/+jDM5u5YG+NHX6VNDRHQcHwmsTHctP9LhbC3WxVw==", - "dev": true, + "devOptional": true, "license": "MIT", "engines": { "node": ">=16" diff --git a/packages/browser-tests/tests/headless.spec.ts b/packages/browser-tests/tests/headless.spec.ts index 4b8598e942..0817d1c1cf 100644 --- a/packages/browser-tests/tests/headless.spec.ts +++ b/packages/browser-tests/tests/headless.spec.ts @@ -23,7 +23,7 @@ test.describe("waku", () => { try { await window.wakuAPI.createWakuNode({ ...config.defaultNodeConfig, - networkConfig: config.cluster42.networkConfig + networkConfig: config.networkConfig }); await window.wakuAPI.startNode(); return { success: true }; diff --git a/packages/browser-tests/tests/test-config.ts b/packages/browser-tests/tests/test-config.ts index f655ec9cb9..a20295a056 100644 --- a/packages/browser-tests/tests/test-config.ts +++ b/packages/browser-tests/tests/test-config.ts @@ -1,21 +1,5 @@ export const NETWORK_CONFIG = { - cluster42: { - networkConfig: { - clusterId: 42, - shards: [0] - }, - peers: [ - "/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ", - "/dns4/waku.fryorcraken.xyz/tcp/8000/wss/p2p/16Uiu2HAmMRvhDHrtiHft1FTUYnn6cVA8AWVrTyLUayJJ3MWpUZDB", - "/dns4/ivansete.xyz/tcp/8000/wss/p2p/16Uiu2HAmDAHuJ8w9zgxVnhtFe8otWNJdCewPAerJJPbXJcn8tu4r" - ] - }, - - sandbox: { - networkConfig: { - clusterId: 1, - shards: [0] - }, + "waku.sandbox": { peers: [ "/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb", "/dns4/node-01.gc-us-central1-a.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmRv1iQ3NoMMcjbtRmKxPuYBbF9nLYz2SDv9MTN8WhGuUU", @@ -23,6 +7,19 @@ export const NETWORK_CONFIG = { ] }, + "waku.test": { + peers: [ + "/dns4/node-01.do-ams3.waku.test.status.im/tcp/8000/wss/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W", + "/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/8000/wss/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG", + "/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/8000/wss/p2p/16Uiu2HAkzHaTP5JsUwfR9NR8Rj9HC24puS6ocaU8wze4QrXr9iXp" + ] + }, + + networkConfig: { + clusterId: 1, + shards: [0] + }, + // Default node configuration defaultNodeConfig: { defaultBootstrap: false @@ -35,6 +32,4 @@ export const NETWORK_CONFIG = { } }; -// Active environment - change this to switch between cluster42 and sandbox -export const ACTIVE_ENV = 'cluster42'; -export const ACTIVE_PEERS = NETWORK_CONFIG[ACTIVE_ENV].peers; \ No newline at end of file +export const ACTIVE_PEERS = NETWORK_CONFIG["waku.test"].peers; \ No newline at end of file diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 545b1b99b9..146fcd73c2 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -1,4 +1,4 @@ -import type { PeerId, Stream } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import type { IncomingStreamData } from "@libp2p/interface-internal"; import { type ContentTopic, @@ -65,6 +65,16 @@ export class FilterCore { ): Promise { const stream = await this.streamManager.getStream(peerId); + if (!stream) { + return { + success: null, + failure: { + error: ProtocolError.NO_STREAM_AVAILABLE, + peerId: peerId + } + }; + } + const request = FilterSubscribeRpc.createSubscribeRequest( pubsubTopic, contentTopics @@ -121,14 +131,10 @@ export class FilterCore { peerId: PeerId, contentTopics: ContentTopic[] ): Promise { - let stream: Stream | undefined; - try { - stream = await this.streamManager.getStream(peerId); - } catch (error) { - log.error( - `Failed to get a stream for remote peer${peerId.toString()}`, - error - ); + const stream = await this.streamManager.getStream(peerId); + + if (!stream) { + log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); return { success: null, failure: { @@ -168,6 +174,17 @@ export class FilterCore { ): Promise { const stream = await this.streamManager.getStream(peerId); + if (!stream) { + log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); + return { + success: null, + failure: { + error: ProtocolError.NO_STREAM_AVAILABLE, + peerId: peerId + } + }; + } + const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic); const res = await pipe( @@ -211,14 +228,10 @@ export class FilterCore { } public async ping(peerId: PeerId): Promise { - let stream: Stream | undefined; - try { - stream = await this.streamManager.getStream(peerId); - } catch (error) { - log.error( - `Failed to get a stream for remote peer${peerId.toString()}`, - error - ); + const stream = await this.streamManager.getStream(peerId); + + if (!stream) { + log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); return { success: null, failure: { diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index 6c2430e5a5..6de027121b 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -1,4 +1,4 @@ -import type { PeerId, Stream } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; import { type CoreProtocolResult, type IEncoder, @@ -95,11 +95,10 @@ export class LightPushCore { }; } - let stream: Stream; - try { - stream = await this.streamManager.getStream(peerId); - } catch (error) { - log.error("Failed to get stream", error); + const stream = await this.streamManager.getStream(peerId); + + if (!stream) { + log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); return { success: null, failure: { diff --git a/packages/core/src/lib/metadata/metadata.ts b/packages/core/src/lib/metadata/metadata.ts index ac4707e575..dc500b1848 100644 --- a/packages/core/src/lib/metadata/metadata.ts +++ b/packages/core/src/lib/metadata/metadata.ts @@ -57,11 +57,10 @@ class Metadata implements IMetadata { }; } - let stream; - try { - stream = await this.streamManager.getStream(peerId); - } catch (error) { - log.error("Failed to get stream", error); + const stream = await this.streamManager.getStream(peerId); + + if (!stream) { + log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); return { shardInfo: null, error: ProtocolError.NO_STREAM_AVAILABLE diff --git a/packages/core/src/lib/store/store.spec.ts b/packages/core/src/lib/store/store.spec.ts index 1cf61eb878..9edb8510e1 100644 --- a/packages/core/src/lib/store/store.spec.ts +++ b/packages/core/src/lib/store/store.spec.ts @@ -155,7 +155,7 @@ describe("StoreCore", () => { }); it("ends if stream creation fails", async () => { - mockStreamManager.getStream.rejects(new Error("Stream creation failed")); + mockStreamManager.getStream.resolves(undefined as any); const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); const result = await generator.next(); expect(result.done).to.be.true; diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index ce61b7a553..acf2398f9b 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -80,11 +80,12 @@ export class StoreCore { contentTopics: queryOpts.contentTopics }); - let stream; - try { - stream = await this.streamManager.getStream(peerId); - } catch (e) { - log.error("Failed to get stream", e); + const stream = await this.streamManager.getStream(peerId); + + if (!stream) { + log.error( + `Failed to get a stream for remote peer:${peerId.toString()}` + ); break; } diff --git a/packages/core/src/lib/stream_manager/stream_manager.spec.ts b/packages/core/src/lib/stream_manager/stream_manager.spec.ts index 7f8db4a7ff..046efac2a0 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.spec.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.spec.ts @@ -45,21 +45,13 @@ describe("StreamManager", () => { } }); - it("should throw if no connection provided", async () => { + it("should return undefined if no connection provided", async () => { streamManager["libp2p"]["connectionManager"]["getConnections"] = ( _peerId: PeerId | undefined ) => []; - let error: Error | undefined; - try { - await streamManager.getStream(mockPeer.id); - } catch (e) { - error = e as Error; - } - - expect(error).not.to.be.undefined; - expect(error?.message).to.include(mockPeer.id.toString()); - expect(error?.message).to.include(MULTICODEC); + const stream = await streamManager.getStream(mockPeer.id); + expect(stream).to.be.undefined; }); it("should create a new stream if no existing for protocol found", async () => { @@ -114,8 +106,11 @@ describe("StreamManager", () => { streamManager.getStream(mockPeer.id) ]); + expect(stream1).to.not.be.undefined; + expect(stream2).to.not.be.undefined; + const expected = ["1", "2"].toString(); - const actual = [stream1.id, stream2.id].sort().toString(); + const actual = [stream1?.id, stream2?.id].sort().toString(); expect(actual).to.be.eq(expected); }); @@ -124,7 +119,9 @@ describe("StreamManager", () => { const scheduleNewStreamSpy = sinon.spy(); streamManager["scheduleNewStream"] = scheduleNewStreamSpy; eventTarget.dispatchEvent( - new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } }) + new CustomEvent("peer:update", { + detail: { peer: { id: mockPeer.id, protocols: [] } } + }) ); expect(scheduleNewStreamSpy.calledOnce).to.be.false; @@ -135,7 +132,7 @@ describe("StreamManager", () => { streamManager["scheduleNewStream"] = scheduleNewStreamSpy; eventTarget.dispatchEvent( new CustomEvent("peer:update", { - detail: { peer: { protocols: [MULTICODEC] } } + detail: { peer: { id: mockPeer.id, protocols: [MULTICODEC] } } }) ); @@ -160,7 +157,7 @@ describe("StreamManager", () => { eventTarget.dispatchEvent( new CustomEvent("peer:update", { - detail: { peer: { protocols: [MULTICODEC] } } + detail: { peer: { id: mockPeer.id, protocols: [MULTICODEC] } } }) ); diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index b554817746..2c21656fff 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -23,7 +23,7 @@ export class StreamManager { ); } - public async getStream(peerId: PeerId): Promise { + public async getStream(peerId: PeerId): Promise { const peerIdStr = peerId.toString(); const scheduledStream = this.streamPool.get(peerIdStr); @@ -32,30 +32,33 @@ export class StreamManager { await scheduledStream; } - let stream = this.getOpenStreamForCodec(peerId); + const stream = + this.getOpenStreamForCodec(peerId) || (await this.createStream(peerId)); - if (stream) { - this.log.info( - `Found existing stream peerId=${peerIdStr} multicodec=${this.multicodec}` - ); - this.lockStream(peerIdStr, stream); - return stream; + if (!stream) { + return; } - stream = await this.createStream(peerId); - this.lockStream(peerIdStr, stream); + this.log.info( + `Using stream for peerId=${peerIdStr} multicodec=${this.multicodec}` + ); + this.lockStream(peerIdStr, stream); return stream; } - private async createStream(peerId: PeerId, retries = 0): Promise { + private async createStream( + peerId: PeerId, + retries = 0 + ): Promise { const connections = this.libp2p.connectionManager.getConnections(peerId); const connection = selectOpenConnection(connections); if (!connection) { - throw new Error( + this.log.error( `Failed to get a connection to the peer peerId=${peerId.toString()} multicodec=${this.multicodec}` ); + return; } let lastError: unknown; @@ -77,9 +80,10 @@ export class StreamManager { } if (!stream) { - throw new Error( + this.log.error( `Failed to create a new stream for ${peerId.toString()} -- ` + lastError ); + return; } return stream; @@ -141,6 +145,9 @@ export class StreamManager { const connection = selectOpenConnection(connections); if (!connection) { + this.log.info( + `No open connection found for peerId=${peerId.toString()} multicodec=${this.multicodec}` + ); return; } @@ -149,16 +156,27 @@ export class StreamManager { ); if (!stream) { + this.log.info( + `No open stream found for peerId=${peerId.toString()} multicodec=${this.multicodec}` + ); return; } const isStreamUnusable = ["done", "closed", "closing"].includes( stream.writeStatus || "" ); + if (isStreamUnusable || this.isStreamLocked(stream)) { + this.log.info( + `Stream for peerId=${peerId.toString()} multicodec=${this.multicodec} is unusable` + ); return; } + this.log.info( + `Found open stream for peerId=${peerId.toString()} multicodec=${this.multicodec}` + ); + return stream; } diff --git a/packages/discovery/src/dns/dns.spec.ts b/packages/discovery/src/dns/dns.spec.ts index ce4e15c4ba..2925948f63 100644 --- a/packages/discovery/src/dns/dns.spec.ts +++ b/packages/discovery/src/dns/dns.spec.ts @@ -1,5 +1,6 @@ import type { DnsClient } from "@waku/interfaces"; import { expect } from "chai"; +import sinon from "sinon"; import { DnsNodeDiscovery } from "./dns.js"; import testData from "./testdata.json" with { type: "json" }; @@ -225,13 +226,18 @@ describe("DNS Node Discovery w/ capabilities", () => { const dnsNodeDiscovery = new DnsNodeDiscovery(mockDns); - const iterator = dnsNodeDiscovery.getNextPeer([mockData.enrTree]); - const { value: peer } = await iterator.next(); + const randomStub = sinon.stub(Math, "random").returns(0); + try { + const iterator = dnsNodeDiscovery.getNextPeer([mockData.enrTree]); + const { value: peer } = await iterator.next(); - expect(peer.peerId?.toString()).to.eq( - "16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F" - ); - expect(mockDns.hasThrown).to.be.false; + expect(peer.peerId?.toString()).to.eq( + "16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F" + ); + expect(mockDns.hasThrown).to.be.false; + } finally { + randomStub.restore(); + } }); it("retrieves all peers (3) when branch entries are composed of multiple strings", async function () { diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 66ff8fd4bd..290f64722e 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -53,11 +53,10 @@ export class WakuPeerExchange implements IPeerExchange { }; } - let stream; - try { - stream = await this.streamManager.getStream(peerId); - } catch (err) { - log.error("Failed to get stream", err); + const stream = await this.streamManager.getStream(peerId); + + if (!stream) { + log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); return { peerInfos: null, error: ProtocolError.NO_STREAM_AVAILABLE