chore!: remove IBaseProtocol and improve interface on PeerExchange (#2422)

* remove IBaseProtocol

* fix references, interfaces and integration

* fix ci

* up mock

* up lock

* add mock for local storage

* add missing prop, fix tests

* up lock
This commit is contained in:
Sasha 2025-06-20 12:53:42 +02:00 committed by GitHub
parent a0fc9e05d4
commit 7c8d1073b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
36 changed files with 167 additions and 259 deletions

40
package-lock.json generated
View File

@ -11068,16 +11068,6 @@
"@types/node": "*"
}
},
"node_modules/@types/node-localstorage": {
"version": "1.3.3",
"resolved": "https://registry.npmjs.org/@types/node-localstorage/-/node-localstorage-1.3.3.tgz",
"integrity": "sha512-Wkn5g4eM5x10UNV9Xvl9K6y6m0zorocuJy4WjB5muUdyMZuPbZpSJG3hlhjGHe1HGxbOQO7RcB+jlHcNwkh+Jw==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/node": "*"
}
},
"node_modules/@types/normalize-package-data": {
"version": "2.4.4",
"resolved": "https://registry.npmjs.org/@types/normalize-package-data/-/normalize-package-data-2.4.4.tgz",
@ -28711,19 +28701,6 @@
"license": "MIT",
"peer": true
},
"node_modules/node-localstorage": {
"version": "3.0.5",
"resolved": "https://registry.npmjs.org/node-localstorage/-/node-localstorage-3.0.5.tgz",
"integrity": "sha512-GCwtK33iwVXboZWYcqQHu3aRvXEBwmPkAMRBLeaX86ufhqslyUkLGsi4aW3INEfdQYpUB5M9qtYf3eHvAk2VBg==",
"dev": true,
"license": "MIT",
"dependencies": {
"write-file-atomic": "^5.0.1"
},
"engines": {
"node": ">=0.12"
}
},
"node_modules/node-polyfill-webpack-plugin": {
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/node-polyfill-webpack-plugin/-/node-polyfill-webpack-plugin-4.1.0.tgz",
@ -42890,20 +42867,6 @@
"integrity": "sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==",
"license": "ISC"
},
"node_modules/write-file-atomic": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/write-file-atomic/-/write-file-atomic-5.0.1.tgz",
"integrity": "sha512-+QU2zd6OTD8XWIJCbffaiQeH9U73qIqafo1x6V1snCWYGJf6cVE0cDR4D8xRzcEnfI21IFrUPzPGtcPf8AC+Rw==",
"dev": true,
"license": "ISC",
"dependencies": {
"imurmurhash": "^0.1.4",
"signal-exit": "^4.0.1"
},
"engines": {
"node": "^14.17.0 || ^16.13.0 || >=18.0.0"
}
},
"node_modules/ws": {
"version": "8.18.2",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.18.2.tgz",
@ -43534,13 +43497,11 @@
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
"@types/chai": "^4.3.11",
"@types/node-localstorage": "^1.3.3",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"chai-as-promised": "^7.1.1",
"cspell": "^8.6.1",
"mocha": "^10.3.0",
"node-localstorage": "^3.0.5",
"npm-run-all": "^4.1.5",
"rollup": "^4.12.0",
"sinon": "^18.0.0"
@ -43622,6 +43583,7 @@
"@babel/preset-env": "^7.24.0",
"@babel/preset-typescript": "^7.23.3",
"babel-loader": "^9.1.3",
"filter-obj": "^2.0.2",
"node-polyfill-webpack-plugin": "^2.0.1",
"serve": "^14.1.2",
"webpack": "^5.99.5",

View File

@ -12,10 +12,6 @@
"./lib/message/version_0": {
"types": "./dist/lib/message/version_0.d.ts",
"import": "./dist/lib/message/version_0.js"
},
"./lib/base_protocol": {
"types": "./dist/lib/base_protocol.d.ts",
"import": "./dist/lib/base_protocol.js"
}
},
"typesVersions": {

View File

@ -1,44 +0,0 @@
import type { Libp2p } from "@libp2p/interface";
import type { PeerId, Stream } from "@libp2p/interface";
import type {
IBaseProtocolCore,
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { StreamManager } from "./stream_manager/index.js";
/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
*/
export class BaseProtocol implements IBaseProtocolCore {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;
protected constructor(
public multicodec: string,
protected components: Libp2pComponents,
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
components.events
);
this.streamManager = new StreamManager(
multicodec,
components.connectionManager.getConnections.bind(
components.connectionManager
),
this.addLibp2pEventListener
);
}
protected async getStream(peerId: PeerId): Promise<Stream> {
return this.streamManager.getStream(peerId);
}
}

View File

@ -3,7 +3,6 @@ import type { IncomingStreamData } from "@libp2p/interface-internal";
import {
type ContentTopic,
type CoreProtocolResult,
type IBaseProtocolCore,
type Libp2p,
ProtocolError,
type PubsubTopic
@ -15,7 +14,7 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { StreamManager } from "../stream_manager/index.js";
import {
FilterPushRpc,
@ -36,15 +35,21 @@ type IncomingMessageHandler = (
peerIdStr: string
) => Promise<void>;
export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
export class FilterCore {
private streamManager: StreamManager;
private static handleIncomingMessage?: IncomingMessageHandler;
public readonly multicodec = FilterCodecs.SUBSCRIBE;
public constructor(
handleIncomingMessage: IncomingMessageHandler,
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics);
this.streamManager = new StreamManager(
FilterCodecs.SUBSCRIBE,
libp2p.components
);
// TODO(weboko): remove when @waku/sdk 0.0.33 is released
const prevHandler = FilterCore.handleIncomingMessage;
@ -83,7 +88,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
peerId: PeerId,
contentTopics: ContentTopic[]
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peerId);
const stream = await this.streamManager.getStream(peerId);
const request = FilterSubscribeRpc.createSubscribeRequest(
pubsubTopic,
@ -139,7 +144,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peerId);
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peerId.toString()}`,
@ -182,7 +187,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
pubsubTopic: PubsubTopic,
peerId: PeerId
): Promise<CoreProtocolResult> {
const stream = await this.getStream(peerId);
const stream = await this.streamManager.getStream(peerId);
const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);
@ -229,7 +234,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
public async ping(peerId: PeerId): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peerId);
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error(
`Failed to get a stream for remote peer${peerId.toString()}`,

View File

@ -1,7 +1,6 @@
import type { PeerId, Stream } from "@libp2p/interface";
import {
type CoreProtocolResult,
type IBaseProtocolCore,
type IEncoder,
type IMessage,
type Libp2p,
@ -17,7 +16,7 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { StreamManager } from "../stream_manager/index.js";
import { PushRpc } from "./push_rpc.js";
import { isRLNResponseError } from "./utils.js";
@ -32,12 +31,16 @@ type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
export class LightPushCore {
private readonly streamManager: StreamManager;
public readonly multicodec = LightPushCodec;
public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(LightPushCodec, libp2p.components, pubsubTopics);
this.streamManager = new StreamManager(LightPushCodec, libp2p.components);
}
private async preparePushMessage(
@ -98,7 +101,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
let stream: Stream;
try {
stream = await this.getStream(peerId);
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
return {

View File

@ -16,21 +16,24 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { StreamManager } from "../stream_manager/index.js";
const log = new Logger("metadata");
export const MetadataCodec = "/vac/waku/metadata/1.0.0";
class Metadata extends BaseProtocol implements IMetadata {
private libp2pComponents: Libp2pComponents;
class Metadata implements IMetadata {
private readonly streamManager: StreamManager;
private readonly libp2pComponents: Libp2pComponents;
protected handshakesConfirmed: Map<PeerIdStr, ShardInfo> = new Map();
public readonly multicodec = MetadataCodec;
public constructor(
public pubsubTopics: PubsubTopic[],
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, pubsubTopics);
this.streamManager = new StreamManager(MetadataCodec, libp2p);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
@ -55,7 +58,7 @@ class Metadata extends BaseProtocol implements IMetadata {
let stream;
try {
stream = await this.getStream(peerId);
stream = await this.streamManager.getStream(peerId);
} catch (error) {
log.error("Failed to get stream", error);
return {

View File

@ -2,7 +2,6 @@ import type { PeerId } from "@libp2p/interface";
import {
IDecodedMessage,
IDecoder,
IStoreCore,
Libp2p,
PubsubTopic,
QueryRequestParams
@ -13,7 +12,7 @@ import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { StreamManager } from "../stream_manager/index.js";
import { toProtoMessage } from "../to_proto_message.js";
import {
@ -27,12 +26,16 @@ const log = new Logger("store");
export const StoreCodec = "/vac/waku/store-query/3.0.0";
export class StoreCore extends BaseProtocol implements IStoreCore {
export class StoreCore {
private readonly streamManager: StreamManager;
public readonly multicodec = StoreCodec;
public constructor(
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(StoreCodec, libp2p.components, pubsubTopics);
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
}
public async *queryPerPage<T extends IDecodedMessage>(
@ -70,7 +73,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
let stream;
try {
stream = await this.getStream(peerId);
stream = await this.streamManager.getStream(peerId);
} catch (e) {
log.error("Failed to get stream", e);
break;

View File

@ -1,4 +1,5 @@
import { Connection, Peer, PeerId, Stream } from "@libp2p/interface";
import { Libp2pComponents } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
@ -20,11 +21,10 @@ describe("StreamManager", () => {
beforeEach(() => {
eventTarget = new EventTarget();
streamManager = new StreamManager(
MULTICODEC,
() => [],
eventTarget.addEventListener.bind(eventTarget)
);
streamManager = new StreamManager(MULTICODEC, {
connectionManager: { getConnections: () => [] },
events: eventTarget
} as any as Libp2pComponents);
});
it("should return usable stream attached to connection", async () => {
@ -34,7 +34,9 @@ describe("StreamManager", () => {
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
streamManager["libp2p"]["connectionManager"]["getConnections"] = (
_peerId: PeerId | undefined
) => [con1];
const stream = await streamManager.getStream(mockPeer.id);
@ -44,7 +46,9 @@ describe("StreamManager", () => {
});
it("should throw if no connection provided", async () => {
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [];
streamManager["libp2p"]["connectionManager"]["getConnections"] = (
_peerId: PeerId | undefined
) => [];
let error: Error | undefined;
try {
@ -74,7 +78,9 @@ describe("StreamManager", () => {
);
con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
streamManager["libp2p"]["connectionManager"]["getConnections"] = (
_peerId: PeerId | undefined
) => [con1];
const stream = await streamManager.getStream(mockPeer.id);
@ -99,7 +105,9 @@ describe("StreamManager", () => {
);
con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];
streamManager["libp2p"]["connectionManager"]["getConnections"] = (
_peerId: PeerId | undefined
) => [con1];
const [stream1, stream2] = await Promise.all([
streamManager.getStream(mockPeer.id),
@ -143,7 +151,9 @@ describe("StreamManager", () => {
writeStatus: "writable"
})
];
streamManager["getConnections"] = (_id) => [con1];
streamManager["libp2p"]["connectionManager"]["getConnections"] = (
_id: PeerId | undefined
) => [con1];
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;

View File

@ -1,5 +1,5 @@
import type { Peer, PeerId, PeerUpdate, Stream } from "@libp2p/interface";
import type { Libp2p } from "@waku/interfaces";
import type { Libp2pComponents } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { selectOpenConnection } from "./utils.js";
@ -14,11 +14,13 @@ export class StreamManager {
public constructor(
private multicodec: string,
private getConnections: Libp2p["getConnections"],
private addEventListener: Libp2p["addEventListener"]
private readonly libp2p: Libp2pComponents
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
this.libp2p.events.addEventListener(
"peer:update",
this.handlePeerUpdateStreamPool
);
}
public async getStream(peerId: PeerId): Promise<Stream> {
@ -47,7 +49,7 @@ export class StreamManager {
}
private async createStream(peerId: PeerId, retries = 0): Promise<Stream> {
const connections = this.getConnections(peerId);
const connections = this.libp2p.connectionManager.getConnections(peerId);
const connection = selectOpenConnection(connections);
if (!connection) {
@ -135,7 +137,7 @@ export class StreamManager {
}
private getOpenStreamForCodec(peerId: PeerId): Stream | undefined {
const connections = this.getConnections(peerId);
const connections = this.libp2p.connectionManager.getConnections(peerId);
const connection = selectOpenConnection(connections);
if (!connection) {

View File

@ -69,13 +69,11 @@
"@rollup/plugin-json": "^6.0.0",
"@rollup/plugin-node-resolve": "^15.2.3",
"@types/chai": "^4.3.11",
"@types/node-localstorage": "^1.3.3",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"chai-as-promised": "^7.1.1",
"cspell": "^8.6.1",
"mocha": "^10.3.0",
"node-localstorage": "^3.0.5",
"npm-run-all": "^4.1.5",
"rollup": "^4.12.0",
"sinon": "^18.0.0"

View File

@ -17,11 +17,23 @@ import { LocalPeerCacheDiscovery } from "./index.js";
chai.use(chaiAsPromised);
// dynamically importing the local storage polyfill for node
if (typeof window === "undefined") {
try {
const { LocalStorage } = await import("node-localstorage");
global.localStorage = new LocalStorage("./scratch");
global.localStorage = {
store: {} as Record<string, string>,
getItem(key: string) {
return this.store[key] || null;
},
setItem(key: string, value: string) {
this.store[key] = value;
},
removeItem(key: string) {
delete this.store[key];
},
clear() {
this.store = {};
}
} as any;
} catch (error) {
console.error("Failed to load localStorage polyfill:", error);
}

View File

@ -1,12 +1,11 @@
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { StreamManager } from "@waku/core";
import { EnrDecoder } from "@waku/enr";
import {
IPeerExchange,
Libp2pComponents,
PeerExchangeQueryParams,
PeerExchangeQueryResult,
ProtocolError,
PubsubTopic
ProtocolError
} from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import { Logger } from "@waku/utils";
@ -24,15 +23,14 @@ const log = new Logger("peer-exchange");
/**
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/
export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
export class WakuPeerExchange implements IPeerExchange {
private readonly streamManager: StreamManager;
/**
* @param components - libp2p components
*/
public constructor(
components: Libp2pComponents,
pubsubTopics: PubsubTopic[]
) {
super(PeerExchangeCodec, components, pubsubTopics);
public constructor(private readonly components: Libp2pComponents) {
this.streamManager = new StreamManager(PeerExchangeCodec, components);
}
/**
@ -57,7 +55,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
let stream;
try {
stream = await this.getStream(peerId);
stream = await this.streamManager.getStream(peerId);
} catch (err) {
log.error("Failed to get stream", err);
return {
@ -118,9 +116,8 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
*
* @returns A function that creates a new peer exchange protocol
*/
export function wakuPeerExchange(
pubsubTopics: PubsubTopic[]
): (components: Libp2pComponents) => WakuPeerExchange {
return (components: Libp2pComponents) =>
new WakuPeerExchange(components, pubsubTopics);
export function wakuPeerExchange(): (
components: Libp2pComponents
) => WakuPeerExchange {
return (components: Libp2pComponents) => new WakuPeerExchange(components);
}

View File

@ -10,7 +10,6 @@ import type {
import {
type Libp2pComponents,
type PeerExchangeQueryResult,
PubsubTopic,
ShardInfo,
Tags
} from "@waku/interfaces";
@ -87,14 +86,10 @@ export class PeerExchangeDiscovery
);
};
public constructor(
components: Libp2pComponents,
pubsubTopics: PubsubTopic[],
options: Options = {}
) {
public constructor(components: Libp2pComponents, options: Options = {}) {
super();
this.components = components;
this.peerExchange = new WakuPeerExchange(components, pubsubTopics);
this.peerExchange = new WakuPeerExchange(components);
this.options = options;
this.isStarted = false;
}
@ -314,9 +309,9 @@ export class PeerExchangeDiscovery
}
}
export function wakuPeerExchangeDiscovery(
pubsubTopics: PubsubTopic[]
): (components: Libp2pComponents) => PeerExchangeDiscovery {
export function wakuPeerExchangeDiscovery(): (
components: Libp2pComponents
) => PeerExchangeDiscovery {
return (components: Libp2pComponents) =>
new PeerExchangeDiscovery(components, pubsubTopics);
new PeerExchangeDiscovery(components);
}

View File

@ -9,6 +9,7 @@
"@babel/preset-env": "^7.24.0",
"@babel/preset-typescript": "^7.23.3",
"babel-loader": "^9.1.3",
"filter-obj": "^2.0.2",
"node-polyfill-webpack-plugin": "^2.0.1",
"serve": "^14.1.2",
"webpack": "^5.99.5",

View File

@ -4,7 +4,6 @@ import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, ThisOrThat } from "./misc.js";
import type {
Callback,
IBaseProtocolCore,
ProtocolError,
SDKProtocolResult
} from "./protocols.js";
@ -52,7 +51,8 @@ export interface ISubscription {
unsubscribeAll(): Promise<SDKProtocolResult>;
}
export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & {
export type IFilter = IReceiver & {
readonly multicodec: string;
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>

View File

@ -2,6 +2,8 @@ import type { IDecodedMessage, IDecoder } from "./message.js";
import type { Callback } from "./protocols.js";
export type INextFilter = {
readonly multicodec: string;
/**
* Subscribes to messages with specified decoders and executes callback when a message is received.
* In case no peers available initially - will delay subscription till connects to any peer.

View File

@ -1,4 +1,3 @@
import { IBaseProtocolCore } from "./protocols.js";
import type { ISender, ISendOptions } from "./sender.js";
export type LightPushProtocolOptions = ISendOptions & {
@ -17,7 +16,7 @@ export type LightPushProtocolOptions = ISendOptions & {
};
export type ILightPush = ISender & {
readonly multicodec: string;
start: () => void;
stop: () => void;
protocol: IBaseProtocolCore;
};

View File

@ -1,14 +1,13 @@
import type { PeerId } from "@libp2p/interface";
import { PubsubTopic, ThisOrThat } from "./misc.js";
import type { IBaseProtocolCore } from "./protocols.js";
import type { ShardInfo } from "./sharding.js";
export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>;
// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol
export interface IMetadata extends Omit<IBaseProtocolCore, "shardInfo"> {
pubsubTopics: PubsubTopic[];
export interface IMetadata {
readonly multicodec: string;
readonly pubsubTopics: PubsubTopic[];
confirmOrAttemptHandshake(peerId: PeerId): Promise<MetadataQueryResult>;
query(peerId: PeerId): Promise<MetadataQueryResult>;
}

View File

@ -4,9 +4,8 @@ import type { ConnectionManager } from "@libp2p/interface-internal";
import { IEnr } from "./enr.js";
import { ThisOrThat } from "./misc.js";
import { IBaseProtocolCore } from "./protocols.js";
export interface IPeerExchange extends IBaseProtocolCore {
export interface IPeerExchange {
query(params: PeerExchangeQueryParams): Promise<PeerExchangeQueryResult>;
}

View File

@ -1,4 +1,3 @@
import type { Libp2p } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { ConnectionManagerOptions } from "./connection_manager.js";
@ -17,12 +16,6 @@ export enum Protocols {
Filter = "filter"
}
export type IBaseProtocolCore = {
multicodec: string;
addLibp2pEventListener: Libp2p["addEventListener"];
removeLibp2pEventListener: Libp2p["removeEventListener"];
};
export type NetworkConfig = StaticSharding | AutoSharding;
export type CreateNodeOptions = {

View File

@ -1,5 +1,4 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type { IBaseProtocolCore } from "./protocols.js";
export type StoreCursor = Uint8Array;
@ -76,10 +75,9 @@ export type QueryRequestParams = {
paginationLimit?: number;
};
export type IStoreCore = IBaseProtocolCore;
export type IStore = {
protocol: IBaseProtocolCore;
readonly multicodec: string;
createCursor(message: IDecodedMessage): StoreCursor;
queryGenerator: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],

View File

@ -1,18 +1,15 @@
import { PubsubTopic } from "@waku/interfaces";
import { expect } from "chai";
import { getPeerDiscoveries } from "./discovery.js";
describe("Default Peer Discoveries", () => {
const pubsubTopics: PubsubTopic[] = [];
it("should have no discoveries enabled by default", () => {
const discoveries = getPeerDiscoveries(pubsubTopics);
const discoveries = getPeerDiscoveries();
expect(discoveries.length).to.equal(0);
});
it("should enable all discoveries when explicitly set", () => {
const discoveries = getPeerDiscoveries(pubsubTopics, {
const discoveries = getPeerDiscoveries({
dns: true,
peerExchange: true,
localPeerCache: true
@ -21,7 +18,7 @@ describe("Default Peer Discoveries", () => {
});
it("should enable only peerExchange and localPeerCache when dns is disabled", () => {
const discoveries = getPeerDiscoveries(pubsubTopics, {
const discoveries = getPeerDiscoveries({
dns: false,
peerExchange: true,
localPeerCache: true
@ -30,7 +27,7 @@ describe("Default Peer Discoveries", () => {
});
it("should enable only dns and localPeerCache when peerExchange is disabled", () => {
const discoveries = getPeerDiscoveries(pubsubTopics, {
const discoveries = getPeerDiscoveries({
dns: true,
peerExchange: false,
localPeerCache: true
@ -39,7 +36,7 @@ describe("Default Peer Discoveries", () => {
});
it("should enable only dns and peerExchange when localPeerCache is disabled", () => {
const discoveries = getPeerDiscoveries(pubsubTopics, {
const discoveries = getPeerDiscoveries({
dns: true,
peerExchange: true,
localPeerCache: false
@ -48,7 +45,7 @@ describe("Default Peer Discoveries", () => {
});
it("should enable only localPeerCache when dns and peerExchange are disabled", () => {
const discoveries = getPeerDiscoveries(pubsubTopics, {
const discoveries = getPeerDiscoveries({
dns: false,
peerExchange: false,
localPeerCache: true

View File

@ -5,14 +5,9 @@ import {
wakuLocalPeerCacheDiscovery,
wakuPeerExchangeDiscovery
} from "@waku/discovery";
import {
CreateNodeOptions,
type Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { CreateNodeOptions, type Libp2pComponents } from "@waku/interfaces";
export function getPeerDiscoveries(
pubsubTopics: PubsubTopic[],
enabled?: CreateNodeOptions["discovery"]
): ((components: Libp2pComponents) => PeerDiscovery)[] {
const dnsEnrTrees = [enrTree["SANDBOX"]];
@ -28,7 +23,7 @@ export function getPeerDiscoveries(
}
if (enabled?.peerExchange) {
discoveries.push(wakuPeerExchangeDiscovery(pubsubTopics));
discoveries.push(wakuPeerExchangeDiscovery());
}
return discoveries;

View File

@ -97,13 +97,13 @@ export async function createLibp2pAndUpdateOptions(
if (options?.defaultBootstrap) {
peerDiscovery.push(
...getPeerDiscoveries(pubsubTopics, {
...getPeerDiscoveries({
...DEFAULT_DISCOVERIES_ENABLED,
...options.discovery
})
);
} else {
peerDiscovery.push(...getPeerDiscoveries(pubsubTopics, options.discovery));
peerDiscovery.push(...getPeerDiscoveries(options.discovery));
}
if (options?.bootstrapPeers) {

View File

@ -74,6 +74,10 @@ export class Filter implements IFilter {
this.activeSubscriptions = new Map();
}
public get multicodec(): string {
return this.protocol.multicodec;
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.

View File

@ -47,6 +47,10 @@ export class Filter implements IFilter {
);
}
public get multicodec(): string {
return this.protocol.multicodec;
}
/**
* Unsubscribes from all active subscriptions across all pubsub topics.
*

View File

@ -62,7 +62,7 @@ describe("LightPush SDK", () => {
(_encoder: any, _message: any, peerId: PeerId) =>
Promise.resolve({ success: peerId }) as any
);
lightPush.protocol.send = sendSpy;
lightPush["protocol"].send = sendSpy;
let result = await lightPush.send(encoder, {
payload: utf8ToBytes("test")
@ -77,7 +77,7 @@ describe("LightPush SDK", () => {
(_encoder: any, _message: any, peerId: PeerId) =>
Promise.resolve({ success: peerId }) as any
);
lightPush.protocol.send = sendSpy;
lightPush["protocol"].send = sendSpy;
result = await lightPush.send(encoder, { payload: utf8ToBytes("test") });
@ -94,7 +94,7 @@ describe("LightPush SDK", () => {
const sendSpy = sinon.spy((_encoder: any, _message: any, _peerId: PeerId) =>
Promise.resolve({ failure: { error: "problem" } })
);
lightPush.protocol.send = sendSpy as any;
lightPush["protocol"].send = sendSpy as any;
const retryPushSpy = (lightPush as any)["retryManager"].push as SinonSpy;
const result = await lightPush.send(
@ -122,7 +122,7 @@ describe("LightPush SDK", () => {
return Promise.resolve({ failure: { error: "problem" } });
}
);
lightPush.protocol.send = sendSpy as any;
lightPush["protocol"].send = sendSpy as any;
const retryPushSpy = (lightPush as any)["retryManager"].push as SinonSpy;
const result = await lightPush.send(

View File

@ -38,9 +38,8 @@ type LightPushConstructorParams = {
export class LightPush implements ILightPush {
private readonly config: LightPushProtocolOptions;
private readonly retryManager: RetryManager;
private peerManager: PeerManager;
public readonly protocol: LightPushCore;
private readonly peerManager: PeerManager;
private readonly protocol: LightPushCore;
public constructor(params: LightPushConstructorParams) {
this.config = {
@ -59,6 +58,10 @@ export class LightPush implements ILightPush {
});
}
public get multicodec(): string {
return this.protocol.multicodec;
}
public start(): void {
this.retryManager.start();
}

View File

@ -27,11 +27,10 @@ type StoreConstructorParams = {
* It provides methods to interact with the Waku Store protocol.
*/
export class Store implements IStore {
private options: Partial<StoreProtocolOptions>;
private peerManager: PeerManager;
private connectionManager: ConnectionManager;
public readonly protocol: StoreCore;
private readonly options: Partial<StoreProtocolOptions>;
private readonly peerManager: PeerManager;
private readonly connectionManager: ConnectionManager;
private readonly protocol: StoreCore;
public constructor(params: StoreConstructorParams) {
this.options = params.options || {};
@ -44,6 +43,10 @@ export class Store implements IStore {
);
}
public get multicodec(): string {
return this.protocol.multicodec;
}
/**
* Queries the Waku Store for historical messages using the provided decoders and options.
* Returns an asynchronous generator that yields promises of decoded messages.

View File

@ -190,7 +190,7 @@ export class WakuNode implements IWaku {
}
if (_protocols.includes(Protocols.Store)) {
if (this.store) {
codecs.push(this.store.protocol.multicodec);
codecs.push(this.store.multicodec);
} else {
log.error(
"Store codec not included in dial codec: protocol not mounted locally"
@ -199,7 +199,7 @@ export class WakuNode implements IWaku {
}
if (_protocols.includes(Protocols.LightPush)) {
if (this.lightPush) {
codecs.push(this.lightPush.protocol.multicodec);
codecs.push(this.lightPush.multicodec);
} else {
log.error(
"Light Push codec not included in dial codec: protocol not mounted locally"
@ -207,8 +207,8 @@ export class WakuNode implements IWaku {
}
}
if (_protocols.includes(Protocols.Filter)) {
if (this.filter) {
codecs.push(this.filter.protocol.multicodec);
if (this.nextFilter) {
codecs.push(this.nextFilter.multicodec);
} else {
log.error(
"Filter codec not included in dial codec: protocol not mounted locally"

View File

@ -5,7 +5,6 @@ import { createLightNode } from "@waku/sdk";
import {
beforeEachCustom,
DefaultTestPubsubTopic,
DefaultTestShardInfo,
makeLogFileName,
ServiceNode,
@ -46,9 +45,7 @@ describe("Peer Exchange", function () {
const nwaku2Ma = await nwaku2.getMultiaddrWithId();
const peerExchange = new PeerExchangeDiscovery(waku.libp2p.components, [
DefaultTestPubsubTopic
]);
const peerExchange = new PeerExchangeDiscovery(waku.libp2p.components);
peerExchange.addEventListener("waku:peer-exchange:started", (event) => {
if (event.detail === true) {

View File

@ -5,7 +5,7 @@ import { multiaddr } from "@multiformats/multiaddr";
import { PeerExchangeDiscovery } from "@waku/discovery";
import { IEnr, LightNode } from "@waku/interfaces";
import { createLightNode, ShardInfo } from "@waku/sdk";
import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils";
import { decodeRelayShard } from "@waku/utils";
import { expect } from "chai";
import Sinon from "sinon";
@ -24,10 +24,7 @@ describe("Peer Exchange Continuous Discovery", () => {
beforeEach(async () => {
waku = await createLightNode();
peerExchangeDiscovery = new PeerExchangeDiscovery(
waku.libp2p.components,
shardInfoToPubsubTopics(shardInfo)
);
peerExchangeDiscovery = new PeerExchangeDiscovery(waku.libp2p.components);
queryStub = Sinon.stub(
(peerExchangeDiscovery as any).peerExchange,
"query" as any

View File

@ -10,7 +10,6 @@ import Sinon, { SinonSpy } from "sinon";
import {
afterEachCustom,
beforeEachCustom,
DefaultTestPubsubTopic,
DefaultTestShardInfo,
makeLogFileName,
ServiceNode,
@ -59,7 +58,7 @@ describe("Peer Exchange", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery([DefaultTestPubsubTopic])
wakuPeerExchangeDiscovery()
]
}
});
@ -106,7 +105,7 @@ describe("Peer Exchange", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery([DefaultTestPubsubTopic])
wakuPeerExchangeDiscovery()
]
}
});
@ -165,7 +164,7 @@ describe("Peer Exchange", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }),
wakuPeerExchangeDiscovery(["wrong"])
wakuPeerExchangeDiscovery()
]
}
});

View File

@ -6,10 +6,7 @@ import {
} from "@waku/discovery";
import type { LightNode } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import {
singleShardInfosToShardInfo,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import { singleShardInfosToShardInfo } from "@waku/utils";
import { expect } from "chai";
import { afterEachCustom, tearDownNodes } from "../../src/index.js";
@ -41,12 +38,11 @@ describe("Peer Exchange", () => {
const singleShardInfo = { clusterId: 1, shard: 1 };
const shardInfo = singleShardInfosToShardInfo([singleShardInfo]);
const pubsubTopic = singleShardInfoToPubsubTopic(singleShardInfo);
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: dnsPeerMultiaddrs }),
wakuPeerExchangeDiscovery([pubsubTopic])
wakuPeerExchangeDiscovery()
]
},
networkConfig: shardInfo

View File

@ -9,7 +9,7 @@ import {
} from "@waku/discovery";
import type { LightNode, PeerExchangeQueryResult } from "@waku/interfaces";
import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk";
import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils";
import { Logger } from "@waku/utils";
import { expect } from "chai";
import {
@ -25,12 +25,6 @@ import {
export const log = new Logger("test:pe");
const ShardInfo = { clusterId: 0, shards: [2] };
const pubsubTopic = [
singleShardInfoToPubsubTopic({
clusterId: ShardInfo.clusterId,
shard: ShardInfo.shards[0]
})
];
describe("Peer Exchange Query", function () {
this.timeout(30_000);
@ -82,7 +76,7 @@ describe("Peer Exchange Query", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3MA.toString()] }),
wakuPeerExchangeDiscovery(pubsubTopic)
wakuPeerExchangeDiscovery()
]
}
});
@ -91,7 +85,7 @@ describe("Peer Exchange Query", function () {
await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku3PeerId);
components = waku.libp2p.components as unknown as Libp2pComponents;
peerExchange = new WakuPeerExchange(components, pubsubTopic);
peerExchange = new WakuPeerExchange(components);
numPeersToRequest = 2;
const startTime = Date.now();

View File

@ -8,11 +8,7 @@ import {
ShardInfo,
Tags
} from "@waku/sdk";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
singleShardInfoToPubsubTopic
} from "@waku/utils";
import { contentTopicToShardIndex } from "@waku/utils";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import Sinon, { SinonSpy } from "sinon";
@ -52,9 +48,6 @@ describe("Static Sharding: Peer Management", function () {
it("all px service nodes subscribed to the shard topic should be dialed", async function () {
this.timeout(100_000);
const pubsubTopics = [
singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 2 })
];
const shardInfo: ShardInfo = { clusterId: clusterId, shards: [2] };
await nwaku1.start({
@ -93,7 +86,7 @@ describe("Static Sharding: Peer Management", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery(pubsubTopics)
wakuPeerExchangeDiscovery()
]
}
});
@ -127,9 +120,6 @@ describe("Static Sharding: Peer Management", function () {
it("px service nodes not subscribed to the shard should not be dialed", async function () {
this.timeout(100_000);
const pubsubTopicsToDial = [
singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 2 })
];
const shardInfoToDial: ShardInfo = { clusterId: clusterId, shards: [2] };
// this service node is not subscribed to the shard
@ -169,7 +159,7 @@ describe("Static Sharding: Peer Management", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery(pubsubTopicsToDial)
wakuPeerExchangeDiscovery()
]
}
});
@ -229,7 +219,6 @@ describe("Autosharding: Peer Management", function () {
it("all px service nodes subscribed to the shard topic should be dialed", async function () {
this.timeout(100_000);
const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)];
const contentTopicInfo: ContentTopicInfo = {
clusterId: clusterId,
contentTopics: [ContentTopic]
@ -274,7 +263,7 @@ describe("Autosharding: Peer Management", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery(pubsubTopics)
wakuPeerExchangeDiscovery()
]
}
});
@ -308,9 +297,6 @@ describe("Autosharding: Peer Management", function () {
it("px service nodes not subscribed to the shard should not be dialed", async function () {
this.timeout(100_000);
const pubsubTopicsToDial = [
contentTopicToPubsubTopic(ContentTopic, clusterId)
];
const contentTopicInfoToDial: ContentTopicInfo = {
clusterId: clusterId,
contentTopics: [ContentTopic]
@ -355,7 +341,7 @@ describe("Autosharding: Peer Management", function () {
libp2p: {
peerDiscovery: [
bootstrap({ list: [nwaku3Ma.toString()] }),
wakuPeerExchangeDiscovery(pubsubTopicsToDial)
wakuPeerExchangeDiscovery()
]
}
});