fix!: remove node level pubsub topic concept

For an edge node, there is no such thing as a "pubsub topic configuration". An edge node should be able to operate for any possible shard, and it is a per-protocol matter (eg send message with light push).

A relay node do subscribe to shards, but in this case, even metadata protocol does not need to advertise them, this is already handled by gossipsub.

Only service node should advertise their shards via metadata protocol, which is out of scope for js-waku.

# Conflicts:
#	packages/interfaces/src/connection_manager.ts
This commit is contained in:
fryorcraken 2025-07-11 12:55:02 +10:00
parent 188e4bf928
commit 6d55af947e
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
19 changed files with 56 additions and 385 deletions

View File

@ -157,7 +157,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
@ -168,7 +167,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig, networkConfig,
relay relay
}); });
@ -180,7 +178,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
@ -197,7 +194,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig, networkConfig,
config: customConfig config: customConfig
}); });
@ -209,7 +205,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig, networkConfig,
relay relay
}); });
@ -224,7 +219,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig, networkConfig,
relay relay
}); });
@ -255,7 +249,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig, networkConfig,
relay relay
}); });
@ -287,7 +280,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
}); });
@ -316,7 +308,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
}); });
@ -367,7 +358,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
}); });
@ -409,7 +399,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
}); });
@ -540,22 +529,9 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
}); });
it("should return true when topic is configured", () => {
const result = connectionManager.isTopicConfigured("/waku/2/rs/1/0");
expect(result).to.be.true;
});
it("should return false when topic is not configured", () => {
const result = connectionManager.isTopicConfigured("/waku/2/rs/1/99");
expect(result).to.be.false;
});
}); });
describe("isPeerOnTopic", () => { describe("isPeerOnTopic", () => {
@ -563,7 +539,6 @@ describe("ConnectionManager", () => {
connectionManager = new ConnectionManager({ connectionManager = new ConnectionManager({
libp2p, libp2p,
events, events,
pubsubTopics,
networkConfig networkConfig
}); });
}); });

View File

@ -5,8 +5,7 @@ import {
IConnectionManager, IConnectionManager,
IRelay, IRelay,
IWakuEventEmitter, IWakuEventEmitter,
NetworkConfig, NetworkConfig
PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { Libp2p } from "@waku/interfaces"; import { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
@ -33,15 +32,12 @@ const DEFAULT_DIAL_COOLDOWN_SEC = 10;
type ConnectionManagerConstructorOptions = { type ConnectionManagerConstructorOptions = {
libp2p: Libp2p; libp2p: Libp2p;
events: IWakuEventEmitter; events: IWakuEventEmitter;
pubsubTopics: PubsubTopic[];
networkConfig: NetworkConfig; networkConfig: NetworkConfig;
relay?: IRelay; relay?: IRelay;
config?: Partial<ConnectionManagerOptions>; config?: Partial<ConnectionManagerOptions>;
}; };
export class ConnectionManager implements IConnectionManager { export class ConnectionManager implements IConnectionManager {
private readonly pubsubTopics: PubsubTopic[];
private readonly keepAliveManager: KeepAliveManager; private readonly keepAliveManager: KeepAliveManager;
private readonly discoveryDialer: DiscoveryDialer; private readonly discoveryDialer: DiscoveryDialer;
private readonly dialer: Dialer; private readonly dialer: Dialer;
@ -54,7 +50,6 @@ export class ConnectionManager implements IConnectionManager {
public constructor(options: ConnectionManagerConstructorOptions) { public constructor(options: ConnectionManagerConstructorOptions) {
this.libp2p = options.libp2p; this.libp2p = options.libp2p;
this.pubsubTopics = options.pubsubTopics;
this.options = { this.options = {
maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, maxBootstrapPeers: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
@ -189,10 +184,6 @@ export class ConnectionManager implements IConnectionManager {
return result; return result;
} }
public isTopicConfigured(pubsubTopic: PubsubTopic): boolean {
return this.pubsubTopics.includes(pubsubTopic);
}
public async hasShardInfo(peerId: PeerId): Promise<boolean> { public async hasShardInfo(peerId: PeerId): Promise<boolean> {
return this.shardReader.hasShardInfo(peerId); return this.shardReader.hasShardInfo(peerId);
} }

View File

@ -1,16 +1,16 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { IncomingStreamData } from "@libp2p/interface"; import { IncomingStreamData } from "@libp2p/interface";
import { import {
type ClusterId,
type IMetadata, type IMetadata,
type Libp2pComponents, type Libp2pComponents,
type MetadataQueryResult, type MetadataQueryResult,
type PeerIdStr, type PeerIdStr,
ProtocolError, ProtocolError,
PubsubTopic,
type ShardInfo type ShardInfo
} from "@waku/interfaces"; } from "@waku/interfaces";
import { proto_metadata } from "@waku/proto"; import { proto_metadata } from "@waku/proto";
import { encodeRelayShard, Logger, pubsubTopicsToShardInfo } from "@waku/utils"; import { encodeRelayShard, Logger } from "@waku/utils";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
@ -30,7 +30,7 @@ class Metadata implements IMetadata {
public readonly multicodec = MetadataCodec; public readonly multicodec = MetadataCodec;
public constructor( public constructor(
public pubsubTopics: PubsubTopic[], public clusterId: ClusterId,
libp2p: Libp2pComponents libp2p: Libp2pComponents
) { ) {
this.streamManager = new StreamManager(MetadataCodec, libp2p); this.streamManager = new StreamManager(MetadataCodec, libp2p);
@ -44,9 +44,10 @@ class Metadata implements IMetadata {
* Make a metadata query to a peer * Make a metadata query to a peer
*/ */
public async query(peerId: PeerId): Promise<MetadataQueryResult> { public async query(peerId: PeerId): Promise<MetadataQueryResult> {
const request = proto_metadata.WakuMetadataRequest.encode( const request = proto_metadata.WakuMetadataRequest.encode({
pubsubTopicsToShardInfo(this.pubsubTopics) clusterId: this.clusterId,
); shards: [] // Only services node need to provide shards
});
const peer = await this.libp2pComponents.peerStore.get(peerId); const peer = await this.libp2pComponents.peerStore.get(peerId);
if (!peer) { if (!peer) {
@ -112,9 +113,10 @@ class Metadata implements IMetadata {
private async onRequest(streamData: IncomingStreamData): Promise<void> { private async onRequest(streamData: IncomingStreamData): Promise<void> {
try { try {
const { stream, connection } = streamData; const { stream, connection } = streamData;
const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode( const encodedShardInfo = proto_metadata.WakuMetadataResponse.encode({
pubsubTopicsToShardInfo(this.pubsubTopics) clusterId: this.clusterId,
); shards: [] // Only service nodes need to provide shards
});
const encodedResponse = await pipe( const encodedResponse = await pipe(
[encodedShardInfo], [encodedShardInfo],
@ -178,8 +180,7 @@ class Metadata implements IMetadata {
} }
export function wakuMetadata( export function wakuMetadata(
pubsubTopics: PubsubTopic[] clusterId: ClusterId
): (components: Libp2pComponents) => IMetadata { ): (components: Libp2pComponents) => IMetadata {
return (components: Libp2pComponents) => return (components: Libp2pComponents) => new Metadata(clusterId, components);
new Metadata(pubsubTopics, components);
} }

View File

@ -1,8 +1,6 @@
import type { Peer, PeerId, Stream } from "@libp2p/interface"; import type { Peer, PeerId, Stream } from "@libp2p/interface";
import type { MultiaddrInput } from "@multiformats/multiaddr"; import type { MultiaddrInput } from "@multiformats/multiaddr";
import type { PubsubTopic } from "./misc.js";
// Peer tags // Peer tags
export enum Tags { export enum Tags {
BOOTSTRAP = "bootstrap", BOOTSTRAP = "bootstrap",
@ -156,22 +154,6 @@ export interface IConnectionManager {
*/ */
getConnectedPeers(codec?: string): Promise<Peer[]>; getConnectedPeers(codec?: string): Promise<Peer[]>;
/**
* Checks if a specific pubsub topic is configured in the connection manager.
*
* @param pubsubTopic - The pubsub topic to check
* @returns True if the topic is configured, false otherwise
*
* @example
* ```typescript
* const isConfigured = connectionManager.isTopicConfigured("/waku/2/default-waku/proto");
* if (isConfigured) {
* console.log("Topic is configured");
* }
* ```
*/
isTopicConfigured(pubsubTopic: PubsubTopic): boolean;
/** /**
* Checks if a peer has shard info. * Checks if a peer has shard info.
* *

View File

@ -1,13 +1,13 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { PubsubTopic, ThisOrThat } from "./misc.js"; import { ThisOrThat } from "./misc.js";
import type { ShardInfo } from "./sharding.js"; import type { ClusterId, ShardInfo } from "./sharding.js";
export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>; export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>;
export interface IMetadata { export interface IMetadata {
readonly multicodec: string; readonly multicodec: string;
readonly pubsubTopics: PubsubTopic[]; readonly clusterId: ClusterId;
confirmOrAttemptHandshake(peerId: PeerId): Promise<MetadataQueryResult>; confirmOrAttemptHandshake(peerId: PeerId): Promise<MetadataQueryResult>;
query(peerId: PeerId): Promise<MetadataQueryResult>; query(peerId: PeerId): Promise<MetadataQueryResult>;
} }

View File

@ -4,9 +4,10 @@ export type ShardInfo = {
}; };
export type ContentTopicInfo = { export type ContentTopicInfo = {
clusterId?: number; clusterId?: number; // TODO: This should be mandatory on a network config
contentTopics: string[]; contentTopics: string[];
}; };
export type StaticSharding = ShardInfo; export type StaticSharding = ShardInfo;
export type AutoSharding = ContentTopicInfo; export type AutoSharding = ContentTopicInfo;
export type ClusterId = number;

View File

@ -1,5 +1,7 @@
import type { CreateNodeOptions, RelayNode } from "@waku/interfaces"; import type { CreateNodeOptions, RelayNode } from "@waku/interfaces";
import { DefaultNetworkConfig } from "@waku/interfaces";
import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk"; import { createLibp2pAndUpdateOptions, WakuNode } from "@waku/sdk";
import { derivePubsubTopicsFromNetworkConfig } from "@waku/utils";
import { Relay, RelayCreateOptions, wakuGossipSub } from "./relay.js"; import { Relay, RelayCreateOptions, wakuGossipSub } from "./relay.js";
@ -26,14 +28,16 @@ export async function createRelayNode(
} }
}; };
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); const libp2p = await createLibp2pAndUpdateOptions(options);
const pubsubTopics = derivePubsubTopicsFromNetworkConfig(
options.networkConfig ?? DefaultNetworkConfig
);
const relay = new Relay({ const relay = new Relay({
pubsubTopics: pubsubTopics || [], pubsubTopics,
libp2p libp2p
}); });
const node = new WakuNode( const node = new WakuNode(
pubsubTopics,
options as CreateNodeOptions, options as CreateNodeOptions,
libp2p, libp2p,
{}, {},

View File

@ -12,9 +12,9 @@ import { createLibp2pAndUpdateOptions } from "./libp2p.js";
export async function createLightNode( export async function createLightNode(
options: CreateNodeOptions = {} options: CreateNodeOptions = {}
): Promise<LightNode> { ): Promise<LightNode> {
const { libp2p, pubsubTopics } = await createLibp2pAndUpdateOptions(options); const libp2p = await createLibp2pAndUpdateOptions(options);
const node = new WakuNode(pubsubTopics, options, libp2p, { const node = new WakuNode(options, libp2p, {
store: true, store: true,
lightpush: true, lightpush: true,
filter: true filter: true

View File

@ -7,32 +7,27 @@ import { webSockets } from "@libp2p/websockets";
import { all as filterAll, wss } from "@libp2p/websockets/filters"; import { all as filterAll, wss } from "@libp2p/websockets/filters";
import { wakuMetadata } from "@waku/core"; import { wakuMetadata } from "@waku/core";
import { import {
type ClusterId,
type CreateLibp2pOptions, type CreateLibp2pOptions,
type CreateNodeOptions, type CreateNodeOptions,
DEFAULT_CLUSTER_ID,
DefaultNetworkConfig, DefaultNetworkConfig,
type IMetadata, type Libp2p
type Libp2p,
type Libp2pComponents,
PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import { createLibp2p } from "libp2p"; import { createLibp2p } from "libp2p";
import { isTestEnvironment } from "../env.js"; import { isTestEnvironment } from "../env.js";
import { getPeerDiscoveries } from "./discovery.js"; import { getPeerDiscoveries } from "./discovery.js";
type MetadataService = {
metadata?: (components: Libp2pComponents) => IMetadata;
};
const log = new Logger("sdk:create"); const log = new Logger("sdk:create");
const DefaultUserAgent = "js-waku"; const DefaultUserAgent = "js-waku";
const DefaultPingMaxInboundStreams = 10; const DefaultPingMaxInboundStreams = 10;
export async function defaultLibp2p( export async function defaultLibp2p(
pubsubTopics: PubsubTopic[], clusterId: ClusterId,
options?: Partial<CreateLibp2pOptions>, options?: Partial<CreateLibp2pOptions>,
userAgent?: string userAgent?: string
): Promise<Libp2p> { ): Promise<Libp2p> {
@ -49,10 +44,6 @@ export async function defaultLibp2p(
/* eslint-enable no-console */ /* eslint-enable no-console */
} }
const metadataService: MetadataService = pubsubTopics
? { metadata: wakuMetadata(pubsubTopics) }
: {};
const filter = const filter =
options?.filterMultiaddrs === false || isTestEnvironment() options?.filterMultiaddrs === false || isTestEnvironment()
? filterAll ? filterAll
@ -71,7 +62,7 @@ export async function defaultLibp2p(
maxInboundStreams: maxInboundStreams:
options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams
}), }),
...metadataService, metadata: wakuMetadata(clusterId),
...options?.services ...options?.services
} }
}) as any as Libp2p; // TODO: make libp2p include it; }) as any as Libp2p; // TODO: make libp2p include it;
@ -85,12 +76,11 @@ const DEFAULT_DISCOVERIES_ENABLED = {
export async function createLibp2pAndUpdateOptions( export async function createLibp2pAndUpdateOptions(
options: CreateNodeOptions options: CreateNodeOptions
): Promise<{ libp2p: Libp2p; pubsubTopics: PubsubTopic[] }> { ): Promise<Libp2p> {
const { networkConfig } = options; const networkConfig = options.networkConfig ?? DefaultNetworkConfig;
const pubsubTopics = derivePubsubTopicsFromNetworkConfig( const clusterId = networkConfig.clusterId ?? DEFAULT_CLUSTER_ID;
networkConfig ?? DefaultNetworkConfig
); log.info("Creating Waku node with cluster id: ", clusterId);
log.info("Creating Waku node with pubsub topics", pubsubTopics);
const libp2pOptions = options?.libp2p ?? {}; const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? []; const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
@ -117,11 +107,5 @@ export async function createLibp2pAndUpdateOptions(
libp2pOptions.peerDiscovery = peerDiscovery; libp2pOptions.peerDiscovery = peerDiscovery;
const libp2p = await defaultLibp2p( return defaultLibp2p(clusterId, libp2pOptions, options?.userAgent);
pubsubTopics,
libp2pOptions,
options?.userAgent
);
return { libp2p, pubsubTopics };
} }

View File

@ -37,22 +37,6 @@ describe("Filter SDK", () => {
sinon.restore(); sinon.restore();
}); });
it("should throw error when subscribing with unsupported pubsub topic", async () => {
const unsupportedDecoder = createDecoder(
CONTENT_TOPIC,
"/unsupported/topic"
);
try {
await filter.subscribe(unsupportedDecoder, callback);
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic /unsupported/topic has not been configured on this instance."
);
}
});
it("should successfully subscribe to supported pubsub topic", async () => { it("should successfully subscribe to supported pubsub topic", async () => {
const addStub = sinon.stub(Subscription.prototype, "add").resolves(true); const addStub = sinon.stub(Subscription.prototype, "add").resolves(true);
const startStub = sinon.stub(Subscription.prototype, "start"); const startStub = sinon.stub(Subscription.prototype, "start");
@ -64,22 +48,6 @@ describe("Filter SDK", () => {
expect(startStub.calledOnce).to.be.true; expect(startStub.calledOnce).to.be.true;
}); });
it("should throw error when unsubscribing with unsupported pubsub topic", async () => {
const unsupportedDecoder = createDecoder(
CONTENT_TOPIC,
"/unsupported/topic"
);
try {
await filter.unsubscribe(unsupportedDecoder);
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.include(
"Pubsub topic /unsupported/topic has not been configured on this instance."
);
}
});
it("should return false when unsubscribing from a non-existing subscription", async () => { it("should return false when unsubscribing from a non-existing subscription", async () => {
const result = await filter.unsubscribe(decoder); const result = await filter.unsubscribe(decoder);
expect(result).to.be.false; expect(result).to.be.false;
@ -183,9 +151,9 @@ type MockFilterOptions = {
}; };
function mockFilter(options: MockFilterOptions): Filter { function mockFilter(options: MockFilterOptions): Filter {
const filter = new Filter({ // we're not actually testing FilterCore functionality here
return new Filter({
libp2p: options.libp2p, libp2p: options.libp2p,
connectionManager: options.connectionManager || mockConnectionManager(),
peerManager: options.peerManager || mockPeerManager(), peerManager: options.peerManager || mockPeerManager(),
options: { options: {
numPeersToUse: 2, numPeersToUse: 2,
@ -193,9 +161,6 @@ function mockFilter(options: MockFilterOptions): Filter {
keepAliveIntervalMs: 60_000 keepAliveIntervalMs: 60_000
} }
}); });
// we're not actually testing FilterCore functionality here
return filter;
} }
function createMockMessage(contentTopic: string): IProtoMessage { function createMockMessage(contentTopic: string): IProtoMessage {

View File

@ -1,4 +1,4 @@
import { ConnectionManager, FilterCore } from "@waku/core"; import { FilterCore } from "@waku/core";
import type { import type {
Callback, Callback,
FilterProtocolOptions, FilterProtocolOptions,
@ -21,7 +21,6 @@ type PubsubTopic = string;
export class Filter implements IFilter { export class Filter implements IFilter {
private readonly protocol: FilterCore; private readonly protocol: FilterCore;
private readonly peerManager: PeerManager; private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly config: FilterProtocolOptions; private readonly config: FilterProtocolOptions;
private subscriptions = new Map<PubsubTopic, Subscription>(); private subscriptions = new Map<PubsubTopic, Subscription>();
@ -35,7 +34,6 @@ export class Filter implements IFilter {
}; };
this.peerManager = params.peerManager; this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.protocol = new FilterCore( this.protocol = new FilterCore(
this.onIncomingMessage.bind(this), this.onIncomingMessage.bind(this),
@ -75,7 +73,6 @@ export class Filter implements IFilter {
); );
this.throwIfTopicNotSame(pubsubTopics); this.throwIfTopicNotSame(pubsubTopics);
this.throwIfTopicNotSupported(singlePubsubTopic);
let subscription = this.subscriptions.get(singlePubsubTopic); let subscription = this.subscriptions.get(singlePubsubTopic);
if (!subscription) { if (!subscription) {
@ -117,7 +114,6 @@ export class Filter implements IFilter {
); );
this.throwIfTopicNotSame(pubsubTopics); this.throwIfTopicNotSame(pubsubTopics);
this.throwIfTopicNotSupported(singlePubsubTopic);
const subscription = this.subscriptions.get(singlePubsubTopic); const subscription = this.subscriptions.get(singlePubsubTopic);
if (!subscription) { if (!subscription) {
@ -170,15 +166,4 @@ export class Filter implements IFilter {
); );
} }
} }
private throwIfTopicNotSupported(pubsubTopic: string): void {
const supportedPubsubTopic =
this.connectionManager.isTopicConfigured(pubsubTopic);
if (!supportedPubsubTopic) {
throw Error(
`Pubsub topic ${pubsubTopic} has not been configured on this instance.`
);
}
}
} }

View File

@ -1,15 +1,13 @@
import { ConnectionManager } from "@waku/core"; import type { FilterCore } from "@waku/core";
import { FilterCore } from "@waku/core";
import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces"; import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto"; import type { WakuMessage } from "@waku/proto";
import { PeerManager } from "../peer_manager/index.js"; import type { PeerManager } from "../peer_manager/index.js";
export type FilterConstructorParams = { export type FilterConstructorParams = {
options?: Partial<FilterProtocolOptions>; options?: Partial<FilterProtocolOptions>;
libp2p: Libp2p; libp2p: Libp2p;
peerManager: PeerManager; peerManager: PeerManager;
connectionManager: ConnectionManager;
}; };
export type SubscriptionEvents = { export type SubscriptionEvents = {

View File

@ -1,10 +1,5 @@
import { Peer, PeerId } from "@libp2p/interface"; import { Peer, PeerId } from "@libp2p/interface";
import { import { createEncoder, Encoder, LightPushCodec } from "@waku/core";
ConnectionManager,
createEncoder,
Encoder,
LightPushCodec
} from "@waku/core";
import { Libp2p, ProtocolError } from "@waku/interfaces"; import { Libp2p, ProtocolError } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes"; import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai"; import { expect } from "chai";
@ -14,7 +9,6 @@ import { PeerManager } from "../peer_manager/index.js";
import { LightPush } from "./light_push.js"; import { LightPush } from "./light_push.js";
const PUBSUB_TOPIC = "/waku/2/rs/1/4";
const CONTENT_TOPIC = "/test/1/waku-light-push/utf8"; const CONTENT_TOPIC = "/test/1/waku-light-push/utf8";
describe("LightPush SDK", () => { describe("LightPush SDK", () => {
@ -28,19 +22,6 @@ describe("LightPush SDK", () => {
lightPush = mockLightPush({ libp2p }); lightPush = mockLightPush({ libp2p });
}); });
it("should fail to send if pubsub topics are misconfigured", async () => {
lightPush = mockLightPush({ libp2p, pubsubTopics: ["/wrong"] });
const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
});
const failures = result.failures ?? [];
expect(failures.length).to.be.eq(1);
expect(failures.some((v) => v.error === ProtocolError.TOPIC_NOT_CONFIGURED))
.to.be.true;
});
it("should fail to send if no connected peers found", async () => { it("should fail to send if no connected peers found", async () => {
const result = await lightPush.send(encoder, { const result = await lightPush.send(encoder, {
payload: utf8ToBytes("test") payload: utf8ToBytes("test")
@ -168,10 +149,6 @@ type MockLightPushOptions = {
function mockLightPush(options: MockLightPushOptions): LightPush { function mockLightPush(options: MockLightPushOptions): LightPush {
const lightPush = new LightPush({ const lightPush = new LightPush({
connectionManager: {
isTopicConfigured: (topic: string) =>
(options.pubsubTopics || [PUBSUB_TOPIC]).includes(topic)
} as unknown as ConnectionManager,
peerManager: { peerManager: {
getPeers: () => getPeers: () =>
options.libp2p options.libp2p

View File

@ -1,5 +1,5 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { ConnectionManager, LightPushCore } from "@waku/core"; import { LightPushCore } from "@waku/core";
import { import {
type CoreProtocolResult, type CoreProtocolResult,
Failure, Failure,
@ -8,7 +8,7 @@ import {
type IMessage, type IMessage,
type ISendOptions, type ISendOptions,
type Libp2p, type Libp2p,
LightPushProtocolOptions, type LightPushProtocolOptions,
ProtocolError, ProtocolError,
Protocols, Protocols,
SDKProtocolResult SDKProtocolResult
@ -30,7 +30,6 @@ const DEFAULT_SEND_OPTIONS: LightPushProtocolOptions = {
}; };
type LightPushConstructorParams = { type LightPushConstructorParams = {
connectionManager: ConnectionManager;
peerManager: PeerManager; peerManager: PeerManager;
libp2p: Libp2p; libp2p: Libp2p;
options?: Partial<LightPushProtocolOptions>; options?: Partial<LightPushProtocolOptions>;
@ -40,7 +39,6 @@ export class LightPush implements ILightPush {
private readonly config: LightPushProtocolOptions; private readonly config: LightPushProtocolOptions;
private readonly retryManager: RetryManager; private readonly retryManager: RetryManager;
private readonly peerManager: PeerManager; private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly protocol: LightPushCore; private readonly protocol: LightPushCore;
public constructor(params: LightPushConstructorParams) { public constructor(params: LightPushConstructorParams) {
@ -50,7 +48,6 @@ export class LightPush implements ILightPush {
} as LightPushProtocolOptions; } as LightPushProtocolOptions;
this.peerManager = params.peerManager; this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.protocol = new LightPushCore(params.libp2p); this.protocol = new LightPushCore(params.libp2p);
this.retryManager = new RetryManager({ this.retryManager = new RetryManager({
peerManager: params.peerManager, peerManager: params.peerManager,
@ -84,17 +81,6 @@ export class LightPush implements ILightPush {
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
if (!this.connectionManager.isTopicConfigured(pubsubTopic)) {
return {
successes: [],
failures: [
{
error: ProtocolError.TOPIC_NOT_CONFIGURED
}
]
};
}
const peerIds = await this.peerManager.getPeers({ const peerIds = await this.peerManager.getPeers({
protocol: Protocols.LightPush, protocol: Protocols.LightPush,
pubsubTopic: encoder.pubsubTopic pubsubTopic: encoder.pubsubTopic

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface";
import { peerIdFromString } from "@libp2p/peer-id"; import { peerIdFromString } from "@libp2p/peer-id";
import { multiaddr } from "@multiformats/multiaddr"; import { multiaddr } from "@multiformats/multiaddr";
import { ConnectionManager, messageHash, StoreCore } from "@waku/core"; import { messageHash, StoreCore } from "@waku/core";
import { import {
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
@ -21,7 +21,6 @@ const log = new Logger("waku:store:sdk");
type StoreConstructorParams = { type StoreConstructorParams = {
libp2p: Libp2p; libp2p: Libp2p;
peerManager: PeerManager; peerManager: PeerManager;
connectionManager: ConnectionManager;
options?: Partial<StoreProtocolOptions>; options?: Partial<StoreProtocolOptions>;
}; };
@ -33,13 +32,11 @@ export class Store implements IStore {
private readonly options: Partial<StoreProtocolOptions>; private readonly options: Partial<StoreProtocolOptions>;
private readonly libp2p: Libp2p; private readonly libp2p: Libp2p;
private readonly peerManager: PeerManager; private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly protocol: StoreCore; private readonly protocol: StoreCore;
public constructor(params: StoreConstructorParams) { public constructor(params: StoreConstructorParams) {
this.options = params.options || {}; this.options = params.options || {};
this.peerManager = params.peerManager; this.peerManager = params.peerManager;
this.connectionManager = params.connectionManager;
this.libp2p = params.libp2p; this.libp2p = params.libp2p;
this.protocol = new StoreCore(params.libp2p); this.protocol = new StoreCore(params.libp2p);
@ -229,14 +226,6 @@ export class Store implements IStore {
} }
const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0]; const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0];
const isTopicSupported =
this.connectionManager.isTopicConfigured(pubsubTopicForQuery);
if (!isTopicSupported) {
throw new Error(
`Pubsub topic ${pubsubTopicForQuery} has not been configured on this instance.`
);
}
const decodersAsMap = new Map(); const decodersAsMap = new Map();
decoders.forEach((dec) => { decoders.forEach((dec) => {
@ -271,11 +260,9 @@ export class Store implements IStore {
pubsubTopic pubsubTopic
}); });
const peer = this.options.peers return this.options.peers
? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers) ? await this.getPeerFromConfigurationOrFirst(peers, this.options.peers)
: peers[0]; : peers[0];
return peer;
} }
private async getPeerFromConfigurationOrFirst( private async getPeerFromConfigurationOrFirst(

View File

@ -20,8 +20,7 @@ import type {
IWaku, IWaku,
IWakuEventEmitter, IWakuEventEmitter,
Libp2p, Libp2p,
NetworkConfig, NetworkConfig
PubsubTopic
} from "@waku/interfaces"; } from "@waku/interfaces";
import { import {
DefaultNetworkConfig, DefaultNetworkConfig,
@ -67,7 +66,6 @@ export class WakuNode implements IWaku {
private readonly healthIndicator: HealthIndicator; private readonly healthIndicator: HealthIndicator;
public constructor( public constructor(
pubsubTopics: PubsubTopic[],
options: CreateNodeOptions, options: CreateNodeOptions,
libp2p: Libp2p, libp2p: Libp2p,
protocolsEnabled: ProtocolsEnabled, protocolsEnabled: ProtocolsEnabled,
@ -90,7 +88,6 @@ export class WakuNode implements IWaku {
libp2p, libp2p,
relay: this.relay, relay: this.relay,
events: this.events, events: this.events,
pubsubTopics: pubsubTopics,
networkConfig: this.networkConfig, networkConfig: this.networkConfig,
config: options?.connectionManager config: options?.connectionManager
}); });
@ -108,7 +105,6 @@ export class WakuNode implements IWaku {
if (protocolsEnabled.store) { if (protocolsEnabled.store) {
this.store = new Store({ this.store = new Store({
libp2p, libp2p,
connectionManager: this.connectionManager,
peerManager: this.peerManager, peerManager: this.peerManager,
options: options?.store options: options?.store
}); });
@ -118,7 +114,6 @@ export class WakuNode implements IWaku {
this.lightPush = new LightPush({ this.lightPush = new LightPush({
libp2p, libp2p,
peerManager: this.peerManager, peerManager: this.peerManager,
connectionManager: this.connectionManager,
options: options?.lightPush options: options?.lightPush
}); });
} }
@ -126,7 +121,6 @@ export class WakuNode implements IWaku {
if (protocolsEnabled.filter) { if (protocolsEnabled.filter) {
this.filter = new Filter({ this.filter = new Filter({
libp2p, libp2p,
connectionManager: this.connectionManager,
peerManager: this.peerManager, peerManager: this.peerManager,
options: options.filter options: options.filter
}); });

View File

@ -1,5 +1,5 @@
import { LightNode, ProtocolError } from "@waku/interfaces"; import { LightNode } from "@waku/interfaces";
import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { import {
contentTopicToPubsubTopic, contentTopicToPubsubTopic,
contentTopicToShardIndex contentTopicToShardIndex
@ -201,58 +201,4 @@ describe("Autosharding: Running Nodes", function () {
}) })
).to.eq(true); ).to.eq(true);
}); });
it("using a protocol with unconfigured pubsub topic should fail", async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
{ clusterId, contentTopics: [ContentTopic] },
{ lightpush: true, filter: true },
false,
numServiceNodes,
true
);
// use a content topic that is not configured
const encoder = createEncoder({
contentTopic: ContentTopic2,
pubsubTopicShardInfo: {
clusterId: clusterId,
shard: contentTopicToShardIndex(ContentTopic2)
}
});
const { successes, failures } = await waku.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
if (successes.length > 0 || failures?.length === 0) {
throw new Error("The request should've thrown an error");
}
const errors = failures?.map((failure) => failure.error);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
});
it("start node with empty content topic", async function () {
try {
waku = await createLightNode({
networkConfig: {
clusterId: clusterId,
contentTopics: []
}
});
throw new Error(
"Starting the node with no content topic should've thrown an error"
);
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
"Invalid content topics configuration: please provide at least one content topic"
)
) {
throw err;
}
}
});
}); });

View File

@ -1,5 +1,5 @@
import { LightNode, ProtocolError, SingleShardInfo } from "@waku/interfaces"; import { LightNode, SingleShardInfo } from "@waku/interfaces";
import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { createEncoder, utf8ToBytes } from "@waku/sdk";
import { import {
shardInfoToPubsubTopics, shardInfoToPubsubTopics,
singleShardInfosToShardInfo, singleShardInfosToShardInfo,
@ -197,52 +197,5 @@ describe("Static Sharding: Running Nodes", function () {
}) })
).to.eq(true); ).to.eq(true);
}); });
it("using a protocol with unconfigured pubsub topic should fail", async function () {
this.timeout(15_000);
// use a pubsub topic that is not configured
const encoder = createEncoder({
contentTopic: ContentTopic,
pubsubTopicShardInfo: {
clusterId,
shard: 4
}
});
const request = await waku?.lightPush.send(encoder, {
payload: utf8ToBytes("Hello World")
});
if (
(request?.successes.length || 0) > 0 ||
request?.failures?.length === 0
) {
throw new Error("The request should've thrown an error");
}
const errors = request?.failures?.map((failure) => failure.error);
expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED);
});
it("start node with empty shard should fail", async function () {
try {
waku = await createLightNode({
networkConfig: { clusterId: clusterId, shards: [] }
});
throw new Error(
"Starting the node with no shard should've thrown an error"
);
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
"Invalid shards configuration: please provide at least one shard"
)
) {
throw err;
}
}
});
}); });
}); });

View File

@ -1,4 +1,3 @@
import { createDecoder } from "@waku/core";
import { IMessage, type LightNode } from "@waku/interfaces"; import { IMessage, type LightNode } from "@waku/interfaces";
import { determinePubsubTopic } from "@waku/utils"; import { determinePubsubTopic } from "@waku/utils";
import { expect } from "chai"; import { expect } from "chai";
@ -13,7 +12,6 @@ import {
import { import {
processQueriedMessages, processQueriedMessages,
runStoreNodes, runStoreNodes,
TestContentTopic1,
TestDecoder, TestDecoder,
TestDecoder2, TestDecoder2,
TestShardInfo TestShardInfo
@ -32,28 +30,6 @@ describe("Waku Store, error handling", function () {
await tearDownNodes(nwaku, waku); await tearDownNodes(nwaku, waku);
}); });
it("Query Generator, Wrong PubsubTopic", async function () {
const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic");
try {
for await (const msgPromises of waku.store.queryGenerator([
wrongDecoder
])) {
void msgPromises;
}
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.`
)
) {
throw err;
}
}
});
it("Query Generator, Multiple PubsubTopics", async function () { it("Query Generator, Multiple PubsubTopics", async function () {
try { try {
for await (const msgPromises of waku.store.queryGenerator([ for await (const msgPromises of waku.store.queryGenerator([
@ -101,23 +77,6 @@ describe("Waku Store, error handling", function () {
expect(messages?.length).eq(0); expect(messages?.length).eq(0);
}); });
it("Query with Ordered Callback, Wrong PubsubTopic", async function () {
const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic");
try {
await waku.store.queryWithOrderedCallback([wrongDecoder], async () => {});
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.`
)
) {
throw err;
}
}
});
it("Query with Ordered Callback, Multiple PubsubTopics", async function () { it("Query with Ordered Callback, Multiple PubsubTopics", async function () {
try { try {
await waku.store.queryWithOrderedCallback( await waku.store.queryWithOrderedCallback(
@ -159,23 +118,6 @@ describe("Waku Store, error handling", function () {
expect(messages?.length).eq(0); expect(messages?.length).eq(0);
}); });
it("Query with Promise Callback, Wrong PubsubTopic", async function () {
const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic");
try {
await waku.store.queryWithPromiseCallback([wrongDecoder], async () => {});
throw new Error("QueryGenerator was successful but was expected to fail");
} catch (err) {
if (
!(err instanceof Error) ||
!err.message.includes(
`Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.`
)
) {
throw err;
}
}
});
it("Query with Promise Callback, Multiple PubsubTopics", async function () { it("Query with Promise Callback, Multiple PubsubTopics", async function () {
try { try {
await waku.store.queryWithPromiseCallback( await waku.store.queryWithPromiseCallback(