chore: restructure to use `Receiver` based structuring

This commit is contained in:
Danish Arora 2024-09-11 13:51:47 +05:30
parent b2d2030405
commit 6e0cddf096
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
4 changed files with 32 additions and 22 deletions

View File

@ -32,6 +32,8 @@ export type SubscribeOptions = {
export type IFilter = IReceiver & IBaseProtocolCore; export type IFilter = IReceiver & IBaseProtocolCore;
export interface ISubscriptionSDK { export interface ISubscriptionSDK {
readonly pubsubTopic: PubsubTopic;
subscribe<T extends IDecodedMessage>( subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[], decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>, callback: Callback<T>,

View File

@ -27,7 +27,7 @@ import {
} from "@waku/utils"; } from "@waku/utils";
import { BaseProtocolSDK } from "../base_protocol.js"; import { BaseProtocolSDK } from "../base_protocol.js";
import { MessageReliabilityMonitor } from "../message_reliability_monitor.js"; import { ReceiverReliabilityMonitor } from "../message_reliability_monitor.js";
import { SubscriptionManager } from "./subscription_manager.js"; import { SubscriptionManager } from "./subscription_manager.js";
@ -72,17 +72,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this._connectionManager = connectionManager; this._connectionManager = connectionManager;
} }
public setIncomingMessageHandler(
handler: (
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerIdStr: string
) => void
): void {
this.handleIncomingMessage = handler;
this.protocol.incomingMessageHandler = handler;
}
/** /**
* Opens a subscription with the Filter protocol using the provided decoders and callback. * Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it. * This method combines the functionality of creating a subscription and subscribing to it.
@ -216,7 +205,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
) )
); );
new MessageReliabilityMonitor(this, subscription); new ReceiverReliabilityMonitor(this, subscription);
} }
return { return {
@ -286,6 +275,17 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
return toAsyncIterator(this, decoders); return toAsyncIterator(this, decoders);
} }
public setIncomingMessageHandler(
handler: (
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerIdStr: string
) => void
): void {
this.handleIncomingMessage = handler;
this.protocol.incomingMessageHandler = handler;
}
//TODO: move to SubscriptionManager //TODO: move to SubscriptionManager
private getActiveSubscription( private getActiveSubscription(
pubsubTopic: PubsubTopic pubsubTopic: PubsubTopic

View File

@ -13,15 +13,14 @@ const log = new Logger("sdk:message_reliability_monitor");
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
export class MessageReliabilityManager { export class MessageReliabilityTracker {
public constructor(private filter: IFilterSDK) { public static receiverMonitor: Map<PubsubTopic, ReceiverReliabilityMonitor> =
this.filter.activeSubscriptions.forEach((subscription) => { new Map();
new MessageReliabilityMonitor(this.filter, subscription);
}); public constructor() {}
}
} }
export class MessageReliabilityMonitor { export class ReceiverReliabilityMonitor {
private receivedMessagesHashStr: string[] = []; private receivedMessagesHashStr: string[] = [];
private receivedMessagesHashes: { private receivedMessagesHashes: {
all: Set<string>; all: Set<string>;
@ -34,6 +33,11 @@ export class MessageReliabilityMonitor {
private filter: IFilterSDK, private filter: IFilterSDK,
private subscription: ISubscriptionSDK private subscription: ISubscriptionSDK
) { ) {
MessageReliabilityTracker.receiverMonitor.set(
this.subscription.pubsubTopic,
this
);
this.receivedMessagesHashes = { this.receivedMessagesHashes = {
all: new Set(), all: new Set(),
nodes: {} nodes: {}
@ -42,6 +46,12 @@ export class MessageReliabilityMonitor {
this.filter.setIncomingMessageHandler(this.handleFilterMessage.bind(this)); this.filter.setIncomingMessageHandler(this.handleFilterMessage.bind(this));
} }
public destructor(): void {
MessageReliabilityTracker.receiverMonitor.delete(
this.subscription.pubsubTopic
);
}
private handleFilterMessage( private handleFilterMessage(
pubsubTopic: PubsubTopic, pubsubTopic: PubsubTopic,
message: WakuMessage, message: WakuMessage,

View File

@ -18,7 +18,6 @@ import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter/index.js"; import { wakuFilter } from "./protocols/filter/index.js";
import { wakuLightPush } from "./protocols/light_push.js"; import { wakuLightPush } from "./protocols/light_push.js";
import { MessageReliabilityManager } from "./protocols/message_reliability_monitor.js";
import { wakuStore } from "./protocols/store.js"; import { wakuStore } from "./protocols/store.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultPingKeepAliveValueSecs = 5 * 60;
@ -116,7 +115,6 @@ export class WakuNode implements Waku {
if (protocolsEnabled.filter) { if (protocolsEnabled.filter) {
const filter = wakuFilter(this.connectionManager); const filter = wakuFilter(this.connectionManager);
this.filter = filter(libp2p); this.filter = filter(libp2p);
new MessageReliabilityManager(this.filter);
} }
log.info( log.info(