From 0245f060f5dbeb83af5c8dc594a0456b3c0d4874 Mon Sep 17 00:00:00 2001 From: Sasha Date: Wed, 9 Oct 2024 01:38:38 +0200 Subject: [PATCH] feat: use reliability manager in lightpush only if retry passed, imrpvoe implementation --- .../src/protocols/light_push/light_push.ts | 6 +- packages/sdk/src/reliability_monitor/index.ts | 4 +- .../sdk/src/reliability_monitor/sender.ts | 73 ++++++++----------- 3 files changed, 38 insertions(+), 45 deletions(-) diff --git a/packages/sdk/src/protocols/light_push/light_push.ts b/packages/sdk/src/protocols/light_push/light_push.ts index 7007ff28d5..17d5fa2933 100644 --- a/packages/sdk/src/protocols/light_push/light_push.ts +++ b/packages/sdk/src/protocols/light_push/light_push.ts @@ -104,10 +104,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush { connectedPeer.id.equals(failure.peerId) ); - if (connectedPeer) { + if (connectedPeer && _options?.autoRetry) { void this.reliabilityMonitor.attemptRetriesOrRenew( - connectedPeer.id, - () => this.protocol.send(encoder, message, connectedPeer) + connectedPeer, + (peer) => this.protocol.send(encoder, message, peer) ); } } diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index 1e420dd921..4bbc43708f 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -1,4 +1,4 @@ -import type { Peer, PeerId } from "@libp2p/interface"; +import type { Connection, Peer, PeerId } from "@libp2p/interface"; import { ContentTopic, CoreProtocolResult, @@ -42,10 +42,12 @@ export class ReliabilityMonitorManager { } public static createSenderMonitor( + getConnection: () => Connection[], renewPeer: (peerId: PeerId) => Promise ): SenderReliabilityMonitor { if (!ReliabilityMonitorManager.senderMonitor) { ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( + getConnection, renewPeer ); } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts index 0ffe9a1659..438dbf75cd 100644 --- a/packages/sdk/src/reliability_monitor/sender.ts +++ b/packages/sdk/src/reliability_monitor/sender.ts @@ -1,56 +1,47 @@ -import type { Peer, PeerId } from "@libp2p/interface"; -import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces"; +import type { Connection, Peer, PeerId } from "@libp2p/interface"; +import { CoreProtocolResult } from "@waku/interfaces"; import { Logger } from "@waku/utils"; const log = new Logger("sdk:sender:reliability_monitor"); -const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3; +const DEFAULT_MAX_ATTEMPTS = 3; export class SenderReliabilityMonitor { - private attempts: Map = new Map(); - private readonly maxAttemptsBeforeRenewal = - DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; - - public constructor(private renewPeer: (peerId: PeerId) => Promise) {} + public constructor( + private getConnections: () => Connection[], + private renewPeer: (peerId: PeerId) => Promise + ) {} public async attemptRetriesOrRenew( - peerId: PeerId, - protocolSend: () => Promise + peerToUse: Peer, + protocolSend: (p: Peer) => Promise ): Promise { - const peerIdStr = peerId.toString(); - const currentAttempts = this.attempts.get(peerIdStr) || 0; - this.attempts.set(peerIdStr, currentAttempts + 1); - - if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) { - try { - const result = await protocolSend(); - if (result.success) { - log.info(`Successfully sent message after retry to ${peerIdStr}`); - this.attempts.delete(peerIdStr); - } else { - log.error( - `Failed to send message after retry to ${peerIdStr}: ${result.failure}` - ); - await this.attemptRetriesOrRenew(peerId, protocolSend); + let forceRenew = false; + for (let i = 0; i < DEFAULT_MAX_ATTEMPTS; i++) { + const connections = this.getConnections(); + if ( + forceRenew || + !connections.find((c) => c.remotePeer.equals(peerToUse.id)) + ) { + try { + peerToUse = await this.renewPeer(peerToUse.id); + forceRenew = false; + } catch (e) { + log.error(`Failed to renew peer ${peerToUse.id.toString()}: ${e}`); + return; } - } catch (error) { - log.error( - `Failed to send message after retry to ${peerIdStr}: ${error}` - ); - await this.attemptRetriesOrRenew(peerId, protocolSend); } - } else { - try { - const newPeer = await this.renewPeer(peerId); - log.info( - `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` - ); - this.attempts.delete(peerIdStr); - this.attempts.set(newPeer.id.toString(), 0); - await protocolSend(); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); + try { + const result = await protocolSend(peerToUse); + + if (result.success) { + return; + } + + forceRenew = true; + } catch (_e) { + continue; } } }