move subscribe options to createLightNode Fitler protocol options

This commit is contained in:
Sasha 2024-10-27 18:13:26 +01:00
parent 7eb67ff751
commit 09127dfc91
No known key found for this signature in database
7 changed files with 74 additions and 45 deletions

View File

@ -15,17 +15,34 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
callback: Callback<T>;
};
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
enableLightPushFilterCheck?: boolean;
export type FilterProtocolOptions = {
/**
* Interval with which Filter subscription will attempt to send ping requests to subscribed peers.
*
* @default 60_000
*/
keepAliveIntervalMs: number;
/**
* Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one.
*
* @default 3
*/
pingsBeforePeerRenewed: number;
/**
* Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription.
* In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer.
*
* @default false
*/
enableLightPushFilterCheck: boolean;
};
export interface ISubscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options?: SubscribeOptions
callback: Callback<T>
): Promise<SDKProtocolResult>;
unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
@ -38,8 +55,7 @@ export interface ISubscription {
export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
subscribeOptions?: SubscribeOptions
callback: Callback<T>
): Promise<SubscribeResult>;
};

View File

@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { ConnectionManagerOptions } from "./connection_manager.js";
import type { FilterProtocolOptions } from "./filter.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { ThisAndThat, ThisOrThat } from "./misc.js";
@ -86,9 +87,16 @@ export type ProtocolCreateOptions = {
bootstrapPeers?: string[];
/**
* Configuration for connection manager. If not specified - default values are applied.
* Configuration for connection manager.
* If not specified - default values are applied.
*/
connectionManager?: Partial<ConnectionManagerOptions>;
/**
* Configuration for Filter protocol.
* If not specified - default values are applied.
*/
filter?: Partial<FilterProtocolOptions>;
};
export type Callback<T extends IDecodedMessage> = (

View File

@ -1,8 +1,4 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_MAX_PINGS = 3;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;
export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};

View File

@ -2,6 +2,7 @@ import { ConnectionManager, FilterCore } from "@waku/core";
import type {
Callback,
CreateSubscriptionResult,
FilterProtocolOptions,
IAsyncIterator,
IDecodedMessage,
IDecoder,
@ -10,7 +11,6 @@ import type {
IProtoMessage,
Libp2p,
PubsubTopic,
SubscribeOptions,
SubscribeResult,
Unsubscribe
} from "@waku/interfaces";
@ -25,15 +25,16 @@ import {
import { PeerManager } from "../peer_manager.js";
import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import { MessageCache } from "./message_cache.js";
import { SubscriptionManager } from "./subscription_manager.js";
import { buildConfig } from "./utils.js";
const log = new Logger("sdk:filter");
class Filter implements IFilter {
public readonly protocol: FilterCore;
private readonly config: FilterProtocolOptions;
private readonly messageCache: MessageCache;
private activeSubscriptions = new Map<string, SubscriptionManager>();
@ -41,8 +42,10 @@ class Filter implements IFilter {
private connectionManager: ConnectionManager,
private libp2p: Libp2p,
private peerManager: PeerManager,
private lightPush?: ILightPush
private lightPush?: ILightPush,
config?: Partial<FilterProtocolOptions>
) {
this.config = buildConfig(config);
this.messageCache = new MessageCache(libp2p);
this.protocol = new FilterCore(
@ -79,7 +82,6 @@ class Filter implements IFilter {
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
@ -113,8 +115,7 @@ class Filter implements IFilter {
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
callback: Callback<T>
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
@ -140,8 +141,7 @@ class Filter implements IFilter {
const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
callback
);
return {
subscription,
@ -192,6 +192,7 @@ class Filter implements IFilter {
this.connectionManager,
this.peerManager,
this.libp2p,
this.config,
this.lightPush
)
);
@ -219,8 +220,7 @@ class Filter implements IFilter {
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
callback: Callback<T>
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
@ -244,7 +244,7 @@ class Filter implements IFilter {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options);
await subscription.subscribe(decoders, callback);
const contentTopics = Array.from(
groupByContentTopic(
@ -298,8 +298,9 @@ class Filter implements IFilter {
export function wakuFilter(
connectionManager: ConnectionManager,
peerManager: PeerManager,
lightPush?: ILightPush
lightPush?: ILightPush,
config?: Partial<FilterProtocolOptions>
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) =>
new Filter(connectionManager, libp2p, peerManager, lightPush);
new Filter(connectionManager, libp2p, peerManager, lightPush, config);
}

View File

@ -12,6 +12,7 @@ import {
type ContentTopic,
type CoreProtocolResult,
EConnectionStateEvents,
FilterProtocolOptions,
type IDecodedMessage,
type IDecoder,
type ILightPush,
@ -22,7 +23,6 @@ import {
ProtocolError,
type PubsubTopic,
type SDKProtocolResult,
type SubscribeOptions,
SubscriptionCallback
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
@ -32,23 +32,17 @@ import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";
import { PeerManager } from "../peer_manager.js";
import {
DEFAULT_KEEP_ALIVE,
DEFAULT_LIGHT_PUSH_FILTER_CHECK,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL,
DEFAULT_SUBSCRIBE_OPTIONS
} from "./constants.js";
import { DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL } from "./constants.js";
const log = new Logger("sdk:filter:subscription_manager");
export class SubscriptionManager implements ISubscription {
private reliabilityMonitor: ReceiverReliabilityMonitor;
private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE;
private keepAliveTimeout: number;
private enableLightPushFilterCheck: boolean;
private keepAliveInterval: ReturnType<typeof setInterval> | null = null;
private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK;
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
@ -60,6 +54,7 @@ export class SubscriptionManager implements ISubscription {
private readonly connectionManager: ConnectionManager,
private readonly peerManager: PeerManager,
private readonly libp2p: Libp2p,
config: FilterProtocolOptions,
private readonly lightPush?: ILightPush
) {
this.pubsubTopic = pubsubTopic;
@ -72,18 +67,15 @@ export class SubscriptionManager implements ISubscription {
this.protocol.subscribe.bind(this.protocol),
this.sendLightPushCheckMessage.bind(this)
);
this.reliabilityMonitor.setMaxPingFailures(config.pingsBeforePeerRenewed);
this.keepAliveTimeout = config.keepAliveIntervalMs;
this.enableLightPushFilterCheck = config.enableLightPushFilterCheck;
}
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
callback: Callback<T>
): Promise<SDKProtocolResult> {
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.enableLightPushFilterCheck =
options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK;
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
// check that all decoders are configured for the same pubsub topic as this subscription

View File

@ -0,0 +1,15 @@
import { FilterProtocolOptions } from "@waku/interfaces";
import * as C from "./constants.js";
export const buildConfig = (
config?: Partial<FilterProtocolOptions>
): FilterProtocolOptions => {
return {
keepAliveIntervalMs: config?.keepAliveIntervalMs || C.DEFAULT_KEEP_ALIVE,
pingsBeforePeerRenewed:
config?.pingsBeforePeerRenewed || C.DEFAULT_MAX_PINGS,
enableLightPushFilterCheck:
config?.enableLightPushFilterCheck || C.DEFAULT_LIGHT_PUSH_FILTER_CHECK
};
};

View File

@ -92,7 +92,8 @@ export class WakuNode implements IWaku {
const filter = wakuFilter(
this.connectionManager,
this.peerManager,
this.lightPush
this.lightPush,
options.filter
);
this.filter = filter(libp2p);
}