chore: peer mgmt responds to conenction/disconnection and improve logging

This commit is contained in:
Danish Arora 2024-09-24 10:55:34 +05:30
parent 128041b539
commit ac1a0304f3
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E

View File

@ -45,6 +45,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
void this.setupEventListeners();
void this.startMaintainPeersInterval(maintainPeersInterval);
}
@ -93,6 +94,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
}
private setupEventListeners(): void {
this.core.addLibp2pEventListener(
"peer:connect",
() => void this.confirmPeers()
);
this.core.addLibp2pEventListener(
"peer:disconnect",
() => void this.confirmPeers()
);
}
/**
* Checks if there are peers to send a message to.
* If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`.
@ -174,40 +186,65 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
this.maintainPeersLock = true;
await this.confirmPeers();
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
try {
await this.confirmPeers();
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
const numPeersToAdd = this.numPeersToUse - this.peers.size;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
this.log.info(
`Peer maintenance completed, current count: ${this.peers.size}`
);
this.renewPeersLocker.cleanUnlocked();
return true;
} catch (error) {
if (error instanceof Error) {
this.log.error("Error during peer maintenance", {
error: error.message,
stack: error.stack
});
} else {
this.log.error("Error during peer maintenance", {
error: String(error)
});
}
return false;
} finally {
this.maintainPeersLock = false;
}
return true;
}
private async confirmPeers(): Promise<void> {
const connectedPeers = await this.core.connectedPeers();
const currentPeers = Array.from(this.peers.values());
const currentPeerIds = new Set(this.peers.keys());
// Peers to add (connected but not in our list)
const peersToAdd = connectedPeers.filter(
(p) => !currentPeers.some((cp) => cp.id.equals(p.id))
(p) => !currentPeerIds.has(p.id.toString())
);
const peersToRemove = currentPeers.filter(
// Peers to remove (in our list but not connected)
const peersToRemove = Array.from(this.peers.values()).filter(
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);
peersToAdd.forEach((p) => this.peers.set(p.id.toString(), p));
peersToRemove.forEach((p) => {
this.peers.delete(p.id.toString());
});
// Add new peers
for (const peer of peersToAdd) {
this.peers.set(peer.id.toString(), peer);
this.log.info(`Added new peer: ${peer.id.toString()}`);
}
this.updatePeers(this.peers);
// Remove disconnected peers
for (const peer of peersToRemove) {
this.peers.delete(peer.id.toString());
this.log.info(`Removed disconnected peer: ${peer.id.toString()}`);
}
this.updatePeers(Array.from(this.peers.values()));
this.log.info(`Peers confirmed. Current count: ${this.peers.size}`);
}
/**