chore: remove relay from sdk package (#2040)

* remove full node

* remove relay from sdk

* comment node counter check

* try using logline

* up comment
This commit is contained in:
Sasha 2024-08-31 15:18:51 +02:00 committed by GitHub
parent f9361bf774
commit 5cfe9327b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 446 additions and 483 deletions

97
package-lock.json generated
View File

@ -12,10 +12,10 @@
"packages/message-hash",
"packages/enr",
"packages/core",
"packages/relay",
"packages/discovery",
"packages/message-encryption",
"packages/sdk",
"packages/relay",
"packages/tests",
"packages/browser-tests",
"packages/build-utils",
@ -2653,9 +2653,9 @@
}
},
"node_modules/@chainsafe/libp2p-noise": {
"version": "15.1.0",
"resolved": "https://registry.npmjs.org/@chainsafe/libp2p-noise/-/libp2p-noise-15.1.0.tgz",
"integrity": "sha512-84S/Uk7ZZRYpSlE5d1odMmQTl5g5Da8etgcf4QI7arTAHkvBs3il7yGHIPt4wV4EV0qIMG+XjdGIYihRXfI2/w==",
"version": "15.1.2",
"resolved": "https://registry.npmjs.org/@chainsafe/libp2p-noise/-/libp2p-noise-15.1.2.tgz",
"integrity": "sha512-o6mqsAbaCBucgdLOOHtkwtGVL1c8RLKhlTnHQY+leazY+thiE1Sm6qPCwsTHKQnWii1q5hDVI2Q0l9QgYi5v4Q==",
"license": "Apache-2.0 OR MIT",
"dependencies": {
"@chainsafe/as-chacha20poly1305": "^0.1.0",
@ -2663,7 +2663,7 @@
"@libp2p/crypto": "^4.0.0",
"@libp2p/interface": "^1.5.0",
"@libp2p/peer-id": "^4.0.0",
"@noble/ciphers": "^0.5.1",
"@noble/ciphers": "^0.6.0",
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"it-length-prefixed": "^9.0.1",
@ -5884,9 +5884,9 @@
}
},
"node_modules/@noble/ciphers": {
"version": "0.5.3",
"resolved": "https://registry.npmjs.org/@noble/ciphers/-/ciphers-0.5.3.tgz",
"integrity": "sha512-B0+6IIHiqEs3BPMT0hcRmHvEj2QHOLu+uwt+tqDDeVd0oyVzh7BPrDcPjRnV1PV/5LaknXJJQvOuRGR0zQJz+w==",
"version": "0.6.0",
"resolved": "https://registry.npmjs.org/@noble/ciphers/-/ciphers-0.6.0.tgz",
"integrity": "sha512-mIbq/R9QXk5/cTfESb1OKtyFnk7oc1Om/8onA1158K9/OZUQFDEVy55jVTato+xmp3XX6F6Qh0zz0Nc1AxAlRQ==",
"license": "MIT",
"funding": {
"url": "https://paulmillr.com/funding/"
@ -39077,14 +39077,14 @@
},
"packages/core": {
"name": "@waku/core",
"version": "0.0.30",
"version": "0.0.31",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@libp2p/ping": "^1.1.2",
"@waku/enr": "^0.0.24",
"@waku/interfaces": "0.0.25",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.18",
"@waku/enr": "^0.0.25",
"@waku/interfaces": "0.0.26",
"@waku/proto": "0.0.8",
"@waku/utils": "0.0.19",
"debug": "^4.3.4",
"it-all": "^3.0.4",
"it-length-prefixed": "^9.0.4",
@ -39148,14 +39148,14 @@
},
"packages/discovery": {
"name": "@waku/discovery",
"version": "0.0.3",
"version": "0.0.4",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/core": "0.0.30",
"@waku/enr": "0.0.24",
"@waku/interfaces": "0.0.25",
"@waku/proto": "^0.0.7",
"@waku/utils": "0.0.18",
"@waku/core": "0.0.31",
"@waku/enr": "0.0.25",
"@waku/interfaces": "0.0.26",
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.19",
"debug": "^4.3.4",
"dns-query": "^0.11.2",
"hi-base32": "^0.5.1",
@ -39202,7 +39202,7 @@
},
"packages/enr": {
"name": "@waku/enr",
"version": "0.0.24",
"version": "0.0.25",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@ethersproject/rlp": "^5.7.0",
@ -39210,7 +39210,7 @@
"@libp2p/peer-id": "^4.2.1",
"@multiformats/multiaddr": "^12.0.0",
"@noble/secp256k1": "^1.7.1",
"@waku/utils": "0.0.18",
"@waku/utils": "0.0.19",
"debug": "^4.3.4",
"js-sha3": "^0.9.2"
},
@ -39222,7 +39222,7 @@
"@types/chai": "^4.3.11",
"@types/mocha": "^10.0.6",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.25",
"@waku/interfaces": "0.0.26",
"chai": "^4.3.10",
"cspell": "^8.6.1",
"fast-check": "^3.19.0",
@ -39251,10 +39251,10 @@
},
"packages/interfaces": {
"name": "@waku/interfaces",
"version": "0.0.25",
"version": "0.0.26",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@waku/proto": "^0.0.7"
"@waku/proto": "^0.0.8"
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^13.1.0",
@ -39269,14 +39269,14 @@
},
"packages/message-encryption": {
"name": "@waku/message-encryption",
"version": "0.0.28",
"version": "0.0.29",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/secp256k1": "^1.7.1",
"@waku/core": "0.0.30",
"@waku/interfaces": "0.0.25",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.18",
"@waku/core": "0.0.31",
"@waku/interfaces": "0.0.26",
"@waku/proto": "0.0.8",
"@waku/utils": "0.0.19",
"debug": "^4.3.4",
"js-sha3": "^0.9.2",
"uint8arrays": "^5.0.1"
@ -39313,11 +39313,11 @@
},
"packages/message-hash": {
"name": "@waku/message-hash",
"version": "0.1.14",
"version": "0.1.15",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/utils": "0.0.18"
"@waku/utils": "0.0.19"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",
@ -39327,7 +39327,7 @@
"@types/debug": "^4.1.12",
"@types/mocha": "^10.0.6",
"@waku/build-utils": "*",
"@waku/interfaces": "0.0.25",
"@waku/interfaces": "0.0.26",
"chai": "^4.3.10",
"cspell": "^8.6.1",
"fast-check": "^3.19.0",
@ -39353,7 +39353,7 @@
},
"packages/proto": {
"name": "@waku/proto",
"version": "0.0.7",
"version": "0.0.8",
"license": "MIT OR Apache-2.0",
"dependencies": {
"protons-runtime": "^5.4.0"
@ -39395,15 +39395,16 @@
},
"packages/relay": {
"name": "@waku/relay",
"version": "0.0.13",
"version": "0.0.14",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^13.1.0",
"@noble/hashes": "^1.3.2",
"@waku/core": "0.0.30",
"@waku/interfaces": "0.0.25",
"@waku/proto": "0.0.7",
"@waku/utils": "0.0.18",
"@waku/core": "0.0.31",
"@waku/interfaces": "0.0.26",
"@waku/proto": "0.0.8",
"@waku/sdk": "0.0.27",
"@waku/utils": "0.0.19",
"chai": "^4.3.10",
"debug": "^4.3.4",
"fast-check": "^3.19.0"
@ -39436,7 +39437,7 @@
},
"packages/sdk": {
"name": "@waku/sdk",
"version": "0.0.26",
"version": "0.0.27",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@chainsafe/libp2p-noise": "^15.1.0",
@ -39446,16 +39447,14 @@
"@libp2p/ping": "^1.1.2",
"@libp2p/websockets": "^8.1.4",
"@noble/hashes": "^1.3.3",
"@waku/core": "0.0.30",
"@waku/discovery": "0.0.3",
"@waku/interfaces": "0.0.25",
"@waku/proto": "^0.0.7",
"@waku/relay": "0.0.13",
"@waku/utils": "0.0.18",
"@waku/core": "0.0.31",
"@waku/discovery": "0.0.4",
"@waku/interfaces": "0.0.26",
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.19",
"libp2p": "^1.8.1"
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^13.1.0",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
@ -39474,7 +39473,6 @@
"@waku/core": "0.0.30",
"@waku/interfaces": "0.0.25",
"@waku/message-hash": "^0.1.14",
"@waku/relay": "0.0.13",
"@waku/utils": "0.0.18"
},
"peerDependenciesMeta": {
@ -39517,6 +39515,7 @@
"@types/tail": "^2.2.3",
"@waku/discovery": "*",
"@waku/message-encryption": "*",
"@waku/relay": "*",
"@waku/sdk": "*",
"allure-commandline": "^2.27.0",
"allure-mocha": "^2.9.2",
@ -39536,11 +39535,11 @@
},
"packages/utils": {
"name": "@waku/utils",
"version": "0.0.18",
"version": "0.0.19",
"license": "MIT OR Apache-2.0",
"dependencies": {
"@noble/hashes": "^1.3.2",
"@waku/interfaces": "0.0.25",
"@waku/interfaces": "0.0.26",
"chai": "^4.3.10",
"debug": "^4.3.4",
"uint8arrays": "^5.0.1"

View File

@ -9,10 +9,10 @@
"packages/message-hash",
"packages/enr",
"packages/core",
"packages/relay",
"packages/discovery",
"packages/message-encryption",
"packages/sdk",
"packages/relay",
"packages/tests",
"packages/browser-tests",
"packages/build-utils",

View File

@ -45,10 +45,3 @@ export interface RelayNode extends Waku {
filter: undefined;
lightPush: undefined;
}
export interface FullNode extends Waku {
relay: IRelay;
store: IStoreSDK;
filter: IFilterSDK;
lightPush: ILightPushSDK;
}

View File

@ -52,6 +52,7 @@
"@chainsafe/libp2p-gossipsub": "^13.1.0",
"@noble/hashes": "^1.3.2",
"@waku/core": "0.0.31",
"@waku/sdk": "0.0.27",
"@waku/interfaces": "0.0.26",
"@waku/proto": "0.0.8",
"@waku/utils": "0.0.19",

View File

@ -0,0 +1,44 @@
import type { RelayNode } from "@waku/interfaces";
import {
createLibp2pAndUpdateOptions,
CreateWakuNodeOptions,
WakuNode,
WakuOptions
} from "@waku/sdk";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "./relay.js";
/**
* Create a Waku node that uses Waku Relay to send and receive messages,
* enabling some privacy preserving properties.
* * @remarks
* This function creates a Relay Node using the Waku Relay protocol.
* While it is technically possible to use this function in a browser environment,
* it is not recommended due to potential performance issues and limited browser capabilities.
* If you are developing a browser-based application, consider alternative approaches like creating a Light Node
* or use this function with caution.
*/
export async function createRelayNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
): Promise<RelayNode> {
options = {
...options,
libp2p: {
...options.libp2p,
services: {
pubsub: wakuGossipSub(options)
}
}
};
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);
const relay = wakuRelay(pubsubTopics || [])(libp2p);
return new WakuNode(
pubsubTopics,
options as WakuOptions,
libp2p,
{},
relay
) as RelayNode;
}

View File

@ -1,318 +1,2 @@
import {
GossipSub,
GossipSubComponents,
GossipsubMessage,
GossipsubOpts
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import {
ActiveSubscriptions,
Callback,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
IRelay,
Libp2p,
ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
SDKProtocolResult
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
import { Logger } from "@waku/utils";
import { RelayCodecs } from "./constants.js";
import { messageValidator } from "./message_validator.js";
import { TopicOnlyDecoder } from "./topic_only_message.js";
const log = new Logger("relay");
export type Observer<T extends IDecodedMessage> = {
decoder: IDecoder<T>;
callback: Callback<T>;
};
export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type ContentTopic = string;
/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Throws if libp2p.pubsub does not support Waku Relay
*/
class Relay implements IRelay {
public readonly pubsubTopics: Set<PubsubTopic>;
private defaultDecoder: IDecoder<IDecodedMessage>;
public static multicodec: string = RelayCodecs[0];
public readonly gossipSub: GossipSub;
/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
public constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) {
if (!this.isRelayPubsub(libp2p.services.pubsub)) {
throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
);
}
this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = new Set(pubsubTopics);
if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
}
this.observers = new Map();
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder(pubsubTopics[0]);
}
/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to all the topics.
*
* @override
* @returns {void}
*/
public async start(): Promise<void> {
if (this.gossipSub.isStarted()) {
throw Error("GossipSub already started.");
}
await this.gossipSub.start();
this.subscribeToAllTopics();
}
/**
* Send Waku message.
*/
public async send(
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const { pubsubTopic } = encoder;
if (!this.pubsubTopics.has(pubsubTopic)) {
log.error("Failed to send waku relay: topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
]
};
}
const msg = await encoder.toWire(message);
if (!msg) {
log.error("Failed to encode message, aborting publish");
return {
successes,
failures: [
{
error: ProtocolError.ENCODE_FAILED
}
]
};
}
if (!isWireSizeUnderCap(msg)) {
log.error("Failed to send waku relay: message is bigger that 1MB");
return {
successes,
failures: [
{
error: ProtocolError.SIZE_TOO_BIG
}
]
};
}
const { recipients } = await this.gossipSub.publish(pubsubTopic, msg);
return {
successes: recipients,
failures: []
};
}
public subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observers: Array<[PubsubTopic, Observer<T>]> = [];
for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) {
const { pubsubTopic } = decoder;
const ctObs: Map<ContentTopic, Set<Observer<T>>> = this.observers.get(
pubsubTopic
) ?? new Map();
const observer = { pubsubTopic, decoder, callback };
pushOrInitMapSet(ctObs, decoder.contentTopic, observer);
this.observers.set(pubsubTopic, ctObs);
observers.push([pubsubTopic, observer]);
}
return () => {
this.removeObservers(observers);
};
}
public subscribe = this.subscribeWithUnsubscribe;
private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubsubTopic, Observer<T>]>
): void {
for (const [pubsubTopic, observer] of observers) {
const ctObs = this.observers.get(pubsubTopic);
if (!ctObs) continue;
const contentTopic = observer.decoder.contentTopic;
const _obs = ctObs.get(contentTopic);
if (!_obs) continue;
_obs.delete(observer);
ctObs.set(contentTopic, _obs);
this.observers.set(pubsubTopic, ctObs);
}
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
for (const pubsubTopic of this.pubsubTopics) {
map.set(pubsubTopic, Array.from(this.observers.keys()));
}
return map;
}
public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
// if no TopicStr is provided - returns empty array
return this.gossipSub.getMeshPeers(topic || "");
}
private subscribeToAllTopics(): void {
for (const pubsubTopic of this.pubsubTopics) {
this.gossipSubSubscribe(pubsubTopic);
}
}
private async processIncomingMessage<T extends IDecodedMessage>(
pubsubTopic: string,
bytes: Uint8Array
): Promise<void> {
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log.warn("Message does not have a content topic, skipping");
return;
}
// Retrieve the map of content topics for the given pubsubTopic
const contentTopicMap = this.observers.get(pubsubTopic);
if (!contentTopicMap) {
return;
}
// Retrieve the set of observers for the given contentTopic
const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set<
Observer<T>
>;
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(({ decoder, callback }) => {
return (async () => {
try {
const protoMsg = await decoder.fromWireToProtoObj(bytes);
if (!protoMsg) {
log.error(
"Internal error: message previously decoded failed on 2nd pass."
);
return;
}
const msg = await decoder.fromProtoObj(pubsubTopic, protoMsg);
if (msg) {
await callback(msg);
} else {
log.error(
"Failed to decode messages on",
topicOnlyMsg.contentTopic
);
}
} catch (error) {
log.error("Error while decoding message:", error);
}
})();
})
);
}
/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
* @override
*/
private gossipSubSubscribe(pubsubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubsubTopic) return;
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
}
);
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
this.gossipSub.subscribe(pubsubTopic);
}
private isRelayPubsub(pubsub: Libp2pPubsub | undefined): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false;
}
}
export function wakuRelay(
pubsubTopics: PubsubTopic[]
): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics);
}
export function wakuGossipSub(
init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => GossipSub {
return (components: GossipSubComponents) => {
init = {
...init,
msgIdFn: ({ data }) => sha256(data),
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false
};
const pubsub = new GossipSub(components, init);
pubsub.multicodecs = RelayCodecs;
return pubsub;
};
}
export * from "./relay.js";
export * from "./create.js";

318
packages/relay/src/relay.ts Normal file
View File

@ -0,0 +1,318 @@
import {
GossipSub,
GossipSubComponents,
GossipsubMessage,
GossipsubOpts
} from "@chainsafe/libp2p-gossipsub";
import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import type { PubSub as Libp2pPubsub, PeerId } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import {
ActiveSubscriptions,
Callback,
IAsyncIterator,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
IRelay,
Libp2p,
ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
SDKProtocolResult
} from "@waku/interfaces";
import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils";
import { Logger } from "@waku/utils";
import { RelayCodecs } from "./constants.js";
import { messageValidator } from "./message_validator.js";
import { TopicOnlyDecoder } from "./topic_only_message.js";
const log = new Logger("relay");
export type Observer<T extends IDecodedMessage> = {
decoder: IDecoder<T>;
callback: Callback<T>;
};
export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
export type ContentTopic = string;
/**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
* Throws if libp2p.pubsub does not support Waku Relay
*/
class Relay implements IRelay {
public readonly pubsubTopics: Set<PubsubTopic>;
private defaultDecoder: IDecoder<IDecodedMessage>;
public static multicodec: string = RelayCodecs[0];
public readonly gossipSub: GossipSub;
/**
* observers called when receiving new message.
* Observers under key `""` are always called.
*/
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
public constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) {
if (!this.isRelayPubsub(libp2p.services.pubsub)) {
throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
);
}
this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = new Set(pubsubTopics);
if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
}
this.observers = new Map();
// TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder(pubsubTopics[0]);
}
/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to all the topics.
*
* @override
* @returns {void}
*/
public async start(): Promise<void> {
if (this.gossipSub.isStarted()) {
throw Error("GossipSub already started.");
}
await this.gossipSub.start();
this.subscribeToAllTopics();
}
/**
* Send Waku message.
*/
public async send(
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
const successes: PeerId[] = [];
const { pubsubTopic } = encoder;
if (!this.pubsubTopics.has(pubsubTopic)) {
log.error("Failed to send waku relay: topic not configured");
return {
successes,
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
]
};
}
const msg = await encoder.toWire(message);
if (!msg) {
log.error("Failed to encode message, aborting publish");
return {
successes,
failures: [
{
error: ProtocolError.ENCODE_FAILED
}
]
};
}
if (!isWireSizeUnderCap(msg)) {
log.error("Failed to send waku relay: message is bigger that 1MB");
return {
successes,
failures: [
{
error: ProtocolError.SIZE_TOO_BIG
}
]
};
}
const { recipients } = await this.gossipSub.publish(pubsubTopic, msg);
return {
successes: recipients,
failures: []
};
}
public subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
const observers: Array<[PubsubTopic, Observer<T>]> = [];
for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) {
const { pubsubTopic } = decoder;
const ctObs: Map<ContentTopic, Set<Observer<T>>> = this.observers.get(
pubsubTopic
) ?? new Map();
const observer = { pubsubTopic, decoder, callback };
pushOrInitMapSet(ctObs, decoder.contentTopic, observer);
this.observers.set(pubsubTopic, ctObs);
observers.push([pubsubTopic, observer]);
}
return () => {
this.removeObservers(observers);
};
}
public subscribe = this.subscribeWithUnsubscribe;
private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubsubTopic, Observer<T>]>
): void {
for (const [pubsubTopic, observer] of observers) {
const ctObs = this.observers.get(pubsubTopic);
if (!ctObs) continue;
const contentTopic = observer.decoder.contentTopic;
const _obs = ctObs.get(contentTopic);
if (!_obs) continue;
_obs.delete(observer);
ctObs.set(contentTopic, _obs);
this.observers.set(pubsubTopic, ctObs);
}
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
public getActiveSubscriptions(): ActiveSubscriptions {
const map = new Map();
for (const pubsubTopic of this.pubsubTopics) {
map.set(pubsubTopic, Array.from(this.observers.keys()));
}
return map;
}
public getMeshPeers(topic?: TopicStr): PeerIdStr[] {
// if no TopicStr is provided - returns empty array
return this.gossipSub.getMeshPeers(topic || "");
}
private subscribeToAllTopics(): void {
for (const pubsubTopic of this.pubsubTopics) {
this.gossipSubSubscribe(pubsubTopic);
}
}
private async processIncomingMessage<T extends IDecodedMessage>(
pubsubTopic: string,
bytes: Uint8Array
): Promise<void> {
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
if (!topicOnlyMsg || !topicOnlyMsg.contentTopic) {
log.warn("Message does not have a content topic, skipping");
return;
}
// Retrieve the map of content topics for the given pubsubTopic
const contentTopicMap = this.observers.get(pubsubTopic);
if (!contentTopicMap) {
return;
}
// Retrieve the set of observers for the given contentTopic
const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set<
Observer<T>
>;
if (!observers) {
return;
}
await Promise.all(
Array.from(observers).map(({ decoder, callback }) => {
return (async () => {
try {
const protoMsg = await decoder.fromWireToProtoObj(bytes);
if (!protoMsg) {
log.error(
"Internal error: message previously decoded failed on 2nd pass."
);
return;
}
const msg = await decoder.fromProtoObj(pubsubTopic, protoMsg);
if (msg) {
await callback(msg);
} else {
log.error(
"Failed to decode messages on",
topicOnlyMsg.contentTopic
);
}
} catch (error) {
log.error("Error while decoding message:", error);
}
})();
})
);
}
/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
* @override
*/
private gossipSubSubscribe(pubsubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubsubTopic) return;
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
}
);
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
this.gossipSub.subscribe(pubsubTopic);
}
private isRelayPubsub(pubsub: Libp2pPubsub | undefined): boolean {
return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false;
}
}
export function wakuRelay(
pubsubTopics: PubsubTopic[]
): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics);
}
export function wakuGossipSub(
init: Partial<RelayCreateOptions> = {}
): (components: GossipSubComponents) => GossipSub {
return (components: GossipSubComponents) => {
init = {
...init,
msgIdFn: ({ data }) => sha256(data),
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false
};
const pubsub = new GossipSub(components, init);
pubsub.multicodecs = RelayCodecs;
return pubsub;
};
}

