mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-10 01:33:13 +00:00
* chore: improvements * chore: add logs for subscription maintenance * chore: update logging * chore: trimming down BaseProtocolCore * chore: track peers in a hashmap instead of array * chore: peer mgmt responds to connection/disconnection and improve logging * feat: add mutex locks to tackle race conditions over shared state * fix: build * chore: some mutex lock-release improvements * feat: peer manager * chore: rm tests for remove internal util * chore: update HealthManager updates * chore: update tests * rm: only * fix: hasPeers management * chore: add modularity to getting connected peers * chore: improve logs & add debug * chore: renewal doesn't disconnect, only removes * chore: await for sequential operations * chore: add TODO * chore: minor improvements * chore: fix rebase * chore: update playwright * chore: remove additional arg * chore: update interface * feat(peer-manager): unit tests * chore: improve hasPeers() * chore: update lockfile * feat: Filter reacts to peer:disconnect event, add tests * chore: fix lock * chore: update playwright * chore: update protocol health for lightpush * chore: remove .only * chore: address comments and improvements * fix: tsconfig
66 lines
2.1 KiB
TypeScript
66 lines
2.1 KiB
TypeScript
import type { Peer, PeerId } from "@libp2p/interface";
|
|
import { CoreProtocolResult, PeerIdStr } from "@waku/interfaces";
|
|
import { Logger } from "@waku/utils";
|
|
|
|
// Module-level logger used by SenderReliabilityMonitor for its info/error
// messages; constructed with the name "sdk:sender:reliability_monitor".
const log = new Logger("sdk:sender:reliability_monitor");
|
|
|
|
// Threshold for the per-peer attempt counter kept by SenderReliabilityMonitor:
// a send is retried only while the (already incremented) counter is strictly
// below this value; once it is reached, a replacement peer is requested.
const DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL = 3;
|
|
|
|
/**
 * Tracks failed send attempts per peer and decides whether to retry a send
 * with the same peer or to ask for a replacement peer ("renewal").
 *
 * The attempt counter of a peer is cleared when a retry succeeds or when the
 * peer has been replaced.
 */
export class SenderReliabilityMonitor {
  /** Attempt counter per peer, keyed by the string form of the peer id. */
  private attempts: Map<PeerIdStr, number> = new Map();
  private readonly maxAttemptsBeforeRenewal =
    DEFAULT_MAX_ATTEMPTS_BEFORE_RENEWAL;

  /**
   * @param renewPeer - Callback invoked with the failing peer's id once the
   *   attempt threshold is reached. It resolves to the replacement peer, or to
   *   `undefined`, which is treated as a failed renewal.
   */
  public constructor(
    private renewPeer: (peerId: PeerId) => Promise<Peer | undefined>
  ) {}

  /**
   * Retries `protocolSend` for `peerId` until it succeeds or the attempt
   * threshold is reached, then requests a replacement peer and sends once more.
   *
   * The counter is incremented before every decision, so with a threshold of
   * 3 and a fresh counter at most two sends are retried before renewal.
   * Failures of `protocolSend` and `renewPeer` are logged, not propagated.
   *
   * @param peerId - Peer whose send failed.
   * @param protocolSend - Performs the send; a truthy `success` field on the
   *   result is treated as success.
   */
  public async attemptRetriesOrRenew(
    peerId: PeerId,
    protocolSend: () => Promise<CoreProtocolResult>
  ): Promise<void> {
    const peerIdStr = peerId.toString();

    // The counter is re-read from the map on every pass, exactly as the
    // previous recursive implementation did.
    let attempt = this.registerAttempt(peerIdStr);
    while (attempt < this.maxAttemptsBeforeRenewal) {
      if (await this.retrySend(peerIdStr, protocolSend)) {
        return;
      }
      attempt = this.registerAttempt(peerIdStr);
    }

    await this.renewAndResend(peerId, peerIdStr, protocolSend);
  }

  /** Increments the attempt counter of the peer and returns the new value. */
  private registerAttempt(peerIdStr: PeerIdStr): number {
    const attempt = (this.attempts.get(peerIdStr) ?? 0) + 1;
    this.attempts.set(peerIdStr, attempt);
    return attempt;
  }

  /**
   * Performs one retry; returns `true` when the send succeeded (the peer's
   * counter is then cleared), `false` when it failed or threw.
   */
  private async retrySend(
    peerIdStr: PeerIdStr,
    protocolSend: () => Promise<CoreProtocolResult>
  ): Promise<boolean> {
    try {
      const result = await protocolSend();
      if (result.success) {
        log.info(`Successfully sent message after retry to ${peerIdStr}`);
        this.attempts.delete(peerIdStr);
        return true;
      }
      log.error(
        `Failed to send message after retry to ${peerIdStr}: ${result.failure}`
      );
    } catch (error) {
      log.error(`Failed to send message after retry to ${peerIdStr}: ${error}`);
    }
    return false;
  }

  /**
   * Replaces the peer through `renewPeer` and, if that worked, sends once
   * more. Renewal errors and send errors are reported separately.
   */
  private async renewAndResend(
    peerId: PeerId,
    peerIdStr: PeerIdStr,
    protocolSend: () => Promise<CoreProtocolResult>
  ): Promise<void> {
    let newPeerIdStr: string;

    try {
      const newPeer = await this.renewPeer(peerId);
      if (!newPeer) {
        log.error(`Failed to renew peer ${peerIdStr}: New peer is undefined`);
        return;
      }

      newPeerIdStr = newPeer.id.toString();
      log.info(`Renewed peer ${peerIdStr} to ${newPeerIdStr}`);

      this.attempts.delete(peerIdStr);
      this.attempts.set(newPeerIdStr, 0);
    } catch (error) {
      log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
      return;
    }

    // The send is kept outside the renewal try/catch so that a send failure
    // is not misreported as a renewal failure, and its result is inspected
    // instead of being discarded.
    try {
      const result = await protocolSend();
      if (!result.success) {
        log.error(
          `Failed to send message after renewing peer ${peerIdStr} to ${newPeerIdStr}: ${result.failure}`
        );
      }
    } catch (error) {
      log.error(
        `Failed to send message after renewing peer ${peerIdStr} to ${newPeerIdStr}: ${error}`
      );
    }
  }
}
|