mirror of
https://github.com/waku-org/js-waku.git
synced 2025-01-13 05:54:54 +00:00
feat(filter): reliability monitor as a separate class to handle reliability logic (#2117)
* chore: move SubscriptionManager to a separate file * feat: introduce ReliabilityMonitor * fix: peer data updates * fix: logical error when returning includesMessage * chore: handle edge case * chore: move ping failures handling inside monitor * chore: move renewal logic to monitor * chore: improve structuring * chore: update logger * chore: readd connectionListener() logic from merge * chore: minor fixes * chore: improve * chore: setup destruction of ReliabilityMonitors * fix: condition for ping failure * fix: setters * chore: handle race condition & fix test
This commit is contained in:
parent
9d9a696024
commit
7ad1d321ca
@ -12,6 +12,11 @@ import type {
|
||||
} from "./protocols.js";
|
||||
import type { IReceiver } from "./receiver.js";
|
||||
|
||||
export type SubscriptionCallback<T extends IDecodedMessage> = {
|
||||
decoders: IDecoder<T>[];
|
||||
callback: Callback<T>;
|
||||
};
|
||||
|
||||
export type SubscribeOptions = {
|
||||
keepAlive?: number;
|
||||
pingsBeforePeerRenewed?: number;
|
||||
|
@ -15,7 +15,7 @@ export {
|
||||
createLibp2pAndUpdateOptions
|
||||
} from "./create/index.js";
|
||||
export { wakuLightPush } from "./protocols/light_push.js";
|
||||
export { wakuFilter } from "./protocols/filter.js";
|
||||
export { wakuFilter } from "./protocols/filter/index.js";
|
||||
export { wakuStore } from "./protocols/store.js";
|
||||
|
||||
export * as waku from "@waku/core";
|
||||
|
@ -33,7 +33,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||
|
||||
public constructor(
|
||||
protected core: BaseProtocol,
|
||||
private connectionManager: ConnectionManager,
|
||||
protected connectionManager: ConnectionManager,
|
||||
options: Options
|
||||
) {
|
||||
this.log = new Logger(`sdk:${core.multicodec}`);
|
||||
|
@ -1,763 +0,0 @@
|
||||
import type { Peer } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager, FilterCore } from "@waku/core";
|
||||
import {
|
||||
type Callback,
|
||||
type ContentTopic,
|
||||
type CoreProtocolResult,
|
||||
type CreateSubscriptionResult,
|
||||
EConnectionStateEvents,
|
||||
type IAsyncIterator,
|
||||
type IDecodedMessage,
|
||||
type IDecoder,
|
||||
type IFilterSDK,
|
||||
type IProtoMessage,
|
||||
type ISubscriptionSDK,
|
||||
type Libp2p,
|
||||
NetworkConfig,
|
||||
type PeerIdStr,
|
||||
type ProtocolCreateOptions,
|
||||
ProtocolError,
|
||||
type ProtocolUseOptions,
|
||||
type PubsubTopic,
|
||||
type SDKProtocolResult,
|
||||
type SubscribeOptions,
|
||||
SubscribeResult,
|
||||
type Unsubscribe
|
||||
} from "@waku/interfaces";
|
||||
import { messageHashStr } from "@waku/message-hash";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import {
|
||||
ensurePubsubTopicIsConfigured,
|
||||
groupByContentTopic,
|
||||
Logger,
|
||||
shardInfoToPubsubTopics,
|
||||
toAsyncIterator
|
||||
} from "@waku/utils";
|
||||
|
||||
import { BaseProtocolSDK } from "./base_protocol.js";
|
||||
|
||||
type SubscriptionCallback<T extends IDecodedMessage> = {
|
||||
decoders: IDecoder<T>[];
|
||||
callback: Callback<T>;
|
||||
};
|
||||
|
||||
type ReceivedMessageHashes = {
|
||||
all: Set<string>;
|
||||
nodes: {
|
||||
[peerId: PeerIdStr]: Set<string>;
|
||||
};
|
||||
};
|
||||
|
||||
const log = new Logger("sdk:filter");
|
||||
|
||||
const DEFAULT_MAX_PINGS = 2;
|
||||
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
|
||||
const DEFAULT_KEEP_ALIVE = 60 * 1000;
|
||||
|
||||
export class SubscriptionManager implements ISubscriptionSDK {
|
||||
private subscriptionCallbacks: Map<
|
||||
ContentTopic,
|
||||
SubscriptionCallback<IDecodedMessage>
|
||||
> = new Map();
|
||||
private readonly receivedMessagesHashStr: string[] = [];
|
||||
private peerFailures: Map<string, number> = new Map();
|
||||
private readonly receivedMessagesHashes: ReceivedMessageHashes;
|
||||
private missedMessagesByPeer: Map<string, number> = new Map();
|
||||
|
||||
private keepAliveInterval: number = DEFAULT_KEEP_ALIVE;
|
||||
private maxPingFailures: number = DEFAULT_MAX_PINGS;
|
||||
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
|
||||
|
||||
private keepAliveTimer: number | null = null;
|
||||
|
||||
public constructor(
|
||||
private readonly pubsubTopic: PubsubTopic,
|
||||
private readonly protocol: FilterCore,
|
||||
private readonly connectionManager: ConnectionManager,
|
||||
private readonly getPeers: () => Peer[],
|
||||
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
|
||||
) {
|
||||
this.pubsubTopic = pubsubTopic;
|
||||
|
||||
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
|
||||
this.receivedMessagesHashes = {
|
||||
all: new Set(),
|
||||
nodes: {
|
||||
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
|
||||
}
|
||||
};
|
||||
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
|
||||
}
|
||||
|
||||
private addHash(hash: string, peerIdStr?: string): void {
|
||||
this.receivedMessagesHashes.all.add(hash);
|
||||
|
||||
if (!peerIdStr) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.receivedMessagesHashes.nodes[peerIdStr]) {
|
||||
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
|
||||
}
|
||||
|
||||
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
|
||||
}
|
||||
|
||||
public async subscribe<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[],
|
||||
callback: Callback<T>,
|
||||
options: SubscribeOptions = {}
|
||||
): Promise<SDKProtocolResult> {
|
||||
this.keepAliveInterval = options.keepAlive || DEFAULT_KEEP_ALIVE;
|
||||
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
|
||||
this.maxMissedMessagesThreshold =
|
||||
options.maxMissedMessagesThreshold ||
|
||||
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
|
||||
|
||||
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
|
||||
|
||||
// check that all decoders are configured for the same pubsub topic as this subscription
|
||||
for (const decoder of decodersArray) {
|
||||
if (decoder.pubsubTopic !== this.pubsubTopic) {
|
||||
return {
|
||||
failures: [
|
||||
{
|
||||
error: ProtocolError.TOPIC_DECODER_MISMATCH
|
||||
}
|
||||
],
|
||||
successes: []
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const decodersGroupedByCT = groupByContentTopic(decodersArray);
|
||||
const contentTopics = Array.from(decodersGroupedByCT.keys());
|
||||
|
||||
const promises = this.getPeers().map(async (peer) =>
|
||||
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
|
||||
);
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
const finalResult = this.handleResult(results, "subscribe");
|
||||
|
||||
// Save the callback functions by content topics so they
|
||||
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
|
||||
// is called for those content topics
|
||||
decodersGroupedByCT.forEach((decoders, contentTopic) => {
|
||||
// Cast the type because a given `subscriptionCallbacks` map may hold
|
||||
// Decoder that decode to different implementations of `IDecodedMessage`
|
||||
const subscriptionCallback = {
|
||||
decoders,
|
||||
callback
|
||||
} as unknown as SubscriptionCallback<IDecodedMessage>;
|
||||
|
||||
// The callback and decoder may override previous values, this is on
|
||||
// purpose as the user may call `subscribe` to refresh the subscription
|
||||
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
|
||||
});
|
||||
|
||||
this.startSubscriptionsMaintenance(this.keepAliveInterval);
|
||||
|
||||
return finalResult;
|
||||
}
|
||||
|
||||
public async unsubscribe(
|
||||
contentTopics: ContentTopic[]
|
||||
): Promise<SDKProtocolResult> {
|
||||
const promises = this.getPeers().map(async (peer) => {
|
||||
const response = await this.protocol.unsubscribe(
|
||||
this.pubsubTopic,
|
||||
peer,
|
||||
contentTopics
|
||||
);
|
||||
|
||||
contentTopics.forEach((contentTopic: string) => {
|
||||
this.subscriptionCallbacks.delete(contentTopic);
|
||||
});
|
||||
|
||||
return response;
|
||||
});
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
const finalResult = this.handleResult(results, "unsubscribe");
|
||||
|
||||
if (this.subscriptionCallbacks.size === 0) {
|
||||
this.stopSubscriptionsMaintenance();
|
||||
}
|
||||
|
||||
return finalResult;
|
||||
}
|
||||
|
||||
public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
|
||||
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);
|
||||
|
||||
const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
return this.handleResult(results, "ping");
|
||||
}
|
||||
|
||||
public async unsubscribeAll(): Promise<SDKProtocolResult> {
|
||||
const promises = this.getPeers().map(async (peer) =>
|
||||
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
|
||||
);
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
this.subscriptionCallbacks.clear();
|
||||
|
||||
const finalResult = this.handleResult(results, "unsubscribeAll");
|
||||
|
||||
this.stopSubscriptionsMaintenance();
|
||||
|
||||
return finalResult;
|
||||
}
|
||||
|
||||
private async validateMessage(): Promise<void> {
|
||||
for (const hash of this.receivedMessagesHashes.all) {
|
||||
for (const [peerIdStr, hashes] of Object.entries(
|
||||
this.receivedMessagesHashes.nodes
|
||||
)) {
|
||||
if (!hashes.has(hash)) {
|
||||
this.incrementMissedMessageCount(peerIdStr);
|
||||
if (this.shouldRenewPeer(peerIdStr)) {
|
||||
log.info(
|
||||
`Peer ${peerIdStr} has missed too many messages, renewing.`
|
||||
);
|
||||
const peerId = this.getPeers().find(
|
||||
(p) => p.id.toString() === peerIdStr
|
||||
)?.id;
|
||||
if (!peerId) {
|
||||
log.error(
|
||||
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await this.renewAndSubscribePeer(peerId);
|
||||
} catch (error) {
|
||||
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async processIncomingMessage(
|
||||
message: WakuMessage,
|
||||
peerIdStr: PeerIdStr
|
||||
): Promise<void> {
|
||||
const hashedMessageStr = messageHashStr(
|
||||
this.pubsubTopic,
|
||||
message as IProtoMessage
|
||||
);
|
||||
|
||||
this.addHash(hashedMessageStr, peerIdStr);
|
||||
void this.validateMessage();
|
||||
|
||||
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
|
||||
log.info("Message already received, skipping");
|
||||
return;
|
||||
}
|
||||
this.receivedMessagesHashStr.push(hashedMessageStr);
|
||||
|
||||
const { contentTopic } = message;
|
||||
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
|
||||
if (!subscriptionCallback) {
|
||||
log.error("No subscription callback available for ", contentTopic);
|
||||
return;
|
||||
}
|
||||
log.info(
|
||||
"Processing message with content topic ",
|
||||
contentTopic,
|
||||
" on pubsub topic ",
|
||||
this.pubsubTopic
|
||||
);
|
||||
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
|
||||
}
|
||||
|
||||
private handleResult(
|
||||
results: PromiseSettledResult<CoreProtocolResult>[],
|
||||
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
|
||||
): SDKProtocolResult {
|
||||
const result: SDKProtocolResult = { failures: [], successes: [] };
|
||||
|
||||
for (const promiseResult of results) {
|
||||
if (promiseResult.status === "rejected") {
|
||||
log.error(
|
||||
`Failed to resolve ${type} promise successfully: `,
|
||||
promiseResult.reason
|
||||
);
|
||||
result.failures.push({ error: ProtocolError.GENERIC_FAIL });
|
||||
} else {
|
||||
const coreResult = promiseResult.value;
|
||||
if (coreResult.failure) {
|
||||
result.failures.push(coreResult.failure);
|
||||
} else {
|
||||
result.successes.push(coreResult.success);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private async pingSpecificPeer(peerId: PeerId): Promise<CoreProtocolResult> {
|
||||
const peer = this.getPeers().find((p) => p.id.equals(peerId));
|
||||
if (!peer) {
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
peerId,
|
||||
error: ProtocolError.NO_PEER_AVAILABLE
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.protocol.ping(peer);
|
||||
if (result.failure) {
|
||||
await this.handlePeerFailure(peerId);
|
||||
} else {
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
await this.handlePeerFailure(peerId);
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
peerId,
|
||||
error: ProtocolError.GENERIC_FAIL
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private async handlePeerFailure(peerId: PeerId): Promise<void> {
|
||||
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
|
||||
this.peerFailures.set(peerId.toString(), failures);
|
||||
|
||||
if (failures > this.maxPingFailures) {
|
||||
try {
|
||||
await this.renewAndSubscribePeer(peerId);
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
} catch (error) {
|
||||
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async renewAndSubscribePeer(
|
||||
peerId: PeerId
|
||||
): Promise<Peer | undefined> {
|
||||
try {
|
||||
const newPeer = await this.renewPeer(peerId);
|
||||
await this.protocol.subscribe(
|
||||
this.pubsubTopic,
|
||||
newPeer,
|
||||
Array.from(this.subscriptionCallbacks.keys())
|
||||
);
|
||||
|
||||
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
|
||||
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
|
||||
|
||||
return newPeer;
|
||||
} catch (error) {
|
||||
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
|
||||
return;
|
||||
} finally {
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
this.missedMessagesByPeer.delete(peerId.toString());
|
||||
delete this.receivedMessagesHashes.nodes[peerId.toString()];
|
||||
}
|
||||
}
|
||||
|
||||
private startSubscriptionsMaintenance(interval: number): void {
|
||||
this.startKeepAlivePings(interval);
|
||||
this.startConnectionListener();
|
||||
}
|
||||
|
||||
private stopSubscriptionsMaintenance(): void {
|
||||
this.stopKeepAlivePings();
|
||||
this.stopConnectionListener();
|
||||
}
|
||||
|
||||
private startKeepAlivePings(interval: number): void {
|
||||
if (this.keepAliveTimer) {
|
||||
log.info("Recurring pings already set up.");
|
||||
return;
|
||||
}
|
||||
|
||||
this.keepAliveTimer = setInterval(() => {
|
||||
void this.ping().catch((error) => {
|
||||
log.error("Error in keep-alive ping cycle:", error);
|
||||
});
|
||||
}, interval) as unknown as number;
|
||||
}
|
||||
|
||||
private stopKeepAlivePings(): void {
|
||||
if (!this.keepAliveTimer) {
|
||||
log.info("Already stopped recurring pings.");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Stopping recurring pings.");
|
||||
clearInterval(this.keepAliveTimer);
|
||||
this.keepAliveTimer = null;
|
||||
}
|
||||
|
||||
private startConnectionListener(): void {
|
||||
this.connectionManager.addEventListener(
|
||||
EConnectionStateEvents.CONNECTION_STATUS,
|
||||
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
|
||||
);
|
||||
}
|
||||
|
||||
private stopConnectionListener(): void {
|
||||
this.connectionManager.removeEventListener(
|
||||
EConnectionStateEvents.CONNECTION_STATUS,
|
||||
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
|
||||
);
|
||||
}
|
||||
|
||||
private async connectionListener({
|
||||
detail: isConnected
|
||||
}: CustomEvent<boolean>): Promise<void> {
|
||||
if (!isConnected) {
|
||||
this.stopKeepAlivePings();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await this.ping();
|
||||
const renewPeerPromises = result.failures.map(
|
||||
async (v): Promise<void> => {
|
||||
if (v.peerId) {
|
||||
await this.renewAndSubscribePeer(v.peerId);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
await Promise.all(renewPeerPromises);
|
||||
} catch (err) {
|
||||
log.error(`networkStateListener failed to recover: ${err}`);
|
||||
}
|
||||
|
||||
this.startKeepAlivePings(this.keepAliveInterval);
|
||||
}
|
||||
|
||||
private incrementMissedMessageCount(peerIdStr: string): void {
|
||||
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
|
||||
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
|
||||
}
|
||||
|
||||
private shouldRenewPeer(peerIdStr: string): boolean {
|
||||
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
|
||||
return missedMessages > this.maxMissedMessagesThreshold;
|
||||
}
|
||||
}
|
||||
|
||||
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
|
||||
public readonly protocol: FilterCore;
|
||||
private readonly _connectionManager: ConnectionManager;
|
||||
|
||||
private activeSubscriptions = new Map<string, SubscriptionManager>();
|
||||
|
||||
public constructor(
|
||||
connectionManager: ConnectionManager,
|
||||
libp2p: Libp2p,
|
||||
options?: ProtocolCreateOptions
|
||||
) {
|
||||
super(
|
||||
new FilterCore(
|
||||
async (pubsubTopic, wakuMessage, peerIdStr) => {
|
||||
const subscription = this.getActiveSubscription(pubsubTopic);
|
||||
if (!subscription) {
|
||||
log.error(
|
||||
`No subscription locally registered for topic ${pubsubTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
|
||||
},
|
||||
connectionManager.configuredPubsubTopics,
|
||||
libp2p
|
||||
),
|
||||
connectionManager,
|
||||
{ numPeersToUse: options?.numPeersToUse }
|
||||
);
|
||||
|
||||
this.protocol = this.core as FilterCore;
|
||||
this._connectionManager = connectionManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens a subscription with the Filter protocol using the provided decoders and callback.
|
||||
* This method combines the functionality of creating a subscription and subscribing to it.
|
||||
*
|
||||
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
|
||||
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
|
||||
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
|
||||
* @param {SubscribeOptions} [subscribeOptions] - Options for the subscription.
|
||||
*
|
||||
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
|
||||
* - subscription: The created subscription object if successful, or null if failed.
|
||||
* - error: A ProtocolError if the subscription creation failed, or null if successful.
|
||||
* - results: An object containing arrays of failures and successes from the subscription process.
|
||||
* Only present if the subscription was created successfully.
|
||||
*
|
||||
* @throws {Error} If there's an unexpected error during the subscription process.
|
||||
*
|
||||
* @remarks
|
||||
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
|
||||
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
|
||||
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
|
||||
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
|
||||
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
|
||||
* - Even if the subscription was created successfully, there might be some failures in the results.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
|
||||
* if (!subscription || error) {
|
||||
* console.error("Failed to create subscription:", error);
|
||||
* }
|
||||
* console.log("Subscription created successfully");
|
||||
* if (results.failures.length > 0) {
|
||||
* console.warn("Some errors occurred during subscription:", results.failures);
|
||||
* }
|
||||
* console.log("Successful subscriptions:", results.successes);
|
||||
*
|
||||
* ```
|
||||
*/
|
||||
public async subscribe<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[],
|
||||
callback: Callback<T>,
|
||||
protocolUseOptions?: ProtocolUseOptions,
|
||||
subscribeOptions?: SubscribeOptions
|
||||
): Promise<SubscribeResult> {
|
||||
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
|
||||
|
||||
if (uniquePubsubTopics.length !== 1) {
|
||||
return {
|
||||
subscription: null,
|
||||
error: ProtocolError.INVALID_DECODER_TOPICS,
|
||||
results: null
|
||||
};
|
||||
}
|
||||
|
||||
const pubsubTopic = uniquePubsubTopics[0];
|
||||
|
||||
const { subscription, error } = await this.createSubscription(
|
||||
pubsubTopic,
|
||||
protocolUseOptions
|
||||
);
|
||||
|
||||
if (error) {
|
||||
return {
|
||||
subscription: null,
|
||||
error: error,
|
||||
results: null
|
||||
};
|
||||
}
|
||||
|
||||
const { failures, successes } = await subscription.subscribe(
|
||||
decoders,
|
||||
callback,
|
||||
subscribeOptions
|
||||
);
|
||||
return {
|
||||
subscription,
|
||||
error: null,
|
||||
results: {
|
||||
failures: failures,
|
||||
successes: successes
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new subscription to the given pubsub topic.
|
||||
* The subscription is made to multiple peers for decentralization.
|
||||
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
|
||||
* @returns The subscription object.
|
||||
*/
|
||||
private async createSubscription(
|
||||
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
|
||||
options?: ProtocolUseOptions
|
||||
): Promise<CreateSubscriptionResult> {
|
||||
options = {
|
||||
autoRetry: true,
|
||||
...options
|
||||
} as ProtocolUseOptions;
|
||||
|
||||
const pubsubTopic =
|
||||
typeof pubsubTopicShardInfo == "string"
|
||||
? pubsubTopicShardInfo
|
||||
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
|
||||
|
||||
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
|
||||
|
||||
const hasPeers = await this.hasPeers(options);
|
||||
if (!hasPeers) {
|
||||
return {
|
||||
error: ProtocolError.NO_PEER_AVAILABLE,
|
||||
subscription: null
|
||||
};
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
|
||||
this.connectedPeers.map((peer) => peer.id.toString())
|
||||
);
|
||||
|
||||
const subscription =
|
||||
this.getActiveSubscription(pubsubTopic) ??
|
||||
this.setActiveSubscription(
|
||||
pubsubTopic,
|
||||
new SubscriptionManager(
|
||||
pubsubTopic,
|
||||
this.protocol,
|
||||
this._connectionManager,
|
||||
() => this.connectedPeers,
|
||||
this.renewPeer.bind(this)
|
||||
)
|
||||
);
|
||||
|
||||
return {
|
||||
error: null,
|
||||
subscription
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used to satisfy the `IReceiver` interface.
|
||||
*
|
||||
* @hidden
|
||||
*
|
||||
* @param decoders The decoders to use for the subscription.
|
||||
* @param callback The callback function to use for the subscription.
|
||||
* @param opts Optional protocol options for the subscription.
|
||||
*
|
||||
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
|
||||
*
|
||||
* @remarks
|
||||
* This method should not be used directly.
|
||||
* Instead, use `createSubscription` to create a new subscription.
|
||||
*/
|
||||
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[],
|
||||
callback: Callback<T>,
|
||||
options?: SubscribeOptions
|
||||
): Promise<Unsubscribe> {
|
||||
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
|
||||
|
||||
if (uniquePubsubTopics.length === 0) {
|
||||
throw Error(
|
||||
"Failed to subscribe: no pubsubTopic found on decoders provided."
|
||||
);
|
||||
}
|
||||
|
||||
if (uniquePubsubTopics.length > 1) {
|
||||
throw Error(
|
||||
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
|
||||
);
|
||||
}
|
||||
|
||||
const { subscription, error } = await this.createSubscription(
|
||||
uniquePubsubTopics[0]
|
||||
);
|
||||
|
||||
if (error) {
|
||||
throw Error(`Failed to create subscription: ${error}`);
|
||||
}
|
||||
|
||||
await subscription.subscribe(decoders, callback, options);
|
||||
|
||||
const contentTopics = Array.from(
|
||||
groupByContentTopic(
|
||||
Array.isArray(decoders) ? decoders : [decoders]
|
||||
).keys()
|
||||
);
|
||||
|
||||
return async () => {
|
||||
await subscription.unsubscribe(contentTopics);
|
||||
};
|
||||
}
|
||||
|
||||
public toSubscriptionIterator<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[]
|
||||
): Promise<IAsyncIterator<T>> {
|
||||
return toAsyncIterator(this, decoders);
|
||||
}
|
||||
|
||||
//TODO: move to SubscriptionManager
|
||||
private getActiveSubscription(
|
||||
pubsubTopic: PubsubTopic
|
||||
): SubscriptionManager | undefined {
|
||||
return this.activeSubscriptions.get(pubsubTopic);
|
||||
}
|
||||
|
||||
private setActiveSubscription(
|
||||
pubsubTopic: PubsubTopic,
|
||||
subscription: SubscriptionManager
|
||||
): SubscriptionManager {
|
||||
this.activeSubscriptions.set(pubsubTopic, subscription);
|
||||
return subscription;
|
||||
}
|
||||
|
||||
private getUniquePubsubTopics<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[]
|
||||
): string[] {
|
||||
if (!Array.isArray(decoders)) {
|
||||
return [decoders.pubsubTopic];
|
||||
}
|
||||
|
||||
if (decoders.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
|
||||
|
||||
return [...pubsubTopics];
|
||||
}
|
||||
}
|
||||
|
||||
export function wakuFilter(
|
||||
connectionManager: ConnectionManager,
|
||||
init?: ProtocolCreateOptions
|
||||
): (libp2p: Libp2p) => IFilterSDK {
|
||||
return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
|
||||
}
|
||||
|
||||
async function pushMessage<T extends IDecodedMessage>(
|
||||
subscriptionCallback: SubscriptionCallback<T>,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage
|
||||
): Promise<void> {
|
||||
const { decoders, callback } = subscriptionCallback;
|
||||
|
||||
const { contentTopic } = message;
|
||||
if (!contentTopic) {
|
||||
log.warn("Message has no content topic, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const decodePromises = decoders.map((dec) =>
|
||||
dec
|
||||
.fromProtoObj(pubsubTopic, message as IProtoMessage)
|
||||
.then((decoded) => decoded || Promise.reject("Decoding failed"))
|
||||
);
|
||||
|
||||
const decodedMessage = await Promise.any(decodePromises);
|
||||
|
||||
await callback(decodedMessage);
|
||||
} catch (e) {
|
||||
log.error("Error decoding message", e);
|
||||
}
|
||||
}
|
5
packages/sdk/src/protocols/filter/constants.ts
Normal file
5
packages/sdk/src/protocols/filter/constants.ts
Normal file
@ -0,0 +1,5 @@
|
||||
export const DEFAULT_KEEP_ALIVE = 30 * 1000;
|
||||
|
||||
export const DEFAULT_SUBSCRIBE_OPTIONS = {
|
||||
keepAlive: DEFAULT_KEEP_ALIVE
|
||||
};
|
306
packages/sdk/src/protocols/filter/index.ts
Normal file
306
packages/sdk/src/protocols/filter/index.ts
Normal file
@ -0,0 +1,306 @@
|
||||
import { ConnectionManager, FilterCore } from "@waku/core";
|
||||
import {
|
||||
type Callback,
|
||||
type CreateSubscriptionResult,
|
||||
type IAsyncIterator,
|
||||
type IDecodedMessage,
|
||||
type IDecoder,
|
||||
type IFilterSDK,
|
||||
type Libp2p,
|
||||
NetworkConfig,
|
||||
type ProtocolCreateOptions,
|
||||
ProtocolError,
|
||||
type ProtocolUseOptions,
|
||||
type PubsubTopic,
|
||||
type SubscribeOptions,
|
||||
SubscribeResult,
|
||||
type Unsubscribe
|
||||
} from "@waku/interfaces";
|
||||
import {
|
||||
ensurePubsubTopicIsConfigured,
|
||||
groupByContentTopic,
|
||||
Logger,
|
||||
shardInfoToPubsubTopics,
|
||||
toAsyncIterator
|
||||
} from "@waku/utils";
|
||||
|
||||
import { BaseProtocolSDK } from "../base_protocol.js";
|
||||
|
||||
import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
|
||||
import { SubscriptionManager } from "./subscription_manager.js";
|
||||
|
||||
const log = new Logger("sdk:filter");
|
||||
|
||||
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
|
||||
public readonly protocol: FilterCore;
|
||||
|
||||
private activeSubscriptions = new Map<string, SubscriptionManager>();
|
||||
|
||||
public constructor(
|
||||
connectionManager: ConnectionManager,
|
||||
libp2p: Libp2p,
|
||||
options?: ProtocolCreateOptions
|
||||
) {
|
||||
super(
|
||||
new FilterCore(
|
||||
async (pubsubTopic, wakuMessage, peerIdStr) => {
|
||||
const subscription = this.getActiveSubscription(pubsubTopic);
|
||||
if (!subscription) {
|
||||
log.error(
|
||||
`No subscription locally registered for topic ${pubsubTopic}`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
|
||||
},
|
||||
connectionManager.configuredPubsubTopics,
|
||||
libp2p
|
||||
),
|
||||
connectionManager,
|
||||
{ numPeersToUse: options?.numPeersToUse }
|
||||
);
|
||||
|
||||
this.protocol = this.core as FilterCore;
|
||||
|
||||
this.activeSubscriptions = new Map();
|
||||
}
|
||||
|
||||
/**
|
||||
* Opens a subscription with the Filter protocol using the provided decoders and callback.
|
||||
* This method combines the functionality of creating a subscription and subscribing to it.
|
||||
*
|
||||
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
|
||||
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
|
||||
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
|
||||
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
|
||||
*
|
||||
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
|
||||
* - subscription: The created subscription object if successful, or null if failed.
|
||||
* - error: A ProtocolError if the subscription creation failed, or null if successful.
|
||||
* - results: An object containing arrays of failures and successes from the subscription process.
|
||||
* Only present if the subscription was created successfully.
|
||||
*
|
||||
* @throws {Error} If there's an unexpected error during the subscription process.
|
||||
*
|
||||
* @remarks
|
||||
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
|
||||
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
|
||||
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
|
||||
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
|
||||
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
|
||||
* - Even if the subscription was created successfully, there might be some failures in the results.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
|
||||
* if (!subscription || error) {
|
||||
* console.error("Failed to create subscription:", error);
|
||||
* }
|
||||
* console.log("Subscription created successfully");
|
||||
* if (results.failures.length > 0) {
|
||||
* console.warn("Some errors occurred during subscription:", results.failures);
|
||||
* }
|
||||
* console.log("Successful subscriptions:", results.successes);
|
||||
*
|
||||
* ```
|
||||
*/
|
||||
public async subscribe<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[],
|
||||
callback: Callback<T>,
|
||||
protocolUseOptions?: ProtocolUseOptions,
|
||||
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
|
||||
): Promise<SubscribeResult> {
|
||||
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
|
||||
|
||||
if (uniquePubsubTopics.length !== 1) {
|
||||
return {
|
||||
subscription: null,
|
||||
error: ProtocolError.INVALID_DECODER_TOPICS,
|
||||
results: null
|
||||
};
|
||||
}
|
||||
|
||||
const pubsubTopic = uniquePubsubTopics[0];
|
||||
|
||||
const { subscription, error } = await this.createSubscription(
|
||||
pubsubTopic,
|
||||
protocolUseOptions
|
||||
);
|
||||
|
||||
if (error) {
|
||||
return {
|
||||
subscription: null,
|
||||
error: error,
|
||||
results: null
|
||||
};
|
||||
}
|
||||
|
||||
const { failures, successes } = await subscription.subscribe(
|
||||
decoders,
|
||||
callback,
|
||||
subscribeOptions
|
||||
);
|
||||
return {
|
||||
subscription,
|
||||
error: null,
|
||||
results: {
|
||||
failures: failures,
|
||||
successes: successes
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new subscription to the given pubsub topic.
|
||||
* The subscription is made to multiple peers for decentralization.
|
||||
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
|
||||
* @returns The subscription object.
|
||||
*/
|
||||
private async createSubscription(
|
||||
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
|
||||
options?: ProtocolUseOptions
|
||||
): Promise<CreateSubscriptionResult> {
|
||||
options = {
|
||||
autoRetry: true,
|
||||
...options
|
||||
} as ProtocolUseOptions;
|
||||
|
||||
const pubsubTopic =
|
||||
typeof pubsubTopicShardInfo == "string"
|
||||
? pubsubTopicShardInfo
|
||||
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
|
||||
|
||||
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
|
||||
|
||||
const hasPeers = await this.hasPeers(options);
|
||||
if (!hasPeers) {
|
||||
return {
|
||||
error: ProtocolError.NO_PEER_AVAILABLE,
|
||||
subscription: null
|
||||
};
|
||||
}
|
||||
|
||||
log.info(
|
||||
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
|
||||
this.connectedPeers.map((peer) => peer.id.toString())
|
||||
);
|
||||
|
||||
const subscription =
|
||||
this.getActiveSubscription(pubsubTopic) ??
|
||||
this.setActiveSubscription(
|
||||
pubsubTopic,
|
||||
new SubscriptionManager(
|
||||
pubsubTopic,
|
||||
this.protocol,
|
||||
this.connectionManager,
|
||||
() => this.connectedPeers,
|
||||
this.renewPeer.bind(this)
|
||||
)
|
||||
);
|
||||
|
||||
return {
|
||||
error: null,
|
||||
subscription
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used to satisfy the `IReceiver` interface.
|
||||
*
|
||||
* @hidden
|
||||
*
|
||||
* @param decoders The decoders to use for the subscription.
|
||||
* @param callback The callback function to use for the subscription.
|
||||
* @param opts Optional protocol options for the subscription.
|
||||
*
|
||||
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
|
||||
*
|
||||
* @remarks
|
||||
* This method should not be used directly.
|
||||
* Instead, use `createSubscription` to create a new subscription.
|
||||
*/
|
||||
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[],
|
||||
callback: Callback<T>,
|
||||
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
|
||||
): Promise<Unsubscribe> {
|
||||
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
|
||||
|
||||
if (uniquePubsubTopics.length === 0) {
|
||||
throw Error(
|
||||
"Failed to subscribe: no pubsubTopic found on decoders provided."
|
||||
);
|
||||
}
|
||||
|
||||
if (uniquePubsubTopics.length > 1) {
|
||||
throw Error(
|
||||
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
|
||||
);
|
||||
}
|
||||
|
||||
const { subscription, error } = await this.createSubscription(
|
||||
uniquePubsubTopics[0]
|
||||
);
|
||||
|
||||
if (error) {
|
||||
throw Error(`Failed to create subscription: ${error}`);
|
||||
}
|
||||
|
||||
await subscription.subscribe(decoders, callback, options);
|
||||
|
||||
const contentTopics = Array.from(
|
||||
groupByContentTopic(
|
||||
Array.isArray(decoders) ? decoders : [decoders]
|
||||
).keys()
|
||||
);
|
||||
|
||||
return async () => {
|
||||
await subscription.unsubscribe(contentTopics);
|
||||
};
|
||||
}
|
||||
|
||||
public toSubscriptionIterator<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[]
|
||||
): Promise<IAsyncIterator<T>> {
|
||||
return toAsyncIterator(this, decoders);
|
||||
}
|
||||
|
||||
//TODO: move to SubscriptionManager
|
||||
private getActiveSubscription(
|
||||
pubsubTopic: PubsubTopic
|
||||
): SubscriptionManager | undefined {
|
||||
return this.activeSubscriptions.get(pubsubTopic);
|
||||
}
|
||||
|
||||
private setActiveSubscription(
|
||||
pubsubTopic: PubsubTopic,
|
||||
subscription: SubscriptionManager
|
||||
): SubscriptionManager {
|
||||
this.activeSubscriptions.set(pubsubTopic, subscription);
|
||||
return subscription;
|
||||
}
|
||||
|
||||
private getUniquePubsubTopics<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[]
|
||||
): string[] {
|
||||
if (!Array.isArray(decoders)) {
|
||||
return [decoders.pubsubTopic];
|
||||
}
|
||||
|
||||
if (decoders.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
|
||||
|
||||
return [...pubsubTopics];
|
||||
}
|
||||
}
|
||||
|
||||
export function wakuFilter(
|
||||
connectionManager: ConnectionManager,
|
||||
init?: ProtocolCreateOptions
|
||||
): (libp2p: Libp2p) => IFilterSDK {
|
||||
return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
|
||||
}
|
253
packages/sdk/src/protocols/filter/reliability_monitor.ts
Normal file
253
packages/sdk/src/protocols/filter/reliability_monitor.ts
Normal file
@ -0,0 +1,253 @@
|
||||
import type { Peer, PeerId } from "@libp2p/interface";
|
||||
import {
|
||||
ContentTopic,
|
||||
CoreProtocolResult,
|
||||
IProtoMessage,
|
||||
PeerIdStr,
|
||||
PubsubTopic
|
||||
} from "@waku/interfaces";
|
||||
import { messageHashStr } from "@waku/message-hash";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
type ReceivedMessageHashes = {
|
||||
all: Set<string>;
|
||||
nodes: Record<PeerIdStr, Set<string>>;
|
||||
};
|
||||
|
||||
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
|
||||
|
||||
const log = new Logger("sdk:receiver:reliability_monitor");
|
||||
|
||||
const DEFAULT_MAX_PINGS = 3;
|
||||
|
||||
export class ReliabilityMonitorManager {
|
||||
private static receiverMonitors: Map<
|
||||
PubsubTopic,
|
||||
ReceiverReliabilityMonitor
|
||||
> = new Map();
|
||||
|
||||
public static createReceiverMonitor(
|
||||
pubsubTopic: PubsubTopic,
|
||||
getPeers: () => Peer[],
|
||||
renewPeer: (peerId: PeerId) => Promise<Peer>,
|
||||
getContentTopics: () => ContentTopic[],
|
||||
protocolSubscribe: (
|
||||
pubsubTopic: PubsubTopic,
|
||||
peer: Peer,
|
||||
contentTopics: ContentTopic[]
|
||||
) => Promise<CoreProtocolResult>
|
||||
): ReceiverReliabilityMonitor {
|
||||
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
|
||||
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
|
||||
}
|
||||
|
||||
const monitor = new ReceiverReliabilityMonitor(
|
||||
pubsubTopic,
|
||||
getPeers,
|
||||
renewPeer,
|
||||
getContentTopics,
|
||||
protocolSubscribe
|
||||
);
|
||||
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
|
||||
return monitor;
|
||||
}
|
||||
|
||||
private constructor() {}
|
||||
|
||||
public static destroy(pubsubTopic: PubsubTopic): void {
|
||||
this.receiverMonitors.delete(pubsubTopic);
|
||||
}
|
||||
|
||||
public static destroyAll(): void {
|
||||
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
|
||||
monitor.setMaxMissedMessagesThreshold(undefined);
|
||||
monitor.setMaxPingFailures(undefined);
|
||||
this.receiverMonitors.delete(pubsubTopic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class ReceiverReliabilityMonitor {
|
||||
private receivedMessagesHashes: ReceivedMessageHashes;
|
||||
private missedMessagesByPeer: Map<string, number> = new Map();
|
||||
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
|
||||
private peerFailures: Map<string, number> = new Map();
|
||||
private maxPingFailures: number = DEFAULT_MAX_PINGS;
|
||||
private peerRenewalLocks: Set<PeerIdStr> = new Set();
|
||||
|
||||
public constructor(
|
||||
private readonly pubsubTopic: PubsubTopic,
|
||||
private getPeers: () => Peer[],
|
||||
private renewPeer: (peerId: PeerId) => Promise<Peer>,
|
||||
private getContentTopics: () => ContentTopic[],
|
||||
private protocolSubscribe: (
|
||||
pubsubTopic: PubsubTopic,
|
||||
peer: Peer,
|
||||
contentTopics: ContentTopic[]
|
||||
) => Promise<CoreProtocolResult>
|
||||
) {
|
||||
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
|
||||
|
||||
this.receivedMessagesHashes = {
|
||||
all: new Set(),
|
||||
nodes: {
|
||||
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
|
||||
}
|
||||
};
|
||||
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
|
||||
}
|
||||
|
||||
public setMaxMissedMessagesThreshold(value: number | undefined): void {
|
||||
if (value === undefined) {
|
||||
return;
|
||||
}
|
||||
this.maxMissedMessagesThreshold = value;
|
||||
}
|
||||
|
||||
public setMaxPingFailures(value: number | undefined): void {
|
||||
if (value === undefined) {
|
||||
return;
|
||||
}
|
||||
this.maxPingFailures = value;
|
||||
}
|
||||
|
||||
public async handlePingResult(
|
||||
peerId: PeerId,
|
||||
result?: CoreProtocolResult
|
||||
): Promise<void> {
|
||||
if (result?.success) {
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
|
||||
this.peerFailures.set(peerId.toString(), failures);
|
||||
|
||||
if (failures >= this.maxPingFailures) {
|
||||
try {
|
||||
await this.renewAndSubscribePeer(peerId);
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
} catch (error) {
|
||||
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public processIncomingMessage(
|
||||
message: WakuMessage,
|
||||
pubsubTopic: PubsubTopic,
|
||||
peerIdStr?: string
|
||||
): boolean {
|
||||
const alreadyReceived = this.addMessageToCache(
|
||||
message,
|
||||
pubsubTopic,
|
||||
peerIdStr
|
||||
);
|
||||
void this.checkAndRenewPeers();
|
||||
return alreadyReceived;
|
||||
}
|
||||
|
||||
private addMessageToCache(
|
||||
message: WakuMessage,
|
||||
pubsubTopic: PubsubTopic,
|
||||
peerIdStr?: string
|
||||
): boolean {
|
||||
const hashedMessageStr = messageHashStr(
|
||||
pubsubTopic,
|
||||
message as IProtoMessage
|
||||
);
|
||||
|
||||
const alreadyReceived =
|
||||
this.receivedMessagesHashes.all.has(hashedMessageStr);
|
||||
this.receivedMessagesHashes.all.add(hashedMessageStr);
|
||||
|
||||
if (peerIdStr) {
|
||||
const hashesForPeer = this.receivedMessagesHashes.nodes[peerIdStr];
|
||||
if (!hashesForPeer) {
|
||||
log.warn(
|
||||
`Peer ${peerIdStr} not initialized in receivedMessagesHashes.nodes, adding it.`
|
||||
);
|
||||
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
|
||||
}
|
||||
this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr);
|
||||
}
|
||||
|
||||
return alreadyReceived;
|
||||
}
|
||||
|
||||
private async checkAndRenewPeers(): Promise<void> {
|
||||
for (const hash of this.receivedMessagesHashes.all) {
|
||||
for (const [peerIdStr, hashes] of Object.entries(
|
||||
this.receivedMessagesHashes.nodes
|
||||
)) {
|
||||
if (!hashes.has(hash)) {
|
||||
this.incrementMissedMessageCount(peerIdStr);
|
||||
if (this.shouldRenewPeer(peerIdStr)) {
|
||||
log.info(
|
||||
`Peer ${peerIdStr} has missed too many messages, renewing.`
|
||||
);
|
||||
const peerId = this.getPeers().find(
|
||||
(p) => p.id.toString() === peerIdStr
|
||||
)?.id;
|
||||
if (!peerId) {
|
||||
log.error(
|
||||
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await this.renewAndSubscribePeer(peerId);
|
||||
} catch (error) {
|
||||
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async renewAndSubscribePeer(
|
||||
peerId: PeerId
|
||||
): Promise<Peer | undefined> {
|
||||
try {
|
||||
if (this.peerRenewalLocks.has(peerId.toString())) {
|
||||
log.info(`Peer ${peerId.toString()} is already being renewed.`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.peerRenewalLocks.add(peerId.toString());
|
||||
|
||||
const newPeer = await this.renewPeer(peerId);
|
||||
await this.protocolSubscribe(
|
||||
this.pubsubTopic,
|
||||
newPeer,
|
||||
this.getContentTopics()
|
||||
);
|
||||
|
||||
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
|
||||
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
|
||||
|
||||
this.peerFailures.delete(peerId.toString());
|
||||
this.missedMessagesByPeer.delete(peerId.toString());
|
||||
delete this.receivedMessagesHashes.nodes[peerId.toString()];
|
||||
|
||||
return newPeer;
|
||||
} catch (error) {
|
||||
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
|
||||
return;
|
||||
} finally {
|
||||
this.peerRenewalLocks.delete(peerId.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private incrementMissedMessageCount(peerIdStr: string): void {
|
||||
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
|
||||
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
|
||||
}
|
||||
|
||||
private shouldRenewPeer(peerIdStr: string): boolean {
|
||||
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
|
||||
return missedMessages > this.maxMissedMessagesThreshold;
|
||||
}
|
||||
}
|
347
packages/sdk/src/protocols/filter/subscription_manager.ts
Normal file
347
packages/sdk/src/protocols/filter/subscription_manager.ts
Normal file
@ -0,0 +1,347 @@
|
||||
import type { Peer } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager, FilterCore } from "@waku/core";
|
||||
import {
|
||||
type Callback,
|
||||
type ContentTopic,
|
||||
type CoreProtocolResult,
|
||||
EConnectionStateEvents,
|
||||
type IDecodedMessage,
|
||||
type IDecoder,
|
||||
type IProtoMessage,
|
||||
type ISubscriptionSDK,
|
||||
type PeerIdStr,
|
||||
ProtocolError,
|
||||
type PubsubTopic,
|
||||
type SDKProtocolResult,
|
||||
type SubscribeOptions,
|
||||
SubscriptionCallback
|
||||
} from "@waku/interfaces";
|
||||
import { WakuMessage } from "@waku/proto";
|
||||
import { groupByContentTopic, Logger } from "@waku/utils";
|
||||
|
||||
import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
|
||||
import {
|
||||
ReceiverReliabilityMonitor,
|
||||
ReliabilityMonitorManager
|
||||
} from "./reliability_monitor.js";
|
||||
|
||||
const log = new Logger("sdk:filter:subscription_manager");
|
||||
|
||||
export class SubscriptionManager implements ISubscriptionSDK {
|
||||
private reliabilityMonitor: ReceiverReliabilityMonitor;
|
||||
|
||||
private keepAliveTimer: number | null = null;
|
||||
private subscriptionCallbacks: Map<
|
||||
ContentTopic,
|
||||
SubscriptionCallback<IDecodedMessage>
|
||||
>;
|
||||
|
||||
public constructor(
|
||||
private readonly pubsubTopic: PubsubTopic,
|
||||
private readonly protocol: FilterCore,
|
||||
private readonly connectionManager: ConnectionManager,
|
||||
private readonly getPeers: () => Peer[],
|
||||
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
|
||||
) {
|
||||
this.pubsubTopic = pubsubTopic;
|
||||
this.subscriptionCallbacks = new Map();
|
||||
|
||||
this.reliabilityMonitor = ReliabilityMonitorManager.createReceiverMonitor(
|
||||
this.pubsubTopic,
|
||||
this.getPeers.bind(this),
|
||||
this.renewPeer.bind(this),
|
||||
() => Array.from(this.subscriptionCallbacks.keys()),
|
||||
this.protocol.subscribe.bind(this.protocol)
|
||||
);
|
||||
}
|
||||
|
||||
public async subscribe<T extends IDecodedMessage>(
|
||||
decoders: IDecoder<T> | IDecoder<T>[],
|
||||
callback: Callback<T>,
|
||||
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
|
||||
): Promise<SDKProtocolResult> {
|
||||
this.reliabilityMonitor.setMaxMissedMessagesThreshold(
|
||||
options.maxMissedMessagesThreshold
|
||||
);
|
||||
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
|
||||
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
|
||||
|
||||
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
|
||||
|
||||
// check that all decoders are configured for the same pubsub topic as this subscription
|
||||
for (const decoder of decodersArray) {
|
||||
if (decoder.pubsubTopic !== this.pubsubTopic) {
|
||||
return {
|
||||
failures: [
|
||||
{
|
||||
error: ProtocolError.TOPIC_DECODER_MISMATCH
|
||||
}
|
||||
],
|
||||
successes: []
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const decodersGroupedByCT = groupByContentTopic(decodersArray);
|
||||
const contentTopics = Array.from(decodersGroupedByCT.keys());
|
||||
|
||||
const promises = this.getPeers().map(async (peer) =>
|
||||
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
|
||||
);
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
const finalResult = this.handleResult(results, "subscribe");
|
||||
|
||||
// Save the callback functions by content topics so they
|
||||
// can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`)
|
||||
// is called for those content topics
|
||||
decodersGroupedByCT.forEach((decoders, contentTopic) => {
|
||||
// Cast the type because a given `subscriptionCallbacks` map may hold
|
||||
// Decoder that decode to different implementations of `IDecodedMessage`
|
||||
const subscriptionCallback = {
|
||||
decoders,
|
||||
callback
|
||||
} as unknown as SubscriptionCallback<IDecodedMessage>;
|
||||
|
||||
// The callback and decoder may override previous values, this is on
|
||||
// purpose as the user may call `subscribe` to refresh the subscription
|
||||
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
|
||||
});
|
||||
|
||||
this.startSubscriptionsMaintenance(this.keepAliveTimer);
|
||||
|
||||
return finalResult;
|
||||
}
|
||||
|
||||
public async unsubscribe(
|
||||
contentTopics: ContentTopic[]
|
||||
): Promise<SDKProtocolResult> {
|
||||
const promises = this.getPeers().map(async (peer) => {
|
||||
const response = await this.protocol.unsubscribe(
|
||||
this.pubsubTopic,
|
||||
peer,
|
||||
contentTopics
|
||||
);
|
||||
|
||||
contentTopics.forEach((contentTopic: string) => {
|
||||
this.subscriptionCallbacks.delete(contentTopic);
|
||||
});
|
||||
|
||||
return response;
|
||||
});
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
const finalResult = this.handleResult(results, "unsubscribe");
|
||||
|
||||
if (this.subscriptionCallbacks.size === 0) {
|
||||
this.stopSubscriptionsMaintenance();
|
||||
}
|
||||
|
||||
return finalResult;
|
||||
}
|
||||
|
||||
public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
|
||||
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);
|
||||
|
||||
const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
return this.handleResult(results, "ping");
|
||||
}
|
||||
|
||||
public async unsubscribeAll(): Promise<SDKProtocolResult> {
|
||||
const promises = this.getPeers().map(async (peer) =>
|
||||
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
|
||||
);
|
||||
|
||||
const results = await Promise.allSettled(promises);
|
||||
|
||||
this.subscriptionCallbacks.clear();
|
||||
|
||||
const finalResult = this.handleResult(results, "unsubscribeAll");
|
||||
|
||||
this.stopSubscriptionsMaintenance();
|
||||
|
||||
return finalResult;
|
||||
}
|
||||
|
||||
public async processIncomingMessage(
|
||||
message: WakuMessage,
|
||||
peerIdStr: PeerIdStr
|
||||
): Promise<void> {
|
||||
const alreadyReceived = this.reliabilityMonitor.processIncomingMessage(
|
||||
message,
|
||||
this.pubsubTopic,
|
||||
peerIdStr
|
||||
);
|
||||
|
||||
if (alreadyReceived) {
|
||||
log.info("Message already received, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
const { contentTopic } = message;
|
||||
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
|
||||
if (!subscriptionCallback) {
|
||||
log.error("No subscription callback available for ", contentTopic);
|
||||
return;
|
||||
}
|
||||
log.info(
|
||||
"Processing message with content topic ",
|
||||
contentTopic,
|
||||
" on pubsub topic ",
|
||||
this.pubsubTopic
|
||||
);
|
||||
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
|
||||
}
|
||||
|
||||
private handleResult(
|
||||
results: PromiseSettledResult<CoreProtocolResult>[],
|
||||
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
|
||||
): SDKProtocolResult {
|
||||
const result: SDKProtocolResult = { failures: [], successes: [] };
|
||||
|
||||
for (const promiseResult of results) {
|
||||
if (promiseResult.status === "rejected") {
|
||||
log.error(
|
||||
`Failed to resolve ${type} promise successfully: `,
|
||||
promiseResult.reason
|
||||
);
|
||||
result.failures.push({ error: ProtocolError.GENERIC_FAIL });
|
||||
} else {
|
||||
const coreResult = promiseResult.value;
|
||||
if (coreResult.failure) {
|
||||
result.failures.push(coreResult.failure);
|
||||
} else {
|
||||
result.successes.push(coreResult.success);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private async pingSpecificPeer(peerId: PeerId): Promise<CoreProtocolResult> {
|
||||
const peer = this.getPeers().find((p) => p.id.equals(peerId));
|
||||
if (!peer) {
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
peerId,
|
||||
error: ProtocolError.NO_PEER_AVAILABLE
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
let result;
|
||||
try {
|
||||
result = await this.protocol.ping(peer);
|
||||
return result;
|
||||
} catch (error) {
|
||||
return {
|
||||
success: null,
|
||||
failure: {
|
||||
peerId,
|
||||
error: ProtocolError.GENERIC_FAIL
|
||||
}
|
||||
};
|
||||
} finally {
|
||||
void this.reliabilityMonitor.handlePingResult(peerId, result);
|
||||
}
|
||||
}
|
||||
|
||||
private startSubscriptionsMaintenance(interval: number): void {
|
||||
this.startKeepAlivePings(interval);
|
||||
this.startConnectionListener();
|
||||
}
|
||||
|
||||
private stopSubscriptionsMaintenance(): void {
|
||||
this.stopKeepAlivePings();
|
||||
this.stopConnectionListener();
|
||||
}
|
||||
|
||||
private startConnectionListener(): void {
|
||||
this.connectionManager.addEventListener(
|
||||
EConnectionStateEvents.CONNECTION_STATUS,
|
||||
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
|
||||
);
|
||||
}
|
||||
|
||||
private stopConnectionListener(): void {
|
||||
this.connectionManager.removeEventListener(
|
||||
EConnectionStateEvents.CONNECTION_STATUS,
|
||||
this.connectionListener.bind(this) as (v: CustomEvent<boolean>) => void
|
||||
);
|
||||
}
|
||||
|
||||
private async connectionListener({
|
||||
detail: isConnected
|
||||
}: CustomEvent<boolean>): Promise<void> {
|
||||
if (!isConnected) {
|
||||
this.stopKeepAlivePings();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// we do nothing here, as the renewal process is managed internally by `this.ping()`
|
||||
await this.ping();
|
||||
} catch (err) {
|
||||
log.error(`networkStateListener failed to recover: ${err}`);
|
||||
}
|
||||
|
||||
this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE);
|
||||
}
|
||||
|
||||
private startKeepAlivePings(interval: number): void {
|
||||
if (this.keepAliveTimer) {
|
||||
log.info("Recurring pings already set up.");
|
||||
return;
|
||||
}
|
||||
|
||||
this.keepAliveTimer = setInterval(() => {
|
||||
void this.ping().catch((error) => {
|
||||
log.error("Error in keep-alive ping cycle:", error);
|
||||
});
|
||||
}, interval) as unknown as number;
|
||||
}
|
||||
|
||||
private stopKeepAlivePings(): void {
|
||||
if (!this.keepAliveTimer) {
|
||||
log.info("Already stopped recurring pings.");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Stopping recurring pings.");
|
||||
clearInterval(this.keepAliveTimer);
|
||||
this.keepAliveTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
async function pushMessage<T extends IDecodedMessage>(
|
||||
subscriptionCallback: SubscriptionCallback<T>,
|
||||
pubsubTopic: PubsubTopic,
|
||||
message: WakuMessage
|
||||
): Promise<void> {
|
||||
const { decoders, callback } = subscriptionCallback;
|
||||
|
||||
const { contentTopic } = message;
|
||||
if (!contentTopic) {
|
||||
log.warn("Message has no content topic, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const decodePromises = decoders.map((dec) =>
|
||||
dec
|
||||
.fromProtoObj(pubsubTopic, message as IProtoMessage)
|
||||
.then((decoded) => decoded || Promise.reject("Decoding failed"))
|
||||
);
|
||||
|
||||
const decodedMessage = await Promise.any(decodePromises);
|
||||
|
||||
await callback(decodedMessage);
|
||||
} catch (e) {
|
||||
log.error("Error decoding message", e);
|
||||
}
|
||||
}
|
@ -16,7 +16,8 @@ import type {
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { wakuFilter } from "./protocols/filter.js";
|
||||
import { wakuFilter } from "./protocols/filter/index.js";
|
||||
import { ReliabilityMonitorManager } from "./protocols/filter/reliability_monitor.js";
|
||||
import { wakuLightPush } from "./protocols/light_push.js";
|
||||
import { wakuStore } from "./protocols/store.js";
|
||||
|
||||
@ -195,6 +196,7 @@ export class WakuNode implements Waku {
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
ReliabilityMonitorManager.destroyAll();
|
||||
this.connectionManager.stop();
|
||||
await this.libp2p.stop();
|
||||
}
|
||||
|
@ -187,9 +187,12 @@ describe("Waku Filter: Peer Management: E2E", function () {
|
||||
// One more failure should trigger renewal
|
||||
await subscription.ping(targetPeer.id);
|
||||
|
||||
// adds delay as renewal happens as an async operation in the bg
|
||||
await delay(300);
|
||||
|
||||
expect(
|
||||
waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id))
|
||||
).to.be.false;
|
||||
).to.eq(false);
|
||||
expect(waku.filter.connectedPeers.length).to.equal(
|
||||
waku.filter.numPeersToUse
|
||||
);
|
||||
|
Loading…
x
Reference in New Issue
Block a user