feat: setup sender monitor

This commit is contained in:
Danish Arora 2024-09-13 15:27:55 +05:30
parent 0fe48693c2
commit c0e6e05de8
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
3 changed files with 99 additions and 12 deletions

View File

@ -13,6 +13,8 @@ import {
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { SenderReliabilityMonitor } from "../../reliability_monitor/sender.js";
import { BaseProtocolSDK } from "../base_protocol.js";
const log = new Logger("sdk:light-push");
@ -20,6 +22,8 @@ const log = new Logger("sdk:light-push");
class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
public readonly protocol: LightPushCore;
private readonly reliabilityMonitor: SenderReliabilityMonitor;
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
@ -33,6 +37,10 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
);
this.reliabilityMonitor = ReliabilityMonitorManager.createSenderMonitor(
this.renewPeer.bind(this)
);
this.protocol = this.core as LightPushCore;
}
@ -89,16 +97,23 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
successes.push(success);
}
if (failure) {
failures.push(failure);
if (failure.peerId) {
try {
await this.renewPeer(failure.peerId);
log.info("Renewed peer", failure.peerId.toString());
} catch (error) {
log.error("Failed to renew peer", error);
const peer = this.connectedPeers.find((connectedPeer) =>
connectedPeer.id.equals(failure.peerId)
);
if (peer) {
log.info(`
Failed to send message to peer ${failure.peerId}.
Retrying the message with the same peer in the background.
If this fails, the peer will be renewed.
`);
void this.reliabilityMonitor.attemptRetriesOrRenew(
failure.peerId,
() => this.protocol.send(encoder, message, peer)
);
}
}
failures.push(failure);
}
} else {
log.error("Failed unexpectedly while sending:", result.reason);

View File

@ -6,12 +6,14 @@ import {
} from "@waku/interfaces";
import { ReceiverReliabilityMonitor } from "./receiver.js";
import { SenderReliabilityMonitor } from "./sender.js";
export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();
private static senderMonitor: SenderReliabilityMonitor | undefined;
public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
@ -39,17 +41,30 @@ export class ReliabilityMonitorManager {
return monitor;
}
private constructor() {}
public static destroy(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
public static createSenderMonitor(
renewPeer: (peerId: PeerId) => Promise<Peer>
): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
renewPeer
);
}
return ReliabilityMonitorManager.senderMonitor;
}
public static destroyAll(): void {
private constructor() {}
public static stop(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}
public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
this.senderMonitor = undefined;
}
}
}

View File

@ -0,0 +1,57 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces";
import { Logger } from "@waku/utils";
const log = new Logger("sdk:sender:reliability_monitor");
const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3;
export class SenderReliabilityMonitor {
private attempts: Map<PeerIdStr, number> = new Map();
private readonly maxAttemptsBeforeRenewal =
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;
public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}
public async attemptRetriesOrRenew(
peerId: PeerId,
protocolSend: () => Promise<CoreProtocolResult>
): Promise<void> {
const peerIdStr = peerId.toString();
const currentAttempts = this.attempts.get(peerIdStr) || 0;
this.attempts.set(peerIdStr, currentAttempts + 1);
if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) {
try {
const result = await protocolSend();
if (result.success) {
log.info(`Successfully sent message after retry to ${peerIdStr}`);
this.attempts.delete(peerIdStr);
} else {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${result.failure}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
}
} catch (error) {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${error}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
}
} else {
try {
const newPeer = await this.renewPeer(peerId);
log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
);
this.attempts.delete(peerIdStr);
this.attempts.set(newPeer.id.toString(), 0);
await protocolSend();
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`);
}
}
}
}