diff --git a/package-lock.json b/package-lock.json index e75157726c..7137f28674 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,10 +12,10 @@ "packages/message-hash", "packages/enr", "packages/core", - "packages/relay", "packages/discovery", "packages/message-encryption", "packages/sdk", + "packages/relay", "packages/tests", "packages/browser-tests", "packages/build-utils", @@ -2653,9 +2653,9 @@ } }, "node_modules/@chainsafe/libp2p-noise": { - "version": "15.1.0", - "resolved": "https://registry.npmjs.org/@chainsafe/libp2p-noise/-/libp2p-noise-15.1.0.tgz", - "integrity": "sha512-84S/Uk7ZZRYpSlE5d1odMmQTl5g5Da8etgcf4QI7arTAHkvBs3il7yGHIPt4wV4EV0qIMG+XjdGIYihRXfI2/w==", + "version": "15.1.2", + "resolved": "https://registry.npmjs.org/@chainsafe/libp2p-noise/-/libp2p-noise-15.1.2.tgz", + "integrity": "sha512-o6mqsAbaCBucgdLOOHtkwtGVL1c8RLKhlTnHQY+leazY+thiE1Sm6qPCwsTHKQnWii1q5hDVI2Q0l9QgYi5v4Q==", "license": "Apache-2.0 OR MIT", "dependencies": { "@chainsafe/as-chacha20poly1305": "^0.1.0", @@ -2663,7 +2663,7 @@ "@libp2p/crypto": "^4.0.0", "@libp2p/interface": "^1.5.0", "@libp2p/peer-id": "^4.0.0", - "@noble/ciphers": "^0.5.1", + "@noble/ciphers": "^0.6.0", "@noble/curves": "^1.1.0", "@noble/hashes": "^1.3.1", "it-length-prefixed": "^9.0.1", @@ -5884,9 +5884,9 @@ } }, "node_modules/@noble/ciphers": { - "version": "0.5.3", - "resolved": "https://registry.npmjs.org/@noble/ciphers/-/ciphers-0.5.3.tgz", - "integrity": "sha512-B0+6IIHiqEs3BPMT0hcRmHvEj2QHOLu+uwt+tqDDeVd0oyVzh7BPrDcPjRnV1PV/5LaknXJJQvOuRGR0zQJz+w==", + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/@noble/ciphers/-/ciphers-0.6.0.tgz", + "integrity": "sha512-mIbq/R9QXk5/cTfESb1OKtyFnk7oc1Om/8onA1158K9/OZUQFDEVy55jVTato+xmp3XX6F6Qh0zz0Nc1AxAlRQ==", "license": "MIT", "funding": { "url": "https://paulmillr.com/funding/" @@ -39077,14 +39077,14 @@ }, "packages/core": { "name": "@waku/core", - "version": "0.0.30", + "version": "0.0.31", "license": "MIT OR Apache-2.0", "dependencies": { "@libp2p/ping": "^1.1.2", - "@waku/enr": "^0.0.24", - "@waku/interfaces": "0.0.25", - "@waku/proto": "0.0.7", - "@waku/utils": "0.0.18", + "@waku/enr": "^0.0.25", + "@waku/interfaces": "0.0.26", + "@waku/proto": "0.0.8", + "@waku/utils": "0.0.19", "debug": "^4.3.4", "it-all": "^3.0.4", "it-length-prefixed": "^9.0.4", @@ -39148,14 +39148,14 @@ }, "packages/discovery": { "name": "@waku/discovery", - "version": "0.0.3", + "version": "0.0.4", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/core": "0.0.30", - "@waku/enr": "0.0.24", - "@waku/interfaces": "0.0.25", - "@waku/proto": "^0.0.7", - "@waku/utils": "0.0.18", + "@waku/core": "0.0.31", + "@waku/enr": "0.0.25", + "@waku/interfaces": "0.0.26", + "@waku/proto": "^0.0.8", + "@waku/utils": "0.0.19", "debug": "^4.3.4", "dns-query": "^0.11.2", "hi-base32": "^0.5.1", @@ -39202,7 +39202,7 @@ }, "packages/enr": { "name": "@waku/enr", - "version": "0.0.24", + "version": "0.0.25", "license": "MIT OR Apache-2.0", "dependencies": { "@ethersproject/rlp": "^5.7.0", @@ -39210,7 +39210,7 @@ "@libp2p/peer-id": "^4.2.1", "@multiformats/multiaddr": "^12.0.0", "@noble/secp256k1": "^1.7.1", - "@waku/utils": "0.0.18", + "@waku/utils": "0.0.19", "debug": "^4.3.4", "js-sha3": "^0.9.2" }, @@ -39222,7 +39222,7 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.6", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.25", + "@waku/interfaces": "0.0.26", "chai": "^4.3.10", "cspell": "^8.6.1", "fast-check": "^3.19.0", @@ -39251,10 +39251,10 @@ }, "packages/interfaces": { "name": "@waku/interfaces", - "version": "0.0.25", + "version": "0.0.26", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/proto": "^0.0.7" + "@waku/proto": "^0.0.8" }, "devDependencies": { "@chainsafe/libp2p-gossipsub": "^13.1.0", @@ -39269,14 +39269,14 @@ }, "packages/message-encryption": { "name": "@waku/message-encryption", - "version": "0.0.28", + "version": "0.0.29", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/secp256k1": "^1.7.1", - "@waku/core": "0.0.30", - "@waku/interfaces": "0.0.25", - "@waku/proto": "0.0.7", - "@waku/utils": "0.0.18", + "@waku/core": "0.0.31", + "@waku/interfaces": "0.0.26", + "@waku/proto": "0.0.8", + "@waku/utils": "0.0.19", "debug": "^4.3.4", "js-sha3": "^0.9.2", "uint8arrays": "^5.0.1" @@ -39313,11 +39313,11 @@ }, "packages/message-hash": { "name": "@waku/message-hash", - "version": "0.1.14", + "version": "0.1.15", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/utils": "0.0.18" + "@waku/utils": "0.0.19" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", @@ -39327,7 +39327,7 @@ "@types/debug": "^4.1.12", "@types/mocha": "^10.0.6", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.25", + "@waku/interfaces": "0.0.26", "chai": "^4.3.10", "cspell": "^8.6.1", "fast-check": "^3.19.0", @@ -39353,7 +39353,7 @@ }, "packages/proto": { "name": "@waku/proto", - "version": "0.0.7", + "version": "0.0.8", "license": "MIT OR Apache-2.0", "dependencies": { "protons-runtime": "^5.4.0" @@ -39395,15 +39395,16 @@ }, "packages/relay": { "name": "@waku/relay", - "version": "0.0.13", + "version": "0.0.14", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-gossipsub": "^13.1.0", "@noble/hashes": "^1.3.2", - "@waku/core": "0.0.30", - "@waku/interfaces": "0.0.25", - "@waku/proto": "0.0.7", - "@waku/utils": "0.0.18", + "@waku/core": "0.0.31", + "@waku/interfaces": "0.0.26", + "@waku/proto": "0.0.8", + "@waku/sdk": "0.0.27", + "@waku/utils": "0.0.19", "chai": "^4.3.10", "debug": "^4.3.4", "fast-check": "^3.19.0" @@ -39436,7 +39437,7 @@ }, "packages/sdk": { "name": "@waku/sdk", - "version": "0.0.26", + "version": "0.0.27", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "^15.1.0", @@ -39446,16 +39447,14 @@ "@libp2p/ping": "^1.1.2", "@libp2p/websockets": "^8.1.4", "@noble/hashes": "^1.3.3", - "@waku/core": "0.0.30", - "@waku/discovery": "0.0.3", - "@waku/interfaces": "0.0.25", - "@waku/proto": "^0.0.7", - "@waku/relay": "0.0.13", - "@waku/utils": "0.0.18", + "@waku/core": "0.0.31", + "@waku/discovery": "0.0.4", + "@waku/interfaces": "0.0.26", + "@waku/proto": "^0.0.8", + "@waku/utils": "0.0.19", "libp2p": "^1.8.1" }, "devDependencies": { - "@chainsafe/libp2p-gossipsub": "^13.1.0", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", @@ -39474,7 +39473,6 @@ "@waku/core": "0.0.30", "@waku/interfaces": "0.0.25", "@waku/message-hash": "^0.1.14", - "@waku/relay": "0.0.13", "@waku/utils": "0.0.18" }, "peerDependenciesMeta": { @@ -39517,6 +39515,7 @@ "@types/tail": "^2.2.3", "@waku/discovery": "*", "@waku/message-encryption": "*", + "@waku/relay": "*", "@waku/sdk": "*", "allure-commandline": "^2.27.0", "allure-mocha": "^2.9.2", @@ -39536,11 +39535,11 @@ }, "packages/utils": { "name": "@waku/utils", - "version": "0.0.18", + "version": "0.0.19", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/interfaces": "0.0.25", + "@waku/interfaces": "0.0.26", "chai": "^4.3.10", "debug": "^4.3.4", "uint8arrays": "^5.0.1" diff --git a/package.json b/package.json index 17d9323b77..05b2caeb79 100644 --- a/package.json +++ b/package.json @@ -9,10 +9,10 @@ "packages/message-hash", "packages/enr", "packages/core", - "packages/relay", "packages/discovery", "packages/message-encryption", "packages/sdk", + "packages/relay", "packages/tests", "packages/browser-tests", "packages/build-utils", diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index c1eb1c3a0e..c5659c4a92 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -45,10 +45,3 @@ export interface RelayNode extends Waku { filter: undefined; lightPush: undefined; } - -export interface FullNode extends Waku { - relay: IRelay; - store: IStoreSDK; - filter: IFilterSDK; - lightPush: ILightPushSDK; -} diff --git a/packages/relay/package.json b/packages/relay/package.json index 5320f03cd2..6cc75b20f6 100644 --- a/packages/relay/package.json +++ b/packages/relay/package.json @@ -52,6 +52,7 @@ "@chainsafe/libp2p-gossipsub": "^13.1.0", "@noble/hashes": "^1.3.2", "@waku/core": "0.0.31", + "@waku/sdk": "0.0.27", "@waku/interfaces": "0.0.26", "@waku/proto": "0.0.8", "@waku/utils": "0.0.19", diff --git a/packages/relay/src/create.ts b/packages/relay/src/create.ts new file mode 100644 index 0000000000..448598162b --- /dev/null +++ b/packages/relay/src/create.ts @@ -0,0 +1,44 @@ +import type { RelayNode } from "@waku/interfaces"; +import { + createLibp2pAndUpdateOptions, + CreateWakuNodeOptions, + WakuNode, + WakuOptions +} from "@waku/sdk"; + +import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js"; + +/** + * Create a Waku node that uses Waku Relay to send and receive messages, + * enabling some privacy preserving properties. + * * @remarks + * This function creates a Relay Node using the Waku Relay protocol. + * While it is technically possible to use this function in a browser environment, + * it is not recommended due to potential performance issues and limited browser capabilities. + * If you are developing a browser-based application, consider alternative approaches like creating a Light Node + * or use this function with caution. + */ +export async function createRelayNode( + options: CreateWakuNodeOptions & Partial +): Promise { + options = { + ...options, + libp2p: { + ...options.libp2p, + services: { + pubsub: wakuGossipSub(options) + } + } + }; + + const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); + const relay = wakuRelay(pubsubTopics || [])(libp2p); + + return new WakuNode( + pubsubTopics, + options as WakuOptions, + libp2p, + {}, + relay + ) as RelayNode; +} diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 6724ae344c..be614733b9 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -1,318 +1,2 @@ -import { - GossipSub, - GossipSubComponents, - GossipsubMessage, - GossipsubOpts -} from "@chainsafe/libp2p-gossipsub"; -import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; -import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; -import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface"; -import { sha256 } from "@noble/hashes/sha256"; -import { - ActiveSubscriptions, - Callback, - IAsyncIterator, - IDecodedMessage, - IDecoder, - IEncoder, - IMessage, - IRelay, - Libp2p, - ProtocolCreateOptions, - ProtocolError, - PubsubTopic, - SDKProtocolResult -} from "@waku/interfaces"; -import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; -import { pushOrInitMapSet } from "@waku/utils"; -import { Logger } from "@waku/utils"; - -import { RelayCodecs } from "./constants.js"; -import { messageValidator } from "./message_validator.js"; -import { TopicOnlyDecoder } from "./topic_only_message.js"; - -const log = new Logger("relay"); - -export type Observer = { - decoder: IDecoder; - callback: Callback; -}; - -export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; -export type ContentTopic = string; - -/** - * Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/). - * Throws if libp2p.pubsub does not support Waku Relay - */ -class Relay implements IRelay { - public readonly pubsubTopics: Set; - private defaultDecoder: IDecoder; - - public static multicodec: string = RelayCodecs[0]; - public readonly gossipSub: GossipSub; - - /** - * observers called when receiving new message. - * Observers under key `""` are always called. - */ - private observers: Map>>; - - public constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) { - if (!this.isRelayPubsub(libp2p.services.pubsub)) { - throw Error( - `Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}` - ); - } - - this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubsubTopics = new Set(pubsubTopics); - - if (this.gossipSub.isStarted()) { - this.subscribeToAllTopics(); - } - - this.observers = new Map(); - - // TODO: User might want to decide what decoder should be used (e.g. for RLN) - this.defaultDecoder = new TopicOnlyDecoder(pubsubTopics[0]); - } - - /** - * Mounts the gossipsub protocol onto the libp2p node - * and subscribes to all the topics. - * - * @override - * @returns {void} - */ - public async start(): Promise { - if (this.gossipSub.isStarted()) { - throw Error("GossipSub already started."); - } - - await this.gossipSub.start(); - this.subscribeToAllTopics(); - } - - /** - * Send Waku message. - */ - public async send( - encoder: IEncoder, - message: IMessage - ): Promise { - const successes: PeerId[] = []; - - const { pubsubTopic } = encoder; - if (!this.pubsubTopics.has(pubsubTopic)) { - log.error("Failed to send waku relay: topic not configured"); - return { - successes, - failures: [ - { - error: ProtocolError.TOPIC_NOT_CONFIGURED - } - ] - }; - } - - const msg = await encoder.toWire(message); - if (!msg) { - log.error("Failed to encode message, aborting publish"); - return { - successes, - failures: [ - { - error: ProtocolError.ENCODE_FAILED - } - ] - }; - } - - if (!isWireSizeUnderCap(msg)) { - log.error("Failed to send waku relay: message is bigger that 1MB"); - return { - successes, - failures: [ - { - error: ProtocolError.SIZE_TOO_BIG - } - ] - }; - } - - const { recipients } = await this.gossipSub.publish(pubsubTopic, msg); - return { - successes: recipients, - failures: [] - }; - } - - public subscribeWithUnsubscribe( - decoders: IDecoder | IDecoder[], - callback: Callback - ): () => void { - const observers: Array<[PubsubTopic, Observer]> = []; - - for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) { - const { pubsubTopic } = decoder; - const ctObs: Map>> = this.observers.get( - pubsubTopic - ) ?? new Map(); - const observer = { pubsubTopic, decoder, callback }; - pushOrInitMapSet(ctObs, decoder.contentTopic, observer); - - this.observers.set(pubsubTopic, ctObs); - observers.push([pubsubTopic, observer]); - } - - return () => { - this.removeObservers(observers); - }; - } - - public subscribe = this.subscribeWithUnsubscribe; - - private removeObservers( - observers: Array<[PubsubTopic, Observer]> - ): void { - for (const [pubsubTopic, observer] of observers) { - const ctObs = this.observers.get(pubsubTopic); - if (!ctObs) continue; - - const contentTopic = observer.decoder.contentTopic; - const _obs = ctObs.get(contentTopic); - if (!_obs) continue; - - _obs.delete(observer); - ctObs.set(contentTopic, _obs); - this.observers.set(pubsubTopic, ctObs); - } - } - - public toSubscriptionIterator( - decoders: IDecoder | IDecoder[] - ): Promise> { - return toAsyncIterator(this, decoders); - } - - public getActiveSubscriptions(): ActiveSubscriptions { - const map = new Map(); - for (const pubsubTopic of this.pubsubTopics) { - map.set(pubsubTopic, Array.from(this.observers.keys())); - } - return map; - } - - public getMeshPeers(topic?: TopicStr): PeerIdStr[] { - // if no TopicStr is provided - returns empty array - return this.gossipSub.getMeshPeers(topic || ""); - } - - private subscribeToAllTopics(): void { - for (const pubsubTopic of this.pubsubTopics) { - this.gossipSubSubscribe(pubsubTopic); - } - } - - private async processIncomingMessage( - pubsubTopic: string, - bytes: Uint8Array - ): Promise { - const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes); - if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { - log.warn("Message does not have a content topic, skipping"); - return; - } - - // Retrieve the map of content topics for the given pubsubTopic - const contentTopicMap = this.observers.get(pubsubTopic); - if (!contentTopicMap) { - return; - } - - // Retrieve the set of observers for the given contentTopic - const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set< - Observer - >; - if (!observers) { - return; - } - - await Promise.all( - Array.from(observers).map(({ decoder, callback }) => { - return (async () => { - try { - const protoMsg = await decoder.fromWireToProtoObj(bytes); - if (!protoMsg) { - log.error( - "Internal error: message previously decoded failed on 2nd pass." - ); - return; - } - const msg = await decoder.fromProtoObj(pubsubTopic, protoMsg); - if (msg) { - await callback(msg); - } else { - log.error( - "Failed to decode messages on", - topicOnlyMsg.contentTopic - ); - } - } catch (error) { - log.error("Error while decoding message:", error); - } - })(); - }) - ); - } - - /** - * Subscribe to a pubsub topic and start emitting Waku messages to observers. - * - * @override - */ - private gossipSubSubscribe(pubsubTopic: string): void { - this.gossipSub.addEventListener( - "gossipsub:message", - (event: CustomEvent) => { - if (event.detail.msg.topic !== pubsubTopic) return; - - this.processIncomingMessage( - event.detail.msg.topic, - event.detail.msg.data - ).catch((e) => log.error("Failed to process incoming message", e)); - } - ); - - this.gossipSub.topicValidators.set(pubsubTopic, messageValidator); - this.gossipSub.subscribe(pubsubTopic); - } - - private isRelayPubsub(pubsub: Libp2pPubsub | undefined): boolean { - return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false; - } -} - -export function wakuRelay( - pubsubTopics: PubsubTopic[] -): (libp2p: Libp2p) => IRelay { - return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics); -} - -export function wakuGossipSub( - init: Partial = {} -): (components: GossipSubComponents) => GossipSub { - return (components: GossipSubComponents) => { - init = { - ...init, - msgIdFn: ({ data }) => sha256(data), - // Ensure that no signature is included nor expected in the messages. - globalSignaturePolicy: SignaturePolicy.StrictNoSign, - fallbackToFloodsub: false - }; - const pubsub = new GossipSub(components, init); - pubsub.multicodecs = RelayCodecs; - return pubsub; - }; -} +export * from "./relay.js"; +export * from "./create.js"; diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts new file mode 100644 index 0000000000..6724ae344c --- /dev/null +++ b/packages/relay/src/relay.ts @@ -0,0 +1,318 @@ +import { + GossipSub, + GossipSubComponents, + GossipsubMessage, + GossipsubOpts +} from "@chainsafe/libp2p-gossipsub"; +import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; +import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; +import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface"; +import { sha256 } from "@noble/hashes/sha256"; +import { + ActiveSubscriptions, + Callback, + IAsyncIterator, + IDecodedMessage, + IDecoder, + IEncoder, + IMessage, + IRelay, + Libp2p, + ProtocolCreateOptions, + ProtocolError, + PubsubTopic, + SDKProtocolResult +} from "@waku/interfaces"; +import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; +import { pushOrInitMapSet } from "@waku/utils"; +import { Logger } from "@waku/utils"; + +import { RelayCodecs } from "./constants.js"; +import { messageValidator } from "./message_validator.js"; +import { TopicOnlyDecoder } from "./topic_only_message.js"; + +const log = new Logger("relay"); + +export type Observer = { + decoder: IDecoder; + callback: Callback; +}; + +export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts; +export type ContentTopic = string; + +/** + * Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/). + * Throws if libp2p.pubsub does not support Waku Relay + */ +class Relay implements IRelay { + public readonly pubsubTopics: Set; + private defaultDecoder: IDecoder; + + public static multicodec: string = RelayCodecs[0]; + public readonly gossipSub: GossipSub; + + /** + * observers called when receiving new message. + * Observers under key `""` are always called. + */ + private observers: Map>>; + + public constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) { + if (!this.isRelayPubsub(libp2p.services.pubsub)) { + throw Error( + `Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}` + ); + } + + this.gossipSub = libp2p.services.pubsub as GossipSub; + this.pubsubTopics = new Set(pubsubTopics); + + if (this.gossipSub.isStarted()) { + this.subscribeToAllTopics(); + } + + this.observers = new Map(); + + // TODO: User might want to decide what decoder should be used (e.g. for RLN) + this.defaultDecoder = new TopicOnlyDecoder(pubsubTopics[0]); + } + + /** + * Mounts the gossipsub protocol onto the libp2p node + * and subscribes to all the topics. + * + * @override + * @returns {void} + */ + public async start(): Promise { + if (this.gossipSub.isStarted()) { + throw Error("GossipSub already started."); + } + + await this.gossipSub.start(); + this.subscribeToAllTopics(); + } + + /** + * Send Waku message. + */ + public async send( + encoder: IEncoder, + message: IMessage + ): Promise { + const successes: PeerId[] = []; + + const { pubsubTopic } = encoder; + if (!this.pubsubTopics.has(pubsubTopic)) { + log.error("Failed to send waku relay: topic not configured"); + return { + successes, + failures: [ + { + error: ProtocolError.TOPIC_NOT_CONFIGURED + } + ] + }; + } + + const msg = await encoder.toWire(message); + if (!msg) { + log.error("Failed to encode message, aborting publish"); + return { + successes, + failures: [ + { + error: ProtocolError.ENCODE_FAILED + } + ] + }; + } + + if (!isWireSizeUnderCap(msg)) { + log.error("Failed to send waku relay: message is bigger that 1MB"); + return { + successes, + failures: [ + { + error: ProtocolError.SIZE_TOO_BIG + } + ] + }; + } + + const { recipients } = await this.gossipSub.publish(pubsubTopic, msg); + return { + successes: recipients, + failures: [] + }; + } + + public subscribeWithUnsubscribe( + decoders: IDecoder | IDecoder[], + callback: Callback + ): () => void { + const observers: Array<[PubsubTopic, Observer]> = []; + + for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) { + const { pubsubTopic } = decoder; + const ctObs: Map>> = this.observers.get( + pubsubTopic + ) ?? new Map(); + const observer = { pubsubTopic, decoder, callback }; + pushOrInitMapSet(ctObs, decoder.contentTopic, observer); + + this.observers.set(pubsubTopic, ctObs); + observers.push([pubsubTopic, observer]); + } + + return () => { + this.removeObservers(observers); + }; + } + + public subscribe = this.subscribeWithUnsubscribe; + + private removeObservers( + observers: Array<[PubsubTopic, Observer]> + ): void { + for (const [pubsubTopic, observer] of observers) { + const ctObs = this.observers.get(pubsubTopic); + if (!ctObs) continue; + + const contentTopic = observer.decoder.contentTopic; + const _obs = ctObs.get(contentTopic); + if (!_obs) continue; + + _obs.delete(observer); + ctObs.set(contentTopic, _obs); + this.observers.set(pubsubTopic, ctObs); + } + } + + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[] + ): Promise> { + return toAsyncIterator(this, decoders); + } + + public getActiveSubscriptions(): ActiveSubscriptions { + const map = new Map(); + for (const pubsubTopic of this.pubsubTopics) { + map.set(pubsubTopic, Array.from(this.observers.keys())); + } + return map; + } + + public getMeshPeers(topic?: TopicStr): PeerIdStr[] { + // if no TopicStr is provided - returns empty array + return this.gossipSub.getMeshPeers(topic || ""); + } + + private subscribeToAllTopics(): void { + for (const pubsubTopic of this.pubsubTopics) { + this.gossipSubSubscribe(pubsubTopic); + } + } + + private async processIncomingMessage( + pubsubTopic: string, + bytes: Uint8Array + ): Promise { + const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes); + if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) { + log.warn("Message does not have a content topic, skipping"); + return; + } + + // Retrieve the map of content topics for the given pubsubTopic + const contentTopicMap = this.observers.get(pubsubTopic); + if (!contentTopicMap) { + return; + } + + // Retrieve the set of observers for the given contentTopic + const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set< + Observer + >; + if (!observers) { + return; + } + + await Promise.all( + Array.from(observers).map(({ decoder, callback }) => { + return (async () => { + try { + const protoMsg = await decoder.fromWireToProtoObj(bytes); + if (!protoMsg) { + log.error( + "Internal error: message previously decoded failed on 2nd pass." + ); + return; + } + const msg = await decoder.fromProtoObj(pubsubTopic, protoMsg); + if (msg) { + await callback(msg); + } else { + log.error( + "Failed to decode messages on", + topicOnlyMsg.contentTopic + ); + } + } catch (error) { + log.error("Error while decoding message:", error); + } + })(); + }) + ); + } + + /** + * Subscribe to a pubsub topic and start emitting Waku messages to observers. + * + * @override + */ + private gossipSubSubscribe(pubsubTopic: string): void { + this.gossipSub.addEventListener( + "gossipsub:message", + (event: CustomEvent) => { + if (event.detail.msg.topic !== pubsubTopic) return; + + this.processIncomingMessage( + event.detail.msg.topic, + event.detail.msg.data + ).catch((e) => log.error("Failed to process incoming message", e)); + } + ); + + this.gossipSub.topicValidators.set(pubsubTopic, messageValidator); + this.gossipSub.subscribe(pubsubTopic); + } + + private isRelayPubsub(pubsub: Libp2pPubsub | undefined): boolean { + return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false; + } +} + +export function wakuRelay( + pubsubTopics: PubsubTopic[] +): (libp2p: Libp2p) => IRelay { + return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics); +} + +export function wakuGossipSub( + init: Partial = {} +): (components: GossipSubComponents) => GossipSub { + return (components: GossipSubComponents) => { + init = { + ...init, + msgIdFn: ({ data }) => sha256(data), + // Ensure that no signature is included nor expected in the messages. + globalSignaturePolicy: SignaturePolicy.StrictNoSign, + fallbackToFloodsub: false + }; + const pubsub = new GossipSub(components, init); + pubsub.multicodecs = RelayCodecs; + return pubsub; + }; +} diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 8a80d97787..4241df2229 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -8,10 +8,6 @@ ".": { "types": "./dist/index.d.ts", "import": "./dist/index.js" - }, - "./relay": { - "types": "./dist/relay-node/index.d.ts", - "import": "./dist/relay-node/index.js" } }, "typesVersions": { @@ -72,12 +68,10 @@ "@waku/discovery": "0.0.4", "@waku/interfaces": "0.0.26", "@waku/proto": "^0.0.8", - "@waku/relay": "0.0.14", "@waku/utils": "0.0.19", "libp2p": "^1.8.1" }, "devDependencies": { - "@chainsafe/libp2p-gossipsub": "^13.1.0", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", @@ -93,7 +87,6 @@ "@waku/core": "0.0.30", "@waku/interfaces": "0.0.25", "@waku/message-hash": "^0.1.14", - "@waku/relay": "0.0.13", "@waku/utils": "0.0.18" }, "peerDependenciesMeta": { diff --git a/packages/sdk/src/create/index.ts b/packages/sdk/src/create/index.ts index ea0315451a..3c4d7ef825 100644 --- a/packages/sdk/src/create/index.ts +++ b/packages/sdk/src/create/index.ts @@ -1,2 +1,2 @@ export { createLightNode } from "./create.js"; -export { defaultLibp2p } from "./libp2p.js"; +export { defaultLibp2p, createLibp2pAndUpdateOptions } from "./libp2p.js"; diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index 84ce5efc5f..bbeb7a3c0a 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -1,4 +1,3 @@ -import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import { noise } from "@chainsafe/libp2p-noise"; import { bootstrap } from "@libp2p/bootstrap"; import { identify } from "@libp2p/identify"; @@ -15,7 +14,6 @@ import { type Libp2pComponents, PubsubTopic } from "@waku/interfaces"; -import { wakuGossipSub } from "@waku/relay"; import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { createLibp2p } from "libp2p"; @@ -27,10 +25,6 @@ import { import { defaultPeerDiscoveries } from "./discovery.js"; -type PubsubService = { - pubsub?: (components: Libp2pComponents) => GossipSub; -}; - type MetadataService = { metadata?: (components: Libp2pComponents) => IMetadata; }; @@ -39,7 +33,6 @@ const log = new Logger("sdk:create"); export async function defaultLibp2p( pubsubTopics: PubsubTopic[], - wakuGossipSub?: PubsubService["pubsub"], options?: Partial, userAgent?: string ): Promise { @@ -56,10 +49,6 @@ export async function defaultLibp2p( /* eslint-enable no-console */ } - const pubsubService: PubsubService = wakuGossipSub - ? { pubsub: wakuGossipSub } - : {}; - const metadataService: MetadataService = pubsubTopics ? { metadata: wakuMetadata(pubsubTopics) } : {}; @@ -86,7 +75,6 @@ export async function defaultLibp2p( options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams }), ...metadataService, - ...pubsubService, ...options?.services } }) as any as Libp2p; // TODO: make libp2p include it; @@ -116,7 +104,6 @@ export async function createLibp2pAndUpdateOptions( const libp2p = await defaultLibp2p( pubsubTopics, - wakuGossipSub(options), libp2pOptions, options?.userAgent ); diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 2bbdfc0d30..d0ce50a9b3 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -9,7 +9,11 @@ export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; export * from "./waku.js"; -export { createLightNode, defaultLibp2p } from "./create/index.js"; +export { + createLightNode, + defaultLibp2p, + createLibp2pAndUpdateOptions +} from "./create/index.js"; export { wakuLightPush } from "./protocols/light_push.js"; export { wakuFilter } from "./protocols/filter.js"; export { wakuStore } from "./protocols/store.js"; @@ -17,4 +21,3 @@ export { wakuStore } from "./protocols/store.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; export * from "@waku/interfaces"; -export * as relay from "@waku/relay"; diff --git a/packages/sdk/src/relay-node/index.ts b/packages/sdk/src/relay-node/index.ts deleted file mode 100644 index 4a5de91494..0000000000 --- a/packages/sdk/src/relay-node/index.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { type FullNode, type RelayNode } from "@waku/interfaces"; -import { RelayCreateOptions } from "@waku/relay"; - -import { createLibp2pAndUpdateOptions } from "../create/libp2p.js"; -import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; - -/** - * Create a Waku node that uses Waku Relay to send and receive messages, - * enabling some privacy preserving properties. - * * @remarks - * This function creates a Relay Node using the Waku Relay protocol. - * While it is technically possible to use this function in a browser environment, - * it is not recommended due to potential performance issues and limited browser capabilities. - * If you are developing a browser-based application, consider alternative approaches like creating a Light Node - * or use this function with caution. - */ -export async function createRelayNode( - options: CreateWakuNodeOptions & Partial -): Promise { - const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); - - return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, { - relay: true - }) as RelayNode; -} - -/** - * Create a Waku node that uses all Waku protocols. - * - * This helper is not recommended except if: - * - you are interfacing with nwaku v0.11 or below - * - you are doing some form of testing - * - * If you are building a full node, it is recommended to use - * [nwaku](github.com/status-im/nwaku) and its JSON RPC API or wip REST API. - * - * @see https://github.com/status-im/nwaku/issues/1085 - * @internal - */ -export async function createFullNode( - options: CreateWakuNodeOptions & Partial -): Promise { - const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); - - return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, { - filter: true, - lightpush: true, - relay: true, - store: true - }) as FullNode; -} diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index ae79a71849..a53e9143b6 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -14,7 +14,6 @@ import type { Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { wakuRelay } from "@waku/relay"; import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter.js"; @@ -57,7 +56,6 @@ type ProtocolsEnabled = { filter?: boolean; lightpush?: boolean; store?: boolean; - relay?: boolean; }; export class WakuNode implements Waku { @@ -73,15 +71,16 @@ export class WakuNode implements Waku { public readonly pubsubTopics: PubsubTopic[], options: WakuOptions, libp2p: Libp2p, - protocolsEnabled: ProtocolsEnabled + protocolsEnabled: ProtocolsEnabled, + relay?: IRelay ) { + this.relay = relay; this.libp2p = libp2p; protocolsEnabled = { filter: false, lightpush: false, store: false, - relay: false, ...protocolsEnabled }; @@ -118,11 +117,6 @@ export class WakuNode implements Waku { this.filter = filter(libp2p); } - if (protocolsEnabled.relay) { - const relay = wakuRelay(this.pubsubTopics); - this.relay = relay(libp2p); - } - log.info( "Waku node created", peerId, diff --git a/packages/tests/package.json b/packages/tests/package.json index a59a2ffb3f..12d88f395a 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -77,6 +77,7 @@ "@waku/discovery": "*", "@waku/message-encryption": "*", "@waku/sdk": "*", + "@waku/relay": "*", "allure-commandline": "^2.27.0", "allure-mocha": "^2.9.2", "chai": "^4.3.10", diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index d5509d7f32..56af7bd7a0 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -4,8 +4,8 @@ import { ProtocolCreateOptions, Protocols } from "@waku/interfaces"; +import { createRelayNode } from "@waku/relay"; import { createLightNode, WakuNode } from "@waku/sdk"; -import { createRelayNode } from "@waku/sdk/relay"; import { derivePubsubTopicsFromNetworkConfig, Logger, diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index b9545f1ebe..eec669bffd 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -205,7 +205,7 @@ export class ServiceNode { } /** - * Calls nwaku REST API "/admin/v1/peers" to check for known peers + * Calls nwaku REST API "/admin/v1/peers" to check for known peers. Be aware that it doesn't recognize js-waku as a node * @throws */ public async peers(): Promise { diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 46423f5cdb..76c2ba5762 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -65,13 +65,12 @@ export async function runMultipleNodes( ); const wakuConnections = waku.libp2p.getConnections(); - const nodePeers = await node.peers(); - if (wakuConnections.length < 1 || nodePeers.length < 1) { - throw new Error( - `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}` - ); + if (wakuConnections.length < 1) { + throw new Error(`Expected at least 1 connection for js-waku.`); } + + await node.waitForLog(waku.libp2p.peerId.toString(), 100); } await waitForConnections(numServiceNodes, waku); diff --git a/packages/tests/tests/connection-mananger/connection_state.spec.ts b/packages/tests/tests/connection-mananger/connection_state.spec.ts index 2764c36dc8..0226df443f 100644 --- a/packages/tests/tests/connection-mananger/connection_state.spec.ts +++ b/packages/tests/tests/connection-mananger/connection_state.spec.ts @@ -1,7 +1,7 @@ import { Multiaddr } from "@multiformats/multiaddr"; import { EConnectionStateEvents, LightNode, Protocols } from "@waku/interfaces"; +import { createRelayNode } from "@waku/relay"; import { createLightNode } from "@waku/sdk"; -import { createRelayNode } from "@waku/sdk/relay"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/enr.node.spec.ts b/packages/tests/tests/enr.node.spec.ts index 383a8be946..18ca31a04e 100644 --- a/packages/tests/tests/enr.node.spec.ts +++ b/packages/tests/tests/enr.node.spec.ts @@ -2,7 +2,7 @@ import { waitForRemotePeer } from "@waku/core"; import { EnrDecoder } from "@waku/enr"; import type { RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { createRelayNode } from "@waku/sdk/relay"; +import { createRelayNode } from "@waku/relay"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 1976b5e075..34999295cb 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -107,13 +107,12 @@ export async function runMultipleNodes( await node.ensureSubscriptions(pubsubTopics); const wakuConnections = waku.libp2p.getConnections(); - const nodePeers = await node.peers(); - if (wakuConnections.length < 1 || nodePeers.length < 1) { - throw new Error( - `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}` - ); + if (wakuConnections.length < 1) { + throw new Error(`Expected at least 1 connection for js-waku.`); } + + await node.waitForLog(waku.libp2p.peerId.toString(), 100); } await waitForConnections(numServiceNodes, waku); diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts index e071cf830e..b9cc3eeb70 100644 --- a/packages/tests/tests/relay/interop.node.spec.ts +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface"; import { DecodedMessage, waitForRemotePeer } from "@waku/core"; import { Protocols, RelayNode } from "@waku/interfaces"; -import { createRelayNode } from "@waku/sdk/relay"; +import { createRelayNode } from "@waku/relay"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index 58dbe9e553..91915f76a0 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -11,7 +11,7 @@ import { SingleShardInfo } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { createRelayNode } from "@waku/sdk/relay"; +import { createRelayNode } from "@waku/relay"; import { contentTopicToPubsubTopic, pubsubTopicToSingleShardInfo, diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts index f2072830d0..c147ed3595 100644 --- a/packages/tests/tests/relay/subscribe.node.spec.ts +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -1,6 +1,6 @@ import { createDecoder, createEncoder } from "@waku/core"; import { RelayNode } from "@waku/interfaces"; -import { createRelayNode } from "@waku/sdk/relay"; +import { createRelayNode } from "@waku/relay"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/relay/utils.ts b/packages/tests/tests/relay/utils.ts index e6ab379eee..090d6221e6 100644 --- a/packages/tests/tests/relay/utils.ts +++ b/packages/tests/tests/relay/utils.ts @@ -5,7 +5,7 @@ import { RelayNode, ShardInfo } from "@waku/interfaces"; -import { createRelayNode } from "@waku/sdk/relay"; +import { createRelayNode } from "@waku/relay"; import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; import { Context } from "mocha"; diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 248b9509eb..efaa92b030 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -114,14 +114,13 @@ export async function startAndConnectLightNode( await waitForRemotePeer(waku, [Protocols.Store]); const wakuConnections = waku.libp2p.getConnections(); - const nwakuPeers = await instance.peers(); - if (wakuConnections.length < 1 || nwakuPeers.length < 1) { - throw new Error( - `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}` - ); + if (wakuConnections.length < 1) { + throw new Error(`Expected at least 1 connection for js-waku.`); } + await instance.waitForLog(waku.libp2p.peerId.toString(), 100); + log.info("Waku node created"); return waku; } diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 8fab283d75..90393bf2f3 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -1,8 +1,8 @@ import { waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; +import { createRelayNode } from "@waku/relay"; import { createLightNode } from "@waku/sdk"; -import { createRelayNode } from "@waku/sdk/relay"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 531ebddc0a..3881971e1e 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -8,12 +8,12 @@ import { createDecoder, createEncoder } from "@waku/message-encryption/symmetric"; +import { createRelayNode } from "@waku/relay"; import { createLightNode, createEncoder as createPlainEncoder, DefaultUserAgent } from "@waku/sdk"; -import { createRelayNode } from "@waku/sdk/relay"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai";