feat: add mutex locks to tackle race conditions over shared state

chore: some mutex lock-release improvements
This commit is contained in:
Danish Arora 2024-09-24 11:19:38 +05:30
parent d38ba9cf1c
commit 1aa57649ae
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
4 changed files with 69 additions and 60 deletions

View File

@ -58,7 +58,7 @@
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
"@libp2p/bootstrap": "^10.1.2",
"@libp2p/bootstrap": "^10",
"@libp2p/identify": "^2.1.2",
"@libp2p/mplex": "^10.1.2",
"@libp2p/ping": "^1.1.2",
@ -67,9 +67,10 @@
"@waku/core": "0.0.32",
"@waku/discovery": "0.0.5",
"@waku/interfaces": "0.0.27",
"@waku/message-hash": "0.1.16",
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1"
},
"devDependencies": {

View File

@ -8,6 +8,7 @@ import {
ProtocolUseOptions
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
import { Mutex } from "async-mutex";
interface Options {
numPeersToUse?: number;
@ -27,11 +28,12 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
> | null = null;
private log: Logger;
private maintainPeersLock = false;
private readonly renewPeersLocker = new RenewPeerLocker(
RENEW_TIME_LOCK_DURATION
);
private peersMutex = new Mutex();
public constructor(
protected core: BaseProtocol,
protected connectionManager: ConnectionManager,
@ -45,7 +47,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
void this.setupEventListeners();
// void this.setupEventListeners();
void this.startMaintainPeersInterval(maintainPeersInterval);
}
@ -61,7 +63,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
await this.connectionManager.dropConnection(peerToDisconnect);
await this.connectionManager.dropConnection(peerToDisconnect);
const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
@ -95,16 +97,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
}
private setupEventListeners(): void {
this.core.addLibp2pEventListener(
"peer:connect",
() => void this.confirmPeers()
);
this.core.addLibp2pEventListener(
"peer:disconnect",
() => void this.confirmPeers()
);
}
// private setupEventListeners(): void {
// this.core.addLibp2pEventListener(
// "peer:connect",
// () => void this.confirmPeers()
// );
// this.core.addLibp2pEventListener(
// "peer:disconnect",
// () => void this.confirmPeers()
// );
// }
/**
* Checks if there are sufficient peers to send a message to.
@ -120,18 +122,34 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
): Promise<boolean> {
const { forceUseAllPeers = false, maxAttempts = 3 } = options;
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;
let needsMaintenance: boolean;
let currentPeerCount: number;
const release = await this.peersMutex.acquire();
try {
currentPeerCount = this.connectedPeers.length;
needsMaintenance = forceUseAllPeers || currentPeerCount === 0;
} finally {
release();
}
if (!needsMaintenance) return true;
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
if (this.peers.size < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
);
const finalRelease = await this.peersMutex.acquire();
try {
if (this.peers.size < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
);
}
return true;
} finally {
finalRelease();
}
return true;
}
if (!autoRetry) {
return false;
@ -181,9 +199,13 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* Maintains the peers list to `numPeersToUse`.
*/
private async maintainPeers(): Promise<boolean> {
if (this.maintainPeersLock) {
return false;
}
try {
await this.confirmPeers();
const numPeersToAdd = await this.peersMutex.runExclusive(() => {
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
return this.numPeersToUse - this.peers.size;
});
this.maintainPeersLock = true;
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
@ -197,27 +219,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
} else {
await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd));
}
this.log.info(
`Peer maintenance completed, current count: ${this.peers.size}`
);
this.renewPeersLocker.cleanUnlocked();
return true;
} catch (error) {
if (error instanceof Error) {
this.log.error("Error during peer maintenance", {
error: error.message,
stack: error.stack
});
} else {
this.log.error("Error during peer maintenance", {
error: String(error)
});
}
return false;
} finally {
this.maintainPeersLock = false;
}
});
}
/**
@ -225,27 +227,32 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding and adding ${numPeers} new peers`);
let newPeers: Peer[];
const release = await this.peersMutex.acquire();
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);
this.log.info(`Finding and adding ${numPeers} new peers`);
newPeers = await this.findAdditionalPeers(numPeers);
} finally {
release();
}
await Promise.all(dials);
const dials = await Promise.all(
newPeers.map((peer) => this.connectionManager.attemptDial(peer.id))
);
additionalPeers.forEach((peer) =>
const finalRelease = await this.peersMutex.acquire();
try {
const successfulPeers = newPeers.filter((_, index) => dials[index]);
successfulPeers.forEach((peer) =>
this.peers.set(peer.id.toString(), peer)
);
this.updatePeers(this.peers);
this.updatePeers(new Map(this.peers));
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}`
`Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
return successfulPeers;
} finally {
finalRelease();
}
}

View File

@ -72,6 +72,7 @@ const runTests = (strictNodeCheck: boolean): void => {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i))
});
expect(pushResponse.successes.length).to.eq(numServiceNodes);
}

View File

@ -52,7 +52,7 @@ describe("Waku Light Push: Peer Management: E2E", function () {
}
});
it("Failed peers are renewed", async function () {
it.only("Failed peers are renewed", async function () {
// send a lightpush request -- should have all successes
const response1 = await waku.lightPush.send(
encoder,