feat: add mutex locks to tackle race conditions over shared state

This commit is contained in:
Danish Arora 2024-09-24 11:19:38 +05:30
parent ac1a0304f3
commit 3384b875a9
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
3 changed files with 4394 additions and 10344 deletions

14498
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -61,7 +61,7 @@
},
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
"@libp2p/bootstrap": "^10.1.2",
"@libp2p/bootstrap": "^10",
"@libp2p/identify": "^2.1.2",
"@libp2p/mplex": "^10.1.2",
"@libp2p/ping": "^1.1.2",
@ -73,6 +73,7 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.20",
"@waku/message-hash": "0.1.16",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1"
},
"devDependencies": {

View File

@ -8,6 +8,7 @@ import {
ProtocolUseOptions
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
import { Mutex } from "async-mutex";
interface Options {
numPeersToUse?: number;
@ -27,11 +28,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
> | null = null;
private log: Logger;
private maintainPeersLock = false;
private readonly renewPeersLocker = new RenewPeerLocker(
RENEW_TIME_LOCK_DURATION
);
private maintainPeersMutex = new Mutex();
private confirmPeersMutex = new Mutex();
private peersMutex = new Mutex();
public constructor(
protected core: BaseProtocol,
protected connectionManager: ConnectionManager,
@ -59,28 +63,30 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @returns The new peer that was found and connected to.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
return this.peersMutex.runExclusive(async () => {
this.log.info(`Renewing peer ${peerToDisconnect}`);
await this.connectionManager.dropConnection(peerToDisconnect);
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers.delete(peerToDisconnect.toString());
this.updatePeers(this.peers);
this.peers.delete(peerToDisconnect.toString());
this.updatePeers(new Map(this.peers));
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
const newPeer = await this.findAndAddPeers(1);
if (newPeer.length === 0) {
this.log.error(
"Failed to find a new peer to replace the disconnected one."
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
return undefined;
}
this.renewPeersLocker.lock(peerToDisconnect);
const newPeer = await this.findAndAddPeers(1);
if (newPeer.length === 0) {
this.log.error(
"Failed to find a new peer to replace the disconnected one."
);
return undefined;
}
return newPeer[0];
this.renewPeersLocker.lock(peerToDisconnect);
return newPeer[0];
});
}
/**
@ -122,37 +128,39 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
protected hasPeers = async (
options: Partial<ProtocolUseOptions> = {}
): Promise<boolean> => {
const {
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;
return this.peersMutex.runExclusive(async () => {
const {
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
if (this.peers.size < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
);
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
if (this.peers.size < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
);
}
return true;
}
return true;
if (!autoRetry) return false;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
}
if (!autoRetry) return false;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
}
this.log.error("Failed to find peers to send message to");
return false;
this.log.error("Failed to find peers to send message to");
return false;
});
};
/**
@ -181,70 +189,67 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* Maintains the peers list to `numPeersToUse`.
*/
private async maintainPeers(): Promise<boolean> {
if (this.maintainPeersLock) {
return false;
}
return await this.maintainPeersMutex.runExclusive(async () => {
try {
await this.confirmPeers();
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
this.maintainPeersLock = true;
try {
await this.confirmPeers();
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
const numPeersToAdd = this.numPeersToUse - this.peers.size;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
const numPeersToAdd = this.numPeersToUse - this.peers.size;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
this.log.info(
`Peer maintenance completed, current count: ${this.peers.size}`
);
this.renewPeersLocker.cleanUnlocked();
return true;
} catch (error) {
if (error instanceof Error) {
this.log.error("Error during peer maintenance", {
error: error.message,
stack: error.stack
});
} else {
this.log.error("Error during peer maintenance", {
error: String(error)
});
}
return false;
}
this.log.info(
`Peer maintenance completed, current count: ${this.peers.size}`
);
this.renewPeersLocker.cleanUnlocked();
return true;
} catch (error) {
if (error instanceof Error) {
this.log.error("Error during peer maintenance", {
error: error.message,
stack: error.stack
});
} else {
this.log.error("Error during peer maintenance", {
error: String(error)
});
}
return false;
} finally {
this.maintainPeersLock = false;
}
});
}
private async confirmPeers(): Promise<void> {
const connectedPeers = await this.core.connectedPeers();
const currentPeerIds = new Set(this.peers.keys());
await this.confirmPeersMutex.runExclusive(async () => {
const connectedPeers = await this.core.connectedPeers();
const currentPeerIds = new Set(this.peers.keys());
// Peers to add (connected but not in our list)
const peersToAdd = connectedPeers.filter(
(p) => !currentPeerIds.has(p.id.toString())
);
// Peers to add (connected but not in our list)
const peersToAdd = connectedPeers.filter(
(p) => !currentPeerIds.has(p.id.toString())
);
// Peers to remove (in our list but not connected)
const peersToRemove = Array.from(this.peers.values()).filter(
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);
// Peers to remove (in our list but not connected)
const peersToRemove = Array.from(this.peers.values()).filter(
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);
// Add new peers
for (const peer of peersToAdd) {
this.peers.set(peer.id.toString(), peer);
this.log.info(`Added new peer: ${peer.id.toString()}`);
}
// Add new peers
for (const peer of peersToAdd) {
this.peers.set(peer.id.toString(), peer);
this.log.info(`Added new peer: ${peer.id.toString()}`);
}
// Remove disconnected peers
for (const peer of peersToRemove) {
this.peers.delete(peer.id.toString());
this.log.info(`Removed disconnected peer: ${peer.id.toString()}`);
}
// Remove disconnected peers
for (const peer of peersToRemove) {
this.peers.delete(peer.id.toString());
this.log.info(`Removed disconnected peer: ${peer.id.toString()}`);
}
this.updatePeers(Array.from(this.peers.values()));
this.log.info(`Peers confirmed. Current count: ${this.peers.size}`);
this.updatePeers(new Map(this.peers));
this.log.info(`Peers confirmed. Current count: ${this.peers.size}`);
});
}
/**
@ -252,28 +257,30 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);
return this.peersMutex.runExclusive(async () => {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);
await Promise.all(dials);
await Promise.all(dials);
additionalPeers.forEach((peer) =>
this.peers.set(peer.id.toString(), peer)
);
this.updatePeers(this.peers);
additionalPeers.forEach((peer) =>
this.peers.set(peer.id.toString(), peer)
);
this.updatePeers(new Map(this.peers));
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
}
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
}
});
}
/**