From 188e4bf92851d258f20affbcae2f4f883a78e725 Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Fri, 11 Jul 2025 13:33:45 +1000 Subject: [PATCH 1/2] fix typo --- .../tests/tests/connection-mananger/network_monitor.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tests/connection-mananger/network_monitor.spec.ts b/packages/tests/tests/connection-mananger/network_monitor.spec.ts index 9c73207cdb..371660f378 100644 --- a/packages/tests/tests/connection-mananger/network_monitor.spec.ts +++ b/packages/tests/tests/connection-mananger/network_monitor.spec.ts @@ -228,7 +228,7 @@ describe("waku:connection", function () { globalThis.dispatchEvent = undefined; }); - it(`should emit events and trasition isConnected state when has peers or no peers`, async function () { + it(`should emit events and transition isConnected state when has peers or no peers`, async function () { const privateKey1 = await generateKeyPair("secp256k1"); const privateKey2 = await generateKeyPair("secp256k1"); const peerIdPx = peerIdFromPrivateKey(privateKey1); From 6d55af947e8e57bf0d33fc6a5a67f61594e83ff1 Mon Sep 17 00:00:00 2001 From: fryorcraken Date: Fri, 11 Jul 2025 12:55:02 +1000 Subject: [PATCH 2/2] fix!: remove node level pubsub topic concept For an edge node, there is no such thing as a "pubsub topic configuration". An edge node should be able to operate for any possible shard, and it is a per-protocol matter (eg send message with light push). A relay node do subscribe to shards, but in this case, even metadata protocol does not need to advertise them, this is already handled by gossipsub. Only service node should advertise their shards via metadata protocol, which is out of scope for js-waku. # Conflicts: # packages/interfaces/src/connection_manager.ts --- .../connection_manager.spec.ts | 25 -------- .../connection_manager/connection_manager.ts | 11 +--- packages/core/src/lib/metadata/metadata.ts | 25 ++++---- packages/interfaces/src/connection_manager.ts | 18 ------ packages/interfaces/src/metadata.ts | 6 +- packages/interfaces/src/sharding.ts | 3 +- packages/relay/src/create.ts | 10 +++- packages/sdk/src/create/create.ts | 4 +- packages/sdk/src/create/libp2p.ts | 40 ++++--------- packages/sdk/src/filter/filter.spec.ts | 39 +------------ packages/sdk/src/filter/filter.ts | 17 +----- packages/sdk/src/filter/types.ts | 8 +-- .../sdk/src/light_push/light_push.spec.ts | 25 +------- packages/sdk/src/light_push/light_push.ts | 18 +----- packages/sdk/src/store/store.ts | 17 +----- packages/sdk/src/waku/waku.ts | 8 +-- .../tests/sharding/auto_sharding.spec.ts | 58 +------------------ .../tests/sharding/static_sharding.spec.ts | 51 +--------------- .../tests/store/error_handling.node.spec.ts | 58 ------------------- 19 files changed, 56 insertions(+), 385 deletions(-) diff --git a/packages/core/src/lib/connection_manager/connection_manager.spec.ts b/packages/core/src/lib/connection_manager/connection_manager.spec.ts index 40c33a921b..25235cba12 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.spec.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.spec.ts @@ -157,7 +157,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); @@ -168,7 +167,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -180,7 +178,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); @@ -197,7 +194,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, config: customConfig }); @@ -209,7 +205,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -224,7 +219,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -255,7 +249,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig, relay }); @@ -287,7 +280,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -316,7 +308,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -367,7 +358,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -409,7 +399,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); @@ -540,22 +529,9 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); - - it("should return true when topic is configured", () => { - const result = connectionManager.isTopicConfigured("/waku/2/rs/1/0"); - - expect(result).to.be.true; - }); - - it("should return false when topic is not configured", () => { - const result = connectionManager.isTopicConfigured("/waku/2/rs/1/99"); - - expect(result).to.be.false; - }); }); describe("isPeerOnTopic", () => { @@ -563,7 +539,6 @@ describe("ConnectionManager", () => { connectionManager = new ConnectionManager({ libp2p, events, - pubsubTopics, networkConfig }); }); diff --git a/packages/core/src/lib/connection_manager/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts index 4dca6d9164..952ab32d53 100644 --- a/packages/core/src/lib/connection_manager/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -5,8 +5,7 @@ import { IConnectionManager, IRelay, IWakuEventEmitter, - NetworkConfig, - PubsubTopic + NetworkConfig } from "@waku/interfaces"; import { Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -33,15 +32,12 @@ const DEFAULT_DIAL_COOLDOWN_SEC = 10; type ConnectionManagerConstructorOptions = { libp2p: Libp2p; events: IWakuEventEmitter; - pubsubTopics: PubsubTopic[]; networkConfig: NetworkConfig; relay?: IRelay; config?: Partial; }; export class ConnectionManager implements IConnectionManager { - private readonly pubsubTopics: PubsubTopic[]; - private readonly keepAliveManager: KeepAliveManager; private readonly discoveryDialer: DiscoveryDialer; private readonly dialer: Dialer; @@ -54,7 +50,6 @@ export class ConnectionManager implements IConnectionManager { public constructor(options: ConnectionManagerConstructorOptions) { this.libp2p = options.libp2p; - this.pubsubTopics = options.pubsubTopics; this.options = { maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, @@ -189,10 +184,6 @@ export class ConnectionManager implements IConnectionManager { return result; } - public isTopicConfigured(pubsubTopic: PubsubTopic): boolean { - return this.pubsubTopics.includes(pubsubTopic); - } - public async hasShardInfo(peerId: PeerId): Promise { return this.shardReader.hasShardInfo(peerId); } diff --git a/packages/core/src/lib/metadata/metadata.ts b/packages/core/src/lib/metadata/metadata.ts index 10e4b7616c..ac4707e575 100644 --- a/packages/core/src/lib/metadata/metadata.ts +++ b/packages/core/src/lib/metadata/metadata.ts @@ -1,16 +1,16 @@ import type { PeerId } from "@libp2p/interface"; import { IncomingStreamData } from "@libp2p/interface"; import { + type ClusterId, type IMetadata, type Libp2pComponents, type MetadataQueryResult, type PeerIdStr, ProtocolError, - PubsubTopic, type ShardInfo } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; -import { encodeRelayShard, Logger, pubsubTopicsToShardInfo } from "@waku/utils"; +import { encodeRelayShard, Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -30,7 +30,7 @@ class Metadata implements IMetadata { public readonly multicodec = MetadataCodec; public constructor( - public pubsubTopics: PubsubTopic[], + public clusterId: ClusterId, libp2p: Libp2pComponents ) { this.streamManager = new StreamManager(MetadataCodec, libp2p); @@ -44,9 +44,10 @@ class Metadata implements IMetadata { * Make a metadata query to a peer */ public async query(peerId: PeerId): Promise { - const request = proto_metadata.WakuMetadataRequest.encode( - pubsubTopicsToShardInfo(this.pubsubTopics) - ); + const request = proto_metadata.WakuMetadataRequest.encode({ + clusterId: this.clusterId, + shards: [] // Only services node need to provide shards + }); const peer = await this.libp2pComponents.peerStore.get(peerId); if (!peer) { @@ -112,9 +113,10 @@ class Metadata implements IMetadata { private async onRequest(streamData: IncomingStreamData): Promise { try { const { stream, connection } = streamData; - const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode( - pubsubTopicsToShardInfo(this.pubsubTopics) - ); + const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode({ + clusterId: this.clusterId, + shards: [] // Only service nodes need to provide shards + }); const encodedResponse = await pipe( [encodedShardInfo], @@ -178,8 +180,7 @@ class Metadata implements IMetadata { } export function wakuMetadata( - pubsubTopics: PubsubTopic[] + clusterId: ClusterId ): (components: Libp2pComponents) => IMetadata { - return (components: Libp2pComponents) => - new Metadata(pubsubTopics, components); + return (components: Libp2pComponents) => new Metadata(clusterId, components); } diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 35640378df..301ca94155 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -1,8 +1,6 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; -import type { PubsubTopic } from "./misc.js"; - // Peer tags export enum Tags { BOOTSTRAP = "bootstrap", @@ -156,22 +154,6 @@ export interface IConnectionManager { */ getConnectedPeers(codec?: string): Promise; - /** - * Checks if a specific pubsub topic is configured in the connection manager. - * - * @param pubsubTopic - The pubsub topic to check - * @returns True if the topic is configured, false otherwise - * - * @example - * ```typescript - * const isConfigured = connectionManager.isTopicConfigured("/waku/2/default-waku/proto"); - * if (isConfigured) { - * console.log("Topic is configured"); - * } - * ``` - */ - isTopicConfigured(pubsubTopic: PubsubTopic): boolean; - /** * Checks if a peer has shard info. * diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index 91261ead37..32ce59c2e6 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,13 +1,13 @@ import type { PeerId } from "@libp2p/interface"; -import { PubsubTopic, ThisOrThat } from "./misc.js"; -import type { ShardInfo } from "./sharding.js"; +import { ThisOrThat } from "./misc.js"; +import type { ClusterId, ShardInfo } from "./sharding.js"; export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>; export interface IMetadata { readonly multicodec: string; - readonly pubsubTopics: PubsubTopic[]; + readonly clusterId: ClusterId; confirmOrAttemptHandshake(peerId: PeerId): Promise; query(peerId: PeerId): Promise; } diff --git a/packages/interfaces/src/sharding.ts b/packages/interfaces/src/sharding.ts index 110bf0e74f..c2364b6396 100644 --- a/packages/interfaces/src/sharding.ts +++ b/packages/interfaces/src/sharding.ts @@ -4,9 +4,10 @@ export type ShardInfo = { }; export type ContentTopicInfo = { - clusterId?: number; + clusterId?: number; // TODO: This should be mandatory on a network config contentTopics: string[]; }; export type StaticSharding = ShardInfo; export type AutoSharding = ContentTopicInfo; +export type ClusterId = number; diff --git a/packages/relay/src/create.ts b/packages/relay/src/create.ts index 49702ad356..0d330dffd6 100644 --- a/packages/relay/src/create.ts +++ b/packages/relay/src/create.ts @@ -1,5 +1,7 @@ import type { CreateNodeOptions, RelayNode } from "@waku/interfaces"; +import { DefaultNetworkConfig } from "@waku/interfaces"; import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk"; +import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils"; import { Relay, RelayCreateOptions, wakuGossipSub } from "./relay.js"; @@ -26,14 +28,16 @@ export async function createRelayNode( } }; - const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); + const libp2p = await createLibp2pAndUpdateOptions(options); + const pubsubTopics = derivePubsubTopicsFromNetworkConfig( + options.networkConfig ?? DefaultNetworkConfig + ); const relay = new Relay({ - pubsubTopics: pubsubTopics || [], + pubsubTopics, libp2p }); const node = new WakuNode( - pubsubTopics, options as CreateNodeOptions, libp2p, {}, diff --git a/packages/sdk/src/create/create.ts b/packages/sdk/src/create/create.ts index 3f11e56048..d3838288c7 100644 --- a/packages/sdk/src/create/create.ts +++ b/packages/sdk/src/create/create.ts @@ -12,9 +12,9 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js"; export async function createLightNode( options: CreateNodeOptions = {} ): Promise { - const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); + const libp2p = await createLibp2pAndUpdateOptions(options); - const node = new WakuNode(pubsubTopics, options, libp2p, { + const node = new WakuNode(options, libp2p, { store: true, lightpush: true, filter: true diff --git a/packages/sdk/src/create/libp2p.ts b/packages/sdk/src/create/libp2p.ts index 8f91d3660b..958605ddce 100644 --- a/packages/sdk/src/create/libp2p.ts +++ b/packages/sdk/src/create/libp2p.ts @@ -7,32 +7,27 @@ import { webSockets } from "@libp2p/websockets"; import { all as filterAll, wss } from "@libp2p/websockets/filters"; import { wakuMetadata } from "@waku/core"; import { + type ClusterId, type CreateLibp2pOptions, type CreateNodeOptions, + DEFAULT_CLUSTER_ID, DefaultNetworkConfig, - type IMetadata, - type Libp2p, - type Libp2pComponents, - PubsubTopic + type Libp2p } from "@waku/interfaces"; -import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; +import { Logger } from "@waku/utils"; import { createLibp2p } from "libp2p"; import { isTestEnvironment } from "../env.js"; import { getPeerDiscoveries } from "./discovery.js"; -type MetadataService = { - metadata?: (components: Libp2pComponents) => IMetadata; -}; - const log = new Logger("sdk:create"); const DefaultUserAgent = "js-waku"; const DefaultPingMaxInboundStreams = 10; export async function defaultLibp2p( - pubsubTopics: PubsubTopic[], + clusterId: ClusterId, options?: Partial, userAgent?: string ): Promise { @@ -49,10 +44,6 @@ export async function defaultLibp2p( /* eslint-enable no-console */ } - const metadataService: MetadataService = pubsubTopics - ? { metadata: wakuMetadata(pubsubTopics) } - : {}; - const filter = options?.filterMultiaddrs === false || isTestEnvironment() ? filterAll @@ -71,7 +62,7 @@ export async function defaultLibp2p( maxInboundStreams: options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams }), - ...metadataService, + metadata: wakuMetadata(clusterId), ...options?.services } }) as any as Libp2p; // TODO: make libp2p include it; @@ -85,12 +76,11 @@ const DEFAULT_DISCOVERIES_ENABLED = { export async function createLibp2pAndUpdateOptions( options: CreateNodeOptions -): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { - const { networkConfig } = options; - const pubsubTopics = derivePubsubTopicsFromNetworkConfig( - networkConfig ?? DefaultNetworkConfig - ); - log.info("Creating Waku node with pubsub topics", pubsubTopics); +): Promise { + const networkConfig = options.networkConfig ?? DefaultNetworkConfig; + const clusterId = networkConfig.clusterId ?? DEFAULT_CLUSTER_ID; + + log.info("Creating Waku node with cluster id: ", clusterId); const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; @@ -117,11 +107,5 @@ export async function createLibp2pAndUpdateOptions( libp2pOptions.peerDiscovery = peerDiscovery; - const libp2p = await defaultLibp2p( - pubsubTopics, - libp2pOptions, - options?.userAgent - ); - - return { libp2p, pubsubTopics }; + return defaultLibp2p(clusterId, libp2pOptions, options?.userAgent); } diff --git a/packages/sdk/src/filter/filter.spec.ts b/packages/sdk/src/filter/filter.spec.ts index 440303e56d..2010ba576c 100644 --- a/packages/sdk/src/filter/filter.spec.ts +++ b/packages/sdk/src/filter/filter.spec.ts @@ -37,22 +37,6 @@ describe("Filter SDK", () => { sinon.restore(); }); - it("should throw error when subscribing with unsupported pubsub topic", async () => { - const unsupportedDecoder = createDecoder( - CONTENT_TOPIC, - "/unsupported/topic" - ); - - try { - await filter.subscribe(unsupportedDecoder, callback); - expect.fail("Should have thrown an error"); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic /unsupported/topic has not been configured on this instance." - ); - } - }); - it("should successfully subscribe to supported pubsub topic", async () => { const addStub = sinon.stub(Subscription.prototype, "add").resolves(true); const startStub = sinon.stub(Subscription.prototype, "start"); @@ -64,22 +48,6 @@ describe("Filter SDK", () => { expect(startStub.calledOnce).to.be.true; }); - it("should throw error when unsubscribing with unsupported pubsub topic", async () => { - const unsupportedDecoder = createDecoder( - CONTENT_TOPIC, - "/unsupported/topic" - ); - - try { - await filter.unsubscribe(unsupportedDecoder); - expect.fail("Should have thrown an error"); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic /unsupported/topic has not been configured on this instance." - ); - } - }); - it("should return false when unsubscribing from a non-existing subscription", async () => { const result = await filter.unsubscribe(decoder); expect(result).to.be.false; @@ -183,9 +151,9 @@ type MockFilterOptions = { }; function mockFilter(options: MockFilterOptions): Filter { - const filter = new Filter({ + // we're not actually testing FilterCore functionality here + return new Filter({ libp2p: options.libp2p, - connectionManager: options.connectionManager || mockConnectionManager(), peerManager: options.peerManager || mockPeerManager(), options: { numPeersToUse: 2, @@ -193,9 +161,6 @@ function mockFilter(options: MockFilterOptions): Filter { keepAliveIntervalMs: 60_000 } }); - - // we're not actually testing FilterCore functionality here - return filter; } function createMockMessage(contentTopic: string): IProtoMessage { diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 3fc144ff39..43895fab7c 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -1,4 +1,4 @@ -import { ConnectionManager, FilterCore } from "@waku/core"; +import { FilterCore } from "@waku/core"; import type { Callback, FilterProtocolOptions, @@ -21,7 +21,6 @@ type PubsubTopic = string; export class Filter implements IFilter { private readonly protocol: FilterCore; private readonly peerManager: PeerManager; - private readonly connectionManager: ConnectionManager; private readonly config: FilterProtocolOptions; private subscriptions = new Map(); @@ -35,7 +34,6 @@ export class Filter implements IFilter { }; this.peerManager = params.peerManager; - this.connectionManager = params.connectionManager; this.protocol = new FilterCore( this.onIncomingMessage.bind(this), @@ -75,7 +73,6 @@ export class Filter implements IFilter { ); this.throwIfTopicNotSame(pubsubTopics); - this.throwIfTopicNotSupported(singlePubsubTopic); let subscription = this.subscriptions.get(singlePubsubTopic); if (!subscription) { @@ -117,7 +114,6 @@ export class Filter implements IFilter { ); this.throwIfTopicNotSame(pubsubTopics); - this.throwIfTopicNotSupported(singlePubsubTopic); const subscription = this.subscriptions.get(singlePubsubTopic); if (!subscription) { @@ -170,15 +166,4 @@ export class Filter implements IFilter { ); } } - - private throwIfTopicNotSupported(pubsubTopic: string): void { - const supportedPubsubTopic = - this.connectionManager.isTopicConfigured(pubsubTopic); - - if (!supportedPubsubTopic) { - throw Error( - `Pubsub topic ${pubsubTopic} has not been configured on this instance.` - ); - } - } } diff --git a/packages/sdk/src/filter/types.ts b/packages/sdk/src/filter/types.ts index 34842a1b22..44326728d1 100644 --- a/packages/sdk/src/filter/types.ts +++ b/packages/sdk/src/filter/types.ts @@ -1,15 +1,13 @@ -import { ConnectionManager } from "@waku/core"; -import { FilterCore } from "@waku/core"; +import type { FilterCore } from "@waku/core"; import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces"; -import { WakuMessage } from "@waku/proto"; +import type { WakuMessage } from "@waku/proto"; -import { PeerManager } from "../peer_manager/index.js"; +import type { PeerManager } from "../peer_manager/index.js"; export type FilterConstructorParams = { options?: Partial; libp2p: Libp2p; peerManager: PeerManager; - connectionManager: ConnectionManager; }; export type SubscriptionEvents = { diff --git a/packages/sdk/src/light_push/light_push.spec.ts b/packages/sdk/src/light_push/light_push.spec.ts index 0d44a430f2..114f0e413c 100644 --- a/packages/sdk/src/light_push/light_push.spec.ts +++ b/packages/sdk/src/light_push/light_push.spec.ts @@ -1,10 +1,5 @@ import { Peer, PeerId } from "@libp2p/interface"; -import { - ConnectionManager, - createEncoder, - Encoder, - LightPushCodec -} from "@waku/core"; +import { createEncoder, Encoder, LightPushCodec } from "@waku/core"; import { Libp2p, ProtocolError } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -14,7 +9,6 @@ import { PeerManager } from "../peer_manager/index.js"; import { LightPush } from "./light_push.js"; -const PUBSUB_TOPIC = "/waku/2/rs/1/4"; const CONTENT_TOPIC = "/test/1/waku-light-push/utf8"; describe("LightPush SDK", () => { @@ -28,19 +22,6 @@ describe("LightPush SDK", () => { lightPush = mockLightPush({ libp2p }); }); - it("should fail to send if pubsub topics are misconfigured", async () => { - lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] }); - - const result = await lightPush.send(encoder, { - payload: utf8ToBytes("test") - }); - const failures = result.failures ?? []; - - expect(failures.length).to.be.eq(1); - expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED)) - .to.be.true; - }); - it("should fail to send if no connected peers found", async () => { const result = await lightPush.send(encoder, { payload: utf8ToBytes("test") @@ -168,10 +149,6 @@ type MockLightPushOptions = { function mockLightPush(options: MockLightPushOptions): LightPush { const lightPush = new LightPush({ - connectionManager: { - isTopicConfigured: (topic: string) => - (options.pubsubTopics || [PUBSUB_TOPIC]).includes(topic) - } as unknown as ConnectionManager, peerManager: { getPeers: () => options.libp2p diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 18719ec210..5789f351bd 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, LightPushCore } from "@waku/core"; +import { LightPushCore } from "@waku/core"; import { type CoreProtocolResult, Failure, @@ -8,7 +8,7 @@ import { type IMessage, type ISendOptions, type Libp2p, - LightPushProtocolOptions, + type LightPushProtocolOptions, ProtocolError, Protocols, SDKProtocolResult @@ -30,7 +30,6 @@ const DEFAULT_SEND_OPTIONS: LightPushProtocolOptions = { }; type LightPushConstructorParams = { - connectionManager: ConnectionManager; peerManager: PeerManager; libp2p: Libp2p; options?: Partial; @@ -40,7 +39,6 @@ export class LightPush implements ILightPush { private readonly config: LightPushProtocolOptions; private readonly retryManager: RetryManager; private readonly peerManager: PeerManager; - private readonly connectionManager: ConnectionManager; private readonly protocol: LightPushCore; public constructor(params: LightPushConstructorParams) { @@ -50,7 +48,6 @@ export class LightPush implements ILightPush { } as LightPushProtocolOptions; this.peerManager = params.peerManager; - this.connectionManager = params.connectionManager; this.protocol = new LightPushCore(params.libp2p); this.retryManager = new RetryManager({ peerManager: params.peerManager, @@ -84,17 +81,6 @@ export class LightPush implements ILightPush { log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); - if (!this.connectionManager.isTopicConfigured(pubsubTopic)) { - return { - successes: [], - failures: [ - { - error: ProtocolError.TOPIC_NOT_CONFIGURED - } - ] - }; - } - const peerIds = await this.peerManager.getPeers({ protocol: Protocols.LightPush, pubsubTopic: encoder.pubsubTopic diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 874244a35a..126cab8678 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface"; import { peerIdFromString } from "@libp2p/peer-id"; import { multiaddr } from "@multiformats/multiaddr"; -import { ConnectionManager, messageHash, StoreCore } from "@waku/core"; +import { messageHash, StoreCore } from "@waku/core"; import { IDecodedMessage, IDecoder, @@ -21,7 +21,6 @@ const log = new Logger("waku:store:sdk"); type StoreConstructorParams = { libp2p: Libp2p; peerManager: PeerManager; - connectionManager: ConnectionManager; options?: Partial; }; @@ -33,13 +32,11 @@ export class Store implements IStore { private readonly options: Partial; private readonly libp2p: Libp2p; private readonly peerManager: PeerManager; - private readonly connectionManager: ConnectionManager; private readonly protocol: StoreCore; public constructor(params: StoreConstructorParams) { this.options = params.options || {}; this.peerManager = params.peerManager; - this.connectionManager = params.connectionManager; this.libp2p = params.libp2p; this.protocol = new StoreCore(params.libp2p); @@ -229,14 +226,6 @@ export class Store implements IStore { } const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0]; - const isTopicSupported = - this.connectionManager.isTopicConfigured(pubsubTopicForQuery); - - if (!isTopicSupported) { - throw new Error( - `Pubsub topic ${pubsubTopicForQuery} has not been configured on this instance.` - ); - } const decodersAsMap = new Map(); decoders.forEach((dec) => { @@ -271,11 +260,9 @@ export class Store implements IStore { pubsubTopic }); - const peer = this.options.peers + return this.options.peers ? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers) : peers[0]; - - return peer; } private async getPeerFromConfigurationOrFirst( diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index e547130927..df6e845e3a 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -20,8 +20,7 @@ import type { IWaku, IWakuEventEmitter, Libp2p, - NetworkConfig, - PubsubTopic + NetworkConfig } from "@waku/interfaces"; import { DefaultNetworkConfig, @@ -67,7 +66,6 @@ export class WakuNode implements IWaku { private readonly healthIndicator: HealthIndicator; public constructor( - pubsubTopics: PubsubTopic[], options: CreateNodeOptions, libp2p: Libp2p, protocolsEnabled: ProtocolsEnabled, @@ -90,7 +88,6 @@ export class WakuNode implements IWaku { libp2p, relay: this.relay, events: this.events, - pubsubTopics: pubsubTopics, networkConfig: this.networkConfig, config: options?.connectionManager }); @@ -108,7 +105,6 @@ export class WakuNode implements IWaku { if (protocolsEnabled.store) { this.store = new Store({ libp2p, - connectionManager: this.connectionManager, peerManager: this.peerManager, options: options?.store }); @@ -118,7 +114,6 @@ export class WakuNode implements IWaku { this.lightPush = new LightPush({ libp2p, peerManager: this.peerManager, - connectionManager: this.connectionManager, options: options?.lightPush }); } @@ -126,7 +121,6 @@ export class WakuNode implements IWaku { if (protocolsEnabled.filter) { this.filter = new Filter({ libp2p, - connectionManager: this.connectionManager, peerManager: this.peerManager, options: options.filter }); diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 6fb522376e..15d0c9b476 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,5 +1,5 @@ -import { LightNode, ProtocolError } from "@waku/interfaces"; -import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { LightNode } from "@waku/interfaces"; +import { createEncoder, utf8ToBytes } from "@waku/sdk"; import { contentTopicToPubsubTopic, contentTopicToShardIndex @@ -201,58 +201,4 @@ describe("Autosharding: Running Nodes", function () { }) ).to.eq(true); }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - { clusterId, contentTopics: [ContentTopic] }, - { lightpush: true, filter: true }, - false, - numServiceNodes, - true - ); - - // use a content topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); - - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - if (successes.length > 0 || failures?.length === 0) { - throw new Error("The request should've thrown an error"); - } - - const errors = failures?.map((failure) => failure.error); - expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); - }); - - it("start node with empty content topic", async function () { - try { - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [] - } - }); - throw new Error( - "Starting the node with no content topic should've thrown an error" - ); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - "Invalid content topics configuration: please provide at least one content topic" - ) - ) { - throw err; - } - } - }); }); diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 776c106bf6..aa55abac45 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -1,5 +1,5 @@ -import { LightNode, ProtocolError, SingleShardInfo } from "@waku/interfaces"; -import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; +import { LightNode, SingleShardInfo } from "@waku/interfaces"; +import { createEncoder, utf8ToBytes } from "@waku/sdk"; import { shardInfoToPubsubTopics, singleShardInfosToShardInfo, @@ -197,52 +197,5 @@ describe("Static Sharding: Running Nodes", function () { }) ).to.eq(true); }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - this.timeout(15_000); - - // use a pubsub topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId, - shard: 4 - } - }); - - const request = await waku?.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - if ( - (request?.successes.length || 0) > 0 || - request?.failures?.length === 0 - ) { - throw new Error("The request should've thrown an error"); - } - - const errors = request?.failures?.map((failure) => failure.error); - expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); - }); - - it("start node with empty shard should fail", async function () { - try { - waku = await createLightNode({ - networkConfig: { clusterId: clusterId, shards: [] } - }); - throw new Error( - "Starting the node with no shard should've thrown an error" - ); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - "Invalid shards configuration: please provide at least one shard" - ) - ) { - throw err; - } - } - }); }); }); diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts index e39ce3265b..89f80d5cdf 100644 --- a/packages/tests/tests/store/error_handling.node.spec.ts +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -1,4 +1,3 @@ -import { createDecoder } from "@waku/core"; import { IMessage, type LightNode } from "@waku/interfaces"; import { determinePubsubTopic } from "@waku/utils"; import { expect } from "chai"; @@ -13,7 +12,6 @@ import { import { processQueriedMessages, runStoreNodes, - TestContentTopic1, TestDecoder, TestDecoder2, TestShardInfo @@ -32,28 +30,6 @@ describe("Waku Store, error handling", function () { await tearDownNodes(nwaku, waku); }); - it("Query Generator, Wrong PubsubTopic", async function () { - const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); - - try { - for await (const msgPromises of waku.store.queryGenerator([ - wrongDecoder - ])) { - void msgPromises; - } - throw new Error("QueryGenerator was successful but was expected to fail"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` - ) - ) { - throw err; - } - } - }); - it("Query Generator, Multiple PubsubTopics", async function () { try { for await (const msgPromises of waku.store.queryGenerator([ @@ -101,23 +77,6 @@ describe("Waku Store, error handling", function () { expect(messages?.length).eq(0); }); - it("Query with Ordered Callback, Wrong PubsubTopic", async function () { - const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); - try { - await waku.store.queryWithOrderedCallback([wrongDecoder], async () => {}); - throw new Error("QueryGenerator was successful but was expected to fail"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` - ) - ) { - throw err; - } - } - }); - it("Query with Ordered Callback, Multiple PubsubTopics", async function () { try { await waku.store.queryWithOrderedCallback( @@ -159,23 +118,6 @@ describe("Waku Store, error handling", function () { expect(messages?.length).eq(0); }); - it("Query with Promise Callback, Wrong PubsubTopic", async function () { - const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); - try { - await waku.store.queryWithPromiseCallback([wrongDecoder], async () => {}); - throw new Error("QueryGenerator was successful but was expected to fail"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` - ) - ) { - throw err; - } - } - }); - it("Query with Promise Callback, Multiple PubsubTopics", async function () { try { await waku.store.queryWithPromiseCallback(