chore: restructure reliabiltiy monitors

This commit is contained in:
Danish Arora 2024-09-13 15:20:31 +05:30
parent 7ad1d321ca
commit 0fe48693c2
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
7 changed files with 65 additions and 56 deletions

View File

@ -14,9 +14,9 @@ export {
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store.js";
export { wakuStore } from "./protocols/store/index.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";

View File

@ -20,11 +20,12 @@ import {
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";
import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
ReceiverReliabilityMonitor,
ReliabilityMonitorManager
} from "./reliability_monitor.js";
} from "../../reliability_monitor/receiver.js";
import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
const log = new Logger("sdk:filter:subscription_manager");

View File

@ -13,7 +13,7 @@ import {
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol.js";
import { BaseProtocolSDK } from "../base_protocol.js";
const log = new Logger("sdk:light-push");

View File

@ -10,7 +10,7 @@ import {
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol.js";
import { BaseProtocolSDK } from "../base_protocol.js";
const DEFAULT_NUM_PEERS = 1;

View File

@ -0,0 +1,55 @@
import type { Peer, PeerId } from "@libp2p/interface";
import {
ContentTopic,
CoreProtocolResult,
PubsubTopic
} from "@waku/interfaces";
import { ReceiverReliabilityMonitor } from "./receiver.js";
export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();
public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
}
const monitor = new ReceiverReliabilityMonitor(
pubsubTopic,
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
}
private constructor() {}
public static destroy(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
}
public static destroyAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
}
}

View File

@ -21,53 +21,6 @@ const log = new Logger("sdk:receiver:reliability_monitor");
const DEFAULT_MAX_PINGS = 3;
export class ReliabilityMonitorManager {
private static receiverMonitors: Map<
PubsubTopic,
ReceiverReliabilityMonitor
> = new Map();
public static createReceiverMonitor(
pubsubTopic: PubsubTopic,
getPeers: () => Peer[],
renewPeer: (peerId: PeerId) => Promise<Peer>,
getContentTopics: () => ContentTopic[],
protocolSubscribe: (
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
}
const monitor = new ReceiverReliabilityMonitor(
pubsubTopic,
getPeers,
renewPeer,
getContentTopics,
protocolSubscribe
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
}
private constructor() {}
public static destroy(pubsubTopic: PubsubTopic): void {
this.receiverMonitors.delete(pubsubTopic);
}
public static destroyAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}
}
}
export class ReceiverReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();

View File

@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter/index.js";
import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";
import { wakuLightPush } from "./protocols/lightpush/index.js";
import { wakuStore } from "./protocols/store/index.js";
import { ReliabilityMonitorManager } from "./reliability_monitor/index.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;