From f0eb24480447a7caa5909b87bb0a1c0186b961a2 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 20 Aug 2024 19:07:07 +0530 Subject: [PATCH] chore: refactor sub dir paths --- package-lock.json | 33 ++ packages/sdk/src/index.ts | 6 +- .../sdk/src/protocols/filter/constants.ts | 5 + packages/sdk/src/protocols/filter/index.ts | 305 +++++++++++++++++ .../subscription_manager.ts} | 309 +----------------- .../{light_push.ts => lightpush/index.ts} | 2 +- .../sdk/src/protocols/reliability_monitor.ts | 2 +- .../protocols/{store.ts => store/index.ts} | 2 +- packages/sdk/src/waku.ts | 6 +- 9 files changed, 360 insertions(+), 310 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/constants.ts create mode 100644 packages/sdk/src/protocols/filter/index.ts rename packages/sdk/src/protocols/{filter.ts => filter/subscription_manager.ts} (50%) rename packages/sdk/src/protocols/{light_push.ts => lightpush/index.ts} (98%) rename packages/sdk/src/protocols/{store.ts => store/index.ts} (99%) diff --git a/package-lock.json b/package-lock.json index 29bb4a0582..2b97ccf57f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5957,6 +5957,7 @@ "version": "3.1.1", "resolved": "https://registry.npmjs.org/@npmcli/fs/-/fs-3.1.1.tgz", "integrity": "sha512-q9CRWjpHCMIh5sVyefoD1cA7PkvILqCZsnSOEUUivORLjxCO/Irmue2DprETiNgEqktDBZaM1Bi+jrarx1XdCg==", + "dev": true, "license": "ISC", "dependencies": { "semver": "^7.3.5" @@ -12697,6 +12698,7 @@ "version": "18.0.4", "resolved": "https://registry.npmjs.org/cacache/-/cacache-18.0.4.tgz", "integrity": "sha512-B+L5iIa9mgcjLbliir2th36yEwPftrzteHYujzsx3dFP/31GCHcIeS8f5MGd80odLOjaOvSpU3EEAmRQptkxLQ==", + "dev": true, "license": "ISC", "dependencies": { "@npmcli/fs": "^3.1.0", @@ -12720,6 +12722,7 @@ "version": "10.4.5", "resolved": "https://registry.npmjs.org/glob/-/glob-10.4.5.tgz", "integrity": "sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==", + "dev": true, "license": "ISC", "dependencies": { "foreground-child": "^3.1.0", @@ -12740,12 +12743,14 @@ "version": "10.4.3", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-10.4.3.tgz", "integrity": "sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ==", + "dev": true, "license": "ISC" }, "node_modules/cacache/node_modules/minimatch": { "version": "9.0.5", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.5.tgz", "integrity": "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow==", + "dev": true, "license": "ISC", "dependencies": { "brace-expansion": "^2.0.1" @@ -12761,6 +12766,7 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/p-map/-/p-map-4.0.0.tgz", "integrity": "sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==", + "dev": true, "license": "MIT", "dependencies": { "aggregate-error": "^3.0.0" @@ -13198,6 +13204,7 @@ "version": "2.0.0", "resolved": "https://registry.npmjs.org/chownr/-/chownr-2.0.0.tgz", "integrity": "sha512-bIomtDF5KGpdogkLd9VspvFzk9KfpyyGlS8YFVZl7TGPBHL5snIOnxeshwVgPteQ9b4Eydl+pVbIyE1DcvCWgQ==", + "dev": true, "license": "ISC", "engines": { "node": ">=10" @@ -18432,6 +18439,7 @@ "version": "3.0.3", "resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-3.0.3.tgz", "integrity": "sha512-XUBA9XClHbnJWSfBzjkm6RvPsyg3sryZt06BEQoXcF7EK/xpGaQYJgQKDJSUH5SGZ76Y7pFx1QBnXz09rU5Fbw==", + "dev": true, "license": "ISC", "dependencies": { "minipass": "^7.0.3" @@ -19598,6 +19606,7 @@ "version": "4.1.1", "resolved": "https://registry.npmjs.org/ini/-/ini-4.1.1.tgz", "integrity": "sha512-QQnnxNyfvmHFIsj7gkPcYymR8Jdw/o7mp5ZFihxn6h8Ci6fh3Dx4E1gPjpQEpIuPo9XVNY/ZUwh4BPMjGyL01g==", + "dev": true, "license": "ISC", "engines": { "node": "^14.17.0 || ^16.13.0 || >=18.0.0" @@ -24978,6 +24987,7 @@ "version": "2.0.1", "resolved": "https://registry.npmjs.org/minipass-collect/-/minipass-collect-2.0.1.tgz", "integrity": "sha512-D7V8PO9oaz7PWGLbCACuI1qEOsq7UKfLotx/C0Aet43fCUB/wfQ7DYeq2oR/svFJGYDHPr38SHATeaj/ZoKHKw==", + "dev": true, "license": "ISC", "dependencies": { "minipass": "^7.0.3" @@ -24990,6 +25000,7 @@ "version": "1.0.5", "resolved": "https://registry.npmjs.org/minipass-flush/-/minipass-flush-1.0.5.tgz", "integrity": "sha512-JmQSYYpPUqX5Jyn1mXaRwOda1uQ8HP5KAT/oDSLCzt1BYRhQU0/hDtsB1ufZfEEzMZ9aAVmsBw8+FWsIXlClWw==", + "dev": true, "license": "ISC", "dependencies": { "minipass": "^3.0.0" @@ -25002,6 +25013,7 @@ "version": "3.3.6", "resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz", "integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==", + "dev": true, "license": "ISC", "dependencies": { "yallist": "^4.0.0" @@ -25014,6 +25026,7 @@ "version": "1.2.4", "resolved": "https://registry.npmjs.org/minipass-pipeline/-/minipass-pipeline-1.2.4.tgz", "integrity": "sha512-xuIq7cIOt09RPRJ19gdi4b+RiNvDFYe5JH+ggNvBqGqpQXcru3PcRmOZuHBKWK1Txf9+cQ+HMVN4d6z46LZP7A==", + "dev": true, "license": "ISC", "dependencies": { "minipass": "^3.0.0" @@ -25026,6 +25039,7 @@ "version": "3.3.6", "resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz", "integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==", + "dev": true, "license": "ISC", "dependencies": { "yallist": "^4.0.0" @@ -25038,6 +25052,7 @@ "version": "2.1.2", "resolved": "https://registry.npmjs.org/minizlib/-/minizlib-2.1.2.tgz", "integrity": "sha512-bAxsR8BVfj60DWXHE3u30oHzfl4G7khkSuPW+qvpd7jFRHm7dLxOjUk1EHACJ/hxLY8phGJ0YhYHZo7jil7Qdg==", + "dev": true, "license": "MIT", "dependencies": { "minipass": "^3.0.0", @@ -25051,6 +25066,7 @@ "version": "3.3.6", "resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz", "integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==", + "dev": true, "license": "ISC", "dependencies": { "yallist": "^4.0.0" @@ -25852,6 +25868,7 @@ "version": "7.0.0", "resolved": "https://registry.npmjs.org/npm-package-arg/-/npm-package-arg-7.0.0.tgz", "integrity": "sha512-xXxr8y5U0kl8dVkz2oK7yZjPBvqM2fwaO5l3Yg13p03v8+E3qQcD0JNhHzjL1vyGgxcKkD0cco+NLR72iuPk3g==", + "dev": true, "license": "ISC", "dependencies": { "hosted-git-info": "^3.0.2", @@ -25864,12 +25881,14 @@ "version": "1.0.3", "resolved": "https://registry.npmjs.org/builtins/-/builtins-1.0.3.tgz", "integrity": "sha512-uYBjakWipfaO/bXI7E8rq6kpwHRZK5cNYrUv2OzZSI/FvmdMyXJ2tG9dKcjEC5YHmHpUAwsargWIZNWdxb/bnQ==", + "dev": true, "license": "MIT" }, "node_modules/npm-package-arg/node_modules/hosted-git-info": { "version": "3.0.8", "resolved": "https://registry.npmjs.org/hosted-git-info/-/hosted-git-info-3.0.8.tgz", "integrity": "sha512-aXpmwoOhRBrw6X3j0h5RloK4x1OzsxMPyxqIHyNfSe2pypkVTZFpEiRoSipPEPlMrh0HW/XsjkJ5WgnCirpNUw==", + "dev": true, "license": "ISC", "dependencies": { "lru-cache": "^6.0.0" @@ -25882,6 +25901,7 @@ "version": "6.0.0", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", "integrity": "sha512-Jo6dJ04CmSjuznwJSS3pUeWmd/H0ffTlkXXgwZi+eq1UCmqQwCh+eLsYOYCwY991i2Fah4h1BEMCx4qThGbsiA==", + "dev": true, "license": "ISC", "dependencies": { "yallist": "^4.0.0" @@ -25894,6 +25914,7 @@ "version": "5.7.2", "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", + "dev": true, "license": "ISC", "bin": { "semver": "bin/semver" @@ -25903,6 +25924,7 @@ "version": "3.0.0", "resolved": "https://registry.npmjs.org/validate-npm-package-name/-/validate-npm-package-name-3.0.0.tgz", "integrity": "sha512-M6w37eVCMMouJ9V/sdPGnC5H4uDr73/+xdq0FBLO3TFFX1+7wiUY6Es328NN+y43tmY+doUdN9g9J21vqB7iLw==", + "dev": true, "license": "ISC", "dependencies": { "builtins": "^1.0.3" @@ -29374,6 +29396,7 @@ "version": "1.0.2", "resolved": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz", "integrity": "sha512-B5JU3cabzk8c67mRRd3ECmROafjYMXbuzlwtqdM8IbS8ktlTix8aFGb2bAGKrSRIlnfKwovGUUr72JUPyOb6kQ==", + "dev": true, "license": "MIT", "engines": { "node": ">=0.10.0" @@ -29393,6 +29416,7 @@ "resolved": "https://registry.npmjs.org/osenv/-/osenv-0.1.5.tgz", "integrity": "sha512-0CWcCECdMVc2Rw3U5w9ZjqX6ga6ubk1xDVKxtBQPK7wis/0F2r9T6k4ydGYhecl7YUBxBVxhL5oisPsNxAPe2g==", "deprecated": "This package is no longer supported.", + "dev": true, "license": "ISC", "dependencies": { "os-homedir": "^1.0.0", @@ -32143,6 +32167,7 @@ "version": "0.11.0", "resolved": "https://registry.npmjs.org/qrcode-terminal/-/qrcode-terminal-0.11.0.tgz", "integrity": "sha512-Uu7ii+FQy4Qf82G4xu7ShHhjhGahEpCWc3x8UavY3CTcWV+ufmmCtwkr7ZKsX42jdL0kr1B5FKUeqJvAn51jzQ==", + "dev": true, "bin": { "qrcode-terminal": "bin/qrcode-terminal.js" } @@ -35605,6 +35630,7 @@ "version": "10.0.6", "resolved": "https://registry.npmjs.org/ssri/-/ssri-10.0.6.tgz", "integrity": "sha512-MGrFH9Z4NP9Iyhqn16sDtBpRRNJ0Y2hNa6D65h736fVSaPCHr4DM4sWUNvVaSuC+0OBGhwsrydQwmgfg5LncqQ==", + "dev": true, "license": "ISC", "dependencies": { "minipass": "^7.0.3" @@ -36317,6 +36343,7 @@ "version": "6.2.1", "resolved": "https://registry.npmjs.org/tar/-/tar-6.2.1.tgz", "integrity": "sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==", + "dev": true, "license": "ISC", "dependencies": { "chownr": "^2.0.0", @@ -36361,6 +36388,7 @@ "version": "2.1.0", "resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-2.1.0.tgz", "integrity": "sha512-V/JgOLFCS+R6Vcq0slCuaeWEdNC3ouDlJMNIsacH2VtALiu9mV4LPrHc5cDl8k5aw6J8jwgWWpiTo5RYhmIzvg==", + "dev": true, "license": "ISC", "dependencies": { "minipass": "^3.0.0" @@ -36373,6 +36401,7 @@ "version": "3.3.6", "resolved": "https://registry.npmjs.org/minipass/-/minipass-3.3.6.tgz", "integrity": "sha512-DxiNidxSEK+tHG6zOIklvNOwm3hvCrbUrdtzY74U6HKTJxvIDfOUL5W5P2Ghd3DTkhhKPYGqeNUIh5qcM4YBfw==", + "dev": true, "license": "ISC", "dependencies": { "yallist": "^4.0.0" @@ -36385,6 +36414,7 @@ "version": "5.0.0", "resolved": "https://registry.npmjs.org/minipass/-/minipass-5.0.0.tgz", "integrity": "sha512-3FnjYuehv9k6ovOEbyOswadCDPX1piCfhV8ncmYtHOjuPwylVWsghTLo7rabjC3Rx5xD4HDx8Wm1xnMF7S5qFQ==", + "dev": true, "license": "ISC", "engines": { "node": ">=8" @@ -36394,6 +36424,7 @@ "version": "1.0.4", "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-1.0.4.tgz", "integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==", + "dev": true, "license": "MIT", "bin": { "mkdirp": "bin/cmd.js" @@ -37666,6 +37697,7 @@ "version": "3.0.0", "resolved": "https://registry.npmjs.org/unique-filename/-/unique-filename-3.0.0.tgz", "integrity": "sha512-afXhuC55wkAmZ0P18QsVE6kp8JaxrEokN2HGIoIVv2ijHQd419H0+6EigAFcIzXeMIkcIkNBpB3L/DXB3cTS/g==", + "dev": true, "license": "ISC", "dependencies": { "unique-slug": "^4.0.0" @@ -37678,6 +37710,7 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/unique-slug/-/unique-slug-4.0.0.tgz", "integrity": "sha512-WrcA6AyEfqDX5bWige/4NQfPZMtASNVxdmWR76WESYQVAACSgWcR6e9i0mofqqBxYFtL4oAxPIptY73/0YE1DQ==", + "dev": true, "license": "ISC", "dependencies": { "imurmurhash": "^0.1.4" diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 2bbdfc0d30..b29020618a 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -10,9 +10,9 @@ export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; export * from "./waku.js"; export { createLightNode, defaultLibp2p } from "./create/index.js"; -export { wakuLightPush } from "./protocols/light_push.js"; -export { wakuFilter } from "./protocols/filter.js"; -export { wakuStore } from "./protocols/store.js"; +export { wakuLightPush } from "./protocols/lightpush/index.js"; +export { wakuFilter } from "./protocols/filter/index.js"; +export { wakuStore } from "./protocols/store/index.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts new file mode 100644 index 0000000000..3889e7638b --- /dev/null +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -0,0 +1,5 @@ +export const DEFAULT_KEEP_ALIVE = 30 * 1000; + +export const DEFAULT_SUBSCRIBE_OPTIONS = { + keepAlive: DEFAULT_KEEP_ALIVE +}; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts new file mode 100644 index 0000000000..d87cc97468 --- /dev/null +++ b/packages/sdk/src/protocols/filter/index.ts @@ -0,0 +1,305 @@ +import { ConnectionManager, FilterCore } from "@waku/core"; +import { + type Callback, + type CreateSubscriptionResult, + type IAsyncIterator, + type IDecodedMessage, + type IDecoder, + type IFilterSDK, + type Libp2p, + NetworkConfig, + type ProtocolCreateOptions, + ProtocolError, + type ProtocolUseOptions, + type PubsubTopic, + type SubscribeOptions, + SubscribeResult, + type Unsubscribe +} from "@waku/interfaces"; +import { + ensurePubsubTopicIsConfigured, + groupByContentTopic, + Logger, + shardInfoToPubsubTopics, + toAsyncIterator +} from "@waku/utils"; + +import { BaseProtocolSDK } from "../base_protocol.js"; + +import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; +import { SubscriptionManager } from "./subscription_manager.js"; + +const log = new Logger("sdk:filter"); + +class FilterSDK extends BaseProtocolSDK implements IFilterSDK { + public readonly protocol: FilterCore; + + private activeSubscriptions = new Map(); + + public constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { + super( + new FilterCore( + async (pubsubTopic, wakuMessage, peerIdStr) => { + const subscription = this.getActiveSubscription(pubsubTopic); + if (!subscription) { + log.error( + `No subscription locally registered for topic ${pubsubTopic}` + ); + return; + } + + await subscription.processIncomingMessage(wakuMessage, peerIdStr); + }, + connectionManager.configuredPubsubTopics, + libp2p + ), + connectionManager, + { numPeersToUse: options?.numPeersToUse } + ); + + this.protocol = this.core as FilterCore; + + this.activeSubscriptions = new Map(); + } + + /** + * Opens a subscription with the Filter protocol using the provided decoders and callback. + * This method combines the functionality of creating a subscription and subscribing to it. + * + * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. + * @param {Callback} callback - The callback function to be invoked with decoded messages. + * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol. + * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. + * + * @returns {Promise} A promise that resolves to an object containing: + * - subscription: The created subscription object if successful, or null if failed. + * - error: A ProtocolError if the subscription creation failed, or null if successful. + * - results: An object containing arrays of failures and successes from the subscription process. + * Only present if the subscription was created successfully. + * + * @throws {Error} If there's an unexpected error during the subscription process. + * + * @remarks + * This method attempts to create a subscription using the pubsub topic derived from the provided decoders, + * then tries to subscribe using the created subscription. The return value should be interpreted as follows: + * - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely. + * - If `subscription` is non-null and `error` is null, the subscription was created successfully. + * In this case, check the `results` field for detailed information about successes and failures during the subscription process. + * - Even if the subscription was created successfully, there might be some failures in the results. + * + * @example + * ```typescript + * const {subscription, error, results} = await waku.filter.subscribe(decoders, callback); + * if (!subscription || error) { + * console.error("Failed to create subscription:", error); + * } + * console.log("Subscription created successfully"); + * if (results.failures.length > 0) { + * console.warn("Some errors occurred during subscription:", results.failures); + * } + * console.log("Successful subscriptions:", results.successes); + * + * ``` + */ + public async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + protocolUseOptions?: ProtocolUseOptions, + subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); + + if (uniquePubsubTopics.length !== 1) { + return { + subscription: null, + error: ProtocolError.INVALID_DECODER_TOPICS, + results: null + }; + } + + const pubsubTopic = uniquePubsubTopics[0]; + + const { subscription, error } = await this.createSubscription( + pubsubTopic, + protocolUseOptions + ); + + if (error) { + return { + subscription: null, + error: error, + results: null + }; + } + + const { failures, successes } = await subscription.subscribe( + decoders, + callback, + subscribeOptions + ); + return { + subscription, + error: null, + results: { + failures: failures, + successes: successes + } + }; + } + + /** + * Creates a new subscription to the given pubsub topic. + * The subscription is made to multiple peers for decentralization. + * @param pubsubTopicShardInfo The pubsub topic to subscribe to. + * @returns The subscription object. + */ + private async createSubscription( + pubsubTopicShardInfo: NetworkConfig | PubsubTopic, + options?: ProtocolUseOptions + ): Promise { + options = { + autoRetry: true, + ...options + } as ProtocolUseOptions; + + const pubsubTopic = + typeof pubsubTopicShardInfo == "string" + ? pubsubTopicShardInfo + : shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0]; + + ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); + + const hasPeers = await this.hasPeers(options); + if (!hasPeers) { + return { + error: ProtocolError.NO_PEER_AVAILABLE, + subscription: null + }; + } + + log.info( + `Creating filter subscription with ${this.connectedPeers.length} peers: `, + this.connectedPeers.map((peer) => peer.id.toString()) + ); + + const subscription = + this.getActiveSubscription(pubsubTopic) ?? + this.setActiveSubscription( + pubsubTopic, + new SubscriptionManager( + pubsubTopic, + this.protocol, + () => this.connectedPeers, + this.renewPeer.bind(this) + ) + ); + + return { + error: null, + subscription + }; + } + + /** + * This method is used to satisfy the `IReceiver` interface. + * + * @hidden + * + * @param decoders The decoders to use for the subscription. + * @param callback The callback function to use for the subscription. + * @param opts Optional protocol options for the subscription. + * + * @returns A Promise that resolves to a function that unsubscribes from the subscription. + * + * @remarks + * This method should not be used directly. + * Instead, use `createSubscription` to create a new subscription. + */ + public async subscribeWithUnsubscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); + + if (uniquePubsubTopics.length === 0) { + throw Error( + "Failed to subscribe: no pubsubTopic found on decoders provided." + ); + } + + if (uniquePubsubTopics.length > 1) { + throw Error( + "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." + ); + } + + const { subscription, error } = await this.createSubscription( + uniquePubsubTopics[0] + ); + + if (error) { + throw Error(`Failed to create subscription: ${error}`); + } + + await subscription.subscribe(decoders, callback, options); + + const contentTopics = Array.from( + groupByContentTopic( + Array.isArray(decoders) ? decoders : [decoders] + ).keys() + ); + + return async () => { + await subscription.unsubscribe(contentTopics); + }; + } + + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[] + ): Promise> { + return toAsyncIterator(this, decoders); + } + + //TODO: move to SubscriptionManager + private getActiveSubscription( + pubsubTopic: PubsubTopic + ): SubscriptionManager | undefined { + return this.activeSubscriptions.get(pubsubTopic); + } + + private setActiveSubscription( + pubsubTopic: PubsubTopic, + subscription: SubscriptionManager + ): SubscriptionManager { + this.activeSubscriptions.set(pubsubTopic, subscription); + return subscription; + } + + private getUniquePubsubTopics( + decoders: IDecoder | IDecoder[] + ): string[] { + if (!Array.isArray(decoders)) { + return [decoders.pubsubTopic]; + } + + if (decoders.length === 0) { + return []; + } + + const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); + + return [...pubsubTopics]; + } +} + +export function wakuFilter( + connectionManager: ConnectionManager, + init?: ProtocolCreateOptions +): (libp2p: Libp2p) => IFilterSDK { + return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); +} diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts similarity index 50% rename from packages/sdk/src/protocols/filter.ts rename to packages/sdk/src/protocols/filter/subscription_manager.ts index eddad0a49f..bc6df6fff2 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -1,54 +1,34 @@ -import type { Peer } from "@libp2p/interface"; -import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, FilterCore } from "@waku/core"; +import type { Peer, PeerId } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, type CoreProtocolResult, - type CreateSubscriptionResult, - type IAsyncIterator, type IDecodedMessage, type IDecoder, - type IFilterSDK, type IProtoMessage, type ISubscriptionSDK, - type Libp2p, - NetworkConfig, type PeerIdStr, - type ProtocolCreateOptions, ProtocolError, - type ProtocolUseOptions, type PubsubTopic, type SDKProtocolResult, - type SubscribeOptions, - SubscribeResult, - type Unsubscribe + type SubscribeOptions } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { - ensurePubsubTopicIsConfigured, - groupByContentTopic, - Logger, - shardInfoToPubsubTopics, - toAsyncIterator -} from "@waku/utils"; +import { groupByContentTopic, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; -import { FilterReliabilityMonitor as ReliabilityMonitor } from "./reliability_monitor.js"; +import { FilterReliabilityMonitor as ReliabilityMonitor } from "../reliability_monitor"; + +import { DEFAULT_KEEP_ALIVE, DEFAULT_SUBSCRIBE_OPTIONS } from "./constants"; type SubscriptionCallback = { decoders: IDecoder[]; callback: Callback; }; -const log = new Logger("sdk:filter"); - const DEFAULT_MAX_PINGS = 3; -const DEFAULT_KEEP_ALIVE = 30 * 1000; -const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE -}; +const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscriptionSDK { private keepAliveTimer: number | null = null; @@ -336,279 +316,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } } -class FilterSDK extends BaseProtocolSDK implements IFilterSDK { - public readonly protocol: FilterCore; - - private activeSubscriptions = new Map(); - - public constructor( - connectionManager: ConnectionManager, - libp2p: Libp2p, - options?: ProtocolCreateOptions - ) { - super( - new FilterCore( - async (pubsubTopic, wakuMessage, peerIdStr) => { - const subscription = this.getActiveSubscription(pubsubTopic); - if (!subscription) { - log.error( - `No subscription locally registered for topic ${pubsubTopic}` - ); - return; - } - - await subscription.processIncomingMessage(wakuMessage, peerIdStr); - }, - connectionManager.configuredPubsubTopics, - libp2p - ), - connectionManager, - { numPeersToUse: options?.numPeersToUse } - ); - - this.protocol = this.core as FilterCore; - - this.activeSubscriptions = new Map(); - } - - /** - * Opens a subscription with the Filter protocol using the provided decoders and callback. - * This method combines the functionality of creating a subscription and subscribing to it. - * - * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. - * @param {Callback} callback - The callback function to be invoked with decoded messages. - * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol. - * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. - * - * @returns {Promise} A promise that resolves to an object containing: - * - subscription: The created subscription object if successful, or null if failed. - * - error: A ProtocolError if the subscription creation failed, or null if successful. - * - results: An object containing arrays of failures and successes from the subscription process. - * Only present if the subscription was created successfully. - * - * @throws {Error} If there's an unexpected error during the subscription process. - * - * @remarks - * This method attempts to create a subscription using the pubsub topic derived from the provided decoders, - * then tries to subscribe using the created subscription. The return value should be interpreted as follows: - * - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely. - * - If `subscription` is non-null and `error` is null, the subscription was created successfully. - * In this case, check the `results` field for detailed information about successes and failures during the subscription process. - * - Even if the subscription was created successfully, there might be some failures in the results. - * - * @example - * ```typescript - * const {subscription, error, results} = await waku.filter.subscribe(decoders, callback); - * if (!subscription || error) { - * console.error("Failed to create subscription:", error); - * } - * console.log("Subscription created successfully"); - * if (results.failures.length > 0) { - * console.warn("Some errors occurred during subscription:", results.failures); - * } - * console.log("Successful subscriptions:", results.successes); - * - * ``` - */ - public async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - protocolUseOptions?: ProtocolUseOptions, - subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - - if (uniquePubsubTopics.length !== 1) { - return { - subscription: null, - error: ProtocolError.INVALID_DECODER_TOPICS, - results: null - }; - } - - const pubsubTopic = uniquePubsubTopics[0]; - - const { subscription, error } = await this.createSubscription( - pubsubTopic, - protocolUseOptions - ); - - if (error) { - return { - subscription: null, - error: error, - results: null - }; - } - - const { failures, successes } = await subscription.subscribe( - decoders, - callback, - subscribeOptions - ); - return { - subscription, - error: null, - results: { - failures: failures, - successes: successes - } - }; - } - - /** - * Creates a new subscription to the given pubsub topic. - * The subscription is made to multiple peers for decentralization. - * @param pubsubTopicShardInfo The pubsub topic to subscribe to. - * @returns The subscription object. - */ - private async createSubscription( - pubsubTopicShardInfo: NetworkConfig | PubsubTopic, - options?: ProtocolUseOptions - ): Promise { - options = { - autoRetry: true, - ...options - } as ProtocolUseOptions; - - const pubsubTopic = - typeof pubsubTopicShardInfo == "string" - ? pubsubTopicShardInfo - : shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0]; - - ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - - const hasPeers = await this.hasPeers(options); - if (!hasPeers) { - return { - error: ProtocolError.NO_PEER_AVAILABLE, - subscription: null - }; - } - - log.info( - `Creating filter subscription with ${this.connectedPeers.length} peers: `, - this.connectedPeers.map((peer) => peer.id.toString()) - ); - - const subscription = - this.getActiveSubscription(pubsubTopic) ?? - this.setActiveSubscription( - pubsubTopic, - new SubscriptionManager( - pubsubTopic, - this.protocol, - () => this.connectedPeers, - this.renewPeer.bind(this) - ) - ); - - return { - error: null, - subscription - }; - } - - /** - * This method is used to satisfy the `IReceiver` interface. - * - * @hidden - * - * @param decoders The decoders to use for the subscription. - * @param callback The callback function to use for the subscription. - * @param opts Optional protocol options for the subscription. - * - * @returns A Promise that resolves to a function that unsubscribes from the subscription. - * - * @remarks - * This method should not be used directly. - * Instead, use `createSubscription` to create a new subscription. - */ - public async subscribeWithUnsubscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - - if (uniquePubsubTopics.length === 0) { - throw Error( - "Failed to subscribe: no pubsubTopic found on decoders provided." - ); - } - - if (uniquePubsubTopics.length > 1) { - throw Error( - "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." - ); - } - - const { subscription, error } = await this.createSubscription( - uniquePubsubTopics[0] - ); - - if (error) { - throw Error(`Failed to create subscription: ${error}`); - } - - await subscription.subscribe(decoders, callback, options); - - const contentTopics = Array.from( - groupByContentTopic( - Array.isArray(decoders) ? decoders : [decoders] - ).keys() - ); - - return async () => { - await subscription.unsubscribe(contentTopics); - }; - } - - public toSubscriptionIterator( - decoders: IDecoder | IDecoder[] - ): Promise> { - return toAsyncIterator(this, decoders); - } - - //TODO: move to SubscriptionManager - private getActiveSubscription( - pubsubTopic: PubsubTopic - ): SubscriptionManager | undefined { - return this.activeSubscriptions.get(pubsubTopic); - } - - private setActiveSubscription( - pubsubTopic: PubsubTopic, - subscription: SubscriptionManager - ): SubscriptionManager { - this.activeSubscriptions.set(pubsubTopic, subscription); - return subscription; - } - - private getUniquePubsubTopics( - decoders: IDecoder | IDecoder[] - ): string[] { - if (!Array.isArray(decoders)) { - return [decoders.pubsubTopic]; - } - - if (decoders.length === 0) { - return []; - } - - const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); - - return [...pubsubTopics]; - } -} - -export function wakuFilter( - connectionManager: ConnectionManager, - init?: ProtocolCreateOptions -): (libp2p: Libp2p) => IFilterSDK { - return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); -} - async function pushMessage( subscriptionCallback: SubscriptionCallback, pubsubTopic: PubsubTopic, diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/lightpush/index.ts similarity index 98% rename from packages/sdk/src/protocols/light_push.ts rename to packages/sdk/src/protocols/lightpush/index.ts index 1a49fa66f6..ef6630d9fc 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/lightpush/index.ts @@ -13,7 +13,7 @@ import { } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol.js"; +import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); diff --git a/packages/sdk/src/protocols/reliability_monitor.ts b/packages/sdk/src/protocols/reliability_monitor.ts index 67b911a476..1a739c91a5 100644 --- a/packages/sdk/src/protocols/reliability_monitor.ts +++ b/packages/sdk/src/protocols/reliability_monitor.ts @@ -8,7 +8,7 @@ const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const log = new Logger("waku:message-monitor"); -export class ReliabilityMonitor { +export class FilterReliabilityMonitor { private readonly receivedMessagesHashes: Set = new Set(); private readonly messageHashesByPeer: Map> = new Map(); private readonly missedMessagesByPeer: Map = new Map(); diff --git a/packages/sdk/src/protocols/store.ts b/packages/sdk/src/protocols/store/index.ts similarity index 99% rename from packages/sdk/src/protocols/store.ts rename to packages/sdk/src/protocols/store/index.ts index 0390c8a40f..840499e090 100644 --- a/packages/sdk/src/protocols/store.ts +++ b/packages/sdk/src/protocols/store/index.ts @@ -10,7 +10,7 @@ import { import { messageHash } from "@waku/message-hash"; import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils"; -import { BaseProtocolSDK } from "./base_protocol"; +import { BaseProtocolSDK } from "../base_protocol"; const DEFAULT_NUM_PEERS = 1; diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index ae79a71849..d216604902 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,9 +17,9 @@ import { Protocols } from "@waku/interfaces"; import { wakuRelay } from "@waku/relay"; import { Logger } from "@waku/utils"; -import { wakuFilter } from "./protocols/filter.js"; -import { wakuLightPush } from "./protocols/light_push.js"; -import { wakuStore } from "./protocols/store.js"; +import { wakuFilter } from "./protocols/filter/index.js"; +import { wakuLightPush } from "./protocols/lightpush/index.js"; +import { wakuStore } from "./protocols/store/index.js"; export const DefaultPingKeepAliveValueSecs = 5 * 60; export const DefaultRelayKeepAliveValueSecs = 5 * 60;