diff --git a/packages/sdk/src/protocols/lightpush/index.ts b/packages/sdk/src/protocols/lightpush/index.ts index 2cfdadde01..f3be55d54a 100644 --- a/packages/sdk/src/protocols/lightpush/index.ts +++ b/packages/sdk/src/protocols/lightpush/index.ts @@ -13,6 +13,8 @@ import { } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; +import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; +import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js"; import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); @@ -20,6 +22,8 @@ const log = new Logger("sdk:light-push"); class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { public readonly protocol: LightPushCore; + private readonly reliabilityMonitor: SenderReliabilityMonitor; + public constructor( connectionManager: ConnectionManager, libp2p: Libp2p, @@ -33,6 +37,10 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } ); + this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor( + this.renewPeer.bind(this) + ); + this.protocol = this.core as LightPushCore; } @@ -89,16 +97,23 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { successes.push(success); } if (failure) { + failures.push(failure); if (failure.peerId) { - try { - await this.renewPeer(failure.peerId); - log.info("Renewed peer", failure.peerId.toString()); - } catch (error) { - log.error("Failed to renew peer", error); + const peer = this.connectedPeers.find((connectedPeer) => + connectedPeer.id.equals(failure.peerId) + ); + if (peer) { + log.info(` + Failed to send message to peer ${failure.peerId}. + Retrying the message with the same peer in the background. + If this fails, the peer will be renewed. + `); + void this.reliabilityMonitor.attemptRetriesOrRenew( + failure.peerId, + () => this.protocol.send(encoder, message, peer) + ); } } - - failures.push(failure); } } else { log.error("Failed unexpectedly while sending:", result.reason); diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts index 936a54700f..1e420dd921 100644 --- a/packages/sdk/src/reliability_monitor/index.ts +++ b/packages/sdk/src/reliability_monitor/index.ts @@ -6,12 +6,14 @@ import { } from "@waku/interfaces"; import { ReceiverReliabilityMonitor } from "./receiver.js"; +import { SenderReliabilityMonitor } from "./sender.js"; export class ReliabilityMonitorManager { private static receiverMonitors: Map< PubsubTopic, ReceiverReliabilityMonitor > = new Map(); + private static senderMonitor: SenderReliabilityMonitor | undefined; public static createReceiverMonitor( pubsubTopic: PubsubTopic, @@ -39,17 +41,30 @@ export class ReliabilityMonitorManager { return monitor; } - private constructor() {} - - public static destroy(pubsubTopic: PubsubTopic): void { - this.receiverMonitors.delete(pubsubTopic); + public static createSenderMonitor( + renewPeer: (peerId: PeerId) => Promise + ): SenderReliabilityMonitor { + if (!ReliabilityMonitorManager.senderMonitor) { + ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( + renewPeer + ); + } + return ReliabilityMonitorManager.senderMonitor; } - public static destroyAll(): void { + private constructor() {} + + public static stop(pubsubTopic: PubsubTopic): void { + this.receiverMonitors.delete(pubsubTopic); + this.senderMonitor = undefined; + } + + public static stopAll(): void { for (const [pubsubTopic, monitor] of this.receiverMonitors) { monitor.setMaxMissedMessagesThreshold(undefined); monitor.setMaxPingFailures(undefined); this.receiverMonitors.delete(pubsubTopic); + this.senderMonitor = undefined; } } } diff --git a/packages/sdk/src/reliability_monitor/sender.ts b/packages/sdk/src/reliability_monitor/sender.ts new file mode 100644 index 0000000000..0ffe9a1659 --- /dev/null +++ b/packages/sdk/src/reliability_monitor/sender.ts @@ -0,0 +1,57 @@ +import type { Peer, PeerId } from "@libp2p/interface"; +import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +const log = new Logger("sdk:sender:reliability_monitor"); + +const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3; + +export class SenderReliabilityMonitor { + private attempts: Map = new Map(); + private readonly maxAttemptsBeforeRenewal = + DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; + + public constructor(private renewPeer: (peerId: PeerId) => Promise) {} + + public async attemptRetriesOrRenew( + peerId: PeerId, + protocolSend: () => Promise + ): Promise { + const peerIdStr = peerId.toString(); + const currentAttempts = this.attempts.get(peerIdStr) || 0; + this.attempts.set(peerIdStr, currentAttempts + 1); + + if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) { + try { + const result = await protocolSend(); + if (result.success) { + log.info(`Successfully sent message after retry to ${peerIdStr}`); + this.attempts.delete(peerIdStr); + } else { + log.error( + `Failed to send message after retry to ${peerIdStr}: ${result.failure}` + ); + await this.attemptRetriesOrRenew(peerId, protocolSend); + } + } catch (error) { + log.error( + `Failed to send message after retry to ${peerIdStr}: ${error}` + ); + await this.attemptRetriesOrRenew(peerId, protocolSend); + } + } else { + try { + const newPeer = await this.renewPeer(peerId); + log.info( + `Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}` + ); + + this.attempts.delete(peerIdStr); + this.attempts.set(newPeer.id.toString(), 0); + await protocolSend(); + } catch (error) { + log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); + } + } + } +}