diff --git a/packages/core/src/lib/connection_manager/connection_manager.spec.ts b/packages/core/src/lib/connection_manager/connection_manager.spec.ts index 40c33a921b..25235cba12 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.spec.ts @@ -157,7 +157,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); @@ -168,7 +167,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -180,7 +178,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); @@ -197,7 +194,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, config: customConfig }); @@ -209,7 +205,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -224,7 +219,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -255,7 +249,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -287,7 +280,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -316,7 +308,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -367,7 +358,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -409,7 +399,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -540,22 +529,9 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); - - it("should return true when topic is configured", () => { - const result = connectionManager.isTopicConfigured("/waku/2/rs/1/0"); - - expect(result).to.be.true; - }); - - it("should return false when topic is not configured", () => { - const result = connectionManager.isTopicConfigured("/waku/2/rs/1/99"); - - expect(result).to.be.false; - }); }); describe("isPeerOnTopic", () => { @@ -563,7 +539,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 4dca6d9164..952ab32d53 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -5,8 +5,7 @@ import { IConnectionManager, IRelay, IWakuEventEmitter, - NetworkConfig, - PubsubTopic + NetworkConfig } from "@waku/interfaces"; import { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -33,15 +32,12 @@ const DEFAULT_DIAL_COOLDOWN_SEC = 10; type ConnectionManagerConstructorOptions = { libp2p: Libp2p; events: IWakuEventEmitter; - pubsubTopics: PubsubTopic[]; networkConfig: NetworkConfig; relay?: IRelay; config?: Partial; }; export class ConnectionManager implements IConnectionManager { - private readonly pubsubTopics: PubsubTopic[]; - private readonly keepAliveManager: KeepAliveManager; private readonly discoveryDialer: DiscoveryDialer; private readonly dialer: Dialer; @@ -54,7 +50,6 @@ export class ConnectionManager implements IConnectionManager { public constructor(options: ConnectionManagerConstructorOptions) { this.libp2p = options.libp2p; - this.pubsubTopics = options.pubsubTopics; this.options = { maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, @@ -189,10 +184,6 @@ export class ConnectionManager implements IConnectionManager { return result; } - public isTopicConfigured(pubsubTopic: PubsubTopic): boolean { - return this.pubsubTopics.includes(pubsubTopic); - } - public async hasShardInfo(peerId: PeerId): Promise { return this.shardReader.hasShardInfo(peerId); } diff --git a/packages/core/src/lib/metadata/metadata.ts b/packages/core/src/lib/metadata/metadata.ts index 10e4b7616c..ac4707e575 100644 --- a/packages/core/src/lib/metadata/metadata.ts +++ b/packages/core/src/lib/metadata/metadata.ts @@ -1,16 +1,16 @@ import type { PeerId } from "@libp2p/interface"; import { IncomingStreamData } from "@libp2p/interface"; import { + type ClusterId, type IMetadata, type Libp2pComponents, type MetadataQueryResult, type PeerIdStr, ProtocolError, - PubsubTopic, type ShardInfo } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; -import { encodeRelayShard, Logger, pubsubTopicsToShardInfo } from "@waku/utils"; +import { encodeRelayShard, Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -30,7 +30,7 @@ class Metadata implements IMetadata { public readonly multicodec = MetadataCodec; public constructor( - public pubsubTopics: PubsubTopic[], + public clusterId: ClusterId, libp2p: Libp2pComponents ) { this.streamManager = new StreamManager(MetadataCodec, libp2p); @@ -44,9 +44,10 @@ class Metadata implements IMetadata { * Make a metadata query to a peer */ public async query(peerId: PeerId): Promise { - const request = proto_metadata.WakuMetadataRequest.encode( - pubsubTopicsToShardInfo(this.pubsubTopics) - ); + const request = proto_metadata.WakuMetadataRequest.encode({ + clusterId: this.clusterId, + shards: [] // Only services node need to provide shards + }); const peer = await this.libp2pComponents.peerStore.get(peerId); if (!peer) { @@ -112,9 +113,10 @@ class Metadata implements IMetadata { private async onRequest(streamData: IncomingStreamData): Promise { try { const { stream, connection } = streamData; - const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode( - pubsubTopicsToShardInfo(this.pubsubTopics) - ); + const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode({ + clusterId: this.clusterId, + shards: [] // Only service nodes need to provide shards + }); const encodedResponse = await pipe( [encodedShardInfo], @@ -178,8 +180,7 @@ class Metadata implements IMetadata { } export function wakuMetadata( - pubsubTopics: PubsubTopic[] + clusterId: ClusterId ): (components: Libp2pComponents) => IMetadata { - return (components: Libp2pComponents) => - new Metadata(pubsubTopics, components); + return (components: Libp2pComponents) => new Metadata(clusterId, components); } diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 35640378df..301ca94155 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -1,8 +1,6 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; -import type { PubsubTopic } from "./misc.js"; - // Peer tags export enum Tags { BOOTSTRAP = "bootstrap", @@ -156,22 +154,6 @@ export interface IConnectionManager { */ getConnectedPeers(codec?: string): Promise; - /** - * Checks if a specific pubsub topic is configured in the connection manager. - * - * @param pubsubTopic - The pubsub topic to check - * @returns True if the topic is configured, false otherwise - * - * @example - * ```typescript - * const isConfigured = connectionManager.isTopicConfigured("/waku/2/default-waku/proto"); - * if (isConfigured) { - * console.log("Topic is configured"); - * } - * ``` - */ - isTopicConfigured(pubsubTopic: PubsubTopic): boolean; - /** * Checks if a peer has shard info. * diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index 91261ead37..32ce59c2e6 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,13 +1,13 @@ import type { PeerId } from "@libp2p/interface"; -import { PubsubTopic, ThisOrThat } from "./misc.js"; -import type { ShardInfo } from "./sharding.js"; +import { ThisOrThat } from "./misc.js"; +import type { ClusterId, ShardInfo } from "./sharding.js"; export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>; export interface IMetadata { readonly multicodec: string; - readonly pubsubTopics: PubsubTopic[]; + readonly clusterId: ClusterId; confirmOrAttemptHandshake(peerId: PeerId): Promise; query(peerId: PeerId): Promise; } diff --git a/packages/interfaces/src/sharding.ts b/packages/interfaces/src/sharding.ts index 110bf0e74f..c2364b6396 100644 --- a/packages/interfaces/src/sharding.ts +++ b/packages/interfaces/src/sharding.ts @@ -4,9 +4,10 @@ export type ShardInfo = { }; export type ContentTopicInfo = { - clusterId?: number; + clusterId?: number; // TODO: This should be mandatory on a network config contentTopics: string[]; }; export type StaticSharding = ShardInfo; export type AutoSharding = ContentTopicInfo; +export type ClusterId = number; diff --git a/packages/relay/src/create.ts b/packages/relay/src/create.ts index 49702ad356..0d330dffd6 100644 --- a/packages/relay/src/create.ts +++ b/packages/relay/src/create.ts @@ -1,5 +1,7 @@ import type { CreateNodeOptions, RelayNode } from "@waku/interfaces"; +import { DefaultNetworkConfig } from "@waku/interfaces"; import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk"; +import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; import { Relay, RelayCreateOptions, wakuGossipSub } from "./relay.js"; @@ -26,14 +28,16 @@ export async function createRelayNode( } }; - const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); + const libp2p = await createLibp2pAndUpdateOptions(options); + const pubsubTopics = derivePubsubTopicsFromNetworkConfig( + options.networkConfig ?? DefaultNetworkConfig + ); const relay = new Relay({ - pubsubTopics: pubsubTopics || [], + pubsubTopics, libp2p }); const node = new WakuNode( - pubsubTopics, options as CreateNodeOptions, libp2p, {}, diff --git a/packages/sdk/src/create/create.ts b/packages/sdk/src/create/create.ts index 3f11e56048..d3838288c7 100644 --- a/packages/sdk/src/create/create.ts +++ b/packages/sdk/src/create/create.ts @@ -12,9 +12,9 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js"; export async function createLightNode( options: CreateNodeOptions = {} ): Promise { - const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); + const libp2p = await createLibp2pAndUpdateOptions(options); - const node = new WakuNode(pubsubTopics, options, libp2p, { + const node = new WakuNode(options, libp2p, { store: true, lightpush: true, filter: true diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index 8f91d3660b..958605ddce 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -7,32 +7,27 @@ import { webSockets } from "@libp2p/websockets"; import { all as filterAll, wss } from "@libp2p/websockets/filters"; import { wakuMetadata } from "@waku/core"; import { + type ClusterId, type CreateLibp2pOptions, type CreateNodeOptions, + DEFAULT_CLUSTER_ID, DefaultNetworkConfig, - type IMetadata, - type Libp2p, - type Libp2pComponents, - PubsubTopic + type Libp2p } from "@waku/interfaces"; -import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; +import { Logger } from "@waku/utils"; import { createLibp2p } from "libp2p"; import { isTestEnvironment } from "../env.js"; import { getPeerDiscoveries } from "./discovery.js"; -type MetadataService = { - metadata?: (components: Libp2pComponents) => IMetadata; -}; - const log = new Logger("sdk:create"); const DefaultUserAgent = "js-waku"; const DefaultPingMaxInboundStreams = 10; export async function defaultLibp2p( - pubsubTopics: PubsubTopic[], + clusterId: ClusterId, options?: Partial, userAgent?: string ): Promise { @@ -49,10 +44,6 @@ export async function defaultLibp2p( /* eslint-enable no-console */ } - const metadataService: MetadataService = pubsubTopics - ? { metadata: wakuMetadata(pubsubTopics) } - : {}; - const filter = options?.filterMultiaddrs === false || isTestEnvironment() ? filterAll @@ -71,7 +62,7 @@ export async function defaultLibp2p( maxInboundStreams: options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams }), - ...metadataService, + metadata: wakuMetadata(clusterId), ...options?.services } }) as any as Libp2p; // TODO: make libp2p include it; @@ -85,12 +76,11 @@ const DEFAULT_DISCOVERIES_ENABLED = { export async function createLibp2pAndUpdateOptions( options: CreateNodeOptions -): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { - const { networkConfig } = options; - const pubsubTopics = derivePubsubTopicsFromNetworkConfig( - networkConfig ?? DefaultNetworkConfig - ); - log.info("Creating Waku node with pubsub topics", pubsubTopics); +): Promise { + const networkConfig = options.networkConfig ?? DefaultNetworkConfig; + const clusterId = networkConfig.clusterId ?? DEFAULT_CLUSTER_ID; + + log.info("Creating Waku node with cluster id: ", clusterId); const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; @@ -117,11 +107,5 @@ export async function createLibp2pAndUpdateOptions( libp2pOptions.peerDiscovery = peerDiscovery; - const libp2p = await defaultLibp2p( - pubsubTopics, - libp2pOptions, - options?.userAgent - ); - - return { libp2p, pubsubTopics }; + return defaultLibp2p(clusterId, libp2pOptions, options?.userAgent); } diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index 440303e56d..2010ba576c 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -37,22 +37,6 @@ describe("Filter SDK", () => { sinon.restore(); }); - it("should throw error when subscribing with unsupported pubsub topic", async () => { - const unsupportedDecoder = createDecoder( - CONTENT_TOPIC, - "/unsupported/topic" - ); - - try { - await filter.subscribe(unsupportedDecoder, callback); - expect.fail("Should have thrown an error"); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic /unsupported/topic has not been configured on this instance." - ); - } - }); - it("should successfully subscribe to supported pubsub topic", async () => { const addStub = sinon.stub(Subscription.prototype, "add").resolves(true); const startStub = sinon.stub(Subscription.prototype, "start"); @@ -64,22 +48,6 @@ describe("Filter SDK", () => { expect(startStub.calledOnce).to.be.true; }); - it("should throw error when unsubscribing with unsupported pubsub topic", async () => { - const unsupportedDecoder = createDecoder( - CONTENT_TOPIC, - "/unsupported/topic" - ); - - try { - await filter.unsubscribe(unsupportedDecoder); - expect.fail("Should have thrown an error"); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic /unsupported/topic has not been configured on this instance." - ); - } - }); - it("should return false when unsubscribing from a non-existing subscription", async () => { const result = await filter.unsubscribe(decoder); expect(result).to.be.false; @@ -183,9 +151,9 @@ type MockFilterOptions = { }; function mockFilter(options: MockFilterOptions): Filter { - const filter = new Filter({ + // we're not actually testing FilterCore functionality here + return new Filter({ libp2p: options.libp2p, - connectionManager: options.connectionManager || mockConnectionManager(), peerManager: options.peerManager || mockPeerManager(), options: { numPeersToUse: 2, @@ -193,9 +161,6 @@ function mockFilter(options: MockFilterOptions): Filter { keepAliveIntervalMs: 60_000 } }); - - // we're not actually testing FilterCore functionality here - return filter; } function createMockMessage(contentTopic: string): IProtoMessage { diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 3fc144ff39..43895fab7c 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -1,4 +1,4 @@ -import { ConnectionManager, FilterCore } from "@waku/core"; +import { FilterCore } from "@waku/core"; import type { Callback, FilterProtocolOptions, @@ -21,7 +21,6 @@ type PubsubTopic = string; export class Filter implements IFilter { private readonly protocol: FilterCore; private readonly peerManager: PeerManager; - private readonly connectionManager: ConnectionManager; private readonly config: FilterProtocolOptions; private subscriptions = new Map(); @@ -35,7 +34,6 @@ export class Filter implements IFilter { }; this.peerManager = params.peerManager; - this.connectionManager = params.connectionManager; this.protocol = new FilterCore( this.onIncomingMessage.bind(this), @@ -75,7 +73,6 @@ export class Filter implements IFilter { ); this.throwIfTopicNotSame(pubsubTopics); - this.throwIfTopicNotSupported(singlePubsubTopic); let subscription = this.subscriptions.get(singlePubsubTopic); if (!subscription) { @@ -117,7 +114,6 @@ export class Filter implements IFilter { ); this.throwIfTopicNotSame(pubsubTopics); - this.throwIfTopicNotSupported(singlePubsubTopic); const subscription = this.subscriptions.get(singlePubsubTopic); if (!subscription) { @@ -170,15 +166,4 @@ export class Filter implements IFilter { ); } } - - private throwIfTopicNotSupported(pubsubTopic: string): void { - const supportedPubsubTopic = - this.connectionManager.isTopicConfigured(pubsubTopic); - - if (!supportedPubsubTopic) { - throw Error( - `Pubsub topic ${pubsubTopic} has not been configured on this instance.` - ); - } - } } diff --git a/packages/sdk/src/filter/types.ts b/packages/sdk/src/filter/types.ts index 34842a1b22..44326728d1 100644 --- a/packages/sdk/src/filter/types.ts +++ b/packages/sdk/src/filter/types.ts @@ -1,15 +1,13 @@ -import { ConnectionManager } from "@waku/core"; -import { FilterCore } from "@waku/core"; +import type { FilterCore } from "@waku/core"; import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces"; -import { WakuMessage } from "@waku/proto"; +import type { WakuMessage } from "@waku/proto"; -import { PeerManager } from "../peer_manager/index.js"; +import type { PeerManager } from "../peer_manager/index.js"; export type FilterConstructorParams = { options?: Partial; libp2p: Libp2p; peerManager: PeerManager; - connectionManager: ConnectionManager; }; export type SubscriptionEvents = { diff --git a/packages/sdk/src/light_push/light_push.spec.ts b/packages/sdk/src/light_push/light_push.spec.ts index 0d44a430f2..114f0e413c 100644 --- a/packages/sdk/src/light_push/light_push.spec.ts +++ b/packages/sdk/src/light_push/light_push.spec.ts @@ -1,10 +1,5 @@ import { Peer, PeerId } from "@libp2p/interface"; -import { - ConnectionManager, - createEncoder, - Encoder, - LightPushCodec -} from "@waku/core"; +import { createEncoder, Encoder, LightPushCodec } from "@waku/core"; import { Libp2p, ProtocolError } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -14,7 +9,6 @@ import { PeerManager } from "../peer_manager/index.js"; import { LightPush } from "./light_push.js"; -const PUBSUB_TOPIC = "/waku/2/rs/1/4"; const CONTENT_TOPIC = "/test/1/waku-light-push/utf8"; describe("LightPush SDK", () => { @@ -28,19 +22,6 @@ describe("LightPush SDK", () => { lightPush = mockLightPush({ libp2p }); }); - it("should fail to send if pubsub topics are misconfigured", async () => { - lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] }); - - const result = await lightPush.send(encoder, { - payload: utf8ToBytes("test") - }); - const failures = result.failures ?? []; - - expect(failures.length).to.be.eq(1); - expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED)) - .to.be.true; - }); - it("should fail to send if no connected peers found", async () => { const result = await lightPush.send(encoder, { payload: utf8ToBytes("test") @@ -168,10 +149,6 @@ type MockLightPushOptions = { function mockLightPush(options: MockLightPushOptions): LightPush { const lightPush = new LightPush({ - connectionManager: { - isTopicConfigured: (topic: string) => - (options.pubsubTopics || [PUBSUB_TOPIC]).includes(topic) - } as unknown as ConnectionManager, peerManager: { getPeers: () => options.libp2p diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 18719ec210..5789f351bd 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, LightPushCore } from "@waku/core"; +import { LightPushCore } from "@waku/core"; import { type CoreProtocolResult, Failure, @@ -8,7 +8,7 @@ import { type IMessage, type ISendOptions, type Libp2p, - LightPushProtocolOptions, + type LightPushProtocolOptions, ProtocolError, Protocols, SDKProtocolResult @@ -30,7 +30,6 @@ const DEFAULT_SEND_OPTIONS: LightPushProtocolOptions = { }; type LightPushConstructorParams = { - connectionManager: ConnectionManager; peerManager: PeerManager; libp2p: Libp2p; options?: Partial; @@ -40,7 +39,6 @@ export class LightPush implements ILightPush { private readonly config: LightPushProtocolOptions; private readonly retryManager: RetryManager; private readonly peerManager: PeerManager; - private readonly connectionManager: ConnectionManager; private readonly protocol: LightPushCore; public constructor(params: LightPushConstructorParams) { @@ -50,7 +48,6 @@ export class LightPush implements ILightPush { } as LightPushProtocolOptions; this.peerManager = params.peerManager; - this.connectionManager = params.connectionManager; this.protocol = new LightPushCore(params.libp2p); this.retryManager = new RetryManager({ peerManager: params.peerManager, @@ -84,17 +81,6 @@ export class LightPush implements ILightPush { log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); - if (!this.connectionManager.isTopicConfigured(pubsubTopic)) { - return { - successes: [], - failures: [ - { - error: ProtocolError.TOPIC_NOT_CONFIGURED - } - ] - }; - } - const peerIds = await this.peerManager.getPeers({ protocol: Protocols.LightPush, pubsubTopic: encoder.pubsubTopic diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 874244a35a..126cab8678 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface"; import { peerIdFromString } from "@libp2p/peer-id"; import { multiaddr } from "@multiformats/multiaddr"; -import { ConnectionManager, messageHash, StoreCore } from "@waku/core"; +import { messageHash, StoreCore } from "@waku/core"; import { IDecodedMessage, IDecoder, @@ -21,7 +21,6 @@ const log = new Logger("waku:store:sdk"); type StoreConstructorParams = { libp2p: Libp2p; peerManager: PeerManager; - connectionManager: ConnectionManager; options?: Partial; }; @@ -33,13 +32,11 @@ export class Store implements IStore { private readonly options: Partial; private readonly libp2p: Libp2p; private readonly peerManager: PeerManager; - private readonly connectionManager: ConnectionManager; private readonly protocol: StoreCore; public constructor(params: StoreConstructorParams) { this.options = params.options || {}; this.peerManager = params.peerManager; - this.connectionManager = params.connectionManager; this.libp2p = params.libp2p; this.protocol = new StoreCore(params.libp2p); @@ -229,14 +226,6 @@ export class Store implements IStore { } const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0]; - const isTopicSupported = - this.connectionManager.isTopicConfigured(pubsubTopicForQuery); - - if (!isTopicSupported) { - throw new Error( - `Pubsub topic ${pubsubTopicForQuery} has not been configured on this instance.` - ); - } const decodersAsMap = new Map(); decoders.forEach((dec) => { @@ -271,11 +260,9 @@ export class Store implements IStore { pubsubTopic }); - const peer = this.options.peers + return this.options.peers ? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers) : peers[0]; - - return peer; } private async getPeerFromConfigurationOrFirst( diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index e547130927..df6e845e3a 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -20,8 +20,7 @@ import type { IWaku, IWakuEventEmitter, Libp2p, - NetworkConfig, - PubsubTopic + NetworkConfig } from "@waku/interfaces"; import { DefaultNetworkConfig, @@ -67,7 +66,6 @@ export class WakuNode implements IWaku { private readonly healthIndicator: HealthIndicator; public constructor( - pubsubTopics: PubsubTopic[], options: CreateNodeOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled, @@ -90,7 +88,6 @@ export class WakuNode implements IWaku { libp2p, relay: this.relay, events: this.events, - pubsubTopics: pubsubTopics, networkConfig: this.networkConfig, config: options?.connectionManager }); @@ -108,7 +105,6 @@ export class WakuNode implements IWaku { if (protocolsEnabled.store) { this.store = new Store({ libp2p, - connectionManager: this.connectionManager, peerManager: this.peerManager, options: options?.store }); @@ -118,7 +114,6 @@ export class WakuNode implements IWaku { this.lightPush = new LightPush({ libp2p, peerManager: this.peerManager, - connectionManager: this.connectionManager, options: options?.lightPush }); } @@ -126,7 +121,6 @@ export class WakuNode implements IWaku { if (protocolsEnabled.filter) { this.filter = new Filter({ libp2p, - connectionManager: this.connectionManager, peerManager: this.peerManager, options: options.filter }); diff --git a/packages/tests/tests/connection-mananger/network_monitor.spec.ts b/packages/tests/tests/connection-mananger/network_monitor.spec.ts index 9c73207cdb..371660f378 100644 --- a/packages/tests/tests/connection-mananger/network_monitor.spec.ts +++ b/packages/tests/tests/connection-mananger/network_monitor.spec.ts @@ -228,7 +228,7 @@ describe("waku:connection", function () { globalThis.dispatchEvent = undefined; }); - it(`should emit events and trasition isConnected state when has peers or no peers`, async function () { + it(`should emit events and transition isConnected state when has peers or no peers`, async function () { const privateKey1 = await generateKeyPair("secp256k1"); const privateKey2 = await generateKeyPair("secp256k1"); const peerIdPx = peerIdFromPrivateKey(privateKey1); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 6fb522376e..15d0c9b476 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,5 +1,5 @@ -import { LightNode, ProtocolError } from "@waku/interfaces"; -import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { LightNode } from "@waku/interfaces"; +import { createEncoder, utf8ToBytes } from "@waku/sdk"; import { contentTopicToPubsubTopic, contentTopicToShardIndex @@ -201,58 +201,4 @@ describe("Autosharding: Running Nodes", function () { }) ).to.eq(true); }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { clusterId, contentTopics: [ContentTopic] }, - { lightpush: true, filter: true }, - false, - numServiceNodes, - true - ); - - // use a content topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); - - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - if (successes.length > 0 || failures?.length === 0) { - throw new Error("The request should've thrown an error"); - } - - const errors = failures?.map((failure) => failure.error); - expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); - }); - - it("start node with empty content topic", async function () { - try { - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [] - } - }); - throw new Error( - "Starting the node with no content topic should've thrown an error" - ); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - "Invalid content topics configuration: please provide at least one content topic" - ) - ) { - throw err; - } - } - }); }); diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 776c106bf6..aa55abac45 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -1,5 +1,5 @@ -import { LightNode, ProtocolError, SingleShardInfo } from "@waku/interfaces"; -import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { LightNode, SingleShardInfo } from "@waku/interfaces"; +import { createEncoder, utf8ToBytes } from "@waku/sdk"; import { shardInfoToPubsubTopics, singleShardInfosToShardInfo, @@ -197,52 +197,5 @@ describe("Static Sharding: Running Nodes", function () { }) ).to.eq(true); }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - this.timeout(15_000); - - // use a pubsub topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId, - shard: 4 - } - }); - - const request = await waku?.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - if ( - (request?.successes.length || 0) > 0 || - request?.failures?.length === 0 - ) { - throw new Error("The request should've thrown an error"); - } - - const errors = request?.failures?.map((failure) => failure.error); - expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); - }); - - it("start node with empty shard should fail", async function () { - try { - waku = await createLightNode({ - networkConfig: { clusterId: clusterId, shards: [] } - }); - throw new Error( - "Starting the node with no shard should've thrown an error" - ); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - "Invalid shards configuration: please provide at least one shard" - ) - ) { - throw err; - } - } - }); }); }); diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts index e39ce3265b..89f80d5cdf 100644 --- a/packages/tests/tests/store/error_handling.node.spec.ts +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -1,4 +1,3 @@ -import { createDecoder } from "@waku/core"; import { IMessage, type LightNode } from "@waku/interfaces"; import { determinePubsubTopic } from "@waku/utils"; import { expect } from "chai"; @@ -13,7 +12,6 @@ import { import { processQueriedMessages, runStoreNodes, - TestContentTopic1, TestDecoder, TestDecoder2, TestShardInfo @@ -32,28 +30,6 @@ describe("Waku Store, error handling", function () { await tearDownNodes(nwaku, waku); }); - it("Query Generator, Wrong PubsubTopic", async function () { - const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); - - try { - for await (const msgPromises of waku.store.queryGenerator([ - wrongDecoder - ])) { - void msgPromises; - } - throw new Error("QueryGenerator was successful but was expected to fail"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` - ) - ) { - throw err; - } - } - }); - it("Query Generator, Multiple PubsubTopics", async function () { try { for await (const msgPromises of waku.store.queryGenerator([ @@ -101,23 +77,6 @@ describe("Waku Store, error handling", function () { expect(messages?.length).eq(0); }); - it("Query with Ordered Callback, Wrong PubsubTopic", async function () { - const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); - try { - await waku.store.queryWithOrderedCallback([wrongDecoder], async () => {}); - throw new Error("QueryGenerator was successful but was expected to fail"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` - ) - ) { - throw err; - } - } - }); - it("Query with Ordered Callback, Multiple PubsubTopics", async function () { try { await waku.store.queryWithOrderedCallback( @@ -159,23 +118,6 @@ describe("Waku Store, error handling", function () { expect(messages?.length).eq(0); }); - it("Query with Promise Callback, Wrong PubsubTopic", async function () { - const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); - try { - await waku.store.queryWithPromiseCallback([wrongDecoder], async () => {}); - throw new Error("QueryGenerator was successful but was expected to fail"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` - ) - ) { - throw err; - } - } - }); - it("Query with Promise Callback, Multiple PubsubTopics", async function () { try { await waku.store.queryWithPromiseCallback(