remove Codec, update types

This commit is contained in:
Sasha 2025-10-07 00:16:38 +02:00
parent 62b43f061a
commit 80cef4bc2a
No known key found for this signature in database
12 changed files with 61 additions and 188 deletions

View File

@ -1,11 +1,9 @@
export { createEncoder, createDecoder } from "./lib/message/version_0.js"; export { createEncoder, createDecoder } from "./lib/message/version_0.js";
export { createCodec } from "./lib/message/index.js";
export type { export type {
Encoder, Encoder,
Decoder, Decoder,
DecodedMessage DecodedMessage
} from "./lib/message/version_0.js"; } from "./lib/message/version_0.js";
export type { Codec } from "./lib/message/index.js";
export * as message from "./lib/message/index.js"; export * as message from "./lib/message/index.js";
export * as waku_filter from "./lib/filter/index.js"; export * as waku_filter from "./lib/filter/index.js";

View File

@ -1,76 +0,0 @@
import type {
ICodec,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage,
IMetaSetter,
IProtoMessage,
IRoutingInfo,
PubsubTopic
} from "@waku/interfaces";
import { Decoder, Encoder } from "./version_0.js";
export class Codec implements ICodec<IDecodedMessage> {
private encoder: IEncoder;
private decoder: IDecoder<IDecodedMessage>;
public constructor(
public contentTopic: string,
public ephemeral: boolean = false,
public routingInfo: IRoutingInfo,
public metaSetter?: IMetaSetter
) {
this.encoder = new Encoder(
contentTopic,
ephemeral,
routingInfo,
metaSetter
);
this.decoder = new Decoder(contentTopic, routingInfo);
}
public get pubsubTopic(): PubsubTopic {
return this.routingInfo.pubsubTopic;
}
public async toWire(message: IMessage): Promise<Uint8Array | undefined> {
return this.encoder.toWire(message);
}
public async toProtoObj(
message: IMessage
): Promise<IProtoMessage | undefined> {
return this.encoder.toProtoObj(message);
}
public fromWireToProtoObj(
bytes: Uint8Array
): Promise<IProtoMessage | undefined> {
return this.decoder.fromWireToProtoObj(bytes);
}
public async fromProtoObj(
pubsubTopic: string,
proto: IProtoMessage
): Promise<IDecodedMessage | undefined> {
return this.decoder.fromProtoObj(pubsubTopic, proto);
}
}
type CodecParams = {
contentTopic: string;
ephemeral: boolean;
routingInfo: IRoutingInfo;
metaSetter?: IMetaSetter;
};
export function createCodec(params: CodecParams): Codec {
return new Codec(
params.contentTopic,
params.ephemeral,
params.routingInfo,
params.metaSetter
);
}

View File

@ -1,3 +1,2 @@
export * as version_0 from "./version_0.js"; export * as version_0 from "./version_0.js";
export { Codec, createCodec } from "./codec.js";
export { OneMillion, Version } from "./constants.js"; export { OneMillion, Version } from "./constants.js";

View File

@ -69,6 +69,21 @@ export interface IMessage {
rateLimitProof?: IRateLimitProof; rateLimitProof?: IRateLimitProof;
} }
/**
* Send message data structure used in {@link IWaku.send}.
*/
export interface ISendMessage {
contentTopic: string;
payload: Uint8Array;
ephemeral?: boolean;
rateLimitProof?: boolean;
}
/**
* Request ID of attempt to send a message.
*/
export type RequestId = string;
export interface IMetaSetter { export interface IMetaSetter {
(message: IProtoMessage & { meta: undefined }): Uint8Array; (message: IProtoMessage & { meta: undefined }): Uint8Array;
} }
@ -111,5 +126,3 @@ export interface IDecoder<T extends IDecodedMessage> {
proto: IProtoMessage proto: IProtoMessage
) => Promise<T | undefined>; ) => Promise<T | undefined>;
} }
export type ICodec<T extends IDecodedMessage> = IEncoder & IDecoder<T>;

View File