View File

@ -8,10 +8,6 @@
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
},
"./relay": {
"types": "./dist/relay-node/index.d.ts",
"import": "./dist/relay-node/index.js"
}
},
"typesVersions": {
@ -72,12 +68,10 @@
"@waku/discovery": "0.0.4",
"@waku/interfaces": "0.0.26",
"@waku/proto": "^0.0.8",
"@waku/relay": "0.0.14",
"@waku/utils": "0.0.19",
"libp2p": "^1.8.1"
},
"devDependencies": {
"@chainsafe/libp2p-gossipsub": "^13.1.0",
"@rollup/plugin-commonjs": "^25.0.7",
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
@ -93,7 +87,6 @@
"@waku/core": "0.0.30",
"@waku/interfaces": "0.0.25",
"@waku/message-hash": "^0.1.14",
"@waku/relay": "0.0.13",
"@waku/utils": "0.0.18"
},
"peerDependenciesMeta": {

View File

@ -1,2 +1,2 @@
export { createLightNode } from "./create.js";
export { defaultLibp2p } from "./libp2p.js";
export { defaultLibp2p, createLibp2pAndUpdateOptions } from "./libp2p.js";

View File

@ -1,4 +1,3 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import { noise } from "@chainsafe/libp2p-noise";
import { bootstrap } from "@libp2p/bootstrap";
import { identify } from "@libp2p/identify";
@ -15,7 +14,6 @@ import {
type Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { wakuGossipSub } from "@waku/relay";
import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils";
import { createLibp2p } from "libp2p";
@ -27,10 +25,6 @@ import {
import { defaultPeerDiscoveries } from "./discovery.js";
type PubsubService = {
pubsub?: (components: Libp2pComponents) => GossipSub;
};
type MetadataService = {
metadata?: (components: Libp2pComponents) => IMetadata;
};
@ -39,7 +33,6 @@ const log = new Logger("sdk:create");
export async function defaultLibp2p(
pubsubTopics: PubsubTopic[],
wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<CreateLibp2pOptions>,
userAgent?: string
): Promise<Libp2p> {
@ -56,10 +49,6 @@ export async function defaultLibp2p(
/* eslint-enable no-console */
}
const pubsubService: PubsubService = wakuGossipSub
? { pubsub: wakuGossipSub }
: {};
const metadataService: MetadataService = pubsubTopics
? { metadata: wakuMetadata(pubsubTopics) }
: {};
@ -86,7 +75,6 @@ export async function defaultLibp2p(
options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams
}),
...metadataService,
...pubsubService,
...options?.services
}
}) as any as Libp2p; // TODO: make libp2p include it;
@ -116,7 +104,6 @@ export async function createLibp2pAndUpdateOptions(
const libp2p = await defaultLibp2p(
pubsubTopics,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
);

View File

@ -9,7 +9,11 @@ export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";
export * from "./waku.js";
export { createLightNode, defaultLibp2p } from "./create/index.js";
export {
createLightNode,
defaultLibp2p,
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuStore } from "./protocols/store.js";
@ -17,4 +21,3 @@ export { wakuStore } from "./protocols/store.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";
export * from "@waku/interfaces";
export * as relay from "@waku/relay";

View File

@ -1,51 +0,0 @@
import { type FullNode, type RelayNode } from "@waku/interfaces";
import { RelayCreateOptions } from "@waku/relay";
import { createLibp2pAndUpdateOptions } from "../create/libp2p.js";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";
/**
* Create a Waku node that uses Waku Relay to send and receive messages,
* enabling some privacy preserving properties.
* * @remarks
* This function creates a Relay Node using the Waku Relay protocol.
* While it is technically possible to use this function in a browser environment,
* it is not recommended due to potential performance issues and limited browser capabilities.
* If you are developing a browser-based application, consider alternative approaches like creating a Light Node
* or use this function with caution.
*/
export async function createRelayNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
): Promise<RelayNode> {
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);
return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, {
relay: true
}) as RelayNode;
}
/**
* Create a Waku node that uses all Waku protocols.
*
* This helper is not recommended except if:
* - you are interfacing with nwaku v0.11 or below
* - you are doing some form of testing
*
* If you are building a full node, it is recommended to use
* [nwaku](github.com/status-im/nwaku) and its JSON RPC API or wip REST API.
*
* @see https://github.com/status-im/nwaku/issues/1085
* @internal
*/
export async function createFullNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
): Promise<FullNode> {
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);
return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, {
filter: true,
lightpush: true,
relay: true,
store: true
}) as FullNode;
}

