From 3038c4891748cd13cb5c3262c722c4923d2f7079 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Mon, 14 Apr 2025 10:46:47 +0200 Subject: [PATCH] feat: introduce `createDecoder` and `createEncoder` on `IWaku` (#2352) * feat: introduce createDecoder and createEncoder on IWaku * add tests, refactor * fix type --- packages/core/src/lib/message/version_0.ts | 2 +- packages/interfaces/src/constants.ts | 5 + packages/interfaces/src/waku.ts | 79 ++++++++++ packages/sdk/src/waku/utils.spec.ts | 145 ++++++++++++++++++ packages/sdk/src/waku/utils.ts | 56 +++++++ packages/sdk/src/waku/waku.ts | 68 ++++++-- .../tests/tests/filter/subscribe.node.spec.ts | 22 ++- packages/tests/tests/filter/utils.ts | 2 + .../tests/tests/light-push/index.node.spec.ts | 20 +-- packages/tests/tests/light-push/utils.ts | 7 +- 10 files changed, 376 insertions(+), 30 deletions(-) create mode 100644 packages/sdk/src/waku/utils.spec.ts create mode 100644 packages/sdk/src/waku/utils.ts diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index 8e4c8162c0..5f7eadc7a5 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -132,7 +132,7 @@ export function createEncoder({ ); } -export class Decoder implements IDecoder { +export class Decoder implements IDecoder { public constructor( public pubsubTopic: PubsubTopic, public contentTopic: string diff --git a/packages/interfaces/src/constants.ts b/packages/interfaces/src/constants.ts index f50485174d..b65f48d72c 100644 --- a/packages/interfaces/src/constants.ts +++ b/packages/interfaces/src/constants.ts @@ -5,6 +5,11 @@ import type { ShardInfo } from "./sharding"; */ export const DEFAULT_CLUSTER_ID = 1; +/** + * The default number of shards under a cluster. + */ +export const DEFAULT_NUM_SHARDS = 8; + /** * DefaultShardInfo is default configuration for The Waku Network. */ diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index ecf791a46a..7df3ed511a 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -6,10 +6,30 @@ import type { IFilter } from "./filter.js"; import type { IHealthIndicator } from "./health_indicator.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; +import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { IStore } from "./store.js"; +type AutoShardSingle = { + clusterId: number; + shardsUnderCluster: number; +}; + +type StaticShardSingle = { + clusterId: number; + shard: number; +}; + +export type CreateDecoderParams = { + contentTopic: string; + shardInfo?: AutoShardSingle | StaticShardSingle; +}; + +export type CreateEncoderParams = CreateDecoderParams & { + ephemeral?: boolean; +}; + export interface IWaku { libp2p: Libp2p; relay?: IRelay; @@ -111,6 +131,65 @@ export interface IWaku { */ waitForPeers(protocols?: Protocols[], timeoutMs?: number): Promise; + /** + * Creates a decoder for Waku messages on a specific content topic. + * + * A decoder is used to decode messages from the Waku network format. + * The decoder automatically handles shard configuration based on the Waku node's network settings. + * + * @param {CreateDecoderParams} params - Configuration for the decoder + * @returns {IDecoder} A decoder instance configured for the specified content topic + * @throws {Error} If the shard configuration is incompatible with the node's network settings + * + * @example + * ```typescript + * // Create a decoder with default network shard settings + * const decoder = waku.createDecoder({ + * contentTopic: "/my-app/1/chat/proto" + * }); + * + * // Create a decoder with custom shard settings + * const customDecoder = waku.createDecoder({ + * contentTopic: "/my-app/1/chat/proto", + * shardInfo: { + * clusterId: 1, + * shard: 5 + * } + * }); + * ``` + */ + createDecoder(params: CreateDecoderParams): IDecoder; + + /** + * Creates an encoder for Waku messages on a specific content topic. + * + * An encoder is used to encode messages into the Waku network format. + * The encoder automatically handles shard configuration based on the Waku node's network settings. + * + * @param {CreateEncoderParams} params - Configuration for the encoder including content topic and optionally shard information and ephemeral flag + * @returns {IEncoder} An encoder instance configured for the specified content topic + * @throws {Error} If the shard configuration is incompatible with the node's network settings + * + * @example + * ```typescript + * // Create a basic encoder with default network shard settings + * const encoder = waku.createEncoder({ + * contentTopic: "/my-app/1/chat/proto" + * }); + * + * // Create an ephemeral encoder (messages won't be stored by store nodes) + * const ephemeralEncoder = waku.createEncoder({ + * contentTopic: "/my-app/1/notifications/proto", + * ephemeral: true, + * shardInfo: { + * clusterId: 2, + * shardsUnderCluster: 16 + * } + * }); + * ``` + */ + createEncoder(params: CreateEncoderParams): IEncoder; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/sdk/src/waku/utils.spec.ts b/packages/sdk/src/waku/utils.spec.ts new file mode 100644 index 0000000000..d3d3605c22 --- /dev/null +++ b/packages/sdk/src/waku/utils.spec.ts @@ -0,0 +1,145 @@ +import { peerIdFromString } from "@libp2p/peer-id"; +import { DEFAULT_NUM_SHARDS, DefaultNetworkConfig } from "@waku/interfaces"; +import { contentTopicToShardIndex } from "@waku/utils"; +import { expect } from "chai"; + +import { + decoderParamsToShardInfo, + isShardCompatible, + mapToPeerIdOrMultiaddr +} from "./utils.js"; + +const TestContentTopic = "/test/1/waku-sdk/utf8"; + +describe("IWaku utils", () => { + describe("mapToPeerIdOrMultiaddr", () => { + it("should return PeerId when PeerId is provided", async () => { + const peerId = peerIdFromString( + "12D3KooWHFJGwBXD7ukXqKaQZYmV1U3xxN1XCNrgriSEyvkxf6nE" + ); + + const result = mapToPeerIdOrMultiaddr(peerId); + + expect(result).to.equal(peerId); + }); + + it("should return Multiaddr when Multiaddr input is provided", () => { + const multiAddr = + "/ip4/127.0.0.1/tcp/8000/p2p/12D3KooWHFJGwBXD7ukXqKaQZYmV1U3xxN1XCNrgriSEyvkxf6nE"; + + const result = mapToPeerIdOrMultiaddr(multiAddr); + + expect(result.toString()).to.equal(multiAddr); + }); + }); + + describe("decoderParamsToShardInfo", () => { + it("should use provided shard info when available", () => { + const params = { + contentTopic: TestContentTopic, + shardInfo: { + clusterId: 10, + shard: 5 + } + }; + + const result = decoderParamsToShardInfo(params, DefaultNetworkConfig); + + expect(result.clusterId).to.equal(10); + expect(result.shard).to.equal(5); + }); + + it("should use network config clusterId when shard info clusterId is not provided", () => { + const params = { + contentTopic: TestContentTopic, + shardInfo: { + clusterId: 1, + shard: 5 + } + }; + + const result = decoderParamsToShardInfo(params, DefaultNetworkConfig); + + expect(result.clusterId).to.equal(1); + expect(result.shard).to.equal(5); + }); + + it("should use shardsUnderCluster when provided", () => { + const contentTopic = TestContentTopic; + const params = { + contentTopic, + shardInfo: { + clusterId: 10, + shardsUnderCluster: 64 + } + }; + + const result = decoderParamsToShardInfo(params, DefaultNetworkConfig); + const expectedShardIndex = contentTopicToShardIndex(contentTopic, 64); + + expect(result.clusterId).to.equal(10); + expect(result.shard).to.equal(expectedShardIndex); + }); + + it("should calculate shard index from content topic when shard is not provided", () => { + const contentTopic = TestContentTopic; + const params = { + contentTopic + }; + + const result = decoderParamsToShardInfo(params, DefaultNetworkConfig); + const expectedShardIndex = contentTopicToShardIndex( + contentTopic, + DEFAULT_NUM_SHARDS + ); + + expect(result.clusterId).to.equal(1); + expect(result.shard).to.equal(expectedShardIndex); + }); + }); + + describe("isShardCompatible", () => { + it("should return false when clusterId doesn't match", () => { + const shardInfo = { + clusterId: 10, + shard: 5 + }; + + const result = isShardCompatible(shardInfo, DefaultNetworkConfig); + + expect(result).to.be.false; + }); + + it("should return false when shard is not included in network shards", () => { + const shardInfo = { + clusterId: 1, + shard: 5 + }; + + const networkConfig = { + clusterId: 1, + shards: [1, 2, 3, 4] + }; + + const result = isShardCompatible(shardInfo, networkConfig); + + expect(result).to.be.false; + }); + + it("should return true when clusterId matches and shard is included in network shards", () => { + const shardInfo = { + clusterId: 1, + shard: 3 + }; + + const networkConfig = { + clusterId: 1, + shards: [1, 2, 3, 4] + }; + + const result = isShardCompatible(shardInfo, networkConfig); + + expect(result).to.be.true; + }); + }); +}); diff --git a/packages/sdk/src/waku/utils.ts b/packages/sdk/src/waku/utils.ts new file mode 100644 index 0000000000..dc391ec12b --- /dev/null +++ b/packages/sdk/src/waku/utils.ts @@ -0,0 +1,56 @@ +import { isPeerId } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; +import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; +import type { + CreateDecoderParams, + NetworkConfig, + SingleShardInfo +} from "@waku/interfaces"; +import { DEFAULT_NUM_SHARDS } from "@waku/interfaces"; +import { contentTopicToShardIndex } from "@waku/utils"; + +export const mapToPeerIdOrMultiaddr = ( + peerId: PeerId | MultiaddrInput +): PeerId | Multiaddr => { + return isPeerId(peerId) ? peerId : multiaddr(peerId); +}; + +export const decoderParamsToShardInfo = ( + params: CreateDecoderParams, + networkConfig: NetworkConfig +): SingleShardInfo => { + const clusterId = (params.shardInfo?.clusterId || + networkConfig.clusterId) as number; + const shardsUnderCluster = + params.shardInfo && "shardsUnderCluster" in params.shardInfo + ? params.shardInfo.shardsUnderCluster + : DEFAULT_NUM_SHARDS; + + const shardIndex = + params.shardInfo && "shard" in params.shardInfo + ? params.shardInfo.shard + : contentTopicToShardIndex(params.contentTopic, shardsUnderCluster); + + return { + clusterId, + shard: shardIndex + }; +}; + +export const isShardCompatible = ( + shardInfo: SingleShardInfo, + networkConfig: NetworkConfig +): boolean => { + if (networkConfig.clusterId !== shardInfo.clusterId) { + return false; + } + + if ( + "shards" in networkConfig && + !networkConfig.shards.includes(shardInfo.shard!) + ) { + return false; + } + + return true; +}; diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 62013a1a0e..aff3618823 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -1,18 +1,28 @@ -import { isPeerId } from "@libp2p/interface"; import type { Peer, PeerId, Stream } from "@libp2p/interface"; -import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, StoreCodec } from "@waku/core"; +import { MultiaddrInput } from "@multiformats/multiaddr"; +import { + ConnectionManager, + createDecoder, + createEncoder, + StoreCodec +} from "@waku/core"; import type { + CreateDecoderParams, + CreateEncoderParams, CreateNodeOptions, + IDecodedMessage, + IDecoder, + IEncoder, IFilter, ILightPush, IRelay, IStore, IWaku, Libp2p, + NetworkConfig, PubsubTopic } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { DefaultNetworkConfig, Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { Filter } from "../filter/index.js"; @@ -21,6 +31,11 @@ import { LightPush } from "../light_push/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { Store } from "../store/index.js"; +import { + decoderParamsToShardInfo, + isShardCompatible, + mapToPeerIdOrMultiaddr +} from "./utils.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; const log = new Logger("waku"); @@ -40,6 +55,8 @@ export class WakuNode implements IWaku { public connectionManager: ConnectionManager; public health: HealthIndicator; + public readonly networkConfig: NetworkConfig; + // needed to create a lock for async operations private _nodeStateLock = false; private _nodeStarted = false; @@ -55,6 +72,7 @@ export class WakuNode implements IWaku { ) { this.relay = relay; this.libp2p = libp2p; + this.networkConfig = options.networkConfig || DefaultNetworkConfig; protocolsEnabled = { filter: false, @@ -188,7 +206,7 @@ export class WakuNode implements IWaku { } } - const peerId = this.mapToPeerIdOrMultiaddr(peer); + const peerId = mapToPeerIdOrMultiaddr(peer); log.info(`Dialing to ${peerId.toString()} with protocols ${_protocols}`); return await this.connectionManager.rawDialPeerWithProtocols(peer, codecs); } @@ -241,9 +259,41 @@ export class WakuNode implements IWaku { return this.connectionManager.isConnected(); } - private mapToPeerIdOrMultiaddr( - peerId: PeerId | MultiaddrInput - ): PeerId | Multiaddr { - return isPeerId(peerId) ? peerId : multiaddr(peerId); + public createDecoder(params: CreateDecoderParams): IDecoder { + const singleShardInfo = decoderParamsToShardInfo( + params, + this.networkConfig + ); + + log.info( + `Creating Decoder with input:${JSON.stringify(params.shardInfo)}, determined:${JSON.stringify(singleShardInfo)}, expected:${JSON.stringify(this.networkConfig)}.` + ); + + if (!isShardCompatible(singleShardInfo, this.networkConfig)) { + throw Error(`Cannot create decoder: incompatible shard configuration.`); + } + + return createDecoder(params.contentTopic, singleShardInfo); + } + + public createEncoder(params: CreateEncoderParams): IEncoder { + const singleShardInfo = decoderParamsToShardInfo( + params, + this.networkConfig + ); + + log.info( + `Creating Encoder with input:${JSON.stringify(params.shardInfo)}, determined:${JSON.stringify(singleShardInfo)}, expected:${JSON.stringify(this.networkConfig)}.` + ); + + if (!isShardCompatible(singleShardInfo, this.networkConfig)) { + throw Error(`Cannot create encoder: incompatible shard configuration.`); + } + + return createEncoder({ + contentTopic: params.contentTopic, + ephemeral: params.ephemeral, + pubsubTopicShardInfo: singleShardInfo + }); } } diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 7c77f33b67..44dab9c00a 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -1,5 +1,5 @@ -import { createDecoder, createEncoder } from "@waku/core"; -import { LightNode } from "@waku/interfaces"; +import { createDecoder, createEncoder, DecodedMessage } from "@waku/core"; +import { IDecoder, LightNode } from "@waku/interfaces"; import { ecies, generatePrivateKey, @@ -30,6 +30,7 @@ import { ClusterId, messagePayload, messageText, + ShardIndex, TestContentTopic, TestDecoder, TestEncoder, @@ -433,14 +434,23 @@ const runTests = (strictCheckNodes: boolean): void => { TEST_STRING.forEach((testItem) => { it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () { const newContentTopic = testItem.value; - const newEncoder = createEncoder({ + const newEncoder = waku.createEncoder({ contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic + shardInfo: { + clusterId: ClusterId, + shard: ShardIndex + } + }); + const newDecoder = waku.createDecoder({ + contentTopic: newContentTopic, + shardInfo: { + clusterId: ClusterId, + shard: ShardIndex + } }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); await waku.filter.subscribe( - [newDecoder], + [newDecoder as IDecoder], serviceNodes.messageCollector.callback ); await waku.lightPush.send(newEncoder, messagePayload); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 938ac16d99..86e8ef2ce0 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -11,6 +11,7 @@ import { import { createLightNode } from "@waku/sdk"; import { contentTopicToPubsubTopic, + contentTopicToShardIndex, derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; @@ -28,6 +29,7 @@ import { export const log = new Logger("test:filter"); export const TestContentTopic = "/test/1/waku-filter/default"; export const ClusterId = 2; +export const ShardIndex = contentTopicToShardIndex(TestContentTopic); export const TestShardInfo = { contentTopics: [TestContentTopic], clusterId: ClusterId diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 90cc283e5e..b57429c410 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -14,8 +14,10 @@ import { } from "../../src/index.js"; import { + ClusterId, messagePayload, messageText, + ShardIndex, TestContentTopic, TestEncoder, TestPubsubTopic, @@ -112,9 +114,12 @@ const runTests = (strictNodeCheck: boolean): void => { TEST_STRING.forEach((testItem) => { it(`Push message with content topic containing ${testItem.description}`, async function () { - const customEncoder = createEncoder({ + const customEncoder = waku.createEncoder({ contentTopic: testItem.value, - pubsubTopic: TestPubsubTopic + shardInfo: { + clusterId: ClusterId, + shard: ShardIndex + } }); const pushResponse = await waku.lightPush.send( customEncoder, @@ -135,17 +140,6 @@ const runTests = (strictNodeCheck: boolean): void => { }); }); - it("Fails to push message with empty content topic", async function () { - try { - createEncoder({ contentTopic: "" }); - expect.fail("Expected an error but didn't get one"); - } catch (error) { - expect((error as Error).message).to.equal( - "Content topic must be specified" - ); - } - }); - it("Push message with meta", async function () { const customTestEncoder = createEncoder({ contentTopic: TestContentTopic, diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index b77f75a9fa..8538d48ffa 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -2,7 +2,11 @@ import { createEncoder } from "@waku/core"; import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { createLightNode } from "@waku/sdk"; -import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + contentTopicToShardIndex, + Logger +} from "@waku/utils"; import { Context } from "mocha"; import { runNodes as runNodesBuilder, ServiceNode } from "../../src/index.js"; @@ -11,6 +15,7 @@ import { runNodes as runNodesBuilder, ServiceNode } from "../../src/index.js"; export const log = new Logger("test:lightpush"); export const TestContentTopic = "/test/1/waku-light-push/utf8"; export const ClusterId = 3; +export const ShardIndex = contentTopicToShardIndex(TestContentTopic); export const TestPubsubTopic = contentTopicToPubsubTopic( TestContentTopic, ClusterId