From 558fb6ae2e7605379683617e9f94a19590ea59ee Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 24 Sep 2024 13:11:51 +0530 Subject: [PATCH] chore: some mutex lock-release improvements --- packages/sdk/src/protocols/base_protocol.ts | 195 ++++++++++-------- .../tests/tests/light-push/index.node.spec.ts | 1 + .../tests/light-push/peer_management.spec.ts | 14 +- 3 files changed, 115 insertions(+), 95 deletions(-) diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index a672f37589..64650ffab2 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -32,8 +32,6 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { RENEW_TIME_LOCK_DURATION ); - private maintainPeersMutex = new Mutex(); - private confirmPeersMutex = new Mutex(); private peersMutex = new Mutex(); public constructor( @@ -49,7 +47,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; - void this.setupEventListeners(); + // void this.setupEventListeners(); void this.startMaintainPeersInterval(maintainPeersInterval); } @@ -100,16 +98,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } - private setupEventListeners(): void { - this.core.addLibp2pEventListener( - "peer:connect", - () => void this.confirmPeers() - ); - this.core.addLibp2pEventListener( - "peer:disconnect", - () => void this.confirmPeers() - ); - } + // private setupEventListeners(): void { + // this.core.addLibp2pEventListener( + // "peer:connect", + // () => void this.confirmPeers() + // ); + // this.core.addLibp2pEventListener( + // "peer:disconnect", + // () => void this.confirmPeers() + // ); + // } /** * Checks if there are peers to send a message to. @@ -128,39 +126,53 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { protected hasPeers = async ( options: Partial = {} ): Promise => { - return this.peersMutex.runExclusive(async () => { - const { - autoRetry = false, - forceUseAllPeers = false, - initialDelay = 10, - maxAttempts = 3, - maxDelay = 100 - } = options; + const { + autoRetry = false, + forceUseAllPeers = false, + initialDelay = 10, + maxAttempts = 3, + maxDelay = 100 + } = options; - if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; + let needsMaintenance: boolean; + let currentPeerCount: number; - let attempts = 0; - while (attempts < maxAttempts) { - attempts++; - if (await this.maintainPeers()) { + const release = await this.peersMutex.acquire(); + try { + currentPeerCount = this.connectedPeers.length; + needsMaintenance = forceUseAllPeers || currentPeerCount === 0; + } finally { + release(); + } + + if (!needsMaintenance) return true; + + let attempts = 0; + while (attempts < maxAttempts) { + attempts++; + if (await this.maintainPeers()) { + const finalRelease = await this.peersMutex.acquire(); + try { if (this.peers.size < this.numPeersToUse) { this.log.warn( `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` ); } return true; + } finally { + finalRelease(); } - if (!autoRetry) return false; - const delayMs = Math.min( - initialDelay * Math.pow(2, attempts - 1), - maxDelay - ); - await delay(delayMs); } + if (!autoRetry) return false; + const delayMs = Math.min( + initialDelay * Math.pow(2, attempts - 1), + maxDelay + ); + await delay(delayMs); + } - this.log.error("Failed to find peers to send message to"); - return false; - }); + this.log.error("Failed to find peers to send message to"); + return false; }; /** @@ -189,52 +201,54 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * Maintains the peers list to `numPeersToUse`. */ private async maintainPeers(): Promise { - return await this.maintainPeersMutex.runExclusive(async () => { - try { - await this.confirmPeers(); + try { + await this.confirmPeers(); + + const numPeersToAdd = await this.peersMutex.runExclusive(() => { this.log.info(`Maintaining peers, current count: ${this.peers.size}`); + return this.numPeersToUse - this.peers.size; + }); - const numPeersToAdd = this.numPeersToUse - this.peers.size; - if (numPeersToAdd > 0) { - await this.findAndAddPeers(numPeersToAdd); - } + if (numPeersToAdd > 0) { + await this.findAndAddPeers(numPeersToAdd); + } + if (numPeersToAdd < 0) { + this.log.warn(` + Peer maintenance completed, but there are more than ${this.numPeersToUse} peers. + This should not happen. + `); + } + + await this.peersMutex.runExclusive(() => { this.log.info( `Peer maintenance completed, current count: ${this.peers.size}` ); this.renewPeersLocker.cleanUnlocked(); - return true; - } catch (error) { - if (error instanceof Error) { - this.log.error("Error during peer maintenance", { - error: error.message, - stack: error.stack - }); - } else { - this.log.error("Error during peer maintenance", { - error: String(error) - }); - } - return false; - } - }); + }); + + return true; + } catch (error) { + this.log.error("Error during peer maintenance", error); + return false; + } } private async confirmPeers(): Promise { - await this.confirmPeersMutex.runExclusive(async () => { - const connectedPeers = await this.core.connectedPeers(); - const currentPeerIds = new Set(this.peers.keys()); + const connectedPeers = await this.core.connectedPeers(); + const currentPeerIds = new Set(this.peers.keys()); - // Peers to add (connected but not in our list) - const peersToAdd = connectedPeers.filter( - (p) => !currentPeerIds.has(p.id.toString()) - ); + // Peers to add (connected but not in our list) + const peersToAdd = connectedPeers.filter( + (p) => !currentPeerIds.has(p.id.toString()) + ); - // Peers to remove (in our list but not connected) - const peersToRemove = Array.from(this.peers.values()).filter( - (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) - ); + // Peers to remove (in our list but not connected) + const peersToRemove = Array.from(this.peers.values()).filter( + (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) + ); + await this.peersMutex.runExclusive(async () => { // Add new peers for (const peer of peersToAdd) { this.peers.set(peer.id.toString(), peer); @@ -257,30 +271,33 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param numPeers The number of peers to find and add. */ private async findAndAddPeers(numPeers: number): Promise { - return this.peersMutex.runExclusive(async () => { + let newPeers: Peer[]; + const release = await this.peersMutex.acquire(); + try { this.log.info(`Finding and adding ${numPeers} new peers`); - try { - const additionalPeers = await this.findAdditionalPeers(numPeers); - const dials = additionalPeers.map((peer) => - this.connectionManager.attemptDial(peer.id) - ); + newPeers = await this.findAdditionalPeers(numPeers); + } finally { + release(); + } - await Promise.all(dials); + const dials = await Promise.all( + newPeers.map((peer) => this.connectionManager.attemptDial(peer.id)) + ); - additionalPeers.forEach((peer) => - this.peers.set(peer.id.toString(), peer) - ); - this.updatePeers(new Map(this.peers)); - - this.log.info( - `Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}` - ); - return additionalPeers; - } catch (error) { - this.log.error("Error finding and adding new peers:", error); - throw error; - } - }); + const finalRelease = await this.peersMutex.acquire(); + try { + const successfulPeers = newPeers.filter((_, index) => dials[index]); + successfulPeers.forEach((peer) => + this.peers.set(peer.id.toString(), peer) + ); + this.updatePeers(new Map(this.peers)); + this.log.info( + `Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}` + ); + return successfulPeers; + } finally { + finalRelease(); + } } /** diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 3b332d245d..d9bc508bc5 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -72,6 +72,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); + expect(pushResponse.successes.length).to.eq(numServiceNodes); } diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 389cb936b4..21ad42f407 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -47,12 +47,14 @@ describe("Waku Light Push: Connection Management: E2E", function () { expect(failures?.length || 0).to.equal(0); }); - it("should push to available amount of connection if less than required", async function () { - const connections = waku.libp2p.getConnections(); - await Promise.all( - connections - .slice(0, connections.length - 1) - .map((c) => waku.connectionManager.dropConnection(c.remotePeer)) + it.only("Failed peers are renewed", async function () { + // send a lightpush request -- should have all successes + const response1 = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello_World") + }); + + expect(response1.successes.length).to.be.equal( + waku.lightPush.numPeersToUse ); const { successes, failures } = await waku.lightPush.send(encoder, {