feat(filter): use protocol peer management (#2047)

* feat: leverage protocol peer management

* chore: add test

* chore: address comments

* chore: add todo
This commit is contained in:
Danish Arora 2024-07-03 12:09:34 +05:30 committed by GitHub
parent 42126a6301
commit 4db508b962
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 128 additions and 53 deletions

View File

@ -5,6 +5,7 @@ import type {
IBaseProtocolCore,
IBaseProtocolSDK,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult,
ShardingParams
} from "./protocols.js";
@ -34,7 +35,7 @@ export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: SubscribeOptions
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult>;
};

View File

@ -43,6 +43,33 @@ export type ApplicationInfo = {
export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;
//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
/**
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};
export type ProtocolCreateOptions = {
/**
* @deprecated

View File

@ -1,36 +1,10 @@
import type { IEncoder, IMessage } from "./message.js";
import { SDKProtocolResult } from "./protocols.js";
import { ProtocolUseOptions, SDKProtocolResult } from "./protocols.js";
export interface ISender {
send: (
encoder: IEncoder,
message: IMessage,
sendOptions?: SendOptions
sendOptions?: ProtocolUseOptions
) => Promise<SDKProtocolResult>;
}
/**
* Options for using LightPush
*/
export type SendOptions = {
/**
* Optional flag to enable auto-retry with exponential backoff
*/
autoRetry?: boolean;
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
/**
* Optional initial delay in milliseconds for exponential backoff
*/
initialDelay?: number;
/**
* Optional maximum delay in milliseconds for exponential backoff
*/
maxDelay?: number;
};

View File

@ -1,7 +1,7 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, SendOptions } from "@waku/interfaces";
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
interface Options {
@ -86,7 +86,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
*/
protected hasPeers = async (
options: Partial<SendOptions> = {}
options: Partial<ProtocolUseOptions> = {}
): Promise<boolean> => {
const {
autoRetry = false,

View File

@ -14,6 +14,7 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
ProtocolUseOptions,
type PubsubTopic,
SDKProtocolResult,
type ShardingParams,
@ -45,7 +46,6 @@ const DEFAULT_SUBSCRIBE_OPTIONS = {
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;
@ -56,10 +56,9 @@ export class SubscriptionManager implements ISubscriptionSDK {
constructor(
pubsubTopic: PubsubTopic,
remotePeers: Peer[],
private peers: Peer[],
private protocol: FilterCore
) {
this.peers = remotePeers;
this.pubsubTopic = pubsubTopic;
this.subscriptionCallbacks = new Map();
}
@ -314,8 +313,14 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @returns The subscription object.
*/
async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic
pubsubTopicShardInfo: ShardingParams | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
@ -323,17 +328,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
let peers: Peer[] = [];
try {
peers = await this.protocol.getPeers();
} catch (error) {
log.error("Error getting peers to initiate subscription: ", error);
return {
error: ProtocolError.GENERIC_FAIL,
subscription: null
};
}
if (peers.length === 0) {
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
@ -341,15 +337,15 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
}
log.info(
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(pubsubTopic, peers, this.protocol)
new SubscriptionManager(pubsubTopic, this.connectedPeers, this.protocol)
);
return {

View File

@ -8,8 +8,8 @@ import {
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
SDKProtocolResult,
SendOptions
ProtocolUseOptions,
SDKProtocolResult
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
@ -35,12 +35,12 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
async send(
encoder: IEncoder,
message: IMessage,
_options?: SendOptions
_options?: ProtocolUseOptions
): Promise<SDKProtocolResult> {
const options = {
autoRetry: true,
..._options
} as SendOptions;
} as ProtocolUseOptions;
const successes: PeerId[] = [];
const failures: Failure[] = [];

View File

@ -0,0 +1,77 @@
import { DefaultPubsubTopic, LightNode } from "@waku/interfaces";
import {
createDecoder,
createEncoder,
DecodedMessage,
utf8ToBytes
} from "@waku/sdk";
import { expect } from "chai";
import { describe } from "mocha";
import {
afterEachCustom,
beforeEachCustom,
ServiceNodesFleet
} from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy
} from "../filter/utils.js";
//TODO: add unit tests,
describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
let waku: LightNode;
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
undefined,
undefined,
5
);
});
afterEachCustom(this, async () => {
await teardownNodesWithRedundancy(serviceNodes, waku);
});
const pubsubTopic = DefaultPubsubTopic;
const contentTopic = "/test";
const encoder = createEncoder({
pubsubTopic,
contentTopic
});
const decoder = createDecoder(contentTopic, pubsubTopic);
it("Number of peers are maintained correctly", async function () {
const { error, subscription } =
await waku.filter.createSubscription(pubsubTopic);
if (!subscription || error) {
expect.fail("Could not create subscription");
}
const messages: DecodedMessage[] = [];
const { failures, successes } = await subscription.subscribe(
[decoder],
(msg) => {
messages.push(msg);
}
);
await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello_World")
});
expect(successes.length).to.be.greaterThan(0);
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);
if (failures) {
expect(failures.length).to.equal(0);
}
});
});