fix: attempt to fix some of the Filter issues (#2183)

* feat: lighten retry logic for LightPush

* update tests

* remove base protocol sdk from light push, add unit tests for light push

* remove replaced test

* ensure numPeersToUse is respected

* turn off check for missing messages

* fix recurring ping

* add useful logs

* skip tests

* remove comment

* feat: check filter subscriptions against lightPush (#2185)
This commit is contained in:
Sasha 2024-10-17 01:01:21 +02:00 committed by GitHub
parent 4049123f14
commit ded994f8ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 219 additions and 147 deletions

View File

@ -20,7 +20,7 @@ export type SubscriptionCallback<T extends IDecodedMessage> = {
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
maxMissedMessagesThreshold?: number;
enableLightPushFilterCheck?: boolean;
};
export interface ISubscription {

View File

@ -2,7 +2,7 @@ import { sha256 } from "@noble/hashes/sha256";
import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import {
bytesToUtf8,
bytesToHex,
concat,
numberToBytes,
utf8ToBytes
@ -56,6 +56,6 @@ export function messageHashStr(
message: IProtoMessage | IDecodedMessage
): string {
const hash = messageHash(pubsubTopic, message);
const hashStr = bytesToUtf8(hash);
const hashStr = bytesToHex(hash);
return hashStr;
}

View File

@ -1,5 +1,8 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;
export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
keepAlive: DEFAULT_KEEP_ALIVE,
enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK
};

View File

@ -6,6 +6,7 @@ import {
type IDecodedMessage,
type IDecoder,
type IFilter,
type ILightPush,
type Libp2p,
NetworkConfig,
type ProtocolCreateOptions,
@ -38,7 +39,8 @@ class Filter extends BaseProtocolSDK implements IFilter {
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private libp2p: Libp2p,
private lightPush?: ILightPush,
options?: ProtocolCreateOptions
) {
super(
@ -195,7 +197,9 @@ class Filter extends BaseProtocolSDK implements IFilter {
this.protocol,
this.connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
this.renewPeer.bind(this),
this.libp2p,
this.lightPush
)
);
@ -300,7 +304,9 @@ class Filter extends BaseProtocolSDK implements IFilter {
export function wakuFilter(
connectionManager: ConnectionManager,
lightPush?: ILightPush,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(connectionManager, libp2p, init);
return (libp2p: Libp2p) =>
new Filter(connectionManager, libp2p, lightPush, init);
}

View File

@ -1,6 +1,12 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
ConnectionManager,
createDecoder,
createEncoder,
FilterCore,
LightPushCore
} from "@waku/core";
import {
type Callback,
type ContentTopic,
@ -8,8 +14,10 @@ import {
EConnectionStateEvents,
type IDecodedMessage,
type IDecoder,
type ILightPush,
type IProtoMessage,
type ISubscription,
type Libp2p,
type PeerIdStr,
ProtocolError,
type PubsubTopic,
@ -23,14 +31,23 @@ import { groupByContentTopic, Logger } from "@waku/utils";
import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";
import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import {
DEFAULT_KEEP_ALIVE,
DEFAULT_LIGHT_PUSH_FILTER_CHECK,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL,
DEFAULT_SUBSCRIBE_OPTIONS
} from "./constants.js";
const log = new Logger("sdk:filter:subscription_manager");
export class SubscriptionManager implements ISubscription {
private reliabilityMonitor: ReceiverReliabilityMonitor;
private keepAliveTimer: number | null = null;
private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE;
private keepAliveInterval: ReturnType<typeof setInterval> | null = null;
private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK;
private subscriptionCallbacks: Map<
ContentTopic,
SubscriptionCallback<IDecodedMessage>
@ -43,7 +60,9 @@ export class SubscriptionManager implements ISubscription {
private readonly getPeers: () => Peer[],
private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>
) => Promise<Peer | undefined>,
private readonly libp2p: Libp2p,
private readonly lightPush?: ILightPush
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
@ -54,7 +73,8 @@ export class SubscriptionManager implements ISubscription {
this.renewPeer.bind(this),
() => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol)
this.protocol.addLibp2pEventListener.bind(this.protocol),
this.sendLightPushCheckMessage.bind(this)
);
}
@ -63,11 +83,10 @@ export class SubscriptionManager implements ISubscription {
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.reliabilityMonitor.setMaxMissedMessagesThreshold(
options.maxMissedMessagesThreshold
);
this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed);
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE;
this.enableLightPushFilterCheck =
options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK;
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
@ -85,11 +104,20 @@ export class SubscriptionManager implements ISubscription {
}
}
if (this.enableLightPushFilterCheck) {
decodersArray.push(
createDecoder(
this.buildLightPushContentTopic(),
this.pubsubTopic
) as IDecoder<T>
);
}
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
this.subscribeWithPeerVerification(peer, contentTopics)
);
const results = await Promise.allSettled(promises);
@ -107,12 +135,17 @@ export class SubscriptionManager implements ISubscription {
callback
} as unknown as SubscriptionCallback<IDecodedMessage>;
// don't handle case of internal content topic
if (contentTopic === this.buildLightPushContentTopic()) {
return;
}
// The callback and decoder may override previous values, this is on
// purpose as the user may call `subscribe` to refresh the subscription
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
this.startSubscriptionsMaintenance(this.keepAliveTimer);
this.startSubscriptionsMaintenance(this.keepAliveTimeout);
return finalResult;
}
@ -174,10 +207,9 @@ export class SubscriptionManager implements ISubscription {
message: WakuMessage,
peerIdStr: PeerIdStr
): Promise<void> {
const alreadyReceived = this.reliabilityMonitor.processIncomingMessage(
message,
this.pubsubTopic,
peerIdStr
const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived(
peerIdStr,
message as IProtoMessage
);
if (alreadyReceived) {
@ -200,6 +232,19 @@ export class SubscriptionManager implements ISubscription {
await pushMessage(subscriptionCallback, this.pubsubTopic, message);
}
private async subscribeWithPeerVerification(
peer: Peer,
contentTopics: string[]
): Promise<CoreProtocolResult> {
const result = await this.protocol.subscribe(
this.pubsubTopic,
peer,
contentTopics
);
await this.sendLightPushCheckMessage(peer);
return result;
}
private handleResult(
results: PromiseSettledResult<CoreProtocolResult>[],
type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll"
@ -240,23 +285,26 @@ export class SubscriptionManager implements ISubscription {
let result;
try {
result = await this.protocol.ping(peer);
return result;
} catch (error) {
return {
result = {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
} finally {
void this.reliabilityMonitor.handlePingResult(peerId, result);
}
}
private startSubscriptionsMaintenance(interval: number): void {
log.info(
`Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}`
);
await this.reliabilityMonitor.handlePingResult(peerId, result);
return result;
}
private startSubscriptionsMaintenance(timeout: number): void {
log.info("Starting subscriptions maintenance");
this.startKeepAlivePings(interval);
this.startKeepAlivePings(timeout);
this.startConnectionListener();
}
@ -295,31 +343,69 @@ export class SubscriptionManager implements ISubscription {
log.error(`networkStateListener failed to recover: ${err}`);
}
this.startKeepAlivePings(this.keepAliveTimer || DEFAULT_KEEP_ALIVE);
this.startKeepAlivePings(this.keepAliveTimeout);
}
private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
private startKeepAlivePings(timeout: number): void {
if (this.keepAliveInterval) {
log.info("Recurring pings already set up.");
return;
}
this.keepAliveTimer = setInterval(() => {
void this.ping()
.then(() => log.info("Keep-alive ping successful"))
.catch((error) => log.error("Error in keep-alive ping cycle:", error));
}, interval) as unknown as number;
this.keepAliveInterval = setInterval(() => {
void this.ping();
}, timeout);
}
private stopKeepAlivePings(): void {
if (!this.keepAliveTimer) {
if (!this.keepAliveInterval) {
log.info("Already stopped recurring pings.");
return;
}
log.info("Stopping recurring pings.");
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = null;
}
private async sendLightPushCheckMessage(peer: Peer): Promise<void> {
if (
this.lightPush &&
this.libp2p &&
this.reliabilityMonitor.shouldVerifyPeer(peer.id)
) {
const encoder = createEncoder({
contentTopic: this.buildLightPushContentTopic(),
pubsubTopic: this.pubsubTopic,
ephemeral: true
});
const message = { payload: new Uint8Array(1) };
const protoMessage = await encoder.toProtoObj(message);
// make a delay to be sure message is send when subscription is in place
setTimeout(
(async () => {
const result = await (this.lightPush!.protocol as LightPushCore).send(
encoder,
message,
peer
);
this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage);
if (result.failure) {
log.error(
`failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}`
);
return;
}
}) as () => void,
DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL
);
}
}
private buildLightPushContentTopic(): string {
return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`;
}
}

View File

@ -24,7 +24,8 @@ export class ReliabilityMonitorManager {
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>,
addLibp2pEventListener: Libp2p["addEventListener"]
addLibp2pEventListener: Libp2p["addEventListener"],
sendLightPushMessage: (peer: Peer) => Promise<void>
): ReceiverReliabilityMonitor {
if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) {
return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!;
@ -36,7 +37,8 @@ export class ReliabilityMonitorManager {
renewPeer,
getContentTopics,
protocolSubscribe,
addLibp2pEventListener
addLibp2pEventListener,
sendLightPushMessage
);
ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor);
return monitor;
@ -50,7 +52,6 @@ export class ReliabilityMonitorManager {
public static stopAll(): void {
for (const [pubsubTopic, monitor] of this.receiverMonitors) {
monitor.setMaxMissedMessagesThreshold(undefined);
monitor.setMaxPingFailures(undefined);
this.receiverMonitors.delete(pubsubTopic);
}

View File

@ -8,24 +8,20 @@ import {
PubsubTopic
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
type ReceivedMessageHashes = {
all: Set<string>;
nodes: Record<PeerIdStr, Set<string>>;
};
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
import { bytesToUtf8 } from "@waku/utils/bytes";
const log = new Logger("sdk:receiver:reliability_monitor");
const DEFAULT_MAX_PINGS = 3;
const MESSAGE_VERIFICATION_DELAY = 5_000;
export class ReceiverReliabilityMonitor {
private receivedMessagesHashes: ReceivedMessageHashes;
private missedMessagesByPeer: Map<string, number> = new Map();
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
private receivedMessagesFormPeer = new Set<string>();
private receivedMessages = new Set<string>();
private scheduledVerification = new Map<string, number>();
private verifiedPeers = new Set<string>();
private peerFailures: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private peerRenewalLocks: Set<PeerIdStr> = new Set();
@ -40,18 +36,9 @@ export class ReceiverReliabilityMonitor {
peer: Peer,
contentTopics: ContentTopic[]
) => Promise<CoreProtocolResult>,
private addLibp2pEventListener: Libp2p["addEventListener"]
private addLibp2pEventListener: Libp2p["addEventListener"],
private sendLightPushMessage: (peer: Peer) => Promise<void>
) {
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
this.receivedMessagesHashes = {
all: new Set(),
nodes: {
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
}
};
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
this.addLibp2pEventListener("peer:disconnect", (evt) => {
const peerId = evt.detail;
if (this.getPeers().some((p) => p.id.equals(peerId))) {
@ -60,13 +47,6 @@ export class ReceiverReliabilityMonitor {
});
}
public setMaxMissedMessagesThreshold(value: number | undefined): void {
if (value === undefined) {
return;
}
this.maxMissedMessagesThreshold = value;
}
public setMaxPingFailures(value: number | undefined): void {
if (value === undefined) {
return;
@ -88,6 +68,9 @@ export class ReceiverReliabilityMonitor {
if (failures >= this.maxPingFailures) {
try {
log.info(
`Attempting to renew ${peerId.toString()} due to ping failures.`
);
await this.renewAndSubscribePeer(peerId);
this.peerFailures.delete(peerId.toString());
} catch (error) {
@ -96,77 +79,79 @@ export class ReceiverReliabilityMonitor {
}
}
public processIncomingMessage(
message: WakuMessage,
pubsubTopic: PubsubTopic,
peerIdStr?: string
public notifyMessageReceived(
peerIdStr: string,
message: IProtoMessage
): boolean {
const alreadyReceived = this.addMessageToCache(
message,
pubsubTopic,
peerIdStr
);
void this.checkAndRenewPeers();
return alreadyReceived;
}
const hash = this.buildMessageHash(message);
private addMessageToCache(
message: WakuMessage,
pubsubTopic: PubsubTopic,
peerIdStr?: string
): boolean {
const hashedMessageStr = messageHashStr(
pubsubTopic,
message as IProtoMessage
);
this.verifiedPeers.add(peerIdStr);
this.receivedMessagesFormPeer.add(`${peerIdStr}-${hash}`);
const alreadyReceived =
this.receivedMessagesHashes.all.has(hashedMessageStr);
this.receivedMessagesHashes.all.add(hashedMessageStr);
if (peerIdStr) {
const hashesForPeer = this.receivedMessagesHashes.nodes[peerIdStr];
if (!hashesForPeer) {
log.warn(
`Peer ${peerIdStr} not initialized in receivedMessagesHashes.nodes, adding it.`
);
this.receivedMessagesHashes.nodes[peerIdStr] = new Set();
}
this.receivedMessagesHashes.nodes[peerIdStr].add(hashedMessageStr);
}
return alreadyReceived;
}
private async checkAndRenewPeers(): Promise<void> {
for (const hash of this.receivedMessagesHashes.all) {
for (const [peerIdStr, hashes] of Object.entries(
this.receivedMessagesHashes.nodes
)) {
if (!hashes.has(hash)) {
this.incrementMissedMessageCount(peerIdStr);
if (this.shouldRenewPeer(peerIdStr)) {
log.info(
`Peer ${peerIdStr} has missed too many messages, renewing.`
`notifyMessage received debug: ephemeral:${message.ephemeral}\t${bytesToUtf8(message.payload)}`
);
const peerId = this.getPeers().find(
(p) => p.id.toString() === peerIdStr
)?.id;
if (!peerId) {
log.error(
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
);
continue;
log.info(`notifyMessage received: peer:${peerIdStr}\tmessage:${hash}`);
if (this.receivedMessages.has(hash)) {
return true;
}
try {
this.receivedMessages.add(hash);
return false;
}
public notifyMessageSent(peerId: PeerId, message: IProtoMessage): void {
const peerIdStr = peerId.toString();
const hash = this.buildMessageHash(message);
log.info(`notifyMessage sent debug: ${bytesToUtf8(message.payload)}`);
if (this.scheduledVerification.has(peerIdStr)) {
log.warn(
`notifyMessage sent: attempting to schedule verification for pending peer:${peerIdStr}\tmessage:${hash}`
);
return;
}
const timeout = window.setTimeout(
(async () => {
const receivedAnyMessage = this.verifiedPeers.has(peerIdStr);
const receivedTestMessage = this.receivedMessagesFormPeer.has(
`${peerIdStr}-${hash}`
);
if (receivedAnyMessage || receivedTestMessage) {
log.info(
`notifyMessage sent setTimeout: verified that peer pushes filter messages, peer:${peerIdStr}\tmessage:${hash}`
);
return;
}
log.warn(
`notifyMessage sent setTimeout: peer didn't return probe message, attempting renewAndSubscribe, peer:${peerIdStr}\tmessage:${hash}`
);
this.scheduledVerification.delete(peerIdStr);
await this.renewAndSubscribePeer(peerId);
} catch (error) {
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
}
}
}
}) as () => void,
MESSAGE_VERIFICATION_DELAY
);
this.scheduledVerification.set(peerIdStr, timeout);
}
public shouldVerifyPeer(peerId: PeerId): boolean {
const peerIdStr = peerId.toString();
const isPeerVerified = this.verifiedPeers.has(peerIdStr);
const isVerificationPending = this.scheduledVerification.has(peerIdStr);
return !(isPeerVerified || isVerificationPending);
}
private buildMessageHash(message: IProtoMessage): string {
return messageHashStr(this.pubsubTopic, message);
}
private async renewAndSubscribePeer(
@ -193,12 +178,9 @@ export class ReceiverReliabilityMonitor {
this.getContentTopics()
);
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
await this.sendLightPushMessage(newPeer);
this.peerFailures.delete(peerIdStr);
this.missedMessagesByPeer.delete(peerIdStr);
delete this.receivedMessagesHashes.nodes[peerIdStr];
return newPeer;
} catch (error) {
@ -208,14 +190,4 @@ export class ReceiverReliabilityMonitor {
this.peerRenewalLocks.delete(peerIdStr);
}
}
private incrementMissedMessageCount(peerIdStr: string): void {
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
}
private shouldRenewPeer(peerIdStr: string): boolean {
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
return missedMessages > this.maxMissedMessagesThreshold;
}
}

View File

@ -116,7 +116,11 @@ export class WakuNode implements IWaku {
}
if (protocolsEnabled.filter) {
const filter = wakuFilter(this.connectionManager, options);
const filter = wakuFilter(
this.connectionManager,
this.lightPush,
options
);
this.filter = filter(libp2p);
}