fix: improve error handling for stream manager (#2546)

* fix: improve error handling for stream manager

* fix browser tests

* update logs for stream manager

* fix tests

* fix playwright
This commit is contained in:
Sasha 2025-08-12 23:25:23 +02:00 committed by GitHub
parent dc5155056b
commit ada265731a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 129 additions and 127 deletions

41
package-lock.json generated
View File

@ -7649,7 +7649,7 @@
"version": "7.7.0",
"resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.7.0.tgz",
"integrity": "sha512-k107IF4+Xr7UHjwDc7Cfd6PRQfbdkiRabXGRjo07b4WyPahFBZCZ1sE+BNxYIJPPg73UkfOsVOLwqVc/6ETrIA==",
"dev": true,
"devOptional": true,
"license": "MIT"
},
"node_modules/@types/send": {
@ -7871,7 +7871,7 @@
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/scope-manager/-/scope-manager-6.21.0.tgz",
"integrity": "sha512-OwLUIWZJry80O99zvqXVEioyniJMa+d2GrqpUTqi5/v5D5rOrppJVBPa0yKCblcigC0/aYAzxxqQ1B+DS2RYsg==",
"dev": true,
"devOptional": true,
"license": "MIT",
"dependencies": {
"@typescript-eslint/types": "6.21.0",
@ -7933,7 +7933,7 @@
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/types/-/types-6.21.0.tgz",
"integrity": "sha512-1kFmZ1rOm5epu9NZEZm1kckCDGj5UJEf7P1kliH4LKu/RkwpsfqqGmY2OOcUs18lSlQBKLDYBOGxRVtrMN5lpg==",
"dev": true,
"devOptional": true,
"license": "MIT",
"engines": {
"node": "^16.0.0 || >=18.0.0"
@ -7947,7 +7947,7 @@
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/typescript-estree/-/typescript-estree-6.21.0.tgz",
"integrity": "sha512-6npJTkZcO+y2/kr+z0hc4HwNfrrP4kNYh57ek7yCNlrBjWQ1Y0OS7jiZTkgumrvkX5HkEKXFZkkdFNkaW2wmUQ==",
"dev": true,
"devOptional": true,
"license": "BSD-2-Clause",
"dependencies": {
"@typescript-eslint/types": "6.21.0",
@ -7976,7 +7976,7 @@
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-6.21.0.tgz",
"integrity": "sha512-NfWVaC8HP9T8cbKQxHcsJBY5YE1O33+jpMwN45qzWWaPDZgLIbo12toGMWnmhvCpd3sIxkpDw3Wv1B3dYrbDQQ==",
"dev": true,
"devOptional": true,
"license": "MIT",
"dependencies": {
"@eslint-community/eslint-utils": "^4.4.0",
@ -8002,7 +8002,7 @@
"version": "6.21.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-6.21.0.tgz",
"integrity": "sha512-JJtkDduxLi9bivAB+cYOVMtbkqdPOhZ+ZI5LC47MIRrDV4Yn2o+ZnW10Nkmr28xRpSpdJ6Sm42Hjf2+REYXm0A==",
"dev": true,
"devOptional": true,
"license": "MIT",
"dependencies": {
"@typescript-eslint/types": "6.21.0",
@ -9127,31 +9127,6 @@
"typescript": ">=4.8.4 <5.9.0"
}
},
"node_modules/aegir/node_modules/@typescript-eslint/utils": {
"version": "8.34.1",
"resolved": "https://registry.npmjs.org/@typescript-eslint/utils/-/utils-8.34.1.tgz",
"integrity": "sha512-mqOwUdZ3KjtGk7xJJnLbHxTuWVn3GO2WZZuM+Slhkun4+qthLdXx32C8xIXbO1kfCECb3jIs3eoxK3eryk7aoQ==",
"license": "MIT",
"optional": true,
"peer": true,
"dependencies": {
"@eslint-community/eslint-utils": "^4.7.0",
"@typescript-eslint/scope-manager": "8.34.1",
"@typescript-eslint/types": "8.34.1",
"@typescript-eslint/typescript-estree": "8.34.1"
},
"engines": {
"node": "^18.18.0 || ^20.9.0 || >=21.1.0"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/typescript-eslint"
},
"peerDependencies": {
"eslint": "^8.57.0 || ^9.0.0",
"typescript": ">=4.8.4 <5.9.0"
}
},
"node_modules/aegir/node_modules/@typescript-eslint/visitor-keys": {
"version": "8.34.1",
"resolved": "https://registry.npmjs.org/@typescript-eslint/visitor-keys/-/visitor-keys-8.34.1.tgz",
@ -22612,7 +22587,7 @@
"version": "9.0.3",
"resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.3.tgz",
"integrity": "sha512-RHiac9mvaRw0x3AYRgDC1CxAP7HTcNrrECeA8YYJeWnpo+2Q5CegtZjaotWTWxDG3UeGA1coE05iH1mPjT/2mg==",
"dev": true,
"devOptional": true,
"license": "ISC",
"dependencies": {
"brace-expansion": "^2.0.1"
@ -34122,7 +34097,7 @@
"version": "1.4.3",
"resolved": "https://registry.npmjs.org/ts-api-utils/-/ts-api-utils-1.4.3.tgz",
"integrity": "sha512-i3eMG77UTMD0hZhgRS562pv83RC6ukSAC2GMNWc+9dieh/+jDM5u5YG+NHX6VNDRHQcHwmsTHctP9LhbC3WxVw==",
"dev": true,
"devOptional": true,
"license": "MIT",
"engines": {
"node": ">=16"

View File

@ -23,7 +23,7 @@ test.describe("waku", () => {
try {
await window.wakuAPI.createWakuNode({
...config.defaultNodeConfig,
networkConfig: config.cluster42.networkConfig
networkConfig: config.networkConfig
});
await window.wakuAPI.startNode();
return { success: true };

View File

@ -1,21 +1,5 @@
export const NETWORK_CONFIG = {
cluster42: {
networkConfig: {
clusterId: 42,
shards: [0]
},
peers: [
"/dns4/waku-test.bloxy.one/tcp/8095/wss/p2p/16Uiu2HAmSZbDB7CusdRhgkD81VssRjQV5ZH13FbzCGcdnbbh6VwZ",
"/dns4/waku.fryorcraken.xyz/tcp/8000/wss/p2p/16Uiu2HAmMRvhDHrtiHft1FTUYnn6cVA8AWVrTyLUayJJ3MWpUZDB",
"/dns4/ivansete.xyz/tcp/8000/wss/p2p/16Uiu2HAmDAHuJ8w9zgxVnhtFe8otWNJdCewPAerJJPbXJcn8tu4r"
]
},
sandbox: {
networkConfig: {
clusterId: 1,
shards: [0]
},
"waku.sandbox": {
peers: [
"/dns4/node-01.do-ams3.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb",
"/dns4/node-01.gc-us-central1-a.waku.sandbox.status.im/tcp/30303/p2p/16Uiu2HAmRv1iQ3NoMMcjbtRmKxPuYBbF9nLYz2SDv9MTN8WhGuUU",
@ -23,6 +7,19 @@ export const NETWORK_CONFIG = {
]
},
"waku.test": {
peers: [
"/dns4/node-01.do-ams3.waku.test.status.im/tcp/8000/wss/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W",
"/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/8000/wss/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG",
"/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/8000/wss/p2p/16Uiu2HAkzHaTP5JsUwfR9NR8Rj9HC24puS6ocaU8wze4QrXr9iXp"
]
},
networkConfig: {
clusterId: 1,
shards: [0]
},
// Default node configuration
defaultNodeConfig: {
defaultBootstrap: false
@ -35,6 +32,4 @@ export const NETWORK_CONFIG = {
}
};
// Active environment - change this to switch between cluster42 and sandbox
export const ACTIVE_ENV = 'cluster42';
export const ACTIVE_PEERS = NETWORK_CONFIG[ACTIVE_ENV].peers;
export const ACTIVE_PEERS = NETWORK_CONFIG["waku.test"].peers;

View File

@ -1,4 +1,4 @@
import type { PeerId, Stream } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { IncomingStreamData } from "@libp2p/interface-internal";
import {
type ContentTopic,
@ -65,6 +65,16 @@ export class FilterCore {
): Promise<CoreProtocolResult> {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
}
const request = FilterSubscribeRpc.createSubscribeRequest(
pubsubTopic,
contentTopics
@ -121,14 +131,10 @@ export class FilterCore {
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peerId.toString()}`,
error
);
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
success: null,
failure: {
@ -168,6 +174,17 @@ export class FilterCore {
): Promise<CoreProtocolResult> {
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
success: null,
failure: {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peerId
}
};
}
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
const res = await pipe(
@ -211,14 +228,10 @@ export class FilterCore {
}
public async ping(peerId: PeerId): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peerId.toString()}`,
error
);
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
success: null,
failure: {

View File

@ -1,4 +1,4 @@
import type { PeerId, Stream } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import {
type CoreProtocolResult,
type IEncoder,
@ -95,11 +95,10 @@ export class LightPushCore {
};
}
let stream: Stream;
try {
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
success: null,
failure: {

View File

@ -57,11 +57,10 @@ class Metadata implements IMetadata {
};
}
let stream;
try {
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
shardInfo: null,
error: ProtocolError.NO_STREAM_AVAILABLE

View File

@ -155,7 +155,7 @@ describe("StoreCore", () => {
});
it("ends if stream creation fails", async () => {
mockStreamManager.getStream.rejects(new Error("Stream creation failed"));
mockStreamManager.getStream.resolves(undefined as any);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
const result = await generator.next();
expect(result.done).to.be.true;

View File

@ -80,11 +80,12 @@ export class StoreCore {
contentTopics: queryOpts.contentTopics
});
let stream;
try {
stream = await this.streamManager.getStream(peerId);
} catch (e) {
log.error("Failed to get stream", e);
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(
`Failed to get a stream for remote peer:${peerId.toString()}`
);
break;
}

View File

@ -45,21 +45,13 @@ describe("StreamManager", () => {
}
});
it("should throw if no connection provided", async () => {
it("should return undefined if no connection provided", async () => {
streamManager["libp2p"]["connectionManager"]["getConnections"] = (
_peerId: PeerId | undefined
) => [];
let error: Error | undefined;
try {
await streamManager.getStream(mockPeer.id);
} catch (e) {
error = e as Error;
}
expect(error).not.to.be.undefined;
expect(error?.message).to.include(mockPeer.id.toString());
expect(error?.message).to.include(MULTICODEC);
const stream = await streamManager.getStream(mockPeer.id);
expect(stream).to.be.undefined;
});
it("should create a new stream if no existing for protocol found", async () => {
@ -114,8 +106,11 @@ describe("StreamManager", () => {
streamManager.getStream(mockPeer.id)
]);
expect(stream1).to.not.be.undefined;
expect(stream2).to.not.be.undefined;
const expected = ["1", "2"].toString();
const actual = [stream1.id, stream2.id].sort().toString();
const actual = [stream1?.id, stream2?.id].sort().toString();
expect(actual).to.be.eq(expected);
});
@ -124,7 +119,9 @@ describe("StreamManager", () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } })
new CustomEvent("peer:update", {
detail: { peer: { id: mockPeer.id, protocols: [] } }
})
);
expect(scheduleNewStreamSpy.calledOnce).to.be.false;
@ -135,7 +132,7 @@ describe("StreamManager", () => {
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
detail: { peer: { id: mockPeer.id, protocols: [MULTICODEC] } }
})
);
@ -160,7 +157,7 @@ describe("StreamManager", () => {
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
detail: { peer: { id: mockPeer.id, protocols: [MULTICODEC] } }
})
);

View File

@ -23,7 +23,7 @@ export class StreamManager {
);
}
public async getStream(peerId: PeerId): Promise<Stream> {
public async getStream(peerId: PeerId): Promise<Stream | undefined> {
const peerIdStr = peerId.toString();
const scheduledStream = this.streamPool.get(peerIdStr);
@ -32,30 +32,33 @@ export class StreamManager {
await scheduledStream;
}
let stream = this.getOpenStreamForCodec(peerId);
const stream =
this.getOpenStreamForCodec(peerId) || (await this.createStream(peerId));
if (stream) {
this.log.info(
`Found existing stream peerId=${peerIdStr} multicodec=${this.multicodec}`
);
this.lockStream(peerIdStr, stream);
return stream;
if (!stream) {
return;
}
stream = await this.createStream(peerId);
this.lockStream(peerIdStr, stream);
this.log.info(
`Using stream for peerId=${peerIdStr} multicodec=${this.multicodec}`
);
this.lockStream(peerIdStr, stream);
return stream;
}
private async createStream(peerId: PeerId, retries = 0): Promise<Stream> {
private async createStream(
peerId: PeerId,
retries = 0
): Promise<Stream | undefined> {
const connections = this.libp2p.connectionManager.getConnections(peerId);
const connection = selectOpenConnection(connections);
if (!connection) {
throw new Error(
this.log.error(
`Failed to get a connection to the peer peerId=${peerId.toString()} multicodec=${this.multicodec}`
);
return;
}
let lastError: unknown;
@ -77,9 +80,10 @@ export class StreamManager {
}
if (!stream) {
throw new Error(
this.log.error(
`Failed to create a new stream for ${peerId.toString()} -- ` + lastError
);
return;
}
return stream;
@ -141,6 +145,9 @@ export class StreamManager {
const connection = selectOpenConnection(connections);
if (!connection) {
this.log.info(
`No open connection found for peerId=${peerId.toString()} multicodec=${this.multicodec}`
);
return;
}
@ -149,16 +156,27 @@ export class StreamManager {
);
if (!stream) {
this.log.info(
`No open stream found for peerId=${peerId.toString()} multicodec=${this.multicodec}`
);
return;
}
const isStreamUnusable = ["done", "closed", "closing"].includes(
stream.writeStatus || ""
);
if (isStreamUnusable || this.isStreamLocked(stream)) {
this.log.info(
`Stream for peerId=${peerId.toString()} multicodec=${this.multicodec} is unusable`
);
return;
}
this.log.info(
`Found open stream for peerId=${peerId.toString()} multicodec=${this.multicodec}`
);
return stream;
}

View File

@ -1,5 +1,6 @@
import type { DnsClient } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { DnsNodeDiscovery } from "./dns.js";
import testData from "./testdata.json" with { type: "json" };
@ -225,13 +226,18 @@ describe("DNS Node Discovery w/ capabilities", () => {
const dnsNodeDiscovery = new DnsNodeDiscovery(mockDns);
const iterator = dnsNodeDiscovery.getNextPeer([mockData.enrTree]);
const { value: peer } = await iterator.next();
const randomStub = sinon.stub(Math, "random").returns(0);
try {
const iterator = dnsNodeDiscovery.getNextPeer([mockData.enrTree]);
const { value: peer } = await iterator.next();
expect(peer.peerId?.toString()).to.eq(
"16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F"
);
expect(mockDns.hasThrown).to.be.false;
expect(peer.peerId?.toString()).to.eq(
"16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F"
);
expect(mockDns.hasThrown).to.be.false;
} finally {
randomStub.restore();
}
});
it("retrieves all peers (3) when branch entries are composed of multiple strings", async function () {

View File

@ -53,11 +53,10 @@ export class WakuPeerExchange implements IPeerExchange {
};
}
let stream;
try {
stream = await this.streamManager.getStream(peerId);
} catch (err) {
log.error("Failed to get stream", err);
const stream = await this.streamManager.getStream(peerId);
if (!stream) {
log.error(`Failed to get a stream for remote peer:${peerId.toString()}`);
return {
peerInfos: null,
error: ProtocolError.NO_STREAM_AVAILABLE