Merge pull request #1816 from waku-org/feat/decouple-sharding-params

feat: decouple sharding params out of core
This commit is contained in:
Arseniy Klempner 2024-02-08 10:42:07 -08:00 committed by GitHub
commit 1ffa5db7e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 150 additions and 134 deletions

View File

@ -6,12 +6,7 @@ import type {
ProtocolCreateOptions, ProtocolCreateOptions,
PubsubTopic PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { DefaultPubsubTopic } from "@waku/interfaces"; import { ensureShardingConfigured, Logger } from "@waku/utils";
import {
ensureShardingConfigured,
Logger,
shardInfoToPubsubTopics
} from "@waku/utils";
import { import {
getConnectedPeersForProtocolAndShard, getConnectedPeersForProtocolAndShard,
getPeersForProtocol, getPeersForProtocol,
@ -32,16 +27,14 @@ export class BaseProtocol implements IBaseProtocol {
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
readonly numPeersToUse: number; readonly numPeersToUse: number;
protected streamManager: StreamManager; protected streamManager: StreamManager;
protected pubsubTopics: PubsubTopic[];
constructor( constructor(
public multicodec: string, public multicodec: string,
private components: Libp2pComponents, private components: Libp2pComponents,
private log: Logger, private log: Logger,
protected pubsubTopics: PubsubTopic[],
private options?: ProtocolCreateOptions private options?: ProtocolCreateOptions
) { ) {
this.pubsubTopics = this.initializePubsubTopic(options);
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
this.addLibp2pEventListener = components.events.addEventListener.bind( this.addLibp2pEventListener = components.events.addEventListener.bind(
@ -143,15 +136,4 @@ export class BaseProtocol implements IBaseProtocol {
return sortedFilteredPeers; return sortedFilteredPeers;
} }
private initializePubsubTopic(
options?: ProtocolCreateOptions
): PubsubTopic[] {
return (
options?.pubsubTopics ??
(options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: [DefaultPubsubTopic])
);
}
} }

View File

@ -358,7 +358,13 @@ class Filter extends BaseProtocol implements IReceiver {
} }
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, options); super(
FilterCodecs.SUBSCRIBE,
libp2p.components,
log,
options!.pubsubTopics!,
options
);
libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e); log.error("Failed to register ", FilterCodecs.PUSH, e);
@ -493,7 +499,7 @@ class Filter extends BaseProtocol implements IReceiver {
} }
export function wakuFilter( export function wakuFilter(
init: Partial<ProtocolCreateOptions> = {} init: ProtocolCreateOptions = { pubsubTopics: [] }
): (libp2p: Libp2p) => IFilter { ): (libp2p: Libp2p) => IFilter {
return (libp2p: Libp2p) => new Filter(libp2p, init); return (libp2p: Libp2p) => new Filter(libp2p, init);
} }

View File

@ -43,7 +43,13 @@ type PreparePushMessageResult =
*/ */
class LightPush extends BaseProtocol implements ILightPush { class LightPush extends BaseProtocol implements ILightPush {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components, log, options); super(
LightPushCodec,
libp2p.components,
log,
options!.pubsubTopics!,
options
);
} }
private async preparePushMessage( private async preparePushMessage(

View File

@ -4,11 +4,10 @@ import type {
IMetadata, IMetadata,
Libp2pComponents, Libp2pComponents,
PeerIdStr, PeerIdStr,
ShardInfo, ShardInfo
ShardingParams
} from "@waku/interfaces"; } from "@waku/interfaces";
import { proto_metadata } from "@waku/proto"; import { proto_metadata } from "@waku/proto";
import { encodeRelayShard, Logger } from "@waku/utils"; import { encodeRelayShard, Logger, shardInfoToPubsubTopics } from "@waku/utils";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
@ -25,10 +24,15 @@ class Metadata extends BaseProtocol implements IMetadata {
handshakesConfirmed: Set<PeerIdStr> = new Set(); handshakesConfirmed: Set<PeerIdStr> = new Set();
constructor( constructor(
public shardInfo: ShardingParams, public shardInfo: ShardInfo,
libp2p: Libp2pComponents libp2p: Libp2pComponents
) { ) {
super(MetadataCodec, libp2p.components, log, shardInfo && { shardInfo }); super(
MetadataCodec,
libp2p.components,
log,
shardInfoToPubsubTopics(shardInfo)
);
this.libp2pComponents = libp2p; this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => { void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData); void this.onRequest(streamData);

View File

@ -76,7 +76,7 @@ class Store extends BaseProtocol implements IStore {
private readonly NUM_PEERS_PROTOCOL = 1; private readonly NUM_PEERS_PROTOCOL = 1;
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components, log, options); super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options);
} }
/** /**

View File

@ -31,11 +31,6 @@ export async function waitForRemotePeer(
): Promise<void> { ): Promise<void> {
protocols = protocols ?? getEnabledProtocols(waku); protocols = protocols ?? getEnabledProtocols(waku);
const isShardingEnabled = waku.shardInfo !== undefined;
const metadataService = isShardingEnabled
? waku.libp2p.services.metadata
: undefined;
if (!waku.isStarted()) return Promise.reject("Waku node is not started"); if (!waku.isStarted()) return Promise.reject("Waku node is not started");
const promises = []; const promises = [];
@ -49,19 +44,25 @@ export async function waitForRemotePeer(
if (protocols.includes(Protocols.Store)) { if (protocols.includes(Protocols.Store)) {
if (!waku.store) if (!waku.store)
throw new Error("Cannot wait for Store peer: protocol not mounted"); throw new Error("Cannot wait for Store peer: protocol not mounted");
promises.push(waitForConnectedPeer(waku.store, metadataService)); promises.push(
waitForConnectedPeer(waku.store, waku.libp2p.services.metadata)
);
} }
if (protocols.includes(Protocols.LightPush)) { if (protocols.includes(Protocols.LightPush)) {
if (!waku.lightPush) if (!waku.lightPush)
throw new Error("Cannot wait for LightPush peer: protocol not mounted"); throw new Error("Cannot wait for LightPush peer: protocol not mounted");
promises.push(waitForConnectedPeer(waku.lightPush, metadataService)); promises.push(
waitForConnectedPeer(waku.lightPush, waku.libp2p.services.metadata)
);
} }
if (protocols.includes(Protocols.Filter)) { if (protocols.includes(Protocols.Filter)) {
if (!waku.filter) if (!waku.filter)
throw new Error("Cannot wait for Filter peer: protocol not mounted"); throw new Error("Cannot wait for Filter peer: protocol not mounted");
promises.push(waitForConnectedPeer(waku.filter, metadataService)); promises.push(
waitForConnectedPeer(waku.filter, waku.libp2p.services.metadata)
);
} }
if (timeoutMs) { if (timeoutMs) {

View File

@ -8,11 +8,10 @@ import type {
IStore, IStore,
Libp2p, Libp2p,
PubsubTopic, PubsubTopic,
ShardingParams,
Waku Waku
} from "@waku/interfaces"; } from "@waku/interfaces";
import { DefaultPubsubTopic, Protocols } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces";
import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; import { Logger } from "@waku/utils";
import { ConnectionManager } from "./connection_manager.js"; import { ConnectionManager } from "./connection_manager.js";
@ -42,6 +41,7 @@ export interface WakuOptions {
* @default {@link @waku/core.DefaultUserAgent} * @default {@link @waku/core.DefaultUserAgent}
*/ */
userAgent?: string; userAgent?: string;
pubsubTopics: PubsubTopic[];
} }
export class WakuNode implements Waku { export class WakuNode implements Waku {
@ -55,20 +55,16 @@ export class WakuNode implements Waku {
constructor( constructor(
options: WakuOptions, options: WakuOptions,
pubsubTopics: PubsubTopic[] = [],
libp2p: Libp2p, libp2p: Libp2p,
private pubsubShardInfo?: ShardingParams,
store?: (libp2p: Libp2p) => IStore, store?: (libp2p: Libp2p) => IStore,
lightPush?: (libp2p: Libp2p) => ILightPush, lightPush?: (libp2p: Libp2p) => ILightPush,
filter?: (libp2p: Libp2p) => IFilter, filter?: (libp2p: Libp2p) => IFilter,
relay?: (libp2p: Libp2p) => IRelay relay?: (libp2p: Libp2p) => IRelay
) { ) {
if (!pubsubShardInfo) { if (options.pubsubTopics.length == 0) {
this.pubsubTopics = throw new Error("At least one pubsub topic must be provided");
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
} }
this.pubsubTopics = options.pubsubTopics;
this.libp2p = libp2p; this.libp2p = libp2p;
@ -110,10 +106,6 @@ export class WakuNode implements Waku {
); );
} }
get shardInfo(): ShardingParams | undefined {
return this.pubsubShardInfo;
}
/** /**
* Dials to the provided peer. * Dials to the provided peer.
* *

View File

@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js";
import type { IFilter } from "./filter.js"; import type { IFilter } from "./filter.js";
import type { Libp2p } from "./libp2p.js"; import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js"; import type { ILightPush } from "./light_push.js";
import { Protocols, ShardingParams } from "./protocols.js"; import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js"; import type { IRelay } from "./relay.js";
import type { IStore } from "./store.js"; import type { IStore } from "./store.js";
@ -16,8 +16,6 @@ export interface Waku {
filter?: IFilter; filter?: IFilter;
lightPush?: ILightPush; lightPush?: ILightPush;
shardInfo?: ShardingParams;
connectionManager: IConnectionManager; connectionManager: IConnectionManager;
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>; dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;

View File

@ -4,7 +4,8 @@ import type {
IPeerExchange, IPeerExchange,
Libp2pComponents, Libp2pComponents,
PeerExchangeQueryParams, PeerExchangeQueryParams,
PeerInfo PeerInfo,
PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { isDefined } from "@waku/utils"; import { isDefined } from "@waku/utils";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
@ -26,8 +27,8 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
/** /**
* @param components - libp2p components * @param components - libp2p components
*/ */
constructor(components: Libp2pComponents) { constructor(components: Libp2pComponents, pubsubTopics: PubsubTopic[]) {
super(PeerExchangeCodec, components, log); super(PeerExchangeCodec, components, log, pubsubTopics);
} }
/** /**
@ -91,8 +92,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* *
* @returns A function that creates a new peer exchange protocol * @returns A function that creates a new peer exchange protocol
*/ */
export function wakuPeerExchange(): ( export function wakuPeerExchange(
components: Libp2pComponents pubsubTopics: PubsubTopic[]
) => WakuPeerExchange { ): (components: Libp2pComponents) => WakuPeerExchange {
return (components: Libp2pComponents) => new WakuPeerExchange(components); return (components: Libp2pComponents) =>
new WakuPeerExchange(components, pubsubTopics);
} }

View File

@ -7,7 +7,7 @@ import type {
PeerId, PeerId,
PeerInfo PeerInfo
} from "@libp2p/interface"; } from "@libp2p/interface";
import { Libp2pComponents, Tags } from "@waku/interfaces"; import { Libp2pComponents, PubsubTopic, Tags } from "@waku/interfaces";
import { encodeRelayShard, Logger } from "@waku/utils"; import { encodeRelayShard, Logger } from "@waku/utils";
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
@ -77,10 +77,14 @@ export class PeerExchangeDiscovery
); );
}; };
constructor(components: Libp2pComponents, options: Options = {}) { constructor(
components: Libp2pComponents,
pubsubTopics: PubsubTopic[],
options: Options = {}
) {
super(); super();
this.components = components; this.components = components;
this.peerExchange = new WakuPeerExchange(components); this.peerExchange = new WakuPeerExchange(components, pubsubTopics);
this.options = options; this.options = options;
this.isStarted = false; this.isStarted = false;
} }
@ -219,9 +223,9 @@ export class PeerExchangeDiscovery
} }
} }
export function wakuPeerExchangeDiscovery(): ( export function wakuPeerExchangeDiscovery(
components: Libp2pComponents pubsubTopics: PubsubTopic[]
) => PeerExchangeDiscovery { ): (components: Libp2pComponents) => PeerExchangeDiscovery {
return (components: Libp2pComponents) => return (components: Libp2pComponents) =>
new PeerExchangeDiscovery(components); new PeerExchangeDiscovery(components, pubsubTopics);
} }

