diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index f470bb662c..bcdc56cf73 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -45,6 +45,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; + void this.setupEventListeners(); void this.startMaintainPeersInterval(maintainPeersInterval); } @@ -93,6 +94,17 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } + private setupEventListeners(): void { + this.core.addLibp2pEventListener( + "peer:connect", + () => void this.confirmPeers() + ); + this.core.addLibp2pEventListener( + "peer:disconnect", + () => void this.confirmPeers() + ); + } + /** * Checks if there are peers to send a message to. * If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`. @@ -174,40 +186,65 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } this.maintainPeersLock = true; - await this.confirmPeers(); - this.log.info(`Maintaining peers, current count: ${this.peers.size}`); try { + await this.confirmPeers(); + this.log.info(`Maintaining peers, current count: ${this.peers.size}`); + const numPeersToAdd = this.numPeersToUse - this.peers.size; if (numPeersToAdd > 0) { await this.findAndAddPeers(numPeersToAdd); } + this.log.info( `Peer maintenance completed, current count: ${this.peers.size}` ); this.renewPeersLocker.cleanUnlocked(); + return true; + } catch (error) { + if (error instanceof Error) { + this.log.error("Error during peer maintenance", { + error: error.message, + stack: error.stack + }); + } else { + this.log.error("Error during peer maintenance", { + error: String(error) + }); + } + return false; } finally { this.maintainPeersLock = false; } - return true; } private async confirmPeers(): Promise { const connectedPeers = await this.core.connectedPeers(); - const currentPeers = Array.from(this.peers.values()); + const currentPeerIds = new Set(this.peers.keys()); + // Peers to add (connected but not in our list) const peersToAdd = connectedPeers.filter( - (p) => !currentPeers.some((cp) => cp.id.equals(p.id)) + (p) => !currentPeerIds.has(p.id.toString()) ); - const peersToRemove = currentPeers.filter( + + // Peers to remove (in our list but not connected) + const peersToRemove = Array.from(this.peers.values()).filter( (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) ); - peersToAdd.forEach((p) => this.peers.set(p.id.toString(), p)); - peersToRemove.forEach((p) => { - this.peers.delete(p.id.toString()); - }); + // Add new peers + for (const peer of peersToAdd) { + this.peers.set(peer.id.toString(), peer); + this.log.info(`Added new peer: ${peer.id.toString()}`); + } - this.updatePeers(this.peers); + // Remove disconnected peers + for (const peer of peersToRemove) { + this.peers.delete(peer.id.toString()); + this.log.info(`Removed disconnected peer: ${peer.id.toString()}`); + } + + this.updatePeers(Array.from(this.peers.values())); + this.log.info(`Peers confirmed. Current count: ${this.peers.size}`); } /**