feat: add keep alive to Filter (#1970)

* fix: use pubsubTopic from current ones if not set

* fix: improve type on dial method

* enforce same pubusb on filter.subscribe, make content topic to pubsub mapping default for decoder / encoder

* fix mapping problem

* update tests

* add error handling

* fix typo

* up lock

* rm lock

* up lock

* remove only

* feat: implement keep alive for filter subscription

* remove

* address comments
This commit is contained in:
Sasha 2024-04-29 23:31:09 +02:00 committed by GitHub
parent bc98b4fb0d
commit 1a6bc4f8ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 62 additions and 9 deletions

View File

@ -8,14 +8,15 @@ import type {
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";
export type ContentFilter = {
contentTopic: string;
export type SubscribeOptions = {
keepAlive?: number;
};
export interface IFilterSubscription {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
callback: Callback<T>,
options?: SubscribeOptions
): Promise<void>;
unsubscribe(contentTopics: ContentTopic[]): Promise<void>;
@ -25,11 +26,10 @@ export interface IFilterSubscription {
unsubscribeAll(): Promise<void>;
}
export type IFilter = IReceiver & IBaseProtocolCore;
export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: SubscribeOptions
): Promise<IFilterSubscription>;
};

View File

@ -12,6 +12,7 @@ import type {
ProtocolCreateOptions,
PubsubTopic,
ShardingParams,
SubscribeOptions,
Unsubscribe
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
@ -33,10 +34,16 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
const log = new Logger("sdk:filter");
const MINUTE = 60 * 1000;
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE
};
export class SubscriptionManager {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private subscriptionCallbacks: Map<
ContentTopic,
@ -55,7 +62,8 @@ export class SubscriptionManager {
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<void> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
@ -94,6 +102,10 @@ export class SubscriptionManager {
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
}
}
async unsubscribe(contentTopics: ContentTopic[]): Promise<void> {
@ -108,6 +120,10 @@ export class SubscriptionManager {
const results = await Promise.allSettled(promises);
this.handleErrors(results, "unsubscribe");
if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) {
this.stopKeepAlivePings();
}
}
async ping(): Promise<void> {
@ -130,6 +146,10 @@ export class SubscriptionManager {
this.subscriptionCallbacks.clear();
this.handleErrors(results, "unsubscribeAll");
if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}
}
async processIncomingMessage(message: WakuMessage): Promise<void> {
@ -193,6 +213,38 @@ export class SubscriptionManager {
log.info(`${type} successful for all peers`);
}
}
private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
}
this.keepAliveTimer = setInterval(() => {
const run = async (): Promise<void> => {
try {
log.info("Recurring ping to peers.");
await this.ping();
} catch (error) {
log.error("Stopping recurring pings due to failure", error);
this.stopKeepAlivePings();
}
};
void run();
}, interval) as unknown as number;
}
private stopKeepAlivePings(): void {
if (!this.keepAliveTimer) {
log.info("Already stopped recurring pings.");
return;
}
log.info("Stopping recurring pings.");
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
}
}
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
@ -291,7 +343,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<Unsubscribe> {
const pubsubTopics = this.getPubsubTopics<T>(decoders);
@ -309,7 +362,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
const subscription = await this.createSubscription(pubsubTopics[0]);
await subscription.subscribe(decoders, callback);
await subscription.subscribe(decoders, callback, options);
const contentTopics = Array.from(
groupByContentTopic(