chore: refactor sub dir paths

This commit is contained in:
Danish Arora 2024-08-20 19:07:07 +05:30
parent 1de1fd420e
commit f0eb244804
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
9 changed files with 360 additions and 310 deletions

33
package-lock.json generated
View File

@ -5957,6 +5957,7 @@
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/@npmcli/fs/-/fs-3.1.1.tgz",
"integrity": "sha512-q9CRWjpHCMIh5sVyefoD1cA7PkvILqCZsnSOEUUivORLjxCO/Irmue2DprETiNgEqktDBZaM1Bi+jrarx1XdCg==",
"dev": true,
"license": "ISC",
"dependencies": {
"semver": "^7.3.5"
@ -12697,6 +12698,7 @@
"version": "18.0.4",
"resolved": "https://registry.npmjs.org/cacache/-/cacache-18.0.4.tgz",
"integrity": "sha512-B+L5iIa9mgcjLbliir2th36yEwPftrzteHYujzsx3dFP/31GCHcIeS8f5MGd80odLOjaOvSpU3EEAmRQptkxLQ==",
"dev": true,
"license": "ISC",
"dependencies": {
"@npmcli/fs": "^3.1.0",
@ -12720,6 +12722,7 @@
"version": "10.4.5",
"resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz",
"integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==",
"dev": true,
"license": "ISC",
"dependencies": {
"foreground-child": "^3.1.0",
@ -12740,12 +12743,14 @@
"version": "10.4.3",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz",
"integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==",
"dev": true,
"license": "ISC"
},
"node_modules/cacache/node_modules/minimatch": {
"version": "9.0.5",
"resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.5.tgz",
"integrity": "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow==",
"dev": true,
"license": "ISC",
"dependencies": {
"brace-expansion": "^2.0.1"
@ -12761,6 +12766,7 @@
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/p-map/-/p-map-4.0.0.tgz",
"integrity": "sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==",
"dev": true,
"license": "MIT",
"dependencies": {
"aggregate-error": "^3.0.0"
@ -13198,6 +13204,7 @@
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/chownr/-/chownr-2.0.0.tgz",
"integrity": "sha512-bIomtDF5KGpdogkLd9VspvFzk9KfpyyGlS8YFVZl7TGPBHL5snIOnxeshwVgPteQ9b4Eydl+pVbIyE1DcvCWgQ==",
"dev": true,
"license": "ISC",
"engines": {
"node": ">=10"
@ -18432,6 +18439,7 @@
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-3.0.3.tgz",
"integrity": "sha512-XUBA9XClHbnJWSfBzjkm6RvPsyg3sryZt06BEQoXcF7EK/xpGaQYJgQKDJSUH5SGZ76Y7pFx1QBnXz09rU5Fbw==",
"dev": true,
"license": "ISC",
"dependencies": {
"minipass": "^7.0.3"
@ -19598,6 +19606,7 @@
"version": "4.1.1",
"resolved": "https://registry.npmjs.org/ini/-/ini-4.1.1.tgz",
"integrity": "sha512-QQnnxNyfvmHFIsj7gkPcYymR8Jdw/o7mp5ZFihxn6h8Ci6fh3Dx4E1gPjpQEpIuPo9XVNY/ZUwh4BPMjGyL01g==",
"dev": true,
"license": "ISC",
"engines": {
"node": "^14.17.0 || ^16.13.0 || >=18.0.0"
@ -24978,6 +24987,7 @@
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/minipass-collect/-/minipass-collect-2.0.1.tgz",
"integrity": "sha512-D7V8PO9oaz7PWGLbCACuI1qEOsq7UKfLotx/C0Aet43fCUB/wfQ7DYeq2oR/svFJGYDHPr38SHATeaj/ZoKHKw==",
"dev": true,
"license": "ISC",
"dependencies": {
"minipass": "^7.0.3"
@ -24990,6 +25000,7 @@
"version": "1.0.5",
"resolved": "https://registry.npmjs.org/minipass-flush/-/minipass-flush-1.0.5.tgz",
"integrity": "sha512-JmQSYYpPUqX5Jyn1mXaRwOda1uQ8HP5KAT/oDSLCzt1BYRhQU0/hDtsB1ufZfEEzMZ9aAVmsBw8+FWsIXlClWw==",
"dev": true,
"license": "ISC",
"dependencies": {
"minipass": "^3.0.0"
@ -25002,6 +25013,7 @@
"version": "3.3.6",
"resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz",
"integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
@ -25014,6 +25026,7 @@
"version": "1.2.4",
"resolved": "https://registry.npmjs.org/minipass-pipeline/-/minipass-pipeline-1.2.4.tgz",
"integrity": "sha512-xuIq7cIOt09RPRJ19gdi4b+RiNvDFYe5JH+ggNvBqGqpQXcru3PcRmOZuHBKWK1Txf9+cQ+HMVN4d6z46LZP7A==",
"dev": true,
"license": "ISC",
"dependencies": {
"minipass": "^3.0.0"
@ -25026,6 +25039,7 @@
"version": "3.3.6",
"resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz",
"integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
@ -25038,6 +25052,7 @@
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/minizlib/-/minizlib-2.1.2.tgz",
"integrity": "sha512-bAxsR8BVfj60DWXHE3u30oHzfl4G7khkSuPW+qvpd7jFRHm7dLxOjUk1EHACJ/hxLY8phGJ0YhYHZo7jil7Qdg==",
"dev": true,
"license": "MIT",
"dependencies": {
"minipass": "^3.0.0",
@ -25051,6 +25066,7 @@
"version": "3.3.6",
"resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz",
"integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
@ -25852,6 +25868,7 @@
"version": "7.0.0",
"resolved": "https://registry.npmjs.org/npm-package-arg/-/npm-package-arg-7.0.0.tgz",
"integrity": "sha512-xXxr8y5U0kl8dVkz2oK7yZjPBvqM2fwaO5l3Yg13p03v8+E3qQcD0JNhHzjL1vyGgxcKkD0cco+NLR72iuPk3g==",
"dev": true,
"license": "ISC",
"dependencies": {
"hosted-git-info": "^3.0.2",
@ -25864,12 +25881,14 @@
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/builtins/-/builtins-1.0.3.tgz",
"integrity": "sha512-uYBjakWipfaO/bXI7E8rq6kpwHRZK5cNYrUv2OzZSI/FvmdMyXJ2tG9dKcjEC5YHmHpUAwsargWIZNWdxb/bnQ==",
"dev": true,
"license": "MIT"
},
"node_modules/npm-package-arg/node_modules/hosted-git-info": {
"version": "3.0.8",
"resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-3.0.8.tgz",
"integrity": "sha512-aXpmwoOhRBrw6X3j0h5RloK4x1OzsxMPyxqIHyNfSe2pypkVTZFpEiRoSipPEPlMrh0HW/XsjkJ5WgnCirpNUw==",
"dev": true,
"license": "ISC",
"dependencies": {
"lru-cache": "^6.0.0"
@ -25882,6 +25901,7 @@
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz",
"integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
@ -25894,6 +25914,7 @@
"version": "5.7.2",
"resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz",
"integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==",
"dev": true,
"license": "ISC",
"bin": {
"semver": "bin/semver"
@ -25903,6 +25924,7 @@
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/validate-npm-package-name/-/validate-npm-package-name-3.0.0.tgz",
"integrity": "sha512-M6w37eVCMMouJ9V/sdPGnC5H4uDr73/+xdq0FBLO3TFFX1+7wiUY6Es328NN+y43tmY+doUdN9g9J21vqB7iLw==",
"dev": true,
"license": "ISC",
"dependencies": {
"builtins": "^1.0.3"
@ -29374,6 +29396,7 @@
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz",
"integrity": "sha512-B5JU3cabzk8c67mRRd3ECmROafjYMXbuzlwtqdM8IbS8ktlTix8aFGb2bAGKrSRIlnfKwovGUUr72JUPyOb6kQ==",
"dev": true,
"license": "MIT",
"engines": {
"node": ">=0.10.0"
@ -29393,6 +29416,7 @@
"resolved": "https://registry.npmjs.org/osenv/-/osenv-0.1.5.tgz",
"integrity": "sha512-0CWcCECdMVc2Rw3U5w9ZjqX6ga6ubk1xDVKxtBQPK7wis/0F2r9T6k4ydGYhecl7YUBxBVxhL5oisPsNxAPe2g==",
"deprecated": "This package is no longer supported.",
"dev": true,
"license": "ISC",
"dependencies": {
"os-homedir": "^1.0.0",
@ -32143,6 +32167,7 @@
"version": "0.11.0",
"resolved": "https://registry.npmjs.org/qrcode-terminal/-/qrcode-terminal-0.11.0.tgz",
"integrity": "sha512-Uu7ii+FQy4Qf82G4xu7ShHhjhGahEpCWc3x8UavY3CTcWV+ufmmCtwkr7ZKsX42jdL0kr1B5FKUeqJvAn51jzQ==",
"dev": true,
"bin": {
"qrcode-terminal": "bin/qrcode-terminal.js"
}
@ -35605,6 +35630,7 @@
"version": "10.0.6",
"resolved": "https://registry.npmjs.org/ssri/-/ssri-10.0.6.tgz",
"integrity": "sha512-MGrFH9Z4NP9Iyhqn16sDtBpRRNJ0Y2hNa6D65h736fVSaPCHr4DM4sWUNvVaSuC+0OBGhwsrydQwmgfg5LncqQ==",
"dev": true,
"license": "ISC",
"dependencies": {
"minipass": "^7.0.3"
@ -36317,6 +36343,7 @@
"version": "6.2.1",
"resolved": "https://registry.npmjs.org/tar/-/tar-6.2.1.tgz",
"integrity": "sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==",
"dev": true,
"license": "ISC",
"dependencies": {
"chownr": "^2.0.0",
@ -36361,6 +36388,7 @@
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-2.1.0.tgz",
"integrity": "sha512-V/JgOLFCS+R6Vcq0slCuaeWEdNC3ouDlJMNIsacH2VtALiu9mV4LPrHc5cDl8k5aw6J8jwgWWpiTo5RYhmIzvg==",
"dev": true,
"license": "ISC",
"dependencies": {
"minipass": "^3.0.0"
@ -36373,6 +36401,7 @@
"version": "3.3.6",
"resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz",
"integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==",
"dev": true,
"license": "ISC",
"dependencies": {
"yallist": "^4.0.0"
@ -36385,6 +36414,7 @@
"version": "5.0.0",
"resolved": "https://registry.npmjs.org/minipass/-/minipass-5.0.0.tgz",
"integrity": "sha512-3FnjYuehv9k6ovOEbyOswadCDPX1piCfhV8ncmYtHOjuPwylVWsghTLo7rabjC3Rx5xD4HDx8Wm1xnMF7S5qFQ==",
"dev": true,
"license": "ISC",
"engines": {
"node": ">=8"
@ -36394,6 +36424,7 @@
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-1.0.4.tgz",
"integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==",
"dev": true,
"license": "MIT",
"bin": {
"mkdirp": "bin/cmd.js"
@ -37666,6 +37697,7 @@
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/unique-filename/-/unique-filename-3.0.0.tgz",
"integrity": "sha512-afXhuC55wkAmZ0P18QsVE6kp8JaxrEokN2HGIoIVv2ijHQd419H0+6EigAFcIzXeMIkcIkNBpB3L/DXB3cTS/g==",
"dev": true,
"license": "ISC",
"dependencies": {
"unique-slug": "^4.0.0"
@ -37678,6 +37710,7 @@
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/unique-slug/-/unique-slug-4.0.0.tgz",
"integrity": "sha512-WrcA6AyEfqDX5bWige/4NQfPZMtASNVxdmWR76WESYQVAACSgWcR6e9i0mofqqBxYFtL4oAxPIptY73/0YE1DQ==",
"dev": true,
"license": "ISC",
"dependencies": {
"imurmurhash": "^0.1.4"

View File

@ -10,9 +10,9 @@ export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";
export * from "./waku.js";
export { createLightNode, defaultLibp2p } from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuStore } from "./protocols/store.js";
export { wakuLightPush } from "./protocols/lightpush/index.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store/index.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";

View File

@ -0,0 +1,5 @@
export const DEFAULT_KEEP_ALIVE = 30 * 1000;
export const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
};

View File

@ -0,0 +1,305 @@
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
type CreateSubscriptionResult,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
type Libp2p,
NetworkConfig,
type ProtocolCreateOptions,
ProtocolError,
type ProtocolUseOptions,
type PubsubTopic,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
} from "@waku/interfaces";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { BaseProtocolSDK } from "../base_protocol.js";
import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js";
import { SubscriptionManager } from "./subscription_manager.js";
const log = new Logger("sdk:filter");
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private activeSubscriptions = new Map<string, SubscriptionManager>();
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
connectionManager.configuredPubsubTopics,
libp2p
),
connectionManager,
{ numPeersToUse: options?.numPeersToUse }
);
this.protocol = this.core as FilterCore;
this.activeSubscriptions = new Map();
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
}
const pubsubTopic = uniquePubsubTopics[0];
const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);
if (error) {
return {
subscription: null,
error: error,
results: null
};
}
const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}
log.info(
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(
pubsubTopic,
this.protocol,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
);
return {
error: null,
subscription
};
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
if (uniquePubsubTopics.length === 0) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
);
}
if (uniquePubsubTopics.length > 1) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
);
}
const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}
}
export function wakuFilter(
connectionManager: ConnectionManager,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
}

View File

@ -1,54 +1,34 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import type { Peer, PeerId } from "@libp2p/interface";
import { FilterCore } from "@waku/core";
import {
type Callback,
type ContentTopic,
type CoreProtocolResult,
type CreateSubscriptionResult,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
type IProtoMessage,
type ISubscriptionSDK,
type Libp2p,
NetworkConfig,
type PeerIdStr,
type ProtocolCreateOptions,
ProtocolError,
type ProtocolUseOptions,
type PubsubTopic,
type SDKProtocolResult,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
type SubscribeOptions
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { groupByContentTopic, Logger } from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol.js";
import { FilterReliabilityMonitor as ReliabilityMonitor } from "./reliability_monitor.js";
import { FilterReliabilityMonitor as ReliabilityMonitor } from "../reliability_monitor";
import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants";
type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
const log = new Logger("sdk:filter");
const DEFAULT_MAX_PINGS = 3;
const DEFAULT_KEEP_ALIVE = 30 * 1000;
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: DEFAULT_KEEP_ALIVE
};
const log = new Logger("sdk:filter:subscription_manager");
export class SubscriptionManager implements ISubscriptionSDK {
private keepAliveTimer: number | null = null;
@ -336,279 +316,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private activeSubscriptions = new Map<string, SubscriptionManager>();
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
connectionManager.configuredPubsubTopics,
libp2p
),
connectionManager,
{ numPeersToUse: options?.numPeersToUse }
);
this.protocol = this.core as FilterCore;
this.activeSubscriptions = new Map();
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
}
const pubsubTopic = uniquePubsubTopics[0];
const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);
if (error) {
return {
subscription: null,
error: error,
results: null
};
}
const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}
log.info(
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(
pubsubTopic,
this.protocol,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
);
return {
error: null,
subscription
};
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
if (uniquePubsubTopics.length === 0) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
);
}
if (uniquePubsubTopics.length > 1) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
);
}
const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}
}
export function wakuFilter(
connectionManager: ConnectionManager,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
}
async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubsubTopic: PubsubTopic,

View File

@ -13,7 +13,7 @@ import {
} from "@waku/interfaces";
import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol.js";
import { BaseProtocolSDK } from "../base_protocol.js";
const log = new Logger("sdk:light-push");

View File

@ -8,7 +8,7 @@ const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
const log = new Logger("waku:message-monitor");
export class ReliabilityMonitor {
export class FilterReliabilityMonitor {
private readonly receivedMessagesHashes: Set<string> = new Set();
private readonly messageHashesByPeer: Map<PeerIdStr, Set<string>> = new Map();
private readonly missedMessagesByPeer: Map<PeerIdStr, number> = new Map();

View File

@ -10,7 +10,7 @@ import {
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol";
import { BaseProtocolSDK } from "../base_protocol";
const DEFAULT_NUM_PEERS = 1;

View File

@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces";
import { wakuRelay } from "@waku/relay";
import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";
import { wakuFilter } from "./protocols/filter/index.js";
import { wakuLightPush } from "./protocols/lightpush/index.js";
import { wakuStore } from "./protocols/store/index.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;