feat: abstract a ReliabilityMonitor over FilterSDK

This commit is contained in:
Danish Arora 2024-09-10 16:13:12 +05:30
parent 365c08403e
commit 777f497d13
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
6 changed files with 213 additions and 123 deletions

View File

@ -36,7 +36,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
) => void,
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
@ -291,7 +291,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
return;
}
await this.handleIncomingMessage(
this.handleIncomingMessage(
pubsubTopic,
wakuMessage,
connection.remotePeer.toString()

View File

@ -1,7 +1,13 @@
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { WakuMessage } from "@waku/proto";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, ThisOrThat } from "./misc.js";
import type {
ContentTopic,
PeerIdStr,
PubsubTopic,
ThisOrThat
} from "./misc.js";
import type {
Callback,
IBaseProtocolCore,
@ -12,6 +18,11 @@ import type {
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";
export type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
@ -26,12 +37,11 @@ export interface ISubscriptionSDK {
callback: Callback<T>,
options?: SubscribeOptions
): Promise<SDKProtocolResult>;
unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
ping(peerId?: PeerId): Promise<SDKProtocolResult>;
unsubscribeAll(): Promise<SDKProtocolResult>;
renewAndSubscribePeer(peerId: PeerId): Promise<Peer | undefined>;
}
export type IFilterSDK = IReceiver &
@ -42,6 +52,21 @@ export type IFilterSDK = IReceiver &
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult>;
activeSubscriptions: Map<PubsubTopic, ISubscriptionSDK>;
setIncomingMessageHandler(
handler: (
pubsubTopic: ContentTopic,
message: WakuMessage,
peerIdStr: PeerIdStr
) => void
): void;
handleIncomingMessage: (
pubsubTopic: ContentTopic,
message: WakuMessage,
peerIdStr: PeerIdStr
) => void;
};
export type SubscribeResult = SubscriptionSuccess | SubscriptionError;

View File

@ -8,6 +8,7 @@ import {
type IFilterSDK,
type Libp2p,
NetworkConfig,
PeerIdStr,
type ProtocolCreateOptions,
ProtocolError,
type ProtocolUseOptions,
@ -16,6 +17,7 @@ import {
SubscribeResult,
type Unsubscribe
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
@ -34,7 +36,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private readonly _connectionManager: ConnectionManager;
private activeSubscriptions = new Map<string, SubscriptionManager>();
public activeSubscriptions = new Map<PubsubTopic, SubscriptionManager>();
public constructor(
connectionManager: ConnectionManager,
@ -43,17 +45,9 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
) {
super(
new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
(pubsubTopic, message, peerIdStr) =>
this.handleIncomingMessage(pubsubTopic, message, peerIdStr),
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
connectionManager.configuredPubsubTopics,
libp2p
),
@ -65,6 +59,30 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this._connectionManager = connectionManager;
}
public handleIncomingMessage: (
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerIdStr: PeerIdStr
) => void = (pubsubTopic, message) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(`No subscription locally registered for topic ${pubsubTopic}`);
return;
}
void subscription.processIncomingMessage(message);
};
public setIncomingMessageHandler(
handler: (
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerIdStr: string
) => void
): void {
this.handleIncomingMessage = handler;
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.

View File

@ -9,24 +9,15 @@ import {
IDecoder,
IProtoMessage,
ISubscriptionSDK,
PeerIdStr,
ProtocolError,
PubsubTopic,
SDKProtocolResult,
SubscribeOptions,
SubscriptionCallback
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";
type ReceivedMessageHashes = {
all: Set<string>;
nodes: {
[peerId: PeerIdStr]: Set<string>;
};
};
export const DEFAULT_MAX_PINGS = 2;
export const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
export const DEFAULT_KEEP_ALIVE = 60 * 1000;
@ -38,48 +29,21 @@ export class SubscriptionManager implements ISubscriptionSDK {
ContentTopic,
SubscriptionCallback<IDecodedMessage>
> = new Map();
private readonly receivedMessagesHashStr: string[] = [];
private peerFailures: Map<string, number> = new Map();
private readonly receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();
private keepAliveInterval: number = DEFAULT_KEEP_ALIVE;
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private keepAliveTimer: number | null = null;
public constructor(
private readonly pubsubTopic: PubsubTopic,
public readonly pubsubTopic: PubsubTopic,
private readonly protocol: FilterCore,
private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
nodes: {
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
}
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}
private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);
if (!peerIdStr) {
return;
}
if (!this.receivedMessagesHashes.nodes[peerIdStr]) {
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
}
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
}
public async subscribe<T extends IDecodedMessage>(
@ -89,9 +53,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
): Promise<SDKProtocolResult> {
this.keepAliveInterval = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
this.maxMissedMessagesThreshold =
options.maxMissedMessagesThreshold ||
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
@ -193,55 +154,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}
private async validateMessage(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}
}
}
public async processIncomingMessage(
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const hashedMessageStr = messageHashStr(
this.pubsubTopic,
message as IProtoMessage
);
this.addHash(hashedMessageStr, peerIdStr);
void this.validateMessage();
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
return;
}
this.receivedMessagesHashStr.push(hashedMessageStr);
public async processIncomingMessage(message: WakuMessage): Promise<void> {
const { contentTopic } = message;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
if (!subscriptionCallback) {
@ -328,7 +241,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
private async renewAndSubscribePeer(
public async renewAndSubscribePeer(
peerId: PeerId
): Promise<Peer | undefined> {
try {
@ -339,17 +252,12 @@ export class SubscriptionManager implements ISubscriptionSDK {
Array.from(this.subscriptionCallbacks.keys())
);
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
return newPeer;
} catch (error) {
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
return;
} finally {
this.peerFailures.delete(peerId.toString());
this.missedMessagesByPeer.delete(peerId.toString());
delete this.receivedMessagesHashes.nodes[peerId.toString()];
}
}
@ -426,16 +334,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.startKeepAlivePings(this.keepAliveInterval);
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}
async function pushMessage<T extends IDecodedMessage>(

View File

@ -0,0 +1,147 @@
import {
IFilterSDK,
IProtoMessage,
ISubscriptionSDK,
PeerIdStr,
PubsubTopic
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
const log = new Logger("sdk:message_reliability_monitor");
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
export class MessageReliabilityManager {
public constructor(private filter: IFilterSDK) {
this.filter.activeSubscriptions.forEach((subscription) => {
new MessageReliabilityMonitor(this.filter, subscription);
});
}
}
export class MessageReliabilityMonitor {
private receivedMessagesHashStr: string[] = [];
private receivedMessagesHashes: {
all: Set<string>;
nodes: Record<string, Set<string>>;
};
private missedMessagesByPeer: Map<string, number> = new Map();
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
public constructor(
private filter: IFilterSDK,
private subscription: ISubscriptionSDK
) {
this.receivedMessagesHashes = {
all: new Set(),
nodes: {}
};
this.initializeListeners();
}
private initializeListeners(): void {
this.filter.setIncomingMessageHandler(this.handleFilterMessage.bind(this));
}
private handleFilterMessage(
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerIdStr: string
): void {
const isReceived = this.isMessageAlreadyReceived(
pubsubTopic,
message,
peerIdStr
);
if (isReceived) {
return;
}
void this.validatePreviousMessage();
this.filter.handleIncomingMessage(pubsubTopic, message, peerIdStr);
}
private isMessageAlreadyReceived(
pubsubTopic: PubsubTopic,
message: WakuMessage,
peerIdStr?: string
): boolean {
const hashedMessageStr = messageHashStr(
pubsubTopic,
message as IProtoMessage
);
this.receivedMessagesHashes.all.add(hashedMessageStr);
if (peerIdStr) {
if (!this.receivedMessagesHashes.nodes[peerIdStr]) {
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
}
this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr);
}
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
return true;
} else {
this.receivedMessagesHashStr.push(hashedMessageStr);
return false;
}
}
private async validatePreviousMessage(): Promise<void> {
if (this.receivedMessagesHashStr.length < 2) {
return; // Not enough messages to validate
}
const previousMessageHash =
this.receivedMessagesHashStr[this.receivedMessagesHashStr.length - 2];
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(previousMessageHash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(`Peer ${peerIdStr} has missed too many messages, renewing.`);
await this.renewPeer(peerIdStr);
}
}
}
}
private async renewPeer(peerIdStr: PeerIdStr): Promise<void> {
try {
const peers = await this.filter.protocol.peerStore.all();
const peerId = peers.find((p) => p.id.toString() === peerIdStr)?.id;
if (!peerId) {
log.error(`Peer ${peerIdStr} not found in peer store`);
return;
}
await this.subscription.renewAndSubscribePeer(peerId);
this.missedMessagesByPeer.delete(peerIdStr);
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
log.info(`Successfully renewed peer ${peerIdStr}`);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}`, error);
}
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
public setMaxMissedMessagesThreshold(value: number): void {
this.maxMissedMessagesThreshold = value;
}
}

View File

@ -18,6 +18,7 @@ import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter/index.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { MessageReliabilityMonitor } from "./protocols/message_reliability_monitor.js";
import { wakuStore } from "./protocols/store.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60;
@ -115,6 +116,7 @@ export class WakuNode implements Waku {
if (protocolsEnabled.filter) {
const filter = wakuFilter(this.connectionManager);
this.filter = filter(libp2p);
new MessageReliabilityMonitor(this.filter);
}
log.info(