chore: track peers in a hashmap instead of array

This commit is contained in:
Danish Arora 2024-09-23 14:44:33 +05:30
parent 6117593d72
commit 128041b539
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E

View File

@ -4,6 +4,7 @@ import { BaseProtocol } from "@waku/core/lib/base_protocol";
import {
IBaseProtocolSDK,
IHealthManager,
PeerIdStr,
ProtocolUseOptions
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
@ -20,7 +21,7 @@ const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
export class BaseProtocolSDK implements IBaseProtocolSDK {
protected healthManager: IHealthManager;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private peers: Map<PeerIdStr, Peer> = new Map();
private maintainPeersIntervalId: ReturnType<
typeof window.setInterval
> | null = null;
@ -48,7 +49,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
public get connectedPeers(): Peer[] {
return this.peers;
return Array.from(this.peers.values());
}
/**
@ -59,12 +60,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
this.updatePeers(updatedPeers);
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers.delete(peerToDisconnect.toString());
this.updatePeers(this.peers);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
@ -124,9 +124,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
while (attempts < maxAttempts) {
attempts++;
if (await this.maintainPeers()) {
if (this.peers.length < this.numPeersToUse) {
if (this.peers.size < this.numPeersToUse) {
this.log.warn(
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}`
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
);
}
return true;
@ -175,14 +175,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
this.maintainPeersLock = true;
await this.confirmPeers();
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
try {
const numPeersToAdd = this.numPeersToUse - this.peers.length;
const numPeersToAdd = this.numPeersToUse - this.peers.size;
if (numPeersToAdd > 0) {
await this.findAndAddPeers(numPeersToAdd);
}
this.log.info(
`Peer maintenance completed, current count: ${this.peers.length}`
`Peer maintenance completed, current count: ${this.peers.size}`
);
this.renewPeersLocker.cleanUnlocked();
} finally {
@ -193,7 +193,8 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
private async confirmPeers(): Promise<void> {
const connectedPeers = await this.core.connectedPeers();
const currentPeers = this.peers;
const currentPeers = Array.from(this.peers.values());
const peersToAdd = connectedPeers.filter(
(p) => !currentPeers.some((cp) => cp.id.equals(p.id))
);
@ -201,10 +202,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
);
peersToAdd.forEach((p) => this.peers.push(p));
peersToAdd.forEach((p) => this.peers.set(p.id.toString(), p));
peersToRemove.forEach((p) => {
const index = this.peers.findIndex((cp) => cp.id.equals(p.id));
if (index !== -1) this.peers.splice(index, 1);
this.peers.delete(p.id.toString());
});
this.updatePeers(this.peers);
@ -224,11 +224,13 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
await Promise.all(dials);
const updatedPeers = [...this.peers, ...additionalPeers];
this.updatePeers(updatedPeers);
additionalPeers.forEach((peer) =>
this.peers.set(peer.id.toString(), peer)
);
this.updatePeers(this.peers);
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}`
);
return additionalPeers;
} catch (error) {
@ -254,9 +256,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
newPeers = newPeers
.filter(
(peer) => this.peers.some((p) => p.id.equals(peer.id)) === false
)
.filter((peer) => !this.peers.has(peer.id.toString()))
.filter((peer) => !this.renewPeersLocker.isLocked(peer.id))
.slice(0, numPeers);
@ -267,11 +267,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
}
}
private updatePeers(peers: Peer[]): void {
private updatePeers(peers: Map<PeerIdStr, Peer>): void {
this.peers = peers;
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.length
this.peers.size
);
}
}
@ -296,7 +296,7 @@ class RenewPeerLocker {
}
public cleanUnlocked(): void {
Object.entries(this.peers).forEach(([id, lock]) => {
Array.from(this.peers.entries()).forEach(([id, lock]) => {
if (this.isTimeUnlocked(lock)) {
this.peers.delete(id.toString());
}