move from encoder/decoder/codec to simple message parameter

This commit is contained in:
Sasha 2025-10-02 19:39:30 +02:00
parent 43851045cb
commit 7f98bb183d
No known key found for this signature in database
8 changed files with 131 additions and 91 deletions

View File

@ -10,13 +10,7 @@ import type { IFilter } from "./filter.js";
import type { HealthStatus } from "./health_status.js"; import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js"; import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js"; import type { ILightPush } from "./light_push.js";
import { import { ICodec, IDecodedMessage, IDecoder, IEncoder } from "./message.js";
ICodec,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage
} from "./message.js";
import type { Protocols } from "./protocols.js"; import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js"; import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js"; import type { ShardId } from "./sharding.js";
@ -313,7 +307,7 @@ export interface IWaku {
* @param {IMessage} message - The message to send * @param {IMessage} message - The message to send
* @returns {Promise<string>} A promise that resolves to the request ID * @returns {Promise<string>} A promise that resolves to the request ID
*/ */
send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<string>; // send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<string>;
/** /**
* @returns {boolean} `true` if the node was started and `false` otherwise * @returns {boolean} `true` if the node was started and `false` otherwise

View File

@ -1,4 +1,12 @@
import { ICodec, IDecodedMessage, IFilter, IStore } from "@waku/interfaces"; import { createDecoder } from "@waku/core";
import {
IDecodedMessage,
IDecoder,
IFilter,
IStore,
NetworkConfig
} from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { MessageStore } from "./message_store.js"; import { MessageStore } from "./message_store.js";
import { IAckManager } from "./utils.js"; import { IAckManager } from "./utils.js";
@ -7,15 +15,18 @@ type AckManagerConstructorParams = {
messageStore: MessageStore; messageStore: MessageStore;
filter: IFilter; filter: IFilter;
store: IStore; store: IStore;
networkConfig: NetworkConfig;
}; };
export class AckManager implements IAckManager { export class AckManager implements IAckManager {
private readonly messageStore: MessageStore; private readonly messageStore: MessageStore;
private readonly filterAckManager: FilterAckManager; private readonly filterAckManager: FilterAckManager;
private readonly storeAckManager: StoreAckManager; private readonly storeAckManager: StoreAckManager;
private readonly networkConfig: NetworkConfig;
public constructor(params: AckManagerConstructorParams) { public constructor(params: AckManagerConstructorParams) {
this.messageStore = params.messageStore; this.messageStore = params.messageStore;
this.networkConfig = params.networkConfig;
this.filterAckManager = new FilterAckManager( this.filterAckManager = new FilterAckManager(
this.messageStore, this.messageStore,
@ -35,16 +46,23 @@ export class AckManager implements IAckManager {
this.storeAckManager.stop(); this.storeAckManager.stop();
} }
public async subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean> { public async subscribe(contentTopic: string): Promise<boolean> {
const decoder = createDecoder(
contentTopic,
createRoutingInfo(this.networkConfig, {
contentTopic
})
);
return ( return (
(await this.filterAckManager.subscribe(codec)) || (await this.filterAckManager.subscribe(decoder)) ||
(await this.storeAckManager.subscribe(codec)) (await this.storeAckManager.subscribe(decoder))
); );
} }
} }
class FilterAckManager implements IAckManager { class FilterAckManager {
private codecs: Set<ICodec<IDecodedMessage>> = new Set(); private decoders: Set<IDecoder<IDecodedMessage>> = new Set();
public constructor( public constructor(
private messageStore: MessageStore, private messageStore: MessageStore,
@ -56,20 +74,20 @@ class FilterAckManager implements IAckManager {
} }
public async stop(): Promise<void> { public async stop(): Promise<void> {
const promises = Array.from(this.codecs.entries()).map((codec) => const promises = Array.from(this.decoders.entries()).map((decoder) =>
this.filter.unsubscribe(codec) this.filter.unsubscribe(decoder)
); );
await Promise.all(promises); await Promise.all(promises);
this.codecs.clear(); this.decoders.clear();
} }
public async subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean> { public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
const success = await this.filter.subscribe( const success = await this.filter.subscribe(
codec, decoder,
this.onMessage.bind(this) this.onMessage.bind(this)
); );
if (success) { if (success) {
this.codecs.add(codec); this.decoders.add(decoder);
} }
return success; return success;
} }
@ -83,10 +101,10 @@ class FilterAckManager implements IAckManager {
} }
} }
class StoreAckManager implements IAckManager { class StoreAckManager {
private interval: ReturnType<typeof setInterval> | null = null; private interval: ReturnType<typeof setInterval> | null = null;
private codecs: Set<ICodec<IDecodedMessage>> = new Set(); private decoders: Set<IDecoder<IDecodedMessage>> = new Set();
public constructor( public constructor(
private messageStore: MessageStore, private messageStore: MessageStore,
@ -112,15 +130,15 @@ class StoreAckManager implements IAckManager {
this.interval = null; this.interval = null;
} }
public async subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean> { public async subscribe(decoder: IDecoder<IDecodedMessage>): Promise<boolean> {
this.codecs.add(codec); this.decoders.add(decoder);
return true; return true;
} }
private async query(): Promise<void> { private async query(): Promise<void> {
for (const codec of this.codecs) { for (const decoder of this.decoders) {
await this.store.queryWithOrderedCallback( await this.store.queryWithOrderedCallback(
[codec], [decoder],
(message) => { (message) => {
if (!this.messageStore.has(message.hashStr)) { if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message, { storeAck: true }); this.messageStore.add(message, { storeAck: true });

View File

@ -1,2 +1,3 @@
export { Messaging } from "./messaging.js"; export { Messaging } from "./messaging.js";
export type { RequestId } from "./utils.js"; // todo: do not export this
export type { RequestId, WakuLikeMessage } from "./utils.js";

View File

@ -1,10 +1,10 @@
import { ICodec, IDecodedMessage, IMessage } from "@waku/interfaces"; import { IDecodedMessage } from "@waku/interfaces";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { WakuLikeMessage } from "./utils.js";
type QueuedMessage = { type QueuedMessage = {
codec?: ICodec<IDecodedMessage>; messageRequest?: WakuLikeMessage;
messageRequest?: IMessage;
sentMessage?: IMessage;
filterAck: boolean; filterAck: boolean;
storeAck: boolean; storeAck: boolean;
lastSentAt?: number; lastSentAt?: number;
@ -65,23 +65,18 @@ export class MessageStore {
): Promise<void> { ): Promise<void> {
const entry = this.pendingRequests.get(requestId); const entry = this.pendingRequests.get(requestId);
if (!entry || !entry.codec || !entry.messageRequest) { if (!entry || !entry.messageRequest) {
return; return;
} }
entry.lastSentAt = Number(sentMessage.timestamp); entry.lastSentAt = Number(sentMessage.timestamp);
entry.sentMessage = sentMessage;
this.pendingMessages.set(sentMessage.hashStr, requestId); this.pendingMessages.set(sentMessage.hashStr, requestId);
} }
public async queue( public async queue(message: WakuLikeMessage): Promise<RequestId> {
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<RequestId> {
const requestId = uuidv4(); const requestId = uuidv4();
this.pendingRequests.set(requestId.toString(), { this.pendingRequests.set(requestId.toString(), {
codec,
messageRequest: message, messageRequest: message,
filterAck: false, filterAck: false,
storeAck: false, storeAck: false,
@ -93,19 +88,17 @@ export class MessageStore {
public getMessagesToSend(): Array<{ public getMessagesToSend(): Array<{
requestId: string; requestId: string;
codec: ICodec<IDecodedMessage>; message: WakuLikeMessage;
message: IMessage;
}> { }> {
const res: Array<{ const res: Array<{
requestId: string; requestId: string;
codec: ICodec<IDecodedMessage>; message: WakuLikeMessage;
message: IMessage;
}> = []; }> = [];
for (const [requestId, entry] of this.pendingRequests.entries()) { for (const [requestId, entry] of this.pendingRequests.entries()) {
const isAcknowledged = entry.filterAck || entry.storeAck; const isAcknowledged = entry.filterAck || entry.storeAck;
if (!entry.codec || !entry.messageRequest || isAcknowledged) { if (!entry.messageRequest || isAcknowledged) {
continue; continue;
} }
@ -118,7 +111,6 @@ export class MessageStore {
if (notSent || notAcknowledged) { if (notSent || notAcknowledged) {
res.push({ res.push({
requestId, requestId,
codec: entry.codec,
message: entry.messageRequest message: entry.messageRequest
}); });
} }

View File

@ -1,25 +1,19 @@
import { import { IFilter, ILightPush, IStore, NetworkConfig } from "@waku/interfaces";
ICodec,
IDecodedMessage,
IFilter,
ILightPush,
IMessage,
IStore
} from "@waku/interfaces";
import { AckManager } from "./ack_manager.js"; import { AckManager } from "./ack_manager.js";
import { MessageStore } from "./message_store.js"; import { MessageStore } from "./message_store.js";
import { Sender } from "./sender.js"; import { Sender } from "./sender.js";
import type { RequestId } from "./utils.js"; import type { RequestId, WakuLikeMessage } from "./utils.js";
interface IMessaging { interface IMessaging {
send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<RequestId>; send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId>;
} }
type MessagingConstructorParams = { type MessagingConstructorParams = {
lightPush: ILightPush; lightPush: ILightPush;
filter: IFilter; filter: IFilter;
store: IStore; store: IStore;
networkConfig: NetworkConfig;
}; };
export class Messaging implements IMessaging { export class Messaging implements IMessaging {
@ -33,13 +27,15 @@ export class Messaging implements IMessaging {
this.ackManager = new AckManager({ this.ackManager = new AckManager({
messageStore: this.messageStore, messageStore: this.messageStore,
filter: params.filter, filter: params.filter,
store: params.store store: params.store,
networkConfig: params.networkConfig
}); });
this.sender = new Sender({ this.sender = new Sender({
messageStore: this.messageStore, messageStore: this.messageStore,
lightPush: params.lightPush, lightPush: params.lightPush,
ackManager: this.ackManager ackManager: this.ackManager,
networkConfig: params.networkConfig
}); });
} }
@ -53,10 +49,7 @@ export class Messaging implements IMessaging {
this.sender.stop(); this.sender.stop();
} }
public send( public send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId> {
codec: ICodec<IDecodedMessage>, return this.sender.send(wakuLikeMessage);
message: IMessage
): Promise<string> {
return this.sender.send(codec, message);
} }
} }

View File

@ -1,25 +1,28 @@
import { createDecoder, createEncoder } from "@waku/core";
import { import {
ICodec,
IDecodedMessage, IDecodedMessage,
ILightPush, ILightPush,
IMessage, IProtoMessage,
IProtoMessage NetworkConfig
} from "@waku/interfaces"; } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { AckManager } from "./ack_manager.js"; import { AckManager } from "./ack_manager.js";
import type { MessageStore } from "./message_store.js"; import type { MessageStore } from "./message_store.js";
import type { RequestId } from "./utils.js"; import type { RequestId, WakuLikeMessage } from "./utils.js";
type SenderConstructorParams = { type SenderConstructorParams = {
messageStore: MessageStore; messageStore: MessageStore;
lightPush: ILightPush; lightPush: ILightPush;
ackManager: AckManager; ackManager: AckManager;
networkConfig: NetworkConfig;
}; };
export class Sender { export class Sender {
private readonly messageStore: MessageStore; private readonly messageStore: MessageStore;
private readonly lightPush: ILightPush; private readonly lightPush: ILightPush;
private readonly ackManager: AckManager; private readonly ackManager: AckManager;
private readonly networkConfig: NetworkConfig;
private sendInterval: ReturnType<typeof setInterval> | null = null; private sendInterval: ReturnType<typeof setInterval> | null = null;
@ -27,6 +30,7 @@ export class Sender {
this.messageStore = params.messageStore; this.messageStore = params.messageStore;
this.lightPush = params.lightPush; this.lightPush = params.lightPush;
this.ackManager = params.ackManager; this.ackManager = params.ackManager;
this.networkConfig = params.networkConfig;
} }
public start(): void { public start(): void {
@ -40,20 +44,36 @@ export class Sender {
} }
} }
public async send( public async send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId> {
codec: ICodec<IDecodedMessage>, const requestId = await this.messageStore.queue(wakuLikeMessage);
message: IMessage
): Promise<RequestId> {
const requestId = await this.messageStore.queue(codec, message);
await this.ackManager.subscribe(codec); await this.ackManager.subscribe(wakuLikeMessage.contentTopic);
const response = await this.lightPush.send(codec, message); // todo: add to light push return of proto message or decoded message const encoder = createEncoder({
contentTopic: wakuLikeMessage.contentTopic,
routingInfo: createRoutingInfo(this.networkConfig, {
contentTopic: wakuLikeMessage.contentTopic
}),
ephemeral: wakuLikeMessage.ephemeral
});
const decoder = createDecoder(
wakuLikeMessage.contentTopic,
createRoutingInfo(this.networkConfig, {
contentTopic: wakuLikeMessage.contentTopic
})
);
const response = await this.lightPush.send(encoder, {
payload: wakuLikeMessage.payload
}); // todo: add to light push return of proto message or decoded message
if (response.successes.length > 0) { if (response.successes.length > 0) {
const protoObj = await codec.toProtoObj(message); const protoObj = await encoder.toProtoObj({
const decodedMessage = await codec.fromProtoObj( payload: wakuLikeMessage.payload
codec.pubsubTopic, });
const decodedMessage = await decoder.fromProtoObj(
decoder.pubsubTopic,
protoObj as IProtoMessage protoObj as IProtoMessage
); );
@ -66,13 +86,32 @@ export class Sender {
private async backgroundSend(): Promise<void> { private async backgroundSend(): Promise<void> {
const pendingRequests = this.messageStore.getMessagesToSend(); const pendingRequests = this.messageStore.getMessagesToSend();
for (const { requestId, codec, message } of pendingRequests) { for (const { requestId, message } of pendingRequests) {
const response = await this.lightPush.send(codec, message); const encoder = createEncoder({
contentTopic: message.contentTopic,
routingInfo: createRoutingInfo(this.networkConfig, {
contentTopic: message.contentTopic
}),
ephemeral: message.ephemeral
});
const decoder = createDecoder(
message.contentTopic,
createRoutingInfo(this.networkConfig, {
contentTopic: message.contentTopic
})
);
const response = await this.lightPush.send(encoder, {
payload: message.payload
});
if (response.successes.length > 0) { if (response.successes.length > 0) {
const protoObj = await codec.toProtoObj(message); const protoObj = await encoder.toProtoObj({
const decodedMessage = await codec.fromProtoObj( payload: message.payload
codec.pubsubTopic, });
const decodedMessage = await decoder.fromProtoObj(
decoder.pubsubTopic,
protoObj as IProtoMessage protoObj as IProtoMessage
); );

View File

@ -1,9 +1,15 @@
import { ICodec, IDecodedMessage } from "@waku/interfaces";
export type RequestId = string; export type RequestId = string;
// todo: make it IMessage type
export type WakuLikeMessage = {
contentTopic: string;
payload: Uint8Array;
ephemeral?: boolean;
rateLimitProof?: boolean;
};
export interface IAckManager { export interface IAckManager {
start(): void; start(): void;
stop(): void; stop(): void;
subscribe(codec: ICodec<IDecodedMessage>): Promise<boolean>; subscribe(contentTopic: string): Promise<boolean>;
} }

View File

@ -22,7 +22,6 @@ import type {
IEncoder, IEncoder,
IFilter, IFilter,
ILightPush, ILightPush,
IMessage,
IRelay, IRelay,
IRoutingInfo, IRoutingInfo,
IStore, IStore,
@ -42,7 +41,7 @@ import { Filter } from "../filter/index.js";
import { HealthIndicator } from "../health_indicator/index.js"; import { HealthIndicator } from "../health_indicator/index.js";
import { LightPush } from "../light_push/index.js"; import { LightPush } from "../light_push/index.js";
import { Messaging } from "../messaging/index.js"; import { Messaging } from "../messaging/index.js";
import type { RequestId } from "../messaging/index.js"; import type { RequestId, WakuLikeMessage } from "../messaging/index.js";
import { PeerManager } from "../peer_manager/index.js"; import { PeerManager } from "../peer_manager/index.js";
import { Store } from "../store/index.js"; import { Store } from "../store/index.js";
@ -141,7 +140,8 @@ export class WakuNode implements IWaku {
this.messaging = new Messaging({ this.messaging = new Messaging({
lightPush: this.lightPush, lightPush: this.lightPush,
filter: this.filter, filter: this.filter,
store: this.store store: this.store,
networkConfig: this.networkConfig
}); });
} }
@ -303,15 +303,12 @@ export class WakuNode implements IWaku {
}); });
} }
public send( public send(wakuLikeMessage: WakuLikeMessage): Promise<RequestId> {
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<RequestId> {
if (!this.messaging) { if (!this.messaging) {
throw new Error("Messaging not initialized"); throw new Error("Messaging not initialized");
} }
return this.messaging.send(codec, message); return this.messaging.send(wakuLikeMessage);
} }
public createCodec(params: CreateCodecParams): ICodec<IDecodedMessage> { public createCodec(params: CreateCodecParams): ICodec<IDecodedMessage> {