diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 06684d7e09..c5c508e742 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -30,3 +30,4 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; export { ConnectionManager } from "./lib/connection_manager.js"; export { KeepAliveManager } from "./lib/keep_alive_manager.js"; +export { StreamManager } from "./lib/stream_manager.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index d139eac3f9..22497b425b 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -3,11 +3,9 @@ import type { Stream } from "@libp2p/interface/connection"; import type { PeerId } from "@libp2p/interface/peer-id"; import { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol -} from "@waku/utils/libp2p"; +import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; + +import { StreamManager } from "./stream_manager.js"; /** * A class with predefined helpers, to be used as a base to implement Waku @@ -16,6 +14,7 @@ import { export class BaseProtocol implements IBaseProtocol { public readonly addLibp2pEventListener: Libp2p["addEventListener"]; public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; + protected streamManager: StreamManager; constructor( public multicodec: string, @@ -27,6 +26,17 @@ export class BaseProtocol implements IBaseProtocol { this.removeLibp2pEventListener = components.events.removeEventListener.bind( components.events ); + + this.streamManager = new StreamManager( + multicodec, + components.connectionManager.getConnections.bind( + components.connectionManager + ), + this.addLibp2pEventListener + ); + } + protected async getStream(peer: Peer): Promise { + return this.streamManager.getStream(peer); } public get peerStore(): PeerStore { @@ -50,15 +60,4 @@ export class BaseProtocol implements IBaseProtocol { ); return peer; } - protected async newStream(peer: Peer): Promise { - const connections = this.components.connectionManager.getConnections( - peer.id - ); - const connection = selectConnection(connections); - if (!connection) { - throw new Error("Failed to get a connection to the peer"); - } - - return connection.newStream(this.multicodec); - } } diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 85cd87be92..cefce17d2e 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -271,7 +271,7 @@ class Filter extends BaseProtocol implements IReceiver { this.setActiveSubscription( _pubSubTopic, peer.id.toString(), - new Subscription(_pubSubTopic, peer, this.newStream.bind(this, peer)) + new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer)) ); return subscription; diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index ce886410ea..51174b1ad8 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -103,7 +103,7 @@ class LightPush extends BaseProtocol implements ILightPush { let error: undefined | SendError = undefined; const peer = await this.getPeer(opts?.peerId); - const stream = await this.newStream(peer); + const stream = await this.getStream(peer); try { const res = await pipe( diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 05068c1358..b8aa405d95 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -254,7 +254,7 @@ class Store extends BaseProtocol implements IStore { const peer = await this.getPeer(options?.peerId); for await (const messages of paginate( - this.newStream.bind(this, peer), + this.getStream.bind(this, peer), queryOpts, decodersAsMap, options?.cursor diff --git a/packages/core/src/lib/stream_manager.ts b/packages/core/src/lib/stream_manager.ts new file mode 100644 index 0000000000..add8ac78e7 --- /dev/null +++ b/packages/core/src/lib/stream_manager.ts @@ -0,0 +1,69 @@ +import type { PeerUpdate } from "@libp2p/interface"; +import type { Stream } from "@libp2p/interface/connection"; +import { Peer } from "@libp2p/interface/peer-store"; +import { Libp2p } from "@waku/interfaces"; +import { selectConnection } from "@waku/utils/libp2p"; +import debug from "debug"; + +export class StreamManager { + private streamPool: Map>; + private log: debug.Debugger; + + constructor( + public multicodec: string, + public getConnections: Libp2p["getConnections"], + public addEventListener: Libp2p["addEventListener"] + ) { + this.log = debug(`waku:stream-manager:${multicodec}`); + this.addEventListener( + "peer:update", + this.handlePeerUpdateStreamPool.bind(this) + ); + this.getStream = this.getStream.bind(this); + this.streamPool = new Map(); + } + + public async getStream(peer: Peer): Promise { + const peerIdStr = peer.id.toString(); + const streamPromise = this.streamPool.get(peerIdStr); + + if (!streamPromise) { + return this.newStream(peer); // fallback by creating a new stream on the spot + } + + // We have the stream, let's remove it from the map + this.streamPool.delete(peerIdStr); + + this.prepareNewStream(peer); + + const stream = await streamPromise; + + if (stream.status === "closed") { + return this.newStream(peer); // fallback by creating a new stream on the spot + } + + return stream; + } + + private async newStream(peer: Peer): Promise { + const connections = this.getConnections(peer.id); + const connection = selectConnection(connections); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + return connection.newStream(this.multicodec); + } + + private prepareNewStream(peer: Peer): void { + const streamPromise = this.newStream(peer); + this.streamPool.set(peer.id.toString(), streamPromise); + } + + private handlePeerUpdateStreamPool = (evt: CustomEvent): void => { + const peer = evt.detail.peer; + if (peer.protocols.includes(this.multicodec)) { + this.log(`Optimistically opening a stream to ${peer.id.toString()}`); + this.prepareNewStream(peer); + } + }; +} diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index 94a8a3db94..4a088f51d7 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { const peer = await this.getPeer(params.peerId); - const stream = await this.newStream(peer); + const stream = await this.getStream(peer); const res = await pipe( [rpcQuery.encode()],