chore: split SubscriptionManager into ReliabilityMonitor

This commit is contained in:
Danish Arora 2024-08-28 12:07:57 +05:30
parent 69b6612ec9
commit fa7ab23725
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E

View File

@ -57,28 +57,18 @@ const DEFAULT_KEEP_ALIVE = 30 * 1000;
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private readonly receivedMessagesHashes: ReceivedMessageHashes;
class ReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private peerFailures: Map<string, number> = new Map();
private missedMessagesByPeer: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
public constructor(
private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
@ -89,11 +79,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
}
public get messageHashes(): string[] {
return [...this.receivedMessagesHashes.all];
}
private addHash(hash: string, peerIdStr?: string): void {
public addHash(hash: string, peerIdStr?: string): void {
this.receivedMessagesHashes.all.add(hash);
if (peerIdStr) {
@ -101,16 +87,110 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
public async validateMessage(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}
}
}
public async handlePeerFailure(peerId: PeerId): Promise<void> {
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
this.peerFailures.set(peerId.toString(), failures);
if (failures > this.maxPingFailures) {
try {
await this.renewAndSubscribePeer(peerId);
this.peerFailures.delete(peerId.toString());
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
}
}
}
private async renewAndSubscribePeer(
peerId: PeerId
): Promise<Peer | undefined> {
try {
const newPeer = await this.renewPeer(peerId);
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
return newPeer;
} catch (error) {
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
return;
} finally {
this.peerFailures.delete(peerId.toString());
this.missedMessagesByPeer.delete(peerId.toString());
delete this.receivedMessagesHashes.nodes[peerId.toString()];
}
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}
export class SubscriptionManager implements ISubscriptionSDK {
private readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
>;
private reliabilityMonitor: ReliabilityMonitor;
public constructor(
private readonly pubsubTopic: PubsubTopic,
private protocol: FilterCore,
private getPeers: () => Peer[],
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
this.reliabilityMonitor = new ReliabilityMonitor(
getPeers.bind(this),
renewPeer.bind(this)
);
}
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
this.maxMissedMessagesThreshold =
options.maxMissedMessagesThreshold ||
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
@ -218,37 +298,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}
private async validateMessage(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
}
try {
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}
}
}
public async processIncomingMessage(
message: WakuMessage,
peerIdStr: PeerIdStr
@ -258,8 +307,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
message as IProtoMessage
);
this.addHash(hashedMessageStr, peerIdStr);
void this.validateMessage();
this.reliabilityMonitor.addHash(hashedMessageStr, peerIdStr);
void this.reliabilityMonitor.validateMessage();
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
log.info("Message already received, skipping");
@ -322,13 +371,11 @@ export class SubscriptionManager implements ISubscriptionSDK {
try {
const result = await this.protocol.ping(peer);
if (result.failure) {
await this.handlePeerFailure(peerId);
} else {
this.peerFailures.delete(peerId.toString());
await this.reliabilityMonitor.handlePeerFailure(peerId);
}
return result;
} catch (error) {
await this.handlePeerFailure(peerId);
await this.reliabilityMonitor.handlePeerFailure(peerId);
return {
success: null,
failure: {
@ -339,45 +386,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
private async handlePeerFailure(peerId: PeerId): Promise<void> {
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
this.peerFailures.set(peerId.toString(), failures);
if (failures > this.maxPingFailures) {
try {
await this.renewAndSubscribePeer(peerId);
this.peerFailures.delete(peerId.toString());
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
}
}
}
private async renewAndSubscribePeer(
peerId: PeerId
): Promise<Peer | undefined> {
try {
const newPeer = await this.renewPeer(peerId);
await this.protocol.subscribe(
this.pubsubTopic,
newPeer,
Array.from(this.subscriptionCallbacks.keys())
);
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
return newPeer;
} catch (error) {
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
return;
} finally {
this.peerFailures.delete(peerId.toString());
this.missedMessagesByPeer.delete(peerId.toString());
delete this.receivedMessagesHashes.nodes[peerId.toString()];
}
}
private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
if (this.keepAliveTimer) {
@ -402,16 +410,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {