revert most invasive changes

This commit is contained in:
fryorcraken 2025-07-19 21:25:21 +10:00
parent 97a26b2373
commit e5919a6bd9
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
26 changed files with 136 additions and 175 deletions

View File

@ -8,7 +8,8 @@ import {
type ThisOrThat
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isMessageSizeUnderCap, Logger } from "@waku/utils";
import { isMessageSizeUnderCap } from "@waku/utils";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
@ -62,10 +63,7 @@ export class LightPushCore {
};
}
const query = PushRpc.createRequest(
protoMessage,
encoder.routingInfo.pubsubTopic
);
const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
return { query, error: null };
} catch (error) {
log.error("Failed to prepare push message", error);

View File

@ -153,9 +153,7 @@ describe("Sets sharding configuration correctly", () => {
});
// When autosharding is enabled, we expect the shard index to be 1
expect(autoshardingEncoder.routingInfo.pubsubTopic).to.be.eq(
"/waku/2/rs/0/0"
);
expect(autoshardingEncoder.pubsubTopic).to.be.eq("/waku/2/rs/0/0");
// Create an encoder setup to use static sharding with the same content topic
const staticshardingEncoder = createEncoder({
@ -164,8 +162,6 @@ describe("Sets sharding configuration correctly", () => {
});
// When static sharding is enabled, we expect the shard index to be 0
expect(staticshardingEncoder.routingInfo.pubsubTopic).to.be.eq(
"/waku/2/rs/0/3"
);
expect(staticshardingEncoder.pubsubTopic).to.be.eq("/waku/2/rs/0/3");
});
});

View File

