From e49e7289ae632a1c2f3d11b9dc668ca68749ac7c Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 14 May 2024 16:31:38 +0530 Subject: [PATCH] chore: improve StreamManager (#1994) * chore: improve stream manager * chore: some cleaning * chore: update API * chore: rename `getConnectionStatus` to `isConnected` * chore: use throw/catch * chore: remove await * remove redundant function and rename * chore: increase test timeout --- package-lock.json | 78 ++++++++-------- packages/core/src/lib/light_push/index.ts | 11 +-- packages/core/src/lib/metadata/index.ts | 11 ++- packages/core/src/lib/store/index.ts | 8 +- packages/core/src/lib/stream_manager.ts | 91 ++++++++++++++----- .../src/peer-exchange/waku_peer_exchange.ts | 11 ++- packages/interfaces/src/protocols.ts | 5 + .../filter/single_node/subscribe.node.spec.ts | 2 +- .../tests/tests/filter/subscribe.node.spec.ts | 2 +- 9 files changed, 143 insertions(+), 76 deletions(-) diff --git a/package-lock.json b/package-lock.json index 5b0b33e003..f1ba0148d8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36494,14 +36494,14 @@ }, "packages/core": { "name": "@waku/core", - "version": "0.0.28", + "version": "0.0.29", "license": "MIT OR Apache-2.0", "dependencies": { "@libp2p/ping": "^1.0.12", - "@waku/enr": "^0.0.22", - "@waku/interfaces": "0.0.23", - "@waku/proto": "0.0.6", - "@waku/utils": "0.0.16", + "@waku/enr": "^0.0.23", + "@waku/interfaces": "0.0.24", + "@waku/proto": "0.0.7", + "@waku/utils": "0.0.17", "debug": "^4.3.4", "it-all": "^3.0.4", "it-length-prefixed": "^9.0.4", @@ -36552,14 +36552,14 @@ }, "packages/discovery": { "name": "@waku/discovery", - "version": "0.0.1", + "version": "0.0.2", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/core": "0.0.28", - "@waku/enr": "0.0.22", - "@waku/interfaces": "0.0.23", - "@waku/proto": "^0.0.6", - "@waku/utils": "0.0.16", + "@waku/core": "0.0.29", + "@waku/enr": "0.0.23", + "@waku/interfaces": "0.0.24", + "@waku/proto": "^0.0.7", + "@waku/utils": "0.0.17", "debug": "^4.3.4", "dns-query": "^0.11.2", "hi-base32": "^0.5.1", @@ -36606,7 +36606,7 @@ }, "packages/enr": { "name": "@waku/enr", - "version": "0.0.22", + "version": "0.0.23", "license": "MIT OR Apache-2.0", "dependencies": { "@ethersproject/rlp": "^5.7.0", @@ -36614,7 +36614,7 @@ "@libp2p/peer-id": "^4.0.4", "@multiformats/multiaddr": "^12.0.0", "@noble/secp256k1": "^1.7.1", - "@waku/utils": "0.0.16", + "@waku/utils": "0.0.17", "debug": "^4.3.4", "js-sha3": "^0.9.2" }, @@ -36626,7 +36626,7 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.6", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.23", + "@waku/interfaces": "0.0.24", "chai": "^4.3.10", "cspell": "^8.6.1", "fast-check": "^3.15.1", @@ -36655,10 +36655,10 @@ }, "packages/interfaces": { "name": "@waku/interfaces", - "version": "0.0.23", + "version": "0.0.24", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/proto": "^0.0.6" + "@waku/proto": "^0.0.7" }, "devDependencies": { "@chainsafe/libp2p-gossipsub": "^12.0.0", @@ -36673,14 +36673,14 @@ }, "packages/message-encryption": { "name": "@waku/message-encryption", - "version": "0.0.26", + "version": "0.0.27", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/secp256k1": "^1.7.1", - "@waku/core": "0.0.28", - "@waku/interfaces": "0.0.23", - "@waku/proto": "0.0.6", - "@waku/utils": "0.0.16", + "@waku/core": "0.0.29", + "@waku/interfaces": "0.0.24", + "@waku/proto": "0.0.7", + "@waku/utils": "0.0.17", "debug": "^4.3.4", "js-sha3": "^0.9.2", "uint8arrays": "^5.0.1" @@ -36717,11 +36717,11 @@ }, "packages/message-hash": { "name": "@waku/message-hash", - "version": "0.1.12", + "version": "0.1.13", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/utils": "0.0.16" + "@waku/utils": "0.0.17" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", @@ -36731,7 +36731,7 @@ "@types/debug": "^4.1.12", "@types/mocha": "^10.0.6", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.23", + "@waku/interfaces": "0.0.24", "chai": "^4.3.10", "cspell": "^8.6.1", "fast-check": "^3.15.1", @@ -36757,7 +36757,7 @@ }, "packages/proto": { "name": "@waku/proto", - "version": "0.0.6", + "version": "0.0.7", "license": "MIT OR Apache-2.0", "dependencies": { "protons-runtime": "^5.4.0" @@ -36799,15 +36799,15 @@ }, "packages/relay": { "name": "@waku/relay", - "version": "0.0.11", + "version": "0.0.12", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-gossipsub": "^12.0.0", "@noble/hashes": "^1.3.2", - "@waku/core": "0.0.28", - "@waku/interfaces": "0.0.23", - "@waku/proto": "0.0.6", - "@waku/utils": "0.0.16", + "@waku/core": "0.0.29", + "@waku/interfaces": "0.0.24", + "@waku/proto": "0.0.7", + "@waku/utils": "0.0.17", "chai": "^4.3.10", "debug": "^4.3.4", "fast-check": "^3.15.1" @@ -36840,7 +36840,7 @@ }, "packages/sdk": { "name": "@waku/sdk", - "version": "0.0.24", + "version": "0.0.25", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "^14.1.0", @@ -36850,12 +36850,12 @@ "@libp2p/ping": "^1.0.12", "@libp2p/websockets": "^8.0.11", "@noble/hashes": "^1.3.3", - "@waku/core": "0.0.28", - "@waku/discovery": "0.0.1", - "@waku/interfaces": "0.0.23", - "@waku/proto": "^0.0.6", - "@waku/relay": "0.0.11", - "@waku/utils": "0.0.16", + "@waku/core": "0.0.29", + "@waku/discovery": "0.0.2", + "@waku/interfaces": "0.0.24", + "@waku/proto": "^0.0.7", + "@waku/relay": "0.0.12", + "@waku/utils": "0.0.17", "libp2p": "^1.1.2" }, "devDependencies": { @@ -36940,11 +36940,11 @@ }, "packages/utils": { "name": "@waku/utils", - "version": "0.0.16", + "version": "0.0.17", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/interfaces": "0.0.23", + "@waku/interfaces": "0.0.24", "chai": "^4.3.10", "debug": "^4.3.4", "uint8arrays": "^5.0.1" diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 2b8fb3ebd2..e1936023f8 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -98,18 +98,15 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { }; } - let stream: Stream | undefined; + let stream: Stream; try { stream = await this.getStream(peer); - } catch (err) { - log.error( - `Failed to get a stream for remote peer${peer.id.toString()}`, - err - ); + } catch (error) { + log.error("Failed to get stream", error); return { success: null, failure: { - error: ProtocolError.REMOTE_PEER_FAULT, + error: ProtocolError.NO_STREAM_AVAILABLE, peerId: peer.id } }; diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index e402b7d37c..e879e29c38 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -85,7 +85,16 @@ class Metadata extends BaseProtocol implements IMetadata { }; } - const stream = await this.getStream(peer); + let stream; + try { + stream = await this.getStream(peer); + } catch (error) { + log.error("Failed to get stream", error); + return { + shardInfo: null, + error: ProtocolError.NO_STREAM_AVAILABLE + }; + } const encodedResponse = await pipe( [request], diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 5d90882b98..fbf806efe0 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -92,7 +92,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore { const historyRpcQuery = HistoryRpc.createQuery(queryOpts); - const stream = await this.getStream(peer); + let stream; + try { + stream = await this.getStream(peer); + } catch (e) { + log.error("Failed to get stream", e); + break; + } const res = await pipe( [historyRpcQuery.encode()], diff --git a/packages/core/src/lib/stream_manager.ts b/packages/core/src/lib/stream_manager.ts index 834bb2f047..aeb6072d0e 100644 --- a/packages/core/src/lib/stream_manager.ts +++ b/packages/core/src/lib/stream_manager.ts @@ -1,11 +1,15 @@ import type { PeerUpdate, Stream } from "@libp2p/interface"; -import { Peer } from "@libp2p/interface"; +import type { Peer, PeerId } from "@libp2p/interface"; import { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { selectConnection } from "@waku/utils/libp2p"; +const CONNECTION_TIMEOUT = 5_000; +const RETRY_BACKOFF_BASE = 1_000; +const MAX_RETRIES = 3; + export class StreamManager { - private streamPool: Map>; + private readonly streamPool: Map>; private readonly log: Logger; constructor( @@ -14,12 +18,8 @@ export class StreamManager { public addEventListener: Libp2p["addEventListener"] ) { this.log = new Logger(`stream-manager:${multicodec}`); - this.addEventListener( - "peer:update", - this.handlePeerUpdateStreamPool.bind(this) - ); - this.getStream = this.getStream.bind(this); this.streamPool = new Map(); + this.addEventListener("peer:update", this.handlePeerUpdateStreamPool); } public async getStream(peer: Peer): Promise { @@ -27,47 +27,88 @@ export class StreamManager { const streamPromise = this.streamPool.get(peerIdStr); if (!streamPromise) { - return this.newStream(peer); // fallback by creating a new stream on the spot + return this.createStream(peer); } - // We have the stream, let's remove it from the map this.streamPool.delete(peerIdStr); + this.prepareStream(peer); - this.prepareNewStream(peer); - - const stream = await streamPromise; - - if (!stream || stream.status === "closed") { - return this.newStream(peer); // fallback by creating a new stream on the spot + try { + const stream = await streamPromise; + if (stream && stream.status !== "closed") { + return stream; + } + } catch (error) { + this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error); + this.log.warn("Attempting to create a new stream for the peer"); } - return stream; + return this.createStream(peer); } - private async newStream(peer: Peer): Promise { + private async createStream(peer: Peer, retries = 0): Promise { const connections = this.getConnections(peer.id); const connection = selectConnection(connections); + if (!connection) { throw new Error("Failed to get a connection to the peer"); } - return connection.newStream(this.multicodec); + + try { + return await connection.newStream(this.multicodec); + } catch (error) { + if (retries < MAX_RETRIES) { + const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries); + await new Promise((resolve) => setTimeout(resolve, backoff)); + return this.createStream(peer, retries + 1); + } + throw new Error( + `Failed to create a new stream for ${peer.id.toString()} -- ` + error + ); + } } - private prepareNewStream(peer: Peer): void { - const streamPromise = this.newStream(peer).catch(() => { - // No error thrown as this call is not triggered by the user + private prepareStream(peer: Peer): void { + const timeoutPromise = new Promise((resolve) => + setTimeout(resolve, CONNECTION_TIMEOUT) + ); + + const streamPromise = Promise.race([ + this.createStream(peer), + timeoutPromise.then(() => { + throw new Error("Connection timeout"); + }) + ]).catch((error) => { this.log.error( - `Failed to prepare a new stream for ${peer.id.toString()}` + `Failed to prepare a new stream for ${peer.id.toString()} -- `, + error ); }); + this.streamPool.set(peer.id.toString(), streamPromise); } private handlePeerUpdateStreamPool = (evt: CustomEvent): void => { - const peer = evt.detail.peer; + const { peer } = evt.detail; + if (peer.protocols.includes(this.multicodec)) { - this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`); - this.prepareNewStream(peer); + const isConnected = this.isConnectedTo(peer.id); + + if (isConnected) { + this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`); + this.prepareStream(peer); + } else { + const peerIdStr = peer.id.toString(); + this.streamPool.delete(peerIdStr); + this.log.info( + `Removed pending stream for disconnected peer ${peerIdStr}` + ); + } } }; + + private isConnectedTo(peerId: PeerId): boolean { + const connections = this.getConnections(peerId); + return connections.some((connection) => connection.status === "open"); + } } diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 078094cf9d..ee4d02cfd7 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -51,7 +51,16 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { }; } - const stream = await this.getStream(peer); + let stream; + try { + stream = await this.getStream(peer); + } catch (err) { + log.error("Failed to get stream", err); + return { + peerInfos: null, + error: ProtocolError.NO_STREAM_AVAILABLE + }; + } const res = await pipe( [rpcQuery.encode()], diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0ea4557614..b80b68860e 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -147,6 +147,11 @@ export enum ProtocolError { * on the connection manager before retrying. */ NO_PEER_AVAILABLE = "No peer available", + /** + * Failure to find a stream to the peer. This may be because the connection with the peer is not still alive. + * Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used. + */ + NO_STREAM_AVAILABLE = "No stream available", /** * The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE` * or `DECODE_FAILED` can be used. diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts index a521147921..bead2d8db2 100644 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts @@ -249,7 +249,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { }); it("Subscribe to 100 topics (new limit) at once and receives messages", async function () { - this.timeout(50000); + this.timeout(100_000); const topicCount = 100; const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index ee248c68ec..052108b87b 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -293,7 +293,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); it("Subscribe to 100 topics (new limit) at once and receives messages", async function () { - this.timeout(50000); + this.timeout(100_000); const topicCount = 100; const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });