feat: introduce ReliabilityMonitor

This commit is contained in:
Danish Arora 2024-08-28 13:16:07 +05:30
parent cd97aefc27
commit 4f4b91a5a1
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
2 changed files with 139 additions and 87 deletions

View File

@ -0,0 +1,114 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { IProtoMessage, PeerIdStr, PubsubTopic } from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
type ReceivedMessageHashes = {
all: Set<string>;
nodes: {
[peerId: PeerIdStr]: Set<string>;
};
};
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
const log = new Logger("sdk:filter:reliability_monitor");
export class ReliabilityMonitor {
public receivedMessagesHashStr: string[] = [];
public receivedMessagesHashes: ReceivedMessageHashes;
public missedMessagesByPeer: Map<string, number> = new Map();
public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
public constructor(
private getPeers: () => Peer[],
private renewAndSubscribePeer: (peerId: PeerId) => Promise<Peer | undefined>
) {
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
nodes: {
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
}
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}
public setMaxMissedMessagesThreshold(value: number | undefined): void {
if (value === undefined) {
return;
}
this.maxMissedMessagesThreshold = value;
}
public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}
public addMessage(
message: WakuMessage,
pubsubTopic: PubsubTopic,
peerIdStr?: string
): boolean {
const hashedMessageStr = messageHashStr(
pubsubTopic,
message as IProtoMessage
);
this.receivedMessagesHashes.all.add(hashedMessageStr);
if (peerIdStr) {
this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr);
}
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
return false;
} else {
this.receivedMessagesHashStr.push(hashedMessageStr);
return true;
}
}
public async validateMessage(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}
}
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}

View File

@ -16,31 +16,21 @@ import {
type SubscribeOptions,
SubscriptionCallback
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, Logger } from "@waku/utils";
import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
type ReceivedMessageHashes = {
all: Set<string>;
nodes: {
[peerId: PeerIdStr]: Set<string>;
};
};
import { ReliabilityMonitor } from "./reliability_monitor.js";
const DEFAULT_MAX_PINGS = 3;
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
const log = new Logger("sdk:filter:subscription_manager");
export class SubscriptionManager implements ISubscriptionSDK {
private readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private readonly receivedMessagesHashes: ReceivedMessageHashes;
private peerFailures: Map<string, number> = new Map();
private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private reliabilityMonitor: ReliabilityMonitor;
private subscriptionCallbacks: Map<
ContentTopic,
@ -55,26 +45,10 @@ export class SubscriptionManager implements ISubscriptionSDK {
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
nodes: {
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
}
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}
public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}
private addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);
if (peerIdStr) {
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
}
this.reliabilityMonitor = new ReliabilityMonitor(
getPeers.bind(this),
this.renewAndSubscribePeer.bind(this)
);
}
public async subscribe<T extends IDecodedMessage>(
@ -84,9 +58,9 @@ export class SubscriptionManager implements ISubscriptionSDK {
): Promise<SDKProtocolResult> {
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
this.maxMissedMessagesThreshold =
options.maxMissedMessagesThreshold ||
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
this.reliabilityMonitor.setMaxMissedMessagesThreshold(
options.maxMissedMessagesThreshold
);
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
@ -194,54 +168,21 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}
private async validateMessage(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}
}
}
public async processIncomingMessage(
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const hashedMessageStr = messageHashStr(
const includesMessage = this.reliabilityMonitor.addMessage(
message,
this.pubsubTopic,
message as IProtoMessage
peerIdStr
);
void this.reliabilityMonitor.validateMessage();
this.addHash(hashedMessageStr, peerIdStr);
void this.validateMessage();
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
if (includesMessage) {
log.info("Message already received, skipping");
return;
}
this.receivedMessagesHashStr.push(hashedMessageStr);
const { contentTopic } = message;
const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic);
@ -340,8 +281,13 @@ export class SubscriptionManager implements ISubscriptionSDK {
Array.from(this.subscriptionCallbacks.keys())
);
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
this.reliabilityMonitor.receivedMessagesHashes.nodes[
newPeer.id.toString()
] = new Set();
this.reliabilityMonitor.missedMessagesByPeer.set(
newPeer.id.toString(),
0
);
return newPeer;
} catch (error) {
@ -349,8 +295,10 @@ export class SubscriptionManager implements ISubscriptionSDK {
return;
} finally {
this.peerFailures.delete(peerId.toString());
this.missedMessagesByPeer.delete(peerId.toString());
delete this.receivedMessagesHashes.nodes[peerId.toString()];
this.reliabilityMonitor.missedMessagesByPeer.delete(peerId.toString());
delete this.reliabilityMonitor.receivedMessagesHashes.nodes[
peerId.toString()
];
}
}
@ -378,16 +326,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}
async function pushMessage<T extends IDecodedMessage>(