diff --git a/.eslintrc.json b/.eslintrc.json index 2f31d3e6ef..52ef66a0f5 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -26,6 +26,7 @@ } ], "@typescript-eslint/explicit-member-accessibility": "error", + "@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_" }], "prettier/prettier": [ "error", { diff --git a/package-lock.json b/package-lock.json index cb8799416c..6ac618e40c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -39111,7 +39111,8 @@ "mocha": "^10.3.0", "npm-run-all": "^4.1.5", "process": "^0.11.10", - "rollup": "^4.12.0" + "rollup": "^4.12.0", + "sinon": "^18.0.0" }, "engines": { "node": ">=20" diff --git a/packages/core/package.json b/packages/core/package.json index 027c66623d..5039fb9e5c 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -92,6 +92,7 @@ "@types/uuid": "^9.0.8", "@waku/build-utils": "*", "chai": "^4.3.10", + "sinon": "^18.0.0", "cspell": "^8.6.1", "fast-check": "^3.19.0", "ignore-loader": "^0.1.2", diff --git a/packages/core/src/lib/stream_manager/stream_manager.spec.ts b/packages/core/src/lib/stream_manager/stream_manager.spec.ts new file mode 100644 index 0000000000..6706a5ff9f --- /dev/null +++ b/packages/core/src/lib/stream_manager/stream_manager.spec.ts @@ -0,0 +1,161 @@ +import { Connection, Peer, PeerId, Stream } from "@libp2p/interface"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { StreamManager } from "./stream_manager.js"; + +const MULTICODEC = "/test"; + +describe("StreamManager", () => { + let eventTarget: EventTarget; + let streamManager: StreamManager; + + const mockPeer: Peer = { + id: { + toString() { + return "1"; + } + } + } as unknown as Peer; + + beforeEach(() => { + eventTarget = new EventTarget(); + streamManager = new StreamManager( + MULTICODEC, + () => [], + eventTarget.addEventListener.bind(eventTarget) + ); + }); + + it("should return usable stream attached to connection", async () => { + for (const writeStatus of ["ready", "writing"]) { + const con1 = createMockConnection(); + con1.streams = [ + createMockStream({ id: "1", protocol: MULTICODEC, writeStatus }) + ]; + + streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1]; + + const stream = await streamManager.getStream(mockPeer); + + expect(stream).not.to.be.undefined; + expect(stream?.id).to.be.eq("1"); + } + }); + + it("should throw if no connection provided", async () => { + streamManager["getConnections"] = (_peerId: PeerId | undefined) => []; + + let error: Error | undefined; + try { + await streamManager.getStream(mockPeer); + } catch (e) { + error = e as Error; + } + + expect(error).not.to.be.undefined; + expect(error?.message).to.include(mockPeer.id.toString()); + expect(error?.message).to.include(MULTICODEC); + }); + + it("should create a new stream if no existing for protocol found", async () => { + for (const writeStatus of ["done", "closed", "closing"]) { + const con1 = createMockConnection(); + con1.streams = [ + createMockStream({ id: "1", protocol: MULTICODEC, writeStatus }) + ]; + + const newStreamSpy = sinon.spy(async (_protocol, _options) => + createMockStream({ + id: "2", + protocol: MULTICODEC, + writeStatus: "writable" + }) + ); + + con1.newStream = newStreamSpy; + streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1]; + + const stream = await streamManager.getStream(mockPeer); + + expect(stream).not.to.be.undefined; + expect(stream?.id).to.be.eq("2"); + + expect(newStreamSpy.calledOnce).to.be.true; + expect(newStreamSpy.calledWith(MULTICODEC)).to.be.true; + } + }); + + it("peer:update - should do nothing if another protocol hit", async () => { + const scheduleNewStreamSpy = sinon.spy(); + streamManager["scheduleNewStream"] = scheduleNewStreamSpy; + eventTarget.dispatchEvent( + new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } }) + ); + + expect(scheduleNewStreamSpy.calledOnce).to.be.false; + }); + + it("peer:update - should schedule stream creation IF protocol hit AND no stream found on connection", async () => { + const scheduleNewStreamSpy = sinon.spy(); + streamManager["scheduleNewStream"] = scheduleNewStreamSpy; + eventTarget.dispatchEvent( + new CustomEvent("peer:update", { + detail: { peer: { protocols: [MULTICODEC] } } + }) + ); + + expect(scheduleNewStreamSpy.calledOnce).to.be.true; + }); + + it("peer:update - should not schedule stream creation IF protocol hit AND stream found on connection", async () => { + const con1 = createMockConnection(); + con1.streams = [ + createMockStream({ + id: "1", + protocol: MULTICODEC, + writeStatus: "writable" + }) + ]; + streamManager["getConnections"] = (_id) => [con1]; + + const scheduleNewStreamSpy = sinon.spy(); + streamManager["scheduleNewStream"] = scheduleNewStreamSpy; + + eventTarget.dispatchEvent( + new CustomEvent("peer:update", { + detail: { peer: { protocols: [MULTICODEC] } } + }) + ); + + expect(scheduleNewStreamSpy.calledOnce).to.be.false; + }); +}); + +type MockConnectionOptions = { + status?: string; + open?: number; +}; + +function createMockConnection(options: MockConnectionOptions = {}): Connection { + return { + status: options.status || "open", + timeline: { + open: options.open || 1 + } + } as Connection; +} + +type MockStreamOptions = { + id?: string; + protocol?: string; + writeStatus?: string; +}; + +function createMockStream(options: MockStreamOptions): Stream { + return { + id: options.id, + protocol: options.protocol, + writeStatus: options.writeStatus || "ready" + } as Stream; +} diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 2896ff583f..6b2799a706 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -1,47 +1,41 @@ -import type { PeerUpdate, Stream } from "@libp2p/interface"; -import type { Peer, PeerId } from "@libp2p/interface"; -import { Libp2p } from "@waku/interfaces"; +import type { Peer, PeerId, PeerUpdate, Stream } from "@libp2p/interface"; +import type { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -import { selectConnection } from "./utils.js"; - -const CONNECTION_TIMEOUT = 5_000; -const RETRY_BACKOFF_BASE = 1_000; -const MAX_RETRIES = 3; +import { selectOpenConnection } from "./utils.js"; export class StreamManager { - private readonly streamPool: Map>; private readonly log: Logger; + private ongoingCreation: Set = new Set(); + private streamPool: Map> = new Map(); + public constructor( - public multicodec: string, - public getConnections: Libp2p["getConnections"], - public addEventListener: Libp2p["addEventListener"] + private multicodec: string, + private getConnections: Libp2p["getConnections"], + private addEventListener: Libp2p["addEventListener"] ) { this.log = new Logger(`stream-manager:${multicodec}`); - this.streamPool = new Map(); this.addEventListener("peer:update", this.handlePeerUpdateStreamPool); } public async getStream(peer: Peer): Promise { - const peerIdStr = peer.id.toString(); - const streamPromise = this.streamPool.get(peerIdStr); + const peerId = peer.id.toString(); - if (!streamPromise) { - return this.createStream(peer); + const scheduledStream = this.streamPool.get(peerId); + + if (scheduledStream) { + this.streamPool.delete(peerId); + await scheduledStream; } - this.streamPool.delete(peerIdStr); - this.prepareStream(peer); + const stream = this.getOpenStreamForCodec(peer.id); - try { - const stream = await streamPromise; - if (stream && stream.status !== "closed") { - return stream; - } - } catch (error) { - this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error); - this.log.warn("Attempting to create a new stream for the peer"); + if (stream) { + this.log.info( + `Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + return stream; } return this.createStream(peer); @@ -49,67 +43,112 @@ export class StreamManager { private async createStream(peer: Peer, retries = 0): Promise { const connections = this.getConnections(peer.id); - const connection = selectConnection(connections); + const connection = selectOpenConnection(connections); if (!connection) { - throw new Error("Failed to get a connection to the peer"); + throw new Error( + `Failed to get a connection to the peer peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + } + + let lastError: unknown; + let stream: Stream | undefined; + + for (let i = 0; i < retries + 1; i++) { + try { + this.log.info( + `Attempting to create a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + stream = await connection.newStream(this.multicodec); + this.log.info( + `Created stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + break; + } catch (error) { + lastError = error; + } + } + + if (!stream) { + throw new Error( + `Failed to create a new stream for ${peer.id.toString()} -- ` + + lastError + ); + } + + return stream; + } + + private async createStreamWithLock(peer: Peer): Promise { + const peerId = peer.id.toString(); + + if (this.ongoingCreation.has(peerId)) { + this.log.info( + `Skipping creation of a stream due to lock for peerId=${peerId} multicodec=${this.multicodec}` + ); + return; } try { - return await connection.newStream(this.multicodec); + this.ongoingCreation.add(peerId); + await this.createStream(peer); } catch (error) { - if (retries < MAX_RETRIES) { - const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries); - await new Promise((resolve) => setTimeout(resolve, backoff)); - return this.createStream(peer, retries + 1); - } - throw new Error( - `Failed to create a new stream for ${peer.id.toString()} -- ` + error - ); + this.log.error(`Failed to createStreamWithLock:`, error); + } finally { + this.ongoingCreation.delete(peerId); } - } - private prepareStream(peer: Peer): void { - const timeoutPromise = new Promise((resolve) => - setTimeout(resolve, CONNECTION_TIMEOUT) - ); - - const streamPromise = Promise.race([ - this.createStream(peer), - timeoutPromise.then(() => { - throw new Error("Connection timeout"); - }) - ]).catch((error) => { - this.log.error( - `Failed to prepare a new stream for ${peer.id.toString()} -- `, - error - ); - }); - - this.streamPool.set(peer.id.toString(), streamPromise); + return; } private handlePeerUpdateStreamPool = (evt: CustomEvent): void => { const { peer } = evt.detail; - if (peer.protocols.includes(this.multicodec)) { - const isConnected = this.isConnectedTo(peer.id); - - if (isConnected) { - this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`); - this.prepareStream(peer); - } else { - const peerIdStr = peer.id.toString(); - this.streamPool.delete(peerIdStr); - this.log.info( - `Removed pending stream for disconnected peer ${peerIdStr}` - ); - } + if (!peer.protocols.includes(this.multicodec)) { + return; } + + const stream = this.getOpenStreamForCodec(peer.id); + + if (stream) { + return; + } + + this.scheduleNewStream(peer); }; - private isConnectedTo(peerId: PeerId): boolean { + private scheduleNewStream(peer: Peer): void { + this.log.info( + `Scheduling creation of a stream for peerId=${peer.id.toString()} multicodec=${this.multicodec}` + ); + + // abandon previous attempt + if (this.streamPool.has(peer.id.toString())) { + this.streamPool.delete(peer.id.toString()); + } + + this.streamPool.set(peer.id.toString(), this.createStreamWithLock(peer)); + } + + private getOpenStreamForCodec(peerId: PeerId): Stream | undefined { const connections = this.getConnections(peerId); - return connections.some((connection) => connection.status === "open"); + const connection = selectOpenConnection(connections); + + if (!connection) { + return; + } + + const stream = connection.streams.find( + (s) => s.protocol === this.multicodec + ); + + const isStreamUnusable = ["done", "closed", "closing"].includes( + stream?.writeStatus || "" + ); + if (isStreamUnusable) { + return; + } + + return stream; } } diff --git a/packages/core/src/lib/stream_manager/utils.spec.ts b/packages/core/src/lib/stream_manager/utils.spec.ts new file mode 100644 index 0000000000..9cf93e8422 --- /dev/null +++ b/packages/core/src/lib/stream_manager/utils.spec.ts @@ -0,0 +1,65 @@ +import { Connection } from "@libp2p/interface"; +import { expect } from "chai"; + +import { selectOpenConnection } from "./utils.js"; + +describe("selectOpenConnection", () => { + it("returns nothing if no connections present", () => { + const connection = selectOpenConnection([]); + + expect(connection).to.be.undefined; + }); + + it("returns only open connection if one present", () => { + let expectedCon = createMockConnection({ id: "1", status: "closed" }); + let actualCon = selectOpenConnection([expectedCon]); + + expect(actualCon).to.be.undefined; + + expectedCon = createMockConnection({ id: "1", status: "open" }); + actualCon = selectOpenConnection([expectedCon]); + + expect(actualCon).not.to.be.undefined; + expect(actualCon?.id).to.be.eq("1"); + }); + + it("should return no connections if no open connection provided", () => { + const closedCon1 = createMockConnection({ status: "closed" }); + const closedCon2 = createMockConnection({ status: "closed" }); + const actualCon = selectOpenConnection([closedCon1, closedCon2]); + + expect(actualCon).to.be.undefined; + }); + + it("should select older connection if present", () => { + const con1 = createMockConnection({ + status: "open", + open: 10 + }); + const con2 = createMockConnection({ + status: "open", + open: 15 + }); + + const actualCon = selectOpenConnection([con1, con2]); + + expect(actualCon).not.to.be.undefined; + expect(actualCon?.timeline.open).to.be.eq(15); + }); +}); + +type MockConnectionOptions = { + id?: string; + status?: string; + open?: number; +}; + +function createMockConnection(options: MockConnectionOptions = {}): Connection { + return { + id: options.id, + status: options.status, + timeline: { + open: options.open + } + } as Connection; +} diff --git a/packages/core/src/lib/stream_manager/utils.ts b/packages/core/src/lib/stream_manager/utils.ts index d6f9ace5a8..89511b02d4 100644 --- a/packages/core/src/lib/stream_manager/utils.ts +++ b/packages/core/src/lib/stream_manager/utils.ts @@ -1,22 +1,10 @@ import type { Connection } from "@libp2p/interface"; -export function selectConnection( +export function selectOpenConnection( connections: Connection[] ): Connection | undefined { - if (!connections.length) return; - if (connections.length === 1) return connections[0]; - - let latestConnection: Connection | undefined; - - connections.forEach((connection) => { - if (connection.status === "open") { - if (!latestConnection) { - latestConnection = connection; - } else if (connection.timeline.open > latestConnection.timeline.open) { - latestConnection = connection; - } - } - }); - - return latestConnection; + return connections + .filter((c) => c.status === "open") + .sort((left, right) => right.timeline.open - left.timeline.open) + .at(0); } diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 52009aa05c..84a883eb0a 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -59,19 +59,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); - await this.connectionManager.dropConnection(peerToDisconnect); - const peer = (await this.findAndAddPeers(1))[0]; if (!peer) { - this.log.error( - "Failed to find a new peer to replace the disconnected one." - ); + throw Error("Failed to find a new peer to replace the disconnected one."); } const updatedPeers = this.peers.filter( (peer) => !peer.id.equals(peerToDisconnect) ); this.updatePeers(updatedPeers); + await this.connectionManager.dropConnection(peerToDisconnect); this.log.info( `Peer ${peerToDisconnect} disconnected and removed from the peer list` diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts index 928903fac4..e75f81f94d 100644 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts @@ -291,7 +291,8 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { messageCollector.verifyReceivedMessage(index, { expectedContentTopic: topic, expectedMessageText: `Message for Topic ${index + 1}`, - expectedPubsubTopic: TestPubsubTopic + expectedPubsubTopic: TestPubsubTopic, + checkTimestamp: false }); }); });