diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index 68f17d4a71..6c2430e5a5 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -8,7 +8,8 @@ import { type ThisOrThat } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { isMessageSizeUnderCap, Logger } from "@waku/utils"; +import { isMessageSizeUnderCap } from "@waku/utils"; +import { Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -62,10 +63,7 @@ export class LightPushCore { }; } - const query = PushRpc.createRequest( - protoMessage, - encoder.routingInfo.pubsubTopic - ); + const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic); return { query, error: null }; } catch (error) { log.error("Failed to prepare push message", error); diff --git a/packages/core/src/lib/message/version_0.spec.ts b/packages/core/src/lib/message/version_0.spec.ts index 45d7da6c12..d95963905c 100644 --- a/packages/core/src/lib/message/version_0.spec.ts +++ b/packages/core/src/lib/message/version_0.spec.ts @@ -153,9 +153,7 @@ describe("Sets sharding configuration correctly", () => { }); // When autosharding is enabled, we expect the shard index to be 1 - expect(autoshardingEncoder.routingInfo.pubsubTopic).to.be.eq( - "/waku/2/rs/0/0" - ); + expect(autoshardingEncoder.pubsubTopic).to.be.eq("/waku/2/rs/0/0"); // Create an encoder setup to use static sharding with the same content topic const staticshardingEncoder = createEncoder({ @@ -164,8 +162,6 @@ describe("Sets sharding configuration correctly", () => { }); // When static sharding is enabled, we expect the shard index to be 0 - expect(staticshardingEncoder.routingInfo.pubsubTopic).to.be.eq( - "/waku/2/rs/0/3" - ); + expect(staticshardingEncoder.pubsubTopic).to.be.eq("/waku/2/rs/0/3"); }); }); diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index bb7e48fc36..a8706817d9 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -7,7 +7,8 @@ import type { IMetaSetter, IProtoMessage, IRateLimitProof, - IRoutingInfo + IRoutingInfo, + PubsubTopic } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -78,6 +79,10 @@ export class Encoder implements IEncoder { } } + public get pubsubTopic(): PubsubTopic { + return this.routingInfo.pubsubTopic; + } + public async toWire(message: IMessage): Promise { return proto.WakuMessage.encode(await this.toProtoObj(message)); } @@ -133,6 +138,10 @@ export class Decoder implements IDecoder { } } + public get pubsubTopic(): PubsubTopic { + return this.routingInfo.pubsubTopic; + } + public fromWireToProtoObj( bytes: Uint8Array ): Promise { diff --git a/packages/core/src/lib/store/rpc.spec.ts b/packages/core/src/lib/store/rpc.spec.ts index ecea28e3c0..6e38449c2f 100644 --- a/packages/core/src/lib/store/rpc.spec.ts +++ b/packages/core/src/lib/store/rpc.spec.ts @@ -1,17 +1,11 @@ -import { createRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import { StoreQueryRequest } from "./rpc.js"; -const routingInfo = createRoutingInfo( - { clusterId: 0 }, - { pubsubTopic: "/waku/2/rs/0/0" } -); - describe("StoreQueryRequest validation", () => { it("accepts valid content-filtered query", () => { const request = StoreQueryRequest.create({ - routingInfo, + pubsubTopic: "/waku/2/default-waku/proto", contentTopics: ["/test/1/content/proto"], includeData: true, paginationForward: true @@ -22,7 +16,7 @@ describe("StoreQueryRequest validation", () => { it("rejects content-filtered query with only pubsubTopic", () => { expect(() => StoreQueryRequest.create({ - routingInfo, + pubsubTopic: "/waku/2/default-waku/proto", contentTopics: [], includeData: true, paginationForward: true @@ -32,9 +26,22 @@ describe("StoreQueryRequest validation", () => { ); }); + it("rejects content-filtered query with only contentTopics", () => { + expect(() => + StoreQueryRequest.create({ + pubsubTopic: "", + contentTopics: ["/test/1/content/proto"], + includeData: true, + paginationForward: true + }) + ).to.throw( + "Both pubsubTopic and contentTopics must be set together for content-filtered queries" + ); + }); + it("accepts valid message hash query", () => { const request = StoreQueryRequest.create({ - routingInfo, + pubsubTopic: "", contentTopics: [], messageHashes: [new Uint8Array([1, 2, 3, 4])], includeData: true, @@ -47,7 +54,7 @@ describe("StoreQueryRequest validation", () => { expect(() => StoreQueryRequest.create({ messageHashes: [new Uint8Array([1, 2, 3, 4])], - routingInfo, + pubsubTopic: "/waku/2/default-waku/proto", contentTopics: ["/test/1/content/proto"], includeData: true, paginationForward: true @@ -60,7 +67,7 @@ describe("StoreQueryRequest validation", () => { it("rejects hash query with time filter", () => { expect(() => StoreQueryRequest.create({ - routingInfo, + pubsubTopic: "", contentTopics: [], messageHashes: [new Uint8Array([1, 2, 3, 4])], timeStart: new Date(), @@ -74,7 +81,7 @@ describe("StoreQueryRequest validation", () => { it("accepts time-filtered query with content filter", () => { const request = StoreQueryRequest.create({ - routingInfo, + pubsubTopic: "/waku/2/default-waku/proto", contentTopics: ["/test/1/content/proto"], timeStart: new Date(Date.now() - 3600000), timeEnd: new Date(), diff --git a/packages/core/src/lib/store/rpc.ts b/packages/core/src/lib/store/rpc.ts index 3fcc00f8ab..0055ed96a3 100644 --- a/packages/core/src/lib/store/rpc.ts +++ b/packages/core/src/lib/store/rpc.ts @@ -42,9 +42,9 @@ export class StoreQueryRequest { } } else { if ( - (params.routingInfo && + (params.pubsubTopic && (!params.contentTopics || params.contentTopics.length === 0)) || - (!params.routingInfo && + (!params.pubsubTopic && params.contentTopics && params.contentTopics.length > 0) ) { diff --git a/packages/core/src/lib/store/store.spec.ts b/packages/core/src/lib/store/store.spec.ts index fbe340ab90..1cf61eb878 100644 --- a/packages/core/src/lib/store/store.spec.ts +++ b/packages/core/src/lib/store/store.spec.ts @@ -2,7 +2,6 @@ import type { PeerId } from "@libp2p/interface"; import { IDecodedMessage, IDecoder, - IRoutingInfo, Libp2p, QueryRequestParams } from "@waku/interfaces"; @@ -79,15 +78,9 @@ describe("StoreCore", () => { let mockStoreQueryRequest: any; let mockStoreQueryResponse: any; - const routingInfo: IRoutingInfo = { - pubsubTopic: "test-topic", - shardId: 1, - clusterId: 0 - }; - beforeEach(() => { queryOpts = { - routingInfo, + pubsubTopic: "test-topic", contentTopics: ["test-topic"], paginationLimit: 10, includeData: true, diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index 61f6f07737..ce61b7a553 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -76,7 +76,7 @@ export class StoreCore { log.info("Sending store query request:", { hasMessageHashes: !!queryOpts.messageHashes?.length, messageHashCount: queryOpts.messageHashes?.length, - routingInfo: queryOpts.routingInfo, + pubsubTopic: queryOpts.pubsubTopic, contentTopics: queryOpts.contentTopics }); diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 0e076f0b3e..1b34700010 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -95,13 +95,14 @@ export interface IEncoder { contentTopic: string; ephemeral: boolean; routingInfo: IRoutingInfo; + pubsubTopic: PubsubTopic; toWire: (message: IMessage) => Promise; toProtoObj: (message: IMessage) => Promise; } export interface IDecoder { contentTopic: string; - routingInfo: IRoutingInfo; + pubsubTopic: PubsubTopic; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: ( pubsubTopic: string, diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index a8feebb236..014842aaa6 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -1,5 +1,4 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; -import { IRoutingInfo } from "./sharding.js"; export type StoreCursor = Uint8Array; @@ -16,10 +15,10 @@ export type QueryRequestParams = { includeData: boolean; /** - * The routing information to query. This field is mandatory. - * The query will only return messages that were published on this specific route (cluster and shard). + * The pubsub topic to query. This field is mandatory. + * The query will only return messages that were published on this specific pubsub topic. */ - routingInfo: IRoutingInfo; + pubsubTopic: string; /** * The content topics to filter the messages. diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 1628b2481f..0cdf17975e 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -7,7 +7,8 @@ import { type IMessage, type IMetaSetter, type IProtoMessage, - type IRoutingInfo + type IRoutingInfo, + type PubsubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -46,6 +47,10 @@ class Encoder implements IEncoder { } } + public get pubsubTopic(): PubsubTopic { + return this.routingInfo.pubsubTopic; + } + public async toWire(message: IMessage): Promise { const protoMessage = await this.toProtoObj(message); if (!protoMessage) return; diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 2261e01751..87fcbbb8fc 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -7,7 +7,8 @@ import type { IMessage, IMetaSetter, IProtoMessage, - IRoutingInfo + IRoutingInfo, + PubsubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; @@ -46,6 +47,10 @@ class Encoder implements IEncoder { } } + public get pubsubTopic(): PubsubTopic { + return this.routingInfo.pubsubTopic; + } + public async toWire(message: IMessage): Promise { const protoMessage = await this.toProtoObj(message); if (!protoMessage) return; diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index 6c4c5b9136..6f6dd98fa7 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -128,7 +128,7 @@ export class Relay implements IRelay { encoder: IEncoder, message: IMessage ): Promise { - const { pubsubTopic } = encoder.routingInfo; + const { pubsubTopic } = encoder; if (!this.pubsubTopics.has(pubsubTopic)) { log.error("Failed to send waku relay: topic not configured"); return { @@ -180,7 +180,7 @@ export class Relay implements IRelay { const observers: Array<[PubsubTopic, Observer]> = []; for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) { - const { pubsubTopic } = decoder.routingInfo; + const { pubsubTopic } = decoder; const ctObs: Map>> = this.observers.get( pubsubTopic ) ?? new Map(); diff --git a/packages/relay/src/topic_only_message.ts b/packages/relay/src/topic_only_message.ts index d5332b9d9f..0929361166 100644 --- a/packages/relay/src/topic_only_message.ts +++ b/packages/relay/src/topic_only_message.ts @@ -2,7 +2,8 @@ import type { IDecoder, IProtoMessage, IRoutingInfo, - ITopicOnlyMessage + ITopicOnlyMessage, + PubsubTopic } from "@waku/interfaces"; import { TopicOnlyMessage as ProtoTopicOnlyMessage } from "@waku/proto"; @@ -32,6 +33,10 @@ export class TopicOnlyMessage implements ITopicOnlyMessage { export class ContentTopicOnlyDecoder implements IDecoder { public constructor() {} + public get pubsubTopic(): PubsubTopic { + throw "Pubsub Topic is not available on this decoder, it is only meant to decode the content topic for any message"; + } + public get contentTopic(): string { throw "ContentTopic is not available on this decoder, it is only meant to decode the content topic for any message"; } diff --git a/packages/rln/src/codec.ts b/packages/rln/src/codec.ts index 21be117c1e..25a441b9bb 100644 --- a/packages/rln/src/codec.ts +++ b/packages/rln/src/codec.ts @@ -5,7 +5,8 @@ import type { IMessage, IProtoMessage, IRateLimitProof, - IRoutingInfo + IRoutingInfo, + PubsubTopic } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -28,6 +29,10 @@ export class RLNEncoder implements IEncoder { this.idSecretHash = identityCredential.IDSecretHash; } + public get pubsubTopic(): PubsubTopic { + return this.encoder.pubsubTopic; + } + public async toWire(message: IMessage): Promise { message.rateLimitProof = await this.generateProof(message); log.info("Proof generated", message.rateLimitProof); @@ -93,8 +98,8 @@ export class RLNDecoder private readonly decoder: IDecoder ) {} - public get routingInfo(): IRoutingInfo { - return this.decoder.routingInfo; + public get pubsubTopic(): PubsubTopic { + return this.decoder.pubsubTopic; } public get contentTopic(): string { diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index b686822f30..43895fab7c 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -63,21 +63,21 @@ export class Filter implements IFilter { throw Error("Cannot subscribe with 0 decoders."); } - const routingInfos = decoders.map((v) => v.routingInfo); - const routingInfo = routingInfos[0]; + const pubsubTopics = decoders.map((v) => v.pubsubTopic); + const singlePubsubTopic = pubsubTopics[0]; const contentTopics = decoders.map((v) => v.contentTopic); log.info( - `Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${routingInfo.pubsubTopic}` + `Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}` ); - this.throwIfTopicNotSame(routingInfos.map((r) => r.pubsubTopic)); + this.throwIfTopicNotSame(pubsubTopics); - let subscription = this.subscriptions.get(routingInfo.pubsubTopic); + let subscription = this.subscriptions.get(singlePubsubTopic); if (!subscription) { subscription = new Subscription({ - routingInfo: routingInfo, + pubsubTopic: singlePubsubTopic, protocol: this.protocol, config: this.config, peerManager: this.peerManager @@ -86,7 +86,7 @@ export class Filter implements IFilter { } const result = await subscription.add(decoders, callback); - this.subscriptions.set(routingInfo.pubsubTopic, subscription); + this.subscriptions.set(singlePubsubTopic, subscription); log.info( `Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}` @@ -104,7 +104,7 @@ export class Filter implements IFilter { throw Error("Cannot unsubscribe with 0 decoders."); } - const pubsubTopics = decoders.map((v) => v.routingInfo.pubsubTopic); + const pubsubTopics = decoders.map((v) => v.pubsubTopic); const singlePubsubTopic = pubsubTopics[0]; const contentTopics = decoders.map((v) => v.contentTopic); diff --git a/packages/sdk/src/filter/subscription.spec.ts b/packages/sdk/src/filter/subscription.spec.ts index e65128092f..37f3d48ed3 100644 --- a/packages/sdk/src/filter/subscription.spec.ts +++ b/packages/sdk/src/filter/subscription.spec.ts @@ -1,12 +1,10 @@ import { FilterCore } from "@waku/core"; import type { - AutoSharding, FilterProtocolOptions, IDecodedMessage, IDecoder } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { createRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import sinon from "sinon"; @@ -16,13 +14,7 @@ import { Subscription } from "./subscription.js"; const PUBSUB_TOPIC = "/waku/2/rs/1/4"; const CONTENT_TOPIC = "/test/1/waku-filter/utf8"; -const NETWORK_CONFIG: AutoSharding = { - clusterId: 2, - numShardsInCluster: 3 -}; -const ROUTING_INFO = createRoutingInfo(NETWORK_CONFIG, { - contentTopic: CONTENT_TOPIC -}); + describe("Filter Subscription", () => { let filterCore: FilterCore; let peerManager: PeerManager; @@ -40,7 +32,7 @@ describe("Filter Subscription", () => { }; subscription = new Subscription({ - routingInfo: ROUTING_INFO, + pubsubTopic: PUBSUB_TOPIC, protocol: filterCore, config, peerManager @@ -87,11 +79,9 @@ describe("Filter Subscription", () => { }); it("should invoke callbacks when receiving a message", async () => { - const testContentTopic = "/custom/0/content/proto"; + const testContentTopic = "/custom/content/topic"; const testDecoder = { - routingInfo: createRoutingInfo(NETWORK_CONFIG, { - contentTopic: testContentTopic - }), + pubsubTopic: PUBSUB_TOPIC, contentTopic: testContentTopic, fromProtoObj: sinon.stub().callsFake(() => { return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) }); @@ -116,11 +106,9 @@ describe("Filter Subscription", () => { }); it("should invoke callbacks only when newly receiving message is given", async () => { - const testContentTopic = "/custom/0/content/topic"; + const testContentTopic = "/custom/content/topic"; const testDecoder = { - routingInfo: createRoutingInfo(NETWORK_CONFIG, { - contentTopic: testContentTopic - }), + pubsubTopic: PUBSUB_TOPIC, contentTopic: testContentTopic, fromProtoObj: sinon.stub().callsFake(() => { return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) }); diff --git a/packages/sdk/src/filter/subscription.ts b/packages/sdk/src/filter/subscription.ts index e35429572d..00804f5601 100644 --- a/packages/sdk/src/filter/subscription.ts +++ b/packages/sdk/src/filter/subscription.ts @@ -10,9 +10,7 @@ import type { IDecodedMessage, IDecoder, IProtoMessage, - IRoutingInfo, - PeerIdStr, - PubsubTopic + PeerIdStr } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; @@ -37,8 +35,7 @@ type AttemptUnsubscribeParams = { type Libp2pEventHandler = (e: CustomEvent) => void; export class Subscription { - private readonly routingInfo: IRoutingInfo; - private readonly pubsubTopic: PubsubTopic; + private readonly pubsubTopic: string; private readonly protocol: FilterCore; private readonly peerManager: PeerManager; @@ -76,8 +73,7 @@ export class Subscription { public constructor(params: SubscriptionParams) { this.config = params.config; - this.routingInfo = params.routingInfo; - this.pubsubTopic = params.routingInfo.pubsubTopic; + this.pubsubTopic = params.pubsubTopic; this.protocol = params.protocol; this.peerManager = params.peerManager; @@ -197,7 +193,7 @@ export class Subscription { if (this.callbacks.has(decoder)) { log.warn( - `Replacing callback associated associated with decoder with pubsubTopic:${decoder.routingInfo.pubsubTopic} and contentTopic:${decoder.contentTopic}` + `Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` ); const callback = this.callbacks.get(decoder); @@ -209,7 +205,7 @@ export class Subscription { void (async (): Promise => { try { const message = await decoder.fromProtoObj( - decoder.routingInfo.pubsubTopic, + decoder.pubsubTopic, event.detail as IProtoMessage ); void callback(message!); @@ -234,7 +230,7 @@ export class Subscription { if (!callback) { log.warn( - `No callback associated with decoder with pubsubTopic:${decoder.routingInfo.pubsubTopic} and contentTopic:${decoder.contentTopic}` + `No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}` ); } @@ -417,13 +413,11 @@ export class Subscription { const usablePeer = await this.peerManager.isPeerOnPubsub( event.detail, - this.routingInfo.pubsubTopic + this.pubsubTopic ); if (!usablePeer) { - log.info( - `Peer ${id} doesn't support pubsubTopic:${this.routingInfo.pubsubTopic}` - ); + log.info(`Peer ${id} doesn't support pubsubTopic:${this.pubsubTopic}`); return; } @@ -489,7 +483,7 @@ export class Subscription { const prevPeers = new Set(this.peers.keys()); const peersToAdd = await this.peerManager.getPeers({ protocol: Protocols.Filter, - routingInfo: this.routingInfo + pubsubTopic: this.pubsubTopic }); for (const peer of peersToAdd) { diff --git a/packages/sdk/src/filter/types.ts b/packages/sdk/src/filter/types.ts index f010f45440..44326728d1 100644 --- a/packages/sdk/src/filter/types.ts +++ b/packages/sdk/src/filter/types.ts @@ -1,9 +1,5 @@ import type { FilterCore } from "@waku/core"; -import type { - FilterProtocolOptions, - IRoutingInfo, - Libp2p -} from "@waku/interfaces"; +import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces"; import type { WakuMessage } from "@waku/proto"; import type { PeerManager } from "../peer_manager/index.js"; @@ -19,7 +15,7 @@ export type SubscriptionEvents = { }; export type SubscriptionParams = { - routingInfo: IRoutingInfo; + pubsubTopic: string; protocol: FilterCore; config: FilterProtocolOptions; peerManager: PeerManager; diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 13dc92089e..947ce1528b 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -77,13 +77,13 @@ export class LightPush implements ILightPush { ...options }; - const { pubsubTopic } = encoder.routingInfo; + const { pubsubTopic } = encoder; log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic); const peerIds = await this.peerManager.getPeers({ protocol: Protocols.LightPush, - routingInfo: encoder.routingInfo + pubsubTopic: encoder.pubsubTopic }); const coreResults: CoreProtocolResult[] = diff --git a/packages/sdk/src/light_push/retry_manager.ts b/packages/sdk/src/light_push/retry_manager.ts index 380c954277..0fc156efe4 100644 --- a/packages/sdk/src/light_push/retry_manager.ts +++ b/packages/sdk/src/light_push/retry_manager.ts @@ -100,7 +100,7 @@ export class RetryManager { const peerId = ( await this.peerManager.getPeers({ protocol: Protocols.LightPush, - routingInfo: task.routingInfo + pubsubTopic: task.routingInfo.pubsubTopic }) )[0]; @@ -146,7 +146,7 @@ export class RetryManager { if (shouldPeerBeChanged(error.message)) { await this.peerManager.renewPeer(peerId, { protocol: Protocols.LightPush, - routingInfo: task.routingInfo + pubsubTopic: task.routingInfo.pubsubTopic }); } diff --git a/packages/sdk/src/peer_manager/peer_manager.spec.ts b/packages/sdk/src/peer_manager/peer_manager.spec.ts index cac779ca0e..f4eac85f81 100644 --- a/packages/sdk/src/peer_manager/peer_manager.spec.ts +++ b/packages/sdk/src/peer_manager/peer_manager.spec.ts @@ -5,7 +5,6 @@ import { Libp2p, Protocols } from "@waku/interfaces"; -import { createRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import sinon from "sinon"; @@ -18,12 +17,8 @@ describe("PeerManager", () => { let peers: any[]; let mockConnections: any[]; - const TEST_PUBSUB_TOPIC = "/waku/2/rs/0/0"; + const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8"; const TEST_PROTOCOL = Protocols.LightPush; - const TEST_ROUTING_INFO = createRoutingInfo( - { clusterId: 0 }, - { pubsubTopic: TEST_PUBSUB_TOPIC } - ); const clearPeerState = (): void => { (peerManager as any).lockedPeers.clear(); @@ -41,7 +36,7 @@ describe("PeerManager", () => { const getPeersForTest = async (): Promise => { return await peerManager.getPeers({ protocol: TEST_PROTOCOL, - routingInfo: TEST_ROUTING_INFO + pubsubTopic: TEST_PUBSUB_TOPIC }); }; @@ -131,7 +126,7 @@ describe("PeerManager", () => { const peerId = ids[0]; await peerManager.renewPeer(peerId, { protocol: TEST_PROTOCOL, - routingInfo: TEST_ROUTING_INFO + pubsubTopic: TEST_PUBSUB_TOPIC }); expect((peerManager as any).lockedPeers.has(peerId.toString())).to.be.false; expect((peerManager as any).unlockedPeers.has(peerId.toString())).to.be @@ -229,7 +224,7 @@ describe("PeerManager", () => { if (skipIfNoPeers(first)) return; await peerManager.renewPeer(first[0], { protocol: TEST_PROTOCOL, - routingInfo: TEST_ROUTING_INFO + pubsubTopic: TEST_PUBSUB_TOPIC }); const second = await getPeersForTest(); if (skipIfNoPeers(second)) return; @@ -243,7 +238,7 @@ describe("PeerManager", () => { } as any; await peerManager.renewPeer(fakePeerId, { protocol: TEST_PROTOCOL, - routingInfo: TEST_ROUTING_INFO + pubsubTopic: TEST_PUBSUB_TOPIC }); expect(true).to.be.true; }); @@ -268,7 +263,7 @@ describe("PeerManager", () => { const peerId = result[0]; await peerManager.renewPeer(peerId, { protocol: TEST_PROTOCOL, - routingInfo: TEST_ROUTING_INFO + pubsubTopic: TEST_PUBSUB_TOPIC }); const connection = mockConnections.find((c) => c.remotePeer.equals(peerId)); diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index ebf8bc82c5..a42baf7215 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -12,7 +12,6 @@ import { } from "@waku/core"; import { CONNECTION_LOCKED_TAG, - type IRoutingInfo, Libp2p, Libp2pEventHandler, Protocols @@ -35,7 +34,7 @@ type PeerManagerParams = { type GetPeersParams = { protocol: Protocols; - routingInfo: IRoutingInfo; + pubsubTopic: string; }; export enum PeerManagerEventNames { @@ -108,9 +107,7 @@ export class PeerManager { public async getPeers(params: GetPeersParams): Promise { log.info( - `Getting peers for protocol: ${params.protocol}, ` + - `clusterId: ${params.routingInfo.clusterId},` + - ` shard: ${params.routingInfo.shardId}` + `Getting peers for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}` ); const connectedPeers = await this.connectionManager.getConnectedPeers(); @@ -120,19 +117,13 @@ export class PeerManager { for (const peer of connectedPeers) { const hasProtocol = this.hasPeerProtocol(peer, params.protocol); - - const isOnSameShard = await this.connectionManager.isPeerOnShard( + const hasSamePubsub = await this.connectionManager.isPeerOnTopic( peer.id, - params.routingInfo.clusterId, - params.routingInfo.shardId + params.pubsubTopic ); - if (!isOnSameShard) { - continue; - } - const isPeerAvailableForUse = this.isPeerAvailableForUse(peer.id); - if (hasProtocol && isPeerAvailableForUse) { + if (hasProtocol && hasSamePubsub && isPeerAvailableForUse) { results.push(peer); log.info(`Peer ${peer.id} qualifies for protocol ${params.protocol}`); } @@ -177,7 +168,7 @@ export class PeerManager { public async renewPeer(id: PeerId, params: GetPeersParams): Promise { log.info( - `Renewing peer ${id} for protocol: ${params.protocol}, routingInfo: ${params.routingInfo}` + `Renewing peer ${id} for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}` ); const connectedPeers = await this.connectionManager.getConnectedPeers(); @@ -274,7 +265,7 @@ export class PeerManager { } const wasUnlocked = new Date(value).getTime(); - return Date.now() - wasUnlocked >= 10_000; + return Date.now() - wasUnlocked >= 10_000 ? true : false; } private dispatchFilterPeerConnect(id: PeerId): void { diff --git a/packages/sdk/src/store/store.spec.ts b/packages/sdk/src/store/store.spec.ts index 83ccb08436..025f2df425 100644 --- a/packages/sdk/src/store/store.spec.ts +++ b/packages/sdk/src/store/store.spec.ts @@ -1,12 +1,6 @@ import { StoreCore } from "@waku/core"; -import { - IDecodedMessage, - IDecoder, - IRoutingInfo, - Libp2p -} from "@waku/interfaces"; +import type { IDecodedMessage, IDecoder, Libp2p } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; -import { createRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import sinon from "sinon"; @@ -14,13 +8,6 @@ import { PeerManager } from "../peer_manager/index.js"; import { Store } from "./store.js"; -const TestNetworkingInfo = { clusterId: 0, numShardsInCluster: 8 }; -const MockRoutingInfo: IRoutingInfo = { - pubsubTopic: "/custom/topic", - shardId: 1, - clusterId: TestNetworkingInfo.clusterId -}; - describe("Store", () => { let store: Store; let mockLibp2p: Libp2p; @@ -74,11 +61,9 @@ describe("Store", () => { }); describe("queryGenerator", () => { - const contentTopic = "/test/1/test/proto"; - const routingInfo = createRoutingInfo(TestNetworkingInfo, { contentTopic }); const mockDecoder: IDecoder = { - routingInfo, - contentTopic, + pubsubTopic: "/waku/2/default-waku/proto", + contentTopic: "/test/1/test/proto", fromWireToProtoObj: sinon.stub(), fromProtoObj: sinon.stub() }; @@ -86,7 +71,7 @@ describe("Store", () => { const mockMessage: IDecodedMessage = { version: 1, pubsubTopic: "/waku/2/default-waku/proto", - contentTopic, + contentTopic: "/test/1/test/proto", payload: new Uint8Array([1, 2, 3]), timestamp: new Date(), rateLimitProof: undefined, @@ -113,7 +98,7 @@ describe("Store", () => { expect( mockPeerManager.getPeers.calledWith({ protocol: Protocols.Store, - routingInfo + pubsubTopic: "/waku/2/default-waku/proto" }) ).to.be.true; @@ -265,11 +250,9 @@ describe("Store", () => { mockPeerManager.getPeers.resolves([mockPeerId]); mockStoreCore.queryPerPage.returns(mockResponseGenerator); - const routingInfo: IRoutingInfo = structuredClone(MockRoutingInfo); - routingInfo.pubsubTopic = "/custom/topic"; const generator = store.queryGenerator([mockDecoder], { messageHashes: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])], - routingInfo + pubsubTopic: "/custom/topic" }); const results = []; diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 0d8e686d23..1297060cf2 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -5,7 +5,6 @@ import { messageHash, StoreCore } from "@waku/core"; import { IDecodedMessage, IDecoder, - type IRoutingInfo, IStore, Libp2p, Protocols, @@ -66,7 +65,7 @@ export class Store implements IStore { ); for (const queryOption of queryOptions) { - const peer = await this.getPeerToUse(queryOption.routingInfo); + const peer = await this.getPeerToUse(queryOption.pubsubTopic); if (!peer) { log.error("No peers available to query"); @@ -182,7 +181,7 @@ export class Store implements IStore { private validateDecodersAndPubsubTopic( decoders: IDecoder[] ): { - routingInfo: IRoutingInfo; + pubsubTopic: string; contentTopics: string[]; decodersAsMap: Map>; } { @@ -192,7 +191,7 @@ export class Store implements IStore { } const uniquePubsubTopicsInQuery = Array.from( - new Set(decoders.map((decoder) => decoder.routingInfo.pubsubTopic)) + new Set(decoders.map((decoder) => decoder.pubsubTopic)) ); if (uniquePubsubTopicsInQuery.length > 1) { log.error("API does not support querying multiple pubsub topics at once"); @@ -215,9 +214,7 @@ export class Store implements IStore { }); const contentTopics = decoders - .filter( - (decoder) => decoder.routingInfo.pubsubTopic === pubsubTopicForQuery - ) + .filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery) .map((dec) => dec.contentTopic); if (contentTopics.length === 0) { @@ -226,18 +223,16 @@ export class Store implements IStore { } return { - routingInfo: decoders[0].routingInfo, + pubsubTopic: pubsubTopicForQuery, contentTopics, decodersAsMap }; } - private async getPeerToUse( - routingInfo: IRoutingInfo - ): Promise { + private async getPeerToUse(pubsubTopic: string): Promise { const peers = await this.peerManager.getPeers({ protocol: Protocols.Store, - routingInfo + pubsubTopic }); return this.options.peers @@ -302,16 +297,15 @@ export class Store implements IStore { const isHashQuery = options?.messageHashes && options.messageHashes.length > 0; - let routingInfo: IRoutingInfo; + let pubsubTopic: string; let contentTopics: string[]; let decodersAsMap: Map>; if (isHashQuery) { // For hash queries, we still need decoders to decode messages - // but we don't validate routing info consistency - // Use routing info from options if provided, otherwise from first decoder - // Otherwise, throw - routingInfo = options?.routingInfo || decoders[0]?.routingInfo; + // but we don't validate pubsubTopic consistency + // Use pubsubTopic from options if provided, otherwise from first decoder + pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || ""; contentTopics = []; decodersAsMap = new Map(); decoders.forEach((dec) => { @@ -319,7 +313,7 @@ export class Store implements IStore { }); } else { const validated = this.validateDecodersAndPubsubTopic(decoders); - routingInfo = validated.routingInfo; + pubsubTopic = validated.pubsubTopic; contentTopics = validated.contentTopics; decodersAsMap = validated.decodersAsMap; } @@ -346,7 +340,7 @@ export class Store implements IStore { decodersAsMap, queryOptions: [ { - routingInfo, + pubsubTopic, contentTopics, includeData: true, paginationForward: true, @@ -361,7 +355,7 @@ export class Store implements IStore { return { decodersAsMap, queryOptions: subTimeRanges.map(([start, end]) => ({ - routingInfo, + pubsubTopic, contentTopics, includeData: true, paginationForward: true, diff --git a/packages/tests/tests/store/message_hash.spec.ts b/packages/tests/tests/store/message_hash.spec.ts index 906fc98ed8..59e93c6153 100644 --- a/packages/tests/tests/store/message_hash.spec.ts +++ b/packages/tests/tests/store/message_hash.spec.ts @@ -15,6 +15,7 @@ import { sendMessages, TestDecoder, TestNetworkConfig, + TestPubsubTopic, TestRoutingInfo, totalMsgs } from "./utils.js"; @@ -74,7 +75,7 @@ describe("Waku Store, message hash query", function () { const messages: IDecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder], { messageHashes, - routingInfo: TestRoutingInfo + pubsubTopic: TestPubsubTopic })) { for await (const msg of page) { messages.push(msg as IDecodedMessage); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 7d25243a9a..b13967f952 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -10,7 +10,6 @@ import { LightNode, type NetworkConfig, Protocols, - RelayShards, ShardId } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; @@ -34,10 +33,7 @@ export const TestRoutingInfo = createRoutingInfo(TestNetworkConfig, { contentTopic: TestContentTopic }); -export const TestRelayShards: RelayShards = { - clusterId: TestClusterId, - shards: [TestRoutingInfo.shardId] -}; +export const TestPubsubTopic = TestRoutingInfo.pubsubTopic; export const TestEncoder = createEncoder({ contentTopic: TestContentTopic,