chore: some mutex lock-release improvements

This commit is contained in:
Danish Arora 2024-09-24 13:11:51 +05:30
parent 557ee96b52
commit 558fb6ae2e
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
3 changed files with 115 additions and 95 deletions

View File

@ -32,8 +32,6 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
RENEW_TIME_LOCK_DURATION
);
private maintainPeersMutex = new Mutex();
private confirmPeersMutex = new Mutex();
private peersMutex = new Mutex();
public constructor(
@ -49,7 +47,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
void this.setupEventListeners();
// void this.setupEventListeners();
void this.startMaintainPeersInterval(maintainPeersInterval);
}
@ -100,16 +98,16 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
}
private setupEventListeners(): void {
this.core.addLibp2pEventListener(
"peer:connect",
() => void this.confirmPeers()
);
this.core.addLibp2pEventListener(
"peer:disconnect",
() => void this.confirmPeers()
);
}
// private setupEventListeners(): void {
// this.core.addLibp2pEventListener(
// "peer:connect",
// () => void this.confirmPeers()
// );
// this.core.addLibp2pEventListener(
// "peer:disconnect",
// () => void this.confirmPeers()
// );
// }
/**
* Checks if there are peers to send a message to.
@ -128,39 +126,53 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
protected hasPeers = async (
options: Partial<ProtocolUseOptions> = {}
): Promise<boolean> => {
return this.peersMutex.runExclusive(async () => {
const {
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;
const {
autoRetry = false,
forceUseAllPeers = false,
initialDelay = 10,
maxAttempts = 3,
maxDelay = 100
} = options;
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;
let needsMaintenance: boolean;
let currentPeerCount: number;
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
const release = await this.peersMutex.acquire();
try {
currentPeerCount = this.connectedPeers.length;
needsMaintenance = forceUseAllPeers || currentPeerCount === 0;
} finally {
release();
}
if (!needsMaintenance) return true;
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
const finalRelease = await this.peersMutex.acquire();
try {
if (this.peers.size < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
);
}
return true;
} finally {
finalRelease();
}
if (!autoRetry) return false;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
}
if (!autoRetry) return false;
const delayMs = Math.min(
initialDelay * Math.pow(2, attempts - 1),
maxDelay
);
await delay(delayMs);
}
this.log.error("Failed to find peers to send message to");
return false;
});
this.log.error("Failed to find peers to send message to");
return false;
};
/**
@ -189,52 +201,54 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* Maintains the peers list to `numPeersToUse`.
*/
private async maintainPeers(): Promise<boolean> {
return await this.maintainPeersMutex.runExclusive(async () => {
try {
await this.confirmPeers();
try {
await this.confirmPeers();
const numPeersToAdd = await this.peersMutex.runExclusive(() => {
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
return this.numPeersToUse - this.peers.size;
});
const numPeersToAdd = this.numPeersToUse - this.peers.size;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
if (numPeersToAdd < 0) {
this.log.warn(`
Peer maintenance completed, but there are more than ${this.numPeersToUse} peers.
This should not happen.
`);
}
await this.peersMutex.runExclusive(() => {
this.log.info(
`Peer maintenance completed, current count: ${this.peers.size}`
);
this.renewPeersLocker.cleanUnlocked();
return true;
} catch (error) {
if (error instanceof Error) {
this.log.error("Error during peer maintenance", {
error: error.message,
stack: error.stack
});
} else {
this.log.error("Error during peer maintenance", {
error: String(error)
});
}
return false;
}
});
});
return true;
} catch (error) {
this.log.error("Error during peer maintenance", error);
return false;
}
}
private async confirmPeers(): Promise<void> {
await this.confirmPeersMutex.runExclusive(async () => {
const connectedPeers = await this.core.connectedPeers();
const currentPeerIds = new Set(this.peers.keys());
const connectedPeers = await this.core.connectedPeers();
const currentPeerIds = new Set(this.peers.keys());
// Peers to add (connected but not in our list)
const peersToAdd = connectedPeers.filter(
(p) => !currentPeerIds.has(p.id.toString())
);
// Peers to add (connected but not in our list)
const peersToAdd = connectedPeers.filter(
(p) => !currentPeerIds.has(p.id.toString())
);
// Peers to remove (in our list but not connected)
const peersToRemove = Array.from(this.peers.values()).filter(
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);
// Peers to remove (in our list but not connected)
const peersToRemove = Array.from(this.peers.values()).filter(
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);
await this.peersMutex.runExclusive(async () => {
// Add new peers
for (const peer of peersToAdd) {
this.peers.set(peer.id.toString(), peer);
@ -257,30 +271,33 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
return this.peersMutex.runExclusive(async () => {
let newPeers: Peer[];
const release = await this.peersMutex.acquire();
try {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
const dials = additionalPeers.map((peer) =>
this.connectionManager.attemptDial(peer.id)
);
newPeers = await this.findAdditionalPeers(numPeers);
} finally {
release();
}
await Promise.all(dials);
const dials = await Promise.all(
newPeers.map((peer) => this.connectionManager.attemptDial(peer.id))
);
additionalPeers.forEach((peer) =>
this.peers.set(peer.id.toString(), peer)
);
this.updatePeers(new Map(this.peers));
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
}
});
const finalRelease = await this.peersMutex.acquire();
try {
const successfulPeers = newPeers.filter((_, index) => dials[index]);
successfulPeers.forEach((peer) =>
this.peers.set(peer.id.toString(), peer)
);
this.updatePeers(new Map(this.peers));
this.log.info(
`Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}`
);
return successfulPeers;
} finally {
finalRelease();
}
}
/**

View File

@ -72,6 +72,7 @@ const runTests = (strictNodeCheck: boolean): void => {
const pushResponse = await waku.lightPush.send(TestEncoder, {
payload: utf8ToBytes(generateMessageText(i))
});
expect(pushResponse.successes.length).to.eq(numServiceNodes);
}

View File

@ -47,12 +47,14 @@ describe("Waku Light Push: Connection Management: E2E", function () {
expect(failures?.length || 0).to.equal(0);
});
it("should push to available amount of connection if less than required", async function () {
const connections = waku.libp2p.getConnections();
await Promise.all(
connections
.slice(0, connections.length - 1)
.map((c) => waku.connectionManager.dropConnection(c.remotePeer))
it.only("Failed peers are renewed", async function () {
// send a lightpush request -- should have all successes
const response1 = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(response1.successes.length).to.be.equal(
waku.lightPush.numPeersToUse
);
const { successes, failures } = await waku.lightPush.send(encoder, {