View File

@ -24,11 +24,7 @@ import {
SendError, SendError,
SendResult SendResult
} from "@waku/interfaces"; } from "@waku/interfaces";
import { import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils";
isWireSizeUnderCap,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { pushOrInitMapSet } from "@waku/utils"; import { pushOrInitMapSet } from "@waku/utils";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
@ -63,7 +59,7 @@ class Relay implements IRelay {
*/ */
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>; private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
constructor(libp2p: Libp2p, options?: Partial<RelayCreateOptions>) { constructor(libp2p: Libp2p, pubsubTopics: PubsubTopic[]) {
if (!this.isRelayPubsub(libp2p.services.pubsub)) { if (!this.isRelayPubsub(libp2p.services.pubsub)) {
throw Error( throw Error(
`Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}` `Failed to initialize Relay. libp2p.pubsub does not support ${Relay.multicodec}`
@ -71,11 +67,7 @@ class Relay implements IRelay {
} }
this.gossipSub = libp2p.services.pubsub as GossipSub; this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = new Set( this.pubsubTopics = new Set(pubsubTopics);
options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: options?.pubsubTopics ?? [DefaultPubsubTopic]
);
if (this.gossipSub.isStarted()) { if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics(); this.subscribeToAllTopics();
@ -283,9 +275,9 @@ class Relay implements IRelay {
} }
export function wakuRelay( export function wakuRelay(
init: Partial<ProtocolCreateOptions> = {} pubsubTopics: PubsubTopic[]
): (libp2p: Libp2p) => IRelay { ): (libp2p: Libp2p) => IRelay {
return (libp2p: Libp2p) => new Relay(libp2p, init); return (libp2p: Libp2p) => new Relay(libp2p, pubsubTopics);
} }
export function wakuGossipSub( export function wakuGossipSub(

View File

@ -18,12 +18,14 @@ import {
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery"; import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import { import {
type CreateLibp2pOptions, type CreateLibp2pOptions,
DefaultPubsubTopic,
type FullNode, type FullNode,
type IMetadata, type IMetadata,
type Libp2p, type Libp2p,
type Libp2pComponents, type Libp2pComponents,
type LightNode, type LightNode,
type ProtocolCreateOptions, type ProtocolCreateOptions,
PubsubTopic,
type ShardInfo type ShardInfo
} from "@waku/interfaces"; } from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
@ -43,20 +45,24 @@ export { Libp2pComponents };
* Create a Waku node configured to use autosharding or static sharding. * Create a Waku node configured to use autosharding or static sharding.
*/ */
export async function createNode( export async function createNode(
options?: ProtocolCreateOptions & WakuOptions & Partial<RelayCreateOptions> options?: ProtocolCreateOptions &
Partial<WakuOptions> &
Partial<RelayCreateOptions>
): Promise<LightNode> { ): Promise<LightNode> {
options = options ?? {}; options = options ?? { pubsubTopics: [] };
if (!options.shardInfo) { if (!options.shardInfo) {
throw new Error("Shard info must be set"); throw new Error("Shard info must be set");
} }
const shardInfo = ensureShardingConfigured(options.shardInfo); const shardInfo = ensureShardingConfigured(options.shardInfo);
options.pubsubTopics = shardInfo.pubsubTopics;
options.shardInfo = shardInfo.shardInfo;
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) { if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries()); peerDiscovery.push(...defaultPeerDiscoveries(shardInfo.pubsubTopics));
Object.assign(libp2pOptions, { peerDiscovery }); Object.assign(libp2pOptions, { peerDiscovery });
} }
@ -72,10 +78,8 @@ export async function createNode(
const filter = wakuFilter(options); const filter = wakuFilter(options);
return new WakuNode( return new WakuNode(
options ?? {}, options as WakuOptions,
[],
libp2p, libp2p,
shardInfo.shardInfo,
store, store,
lightPush, lightPush,
filter filter
@ -88,7 +92,7 @@ export async function createNode(
* Uses Waku Filter V2 by default. * Uses Waku Filter V2 by default.
*/ */
export async function createLightNode( export async function createLightNode(
options?: ProtocolCreateOptions & WakuOptions options?: ProtocolCreateOptions & Partial<WakuOptions>
): Promise<LightNode> { ): Promise<LightNode> {
options = options ?? {}; options = options ?? {};
@ -96,10 +100,13 @@ export async function createLightNode(
? ensureShardingConfigured(options.shardInfo) ? ensureShardingConfigured(options.shardInfo)
: undefined; : undefined;
options.pubsubTopics = shardInfo?.pubsubTopics ??
options.pubsubTopics ?? [DefaultPubsubTopic];
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) { if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries()); peerDiscovery.push(...defaultPeerDiscoveries(options.pubsubTopics));
Object.assign(libp2pOptions, { peerDiscovery }); Object.assign(libp2pOptions, { peerDiscovery });
} }
@ -115,10 +122,8 @@ export async function createLightNode(
const filter = wakuFilter(options); const filter = wakuFilter(options);
return new WakuNode( return new WakuNode(
options ?? {}, options as WakuOptions,
options.pubsubTopics,
libp2p, libp2p,
shardInfo?.shardingParams,
store, store,
lightPush, lightPush,
filter filter
@ -139,18 +144,25 @@ export async function createLightNode(
* @internal * @internal
*/ */
export async function createFullNode( export async function createFullNode(
options?: ProtocolCreateOptions & WakuOptions & Partial<RelayCreateOptions> options?: ProtocolCreateOptions &
Partial<WakuOptions> &
Partial<RelayCreateOptions>
): Promise<FullNode> { ): Promise<FullNode> {
options = options ?? {}; options = options ?? { pubsubTopics: [] };
const shardInfo = options.shardInfo const shardInfo = options.shardInfo
? ensureShardingConfigured(options.shardInfo) ? ensureShardingConfigured(options.shardInfo)
: undefined; : undefined;
const pubsubTopics = shardInfo?.pubsubTopics ??
options.pubsubTopics ?? [DefaultPubsubTopic];
options.pubsubTopics = pubsubTopics;
options.shardInfo = shardInfo?.shardInfo;
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) { if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries()); peerDiscovery.push(...defaultPeerDiscoveries(pubsubTopics));
Object.assign(libp2pOptions, { peerDiscovery }); Object.assign(libp2pOptions, { peerDiscovery });
} }
@ -164,13 +176,11 @@ export async function createFullNode(
const store = wakuStore(options); const store = wakuStore(options);
const lightPush = wakuLightPush(options); const lightPush = wakuLightPush(options);
const filter = wakuFilter(options); const filter = wakuFilter(options);
const relay = wakuRelay(options); const relay = wakuRelay(pubsubTopics);
return new WakuNode( return new WakuNode(
options ?? {}, options as WakuOptions,
options.pubsubTopics,
libp2p, libp2p,
shardInfo?.shardingParams,
store, store,
lightPush, lightPush,
filter, filter,
@ -178,12 +188,12 @@ export async function createFullNode(
) as FullNode; ) as FullNode;
} }
export function defaultPeerDiscoveries(): (( export function defaultPeerDiscoveries(
components: Libp2pComponents pubsubTopics: PubsubTopic[]
) => PeerDiscovery)[] { ): ((components: Libp2pComponents) => PeerDiscovery)[] {
const discoveries = [ const discoveries = [
wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS), wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS),
wakuPeerExchangeDiscovery() wakuPeerExchangeDiscovery(pubsubTopics)
]; ];
return discoveries; return discoveries;
} }

View File

@ -1,5 +1,9 @@
import { WakuNode, WakuOptions } from "@waku/core"; import { WakuNode, WakuOptions } from "@waku/core";
import type { ProtocolCreateOptions, RelayNode } from "@waku/interfaces"; import {
DefaultPubsubTopic,
type ProtocolCreateOptions,
type RelayNode
} from "@waku/interfaces";
import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay"; import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils"; import { ensureShardingConfigured } from "@waku/utils";
@ -16,21 +20,27 @@ import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js";
* or use this function with caution. * or use this function with caution.
*/ */
export async function createRelayNode( export async function createRelayNode(
options?: ProtocolCreateOptions & WakuOptions & Partial<RelayCreateOptions> options?: ProtocolCreateOptions &
Partial<WakuOptions> &
Partial<RelayCreateOptions>
): Promise<RelayNode> { ): Promise<RelayNode> {
options = options ?? {}; options = options ?? { pubsubTopics: [] };
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries());
Object.assign(libp2pOptions, { peerDiscovery });
}
const shardInfo = options.shardInfo const shardInfo = options.shardInfo
? ensureShardingConfigured(options.shardInfo) ? ensureShardingConfigured(options.shardInfo)
: undefined; : undefined;
options.pubsubTopics = shardInfo?.pubsubTopics ??
options.pubsubTopics ?? [DefaultPubsubTopic];
if (options?.defaultBootstrap) {
peerDiscovery.push(...defaultPeerDiscoveries(options.pubsubTopics));
Object.assign(libp2pOptions, { peerDiscovery });
}
const libp2p = await defaultLibp2p( const libp2p = await defaultLibp2p(
shardInfo?.shardInfo, shardInfo?.shardInfo,
wakuGossipSub(options), wakuGossipSub(options),
@ -38,13 +48,11 @@ export async function createRelayNode(
options?.userAgent options?.userAgent
); );
const relay = wakuRelay(options); const relay = wakuRelay(options.pubsubTopics);
return new WakuNode( return new WakuNode(
options, options as WakuOptions,
options.pubsubTopics,
libp2p, libp2p,
shardInfo?.shardingParams,
undefined, undefined,
undefined, undefined,
undefined, undefined,

View File

@ -24,7 +24,9 @@ describe("ConnectionManager", function () {
let waku: LightNode; let waku: LightNode;
beforeEach(async function () { beforeEach(async function () {
waku = await createLightNode(); waku = await createLightNode({
shardInfo: { shards: [0] }
});
}); });
afterEach(async () => { afterEach(async () => {
@ -271,7 +273,7 @@ describe("ConnectionManager", function () {
this.beforeEach(async function () { this.beforeEach(async function () {
this.timeout(15000); this.timeout(15000);
waku = await createLightNode(); waku = await createLightNode({ shardInfo: { shards: [0] } });
isPeerTopicConfigured = sinon.stub( isPeerTopicConfigured = sinon.stub(
waku.connectionManager as any, waku.connectionManager as any,
"isPeerTopicConfigured" "isPeerTopicConfigured"

View File

@ -63,7 +63,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
waku.libp2p.getConnections(), waku.libp2p.getConnections(),
waku.libp2p.peerStore, waku.libp2p.peerStore,
waku.libp2p.getProtocols(), waku.libp2p.getProtocols(),
shardInfo ensureShardingConfigured(shardInfo).shardInfo
); );
expect(peers.length).to.be.greaterThan(0); expect(peers.length).to.be.greaterThan(0);
}); });

View File

@ -7,7 +7,12 @@ import {
PeerExchangeDiscovery, PeerExchangeDiscovery,
WakuPeerExchange WakuPeerExchange
} from "@waku/peer-exchange"; } from "@waku/peer-exchange";
import { createLightNode, Libp2pComponents } from "@waku/sdk"; import {
createLightNode,
DEFAULT_CLUSTER_ID,
DefaultPubsubTopic,
Libp2pComponents
} from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
import { import {
@ -34,13 +39,14 @@ describe("Peer Exchange", () => {
await tearDownNodes([nwaku1, nwaku2], waku); await tearDownNodes([nwaku1, nwaku2], waku);
}); });
it("nwaku interop", async function () { it.skip("nwaku interop", async function () {
this.timeout(55_000); this.timeout(55_000);
await nwaku1.start({ await nwaku1.start({
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true peerExchange: true,
clusterId: DEFAULT_CLUSTER_ID
}); });
const enr = (await nwaku1.info()).enrUri; const enr = (await nwaku1.info()).enrUri;
@ -49,20 +55,23 @@ describe("Peer Exchange", () => {
relay: true, relay: true,
discv5Discovery: true, discv5Discovery: true,
peerExchange: true, peerExchange: true,
discv5BootstrapNode: enr discv5BootstrapNode: enr,
clusterId: DEFAULT_CLUSTER_ID
}); });
const nwaku1PeerId = await nwaku1.getPeerId(); const nwaku1PeerId = await nwaku1.getPeerId();
const nwaku2PeerId = await nwaku2.getPeerId(); const nwaku2PeerId = await nwaku2.getPeerId();
const nwaku2Ma = await nwaku2.getMultiaddrWithId(); const nwaku2Ma = await nwaku2.getMultiaddrWithId();
waku = await createLightNode(); waku = await createLightNode({ shardInfo: { shards: [0] } });
await waku.start(); await waku.start();
await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku2PeerId); await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku2PeerId);
const components = waku.libp2p.components as unknown as Libp2pComponents; const components = waku.libp2p.components as unknown as Libp2pComponents;
const peerExchange = new WakuPeerExchange(components); const peerExchange = new WakuPeerExchange(components, [
DefaultPubsubTopic
]);
const numPeersToRequest = 1; const numPeersToRequest = 1;
@ -149,7 +158,9 @@ describe("Peer Exchange", () => {
void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec);
}, 1000); }, 1000);
return new PeerExchangeDiscovery(waku.libp2p.components); return new PeerExchangeDiscovery(waku.libp2p.components, [
DefaultPubsubTopic
]);
}, },
teardown: async () => { teardown: async () => {
this.timeout(15000); this.timeout(15000);

View File

@ -5,7 +5,7 @@ import {
} from "@waku/core/lib/predefined_bootstrap_nodes"; } from "@waku/core/lib/predefined_bootstrap_nodes";
import type { LightNode } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { createLightNode } from "@waku/sdk"; import { createLightNode, DefaultPubsubTopic } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
import { tearDownNodes } from "../src"; import { tearDownNodes } from "../src";
@ -33,7 +33,7 @@ describe("Peer Exchange", () => {
libp2p: { libp2p: {
peerDiscovery: [ peerDiscovery: [
bootstrap({ list: predefinedNodes }), bootstrap({ list: predefinedNodes }),
wakuPeerExchangeDiscovery() wakuPeerExchangeDiscovery([DefaultPubsubTopic])
] ]
} }
}); });

View File

@ -88,7 +88,7 @@ describe("Static Sharding: Peer Management", function () {
libp2p: { libp2p: {
peerDiscovery: [ peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }), bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery() wakuPeerExchangeDiscovery(pubsubTopics)
] ]
} }
}); });
@ -163,7 +163,7 @@ describe("Static Sharding: Peer Management", function () {
libp2p: { libp2p: {
peerDiscovery: [ peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }), bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery() wakuPeerExchangeDiscovery(pubsubTopicsToDial)
] ]
} }
}); });
@ -262,7 +262,7 @@ describe("Autosharding: Peer Management", function () {
libp2p: { libp2p: {
peerDiscovery: [ peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }), bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery() wakuPeerExchangeDiscovery(pubsubTopics)
] ]
} }
}); });
@ -336,7 +336,7 @@ describe("Autosharding: Peer Management", function () {
libp2p: { libp2p: {
peerDiscovery: [ peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }), bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery() wakuPeerExchangeDiscovery(pubsubTopicsToDial)
] ]
} }
}); });

View File

@ -131,7 +131,7 @@ describe("Autosharding: Running Nodes", () => {
this.timeout(15_000); this.timeout(15_000);
waku = await createLightNode({ waku = await createLightNode({
shardInfo: { shardInfo: {
...shardInfoBothShards, clusterId: 0,
// For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards
contentTopics: [ContentTopic, ContentTopic2] contentTopics: [ContentTopic, ContentTopic2]
} }

View File

@ -280,9 +280,7 @@ export const ensureShardingConfigured = (
) )
); );
const shards = Array.from( const shards = Array.from(
new Set( new Set(contentTopics.map((topic) => contentTopicToShardIndex(topic)))
contentTopics.map((topic) => contentTopicToShardIndex(topic, clusterId))
)
); );
return { return {
shardingParams: { clusterId, contentTopics }, shardingParams: { clusterId, contentTopics },