From 9e7719e5191ba78894e2f033d5f66116f09d4786 Mon Sep 17 00:00:00 2001 From: Sasha Date: Mon, 29 Sep 2025 23:01:43 +0200 Subject: [PATCH] add ICodec --- packages/core/src/index.ts | 2 + packages/core/src/lib/message/codec.ts | 76 ++++++++++++++++++++++ packages/core/src/lib/message/constants.ts | 2 + packages/core/src/lib/message/index.ts | 2 + packages/core/src/lib/message/version_0.ts | 6 +- packages/interfaces/src/message.ts | 2 + packages/interfaces/src/waku.ts | 38 ++++++++++- packages/rln/src/message.ts | 2 +- packages/sdk/src/messaging/ack_manager.ts | 14 ++-- packages/sdk/src/messaging/index.ts | 1 + packages/sdk/src/messaging/utils.ts | 7 +- packages/sdk/src/waku/waku.ts | 31 ++++++++- packages/utils/src/common/mock_node.ts | 5 ++ 13 files changed, 168 insertions(+), 20 deletions(-) create mode 100644 packages/core/src/lib/message/codec.ts create mode 100644 packages/core/src/lib/message/constants.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 8021ac0a91..5688e5e937 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -1,9 +1,11 @@ export { createEncoder, createDecoder } from "./lib/message/version_0.js"; +export { createCodec } from "./lib/message/index.js"; export type { Encoder, Decoder, DecodedMessage } from "./lib/message/version_0.js"; +export type { Codec } from "./lib/message/index.js"; export * as message from "./lib/message/index.js"; export * as waku_filter from "./lib/filter/index.js"; diff --git a/packages/core/src/lib/message/codec.ts b/packages/core/src/lib/message/codec.ts new file mode 100644 index 0000000000..c0212331bf --- /dev/null +++ b/packages/core/src/lib/message/codec.ts @@ -0,0 +1,76 @@ +import type { + ICodec, + IDecodedMessage, + IDecoder, + IEncoder, + IMessage, + IMetaSetter, + IProtoMessage, + IRoutingInfo, + PubsubTopic +} from "@waku/interfaces"; + +import { Decoder, Encoder } from "./version_0.js"; + +export class Codec implements ICodec { + private encoder: IEncoder; + private decoder: IDecoder; + + public constructor( + public contentTopic: string, + public ephemeral: boolean = false, + public routingInfo: IRoutingInfo, + public metaSetter?: IMetaSetter + ) { + this.encoder = new Encoder( + contentTopic, + ephemeral, + routingInfo, + metaSetter + ); + this.decoder = new Decoder(contentTopic, routingInfo); + } + + public get pubsubTopic(): PubsubTopic { + return this.routingInfo.pubsubTopic; + } + + public async toWire(message: IMessage): Promise { + return this.encoder.toWire(message); + } + + public async toProtoObj( + message: IMessage + ): Promise { + return this.encoder.toProtoObj(message); + } + + public fromWireToProtoObj( + bytes: Uint8Array + ): Promise { + return this.decoder.fromWireToProtoObj(bytes); + } + + public async fromProtoObj( + pubsubTopic: string, + proto: IProtoMessage + ): Promise { + return this.decoder.fromProtoObj(pubsubTopic, proto); + } +} + +type CodecParams = { + contentTopic: string; + ephemeral: boolean; + routingInfo: IRoutingInfo; + metaSetter?: IMetaSetter; +}; + +export function createCodec(params: CodecParams): Codec { + return new Codec( + params.contentTopic, + params.ephemeral, + params.routingInfo, + params.metaSetter + ); +} diff --git a/packages/core/src/lib/message/constants.ts b/packages/core/src/lib/message/constants.ts new file mode 100644 index 0000000000..0d54071228 --- /dev/null +++ b/packages/core/src/lib/message/constants.ts @@ -0,0 +1,2 @@ +export const OneMillion = BigInt(1_000_000); +export const Version = 0; diff --git a/packages/core/src/lib/message/index.ts b/packages/core/src/lib/message/index.ts index e4736e54e1..8add5fba21 100644 --- a/packages/core/src/lib/message/index.ts +++ b/packages/core/src/lib/message/index.ts @@ -1 +1,3 @@ export * as version_0 from "./version_0.js"; +export { Codec, createCodec } from "./codec.js"; +export { OneMillion, Version } from "./constants.js"; diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index c127120e74..d3d8a7d333 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -16,10 +16,10 @@ import { bytesToHex } from "@waku/utils/bytes"; import { messageHash } from "../message_hash/index.js"; -const log = new Logger("message:version-0"); -const OneMillion = BigInt(1_000_000); +import { OneMillion, Version } from "./constants.js"; + +const log = new Logger("message:version-0"); -export const Version = 0; export { proto }; export class DecodedMessage implements IDecodedMessage { diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index caecb73aec..e97a5d0304 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -111,3 +111,5 @@ export interface IDecoder { proto: IProtoMessage ) => Promise; } + +export type ICodec = IEncoder & IDecoder; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 1fbec20cce..a0c83ac746 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -10,7 +10,7 @@ import type { IFilter } from "./filter.js"; import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; +import { ICodec, IDecodedMessage, IDecoder, IEncoder } from "./message.js"; import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { ShardId } from "./sharding.js"; @@ -25,6 +25,8 @@ export type CreateEncoderParams = CreateDecoderParams & { ephemeral?: boolean; }; +export type CreateCodecParams = CreateDecoderParams & CreateEncoderParams; + export enum WakuEvent { Connection = "waku:connection", Health = "waku:health" @@ -206,6 +208,8 @@ export interface IWaku { waitForPeers(protocols?: Protocols[], timeoutMs?: number): Promise; /** + * @deprecated Use {@link createCodec} instead + * * Creates a decoder for Waku messages on a specific content topic. * * A decoder is used to decode messages from the Waku network format. @@ -235,6 +239,8 @@ export interface IWaku { createDecoder(params: CreateDecoderParams): IDecoder; /** + * @deprecated Use {@link createCodec} instead + * * Creates an encoder for Waku messages on a specific content topic. * * An encoder is used to encode messages into the Waku network format. @@ -264,6 +270,36 @@ export interface IWaku { */ createEncoder(params: CreateEncoderParams): IEncoder; + /** + * Creates a codec for Waku messages on a specific content topic. + * + * A codec is used to encode and decode messages from the Waku network format. + * The codec automatically handles shard configuration based on the Waku node's network settings. + * + * @param {CreateCodecParams} params - Configuration for the codec including content topic and optionally shard information and ephemeral flag + * @returns {ICodec} A codec instance configured for the specified content topic + * @throws {Error} If the shard configuration is incompatible with the node's network settings + * + * @example + * ```typescript + * // Create a codec with default network shard settings + * const codec = waku.createCodec({ + * contentTopic: "/my-app/1/chat/proto" + * }); + * + * // Create a codec with custom shard settings + * const customCodec = waku.createCodec({ + * contentTopic: "/my-app/1/chat/proto", + * ephemeral: true, + * shardInfo: { + * clusterId: 1, + * shard: 5 + * } + * }); + * ``` + */ + createCodec(params: CreateCodecParams): ICodec; + /** * @returns {boolean} `true` if the node was started and `false` otherwise */ diff --git a/packages/rln/src/message.ts b/packages/rln/src/message.ts index 1cca8a4ed2..d4fa44e84d 100644 --- a/packages/rln/src/message.ts +++ b/packages/rln/src/message.ts @@ -17,7 +17,7 @@ export function toRLNSignal(contentTopic: string, msg: IMessage): Uint8Array { export class RlnMessage implements IRlnMessage { public pubsubTopic = ""; - public version = message.version_0.Version; + public version = message.Version; public constructor( private rlnInstance: RLNInstance, diff --git a/packages/sdk/src/messaging/ack_manager.ts b/packages/sdk/src/messaging/ack_manager.ts index 5f41d1a771..c7874c31b1 100644 --- a/packages/sdk/src/messaging/ack_manager.ts +++ b/packages/sdk/src/messaging/ack_manager.ts @@ -1,7 +1,7 @@ -import { IDecodedMessage, IFilter, IStore } from "@waku/interfaces"; +import { ICodec, IDecodedMessage, IFilter, IStore } from "@waku/interfaces"; import { MessageStore } from "./message_store.js"; -import { IAckManager, ICodec } from "./utils.js"; +import { IAckManager } from "./utils.js"; type AckManagerConstructorParams = { messageStore: MessageStore; @@ -35,7 +35,7 @@ export class AckManager implements IAckManager { this.storeAckManager.stop(); } - public async subscribe(codec: ICodec): Promise { + public async subscribe(codec: ICodec): Promise { return ( (await this.filterAckManager.subscribe(codec)) || (await this.storeAckManager.subscribe(codec)) @@ -44,7 +44,7 @@ export class AckManager implements IAckManager { } class FilterAckManager implements IAckManager { - private codecs: Set = new Set(); + private codecs: Set> = new Set(); public constructor( private messageStore: MessageStore, @@ -63,7 +63,7 @@ class FilterAckManager implements IAckManager { this.codecs.clear(); } - public async subscribe(codec: ICodec): Promise { + public async subscribe(codec: ICodec): Promise { const success = await this.filter.subscribe( codec, this.onMessage.bind(this) @@ -86,7 +86,7 @@ class FilterAckManager implements IAckManager { class StoreAckManager implements IAckManager { private interval: ReturnType | null = null; - private codecs: Set = new Set(); + private codecs: Set> = new Set(); public constructor( private messageStore: MessageStore, @@ -112,7 +112,7 @@ class StoreAckManager implements IAckManager { this.interval = null; } - public async subscribe(codec: ICodec): Promise { + public async subscribe(codec: ICodec): Promise { this.codecs.add(codec); return true; } diff --git a/packages/sdk/src/messaging/index.ts b/packages/sdk/src/messaging/index.ts index 0035e4eb2f..04928b7037 100644 --- a/packages/sdk/src/messaging/index.ts +++ b/packages/sdk/src/messaging/index.ts @@ -1 +1,2 @@ export { Messaging } from "./messaging.js"; +export type { RequestId } from "./utils.js"; diff --git a/packages/sdk/src/messaging/utils.ts b/packages/sdk/src/messaging/utils.ts index 3f270e5bbe..f76cb7920f 100644 --- a/packages/sdk/src/messaging/utils.ts +++ b/packages/sdk/src/messaging/utils.ts @@ -1,12 +1,9 @@ -import { IDecodedMessage, IDecoder, IEncoder } from "@waku/interfaces"; - -// TODO: create a local entity for that that will literally extend existing encoder and decoder from package/core -export type ICodec = IEncoder & IDecoder; +import { ICodec, IDecodedMessage } from "@waku/interfaces"; export type RequestId = string; export interface IAckManager { start(): void; stop(): void; - subscribe(codec: ICodec): Promise; + subscribe(codec: ICodec): Promise; } diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 4324e48a4c..d558a13369 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -5,11 +5,18 @@ import { TypedEventEmitter } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, createDecoder, createEncoder } from "@waku/core"; +import { + ConnectionManager, + createCodec, + createDecoder, + createEncoder +} from "@waku/core"; import type { + CreateCodecParams, CreateDecoderParams, CreateEncoderParams, CreateNodeOptions, + ICodec, IDecodedMessage, IDecoder, IEncoder, @@ -35,6 +42,7 @@ import { Filter } from "../filter/index.js"; import { HealthIndicator } from "../health_indicator/index.js"; import { LightPush } from "../light_push/index.js"; import { Messaging } from "../messaging/index.js"; +import type { RequestId } from "../messaging/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { Store } from "../store/index.js"; @@ -295,8 +303,25 @@ export class WakuNode implements IWaku { }); } - public send(encoder: IEncoder, message: IMessage): Promise { - return this.messaging?.send(encoder, message) ?? Promise.resolve(); + public send(encoder: IEncoder, message: IMessage): Promise { + if (!this.messaging) { + throw new Error("Messaging not initialized"); + } + + return this.messaging.send(encoder, message); + } + + public createCodec(params: CreateCodecParams): ICodec { + const routingInfo = this.createRoutingInfo( + params.contentTopic, + params.shardId + ); + + return createCodec({ + contentTopic: params.contentTopic, + ephemeral: params.ephemeral ?? false, + routingInfo: routingInfo + }); } private createRoutingInfo( diff --git a/packages/utils/src/common/mock_node.ts b/packages/utils/src/common/mock_node.ts index 40472fea17..531709e0dd 100644 --- a/packages/utils/src/common/mock_node.ts +++ b/packages/utils/src/common/mock_node.ts @@ -2,9 +2,11 @@ import { Peer, PeerId, Stream, TypedEventEmitter } from "@libp2p/interface"; import { MultiaddrInput } from "@multiformats/multiaddr"; import { Callback, + CreateCodecParams, CreateDecoderParams, CreateEncoderParams, HealthStatus, + ICodec, IDecodedMessage, IDecoder, IEncoder, @@ -154,6 +156,9 @@ export class MockWakuNode implements IWaku { public createEncoder(_params: CreateEncoderParams): IEncoder { throw new Error("Method not implemented."); } + public createCodec(_params: CreateCodecParams): ICodec { + throw new Error("Method not implemented."); + } public isStarted(): boolean { throw new Error("Method not implemented."); }