feat!: deprecate named pubsub topics and use static/auto sharding (#2097)

* feat: deprecate named sharding & protocols adhere
simplify network config type, all protocols use pubsub topic internally

* chore: update tests

* tests: rm application info

* chore: use static sharding and auto sharding terminologies

* chore: update docs for network config

* chore: update interfaces

* tests: update tests error message

* chore: remove `ShardingParams` type and fix test
This commit is contained in:
Danish Arora 2024-08-13 05:23:20 +05:30 committed by GitHub
parent 86f730f958
commit 5ce36c8f18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
54 changed files with 379 additions and 790 deletions

View File

@ -3,10 +3,9 @@ import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
ProtocolCreateOptions,
PubsubTopic
} from "@waku/interfaces";
import { ensureShardingConfigured, Logger } from "@waku/utils";
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import {
getConnectedPeersForProtocolAndShard,
getPeersForProtocol,
@ -29,8 +28,7 @@ export class BaseProtocol implements IBaseProtocolCore {
public multicodec: string,
private components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[],
private options?: ProtocolCreateOptions
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
@ -100,9 +98,7 @@ export class BaseProtocol implements IBaseProtocolCore {
this.components.connectionManager.getConnections(),
this.peerStore,
[this.multicodec],
this.options?.shardInfo
? ensureShardingConfigured(this.options.shardInfo).shardInfo
: undefined
pubsubTopicsToShardInfo(this.pubsubTopics)
);
// Filter the peers based on discovery & number of peers requested

View File

@ -173,7 +173,7 @@ export class ConnectionManager
private constructor(
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
private configuredPubsubTopics: PubsubTopic[],
public readonly configuredPubsubTopics: PubsubTopic[],
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {

View File

@ -5,7 +5,6 @@ import {
type CoreProtocolResult,
type IBaseProtocolCore,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
type PubsubTopic
} from "@waku/interfaces";
@ -38,16 +37,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
wakuMessage: WakuMessage,
peerIdStr: string
) => Promise<void>,
libp2p: Libp2p,
options?: ProtocolCreateOptions
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(
FilterCodecs.SUBSCRIBE,
libp2p.components,
log,
options!.pubsubTopics!,
options
);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics);
libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {

View File

@ -5,8 +5,8 @@ import {
type IEncoder,
type IMessage,
type Libp2p,
type ProtocolCreateOptions,
ProtocolError,
PubsubTopic,
type ThisOrThat
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
@ -32,14 +32,11 @@ type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
LightPushCodec,
libp2p.components,
log,
options!.pubsubTopics!,
options
);
public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(LightPushCodec, libp2p.components, log, pubsubTopics);
}
private async preparePushMessage(

View File

@ -6,10 +6,11 @@ import {
type MetadataQueryResult,
type PeerIdStr,
ProtocolError,
PubsubTopic,
type ShardInfo
} from "@waku/interfaces";
import { proto_metadata } from "@waku/proto";
import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils";
import { encodeRelayShard, Logger, pubsubTopicsToShardInfo } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
@ -26,15 +27,10 @@ class Metadata extends BaseProtocol implements IMetadata {
protected handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();
public constructor(
public shardInfo: ShardInfo,
public pubsubTopics: PubsubTopic[],
libp2p: Libp2pComponents
) {
super(
MetadataCodec,
libp2p.components,
log,
shardInfoToPubsubTopics(shardInfo)
);
super(MetadataCodec, libp2p.components, log, pubsubTopics);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
@ -45,7 +41,9 @@ class Metadata extends BaseProtocol implements IMetadata {
* Make a metadata query to a peer
*/
public async query(peerId: PeerId): Promise<MetadataQueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);
const request = proto_metadata.WakuMetadataRequest.encode(
pubsubTopicsToShardInfo(this.pubsubTopics)
);
const peer = await this.peerStore.get(peerId);
if (!peer) {
@ -112,7 +110,7 @@ class Metadata extends BaseProtocol implements IMetadata {
try {
const { stream, connection } = streamData;
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode(
this.shardInfo
pubsubTopicsToShardInfo(this.pubsubTopics)
);
const encodedResponse = await pipe(
@ -177,7 +175,8 @@ class Metadata extends BaseProtocol implements IMetadata {
}
export function wakuMetadata(
shardInfo: ShardInfo
pubsubTopics: PubsubTopic[]
): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => new Metadata(shardInfo, components);
return (components: Libp2pComponents) =>
new Metadata(pubsubTopics, components);
}

View File

@ -4,7 +4,7 @@ import {
IDecoder,
IStoreCore,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
QueryRequestParams
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -28,14 +28,11 @@ const log = new Logger("store");
export const StoreCodec = "/vac/waku/store-query/3.0.0";
export class StoreCore extends BaseProtocol implements IStoreCore {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
StoreCodec,
libp2p.components,
log,
options?.pubsubTopics || [],
options
);
public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(StoreCodec, libp2p.components, log, pubsubTopics);
}
public async *queryPerPage<T extends IDecodedMessage>(

View File

@ -1,5 +1,7 @@
import type { Peer, PeerId, TypedEventEmitter } from "@libp2p/interface";
import { PubsubTopic } from "./misc";
export enum Tags {
BOOTSTRAP = "bootstrap",
PEER_EXCHANGE = "peer-exchange",
@ -61,6 +63,7 @@ export interface IConnectionStateEvents {
export interface IConnectionManager
extends TypedEventEmitter<IPeersByDiscoveryEvents & IConnectionStateEvents> {
configuredPubsubTopics: PubsubTopic[];
dropConnection(peerId: PeerId): Promise<void>;
getPeersByDiscovery(): Promise<PeersByDiscoveryResult>;
stop(): void;

View File

@ -1,4 +1,4 @@
import { ShardInfo } from "./enr";
import type { ShardInfo } from "./sharding";
/**
* The default cluster ID for The Waku Network
@ -12,3 +12,5 @@ export const DefaultShardInfo: ShardInfo = {
clusterId: DEFAULT_CLUSTER_ID,
shards: [0, 1, 2, 3, 4, 5, 6, 7, 8]
};
export const DefaultNetworkConfig = DefaultShardInfo;

View File

@ -2,6 +2,8 @@ import type { PeerId } from "@libp2p/interface";
import type { PeerInfo } from "@libp2p/interface";
import type { Multiaddr } from "@multiformats/multiaddr";
import { ShardInfo } from "./sharding";
export type ENRKey = string;
export type ENRValue = Uint8Array;
/**
@ -18,11 +20,6 @@ export interface Waku2 {
lightPush: boolean;
}
export interface ShardInfo {
clusterId: number;
shards: number[];
}
export interface IEnr extends Map<ENRKey, ENRValue> {
nodeId?: NodeId;
peerId?: PeerId;

View File

@ -18,3 +18,4 @@ export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";
export * from "./health_manager.js";
export * from "./sharding.js";

View File

@ -1,14 +1,14 @@
import type { PeerId } from "@libp2p/interface";
import { type ShardInfo } from "./enr.js";
import { ThisOrThat } from "./misc.js";
import type { IBaseProtocolCore, ShardingParams } from "./protocols.js";
import { PubsubTopic, ThisOrThat } from "./misc.js";
import type { IBaseProtocolCore } from "./protocols.js";
import type { ShardInfo } from "./sharding.js";
export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>;
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> {
shardInfo: ShardingParams;
pubsubTopics: PubsubTopic[];
confirmOrAttemptHandshake(peerId: PeerId): Promise<MetadataQueryResult>;
query(peerId: PeerId): Promise<MetadataQueryResult>;
}

View File

@ -2,10 +2,10 @@ import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerStore } from "@libp2p/interface";
import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic, ThisAndThat, ThisOrThat } from "./misc.js";
import { ThisAndThat, ThisOrThat } from "./misc.js";
import { AutoSharding, StaticSharding } from "./sharding.js";
export enum Protocols {
Relay = "relay",
@ -15,7 +15,6 @@ export enum Protocols {
}
export type IBaseProtocolCore = {
shardInfo?: ShardInfo;
multicodec: string;
peerStore: PeerStore;
allPeers: () => Promise<Peer[]>;
@ -30,18 +29,7 @@ export type IBaseProtocolSDK = {
readonly numPeersToUse: number;
};
export type ContentTopicInfo = {
clusterId?: number;
contentTopics: string[];
};
export type ApplicationInfo = {
clusterId: number;
application: string;
version: string;
};
export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo;
export type NetworkConfig = StaticSharding | AutoSharding;
//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
/**
@ -72,38 +60,35 @@ export type ProtocolUseOptions = {
export type ProtocolCreateOptions = {
/**
* @deprecated
* Should be used ONLY if some other than The Waku Network is in use.
* Configuration for determining the network in use.
*
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#pubsub-topics) for details.
*
* This is used by:
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
*
* If no pubsub topic is specified, the default pubsub topic will be determined from DefaultShardInfo.
*
* You cannot add or remove pubsub topics after initialization of the node.
*/
pubsubTopics?: PubsubTopic[];
/**
* ShardInfo is used to determine which network is in use.
* Defaults to {@link @waku/interfaces!DefaultShardInfo}.
* Default value is configured for The Waku Network
*
* The format to specify a shard is:
* clusterId: number, shards: number[]
* If using Static Sharding:
* Default value is configured for The Waku Network.
* The format to specify a shard is: clusterId: number, shards: number[]
* To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
*/
shardInfo?: Partial<ShardingParams>;
/**
* Content topics are used to determine network in use.
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details.
*
* If using Auto Sharding:
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details.
* You cannot add or remove content topics after initialization of the node.
*/
contentTopics?: string[];
/**
* Configuration for determining the network in use.
* Network configuration refers to the shards and clusters used in the network.
*
* If using Static Sharding:
* Cluster ID and shards are specified in the format: clusterId: number, shards: number[]
* The default value is configured for The Waku Network => clusterId: 0, shards: [0, 1, 2, 3, 4, 5, 6, 7]
* To learn more about the sharding specification, see [Relay Sharding](https://rfc.vac.dev/spec/51/).
*
* If using Auto Sharding:
* Cluster ID and content topics are specified in the format: clusterId: number, contentTopics: string[]
* Content topics are used to determine the shards to be configured for the network.
* Cluster ID is optional, and defaults to The Waku Network's cluster ID => 0
* To specify content topics, see [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md#content-topics) for details
*
* @default { clusterId: 1, shards: [0, 1, 2, 3, 4, 5, 6, 7] }
*/
networkConfig?: NetworkConfig;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)

View File

@ -0,0 +1,12 @@
export type ShardInfo = {
clusterId: number;
shards: number[];
};
export type ContentTopicInfo = {
clusterId?: number;
contentTopics: string[];
};
export type StaticSharding = ShardInfo;
export type AutoSharding = ContentTopicInfo;

View File

@ -1,6 +1,6 @@
import { type LightNode } from "@waku/interfaces";
import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";
import { CreateWakuNodeOptions, WakuNode } from "../waku.js";
import { createLibp2pAndUpdateOptions } from "./libp2p.js";
@ -12,9 +12,9 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js";
export async function createLightNode(
options: CreateWakuNodeOptions = {}
): Promise<LightNode> {
const libp2p = await createLibp2pAndUpdateOptions(options);
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);
return new WakuNode(options as WakuOptions, libp2p, {
return new WakuNode(pubsubTopics, options, libp2p, {
store: true,
lightpush: true,
filter: true

View File

@ -9,14 +9,14 @@ import { all as filterAll, wss } from "@libp2p/websockets/filters";
import { wakuMetadata } from "@waku/core";
import {
type CreateLibp2pOptions,
DefaultShardInfo,
DefaultNetworkConfig,
type IMetadata,
type Libp2p,
type Libp2pComponents,
type ShardInfo
PubsubTopic
} from "@waku/interfaces";
import { wakuGossipSub } from "@waku/relay";
import { ensureShardingConfigured, Logger } from "@waku/utils";
import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils";
import { createLibp2p } from "libp2p";
import {
@ -35,10 +35,10 @@ type MetadataService = {
metadata?: (components: Libp2pComponents) => IMetadata;
};
const logger = new Logger("sdk:create");
const log = new Logger("sdk:create");
export async function defaultLibp2p(
shardInfo?: ShardInfo,
pubsubTopics: PubsubTopic[],
wakuGossipSub?: PubsubService["pubsub"],
options?: Partial<CreateLibp2pOptions>,
userAgent?: string
@ -60,8 +60,8 @@ export async function defaultLibp2p(
? { pubsub: wakuGossipSub }
: {};
const metadataService: MetadataService = shardInfo
? { metadata: wakuMetadata(shardInfo) }
const metadataService: MetadataService = pubsubTopics
? { metadata: wakuMetadata(pubsubTopics) }
: {};
const filter = process?.env?.NODE_ENV === "test" ? filterAll : wss;
@ -91,14 +91,18 @@ export async function defaultLibp2p(
export async function createLibp2pAndUpdateOptions(
options: CreateWakuNodeOptions
): Promise<Libp2p> {
const shardInfo = configureNetworkOptions(options);
): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> {
const { networkConfig } = options;
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(
networkConfig ?? DefaultNetworkConfig
);
log.info("Creating Waku node with pubsub topics", pubsubTopics);
const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries(options.pubsubTopics!));
peerDiscovery.push(...defaultPeerDiscoveries(pubsubTopics));
}
if (options?.bootstrapPeers) {
@ -108,64 +112,11 @@ export async function createLibp2pAndUpdateOptions(
libp2pOptions.peerDiscovery = peerDiscovery;
const libp2p = await defaultLibp2p(
shardInfo,
pubsubTopics,
wakuGossipSub(options),
libp2pOptions,
options?.userAgent
);
return libp2p;
}
function configureNetworkOptions(
options: CreateWakuNodeOptions
): ShardInfo | undefined {
const flags = [
options.contentTopics,
options.pubsubTopics,
options.shardInfo
].filter((v) => !!v);
if (flags.length > 1) {
throw Error(
"Too many network configurations provided. Pass only one of: pubsubTopic, contentTopics or shardInfo."
);
}
logWhichShardInfoIsUsed(options);
if (options.contentTopics) {
options.shardInfo = { contentTopics: options.contentTopics };
}
if (!options.shardInfo) {
options.shardInfo = DefaultShardInfo;
}
const shardInfo = options.shardInfo
? ensureShardingConfigured(options.shardInfo)
: undefined;
options.pubsubTopics = options.pubsubTopics ?? shardInfo?.pubsubTopics;
return shardInfo?.shardInfo;
}
function logWhichShardInfoIsUsed(options: CreateWakuNodeOptions): void {
if (options.pubsubTopics) {
logger.info("Using pubsubTopics array to bootstrap the node.");
return;
}
if (options.contentTopics) {
logger.info(
"Using contentTopics and default cluster ID (1) to bootstrap the node."
);
return;
}
if (options.shardInfo) {
logger.info("Using shardInfo parameters to bootstrap the node.");
return;
}
return { libp2p, pubsubTopics };
}

View File

@ -13,13 +13,13 @@ import {
type IProtoMessage,
type ISubscriptionSDK,
type Libp2p,
NetworkConfig,
type PeerIdStr,
type ProtocolCreateOptions,
ProtocolError,
type ProtocolUseOptions,
type PubsubTopic,
type SDKProtocolResult,
type ShardingParams,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
@ -437,8 +437,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
libp2p,
options
connectionManager.configuredPubsubTopics,
libp2p
),
connectionManager,
{ numPeersToUse: options?.numPeersToUse }
@ -541,7 +541,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic,
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {

View File

@ -25,9 +25,13 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK {
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(new LightPushCore(libp2p, options), connectionManager, {
numPeersToUse: options?.numPeersToUse
});
super(
new LightPushCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
{
numPeersToUse: options?.numPeersToUse
}
);
this.protocol = this.core as LightPushCore;
}

View File

@ -4,7 +4,6 @@ import {
IDecoder,
IStoreSDK,
Libp2p,
ProtocolCreateOptions,
QueryRequestParams,
StoreCursor
} from "@waku/interfaces";
@ -24,14 +23,14 @@ const log = new Logger("waku:store:sdk");
export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
public readonly protocol: StoreCore;
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(new StoreCore(libp2p, options), connectionManager, {
numPeersToUse: DEFAULT_NUM_PEERS
});
public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) {
super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
{
numPeersToUse: DEFAULT_NUM_PEERS
}
);
this.protocol = this.core as StoreCore;
}
@ -238,10 +237,9 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK {
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/
export function wakuStore(
connectionManager: ConnectionManager,
init: Partial<ProtocolCreateOptions> = {}
connectionManager: ConnectionManager
): (libp2p: Libp2p) => IStoreSDK {
return (libp2p: Libp2p) => {
return new StoreSDK(connectionManager, libp2p, init);
return new StoreSDK(connectionManager, libp2p);
};
}

View File

@ -15,13 +15,11 @@ import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";
* or use this function with caution.
*/
export async function createRelayNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions> = {
pubsubTopics: []
}
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
): Promise<RelayNode> {
const libp2p = await createLibp2pAndUpdateOptions(options);
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);
return new WakuNode(options as WakuOptions, libp2p, {
return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, {
relay: true
}) as RelayNode;
}
@ -40,13 +38,11 @@ export async function createRelayNode(
* @internal
*/
export async function createFullNode(
options: CreateWakuNodeOptions & Partial<RelayCreateOptions> = {
pubsubTopics: []
}
options: CreateWakuNodeOptions & Partial<RelayCreateOptions>
): Promise<FullNode> {
const libp2p = await createLibp2pAndUpdateOptions(options);
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options);
return new WakuNode(options as WakuOptions, libp2p, {
return new WakuNode(pubsubTopics, options as WakuOptions, libp2p, {
filter: true,
lightpush: true,
relay: true,

View File

@ -48,7 +48,6 @@ export interface WakuOptions {
* @default {@link @waku/core.DefaultUserAgent}
*/
userAgent?: string;
pubsubTopics: PubsubTopic[];
}
export type CreateWakuNodeOptions = ProtocolCreateOptions &
@ -68,19 +67,14 @@ export class WakuNode implements Waku {
public filter?: IFilterSDK;
public lightPush?: ILightPushSDK;
public connectionManager: ConnectionManager;
public readonly pubsubTopics: PubsubTopic[];
public readonly health: IHealthManager;
public constructor(
public readonly pubsubTopics: PubsubTopic[],
options: WakuOptions,
libp2p: Libp2p,
protocolsEnabled: ProtocolsEnabled
) {
if (options.pubsubTopics.length == 0) {
throw new Error("At least one pubsub topic must be provided");
}
this.pubsubTopics = options.pubsubTopics;
this.libp2p = libp2p;
protocolsEnabled = {
@ -110,17 +104,17 @@ export class WakuNode implements Waku {
this.health = getHealthManager();
if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager, options);
const store = wakuStore(this.connectionManager);
this.store = store(libp2p);
}
if (protocolsEnabled.lightpush) {
const lightPush = wakuLightPush(this.connectionManager, options);
const lightPush = wakuLightPush(this.connectionManager);
this.lightPush = lightPush(libp2p);
}
if (protocolsEnabled.filter) {
const filter = wakuFilter(this.connectionManager, options);
const filter = wakuFilter(this.connectionManager);
this.filter = filter(libp2p);
}

View File

@ -1,6 +1,6 @@
import { DecodedMessage } from "@waku/core";
import { PubsubTopic, ShardingParams } from "@waku/interfaces";
import { ensureShardingConfigured, Logger } from "@waku/utils";
import { NetworkConfig } from "@waku/interfaces";
import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils";
import { expect } from "chai";
import { DefaultTestPubsubTopic } from "../constants";
@ -23,10 +23,9 @@ const log = new Logger("test:message-collector");
export class ServiceNodesFleet {
public static async createAndRun(
mochaContext: Mocha.Context,
pubsubTopics: PubsubTopic[],
nodesToCreate: number = 3,
strictChecking: boolean = false,
shardInfo?: ShardingParams,
networkConfig: NetworkConfig,
_args?: Args,
withoutFilter = false
): Promise<ServiceNodesFleet> {
@ -38,10 +37,7 @@ export class ServiceNodesFleet {
Math.random().toString(36).substring(7)
);
shardInfo = shardInfo
? ensureShardingConfigured(shardInfo).shardInfo
: undefined;
const args = getArgs(pubsubTopics, shardInfo, _args);
const args = getArgs(networkConfig, _args);
await node.start(args, {
retries: 3
});
@ -266,11 +262,8 @@ class MultipleNodesMessageCollector {
}
}
function getArgs(
pubsubTopics: PubsubTopic[],
shardInfo?: ShardingParams,
args?: Args
): Args {
function getArgs(networkConfig: NetworkConfig, args?: Args): Args {
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig);
const defaultArgs = {
lightpush: true,
filter: true,
@ -278,7 +271,7 @@ function getArgs(
peerExchange: true,
relay: true,
pubsubTopic: pubsubTopics,
...(shardInfo && { clusterId: shardInfo.clusterId })
clusterId: networkConfig.clusterId
} as Args;
return { ...defaultArgs, ...args };

View File

@ -1,13 +1,16 @@
import { waitForRemotePeer } from "@waku/core";
import {
ContentTopicInfo,
NetworkConfig,
ProtocolCreateOptions,
Protocols,
ShardingParams
Protocols
} from "@waku/interfaces";
import { createLightNode, WakuNode } from "@waku/sdk";
import { createRelayNode } from "@waku/sdk/relay";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils";
import {
derivePubsubTopicsFromNetworkConfig,
Logger,
pubsubTopicsToShardInfo
} from "@waku/utils";
import { Context } from "mocha";
import { NOISE_KEY_1 } from "../constants.js";
@ -19,7 +22,7 @@ export const log = new Logger("test:runNodes");
type RunNodesOptions = {
context: Context;
shardInfo: ShardingParams;
networkConfig: NetworkConfig;
protocols: Protocols[];
createNode: typeof createLightNode | typeof createRelayNode;
};
@ -27,14 +30,11 @@ type RunNodesOptions = {
export async function runNodes<T>(
options: RunNodesOptions
): Promise<[ServiceNode, T]> {
const { context, shardInfo, createNode, protocols } = options;
const { context, networkConfig, createNode, protocols } = options;
const nwaku = new ServiceNode(makeLogFileName(context));
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
function isContentTopicInfo(info: ShardingParams): info is ContentTopicInfo {
return (info as ContentTopicInfo).contentTopics !== undefined;
}
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig);
const shardInfo = pubsubTopicsToShardInfo(pubsubTopics);
await nwaku.start(
{
@ -43,19 +43,14 @@ export async function runNodes<T>(
relay: true,
store: true,
pubsubTopic: pubsubTopics,
// Conditionally include clusterId if shardInfo exists
...(shardInfo && { clusterId: shardInfo.clusterId }),
// Conditionally include contentTopic if shardInfo exists and clusterId is 1
...(shardInfo &&
isContentTopicInfo(shardInfo) &&
shardInfo.clusterId === 1 && { contentTopic: shardInfo.contentTopics })
clusterId: shardInfo.clusterId
},
{ retries: 3 }
);
const waku_options: ProtocolCreateOptions = {
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
shardInfo
networkConfig: shardInfo
};
log.info("Starting js waku node with :", JSON.stringify(waku_options));

View File

@ -1,17 +1,18 @@
import { waitForRemotePeer } from "@waku/core";
import {
DefaultNetworkConfig,
LightNode,
NetworkConfig,
ProtocolCreateOptions,
Protocols,
ShardingParams,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { isDefined, shardInfoToPubsubTopics } from "@waku/utils";
import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";
import { DefaultTestPubsubTopic, NOISE_KEY_1 } from "../constants";
import { NOISE_KEY_1 } from "../constants";
import { ServiceNodesFleet } from "../lib";
import { Args } from "../types";
@ -19,22 +20,18 @@ import { waitForConnections } from "./waitForConnections";
export async function runMultipleNodes(
context: Context,
shardInfo?: ShardingParams,
networkConfig: NetworkConfig = DefaultNetworkConfig,
customArgs?: Args,
strictChecking: boolean = false,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultTestPubsubTopic];
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
pubsubTopics,
numServiceNodes,
strictChecking,
shardInfo,
networkConfig,
customArgs,
withoutFilter
);
@ -43,15 +40,10 @@ export async function runMultipleNodes(
staticNoiseKey: NOISE_KEY_1,
libp2p: {
addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
}
},
networkConfig
};
if (shardInfo) {
wakuOptions.shardInfo = shardInfo;
} else {
wakuOptions.pubsubTopics = pubsubTopics;
}
const waku = await createLightNode(wakuOptions);
await waku.start();
@ -68,7 +60,9 @@ export async function runMultipleNodes(
!customArgs?.lightpush ? undefined : Protocols.LightPush
].filter(isDefined)
);
await node.ensureSubscriptions(pubsubTopics);
await node.ensureSubscriptions(
derivePubsubTopicsFromNetworkConfig(networkConfig)
);
const wakuConnections = waku.libp2p.getConnections();
const nodePeers = await node.peers();

View File

@ -29,7 +29,11 @@ describe("Connection state", function () {
let nwaku2PeerId: Multiaddr;
beforeEachCustom(this, async () => {
waku = await createLightNode({ shardInfo: DefaultTestShardInfo });
try {
waku = await createLightNode({ networkConfig: DefaultTestShardInfo });
} catch (error) {
console.error(error);
}
nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1");
nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2");
await nwaku1.start({ filter: true });
@ -91,11 +95,11 @@ describe("Connection state", function () {
it("`waku:online` between 2 js-waku relay nodes", async function () {
const waku1 = await createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
const waku2 = await createRelayNode({
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
let eventCount1 = 0;

View File

@ -21,7 +21,7 @@ describe("Dials", function () {
let waku: LightNode;
beforeEachCustom(this, async () => {
waku = await createLightNode({ shardInfo: { shards: [0] } });
waku = await createLightNode();
isPeerTopicConfigured = sinon.stub(
waku.connectionManager as any,
"isPeerTopicConfigured"

View File

@ -19,7 +19,7 @@ describe("Events", function () {
let waku: LightNode;
this.timeout(TEST_TIMEOUT);
beforeEachCustom(this, async () => {
waku = await createLightNode({ shardInfo: { shards: [0] } });
waku = await createLightNode();
});
afterEachCustom(this, async () => {

View File

@ -19,7 +19,7 @@ describe("Public methods", function () {
let waku: LightNode;
this.timeout(TEST_TIMEOUT);
beforeEachCustom(this, async () => {
waku = await createLightNode({ shardInfo: { shards: [0] } });
waku = await createLightNode();
});
afterEachCustom(this, async () => {

View File

@ -37,7 +37,7 @@ describe("ENR Interop: ServiceNode", function () {
waku = await createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku.start();
await waku.dial(multiAddrWithId);
@ -71,7 +71,7 @@ describe("ENR Interop: ServiceNode", function () {
waku = await createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku.start();
await waku.dial(multiAddrWithId);
@ -106,7 +106,7 @@ describe("ENR Interop: ServiceNode", function () {
waku = await createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku.start();
await waku.dial(multiAddrWithId);

View File

@ -107,7 +107,7 @@ describe("Waku Message Ephemeral field", function () {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
shardInfo: {
networkConfig: {
contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic],
clusterId: ClusterId
}
@ -142,14 +142,14 @@ describe("Waku Message Ephemeral field", function () {
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: {
networkConfig: {
contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic],
clusterId: ClusterId
}
}).then((waku) => waku.start().then(() => waku)),
createLightNode({
staticNoiseKey: NOISE_KEY_2,
shardInfo: {
networkConfig: {
contentTopics: [TestContentTopic, AsymContentTopic, SymContentTopic],
clusterId: ClusterId
}

View File

@ -26,7 +26,11 @@ const runTests = (strictCheckNodes: boolean): void => {
let serviceNodes: ServiceNodesFleet;
beforeEachCustom(this, async () => {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
try {
[serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
} catch (error) {
console.error(error);
}
});
afterEachCustom(this, async () => {

View File

@ -1,4 +1,4 @@
import { LightNode, Protocols, ShardingParams } from "@waku/interfaces";
import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { Logger } from "@waku/utils";
import { Context } from "mocha";
@ -12,11 +12,11 @@ export const log = new Logger("test:filter:single_node");
export const runNodes = (
context: Context,
shardInfo: ShardingParams
shardInfo: NetworkConfig
): Promise<[ServiceNode, LightNode]> =>
runNodesBuilder<LightNode>({
context,
createNode: createLightNode,
protocols: [Protocols.LightPush, Protocols.Filter],
shardInfo
networkConfig: shardInfo
});

View File

@ -1,28 +1,24 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import {
DefaultNetworkConfig,
ISubscriptionSDK,
LightNode,
NetworkConfig,
ProtocolCreateOptions,
Protocols,
ShardingParams,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
Logger,
shardInfoToPubsubTopics
derivePubsubTopicsFromNetworkConfig,
Logger
} from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";
import { Context } from "mocha";
import pRetry from "p-retry";
import {
DefaultTestPubsubTopic,
NOISE_KEY_1,
ServiceNodesFleet,
waitForConnections
} from "../../src";
import { NOISE_KEY_1, ServiceNodesFleet, waitForConnections } from "../../src";
// Constants for test configuration.
export const log = new Logger("test:filter");
@ -69,21 +65,18 @@ export async function validatePingError(
export async function runMultipleNodes(
context: Context,
shardInfo?: ShardingParams,
networkConfig: NetworkConfig = DefaultNetworkConfig,
strictChecking: boolean = false,
numServiceNodes = 3,
withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
const pubsubTopics = shardInfo
? shardInfoToPubsubTopics(shardInfo)
: [DefaultTestPubsubTopic];
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig);
// create numServiceNodes nodes
const serviceNodes = await ServiceNodesFleet.createAndRun(
context,
pubsubTopics,
numServiceNodes,
strictChecking,
shardInfo,
networkConfig,
undefined,
withoutFilter
);
@ -95,12 +88,6 @@ export async function runMultipleNodes(
}
};
if (shardInfo) {
wakuOptions.shardInfo = shardInfo;
} else {
wakuOptions.pubsubTopics = pubsubTopics;
}
log.info("Starting js waku node with :", JSON.stringify(wakuOptions));
let waku: LightNode | undefined;
try {

View File

@ -66,7 +66,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
waku = await createLightNode({ shardInfo });
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waitForRemotePeer(waku, [Protocols.LightPush]);
@ -115,7 +115,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
await waku.start();
@ -166,7 +166,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
@ -220,7 +220,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
@ -256,7 +256,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
waku = await createLightNode({ shardInfo });
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waitForRemotePeer(waku, [Protocols.LightPush]);
@ -307,7 +307,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
@ -360,7 +360,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);
@ -415,7 +415,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await delay(500);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
@ -454,7 +454,7 @@ describe("getPeers", function () {
let allPeers: Peer[];
beforeEachCustom(this, async () => {
waku = await createLightNode({ shardInfo: DefaultTestShardInfo });
waku = await createLightNode({ networkConfig: DefaultTestShardInfo });
peerStore = waku.libp2p.peerStore;
connectionManager = waku.libp2p.components.connectionManager;

View File

@ -133,7 +133,7 @@ async function setupTestEnvironment(
);
serviceNodes.push(...serviceNodesFleet.nodes);
} else {
waku = await createLightNode({ shardInfo: TestShardInfo });
waku = await createLightNode({ networkConfig: TestShardInfo });
}
// Create additional LightPush nodes if needed

View File

@ -3,9 +3,9 @@ import { createEncoder, waitForRemotePeer } from "@waku/core";
import {
ContentTopicInfo,
LightNode,
NetworkConfig,
Protocols,
ShardInfo,
ShardingParams,
SingleShardInfo
} from "@waku/interfaces";
import {
@ -344,7 +344,7 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
pubsubTopicShardInfo: shardInfo2
});
const testShardInfo: ShardingParams = {
const testShardInfo: NetworkConfig = {
clusterId,
shards: [
contentTopicToShardIndex(customContentTopic1),

View File

@ -48,7 +48,7 @@ describe("Metadata Protocol", function () {
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
const nwaku1PeerId = await nwaku1.getPeerId();
waku = await createLightNode({ shardInfo });
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
@ -95,7 +95,7 @@ describe("Metadata Protocol", function () {
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
const nwaku1PeerId = await nwaku1.getPeerId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
@ -141,7 +141,7 @@ describe("Metadata Protocol", function () {
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
@ -179,7 +179,7 @@ describe("Metadata Protocol", function () {
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
waku = await createLightNode({ shardInfo: shardInfo2 });
waku = await createLightNode({ networkConfig: shardInfo2 });
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
@ -215,7 +215,7 @@ describe("Metadata Protocol", function () {
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
const nwaku1PeerId = await nwaku1.getPeerId();
waku = await createLightNode({ shardInfo });
waku = await createLightNode({ networkConfig: shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);
@ -251,7 +251,10 @@ describe("Metadata Protocol", function () {
const nwaku1Ma = await nwaku1.getMultiaddrWithId();
const nwaku1PeerId = await nwaku1.getPeerId();
waku = await createLightNode({ shardInfo, pingKeepAlive: 1 });
waku = await createLightNode({
networkConfig: shardInfo,
pingKeepAlive: 1
});
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);

View File

@ -41,7 +41,7 @@ describe("Peer Exchange", function () {
tests({
async setup() {
waku = await createLightNode({ shardInfo: DefaultTestShardInfo });
waku = await createLightNode({ networkConfig: DefaultTestShardInfo });
await waku.start();
const nwaku2Ma = await nwaku2.getMultiaddrWithId();

View File

@ -53,7 +53,7 @@ describe("Peer Exchange", function () {
it("getPeersByDiscovery", async function () {
waku = await createLightNode({
shardInfo: DefaultTestShardInfo,
networkConfig: DefaultTestShardInfo,
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),

View File

@ -49,7 +49,7 @@ describe("Peer Exchange", () => {
wakuPeerExchangeDiscovery([pubsubTopic])
]
},
shardInfo: shardInfo
networkConfig: shardInfo
});
await waku.start();

View File

@ -101,7 +101,7 @@ describe("Waku Relay, Interop", function () {
const waku2 = await createRelayNode({
staticNoiseKey: NOISE_KEY_2,
emitSelf: true,
shardInfo: TestShardInfo
networkConfig: TestShardInfo
});
await waku2.start();

View File

@ -93,16 +93,16 @@ describe("Waku Relay, multiple pubsub topics", function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: testItem.shardInfo,
networkConfig: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: testItem.shardInfo,
networkConfig: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: testItem.shardInfo,
networkConfig: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
@ -200,16 +200,16 @@ describe("Waku Relay, multiple pubsub topics", function () {
// Waku1 and waku2 are using multiple pubsub topis
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: shardInfoBothShards,
networkConfig: shardInfoBothShards,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: shardInfoBothShards,
networkConfig: shardInfoBothShards,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: shardInfo1,
networkConfig: shardInfo1,
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
@ -269,11 +269,11 @@ describe("Waku Relay, multiple pubsub topics", function () {
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: shardInfo1,
networkConfig: shardInfo1,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: shardInfo1,
networkConfig: shardInfo1,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
@ -398,16 +398,16 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: testItem.shardInfo,
networkConfig: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: testItem.shardInfo,
networkConfig: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: testItem.shardInfo,
networkConfig: testItem.shardInfo,
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
@ -514,16 +514,16 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
// Waku1 and waku2 are using multiple pubsub topis
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: contentTopicInfoBothShards,
networkConfig: contentTopicInfoBothShards,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: contentTopicInfoBothShards,
networkConfig: contentTopicInfoBothShards,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: contentTopicInfo1,
networkConfig: contentTopicInfo1,
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
@ -610,11 +610,11 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
shardInfo: contentTopicInfo1,
networkConfig: contentTopicInfo1,
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
shardInfo: contentTopicInfo1,
networkConfig: contentTopicInfo1,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
@ -667,287 +667,3 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1);
});
});
describe("Waku Relay (named sharding), multiple pubsub topics", function () {
this.timeout(15000);
let waku1: RelayNode;
let waku2: RelayNode;
let waku3: RelayNode;
const customPubsubTopic1 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 1
});
const customPubsubTopic2 = singleShardInfoToPubsubTopic({
clusterId: 3,
shard: 2
});
const customContentTopic1 = "/test/2/waku-relay/utf8";
const customContentTopic2 = "/test/3/waku-relay/utf8";
const customEncoder1 = createEncoder({
pubsubTopic: customPubsubTopic1,
contentTopic: customContentTopic1
});
const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1);
const customEncoder2 = createEncoder({
pubsubTopic: customPubsubTopic2,
contentTopic: customContentTopic2
});
const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2);
afterEachCustom(this, async () => {
await tearDownNodes([], [waku1, waku2, waku3]);
});
[
{
pubsub: customPubsubTopic1,
encoder: customEncoder1,
decoder: customDecoder1
},
{
pubsub: customPubsubTopic2,
encoder: customEncoder2,
decoder: customDecoder2
}
].forEach((testItem) => {
it(`3 nodes on ${testItem.pubsub} topic`, async function () {
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
.fill(null)
.map(() => new MessageCollector());
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubsubTopics: [testItem.pubsub],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [testItem.pubsub],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [testItem.pubsub],
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector1.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector2.callback
);
await waku3.relay.subscribeWithUnsubscribe(
[testItem.decoder],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
payload: utf8ToBytes("M1")
});
const relayResponse2 = await waku2.relay.send(testItem.encoder, {
payload: utf8ToBytes("M2")
});
const relayResponse3 = await waku3.relay.send(testItem.encoder, {
payload: utf8ToBytes("M3")
});
expect(relayResponse1.successes[0].toString()).to.eq(
waku2.libp2p.peerId.toString()
);
expect(relayResponse3.successes[0].toString()).to.eq(
waku2.libp2p.peerId.toString()
);
expect(relayResponse2.successes.map((r) => r.toString())).to.include(
waku1.libp2p.peerId.toString()
);
expect(relayResponse2.successes.map((r) => r.toString())).to.include(
waku3.libp2p.peerId.toString()
);
expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq(
true
);
expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq(
true
);
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
true
);
expect(
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2")
).to.eq(true);
expect(
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3")
).to.eq(true);
expect(
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1")
).to.eq(true);
expect(
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3")
).to.eq(true);
expect(
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1")
).to.eq(true);
expect(
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2")
).to.eq(true);
});
});
it("Nodes with multiple pubsub topic", async function () {
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
.fill(null)
.map(() => new MessageCollector());
// Waku1 and waku2 are using multiple pubsub topis
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubsubTopics: [customPubsubTopic1, customPubsubTopic2],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [customPubsubTopic1, customPubsubTopic2],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [customPubsubTopic1],
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
waitForRemotePeer(waku3, [Protocols.Relay])
]);
await waku1.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector1.callback
);
await waku2.relay.subscribeWithUnsubscribe(
[customDecoder1, customDecoder2],
msgCollector2.callback
);
await waku3.relay.subscribeWithUnsubscribe(
[customDecoder1],
msgCollector3.callback
);
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") });
await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") });
await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") });
await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") });
await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") });
await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") });
expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true);
expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true);
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true);
expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true);
expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true);
expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true);
expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true);
});
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
[waku1, waku2, waku3] = await Promise.all([
createRelayNode({
pubsubTopics: [customPubsubTopic1],
staticNoiseKey: NOISE_KEY_1
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
pubsubTopics: [customPubsubTopic1],
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
staticNoiseKey: NOISE_KEY_3
}).then((waku) => waku.start().then(() => waku))
]);
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
multiaddrs: waku2.libp2p.getMultiaddrs()
});
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId)
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay])
]);
const messageText = "Communicating using a custom pubsub topic";
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve) => {
void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
}
);
// The promise **fails** if we receive a message on the default
// pubsub topic.
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
(resolve, reject) => {
void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
setTimeout(resolve, 1000);
}
);
await waku1.relay.send(customEncoder1, {
payload: utf8ToBytes(messageText)
});
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
await waku3NoMsgPromise;
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1);
});
});

View File

@ -65,7 +65,7 @@ describe("Waku Relay, Subscribe", function () {
try {
const waku = await createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: TestShardInfo
networkConfig: TestShardInfo
});
await waku.start();

View File

@ -1,9 +1,9 @@
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
import {
NetworkConfig,
Protocols,
RelayNode,
ShardInfo,
ShardingParams
ShardInfo
} from "@waku/interfaces";
import { createRelayNode } from "@waku/sdk/relay";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils";
@ -51,10 +51,10 @@ export async function waitForAllRemotePeers(
export const runRelayNodes = (
context: Context,
shardInfo: ShardingParams
networkConfig: NetworkConfig
): Promise<[ServiceNode, RelayNode]> =>
runNodes<RelayNode>({
shardInfo,
networkConfig,
context,
protocols: RELAY_PROTOCOLS,
createNode: createRelayNode
@ -65,11 +65,11 @@ export async function runJSNodes(): Promise<[RelayNode, RelayNode]> {
const [waku1, waku2] = await Promise.all([
createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: TestShardInfo
networkConfig: TestShardInfo
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
staticNoiseKey: NOISE_KEY_2,
shardInfo: TestShardInfo,
networkConfig: TestShardInfo,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku))
]);

View File

@ -55,7 +55,7 @@ describe("Autosharding: Running Nodes", function () {
await nwaku.ensureSubscriptions(pubsubTopics);
waku = await createLightNode({
shardInfo: {
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic]
}
@ -97,7 +97,7 @@ describe("Autosharding: Running Nodes", function () {
await nwaku.ensureSubscriptions(pubsubTopics);
waku = await createLightNode({
shardInfo: {
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic]
}
@ -153,7 +153,7 @@ describe("Autosharding: Running Nodes", function () {
});
waku = await createLightNode({
shardInfo: {
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic]
}
@ -216,7 +216,7 @@ describe("Autosharding: Running Nodes", function () {
});
waku = await createLightNode({
shardInfo: {
networkConfig: {
clusterId: clusterId,
// For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards
contentTopics: [ContentTopic, ContentTopic2]
@ -274,7 +274,7 @@ describe("Autosharding: Running Nodes", function () {
});
waku = await createLightNode({
shardInfo: {
networkConfig: {
clusterId: clusterId,
contentTopics: [ContentTopic]
}
@ -301,52 +301,10 @@ describe("Autosharding: Running Nodes", function () {
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
});
it("start node with ApplicationInfo", async function () {
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)];
await nwaku.start({
store: true,
lightpush: true,
relay: true,
clusterId: clusterId,
pubsubTopic: pubsubTopics,
contentTopic: [ContentTopic]
});
waku = await createLightNode({
shardInfo: {
clusterId: clusterId,
application: ContentTopic.split("/")[1],
version: ContentTopic.split("/")[2]
}
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(ContentTopic)
}
});
const request = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
expect(request.successes.length).to.eq(1);
expect(
await messageCollector.waitForMessagesAutosharding(1, {
contentTopic: ContentTopic
})
).to.eq(true);
});
it("start node with empty content topic", async function () {
try {
waku = await createLightNode({
shardInfo: {
networkConfig: {
clusterId: clusterId,
contentTopics: []
}
@ -358,7 +316,7 @@ describe("Autosharding: Running Nodes", function () {
if (
!(err instanceof Error) ||
!err.message.includes(
"Missing minimum required configuration options for static sharding or autosharding"
"Invalid content topics configuration: please provide at least one content topic"
)
) {
throw err;

View File

@ -91,7 +91,7 @@ describe("Static Sharding: Peer Management", function () {
const nwaku3Ma = await nwaku3.getMultiaddrWithId();
waku = await createLightNode({
shardInfo: shardInfo,
networkConfig: shardInfo,
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
@ -173,7 +173,7 @@ describe("Static Sharding: Peer Management", function () {
const nwaku3Ma = await nwaku3.getMultiaddrWithId();
waku = await createLightNode({
shardInfo: shardInfoToDial,
networkConfig: shardInfoToDial,
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
@ -277,7 +277,7 @@ describe("Autosharding: Peer Management", function () {
const nwaku3Ma = await nwaku3.getMultiaddrWithId();
waku = await createLightNode({
shardInfo: contentTopicInfo,
networkConfig: contentTopicInfo,
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
@ -359,7 +359,7 @@ describe("Autosharding: Peer Management", function () {
const nwaku3Ma = await nwaku3.getMultiaddrWithId();
waku = await createLightNode({
shardInfo: contentTopicInfoToDial,
networkConfig: contentTopicInfoToDial,
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),

View File

@ -59,7 +59,7 @@ describe("Static Sharding: Running Nodes", function () {
await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
waku = await createLightNode({
shardInfo: shardInfo
networkConfig: shardInfo
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
@ -99,7 +99,7 @@ describe("Static Sharding: Running Nodes", function () {
await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo));
waku = await createLightNode({
shardInfo: shardInfo
networkConfig: shardInfo
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
@ -148,7 +148,7 @@ describe("Static Sharding: Running Nodes", function () {
});
waku = await createLightNode({
shardInfo: shardInfo
networkConfig: shardInfo
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
@ -214,7 +214,7 @@ describe("Static Sharding: Running Nodes", function () {
it("configure the node with multiple pubsub topics", async function () {
waku = await createLightNode({
shardInfo: shardInfoBothShards
networkConfig: shardInfoBothShards
});
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.LightPush]);
@ -253,7 +253,7 @@ describe("Static Sharding: Running Nodes", function () {
it("using a protocol with unconfigured pubsub topic should fail", async function () {
this.timeout(15_000);
waku = await createLightNode({
shardInfo: shardInfoFirstShard
networkConfig: shardInfoFirstShard
});
// use a pubsub topic that is not configured
@ -274,10 +274,10 @@ describe("Static Sharding: Running Nodes", function () {
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
});
it("start node with empty shard", async function () {
it("start node with empty shard should fail", async function () {
try {
waku = await createLightNode({
shardInfo: { clusterId: clusterId, shards: [] }
networkConfig: { clusterId: clusterId, shards: [] }
});
throw new Error(
"Starting the node with no shard should've thrown an error"
@ -286,7 +286,7 @@ describe("Static Sharding: Running Nodes", function () {
if (
!(err instanceof Error) ||
!err.message.includes(
"Missing minimum required configuration options for static sharding or autosharding"
"Invalid shards configuration: please provide at least one shard"
)
) {
throw err;

View File

@ -257,7 +257,7 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: contentTopicInfoBothShards
networkConfig: contentTopicInfoBothShards
});
await waku.start();

View File

@ -6,9 +6,9 @@ import {
} from "@waku/core";
import {
LightNode,
NetworkConfig,
Protocols,
ShardInfo,
ShardingParams,
type SingleShardInfo
} from "@waku/interfaces";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
@ -102,12 +102,12 @@ export async function processQueriedMessages(
export async function startAndConnectLightNode(
instance: ServiceNode,
shardInfo: ShardingParams
networkConfig: NetworkConfig
): Promise<LightNode> {
const waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
shardInfo: shardInfo
networkConfig: networkConfig
});
await waku.start();
await waku.dial(await instance.getMultiaddrWithId());
@ -145,11 +145,11 @@ export const adjustDate = (baseDate: Date, adjustMs: number): Date => {
export const runStoreNodes = (
context: Context,
shardInfo: ShardingParams
networkConfig: NetworkConfig
): Promise<[ServiceNode, LightNode]> =>
runNodes({
context,
shardInfo,
networkConfig,
createNode: createLightNode,
protocols: [Protocols.Store]
});

View File

@ -57,7 +57,7 @@ describe("Wait for remote peer", function () {
waku1 = await createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku1.start();
@ -77,7 +77,7 @@ describe("Wait for remote peer", function () {
this.timeout(5000);
createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
})
.then((waku1) => waku1.start().then(() => waku1))
.then((waku1) => {
@ -107,7 +107,7 @@ describe("Wait for remote peer", function () {
waku2 = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku2.start();
await waku2.dial(multiAddrWithId);
@ -136,7 +136,7 @@ describe("Wait for remote peer", function () {
waku2 = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku2.start();
const waitPromise = waitForRemotePeer(waku2, [Protocols.Store], 2000);
@ -167,7 +167,7 @@ describe("Wait for remote peer", function () {
waku2 = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku2.start();
await waku2.dial(multiAddrWithId);
@ -196,7 +196,7 @@ describe("Wait for remote peer", function () {
waku2 = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku2.start();
await waku2.dial(multiAddrWithId);
@ -225,7 +225,7 @@ describe("Wait for remote peer", function () {
waku2 = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku2.start();
await waku2.dial(multiAddrWithId);

View File

@ -54,7 +54,7 @@ describe("Waku Dial [node only]", function () {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku.start();
await waku.dial(multiAddrWithId);
@ -88,7 +88,7 @@ describe("Waku Dial [node only]", function () {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
});
await waku.start();
await waku.dial(multiAddrWithId);
@ -116,7 +116,7 @@ describe("Waku Dial [node only]", function () {
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo,
networkConfig: DefaultTestShardInfo,
libp2p: {
peerDiscovery: [bootstrap({ list: [multiAddrWithId.toString()] })]
}
@ -142,7 +142,7 @@ describe("Waku Dial [node only]", function () {
waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo,
networkConfig: DefaultTestShardInfo,
libp2p: {
peerDiscovery: [bootstrap({ list: [nwakuMa.toString()] })]
}
@ -174,11 +174,11 @@ describe("Decryption Keys", function () {
[waku1, waku2] = await Promise.all([
createRelayNode({
staticNoiseKey: NOISE_KEY_1,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
staticNoiseKey: NOISE_KEY_2,
shardInfo: DefaultTestShardInfo,
networkConfig: DefaultTestShardInfo,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku))
]);
@ -254,11 +254,11 @@ describe("User Agent", function () {
createRelayNode({
staticNoiseKey: NOISE_KEY_1,
userAgent: waku1UserAgent,
shardInfo: DefaultTestShardInfo
networkConfig: DefaultTestShardInfo
}).then((waku) => waku.start().then(() => waku)),
createRelayNode({
staticNoiseKey: NOISE_KEY_2,
shardInfo: DefaultTestShardInfo,
networkConfig: DefaultTestShardInfo,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
}).then((waku) => waku.start().then(() => waku))
]);

View File

@ -4,7 +4,7 @@ export * from "./random_subset.js";
export * from "./group_by.js";
export * from "./to_async_iterator.js";
export * from "./is_size_valid.js";
export * from "./sharding.js";
export * from "./sharding/index.js";
export * from "./push_or_init_map.js";
export * from "./relay_shard_codec.js";
export * from "./delay.js";

View File

@ -1,4 +1,4 @@
import { DEFAULT_CLUSTER_ID } from "@waku/interfaces";
import { DEFAULT_CLUSTER_ID, NetworkConfig } from "@waku/interfaces";
import { expect } from "chai";
import {
@ -13,7 +13,7 @@ import {
shardInfoToPubsubTopics,
singleShardInfosToShardInfo,
singleShardInfoToPubsubTopic
} from "./sharding";
} from ".";
const testInvalidCases = (
contentTopics: string[],
@ -284,13 +284,6 @@ describe("shardInfoToPubsubTopics", () => {
expect(topics.length).to.equal(2);
});
it("should handle application and version for autosharding", () => {
const shardInfo = { application: "app", version: "v1" };
const topics = shardInfoToPubsubTopics(shardInfo);
expect(topics).to.be.an("array").that.includes("/waku/2/rs/1/4");
expect(topics.length).to.equal(1);
});
[0, 1, 6].forEach((clusterId) => {
it(`should handle clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => {
const shardInfo = {
@ -431,7 +424,7 @@ describe("ensureShardingConfigured", () => {
it("should return valid sharding parameters for static sharding", () => {
const shardInfo = { clusterId: 1, shards: [0, 1] };
const result = ensureShardingConfigured(shardInfo);
expect(result.shardingParams).to.deep.include({
expect(result.shardInfo).to.deep.include({
clusterId: 1,
shards: [0, 1]
});
@ -443,11 +436,8 @@ describe("ensureShardingConfigured", () => {
});
it("should return valid sharding parameters for content topics autosharding", () => {
const shardInfo = { contentTopics: ["/app/v1/topic1/proto"] };
const result = ensureShardingConfigured(shardInfo);
expect(result.shardingParams).to.deep.include({
contentTopics: ["/app/v1/topic1/proto"]
});
const contentTopicInfo = { contentTopics: ["/app/v1/topic1/proto"] };
const result = ensureShardingConfigured(contentTopicInfo);
const expectedPubsubTopic = contentTopicToPubsubTopic(
"/app/v1/topic1/proto",
DEFAULT_CLUSTER_ID
@ -458,47 +448,8 @@ describe("ensureShardingConfigured", () => {
expect(result.pubsubTopics).to.include(expectedPubsubTopic);
});
it("should configure sharding based on application and version for autosharding", () => {
const shardInfo = { application: "app", version: "v1" };
const result = ensureShardingConfigured(shardInfo);
expect(result.shardingParams).to.deep.include({
application: "app",
version: "v1"
});
const expectedPubsubTopic = contentTopicToPubsubTopic(
`/app/v1/default/default`
);
expect(result.pubsubTopics).to.include(expectedPubsubTopic);
expect(result.shardInfo.shards).to.include(
pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard
);
});
[0, 1, 4].forEach((clusterId) => {
it(`should configure sharding based on clusterId, application and version for autosharding with cluster iD ${clusterId}`, () => {
const shardInfo = {
clusterId: clusterId,
application: "app",
version: "v1"
};
const result = ensureShardingConfigured(shardInfo);
expect(result.shardingParams).to.deep.include({
application: "app",
version: "v1"
});
const expectedPubsubTopic = contentTopicToPubsubTopic(
`/app/v1/default/default`,
shardInfo.clusterId
);
expect(result.pubsubTopics).to.include(expectedPubsubTopic);
expect(result.shardInfo.shards).to.include(
pubsubTopicToSingleShardInfo(expectedPubsubTopic).shard
);
});
});
it("should throw an error for missing sharding configuration", () => {
const shardInfo = {};
const shardInfo = {} as any as NetworkConfig;
expect(() => ensureShardingConfigured(shardInfo)).to.throw();
});

View File

@ -1,13 +1,43 @@
import { sha256 } from "@noble/hashes/sha256";
import {
DEFAULT_CLUSTER_ID,
NetworkConfig,
PubsubTopic,
ShardInfo,
ShardingParams,
SingleShardInfo
} from "@waku/interfaces";
import { concat, utf8ToBytes } from "../bytes/index.js";
import { concat, utf8ToBytes } from "../../bytes/index.js";
import { isAutoSharding, isStaticSharding } from "./type_guards.js";
export * from "./type_guards.js";
export function derivePubsubTopicsFromNetworkConfig(
networkConfig: NetworkConfig
): PubsubTopic[] {
if (isStaticSharding(networkConfig)) {
if (networkConfig.shards.length === 0) {
throw new Error(
"Invalid shards configuration: please provide at least one shard"
);
}
return shardInfoToPubsubTopics(networkConfig);
} else if (isAutoSharding(networkConfig)) {
if (networkConfig.contentTopics.length === 0) {
throw new Error(
"Invalid content topics configuration: please provide at least one content topic"
);
}
return networkConfig.contentTopics.map((contentTopic) =>
contentTopicToPubsubTopic(contentTopic, networkConfig.clusterId)
);
} else {
throw new Error(
"Unknown shard config. Please use ShardInfo or ContentTopicInfo"
);
}
}
export const singleShardInfoToPubsubTopic = (
shardInfo: SingleShardInfo
@ -38,7 +68,7 @@ export const singleShardInfosToShardInfo = (
};
export const shardInfoToPubsubTopics = (
shardInfo: Partial<ShardingParams>
shardInfo: Partial<NetworkConfig>
): PubsubTopic[] => {
if ("contentTopics" in shardInfo && shardInfo.contentTopics) {
// Autosharding: explicitly defined content topics
@ -98,6 +128,39 @@ export const pubsubTopicToSingleShardInfo = (
};
};
export const pubsubTopicsToShardInfo = (
pubsubTopics: PubsubTopic[]
): ShardInfo => {
const shardInfoSet = new Set<string>();
const clusterIds = new Set<number>();
for (const topic of pubsubTopics) {
const { clusterId, shard } = pubsubTopicToSingleShardInfo(topic);
shardInfoSet.add(`${clusterId}:${shard}`);
clusterIds.add(clusterId);
}
if (shardInfoSet.size === 0) {
throw new Error("No valid pubsub topics provided");
}
if (clusterIds.size > 1) {
throw new Error(
"Pubsub topics from multiple cluster IDs are not supported"
);
}
const clusterId = clusterIds.values().next().value;
const shards = Array.from(shardInfoSet).map((info) =>
parseInt(info.split(":")[1])
);
return {
clusterId,
shards
};
};
//TODO: move part of BaseProtocol instead of utils
// return `ProtocolError.TOPIC_NOT_CONFIGURED` instead of throwing
export function ensurePubsubTopicIsConfigured(
@ -248,28 +311,21 @@ export function determinePubsubTopic(
* @returns Validated sharding parameters, with any missing values set to defaults
*/
export const ensureShardingConfigured = (
shardInfo: Partial<ShardingParams>
networkConfig: NetworkConfig
): {
shardingParams: ShardingParams;
shardInfo: ShardInfo;
pubsubTopics: PubsubTopic[];
} => {
const clusterId = shardInfo.clusterId ?? DEFAULT_CLUSTER_ID;
const shards = "shards" in shardInfo ? shardInfo.shards : [];
const clusterId = networkConfig.clusterId ?? DEFAULT_CLUSTER_ID;
const shards = "shards" in networkConfig ? networkConfig.shards : [];
const contentTopics =
"contentTopics" in shardInfo ? shardInfo.contentTopics : [];
const [application, version] =
"application" in shardInfo && "version" in shardInfo
? [shardInfo.application, shardInfo.version]
: [undefined, undefined];
"contentTopics" in networkConfig ? networkConfig.contentTopics : [];
const isShardsConfigured = shards && shards.length > 0;
const isContentTopicsConfigured = contentTopics && contentTopics.length > 0;
const isApplicationVersionConfigured = application && version;
if (isShardsConfigured) {
return {
shardingParams: { clusterId, shards },
shardInfo: { clusterId, shards },
pubsubTopics: shardInfoToPubsubTopics({ clusterId, shards })
};
@ -287,27 +343,11 @@ export const ensureShardingConfigured = (
new Set(contentTopics.map((topic) => contentTopicToShardIndex(topic)))
);
return {
shardingParams: { clusterId, contentTopics },
shardInfo: { clusterId, shards },
pubsubTopics
};
}
if (isApplicationVersionConfigured) {
const pubsubTopic = contentTopicToPubsubTopic(
`/${application}/${version}/default/default`,
clusterId
);
return {
shardingParams: { clusterId, application, version },
shardInfo: {
clusterId,
shards: [pubsubTopicToSingleShardInfo(pubsubTopic).shard!]
},
pubsubTopics: [pubsubTopic]
};
}
throw new Error(
"Missing minimum required configuration options for static sharding or autosharding."
);

View File

@ -0,0 +1,19 @@
import type {
ContentTopicInfo,
ProtocolCreateOptions,
StaticSharding
} from "@waku/interfaces";
export function isStaticSharding(
config: NonNullable<ProtocolCreateOptions["networkConfig"]>
): config is StaticSharding {
return (
"clusterId" in config && "shards" in config && !("contentTopics" in config)
);
}
export function isAutoSharding(
config: NonNullable<ProtocolCreateOptions["networkConfig"]>
): config is ContentTopicInfo {
return "contentTopics" in config && "clusterId" in config;
}