diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index a7ed7c386f..f470bb662c 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -4,6 +4,7 @@ import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { IBaseProtocolSDK, IHealthManager, + PeerIdStr, ProtocolUseOptions } from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; @@ -20,7 +21,7 @@ const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { protected healthManager: IHealthManager; public readonly numPeersToUse: number; - private peers: Peer[] = []; + private peers: Map = new Map(); private maintainPeersIntervalId: ReturnType< typeof window.setInterval > | null = null; @@ -48,7 +49,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } public get connectedPeers(): Peer[] { - return this.peers; + return Array.from(this.peers.values()); } /** @@ -59,12 +60,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { public async renewPeer(peerToDisconnect: PeerId): Promise { this.log.info(`Renewing peer ${peerToDisconnect}`); - const updatedPeers = this.peers.filter( - (peer) => !peer.id.equals(peerToDisconnect) - ); - this.updatePeers(updatedPeers); await this.connectionManager.dropConnection(peerToDisconnect); + this.peers.delete(peerToDisconnect.toString()); + this.updatePeers(this.peers); + this.log.info( `Peer ${peerToDisconnect} disconnected and removed from the peer list` ); @@ -124,9 +124,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { while (attempts < maxAttempts) { attempts++; if (await this.maintainPeers()) { - if (this.peers.length < this.numPeersToUse) { + if (this.peers.size < this.numPeersToUse) { this.log.warn( - `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` + `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` ); } return true; @@ -175,14 +175,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.maintainPeersLock = true; await this.confirmPeers(); - this.log.info(`Maintaining peers, current count: ${this.peers.length}`); + this.log.info(`Maintaining peers, current count: ${this.peers.size}`); try { - const numPeersToAdd = this.numPeersToUse - this.peers.length; + const numPeersToAdd = this.numPeersToUse - this.peers.size; if (numPeersToAdd > 0) { await this.findAndAddPeers(numPeersToAdd); } this.log.info( - `Peer maintenance completed, current count: ${this.peers.length}` + `Peer maintenance completed, current count: ${this.peers.size}` ); this.renewPeersLocker.cleanUnlocked(); } finally { @@ -193,7 +193,8 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { private async confirmPeers(): Promise { const connectedPeers = await this.core.connectedPeers(); - const currentPeers = this.peers; + const currentPeers = Array.from(this.peers.values()); + const peersToAdd = connectedPeers.filter( (p) => !currentPeers.some((cp) => cp.id.equals(p.id)) ); @@ -201,10 +202,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) ); - peersToAdd.forEach((p) => this.peers.push(p)); + peersToAdd.forEach((p) => this.peers.set(p.id.toString(), p)); peersToRemove.forEach((p) => { - const index = this.peers.findIndex((cp) => cp.id.equals(p.id)); - if (index !== -1) this.peers.splice(index, 1); + this.peers.delete(p.id.toString()); }); this.updatePeers(this.peers); @@ -224,11 +224,13 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { await Promise.all(dials); - const updatedPeers = [...this.peers, ...additionalPeers]; - this.updatePeers(updatedPeers); + additionalPeers.forEach((peer) => + this.peers.set(peer.id.toString(), peer) + ); + this.updatePeers(this.peers); this.log.info( - `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` + `Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}` ); return additionalPeers; } catch (error) { @@ -254,9 +256,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } newPeers = newPeers - .filter( - (peer) => this.peers.some((p) => p.id.equals(peer.id)) === false - ) + .filter((peer) => !this.peers.has(peer.id.toString())) .filter((peer) => !this.renewPeersLocker.isLocked(peer.id)) .slice(0, numPeers); @@ -267,11 +267,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } - private updatePeers(peers: Peer[]): void { + private updatePeers(peers: Map): void { this.peers = peers; this.healthManager.updateProtocolHealth( this.core.multicodec, - this.peers.length + this.peers.size ); } } @@ -296,7 +296,7 @@ class RenewPeerLocker { } public cleanUnlocked(): void { - Object.entries(this.peers).forEach(([id, lock]) => { + Array.from(this.peers.entries()).forEach(([id, lock]) => { if (this.isTimeUnlocked(lock)) { this.peers.delete(id.toString()); }