feat: introduce createDecoder and createEncoder on IWaku (#2352)

* feat: introduce createDecoder and createEncoder on IWaku

* add tests, refactor

* fix type
This commit is contained in:
Sasha 2025-04-14 10:46:47 +02:00 committed by GitHub
parent 163bea56c3
commit 3038c48917
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 376 additions and 30 deletions

View File

@ -132,7 +132,7 @@ export function createEncoder({
);
}
export class Decoder implements IDecoder<DecodedMessage> {
export class Decoder implements IDecoder<IDecodedMessage> {
public constructor(
public pubsubTopic: PubsubTopic,
public contentTopic: string

View File

@ -5,6 +5,11 @@ import type { ShardInfo } from "./sharding";
*/
export const DEFAULT_CLUSTER_ID = 1;
/**
* The default number of shards under a cluster.
*/
export const DEFAULT_NUM_SHARDS = 8;
/**
* DefaultShardInfo is default configuration for The Waku Network.
*/

View File

@ -6,10 +6,30 @@ import type { IFilter } from "./filter.js";
import type { IHealthIndicator } from "./health_indicator.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStore } from "./store.js";
type AutoShardSingle = {
clusterId: number;
shardsUnderCluster: number;
};
type StaticShardSingle = {
clusterId: number;
shard: number;
};
export type CreateDecoderParams = {
contentTopic: string;
shardInfo?: AutoShardSingle | StaticShardSingle;
};
export type CreateEncoderParams = CreateDecoderParams & {
ephemeral?: boolean;
};
export interface IWaku {
libp2p: Libp2p;
relay?: IRelay;
@ -111,6 +131,65 @@ export interface IWaku {
*/
waitForPeers(protocols?: Protocols[], timeoutMs?: number): Promise<void>;
/**
* Creates a decoder for Waku messages on a specific content topic.
*
* A decoder is used to decode messages from the Waku network format.
* The decoder automatically handles shard configuration based on the Waku node's network settings.
*
* @param {CreateDecoderParams} params - Configuration for the decoder
* @returns {IDecoder<IDecodedMessage>} A decoder instance configured for the specified content topic
* @throws {Error} If the shard configuration is incompatible with the node's network settings
*
* @example
* ```typescript
* // Create a decoder with default network shard settings
* const decoder = waku.createDecoder({
* contentTopic: "/my-app/1/chat/proto"
* });
*
* // Create a decoder with custom shard settings
* const customDecoder = waku.createDecoder({
* contentTopic: "/my-app/1/chat/proto",
* shardInfo: {
* clusterId: 1,
* shard: 5
* }
* });
* ```
*/
createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage>;
/**
* Creates an encoder for Waku messages on a specific content topic.
*
* An encoder is used to encode messages into the Waku network format.
* The encoder automatically handles shard configuration based on the Waku node's network settings.
*
* @param {CreateEncoderParams} params - Configuration for the encoder including content topic and optionally shard information and ephemeral flag
* @returns {IEncoder} An encoder instance configured for the specified content topic
* @throws {Error} If the shard configuration is incompatible with the node's network settings
*
* @example
* ```typescript
* // Create a basic encoder with default network shard settings
* const encoder = waku.createEncoder({
* contentTopic: "/my-app/1/chat/proto"
* });
*
* // Create an ephemeral encoder (messages won't be stored by store nodes)
* const ephemeralEncoder = waku.createEncoder({
* contentTopic: "/my-app/1/notifications/proto",
* ephemeral: true,
* shardInfo: {
* clusterId: 2,
* shardsUnderCluster: 16
* }
* });
* ```
*/
createEncoder(params: CreateEncoderParams): IEncoder;
/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/

View File

@ -0,0 +1,145 @@
import { peerIdFromString } from "@libp2p/peer-id";
import { DEFAULT_NUM_SHARDS, DefaultNetworkConfig } from "@waku/interfaces";
import { contentTopicToShardIndex } from "@waku/utils";
import { expect } from "chai";
import {
decoderParamsToShardInfo,
isShardCompatible,
mapToPeerIdOrMultiaddr
} from "./utils.js";
const TestContentTopic = "/test/1/waku-sdk/utf8";
describe("IWaku utils", () => {
describe("mapToPeerIdOrMultiaddr", () => {
it("should return PeerId when PeerId is provided", async () => {
const peerId = peerIdFromString(
"12D3KooWHFJGwBXD7ukXqKaQZYmV1U3xxN1XCNrgriSEyvkxf6nE"
);
const result = mapToPeerIdOrMultiaddr(peerId);
expect(result).to.equal(peerId);
});
it("should return Multiaddr when Multiaddr input is provided", () => {
const multiAddr =
"/ip4/127.0.0.1/tcp/8000/p2p/12D3KooWHFJGwBXD7ukXqKaQZYmV1U3xxN1XCNrgriSEyvkxf6nE";
const result = mapToPeerIdOrMultiaddr(multiAddr);
expect(result.toString()).to.equal(multiAddr);
});
});
describe("decoderParamsToShardInfo", () => {
it("should use provided shard info when available", () => {
const params = {
contentTopic: TestContentTopic,
shardInfo: {
clusterId: 10,
shard: 5
}
};
const result = decoderParamsToShardInfo(params, DefaultNetworkConfig);
expect(result.clusterId).to.equal(10);
expect(result.shard).to.equal(5);
});
it("should use network config clusterId when shard info clusterId is not provided", () => {
const params = {
contentTopic: TestContentTopic,
shardInfo: {
clusterId: 1,
shard: 5
}
};
const result = decoderParamsToShardInfo(params, DefaultNetworkConfig);
expect(result.clusterId).to.equal(1);
expect(result.shard).to.equal(5);
});
it("should use shardsUnderCluster when provided", () => {
const contentTopic = TestContentTopic;
const params = {
contentTopic,
shardInfo: {
clusterId: 10,
shardsUnderCluster: 64
}
};
const result = decoderParamsToShardInfo(params, DefaultNetworkConfig);
const expectedShardIndex = contentTopicToShardIndex(contentTopic, 64);
expect(result.clusterId).to.equal(10);
expect(result.shard).to.equal(expectedShardIndex);
});
it("should calculate shard index from content topic when shard is not provided", () => {
const contentTopic = TestContentTopic;
const params = {
contentTopic
};
const result = decoderParamsToShardInfo(params, DefaultNetworkConfig);
const expectedShardIndex = contentTopicToShardIndex(
contentTopic,
DEFAULT_NUM_SHARDS
);
expect(result.clusterId).to.equal(1);
expect(result.shard).to.equal(expectedShardIndex);
});
});
describe("isShardCompatible", () => {
it("should return false when clusterId doesn't match", () => {
const shardInfo = {
clusterId: 10,
shard: 5
};
const result = isShardCompatible(shardInfo, DefaultNetworkConfig);
expect(result).to.be.false;
});
it("should return false when shard is not included in network shards", () => {
const shardInfo = {
clusterId: 1,
shard: 5
};
const networkConfig = {
clusterId: 1,
shards: [1, 2, 3, 4]
};
const result = isShardCompatible(shardInfo, networkConfig);
expect(result).to.be.false;
});
it("should return true when clusterId matches and shard is included in network shards", () => {
const shardInfo = {
clusterId: 1,
shard: 3
};
const networkConfig = {
clusterId: 1,
shards: [1, 2, 3, 4]
};
const result = isShardCompatible(shardInfo, networkConfig);
expect(result).to.be.true;
});
});
});

View File

@ -0,0 +1,56 @@
import { isPeerId } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import type {
CreateDecoderParams,
NetworkConfig,
SingleShardInfo
} from "@waku/interfaces";
import { DEFAULT_NUM_SHARDS } from "@waku/interfaces";
import { contentTopicToShardIndex } from "@waku/utils";
export const mapToPeerIdOrMultiaddr = (
peerId: PeerId | MultiaddrInput
): PeerId | Multiaddr => {
return isPeerId(peerId) ? peerId : multiaddr(peerId);
};
export const decoderParamsToShardInfo = (
params: CreateDecoderParams,
networkConfig: NetworkConfig
): SingleShardInfo => {
const clusterId = (params.shardInfo?.clusterId ||
networkConfig.clusterId) as number;
const shardsUnderCluster =
params.shardInfo && "shardsUnderCluster" in params.shardInfo
? params.shardInfo.shardsUnderCluster
: DEFAULT_NUM_SHARDS;
const shardIndex =
params.shardInfo && "shard" in params.shardInfo
? params.shardInfo.shard
: contentTopicToShardIndex(params.contentTopic, shardsUnderCluster);
return {
clusterId,
shard: shardIndex
};
};
export const isShardCompatible = (
shardInfo: SingleShardInfo,
networkConfig: NetworkConfig
): boolean => {
if (networkConfig.clusterId !== shardInfo.clusterId) {
return false;
}
if (
"shards" in networkConfig &&
!networkConfig.shards.includes(shardInfo.shard!)
) {
return false;
}
return true;
};

View File

@ -1,18 +1,28 @@
import { isPeerId } from "@libp2p/interface";
import type { Peer, PeerId, Stream } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, StoreCodec } from "@waku/core";
import { MultiaddrInput } from "@multiformats/multiaddr";
import {
ConnectionManager,
createDecoder,
createEncoder,
StoreCodec
} from "@waku/core";
import type {
CreateDecoderParams,
CreateEncoderParams,
CreateNodeOptions,
IDecodedMessage,
IDecoder,
IEncoder,
IFilter,
ILightPush,
IRelay,
IStore,
IWaku,
Libp2p,
NetworkConfig,
PubsubTopic
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { DefaultNetworkConfig, Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Filter } from "../filter/index.js";
@ -21,6 +31,11 @@ import { LightPush } from "../light_push/index.js";
import { PeerManager } from "../peer_manager/index.js";
import { Store } from "../store/index.js";
import {
decoderParamsToShardInfo,
isShardCompatible,
mapToPeerIdOrMultiaddr
} from "./utils.js";
import { waitForRemotePeer } from "./wait_for_remote_peer.js";
const log = new Logger("waku");
@ -40,6 +55,8 @@ export class WakuNode implements IWaku {
public connectionManager: ConnectionManager;
public health: HealthIndicator;
public readonly networkConfig: NetworkConfig;
// needed to create a lock for async operations
private _nodeStateLock = false;
private _nodeStarted = false;
@ -55,6 +72,7 @@ export class WakuNode implements IWaku {
) {
this.relay = relay;
this.libp2p = libp2p;
this.networkConfig = options.networkConfig || DefaultNetworkConfig;
protocolsEnabled = {
filter: false,
@ -188,7 +206,7 @@ export class WakuNode implements IWaku {
}
}
const peerId = this.mapToPeerIdOrMultiaddr(peer);
const peerId = mapToPeerIdOrMultiaddr(peer);
log.info(`Dialing to ${peerId.toString()} with protocols ${_protocols}`);
return await this.connectionManager.rawDialPeerWithProtocols(peer, codecs);
}
@ -241,9 +259,41 @@ export class WakuNode implements IWaku {
return this.connectionManager.isConnected();
}
private mapToPeerIdOrMultiaddr(
peerId: PeerId | MultiaddrInput
): PeerId | Multiaddr {
return isPeerId(peerId) ? peerId : multiaddr(peerId);
public createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage> {
const singleShardInfo = decoderParamsToShardInfo(
params,
this.networkConfig
);
log.info(
`Creating Decoder with input:${JSON.stringify(params.shardInfo)}, determined:${JSON.stringify(singleShardInfo)}, expected:${JSON.stringify(this.networkConfig)}.`
);
if (!isShardCompatible(singleShardInfo, this.networkConfig)) {
throw Error(`Cannot create decoder: incompatible shard configuration.`);
}
return createDecoder(params.contentTopic, singleShardInfo);
}
public createEncoder(params: CreateEncoderParams): IEncoder {
const singleShardInfo = decoderParamsToShardInfo(
params,
this.networkConfig
);
log.info(
`Creating Encoder with input:${JSON.stringify(params.shardInfo)}, determined:${JSON.stringify(singleShardInfo)}, expected:${JSON.stringify(this.networkConfig)}.`
);
if (!isShardCompatible(singleShardInfo, this.networkConfig)) {
throw Error(`Cannot create encoder: incompatible shard configuration.`);
}
return createEncoder({
contentTopic: params.contentTopic,
ephemeral: params.ephemeral,
pubsubTopicShardInfo: singleShardInfo
});
}
}

View File

@ -1,5 +1,5 @@
import { createDecoder, createEncoder } from "@waku/core";
import { LightNode } from "@waku/interfaces";
import { createDecoder, createEncoder, DecodedMessage } from "@waku/core";
import { IDecoder, LightNode } from "@waku/interfaces";
import {
ecies,
generatePrivateKey,
@ -30,6 +30,7 @@ import {
ClusterId,
messagePayload,
messageText,
ShardIndex,
TestContentTopic,
TestDecoder,
TestEncoder,
@ -433,14 +434,23 @@ const runTests = (strictCheckNodes: boolean): void => {
TEST_STRING.forEach((testItem) => {
it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () {
const newContentTopic = testItem.value;
const newEncoder = createEncoder({
const newEncoder = waku.createEncoder({
contentTopic: newContentTopic,
pubsubTopic: TestPubsubTopic
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
const newDecoder = waku.createDecoder({
contentTopic: newContentTopic,
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
await waku.filter.subscribe(
[newDecoder],
[newDecoder as IDecoder<DecodedMessage>],
serviceNodes.messageCollector.callback
);
await waku.lightPush.send(newEncoder, messagePayload);

View File

@ -11,6 +11,7 @@ import {
import { createLightNode } from "@waku/sdk";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
derivePubsubTopicsFromNetworkConfig,
Logger
} from "@waku/utils";
@ -28,6 +29,7 @@ import {
export const log = new Logger("test:filter");
export const TestContentTopic = "/test/1/waku-filter/default";
export const ClusterId = 2;
export const ShardIndex = contentTopicToShardIndex(TestContentTopic);
export const TestShardInfo = {
contentTopics: [TestContentTopic],
clusterId: ClusterId

View File

@ -14,8 +14,10 @@ import {
} from "../../src/index.js";
import {
ClusterId,
messagePayload,
messageText,
ShardIndex,
TestContentTopic,
TestEncoder,
TestPubsubTopic,
@ -112,9 +114,12 @@ const runTests = (strictNodeCheck: boolean): void => {
TEST_STRING.forEach((testItem) => {
it(`Push message with content topic containing ${testItem.description}`, async function () {
const customEncoder = createEncoder({
const customEncoder = waku.createEncoder({
contentTopic: testItem.value,
pubsubTopic: TestPubsubTopic
shardInfo: {
clusterId: ClusterId,
shard: ShardIndex
}
});
const pushResponse = await waku.lightPush.send(
customEncoder,
@ -135,17 +140,6 @@ const runTests = (strictNodeCheck: boolean): void => {
});
});
it("Fails to push message with empty content topic", async function () {
try {
createEncoder({ contentTopic: "" });
expect.fail("Expected an error but didn't get one");
} catch (error) {
expect((error as Error).message).to.equal(
"Content topic must be specified"
);
}
});
it("Push message with meta", async function () {
const customTestEncoder = createEncoder({
contentTopic: TestContentTopic,

View File

@ -2,7 +2,11 @@ import { createEncoder } from "@waku/core";
import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk";
import { createLightNode } from "@waku/sdk";
import { contentTopicToPubsubTopic, Logger } from "@waku/utils";
import {
contentTopicToPubsubTopic,
contentTopicToShardIndex,
Logger
} from "@waku/utils";
import { Context } from "mocha";
import { runNodes as runNodesBuilder, ServiceNode } from "../../src/index.js";
@ -11,6 +15,7 @@ import { runNodes as runNodesBuilder, ServiceNode } from "../../src/index.js";
export const log = new Logger("test:lightpush");
export const TestContentTopic = "/test/1/waku-light-push/utf8";
export const ClusterId = 3;
export const ShardIndex = contentTopicToShardIndex(TestContentTopic);
export const TestPubsubTopic = contentTopicToPubsubTopic(
TestContentTopic,
ClusterId