implement send on waku

This commit is contained in:
Sasha 2025-09-30 00:12:43 +02:00
parent 9e7719e519
commit ad675d8174
No known key found for this signature in database
7 changed files with 76 additions and 36 deletions

View File

@ -10,7 +10,13 @@ import type { IFilter } from "./filter.js";
import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { ICodec, IDecodedMessage, IDecoder, IEncoder } from "./message.js";
import {
ICodec,
IDecodedMessage,
IDecoder,
IEncoder,
IMessage
} from "./message.js";
import type { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { ShardId } from "./sharding.js";
@ -300,6 +306,15 @@ export interface IWaku {
*/
createCodec(params: CreateCodecParams): ICodec<IDecodedMessage>;
/**
* Sends a message to the Waku network.
*
* @param {ICodec<IDecodedMessage>} codec - The codec to use for encoding the message
* @param {IMessage} message - The message to send
* @returns {Promise<string>} A promise that resolves to the request ID
*/
send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<string>;
/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/

View File

@ -76,7 +76,7 @@ class FilterAckManager implements IAckManager {
private async onMessage(message: IDecodedMessage): Promise<void> {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
this.messageStore.add(message, { filterAck: true });
}
this.messageStore.markFilterAck(message.hashStr);
@ -123,7 +123,7 @@ class StoreAckManager implements IAckManager {
[codec],
(message) => {
if (!this.messageStore.has(message.hashStr)) {
this.messageStore.add(message);
this.messageStore.add(message, { storeAck: true });
}
this.messageStore.markStoreAck(message.hashStr);

View File

@ -1,8 +1,8 @@
import { messageHashStr } from "@waku/core";
import { IDecodedMessage, IEncoder, IMessage } from "@waku/interfaces";
import { ICodec, IDecodedMessage, IMessage } from "@waku/interfaces";
type QueuedMessage = {
encoder?: IEncoder;
codec?: ICodec<IDecodedMessage>;
message?: IMessage;
filterAck: boolean;
storeAck: boolean;
@ -10,6 +10,11 @@ type QueuedMessage = {
createdAt: number;
};
type AddMessageOptions = {
filterAck?: boolean;
storeAck?: boolean;
};
type MessageStoreOptions = {
resendIntervalMs?: number;
};
@ -30,11 +35,11 @@ export class MessageStore {
return this.messages.has(hashStr);
}
public add(message: IDecodedMessage): void {
public add(message: IDecodedMessage, options: AddMessageOptions = {}): void {
if (!this.messages.has(message.hashStr)) {
this.messages.set(message.hashStr, {
filterAck: false,
storeAck: false,
filterAck: options.filterAck ?? false,
storeAck: options.storeAck ?? false,
createdAt: Date.now()
});
}
@ -57,7 +62,7 @@ export class MessageStore {
public async markSent(requestId: RequestId): Promise<void> {
const entry = this.pendingRequests.get(requestId);
if (!entry || !entry.encoder || !entry.message) {
if (!entry || !entry.codec || !entry.message) {
return;
}
@ -65,13 +70,13 @@ export class MessageStore {
entry.lastSentAt = Date.now();
this.pendingRequests.delete(requestId);
const proto = await entry.encoder.toProtoObj(entry.message);
const proto = await entry.codec.toProtoObj(entry.message);
if (!proto) {
return;
}
const hashStr = messageHashStr(entry.encoder.pubsubTopic, proto);
const hashStr = messageHashStr(entry.codec.pubsubTopic, proto);
this.messages.set(hashStr, entry);
} catch (error) {
@ -80,11 +85,14 @@ export class MessageStore {
}
}
public async queue(encoder: IEncoder, message: IMessage): Promise<RequestId> {
public async queue(
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<RequestId> {
const requestId = crypto.randomUUID();
this.pendingRequests.set(requestId, {
encoder,
codec,
message,
filterAck: false,
storeAck: false,
@ -96,25 +104,21 @@ export class MessageStore {
public getMessagesToSend(): Array<{
requestId: string;
encoder: IEncoder;
codec: ICodec<IDecodedMessage>;
message: IMessage;
}> {
const now = Date.now();
const res: Array<{
requestId: string;
encoder: IEncoder;
codec: ICodec<IDecodedMessage>;
message: IMessage;
}> = [];
for (const [requestId, entry] of this.pendingRequests.entries()) {
if (!entry.encoder || !entry.message) {
continue;
}
const isAcknowledged = entry.filterAck || entry.storeAck;
const isAcknowledged = entry.filterAck || entry.storeAck; // TODO: make sure it works with message and pending requests and returns messages to re-sent that are not ack yet
if (isAcknowledged) {
if (!entry.codec || !entry.message || isAcknowledged) {
continue;
}
@ -122,7 +126,7 @@ export class MessageStore {
!entry.lastSentAt ||
now - entry.lastSentAt >= this.resendIntervalMs
) {
res.push({ requestId, encoder: entry.encoder, message: entry.message });
res.push({ requestId, codec: entry.codec, message: entry.message });
}
}

View File

@ -1,5 +1,6 @@
import {
IEncoder,
ICodec,
IDecodedMessage,
IFilter,
ILightPush,
IMessage,
@ -12,7 +13,7 @@ import { Sender } from "./sender.js";
import type { RequestId } from "./utils.js";
interface IMessaging {
send(encoder: IEncoder, message: IMessage): Promise<RequestId>;
send(codec: ICodec<IDecodedMessage>, message: IMessage): Promise<RequestId>;
}
type MessagingConstructorParams = {
@ -43,13 +44,18 @@ export class Messaging implements IMessaging {
public start(): void {
this.ackManager.start();
this.sender.start();
}
public async stop(): Promise<void> {
await this.ackManager.stop();
this.sender.stop();
}
public send(encoder: IEncoder, message: IMessage): Promise<string> {
return this.sender.send(encoder, message);
public send(
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<string> {
return this.sender.send(codec, message);
}
}

View File

@ -1,4 +1,9 @@
import { IEncoder, ILightPush, IMessage } from "@waku/interfaces";
import {
ICodec,
IDecodedMessage,
ILightPush,
IMessage
} from "@waku/interfaces";
import type { MessageStore } from "./message_store.js";
import type { RequestId } from "./utils.js";
@ -30,9 +35,12 @@ export class Sender {
}
}
public async send(encoder: IEncoder, message: IMessage): Promise<RequestId> {
const requestId = await this.messageStore.queue(encoder, message);
const response = await this.lightPush.send(encoder, message);
public async send(
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<RequestId> {
const requestId = await this.messageStore.queue(codec, message);
const response = await this.lightPush.send(codec, message);
if (response.successes.length > 0) {
await this.messageStore.markSent(requestId);
@ -44,10 +52,8 @@ export class Sender {
private async backgroundSend(): Promise<void> {
const pendingRequests = this.messageStore.getMessagesToSend();
// todo: implement chunking, error handling, retry, etc.
// todo: implement backoff and batching potentially
for (const { requestId, encoder, message } of pendingRequests) {
const response = await this.lightPush.send(encoder, message);
for (const { requestId, codec, message } of pendingRequests) {
const response = await this.lightPush.send(codec, message);
if (response.successes.length > 0) {
await this.messageStore.markSent(requestId);

View File

@ -303,12 +303,15 @@ export class WakuNode implements IWaku {
});
}
public send(encoder: IEncoder, message: IMessage): Promise<RequestId> {
public send(
codec: ICodec<IDecodedMessage>,
message: IMessage
): Promise<RequestId> {
if (!this.messaging) {
throw new Error("Messaging not initialized");
}
return this.messaging.send(encoder, message);
return this.messaging.send(codec, message);
}
public createCodec(params: CreateCodecParams): ICodec<IDecodedMessage> {

View File

@ -159,6 +159,12 @@ export class MockWakuNode implements IWaku {
public createCodec(_params: CreateCodecParams): ICodec<IDecodedMessage> {
throw new Error("Method not implemented.");
}
public send(
_codec: ICodec<IDecodedMessage>,
_message: IMessage
): Promise<string> {
throw new Error("Method not implemented.");
}
public isStarted(): boolean {
throw new Error("Method not implemented.");
}