@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js";
import type { HealthStatus } from "./health_status.js"; import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js"; import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js"; import type { ILightPush } from "./light_push.js";
import { ICodec, IDecodedMessage, IDecoder, IEncoder } from "./message.js"; import {
IDecodedMessage,
IDecoder,
IEncoder,
ISendMessage,
RequestId
} from "./message.js";
import type { Protocols } from "./protocols.js"; import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js"; import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js"; import type { ShardId } from "./sharding.js";
@ -25,8 +31,6 @@ export type CreateEncoderParams = CreateDecoderParams & {
ephemeral?: boolean; ephemeral?: boolean;
}; };
export type CreateCodecParams = CreateDecoderParams & CreateEncoderParams;
export enum WakuEvent { export enum WakuEvent {
Connection = "waku:connection", Connection = "waku:connection",
Health = "waku:health" Health = "waku:health"
@ -208,8 +212,6 @@ export interface IWaku {
waitForPeers(protocols?: Protocols[], timeoutMs?: number): Promise<void>; waitForPeers(protocols?: Protocols[], timeoutMs?: number): Promise<void>;
/** /**
* @deprecated Use {@link createCodec} instead
*
* Creates a decoder for Waku messages on a specific content topic. * Creates a decoder for Waku messages on a specific content topic.
* *
* A decoder is used to decode messages from the Waku network format. * A decoder is used to decode messages from the Waku network format.
@ -239,8 +241,6 @@ export interface IWaku {
createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage>; createDecoder(params: CreateDecoderParams): IDecoder<IDecodedMessage>;
/** /**
* @deprecated Use {@link createCodec} instead
*
* Creates an encoder for Waku messages on a specific content topic. * Creates an encoder for Waku messages on a specific content topic.
* *
* An encoder is used to encode messages into the Waku network format. * An encoder is used to encode messages into the Waku network format.
@ -270,44 +270,13 @@ export interface IWaku {
*/ */
createEncoder(params: CreateEncoderParams): IEncoder; createEncoder(params: CreateEncoderParams): IEncoder;
/**
* Creates a codec for Waku messages on a specific content topic.
*
* A codec is used to encode and decode messages from the Waku network format.
* The codec automatically handles shard configuration based on the Waku node's network settings.
*
* @param {CreateCodecParams} params - Configuration for the codec including content topic and optionally shard information and ephemeral flag
* @returns {ICodec<IDecodedMessage>} A codec instance configured for the specified content topic
* @throws {Error} If the shard configuration is incompatible with the node's network settings
*
* @example
* ```typescript
* // Create a codec with default network shard settings
* const codec = waku.createCodec({
* contentTopic: "/my-app/1/chat/proto"
* });
*
* // Create a codec with custom shard settings
* const customCodec = waku.createCodec({
* contentTopic: "/my-app/1/chat/proto",
* ephemeral: true,
* shardInfo: {
* clusterId: 1,
* shard: 5
* }
* });
* ```
*/
createCodec(params: CreateCodecParams): ICodec<IDecodedMessage>;
/** /**
* Sends a message to the Waku network. * Sends a message to the Waku network.
* *
* @param {ICodec<IDecodedMessage>} codec - The codec to use for encoding the message * @param {ISendMessage} message - The message to send.
* @param {IMessage} message - The message to send * @returns {Promise<RequestId>} A promise that resolves to the request ID
* @returns {Promise<string>} A promise that resolves to the request ID
*/ */
// send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<string>; send(message: ISendMessage): Promise<RequestId>;
/** /**
* @returns {boolean} `true` if the node was started and `false` otherwise * @returns {boolean} `true` if the node was started and `false` otherwise

View File

@ -1,3 +1 @@
export { Messaging } from "./messaging.js"; export { Messaging } from "./messaging.js";
// todo: do not export this
export type { RequestId, WakuLikeMessage } from "./utils.js";

View File

@ -1,10 +1,8 @@
import { IDecodedMessage } from "@waku/interfaces"; import { IDecodedMessage, ISendMessage, RequestId } from "@waku/interfaces";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { WakuLikeMessage } from "./utils.js";
type QueuedMessage = { type QueuedMessage = {
messageRequest?: WakuLikeMessage; messageRequest?: ISendMessage;
filterAck: boolean; filterAck: boolean;
storeAck: boolean; storeAck: boolean;
lastSentAt?: number; lastSentAt?: number;
@ -20,7 +18,6 @@ type MessageStoreOptions = {
resendIntervalMs?: number; resendIntervalMs?: number;
}; };
type RequestId = string;
type MessageHashStr = string; type MessageHashStr = string;
export class MessageStore { export class MessageStore {
@ -72,7 +69,7 @@ export class MessageStore {
this.replacePendingWithMessage(sentMessage.hashStr); this.replacePendingWithMessage(sentMessage.hashStr);
} }
public async queue(message: WakuLikeMessage): Promise<RequestId> { public async queue(message: ISendMessage): Promise<RequestId> {
const requestId = uuidv4(); const requestId = uuidv4();
this.pendingRequests.set(requestId.toString(), { this.pendingRequests.set(requestId.toString(), {
@ -87,11 +84,11 @@ export class MessageStore {
public getMessagesToSend(): Array<{ public getMessagesToSend(): Array<{
requestId: string; requestId: string;
message: WakuLikeMessage; message: ISendMessage;
}> { }> {
const res: Array<{ const res: Array<{
requestId: string; requestId: string;
message: WakuLikeMessage; message: ISendMessage;
}> = []; }> = [];
for (const [requestId, entry] of this.pendingRequests.entries()) { for (const [requestId, entry] of this.pendingRequests.entries()) {

View File

@ -1,12 +1,18 @@
import { IFilter, ILightPush, IStore, NetworkConfig } from "@waku/interfaces"; import {
IFilter,
ILightPush,
ISendMessage,
IStore,
NetworkConfig,
RequestId
} from "@waku/interfaces";
import { AckManager } from "./ack_manager.js"; import { AckManager } from "./ack_manager.js";
import { MessageStore } from "./message_store.js"; import { MessageStore } from "./message_store.js";
import { Sender } from "./sender.js"; import { Sender } from "./sender.js";
import type { RequestId, WakuLikeMessage } from "./utils.js";
interface IMessaging { interface IMessaging {
send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId>; send(wakuLikeMessage: ISendMessage): Promise<RequestId>;
} }
type MessagingConstructorParams = { type MessagingConstructorParams = {
@ -49,7 +55,7 @@ export class Messaging implements IMessaging {
this.sender.stop(); this.sender.stop();
} }
public send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId> { public send(wakuLikeMessage: ISendMessage): Promise<RequestId> {
return this.sender.send(wakuLikeMessage); return this.sender.send(wakuLikeMessage);
} }
} }

View File

@ -1,10 +1,14 @@
import { createDecoder, createEncoder } from "@waku/core"; import { createDecoder, createEncoder } from "@waku/core";
import { ILightPush, NetworkConfig } from "@waku/interfaces"; import {
ILightPush,
ISendMessage,
NetworkConfig,
RequestId
} from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils"; import { createRoutingInfo } from "@waku/utils";
import { AckManager } from "./ack_manager.js"; import { AckManager } from "./ack_manager.js";
import type { MessageStore } from "./message_store.js"; import type { MessageStore } from "./message_store.js";
import type { RequestId, WakuLikeMessage } from "./utils.js";
type SenderConstructorParams = { type SenderConstructorParams = {
messageStore: MessageStore; messageStore: MessageStore;
@ -41,11 +45,11 @@ export class Sender {
} }
} }
public async send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId> { public async send(message: ISendMessage): Promise<RequestId> {
const requestId = await this.messageStore.queue(wakuLikeMessage); const requestId = await this.messageStore.queue(message);
await this.ackManager.subscribe(wakuLikeMessage.contentTopic); await this.ackManager.subscribe(message.contentTopic);
await this.sendMessage(requestId, wakuLikeMessage); await this.sendMessage(requestId, message);
return requestId; return requestId;
} }
@ -60,7 +64,7 @@ export class Sender {
private async sendMessage( private async sendMessage(
requestId: RequestId, requestId: RequestId,
message: WakuLikeMessage message: ISendMessage
): Promise<void> { ): Promise<void> {
try { try {
if (this.processingRequests.has(requestId)) { if (this.processingRequests.has(requestId)) {

View File

@ -1,13 +1,3 @@
export type RequestId = string;
// todo: make it IMessage type
export type WakuLikeMessage = {
contentTopic: string;
payload: Uint8Array;
ephemeral?: boolean;
rateLimitProof?: boolean;
};
export interface IAckManager { export interface IAckManager {
start(): void; start(): void;
stop(): void; stop(): void;

View File

@ -5,18 +5,11 @@ import {
TypedEventEmitter TypedEventEmitter
} from "@libp2p/interface"; } from "@libp2p/interface";
import type { MultiaddrInput } from "@multiformats/multiaddr"; import type { MultiaddrInput } from "@multiformats/multiaddr";
import { import { ConnectionManager, createDecoder, createEncoder } from "@waku/core";
ConnectionManager,
createCodec,
createDecoder,
createEncoder
} from "@waku/core";
import type { import type {
CreateCodecParams,
CreateDecoderParams, CreateDecoderParams,
CreateEncoderParams, CreateEncoderParams,
CreateNodeOptions, CreateNodeOptions,
ICodec,
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IEncoder, IEncoder,
@ -33,7 +26,9 @@ import type {
import { import {
DefaultNetworkConfig, DefaultNetworkConfig,
HealthStatus, HealthStatus,
Protocols ISendMessage,
Protocols,
RequestId
} from "@waku/interfaces"; } from "@waku/interfaces";
import { createRoutingInfo, Logger } from "@waku/utils"; import { createRoutingInfo, Logger } from "@waku/utils";
@ -41,7 +36,6 @@ import { Filter } from "../filter/index.js";
import { HealthIndicator } from "../health_indicator/index.js"; import { HealthIndicator } from "../health_indicator/index.js";
import { LightPush } from "../light_push/index.js"; import { LightPush } from "../light_push/index.js";
import { Messaging } from "../messaging/index.js"; import { Messaging } from "../messaging/index.js";
import type { RequestId, WakuLikeMessage } from "../messaging/index.js";
import { PeerManager } from "../peer_manager/index.js"; import { PeerManager } from "../peer_manager/index.js";
import { Store } from "../store/index.js"; import { Store } from "../store/index.js";
@ -303,25 +297,12 @@ export class WakuNode implements IWaku {
}); });
} }
public send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId> { public send(message: ISendMessage): Promise<RequestId> {
if (!this.messaging) { if (!this.messaging) {
throw new Error("Messaging not initialized"); throw new Error("Messaging not initialized");
} }
return this.messaging.send(wakuLikeMessage); return this.messaging.send(message);
}
public createCodec(params: CreateCodecParams): ICodec<IDecodedMessage> {
const routingInfo = this.createRoutingInfo(
params.contentTopic,
params.shardId
);
return createCodec({
contentTopic: params.contentTopic,
ephemeral: params.ephemeral ?? false,
routingInfo: routingInfo
});
} }
private createRoutingInfo( private createRoutingInfo(

View File

@ -2,11 +2,9 @@ import { Peer, PeerId, Stream, TypedEventEmitter } from "@libp2p/interface";
import { MultiaddrInput } from "@multiformats/multiaddr"; import { MultiaddrInput } from "@multiformats/multiaddr";
import { import {
Callback, Callback,
CreateCodecParams,
CreateDecoderParams, CreateDecoderParams,
CreateEncoderParams, CreateEncoderParams,
HealthStatus, HealthStatus,
ICodec,
IDecodedMessage, IDecodedMessage,
IDecoder, IDecoder,
IEncoder, IEncoder,
@ -14,13 +12,15 @@ import {
ILightPush, ILightPush,
type IMessage, type IMessage,
IRelay, IRelay,
ISendMessage,
ISendOptions, ISendOptions,
IStore, IStore,
IWaku, IWaku,
IWakuEventEmitter, IWakuEventEmitter,
Libp2p, Libp2p,
LightPushSDKResult, LightPushSDKResult,
Protocols Protocols,
RequestId
} from "@waku/interfaces"; } from "@waku/interfaces";
export type MockWakuEvents = { export type MockWakuEvents = {
@ -156,13 +156,7 @@ export class MockWakuNode implements IWaku {
public createEncoder(_params: CreateEncoderParams): IEncoder { public createEncoder(_params: CreateEncoderParams): IEncoder {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }
public createCodec(_params: CreateCodecParams): ICodec<IDecodedMessage> { public send(_message: ISendMessage): Promise<RequestId> {
throw new Error("Method not implemented.");
}
public send(
_codec: ICodec<IDecodedMessage>,
_message: IMessage
): Promise<string> {
throw new Error("Method not implemented."); throw new Error("Method not implemented.");
} }
public isStarted(): boolean { public isStarted(): boolean {