diff --git a/.size-limit.cjs b/.size-limit.cjs index dcba50b6e..7d08b8669 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -44,7 +44,7 @@ module.exports = [ }, { name: "Waku Filter", - path: "packages/core/bundle/index.js", + path: "packages/sdk/bundle/index.js", import: "{ wakuFilter }", }, { diff --git a/package-lock.json b/package-lock.json index 2ec7f0433..5b0b33e00 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36494,15 +36494,14 @@ }, "packages/core": { "name": "@waku/core", - "version": "0.0.27", + "version": "0.0.28", "license": "MIT OR Apache-2.0", "dependencies": { "@libp2p/ping": "^1.0.12", - "@waku/enr": "^0.0.21", - "@waku/interfaces": "0.0.22", - "@waku/message-hash": "^0.1.11", + "@waku/enr": "^0.0.22", + "@waku/interfaces": "0.0.23", "@waku/proto": "0.0.6", - "@waku/utils": "0.0.15", + "@waku/utils": "0.0.16", "debug": "^4.3.4", "it-all": "^3.0.4", "it-length-prefixed": "^9.0.4", @@ -36538,7 +36537,6 @@ "@multiformats/multiaddr": "^12.0.0", "@waku/enr": "^0.0.21", "@waku/interfaces": "0.0.22", - "@waku/message-hash": "^0.1.11", "@waku/proto": "0.0.6", "@waku/utils": "0.0.15", "libp2p": "^1.1.2" @@ -36557,11 +36555,11 @@ "version": "0.0.1", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/core": "0.0.27", - "@waku/enr": "0.0.21", - "@waku/interfaces": "0.0.22", + "@waku/core": "0.0.28", + "@waku/enr": "0.0.22", + "@waku/interfaces": "0.0.23", "@waku/proto": "^0.0.6", - "@waku/utils": "0.0.15", + "@waku/utils": "0.0.16", "debug": "^4.3.4", "dns-query": "^0.11.2", "hi-base32": "^0.5.1", @@ -36608,7 +36606,7 @@ }, "packages/enr": { "name": "@waku/enr", - "version": "0.0.21", + "version": "0.0.22", "license": "MIT OR Apache-2.0", "dependencies": { "@ethersproject/rlp": "^5.7.0", @@ -36616,7 +36614,7 @@ "@libp2p/peer-id": "^4.0.4", "@multiformats/multiaddr": "^12.0.0", "@noble/secp256k1": "^1.7.1", - "@waku/utils": "0.0.15", + "@waku/utils": "0.0.16", "debug": "^4.3.4", "js-sha3": "^0.9.2" }, @@ -36628,7 +36626,7 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.6", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.22", + "@waku/interfaces": "0.0.23", "chai": "^4.3.10", "cspell": "^8.6.1", "fast-check": "^3.15.1", @@ -36657,7 +36655,7 @@ }, "packages/interfaces": { "name": "@waku/interfaces", - "version": "0.0.22", + "version": "0.0.23", "license": "MIT OR Apache-2.0", "dependencies": { "@waku/proto": "^0.0.6" @@ -36675,14 +36673,14 @@ }, "packages/message-encryption": { "name": "@waku/message-encryption", - "version": "0.0.25", + "version": "0.0.26", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/secp256k1": "^1.7.1", - "@waku/core": "0.0.27", - "@waku/interfaces": "0.0.22", + "@waku/core": "0.0.28", + "@waku/interfaces": "0.0.23", "@waku/proto": "0.0.6", - "@waku/utils": "0.0.15", + "@waku/utils": "0.0.16", "debug": "^4.3.4", "js-sha3": "^0.9.2", "uint8arrays": "^5.0.1" @@ -36719,11 +36717,11 @@ }, "packages/message-hash": { "name": "@waku/message-hash", - "version": "0.1.11", + "version": "0.1.12", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/utils": "0.0.15" + "@waku/utils": "0.0.16" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", @@ -36733,7 +36731,7 @@ "@types/debug": "^4.1.12", "@types/mocha": "^10.0.6", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.22", + "@waku/interfaces": "0.0.23", "chai": "^4.3.10", "cspell": "^8.6.1", "fast-check": "^3.15.1", @@ -36801,15 +36799,15 @@ }, "packages/relay": { "name": "@waku/relay", - "version": "0.0.10", + "version": "0.0.11", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-gossipsub": "^12.0.0", "@noble/hashes": "^1.3.2", - "@waku/core": "0.0.27", - "@waku/interfaces": "0.0.22", + "@waku/core": "0.0.28", + "@waku/interfaces": "0.0.23", "@waku/proto": "0.0.6", - "@waku/utils": "0.0.15", + "@waku/utils": "0.0.16", "chai": "^4.3.10", "debug": "^4.3.4", "fast-check": "^3.15.1" @@ -36842,7 +36840,7 @@ }, "packages/sdk": { "name": "@waku/sdk", - "version": "0.0.23", + "version": "0.0.24", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "^14.1.0", @@ -36852,11 +36850,12 @@ "@libp2p/ping": "^1.0.12", "@libp2p/websockets": "^8.0.11", "@noble/hashes": "^1.3.3", - "@waku/core": "0.0.27", + "@waku/core": "0.0.28", "@waku/discovery": "0.0.1", - "@waku/interfaces": "0.0.22", - "@waku/relay": "0.0.10", - "@waku/utils": "0.0.15", + "@waku/interfaces": "0.0.23", + "@waku/proto": "^0.0.6", + "@waku/relay": "0.0.11", + "@waku/utils": "0.0.16", "libp2p": "^1.1.2" }, "devDependencies": { @@ -36876,10 +36875,11 @@ }, "peerDependencies": { "@libp2p/bootstrap": "^10", - "@waku/core": "0.0.27", - "@waku/interfaces": "0.0.22", - "@waku/relay": "0.0.10", - "@waku/utils": "0.0.15" + "@waku/core": "0.0.28", + "@waku/interfaces": "0.0.23", + "@waku/message-hash": "^0.1.12", + "@waku/relay": "0.0.11", + "@waku/utils": "0.0.16" }, "peerDependenciesMeta": { "@libp2p/bootstrap": { @@ -36940,11 +36940,11 @@ }, "packages/utils": { "name": "@waku/utils", - "version": "0.0.15", + "version": "0.0.16", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/interfaces": "0.0.22", + "@waku/interfaces": "0.0.23", "chai": "^4.3.10", "debug": "^4.3.4", "uint8arrays": "^5.0.1" diff --git a/packages/core/package.json b/packages/core/package.json index 806512a6e..8fa55b9c7 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -75,7 +75,6 @@ "@libp2p/ping": "^1.0.12", "@waku/enr": "^0.0.22", "@waku/interfaces": "0.0.23", - "@waku/message-hash": "^0.1.12", "@waku/proto": "0.0.6", "@waku/utils": "0.0.16", "debug": "^4.3.4", @@ -111,7 +110,6 @@ "libp2p": "^1.1.2", "@waku/enr": "^0.0.21", "@waku/interfaces": "0.0.22", - "@waku/message-hash": "^0.1.11", "@waku/proto": "0.0.6", "@waku/utils": "0.0.15" }, diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f6a4ca2ea..f71ac2568 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -7,7 +7,7 @@ export type { export * as message from "./lib/message/index.js"; export * as waku_filter from "./lib/filter/index.js"; -export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; +export { FilterCore, FilterCodecs } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js"; diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 8936ef27b..790a1953d 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -1,30 +1,13 @@ -import { Stream } from "@libp2p/interface"; import type { Peer } from "@libp2p/interface"; import type { IncomingStreamData } from "@libp2p/interface-internal"; import type { - Callback, ContentTopic, - IAsyncIterator, - IDecodedMessage, - IDecoder, - IFilter, - IProtoMessage, - IReceiver, + IBaseProtocolCore, Libp2p, ProtocolCreateOptions, - PubsubTopic, - SingleShardInfo, - Unsubscribe + PubsubTopic } from "@waku/interfaces"; -import { DefaultPubsubTopic } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; -import { - ensurePubsubTopicIsConfigured, - groupByContentTopic, - singleShardInfoToPubsubTopic, - toAsyncIterator -} from "@waku/utils"; import { Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -40,329 +23,20 @@ import { const log = new Logger("filter:v2"); -type SubscriptionCallback = { - decoders: IDecoder[]; - callback: Callback; -}; - export const FilterCodecs = { SUBSCRIBE: "/vac/waku/filter-subscribe/2.0.0-beta1", PUSH: "/vac/waku/filter-push/2.0.0-beta1" }; -/** - * A subscription object refers to a subscription to a given pubsub topic. - */ -class Subscription { - readonly peers: Peer[]; - private readonly pubsubTopic: PubsubTopic; - private newStream: (peer: Peer) => Promise; - readonly receivedMessagesHashStr: string[] = []; - - private subscriptionCallbacks: Map< - ContentTopic, - SubscriptionCallback - >; - +export class FilterCore extends BaseProtocol implements IBaseProtocolCore { constructor( - pubsubTopic: PubsubTopic, - remotePeers: Peer[], - newStream: (peer: Peer) => Promise + private handleIncomingMessage: ( + pubsubTopic: PubsubTopic, + wakuMessage: WakuMessage + ) => Promise, + libp2p: Libp2p, + options?: ProtocolCreateOptions ) { - this.peers = remotePeers; - this.pubsubTopic = pubsubTopic; - this.newStream = newStream; - this.subscriptionCallbacks = new Map(); - } - - async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback - ): Promise { - const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; - - // check that all decoders are configured for the same pubsub topic as this subscription - decodersArray.forEach((decoder) => { - if (decoder.pubsubTopic !== this.pubsubTopic) { - throw new Error( - `Pubsub topic not configured: decoder is configured for pubsub topic ${decoder.pubsubTopic} but this subscription is for pubsub topic ${this.pubsubTopic}. Please create a new Subscription for the different pubsub topic.` - ); - } - }); - - const decodersGroupedByCT = groupByContentTopic(decodersArray); - const contentTopics = Array.from(decodersGroupedByCT.keys()); - - const promises = this.peers.map(async (peer) => { - const stream = await this.newStream(peer); - - const request = FilterSubscribeRpc.createSubscribeRequest( - this.pubsubTopic, - contentTopics - ); - - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); - - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` - ); - } - - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); - - if (statusCode < 200 || statusCode >= 300) { - throw new Error( - `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` - ); - } - - log.info( - "Subscribed to peer ", - peer.id.toString(), - "for content topics", - contentTopics - ); - } catch (e) { - throw new Error( - "Error subscribing to peer: " + - peer.id.toString() + - " for content topics: " + - contentTopics + - ": " + - e - ); - } - }); - - const results = await Promise.allSettled(promises); - - this.handleErrors(results, "subscribe"); - - // Save the callback functions by content topics so they - // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) - // is called for those content topics - decodersGroupedByCT.forEach((decoders, contentTopic) => { - // Cast the type because a given `subscriptionCallbacks` map may hold - // Decoder that decode to different implementations of `IDecodedMessage` - const subscriptionCallback = { - decoders, - callback - } as unknown as SubscriptionCallback; - - // The callback and decoder may override previous values, this is on - // purpose as the user may call `subscribe` to refresh the subscription - this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); - }); - } - - async unsubscribe(contentTopics: ContentTopic[]): Promise { - const promises = this.peers.map(async (peer) => { - const stream = await this.newStream(peer); - const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( - this.pubsubTopic, - contentTopics - ); - - try { - await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); - } catch (error) { - throw new Error("Error unsubscribing: " + error); - } - - contentTopics.forEach((contentTopic: string) => { - this.subscriptionCallbacks.delete(contentTopic); - }); - }); - - const results = await Promise.allSettled(promises); - - this.handleErrors(results, "unsubscribe"); - } - - async ping(): Promise { - const promises = this.peers.map(async (peer) => { - const stream = await this.newStream(peer); - - const request = FilterSubscribeRpc.createSubscriberPingRequest(); - - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); - - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` - ); - } - - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); - - if (statusCode < 200 || statusCode >= 300) { - throw new Error( - `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` - ); - } - log.info(`Ping successful for peer ${peer.id.toString()}`); - } catch (error) { - log.error("Error pinging: ", error); - throw error; // Rethrow the actual error instead of wrapping it - } - }); - - const results = await Promise.allSettled(promises); - - this.handleErrors(results, "ping"); - } - - async unsubscribeAll(): Promise { - const promises = this.peers.map(async (peer) => { - const stream = await this.newStream(peer); - - const request = FilterSubscribeRpc.createUnsubscribeAllRequest( - this.pubsubTopic - ); - - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); - - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` - ); - } - - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); - - if (statusCode < 200 || statusCode >= 300) { - throw new Error( - `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` - ); - } - - this.subscriptionCallbacks.clear(); - log.info( - `Unsubscribed from all content topics for pubsub topic ${this.pubsubTopic}` - ); - } catch (error) { - throw new Error( - "Error unsubscribing from all content topics: " + error - ); - } - }); - - const results = await Promise.allSettled(promises); - - this.handleErrors(results, "unsubscribeAll"); - } - - async processMessage(message: WakuMessage): Promise { - const hashedMessageStr = messageHashStr( - this.pubsubTopic, - message as IProtoMessage - ); - if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { - log.info("Message already received, skipping"); - return; - } - this.receivedMessagesHashStr.push(hashedMessageStr); - - const { contentTopic } = message; - const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); - if (!subscriptionCallback) { - log.error("No subscription callback available for ", contentTopic); - return; - } - log.info( - "Processing message with content topic ", - contentTopic, - " on pubsub topic ", - this.pubsubTopic - ); - await pushMessage(subscriptionCallback, this.pubsubTopic, message); - } - - // Filter out only the rejected promises and extract & handle their reasons - private handleErrors( - results: PromiseSettledResult[], - type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" - ): void { - const errors = results - .filter( - (result): result is PromiseRejectedResult => - result.status === "rejected" - ) - .map((rejectedResult) => rejectedResult.reason); - - if (errors.length === this.peers.length) { - const errorCounts = new Map(); - // TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952 - errors.forEach((error) => { - const message = error instanceof Error ? error.message : String(error); - errorCounts.set(message, (errorCounts.get(message) || 0) + 1); - }); - - const uniqueErrorMessages = Array.from( - errorCounts, - ([message, count]) => `${message} (occurred ${count} times)` - ).join(", "); - throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`); - } else if (errors.length > 0) { - // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) - log.warn( - `Some ${type} failed. These will be refreshed with new peers`, - errors - ); - } else { - log.info(`${type} successful for all peers`); - } - } -} - -const DEFAULT_NUM_PEERS = 3; - -class Filter extends BaseProtocol implements IReceiver { - private activeSubscriptions = new Map(); - - private getActiveSubscription( - pubsubTopic: PubsubTopic - ): Subscription | undefined { - return this.activeSubscriptions.get(pubsubTopic); - } - - private setActiveSubscription( - pubsubTopic: PubsubTopic, - subscription: Subscription - ): Subscription { - this.activeSubscriptions.set(pubsubTopic, subscription); - return subscription; - } - - //TODO: Remove when FilterCore and FilterSDK are introduced - private readonly numPeersToUse: number; - - constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super( FilterCodecs.SUBSCRIBE, libp2p.components, @@ -371,92 +45,9 @@ class Filter extends BaseProtocol implements IReceiver { options ); - this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS; - libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); }); - - this.activeSubscriptions = new Map(); - } - - /** - * Creates a new subscription to the given pubsub topic. - * The subscription is made to multiple peers for decentralization. - * @param pubsubTopicShardInfo The pubsub topic to subscribe to. - * @returns The subscription object. - */ - async createSubscription( - pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic - ): Promise { - const pubsubTopic = - typeof pubsubTopicShardInfo == "string" - ? pubsubTopicShardInfo - : singleShardInfoToPubsubTopic(pubsubTopicShardInfo); - - ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); - - const peers = await this.getPeers({ - maxBootstrapPeers: 1, - numPeers: this.numPeersToUse - }); - if (peers.length === 0) { - throw new Error("No peer found to initiate subscription."); - } - - log.info( - `Creating filter subscription with ${peers.length} peers: `, - peers.map((peer) => peer.id.toString()) - ); - - const subscription = - this.getActiveSubscription(pubsubTopic) ?? - this.setActiveSubscription( - pubsubTopic, - new Subscription(pubsubTopic, peers, this.getStream.bind(this)) - ); - - return subscription; - } - - public toSubscriptionIterator( - decoders: IDecoder | IDecoder[] - ): Promise> { - return toAsyncIterator(this, decoders); - } - - /** - * This method is used to satisfy the `IReceiver` interface. - * - * @hidden - * - * @param decoders The decoders to use for the subscription. - * @param callback The callback function to use for the subscription. - * @param opts Optional protocol options for the subscription. - * - * @returns A Promise that resolves to a function that unsubscribes from the subscription. - * - * @remarks - * This method should not be used directly. - * Instead, use `createSubscription` to create a new subscription. - */ - async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback - ): Promise { - const subscription = await this.createSubscription(); - - await subscription.subscribe(decoders, callback); - - const contentTopics = Array.from( - groupByContentTopic( - Array.isArray(decoders) ? decoders : [decoders] - ).keys() - ); - - return async () => { - await subscription.unsubscribe(contentTopics); - }; } private onRequest(streamData: IncomingStreamData): void { @@ -480,16 +71,7 @@ class Filter extends BaseProtocol implements IReceiver { return; } - const subscription = this.getActiveSubscription(pubsubTopic); - - if (!subscription) { - log.error( - `No subscription locally registered for topic ${pubsubTopic}` - ); - return; - } - - await subscription.processMessage(wakuMessage); + await this.handleIncomingMessage(pubsubTopic, wakuMessage); } }).then( () => { @@ -503,38 +85,118 @@ class Filter extends BaseProtocol implements IReceiver { log.error("Error decoding message", e); } } -} -export function wakuFilter( - init: ProtocolCreateOptions = { pubsubTopics: [] } -): (libp2p: Libp2p) => IFilter { - return (libp2p: Libp2p) => new Filter(libp2p, init); -} + async subscribe( + pubsubTopic: PubsubTopic, + peer: Peer, + contentTopics: ContentTopic[] + ): Promise { + const stream = await this.getStream(peer); -async function pushMessage( - subscriptionCallback: SubscriptionCallback, - pubsubTopic: PubsubTopic, - message: WakuMessage -): Promise { - const { decoders, callback } = subscriptionCallback; - - const { contentTopic } = message; - if (!contentTopic) { - log.warn("Message has no content topic, skipping"); - return; - } - - try { - const decodePromises = decoders.map((dec) => - dec - .fromProtoObj(pubsubTopic, message as IProtoMessage) - .then((decoded) => decoded || Promise.reject("Decoding failed")) + const request = FilterSubscribeRpc.createSubscribeRequest( + pubsubTopic, + contentTopics ); - const decodedMessage = await Promise.any(decodePromises); + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); - await callback(decodedMessage); - } catch (e) { - log.error("Error decoding message", e); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } + } + + async unsubscribe( + pubsubTopic: PubsubTopic, + peer: Peer, + contentTopics: ContentTopic[] + ): Promise { + const stream = await this.getStream(peer); + const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( + pubsubTopic, + contentTopics + ); + + await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); + } + + async unsubscribeAll(pubsubTopic: PubsubTopic, peer: Peer): Promise { + const stream = await this.getStream(peer); + + const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic); + + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } + } + + async ping(peer: Peer): Promise { + const stream = await this.getStream(peer); + + const request = FilterSubscribeRpc.createSubscriberPingRequest(); + + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } + log.info(`Ping successful for peer ${peer.id.toString()}`); + } catch (error) { + log.error("Error pinging: ", error); + throw error; // Rethrow the actual error instead of wrapping it + } } } diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index ac812cb55..809264a64 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -70,7 +70,7 @@ export async function waitForRemotePeer( if (!waku.filter) throw new Error("Cannot wait for Filter peer: protocol not mounted"); promises.push( - waitForConnectedPeer(waku.filter, waku.libp2p.services.metadata) + waitForConnectedPeer(waku.filter.protocol, waku.libp2p.services.metadata) ); } diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 76139170b..1565ec482 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -2,7 +2,11 @@ import type { PeerId } from "@libp2p/interface"; import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js"; import type { ContentTopic, PubsubTopic } from "./misc.js"; -import type { Callback, IBaseProtocolCore } from "./protocols.js"; +import type { + Callback, + IBaseProtocolCore, + IBaseProtocolSDK +} from "./protocols.js"; import type { IReceiver } from "./receiver.js"; export type ContentFilter = { @@ -22,8 +26,10 @@ export interface IFilterSubscription { unsubscribeAll(): Promise; } -export type IFilter = IReceiver & - IBaseProtocolCore & { +export type IFilter = IReceiver & IBaseProtocolCore; + +export type IFilterSDK = IReceiver & + IBaseProtocolSDK & { protocol: IBaseProtocolCore } & { createSubscription( pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic, peerId?: PeerId diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 2b03db79b..a14680c4a 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -2,7 +2,7 @@ import type { PeerId, Stream } from "@libp2p/interface"; import type { Multiaddr } from "@multiformats/multiaddr"; import { IConnectionManager } from "./connection_manager.js"; -import type { IFilter } from "./filter.js"; +import type { IFilterSDK } from "./filter.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPushSDK } from "./light_push.js"; import { Protocols } from "./protocols.js"; @@ -13,7 +13,7 @@ export interface Waku { libp2p: Libp2p; relay?: IRelay; store?: IStoreSDK; - filter?: IFilter; + filter?: IFilterSDK; lightPush?: ILightPushSDK; connectionManager: IConnectionManager; @@ -32,7 +32,7 @@ export interface Waku { export interface LightNode extends Waku { relay: undefined; store: IStoreSDK; - filter: IFilter; + filter: IFilterSDK; lightPush: ILightPushSDK; } @@ -46,6 +46,6 @@ export interface RelayNode extends Waku { export interface FullNode extends Waku { relay: IRelay; store: IStoreSDK; - filter: IFilter; + filter: IFilterSDK; lightPush: ILightPushSDK; } diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 9f3d93f8d..ba9189839 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -71,6 +71,7 @@ "@waku/core": "0.0.28", "@waku/discovery": "0.0.1", "@waku/interfaces": "0.0.23", + "@waku/proto": "^0.0.6", "@waku/relay": "0.0.11", "@waku/utils": "0.0.16", "libp2p": "^1.1.2" @@ -89,10 +90,11 @@ }, "peerDependencies": { "@libp2p/bootstrap": "^10", - "@waku/core": "0.0.27", - "@waku/interfaces": "0.0.22", - "@waku/relay": "0.0.10", - "@waku/utils": "0.0.15" + "@waku/core": "0.0.28", + "@waku/interfaces": "0.0.23", + "@waku/message-hash": "^0.1.12", + "@waku/relay": "0.0.11", + "@waku/utils": "0.0.16" }, "peerDependenciesMeta": { "@waku/interfaces": { diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index c263ef731..8fc6b3f64 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -13,6 +13,7 @@ export * from "./waku.js"; export { createLightNode, createNode } from "./light-node/index.js"; export { wakuLightPush } from "./protocols/light_push.js"; +export { wakuFilter } from "./protocols/filter.js"; export { wakuStore } from "./protocols/store.js"; export * as waku from "@waku/core"; diff --git a/packages/sdk/src/light-node/index.ts b/packages/sdk/src/light-node/index.ts index 754f1a0f4..c42c2e923 100644 --- a/packages/sdk/src/light-node/index.ts +++ b/packages/sdk/src/light-node/index.ts @@ -1,6 +1,6 @@ -import { wakuFilter } from "@waku/core"; import { type Libp2pComponents, type LightNode } from "@waku/interfaces"; +import { wakuFilter } from "../protocols/filter.js"; import { wakuLightPush } from "../protocols/light_push.js"; import { wakuStore } from "../protocols/store.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 42dfa8a3b..6de20f5bf 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,4 +1,4 @@ -import { IBaseProtocolSDK } from ".."; +import { IBaseProtocolSDK } from "@waku/interfaces"; interface Options { numPeersToUse?: number; diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts new file mode 100644 index 000000000..83883f5ca --- /dev/null +++ b/packages/sdk/src/protocols/filter.ts @@ -0,0 +1,351 @@ +import type { Peer } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import { + type Callback, + ContentTopic, + DefaultPubsubTopic, + type IAsyncIterator, + type IDecodedMessage, + type IDecoder, + type IFilterSDK, + IProtoMessage, + type Libp2p, + type ProtocolCreateOptions, + type PubsubTopic, + type SingleShardInfo, + type Unsubscribe +} from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; +import { WakuMessage } from "@waku/proto"; +import { + ensurePubsubTopicIsConfigured, + groupByContentTopic, + Logger, + singleShardInfoToPubsubTopic, + toAsyncIterator +} from "@waku/utils"; + +import { BaseProtocolSDK } from "./base_protocol"; + +type SubscriptionCallback = { + decoders: IDecoder[]; + callback: Callback; +}; + +const log = new Logger("sdk:filter"); + +export class SubscriptionManager { + private readonly pubsubTopic: PubsubTopic; + readonly peers: Peer[]; + readonly receivedMessagesHashStr: string[] = []; + + private subscriptionCallbacks: Map< + ContentTopic, + SubscriptionCallback + >; + + constructor( + pubsubTopic: PubsubTopic, + remotePeers: Peer[], + private protocol: FilterCore + ) { + this.peers = remotePeers; + this.pubsubTopic = pubsubTopic; + this.subscriptionCallbacks = new Map(); + } + + async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback + ): Promise { + const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; + + // check that all decoders are configured for the same pubsub topic as this subscription + decodersArray.forEach((decoder) => { + if (decoder.pubsubTopic !== this.pubsubTopic) { + throw new Error( + `Pubsub topic not configured: decoder is configured for pubsub topic ${decoder.pubsubTopic} but this subscription is for pubsub topic ${this.pubsubTopic}. Please create a new Subscription for the different pubsub topic.` + ); + } + }); + + const decodersGroupedByCT = groupByContentTopic(decodersArray); + const contentTopics = Array.from(decodersGroupedByCT.keys()); + + const promises = this.peers.map(async (peer) => { + await this.protocol.subscribe(this.pubsubTopic, peer, contentTopics); + }); + + const results = await Promise.allSettled(promises); + + this.handleErrors(results, "subscribe"); + + // Save the callback functions by content topics so they + // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) + // is called for those content topics + decodersGroupedByCT.forEach((decoders, contentTopic) => { + // Cast the type because a given `subscriptionCallbacks` map may hold + // Decoder that decode to different implementations of `IDecodedMessage` + const subscriptionCallback = { + decoders, + callback + } as unknown as SubscriptionCallback; + + // The callback and decoder may override previous values, this is on + // purpose as the user may call `subscribe` to refresh the subscription + this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); + }); + } + + async unsubscribe(contentTopics: ContentTopic[]): Promise { + const promises = this.peers.map(async (peer) => { + await this.protocol.unsubscribe(this.pubsubTopic, peer, contentTopics); + + contentTopics.forEach((contentTopic: string) => { + this.subscriptionCallbacks.delete(contentTopic); + }); + }); + + const results = await Promise.allSettled(promises); + + this.handleErrors(results, "unsubscribe"); + } + + async ping(): Promise { + const promises = this.peers.map(async (peer) => { + await this.protocol.ping(peer); + }); + + const results = await Promise.allSettled(promises); + + this.handleErrors(results, "ping"); + } + + async unsubscribeAll(): Promise { + const promises = this.peers.map(async (peer) => { + await this.protocol.unsubscribeAll(this.pubsubTopic, peer); + }); + + const results = await Promise.allSettled(promises); + + this.subscriptionCallbacks.clear(); + + this.handleErrors(results, "unsubscribeAll"); + } + + async processIncomingMessage(message: WakuMessage): Promise { + const hashedMessageStr = messageHashStr( + this.pubsubTopic, + message as IProtoMessage + ); + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + log.info("Message already received, skipping"); + return; + } + this.receivedMessagesHashStr.push(hashedMessageStr); + + const { contentTopic } = message; + const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); + if (!subscriptionCallback) { + log.error("No subscription callback available for ", contentTopic); + return; + } + log.info( + "Processing message with content topic ", + contentTopic, + " on pubsub topic ", + this.pubsubTopic + ); + await pushMessage(subscriptionCallback, this.pubsubTopic, message); + } + + // Filter out only the rejected promises and extract & handle their reasons + private handleErrors( + results: PromiseSettledResult[], + type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" + ): void { + const errors = results + .filter( + (result): result is PromiseRejectedResult => + result.status === "rejected" + ) + .map((rejectedResult) => rejectedResult.reason); + + if (errors.length === this.peers.length) { + const errorCounts = new Map(); + // TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952 + errors.forEach((error) => { + const message = error instanceof Error ? error.message : String(error); + errorCounts.set(message, (errorCounts.get(message) || 0) + 1); + }); + + const uniqueErrorMessages = Array.from( + errorCounts, + ([message, count]) => `${message} (occurred ${count} times)` + ).join(", "); + throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`); + } else if (errors.length > 0) { + // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) + log.warn( + `Some ${type} failed. These will be refreshed with new peers`, + errors + ); + } else { + log.info(`${type} successful for all peers`); + } + } +} + +class FilterSDK extends BaseProtocolSDK implements IFilterSDK { + public readonly protocol: FilterCore; + + private activeSubscriptions = new Map(); + private async handleIncomingMessage( + pubsubTopic: PubsubTopic, + wakuMessage: WakuMessage + ): Promise { + const subscription = this.getActiveSubscription(pubsubTopic); + if (!subscription) { + log.error(`No subscription locally registered for topic ${pubsubTopic}`); + return; + } + + await subscription.processIncomingMessage(wakuMessage); + } + + constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { + super({ numPeersToUse: options?.numPeersToUse }); + this.protocol = new FilterCore( + this.handleIncomingMessage.bind(this), + libp2p, + options + ); + this.activeSubscriptions = new Map(); + } + + //TODO: move to SubscriptionManager + private getActiveSubscription( + pubsubTopic: PubsubTopic + ): SubscriptionManager | undefined { + return this.activeSubscriptions.get(pubsubTopic); + } + + private setActiveSubscription( + pubsubTopic: PubsubTopic, + subscription: SubscriptionManager + ): SubscriptionManager { + this.activeSubscriptions.set(pubsubTopic, subscription); + return subscription; + } + + /** + * Creates a new subscription to the given pubsub topic. + * The subscription is made to multiple peers for decentralization. + * @param pubsubTopicShardInfo The pubsub topic to subscribe to. + * @returns The subscription object. + */ + async createSubscription( + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic + ): Promise { + const pubsubTopic = + typeof pubsubTopicShardInfo == "string" + ? pubsubTopicShardInfo + : singleShardInfoToPubsubTopic(pubsubTopicShardInfo); + + ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); + + const peers = await this.protocol.getPeers(); + if (peers.length === 0) { + throw new Error("No peer found to initiate subscription."); + } + + log.info( + `Creating filter subscription with ${peers.length} peers: `, + peers.map((peer) => peer.id.toString()) + ); + + const subscription = + this.getActiveSubscription(pubsubTopic) ?? + this.setActiveSubscription( + pubsubTopic, + new SubscriptionManager(pubsubTopic, peers, this.protocol) + ); + + return subscription; + } + + //TODO: remove this dependency on IReceiver + /** + * This method is used to satisfy the `IReceiver` interface. + * + * @hidden + * + * @param decoders The decoders to use for the subscription. + * @param callback The callback function to use for the subscription. + * @param opts Optional protocol options for the subscription. + * + * @returns A Promise that resolves to a function that unsubscribes from the subscription. + * + * @remarks + * This method should not be used directly. + * Instead, use `createSubscription` to create a new subscription. + */ + async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback + ): Promise { + const subscription = await this.createSubscription(); + + await subscription.subscribe(decoders, callback); + + const contentTopics = Array.from( + groupByContentTopic( + Array.isArray(decoders) ? decoders : [decoders] + ).keys() + ); + + return async () => { + await subscription.unsubscribe(contentTopics); + }; + } + + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[] + ): Promise> { + return toAsyncIterator(this, decoders); + } +} + +export function wakuFilter( + init: ProtocolCreateOptions +): (libp2p: Libp2p) => IFilterSDK { + return (libp2p: Libp2p) => new FilterSDK(libp2p, init); +} + +async function pushMessage( + subscriptionCallback: SubscriptionCallback, + pubsubTopic: PubsubTopic, + message: WakuMessage +): Promise { + const { decoders, callback } = subscriptionCallback; + + const { contentTopic } = message; + if (!contentTopic) { + log.warn("Message has no content topic, skipping"); + return; + } + + try { + const decodePromises = decoders.map((dec) => + dec + .fromProtoObj(pubsubTopic, message as IProtoMessage) + .then((decoded) => decoded || Promise.reject("Decoding failed")) + ); + + const decodedMessage = await Promise.any(decodePromises); + + await callback(decodedMessage); + } catch (e) { + log.error("Error decoding message", e); + } +} diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index 0584878d4..6f61c7198 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -14,14 +14,13 @@ import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; import { BaseProtocolSDK } from "./base_protocol.js"; -const DEFAULT_NUM_PEERS = 3; const log = new Logger("sdk:light-push"); -export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { +class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { public readonly protocol: LightPushCore; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super({ numPeersToUse: options?.numPeersToUse ?? DEFAULT_NUM_PEERS }); + super({ numPeersToUse: options?.numPeersToUse }); this.protocol = new LightPushCore(libp2p, options); } diff --git a/packages/sdk/src/relay-node/index.ts b/packages/sdk/src/relay-node/index.ts index 4d7d040cc..2ae2f6b40 100644 --- a/packages/sdk/src/relay-node/index.ts +++ b/packages/sdk/src/relay-node/index.ts @@ -1,7 +1,7 @@ -import { wakuFilter } from "@waku/core"; import { type FullNode, type RelayNode } from "@waku/interfaces"; import { RelayCreateOptions, wakuRelay } from "@waku/relay"; +import { wakuFilter } from "../protocols/filter.js"; import { wakuLightPush } from "../protocols/light_push.js"; import { wakuStore } from "../protocols/store.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index dac557999..a68a31761 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -4,7 +4,7 @@ import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; import { ConnectionManager, DecodedMessage } from "@waku/core"; import type { Callback, - IFilter, + IFilterSDK, IFilterSubscription, ILightPushSDK, IRelay, @@ -56,7 +56,7 @@ export class WakuNode implements Waku { public libp2p: Libp2p; public relay?: IRelay; public store?: IStoreSDK; - public filter?: IFilter; + public filter?: IFilterSDK; public lightPush?: ILightPushSDK; public connectionManager: ConnectionManager; public readonly pubsubTopics: PubsubTopic[]; @@ -66,7 +66,7 @@ export class WakuNode implements Waku { libp2p: Libp2p, store?: (libp2p: Libp2p) => IStoreSDK, lightPush?: (libp2p: Libp2p) => ILightPushSDK, - filter?: (libp2p: Libp2p) => IFilter, + filter?: (libp2p: Libp2p) => IFilterSDK, relay?: (libp2p: Libp2p) => IRelay ) { if (options.pubsubTopics.length == 0) { @@ -166,7 +166,7 @@ export class WakuNode implements Waku { } if (_protocols.includes(Protocols.Filter)) { if (this.filter) { - codecs.push(this.filter.multicodec); + codecs.push(this.filter.protocol.multicodec); } else { log.error( "Filter codec not included in dial codec: protocol not mounted locally" diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts index 31991ac48..006340a87 100644 --- a/packages/tests/tests/sdk/content_topic.spec.ts +++ b/packages/tests/tests/sdk/content_topic.spec.ts @@ -1,4 +1,4 @@ -import { wakuFilter } from "@waku/core"; +import { wakuFilter } from "@waku/sdk"; import { bytesToUtf8, createEncoder, diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index d86406aee..c345ea1d4 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -200,7 +200,7 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitForRemotePeer(waku2, [Protocols.Filter]); - const peers = (await waku2.filter.connectedPeers()).map((peer) => + const peers = (await waku2.filter.protocol.connectedPeers()).map((peer) => peer.id.toString() ); @@ -232,8 +232,8 @@ describe("Wait for remote peer", function () { Protocols.LightPush ]); - const filterPeers = (await waku2.filter.connectedPeers()).map((peer) => - peer.id.toString() + const filterPeers = (await waku2.filter.protocol.connectedPeers()).map( + (peer) => peer.id.toString() ); const storePeers = (await waku2.store.protocol.connectedPeers()).map( (peer) => peer.id.toString()