From 1aa57649aec5df16a9cd8cd8164106eb8e87e3c0 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 24 Sep 2024 11:19:38 +0530 Subject: [PATCH] feat: add mutex locks to tackle race conditions over shared state chore: some mutex lock-release improvements --- packages/sdk/package.json | 5 +- packages/sdk/src/protocols/base_protocol.ts | 121 +++++++++--------- .../tests/tests/light-push/index.node.spec.ts | 1 + .../tests/light-push/peer_management.spec.ts | 2 +- 4 files changed, 69 insertions(+), 60 deletions(-) diff --git a/packages/sdk/package.json b/packages/sdk/package.json index e2a6c6dc0b..befad4a17f 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -58,7 +58,7 @@ }, "dependencies": { "@chainsafe/libp2p-noise": "^15.1.0", - "@libp2p/bootstrap": "^10.1.2", + "@libp2p/bootstrap": "^10", "@libp2p/identify": "^2.1.2", "@libp2p/mplex": "^10.1.2", "@libp2p/ping": "^1.1.2", @@ -67,9 +67,10 @@ "@waku/core": "0.0.32", "@waku/discovery": "0.0.5", "@waku/interfaces": "0.0.27", + "@waku/message-hash": "0.1.16", "@waku/proto": "^0.0.8", "@waku/utils": "0.0.20", - "@waku/message-hash": "0.1.16", + "async-mutex": "^0.5.0", "libp2p": "^1.8.1" }, "devDependencies": { diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 161ccd2a01..7aa95a3d17 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -8,6 +8,7 @@ import { ProtocolUseOptions } from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; +import { Mutex } from "async-mutex"; interface Options { numPeersToUse?: number; @@ -27,11 +28,12 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { > | null = null; private log: Logger; - private maintainPeersLock = false; private readonly renewPeersLocker = new RenewPeerLocker( RENEW_TIME_LOCK_DURATION ); + private peersMutex = new Mutex(); + public constructor( protected core: BaseProtocol, protected connectionManager: ConnectionManager, @@ -45,7 +47,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; - void this.setupEventListeners(); + // void this.setupEventListeners(); void this.startMaintainPeersInterval(maintainPeersInterval); } @@ -61,7 +63,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); - await this.connectionManager.dropConnection(peerToDisconnect); + await this.connectionManager.dropConnection(peerToDisconnect); const peer = (await this.findAndAddPeers(1))[0]; if (!peer) { @@ -95,16 +97,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } - private setupEventListeners(): void { - this.core.addLibp2pEventListener( - "peer:connect", - () => void this.confirmPeers() - ); - this.core.addLibp2pEventListener( - "peer:disconnect", - () => void this.confirmPeers() - ); - } + // private setupEventListeners(): void { + // this.core.addLibp2pEventListener( + // "peer:connect", + // () => void this.confirmPeers() + // ); + // this.core.addLibp2pEventListener( + // "peer:disconnect", + // () => void this.confirmPeers() + // ); + // } /** * Checks if there are sufficient peers to send a message to. @@ -120,18 +122,34 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ): Promise { const { forceUseAllPeers = false, maxAttempts = 3 } = options; - if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; + let needsMaintenance: boolean; + let currentPeerCount: number; + + const release = await this.peersMutex.acquire(); + try { + currentPeerCount = this.connectedPeers.length; + needsMaintenance = forceUseAllPeers || currentPeerCount === 0; + } finally { + release(); + } + + if (!needsMaintenance) return true; let attempts = 0; while (attempts < maxAttempts) { attempts++; if (await this.maintainPeers()) { - if (this.peers.size < this.numPeersToUse) { - this.log.warn( - `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` - ); + const finalRelease = await this.peersMutex.acquire(); + try { + if (this.peers.size < this.numPeersToUse) { + this.log.warn( + `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` + ); + } + return true; + } finally { + finalRelease(); } - return true; } if (!autoRetry) { return false; @@ -181,9 +199,13 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * Maintains the peers list to `numPeersToUse`. */ private async maintainPeers(): Promise { - if (this.maintainPeersLock) { - return false; - } + try { + await this.confirmPeers(); + + const numPeersToAdd = await this.peersMutex.runExclusive(() => { + this.log.info(`Maintaining peers, current count: ${this.peers.size}`); + return this.numPeersToUse - this.peers.size; + }); this.maintainPeersLock = true; this.log.info(`Maintaining peers, current count: ${this.peers.length}`); @@ -197,27 +219,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } else { await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd)); } - - this.log.info( - `Peer maintenance completed, current count: ${this.peers.size}` - ); - this.renewPeersLocker.cleanUnlocked(); - return true; - } catch (error) { - if (error instanceof Error) { - this.log.error("Error during peer maintenance", { - error: error.message, - stack: error.stack - }); - } else { - this.log.error("Error during peer maintenance", { - error: String(error) - }); - } - return false; - } finally { - this.maintainPeersLock = false; - } + }); } /** @@ -225,27 +227,32 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param numPeers The number of peers to find and add. */ private async findAndAddPeers(numPeers: number): Promise { - this.log.info(`Finding and adding ${numPeers} new peers`); + let newPeers: Peer[]; + const release = await this.peersMutex.acquire(); try { - const additionalPeers = await this.findAdditionalPeers(numPeers); - const dials = additionalPeers.map((peer) => - this.connectionManager.attemptDial(peer.id) - ); + this.log.info(`Finding and adding ${numPeers} new peers`); + newPeers = await this.findAdditionalPeers(numPeers); + } finally { + release(); + } - await Promise.all(dials); + const dials = await Promise.all( + newPeers.map((peer) => this.connectionManager.attemptDial(peer.id)) + ); - additionalPeers.forEach((peer) => + const finalRelease = await this.peersMutex.acquire(); + try { + const successfulPeers = newPeers.filter((_, index) => dials[index]); + successfulPeers.forEach((peer) => this.peers.set(peer.id.toString(), peer) ); - this.updatePeers(this.peers); - + this.updatePeers(new Map(this.peers)); this.log.info( - `Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}` + `Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}` ); - return additionalPeers; - } catch (error) { - this.log.error("Error finding and adding new peers:", error); - throw error; + return successfulPeers; + } finally { + finalRelease(); } } diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 44c6dbee7c..fdc0915a92 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -72,6 +72,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); + expect(pushResponse.successes.length).to.eq(numServiceNodes); } diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 2718c9b29f..9cd66d1812 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -52,7 +52,7 @@ describe("Waku Light Push: Peer Management: E2E", function () { } }); - it("Failed peers are renewed", async function () { + it.only("Failed peers are renewed", async function () { // send a lightpush request -- should have all successes const response1 = await waku.lightPush.send( encoder,