@ -7,7 +7,8 @@ import type {
IMetaSetter,
IProtoMessage,
IRateLimitProof,
IRoutingInfo
IRoutingInfo,
PubsubTopic
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import { Logger } from "@waku/utils";
@ -78,6 +79,10 @@ export class Encoder implements IEncoder {
}
}
public get pubsubTopic(): PubsubTopic {
return this.routingInfo.pubsubTopic;
}
public async toWire(message: IMessage): Promise<Uint8Array> {
return proto.WakuMessage.encode(await this.toProtoObj(message));
}
@ -133,6 +138,10 @@ export class Decoder implements IDecoder<IDecodedMessage> {
}
}
public get pubsubTopic(): PubsubTopic {
return this.routingInfo.pubsubTopic;
}
public fromWireToProtoObj(
bytes: Uint8Array
): Promise<IProtoMessage | undefined> {

View File

@ -1,17 +1,11 @@
import { createRoutingInfo } from "@waku/utils";
import { expect } from "chai";
import { StoreQueryRequest } from "./rpc.js";
const routingInfo = createRoutingInfo(
{ clusterId: 0 },
{ pubsubTopic: "/waku/2/rs/0/0" }
);
describe("StoreQueryRequest validation", () => {
it("accepts valid content-filtered query", () => {
const request = StoreQueryRequest.create({
routingInfo,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: ["/test/1/content/proto"],
includeData: true,
paginationForward: true
@ -22,7 +16,7 @@ describe("StoreQueryRequest validation", () => {
it("rejects content-filtered query with only pubsubTopic", () => {
expect(() =>
StoreQueryRequest.create({
routingInfo,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: [],
includeData: true,
paginationForward: true
@ -32,9 +26,22 @@ describe("StoreQueryRequest validation", () => {
);
});
it("rejects content-filtered query with only contentTopics", () => {
expect(() =>
StoreQueryRequest.create({
pubsubTopic: "",
contentTopics: ["/test/1/content/proto"],
includeData: true,
paginationForward: true
})
).to.throw(
"Both pubsubTopic and contentTopics must be set together for content-filtered queries"
);
});
it("accepts valid message hash query", () => {
const request = StoreQueryRequest.create({
routingInfo,
pubsubTopic: "",
contentTopics: [],
messageHashes: [new Uint8Array([1, 2, 3, 4])],
includeData: true,
@ -47,7 +54,7 @@ describe("StoreQueryRequest validation", () => {
expect(() =>
StoreQueryRequest.create({
messageHashes: [new Uint8Array([1, 2, 3, 4])],
routingInfo,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: ["/test/1/content/proto"],
includeData: true,
paginationForward: true
@ -60,7 +67,7 @@ describe("StoreQueryRequest validation", () => {
it("rejects hash query with time filter", () => {
expect(() =>
StoreQueryRequest.create({
routingInfo,
pubsubTopic: "",
contentTopics: [],
messageHashes: [new Uint8Array([1, 2, 3, 4])],
timeStart: new Date(),
@ -74,7 +81,7 @@ describe("StoreQueryRequest validation", () => {
it("accepts time-filtered query with content filter", () => {
const request = StoreQueryRequest.create({
routingInfo,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: ["/test/1/content/proto"],
timeStart: new Date(Date.now() - 3600000),
timeEnd: new Date(),

View File

@ -42,9 +42,9 @@ export class StoreQueryRequest {
}
} else {
if (
(params.routingInfo &&
(params.pubsubTopic &&
(!params.contentTopics || params.contentTopics.length === 0)) ||
(!params.routingInfo &&
(!params.pubsubTopic &&
params.contentTopics &&
params.contentTopics.length > 0)
) {

View File

@ -2,7 +2,6 @@ import type { PeerId } from "@libp2p/interface";
import {
IDecodedMessage,
IDecoder,
IRoutingInfo,
Libp2p,
QueryRequestParams
} from "@waku/interfaces";
@ -79,15 +78,9 @@ describe("StoreCore", () => {
let mockStoreQueryRequest: any;
let mockStoreQueryResponse: any;
const routingInfo: IRoutingInfo = {
pubsubTopic: "test-topic",
shardId: 1,
clusterId: 0
};
beforeEach(() => {
queryOpts = {
routingInfo,
pubsubTopic: "test-topic",
contentTopics: ["test-topic"],
paginationLimit: 10,
includeData: true,

View File

@ -76,7 +76,7 @@ export class StoreCore {
log.info("Sending store query request:", {
hasMessageHashes: !!queryOpts.messageHashes?.length,
messageHashCount: queryOpts.messageHashes?.length,
routingInfo: queryOpts.routingInfo,
pubsubTopic: queryOpts.pubsubTopic,
contentTopics: queryOpts.contentTopics
});

View File

@ -95,13 +95,14 @@ export interface IEncoder {
contentTopic: string;
ephemeral: boolean;
routingInfo: IRoutingInfo;
pubsubTopic: PubsubTopic;
toWire: (message: IMessage) => Promise<Uint8Array | undefined>;
toProtoObj: (message: IMessage) => Promise<IProtoMessage | undefined>;
}
export interface IDecoder<T extends IDecodedMessage> {
contentTopic: string;
routingInfo: IRoutingInfo;
pubsubTopic: PubsubTopic;
fromWireToProtoObj: (bytes: Uint8Array) => Promise<IProtoMessage | undefined>;
fromProtoObj: (
pubsubTopic: string,

View File

@ -1,5 +1,4 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import { IRoutingInfo } from "./sharding.js";
export type StoreCursor = Uint8Array;
@ -16,10 +15,10 @@ export type QueryRequestParams = {
includeData: boolean;
/**
* The routing information to query. This field is mandatory.
* The query will only return messages that were published on this specific route (cluster and shard).
* The pubsub topic to query. This field is mandatory.
* The query will only return messages that were published on this specific pubsub topic.
*/
routingInfo: IRoutingInfo;
pubsubTopic: string;
/**
* The content topics to filter the messages.

View File

@ -7,7 +7,8 @@ import {
type IMessage,
type IMetaSetter,
type IProtoMessage,
type IRoutingInfo
type IRoutingInfo,
type PubsubTopic
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
@ -46,6 +47,10 @@ class Encoder implements IEncoder {
}
}
public get pubsubTopic(): PubsubTopic {
return this.routingInfo.pubsubTopic;
}
public async toWire(message: IMessage): Promise<Uint8Array | undefined> {
const protoMessage = await this.toProtoObj(message);
if (!protoMessage) return;

View File

@ -7,7 +7,8 @@ import type {
IMessage,
IMetaSetter,
IProtoMessage,
IRoutingInfo
IRoutingInfo,
PubsubTopic
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { Logger } from "@waku/utils";
@ -46,6 +47,10 @@ class Encoder implements IEncoder {
}
}
public get pubsubTopic(): PubsubTopic {
return this.routingInfo.pubsubTopic;
}
public async toWire(message: IMessage): Promise<Uint8Array | undefined> {
const protoMessage = await this.toProtoObj(message);
if (!protoMessage) return;

View File

@ -128,7 +128,7 @@ export class Relay implements IRelay {
encoder: IEncoder,
message: IMessage
): Promise<SDKProtocolResult> {
const { pubsubTopic } = encoder.routingInfo;
const { pubsubTopic } = encoder;
if (!this.pubsubTopics.has(pubsubTopic)) {
log.error("Failed to send waku relay: topic not configured");
return {
@ -180,7 +180,7 @@ export class Relay implements IRelay {
const observers: Array<[PubsubTopic, Observer<T>]> = [];
for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) {
const { pubsubTopic } = decoder.routingInfo;
const { pubsubTopic } = decoder;
const ctObs: Map<ContentTopic, Set<Observer<T>>> = this.observers.get(
pubsubTopic
) ?? new Map();

View File

@ -2,7 +2,8 @@ import type {
IDecoder,
IProtoMessage,
IRoutingInfo,
ITopicOnlyMessage
ITopicOnlyMessage,
PubsubTopic
} from "@waku/interfaces";
import { TopicOnlyMessage as ProtoTopicOnlyMessage } from "@waku/proto";
@ -32,6 +33,10 @@ export class TopicOnlyMessage implements ITopicOnlyMessage {
export class ContentTopicOnlyDecoder implements IDecoder<ITopicOnlyMessage> {
public constructor() {}
public get pubsubTopic(): PubsubTopic {
throw "Pubsub Topic is not available on this decoder, it is only meant to decode the content topic for any message";
}
public get contentTopic(): string {
throw "ContentTopic is not available on this decoder, it is only meant to decode the content topic for any message";
}

View File

@ -5,7 +5,8 @@ import type {
IMessage,
IProtoMessage,
IRateLimitProof,
IRoutingInfo
IRoutingInfo,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -28,6 +29,10 @@ export class RLNEncoder implements IEncoder {
this.idSecretHash = identityCredential.IDSecretHash;
}
public get pubsubTopic(): PubsubTopic {
return this.encoder.pubsubTopic;
}
public async toWire(message: IMessage): Promise<Uint8Array | undefined> {
message.rateLimitProof = await this.generateProof(message);
log.info("Proof generated", message.rateLimitProof);
@ -93,8 +98,8 @@ export class RLNDecoder<T extends IDecodedMessage>
private readonly decoder: IDecoder<T>
) {}
public get routingInfo(): IRoutingInfo {
return this.decoder.routingInfo;
public get pubsubTopic(): PubsubTopic {
return this.decoder.pubsubTopic;
}
public get contentTopic(): string {

View File

@ -63,21 +63,21 @@ export class Filter implements IFilter {
throw Error("Cannot subscribe with 0 decoders.");
}
const routingInfos = decoders.map((v) => v.routingInfo);
const routingInfo = routingInfos[0];
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const singlePubsubTopic = pubsubTopics[0];
const contentTopics = decoders.map((v) => v.contentTopic);
log.info(
`Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${routingInfo.pubsubTopic}`
`Subscribing to contentTopics: ${contentTopics}, pubsubTopic: ${singlePubsubTopic}`
);
this.throwIfTopicNotSame(routingInfos.map((r) => r.pubsubTopic));
this.throwIfTopicNotSame(pubsubTopics);
let subscription = this.subscriptions.get(routingInfo.pubsubTopic);
let subscription = this.subscriptions.get(singlePubsubTopic);
if (!subscription) {
subscription = new Subscription({
routingInfo: routingInfo,
pubsubTopic: singlePubsubTopic,
protocol: this.protocol,
config: this.config,
peerManager: this.peerManager
@ -86,7 +86,7 @@ export class Filter implements IFilter {
}
const result = await subscription.add(decoders, callback);
this.subscriptions.set(routingInfo.pubsubTopic, subscription);
this.subscriptions.set(singlePubsubTopic, subscription);
log.info(
`Subscription ${result ? "successful" : "failed"} for content topic: ${contentTopics}`
@ -104,7 +104,7 @@ export class Filter implements IFilter {
throw Error("Cannot unsubscribe with 0 decoders.");
}
const pubsubTopics = decoders.map((v) => v.routingInfo.pubsubTopic);
const pubsubTopics = decoders.map((v) => v.pubsubTopic);
const singlePubsubTopic = pubsubTopics[0];
const contentTopics = decoders.map((v) => v.contentTopic);

View File

@ -1,12 +1,10 @@
import { FilterCore } from "@waku/core";
import type {
AutoSharding,
FilterProtocolOptions,
IDecodedMessage,
IDecoder
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { createRoutingInfo } from "@waku/utils";
import { expect } from "chai";
import sinon from "sinon";
@ -16,13 +14,7 @@ import { Subscription } from "./subscription.js";
const PUBSUB_TOPIC = "/waku/2/rs/1/4";
const CONTENT_TOPIC = "/test/1/waku-filter/utf8";
const NETWORK_CONFIG: AutoSharding = {
clusterId: 2,
numShardsInCluster: 3
};
const ROUTING_INFO = createRoutingInfo(NETWORK_CONFIG, {
contentTopic: CONTENT_TOPIC
});
describe("Filter Subscription", () => {
let filterCore: FilterCore;
let peerManager: PeerManager;
@ -40,7 +32,7 @@ describe("Filter Subscription", () => {
};
subscription = new Subscription({
routingInfo: ROUTING_INFO,
pubsubTopic: PUBSUB_TOPIC,
protocol: filterCore,
config,
peerManager
@ -87,11 +79,9 @@ describe("Filter Subscription", () => {
});
it("should invoke callbacks when receiving a message", async () => {
const testContentTopic = "/custom/0/content/proto";
const testContentTopic = "/custom/content/topic";
const testDecoder = {
routingInfo: createRoutingInfo(NETWORK_CONFIG, {
contentTopic: testContentTopic
}),
pubsubTopic: PUBSUB_TOPIC,
contentTopic: testContentTopic,
fromProtoObj: sinon.stub().callsFake(() => {
return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) });
@ -116,11 +106,9 @@ describe("Filter Subscription", () => {
});
it("should invoke callbacks only when newly receiving message is given", async () => {
const testContentTopic = "/custom/0/content/topic";
const testContentTopic = "/custom/content/topic";
const testDecoder = {
routingInfo: createRoutingInfo(NETWORK_CONFIG, {
contentTopic: testContentTopic
}),
pubsubTopic: PUBSUB_TOPIC,
contentTopic: testContentTopic,
fromProtoObj: sinon.stub().callsFake(() => {
return Promise.resolve({ payload: new Uint8Array([1, 2, 3]) });

View File

@ -10,9 +10,7 @@ import type {
IDecodedMessage,
IDecoder,
IProtoMessage,
IRoutingInfo,
PeerIdStr,
PubsubTopic
PeerIdStr
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
@ -37,8 +35,7 @@ type AttemptUnsubscribeParams = {
type Libp2pEventHandler = (e: CustomEvent<PeerId>) => void;
export class Subscription {
private readonly routingInfo: IRoutingInfo;
private readonly pubsubTopic: PubsubTopic;
private readonly pubsubTopic: string;
private readonly protocol: FilterCore;
private readonly peerManager: PeerManager;
@ -76,8 +73,7 @@ export class Subscription {
public constructor(params: SubscriptionParams) {
this.config = params.config;
this.routingInfo = params.routingInfo;
this.pubsubTopic = params.routingInfo.pubsubTopic;
this.pubsubTopic = params.pubsubTopic;
this.protocol = params.protocol;
this.peerManager = params.peerManager;
@ -197,7 +193,7 @@ export class Subscription {
if (this.callbacks.has(decoder)) {
log.warn(
`Replacing callback associated associated with decoder with pubsubTopic:${decoder.routingInfo.pubsubTopic} and contentTopic:${decoder.contentTopic}`
`Replacing callback associated associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
const callback = this.callbacks.get(decoder);
@ -209,7 +205,7 @@ export class Subscription {
void (async (): Promise<void> => {
try {
const message = await decoder.fromProtoObj(
decoder.routingInfo.pubsubTopic,
decoder.pubsubTopic,
event.detail as IProtoMessage
);
void callback(message!);
@ -234,7 +230,7 @@ export class Subscription {
if (!callback) {
log.warn(
`No callback associated with decoder with pubsubTopic:${decoder.routingInfo.pubsubTopic} and contentTopic:${decoder.contentTopic}`
`No callback associated with decoder with pubsubTopic:${decoder.pubsubTopic} and contentTopic:${decoder.contentTopic}`
);
}
@ -417,13 +413,11 @@ export class Subscription {
const usablePeer = await this.peerManager.isPeerOnPubsub(
event.detail,
this.routingInfo.pubsubTopic
this.pubsubTopic
);
if (!usablePeer) {
log.info(
`Peer ${id} doesn't support pubsubTopic:${this.routingInfo.pubsubTopic}`
);
log.info(`Peer ${id} doesn't support pubsubTopic:${this.pubsubTopic}`);
return;
}
@ -489,7 +483,7 @@ export class Subscription {
const prevPeers = new Set<PeerIdStr>(this.peers.keys());
const peersToAdd = await this.peerManager.getPeers({
protocol: Protocols.Filter,
routingInfo: this.routingInfo
pubsubTopic: this.pubsubTopic
});
for (const peer of peersToAdd) {

View File

@ -1,9 +1,5 @@
import type { FilterCore } from "@waku/core";
import type {
FilterProtocolOptions,
IRoutingInfo,
Libp2p
} from "@waku/interfaces";
import type { FilterProtocolOptions, Libp2p } from "@waku/interfaces";
import type { WakuMessage } from "@waku/proto";
import type { PeerManager } from "../peer_manager/index.js";
@ -19,7 +15,7 @@ export type SubscriptionEvents = {
};
export type SubscriptionParams = {
routingInfo: IRoutingInfo;
pubsubTopic: string;
protocol: FilterCore;
config: FilterProtocolOptions;
peerManager: PeerManager;

View File

@ -77,13 +77,13 @@ export class LightPush implements ILightPush {
...options
};
const { pubsubTopic } = encoder.routingInfo;
const { pubsubTopic } = encoder;
log.info("send: attempting to send a message to pubsubTopic:", pubsubTopic);
const peerIds = await this.peerManager.getPeers({
protocol: Protocols.LightPush,
routingInfo: encoder.routingInfo
pubsubTopic: encoder.pubsubTopic
});
const coreResults: CoreProtocolResult[] =

View File

@ -100,7 +100,7 @@ export class RetryManager {
const peerId = (
await this.peerManager.getPeers({
protocol: Protocols.LightPush,
routingInfo: task.routingInfo
pubsubTopic: task.routingInfo.pubsubTopic
})
)[0];
@ -146,7 +146,7 @@ export class RetryManager {
if (shouldPeerBeChanged(error.message)) {
await this.peerManager.renewPeer(peerId, {
protocol: Protocols.LightPush,
routingInfo: task.routingInfo
pubsubTopic: task.routingInfo.pubsubTopic
});
}

View File

@ -5,7 +5,6 @@ import {
Libp2p,
Protocols
} from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { expect } from "chai";
import sinon from "sinon";
@ -18,12 +17,8 @@ describe("PeerManager", () => {
let peers: any[];
let mockConnections: any[];
const TEST_PUBSUB_TOPIC = "/waku/2/rs/0/0";
const TEST_PUBSUB_TOPIC = "/test/1/waku-light-push/utf8";
const TEST_PROTOCOL = Protocols.LightPush;
const TEST_ROUTING_INFO = createRoutingInfo(
{ clusterId: 0 },
{ pubsubTopic: TEST_PUBSUB_TOPIC }
);
const clearPeerState = (): void => {
(peerManager as any).lockedPeers.clear();
@ -41,7 +36,7 @@ describe("PeerManager", () => {
const getPeersForTest = async (): Promise<PeerId[]> => {
return await peerManager.getPeers({
protocol: TEST_PROTOCOL,
routingInfo: TEST_ROUTING_INFO
pubsubTopic: TEST_PUBSUB_TOPIC
});
};
@ -131,7 +126,7 @@ describe("PeerManager", () => {
const peerId = ids[0];
await peerManager.renewPeer(peerId, {
protocol: TEST_PROTOCOL,
routingInfo: TEST_ROUTING_INFO
pubsubTopic: TEST_PUBSUB_TOPIC
});
expect((peerManager as any).lockedPeers.has(peerId.toString())).to.be.false;
expect((peerManager as any).unlockedPeers.has(peerId.toString())).to.be
@ -229,7 +224,7 @@ describe("PeerManager", () => {
if (skipIfNoPeers(first)) return;
await peerManager.renewPeer(first[0], {
protocol: TEST_PROTOCOL,
routingInfo: TEST_ROUTING_INFO
pubsubTopic: TEST_PUBSUB_TOPIC
});
const second = await getPeersForTest();
if (skipIfNoPeers(second)) return;
@ -243,7 +238,7 @@ describe("PeerManager", () => {
} as any;
await peerManager.renewPeer(fakePeerId, {
protocol: TEST_PROTOCOL,
routingInfo: TEST_ROUTING_INFO
pubsubTopic: TEST_PUBSUB_TOPIC
});
expect(true).to.be.true;
});
@ -268,7 +263,7 @@ describe("PeerManager", () => {
const peerId = result[0];
await peerManager.renewPeer(peerId, {
protocol: TEST_PROTOCOL,
routingInfo: TEST_ROUTING_INFO
pubsubTopic: TEST_PUBSUB_TOPIC
});
const connection = mockConnections.find((c) => c.remotePeer.equals(peerId));

View File

@ -12,7 +12,6 @@ import {
} from "@waku/core";
import {
CONNECTION_LOCKED_TAG,
type IRoutingInfo,
Libp2p,
Libp2pEventHandler,
Protocols
@ -35,7 +34,7 @@ type PeerManagerParams = {
type GetPeersParams = {
protocol: Protocols;
routingInfo: IRoutingInfo;
pubsubTopic: string;
};
export enum PeerManagerEventNames {
@ -108,9 +107,7 @@ export class PeerManager {
public async getPeers(params: GetPeersParams): Promise<PeerId[]> {
log.info(
`Getting peers for protocol: ${params.protocol}, ` +
`clusterId: ${params.routingInfo.clusterId},` +
` shard: ${params.routingInfo.shardId}`
`Getting peers for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}`
);
const connectedPeers = await this.connectionManager.getConnectedPeers();
@ -120,19 +117,13 @@ export class PeerManager {
for (const peer of connectedPeers) {
const hasProtocol = this.hasPeerProtocol(peer, params.protocol);
const isOnSameShard = await this.connectionManager.isPeerOnShard(
const hasSamePubsub = await this.connectionManager.isPeerOnTopic(
peer.id,
params.routingInfo.clusterId,
params.routingInfo.shardId
params.pubsubTopic
);
if (!isOnSameShard) {
continue;
}
const isPeerAvailableForUse = this.isPeerAvailableForUse(peer.id);
if (hasProtocol && isPeerAvailableForUse) {
if (hasProtocol && hasSamePubsub && isPeerAvailableForUse) {
results.push(peer);
log.info(`Peer ${peer.id} qualifies for protocol ${params.protocol}`);
}
@ -177,7 +168,7 @@ export class PeerManager {
public async renewPeer(id: PeerId, params: GetPeersParams): Promise<void> {
log.info(
`Renewing peer ${id} for protocol: ${params.protocol}, routingInfo: ${params.routingInfo}`
`Renewing peer ${id} for protocol: ${params.protocol}, pubsubTopic: ${params.pubsubTopic}`
);
const connectedPeers = await this.connectionManager.getConnectedPeers();
@ -274,7 +265,7 @@ export class PeerManager {
}
const wasUnlocked = new Date(value).getTime();
return Date.now() - wasUnlocked >= 10_000;
return Date.now() - wasUnlocked >= 10_000 ? true : false;
}
private dispatchFilterPeerConnect(id: PeerId): void {

View File

@ -1,12 +1,6 @@
import { StoreCore } from "@waku/core";
import {
IDecodedMessage,
IDecoder,
IRoutingInfo,
Libp2p
} from "@waku/interfaces";
import type { IDecodedMessage, IDecoder, Libp2p } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { expect } from "chai";
import sinon from "sinon";
@ -14,13 +8,6 @@ import { PeerManager } from "../peer_manager/index.js";
import { Store } from "./store.js";
const TestNetworkingInfo = { clusterId: 0, numShardsInCluster: 8 };
const MockRoutingInfo: IRoutingInfo = {
pubsubTopic: "/custom/topic",
shardId: 1,
clusterId: TestNetworkingInfo.clusterId
};
describe("Store", () => {
let store: Store;
let mockLibp2p: Libp2p;
@ -74,11 +61,9 @@ describe("Store", () => {
});
describe("queryGenerator", () => {
const contentTopic = "/test/1/test/proto";
const routingInfo = createRoutingInfo(TestNetworkingInfo, { contentTopic });
const mockDecoder: IDecoder<IDecodedMessage> = {
routingInfo,
contentTopic,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopic: "/test/1/test/proto",
fromWireToProtoObj: sinon.stub(),
fromProtoObj: sinon.stub()
};
@ -86,7 +71,7 @@ describe("Store", () => {
const mockMessage: IDecodedMessage = {
version: 1,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopic,
contentTopic: "/test/1/test/proto",
payload: new Uint8Array([1, 2, 3]),
timestamp: new Date(),
rateLimitProof: undefined,
@ -113,7 +98,7 @@ describe("Store", () => {
expect(
mockPeerManager.getPeers.calledWith({
protocol: Protocols.Store,
routingInfo
pubsubTopic: "/waku/2/default-waku/proto"
})
).to.be.true;
@ -265,11 +250,9 @@ describe("Store", () => {
mockPeerManager.getPeers.resolves([mockPeerId]);
mockStoreCore.queryPerPage.returns(mockResponseGenerator);
const routingInfo: IRoutingInfo = structuredClone(MockRoutingInfo);
routingInfo.pubsubTopic = "/custom/topic";
const generator = store.queryGenerator([mockDecoder], {
messageHashes: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
routingInfo
pubsubTopic: "/custom/topic"
});
const results = [];

View File

@ -5,7 +5,6 @@ import { messageHash, StoreCore } from "@waku/core";
import {
IDecodedMessage,
IDecoder,
type IRoutingInfo,
IStore,
Libp2p,
Protocols,
@ -66,7 +65,7 @@ export class Store implements IStore {
);
for (const queryOption of queryOptions) {
const peer = await this.getPeerToUse(queryOption.routingInfo);
const peer = await this.getPeerToUse(queryOption.pubsubTopic);
if (!peer) {
log.error("No peers available to query");
@ -182,7 +181,7 @@ export class Store implements IStore {
private validateDecodersAndPubsubTopic<T extends IDecodedMessage>(
decoders: IDecoder<T>[]
): {
routingInfo: IRoutingInfo;
pubsubTopic: string;
contentTopics: string[];
decodersAsMap: Map<string, IDecoder<T>>;
} {
@ -192,7 +191,7 @@ export class Store implements IStore {
}
const uniquePubsubTopicsInQuery = Array.from(
new Set(decoders.map((decoder) => decoder.routingInfo.pubsubTopic))
new Set(decoders.map((decoder) => decoder.pubsubTopic))
);
if (uniquePubsubTopicsInQuery.length > 1) {
log.error("API does not support querying multiple pubsub topics at once");
@ -215,9 +214,7 @@ export class Store implements IStore {
});
const contentTopics = decoders
.filter(
(decoder) => decoder.routingInfo.pubsubTopic === pubsubTopicForQuery
)
.filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery)
.map((dec) => dec.contentTopic);
if (contentTopics.length === 0) {
@ -226,18 +223,16 @@ export class Store implements IStore {
}
return {
routingInfo: decoders[0].routingInfo,
pubsubTopic: pubsubTopicForQuery,
contentTopics,
decodersAsMap
};
}
private async getPeerToUse(
routingInfo: IRoutingInfo
): Promise<PeerId | undefined> {
private async getPeerToUse(pubsubTopic: string): Promise<PeerId | undefined> {
const peers = await this.peerManager.getPeers({
protocol: Protocols.Store,
routingInfo
pubsubTopic
});
return this.options.peers
@ -302,16 +297,15 @@ export class Store implements IStore {
const isHashQuery =
options?.messageHashes && options.messageHashes.length > 0;
let routingInfo: IRoutingInfo;
let pubsubTopic: string;
let contentTopics: string[];
let decodersAsMap: Map<string, IDecoder<T>>;
if (isHashQuery) {
// For hash queries, we still need decoders to decode messages
// but we don't validate routing info consistency
// Use routing info from options if provided, otherwise from first decoder
// Otherwise, throw
routingInfo = options?.routingInfo || decoders[0]?.routingInfo;
// but we don't validate pubsubTopic consistency
// Use pubsubTopic from options if provided, otherwise from first decoder
pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || "";
contentTopics = [];
decodersAsMap = new Map();
decoders.forEach((dec) => {
@ -319,7 +313,7 @@ export class Store implements IStore {
});
} else {
const validated = this.validateDecodersAndPubsubTopic(decoders);
routingInfo = validated.routingInfo;
pubsubTopic = validated.pubsubTopic;
contentTopics = validated.contentTopics;
decodersAsMap = validated.decodersAsMap;
}
@ -346,7 +340,7 @@ export class Store implements IStore {
decodersAsMap,
queryOptions: [
{
routingInfo,
pubsubTopic,
contentTopics,
includeData: true,
paginationForward: true,
@ -361,7 +355,7 @@ export class Store implements IStore {
return {
decodersAsMap,
queryOptions: subTimeRanges.map(([start, end]) => ({
routingInfo,
pubsubTopic,
contentTopics,
includeData: true,
paginationForward: true,

View File

@ -15,6 +15,7 @@ import {
sendMessages,
TestDecoder,
TestNetworkConfig,
TestPubsubTopic,
TestRoutingInfo,
totalMsgs
} from "./utils.js";
@ -74,7 +75,7 @@ describe("Waku Store, message hash query", function () {
const messages: IDecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
messageHashes,
routingInfo: TestRoutingInfo
pubsubTopic: TestPubsubTopic
})) {
for await (const msg of page) {
messages.push(msg as IDecodedMessage);

View File

@ -10,7 +10,6 @@ import {
LightNode,
type NetworkConfig,
Protocols,
RelayShards,
ShardId
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
@ -34,10 +33,7 @@ export const TestRoutingInfo = createRoutingInfo(TestNetworkConfig, {
contentTopic: TestContentTopic
});
export const TestRelayShards: RelayShards = {
clusterId: TestClusterId,
shards: [TestRoutingInfo.shardId]
};
export const TestPubsubTopic = TestRoutingInfo.pubsubTopic;
export const TestEncoder = createEncoder({
contentTopic: TestContentTopic,