feat: use reliability manager in lightpush only if retry passed, imrpvoe implementation

This commit is contained in:
Sasha 2024-10-09 01:38:38 +02:00
parent 75fcca4cd9
commit 0245f060f5
No known key found for this signature in database
3 changed files with 38 additions and 45 deletions

View File

@ -104,10 +104,10 @@ class LightPush extends BaseProtocolSDK implements ILightPush {
connectedPeer.id.equals(failure.peerId) connectedPeer.id.equals(failure.peerId)
); );
if (connectedPeer) { if (connectedPeer && _options?.autoRetry) {
void this.reliabilityMonitor.attemptRetriesOrRenew( void this.reliabilityMonitor.attemptRetriesOrRenew(
connectedPeer.id, connectedPeer,
() => this.protocol.send(encoder, message, connectedPeer) (peer) => this.protocol.send(encoder, message, peer)
); );
} }
} }

View File

@ -1,4 +1,4 @@
import type { Peer, PeerId } from "@libp2p/interface"; import type { Connection, Peer, PeerId } from "@libp2p/interface";
import { import {
ContentTopic, ContentTopic,
CoreProtocolResult, CoreProtocolResult,
@ -42,10 +42,12 @@ export class ReliabilityMonitorManager {
} }
public static createSenderMonitor( public static createSenderMonitor(
getConnection: () => Connection[],
renewPeer: (peerId: PeerId) => Promise<Peer> renewPeer: (peerId: PeerId) => Promise<Peer>
): SenderReliabilityMonitor { ): SenderReliabilityMonitor {
if (!ReliabilityMonitorManager.senderMonitor) { if (!ReliabilityMonitorManager.senderMonitor) {
ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor( ReliabilityMonitorManager.senderMonitor = new SenderReliabilityMonitor(
getConnection,
renewPeer renewPeer
); );
} }

View File

@ -1,56 +1,47 @@
import type { Peer, PeerId } from "@libp2p/interface"; import type { Connection, Peer, PeerId } from "@libp2p/interface";
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces"; import { CoreProtocolResult } from "@waku/interfaces";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
const log = new Logger("sdk:sender:reliability_monitor"); const log = new Logger("sdk:sender:reliability_monitor");
const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3; const DEFAULT_MAX_ATTEMPTS = 3;
export class SenderReliabilityMonitor { export class SenderReliabilityMonitor {
private attempts: Map<PeerIdStr, number> = new Map(); public constructor(
private readonly maxAttemptsBeforeRenewal = private getConnections: () => Connection[],
DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL; private renewPeer: (peerId: PeerId) => Promise<Peer>
) {}
public constructor(private renewPeer: (peerId: PeerId) => Promise<Peer>) {}
public async attemptRetriesOrRenew( public async attemptRetriesOrRenew(
peerId: PeerId, peerToUse: Peer,
protocolSend: () => Promise<CoreProtocolResult> protocolSend: (p: Peer) => Promise<CoreProtocolResult>
): Promise<void> { ): Promise<void> {
const peerIdStr = peerId.toString(); let forceRenew = false;
const currentAttempts = this.attempts.get(peerIdStr) || 0; for (let i = 0; i < DEFAULT_MAX_ATTEMPTS; i++) {
this.attempts.set(peerIdStr, currentAttempts + 1); const connections = this.getConnections();
if (
if (currentAttempts + 1 < this.maxAttemptsBeforeRenewal) { forceRenew ||
try { !connections.find((c) => c.remotePeer.equals(peerToUse.id))
const result = await protocolSend(); ) {
if (result.success) { try {
log.info(`Successfully sent message after retry to ${peerIdStr}`); peerToUse = await this.renewPeer(peerToUse.id);
this.attempts.delete(peerIdStr); forceRenew = false;
} else { } catch (e) {
log.error( log.error(`Failed to renew peer ${peerToUse.id.toString()}: ${e}`);
`Failed to send message after retry to ${peerIdStr}: ${result.failure}` return;
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
} }
} catch (error) {
log.error(
`Failed to send message after retry to ${peerIdStr}: ${error}`
);
await this.attemptRetriesOrRenew(peerId, protocolSend);
} }
} else {
try {
const newPeer = await this.renewPeer(peerId);
log.info(
`Renewed peer ${peerId.toString()} to ${newPeer.id.toString()}`
);
this.attempts.delete(peerIdStr); try {
this.attempts.set(newPeer.id.toString(), 0); const result = await protocolSend(peerToUse);
await protocolSend();
} catch (error) { if (result.success) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}`); return;
}
forceRenew = true;
} catch (_e) {
continue;
} }
} }
} }