feat(filter): peer/subscription renewal with recurring Filter pings (#2052)

* chore: renewPeer() returns the new found peer

* feat: ping & peer renewal

* chore: add tests

* fix: tests

* chore: remove only

* chore: remove comments

* chore(tests): decrease timeout

* chore: add array index validation

* chore: remove only

* chore: move defaults into a separate variable

* chore: update lightpush with new API

* chore: include peer renewals within `ping` instead of `interval`

* chore: update tests

* chore: add new test

* chore: address comments
This commit is contained in:
Danish Arora 2024-07-10 15:34:16 +05:30 committed by GitHub
parent 68590f0a3d
commit 318667e442
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 246 additions and 69 deletions

View File

@ -1,3 +1,5 @@
import type { PeerId } from "@libp2p/interface";
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
import type {
@ -13,6 +15,7 @@ import type { IReceiver } from "./receiver.js";
export type SubscribeOptions = {
keepAlive?: number;
pingsBeforePeerRenewed?: number;
};
export type IFilter = IReceiver & IBaseProtocolCore;
@ -26,7 +29,7 @@ export interface ISubscriptionSDK {
unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
ping(): Promise<SDKProtocolResult>;
ping(peerId?: PeerId): Promise<SDKProtocolResult>;
unsubscribeAll(): Promise<SDKProtocolResult>;
}

View File

@ -25,7 +25,7 @@ export type IBaseProtocolCore = {
};
export type IBaseProtocolSDK = {
renewPeer: (peerToDisconnect: PeerId) => Promise<void>;
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly connectedPeers: Peer[];
readonly numPeersToUse: number;
};

View File

@ -42,22 +42,24 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
/**
* Disconnects from a peer and tries to find a new one to replace it.
* @param peerToDisconnect The peer to disconnect from.
* @returns The new peer that was found and connected to.
*/
public async renewPeer(peerToDisconnect: PeerId): Promise<void> {
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
this.log.info(`Renewing peer ${peerToDisconnect}`);
try {
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
await this.connectionManager.dropConnection(peerToDisconnect);
this.peers = this.peers.filter((peer) => peer.id !== peerToDisconnect);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
await this.findAndAddPeers(1);
} catch (error) {
this.log.info(
"Peer renewal failed, relying on the interval to find a new peer"
const peer = (await this.findAndAddPeers(1))[0];
if (!peer) {
throw new Error(
"Failed to find a new peer to replace the disconnected one"
);
}
return peer;
}
/**
@ -171,7 +173,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* Finds and adds new peers to the peers list.
* @param numPeers The number of peers to find and add.
*/
private async findAndAddPeers(numPeers: number): Promise<void> {
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
this.log.info(`Finding and adding ${numPeers} new peers`);
try {
const additionalPeers = await this.findAdditionalPeers(numPeers);
@ -179,6 +181,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
return additionalPeers;
} catch (error) {
this.log.error("Error finding and adding new peers:", error);
throw error;
@ -197,20 +200,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
try {
let newPeers = await this.core.getPeers({
maxBootstrapPeers: 0,
numPeers: numPeers
numPeers: 0
});
if (newPeers.length === 0) {
this.log.warn("No new peers found, trying with bootstrap peers");
newPeers = await this.core.getPeers({
maxBootstrapPeers: numPeers,
numPeers: numPeers
numPeers: 0
});
}
newPeers = newPeers.filter(
(peer) => this.peers.some((p) => p.id === peer.id) === false
);
newPeers = newPeers
.filter((peer) => this.peers.some((p) => p.id === peer.id) === false)
.slice(0, numPeers);
return newPeers;
} catch (error) {
this.log.error("Error finding additional peers:", error);

View File

@ -1,4 +1,5 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
@ -41,6 +42,8 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
const log = new Logger("sdk:filter");
const MINUTE = 60 * 1000;
const DEFAULT_MAX_PINGS = 3;
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE
};
@ -48,6 +51,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
private peerFailures: Map<string, number> = new Map();
private maxPingFailures: number = DEFAULT_MAX_PINGS;
private subscriptionCallbacks: Map<
ContentTopic,
@ -56,18 +61,21 @@ export class SubscriptionManager implements ISubscriptionSDK {
constructor(
pubsubTopic: PubsubTopic,
private peers: Peer[],
private protocol: FilterCore
private protocol: FilterCore,
private getPeers: () => Peer[],
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
) {
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
}
async subscribe<T extends IDecodedMessage>(
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
// check that all decoders are configured for the same pubsub topic as this subscription
@ -87,7 +95,7 @@ export class SubscriptionManager implements ISubscriptionSDK {
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());
const promises = this.peers.map(async (peer) =>
const promises = this.getPeers().map(async (peer) =>
this.protocol.subscribe(this.pubsubTopic, peer, contentTopics)
);
@ -111,15 +119,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});
if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
if (options.keepAlive) {
this.startKeepAlivePings(options);
}
return finalResult;
}
async unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => {
public async unsubscribe(
contentTopics: ContentTopic[]
): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) => {
const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
@ -143,16 +153,17 @@ export class SubscriptionManager implements ISubscriptionSDK {
return finalResult;
}
async ping(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) => this.protocol.ping(peer));
public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);
const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
const results = await Promise.allSettled(promises);
return this.handleResult(results, "ping");
}
async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.peers.map(async (peer) =>
public async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) =>
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
);
@ -217,31 +228,78 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
}
// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
return result;
}
private startKeepAlivePings(interval: number): void {
private async pingSpecificPeer(peerId: PeerId): Promise<CoreProtocolResult> {
const peer = this.getPeers().find((p) => p.id.equals(peerId));
if (!peer) {
return {
success: null,
failure: {
peerId,
error: ProtocolError.NO_PEER_AVAILABLE
}
};
}
try {
const result = await this.protocol.ping(peer);
if (result.failure) {
await this.handlePeerFailure(peerId);
} else {
this.peerFailures.delete(peerId.toString());
}
return result;
} catch (error) {
await this.handlePeerFailure(peerId);
return {
success: null,
failure: {
peerId,
error: ProtocolError.GENERIC_FAIL
}
};
}
}
private async handlePeerFailure(peerId: PeerId): Promise<void> {
const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1;
this.peerFailures.set(peerId.toString(), failures);
if (failures > this.maxPingFailures) {
try {
await this.renewAndSubscribePeer(peerId);
this.peerFailures.delete(peerId.toString());
} catch (error) {
log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`);
}
}
}
private async renewAndSubscribePeer(peerId: PeerId): Promise<Peer> {
const newPeer = await this.renewPeer(peerId);
await this.protocol.subscribe(
this.pubsubTopic,
newPeer,
Array.from(this.subscriptionCallbacks.keys())
);
return newPeer;
}
private startKeepAlivePings(options: SubscribeOptions): void {
const { keepAlive } = options;
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
}
this.keepAliveTimer = setInterval(() => {
const run = async (): Promise<void> => {
try {
log.info("Recurring ping to peers.");
await this.ping();
} catch (error) {
log.error("Stopping recurring pings due to failure", error);
this.stopKeepAlivePings();
}
};
void run();
}, interval) as unknown as number;
void this.ping().catch((error) => {
log.error("Error in keep-alive ping cycle:", error);
});
}, keepAlive) as unknown as number;
}
private stopKeepAlivePings(): void {
@ -345,7 +403,12 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol)
new SubscriptionManager(
pubsubTopic,
this.protocol,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
);
return {

View File

@ -86,7 +86,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
}
if (failure) {
if (failure.peerId) {
await this.renewPeer(failure.peerId);
try {
await this.renewPeer(failure.peerId);
log.info("Renewed peer", failure.peerId.toString());
} catch (error) {
log.error("Failed to renew peer", error);
}
}
failures.push(failure);

View File

@ -1,10 +1,15 @@
import { DefaultPubsubTopic, LightNode } from "@waku/interfaces";
import {
DefaultPubsubTopic,
ISubscriptionSDK,
LightNode
} from "@waku/interfaces";
import {
createDecoder,
createEncoder,
DecodedMessage,
utf8ToBytes
} from "@waku/sdk";
import { delay } from "@waku/utils";
import { expect } from "chai";
import { describe } from "mocha";
@ -18,25 +23,11 @@ import {
teardownNodesWithRedundancy
} from "../filter/utils.js";
//TODO: add unit tests,
describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
5
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
let subscription: ISubscriptionSDK;
const pubsubTopic = DefaultPubsubTopic;
const contentTopic = "/test";
@ -48,13 +39,26 @@ describe("Waku Filter: Peer Management: E2E", function () {
const decoder = createDecoder(contentTopic, pubsubTopic);
it("Number of peers are maintained correctly", async function () {
const { error, subscription } =
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
5
);
const { error, subscription: sub } =
await waku.filter.createSubscription(pubsubTopic);
if (!subscription || error) {
expect.fail("Could not create subscription");
if (!sub || error) {
throw new Error("Could not create subscription");
}
subscription = sub;
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
it("Number of peers are maintained correctly", async function () {
const messages: DecodedMessage[] = [];
const { failures, successes } = await subscription.subscribe(
[decoder],
@ -74,4 +78,103 @@ describe("Waku Filter: Peer Management: E2E", function () {
expect(failures.length).to.equal(0);
}
});
it("Ping succeeds for all connected peers", async function () {
await subscription.subscribe([decoder], () => {});
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
});
it("Ping fails for unsubscribed peers", async function () {
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(0);
expect(pingResult.failures.length).to.be.greaterThan(0);
});
it("Keep-alive pings maintain the connection", async function () {
await subscription.subscribe([decoder], () => {}, { keepAlive: 100 });
await delay(1000);
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
});
it("Renews peer on consistent ping failures", async function () {
const maxPingFailures = 3;
await subscription.subscribe([decoder], () => {}, {
pingsBeforePeerRenewed: maxPingFailures
});
const disconnectedNodePeerId = waku.filter.connectedPeers[0].id;
await waku.connectionManager.dropConnection(disconnectedNodePeerId);
// Ping multiple times to exceed max failures
for (let i = 0; i <= maxPingFailures; i++) {
await subscription.ping();
await delay(100);
}
const pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
expect(pingResult.failures.length).to.equal(0);
expect(waku.filter.connectedPeers.length).to.equal(
waku.filter.numPeersToUse
);
expect(
waku.filter.connectedPeers.some((peer) =>
peer.id.equals(disconnectedNodePeerId)
)
).to.eq(false);
});
it("Tracks peer failures correctly", async function () {
const maxPingFailures = 3;
await subscription.subscribe([decoder], () => {}, {
pingsBeforePeerRenewed: maxPingFailures
});
const targetPeer = waku.filter.connectedPeers[0];
await waku.connectionManager.dropConnection(targetPeer.id);
for (let i = 0; i < maxPingFailures; i++) {
await subscription.ping(targetPeer.id);
}
// At this point, the peer should not be renewed yet
expect(
waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id))
).to.be.true;
// One more failure should trigger renewal
await subscription.ping(targetPeer.id);
expect(
waku.filter.connectedPeers.some((peer) => peer.id.equals(targetPeer.id))
).to.be.false;
expect(waku.filter.connectedPeers.length).to.equal(
waku.filter.numPeersToUse
);
});
it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () {
for (let i = 0; i < 3; i++) {
await subscription.subscribe([decoder], () => {});
let pingResult = await subscription.ping();
expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
await subscription.unsubscribe([contentTopic]);
pingResult = await subscription.ping();
expect(pingResult.failures.length).to.be.greaterThan(0);
}
await subscription.subscribe([decoder], () => {});
const finalPingResult = await subscription.ping();
expect(finalPingResult.successes.length).to.equal(
waku.filter.numPeersToUse
);
});
});