View File

@ -14,7 +14,6 @@ import type {
Waku
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { wakuRelay } from "@waku/relay";
import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter.js";
@ -57,7 +56,6 @@ type ProtocolsEnabled = {
filter?: boolean;
lightpush?: boolean;
store?: boolean;
relay?: boolean;
};
export class WakuNode implements Waku {
@ -73,15 +71,16 @@ export class WakuNode implements Waku {
public readonly pubsubTopics: PubsubTopic[],
options: WakuOptions,
libp2p: Libp2p,
protocolsEnabled: ProtocolsEnabled
protocolsEnabled: ProtocolsEnabled,
relay?: IRelay
) {
this.relay = relay;
this.libp2p = libp2p;
protocolsEnabled = {
filter: false,
lightpush: false,
store: false,
relay: false,
...protocolsEnabled
};
@ -118,11 +117,6 @@ export class WakuNode implements Waku {
this.filter = filter(libp2p);
}
if (protocolsEnabled.relay) {
const relay = wakuRelay(this.pubsubTopics);
this.relay = relay(libp2p);
}
log.info(
"Waku node created",
peerId,

View File

@ -77,6 +77,7 @@
"@waku/discovery": "*",
"@waku/message-encryption": "*",
"@waku/sdk": "*",
"@waku/relay": "*",
"allure-commandline": "^2.27.0",
"allure-mocha": "^2.9.2",
"chai": "^4.3.10",

View File

@ -4,8 +4,8 @@ import {
ProtocolCreateOptions,
Protocols
} from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { createLightNode, WakuNode } from "@waku/sdk";
import { createRelayNode } from "@waku/sdk/relay";
import {
derivePubsubTopicsFromNetworkConfig,
Logger,

View File

@ -205,7 +205,7 @@ export class ServiceNode {
}
/**
* Calls nwaku REST API "/admin/v1/peers" to check for known peers
* Calls nwaku REST API "/admin/v1/peers" to check for known peers. Be aware that it doesn't recognize js-waku as a node
* @throws
*/
public async peers(): Promise<string[]> {

View File

@ -65,13 +65,12 @@ export async function runMultipleNodes(
);
const wakuConnections = waku.libp2p.getConnections();
const nodePeers = await node.peers();
if (wakuConnections.length < 1 || nodePeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}`
);
if (wakuConnections.length < 1) {
throw new Error(`Expected at least 1 connection for js-waku.`);
}
await node.waitForLog(waku.libp2p.peerId.toString(), 100);
}
await waitForConnections(numServiceNodes, waku);

View File

@ -1,7 +1,7 @@
import { Multiaddr } from "@multiformats/multiaddr";
import { EConnectionStateEvents, LightNode, Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { createLightNode } from "@waku/sdk";
import { createRelayNode } from "@waku/sdk/relay";
import { expect } from "chai";
import {

View File

@ -2,7 +2,7 @@ import { waitForRemotePeer } from "@waku/core";
import { EnrDecoder } from "@waku/enr";
import type { RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { createRelayNode } from "@waku/relay";
import { expect } from "chai";
import {

View File

@ -107,13 +107,12 @@ export async function runMultipleNodes(
await node.ensureSubscriptions(pubsubTopics);
const wakuConnections = waku.libp2p.getConnections();
const nodePeers = await node.peers();
if (wakuConnections.length < 1 || nodePeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}`
);
if (wakuConnections.length < 1) {
throw new Error(`Expected at least 1 connection for js-waku.`);
}
await node.waitForLog(waku.libp2p.peerId.toString(), 100);
}
await waitForConnections(numServiceNodes, waku);

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface";
import { DecodedMessage, waitForRemotePeer } from "@waku/core";
import { Protocols, RelayNode } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { createRelayNode } from "@waku/relay";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -11,7 +11,7 @@ import {
SingleShardInfo
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { createRelayNode } from "@waku/relay";
import {
contentTopicToPubsubTopic,
pubsubTopicToSingleShardInfo,

View File

@ -1,6 +1,6 @@
import { createDecoder, createEncoder } from "@waku/core";
import { RelayNode } from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { createRelayNode } from "@waku/relay";
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";

View File

@ -5,7 +5,7 @@ import {
RelayNode,
ShardInfo
} from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { createRelayNode } from "@waku/relay";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils";
import { Context } from "mocha";

View File

@ -114,14 +114,13 @@ export async function startAndConnectLightNode(
await waitForRemotePeer(waku, [Protocols.Store]);
const wakuConnections = waku.libp2p.getConnections();
const nwakuPeers = await instance.peers();
if (wakuConnections.length < 1 || nwakuPeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}`
);
if (wakuConnections.length < 1) {
throw new Error(`Expected at least 1 connection for js-waku.`);
}
await instance.waitForLog(waku.libp2p.peerId.toString(), 100);
log.info("Waku node created");
return waku;
}

View File

@ -1,8 +1,8 @@
import { waitForRemotePeer } from "@waku/core";
import type { LightNode, RelayNode } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRelayNode } from "@waku/relay";
import { createLightNode } from "@waku/sdk";
import { createRelayNode } from "@waku/sdk/relay";
import { expect } from "chai";
import {

View File

@ -8,12 +8,12 @@ import {
createDecoder,
createEncoder
} from "@waku/message-encryption/symmetric";
import { createRelayNode } from "@waku/relay";
import {
createLightNode,
createEncoder as createPlainEncoder,
DefaultUserAgent
} from "@waku/sdk";
import { createRelayNode } from "@waku/sdk/relay";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";