Merge pull request #2322 from waku-org/feat/sds-command-queue

feat: add command queue architecture and improve message handling
This commit is contained in:
Arseniy Klempner 2025-06-13 16:47:38 -07:00 committed by GitHub
commit a0fc9e05d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 592 additions and 260 deletions

View File

@ -1,5 +1,5 @@
import { hashN } from "./nim_hashn/nim_hashn.mjs";
import { getMOverNBitsForK } from "./probabilities.js";
import { hashN } from "../nim_hashn/nim_hashn.mjs";
import { getMOverNBitsForK } from "../probabilities.js";
export interface BloomFilterOptions {
// The expected maximum number of elements for which this BloomFilter is sized.

View File

@ -1,3 +1,17 @@
import { BloomFilter } from "./bloom.js";
import { BloomFilter } from "./bloom_filter/bloom.js";
export {
MessageChannel,
MessageChannelEvent,
encodeMessage,
decodeMessage
} from "./message_channel/index.js";
export type {
Message,
HistoryEntry,
ChannelId,
MessageChannelEvents
} from "./message_channel/index.js";
export { BloomFilter };

View File

@ -0,0 +1,33 @@
import type { Message } from "./events.js";
export enum Command {
Send = "send",
Receive = "receive",
SendEphemeral = "sendEphemeral"
}
export interface ParamsByAction {
[Command.Send]: {
payload: Uint8Array;
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>;
};
[Command.Receive]: {
message: Message;
};
[Command.SendEphemeral]: {
payload: Uint8Array;
callback?: (message: Message) => Promise<boolean>;
};
}
export type Task<A extends Command = Command> = {
command: A;
params: ParamsByAction[A];
};
export type Handlers = {
[A in Command]: (params: ParamsByAction[A]) => Promise<void>;
};

View File

@ -0,0 +1,41 @@
import { proto_sds_message } from "@waku/proto";
export enum MessageChannelEvent {
MessageSent = "messageSent",
MessageDelivered = "messageDelivered",
MessageReceived = "messageReceived",
MessageAcknowledged = "messageAcknowledged",
PartialAcknowledgement = "partialAcknowledgement",
MissedMessages = "missedMessages",
SyncSent = "syncSent",
SyncReceived = "syncReceived"
}
export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export function encodeMessage(message: Message): Uint8Array {
return proto_sds_message.SdsMessage.encode(message);
}
export function decodeMessage(data: Uint8Array): Message {
return proto_sds_message.SdsMessage.decode(data);
}
export type MessageChannelEvents = {
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
[MessageChannelEvent.MessageDelivered]: CustomEvent<{
messageId: string;
sentOrReceived: "sent" | "received";
}>;
[MessageChannelEvent.MessageReceived]: CustomEvent<Message>;
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<string>;
[MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{
messageId: string;
count: number;
}>;
[MessageChannelEvent.MissedMessages]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.SyncSent]: CustomEvent<Message>;
[MessageChannelEvent.SyncReceived]: CustomEvent<Message>;
};

View File

@ -0,0 +1,3 @@
export * from "./command_queue.js";
export * from "./events.js";
export * from "./message_channel.js";

View File

@ -1,14 +1,13 @@
import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { DefaultBloomFilter } from "./bloom.js";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { HistoryEntry, Message } from "./events.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
HistoryEntry,
Message,
MessageChannel,
MessageChannelEvent
} from "./sds.js";
MessageChannel
} from "./message_channel.js";
const channelId = "test-channel";
const callback = (_message: Message): Promise<{ success: boolean }> => {
@ -28,6 +27,23 @@ const messagesB = [
"message-7"
];
const sendMessage = async (
channel: MessageChannel,
payload: Uint8Array,
callback: (message: Message) => Promise<{ success: boolean }>
): Promise<void> => {
await channel.sendMessage(payload, callback);
await channel.processTasks();
};
const receiveMessage = async (
channel: MessageChannel,
message: Message
): Promise<void> => {
channel.receiveMessage(message);
await channel.processTasks();
};
describe("MessageChannel", function () {
this.timeout(5000);
let channelA: MessageChannel;
@ -40,21 +56,21 @@ describe("MessageChannel", function () {
it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp;
await channelA.sendMessage(new Uint8Array(), callback);
await sendMessage(channelA, new Uint8Array(), callback);
const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1);
});
it("should push the message to the outgoing buffer", async () => {
const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
await channelA.sendMessage(new Uint8Array(), callback);
await sendMessage(channelA, new Uint8Array(), callback);
const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
});
it("should insert message into bloom filter", async () => {
const messageId = MessageChannel.getMessageId(new Uint8Array());
await channelA.sendMessage(new Uint8Array(), callback);
await sendMessage(channelA, new Uint8Array(), callback);
const bloomFilter = getBloomFilter(channelA);
expect(bloomFilter.lookup(messageId)).to.equal(true);
});
@ -62,7 +78,7 @@ describe("MessageChannel", function () {
it("should insert message id into causal history", async () => {
const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
const messageId = MessageChannel.getMessageId(new Uint8Array());
await channelA.sendMessage(new Uint8Array(), callback);
await sendMessage(channelA, new Uint8Array(), callback);
const messageIdLog = (channelA as any).localHistory as {
timestamp: number;
historyEntry: HistoryEntry;
@ -87,7 +103,7 @@ describe("MessageChannel", function () {
for (const message of messages) {
filterBytes.push(bloomFilter.toBytes());
await channelA.sendMessage(utf8ToBytes(message), callback);
await sendMessage(channelA, utf8ToBytes(message), callback);
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
}
@ -123,9 +139,9 @@ describe("MessageChannel", function () {
it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp;
await channelB.sendMessage(new Uint8Array(), (message) => {
channelA.receiveMessage(message);
return Promise.resolve({ success: true });
await sendMessage(channelB, new Uint8Array(), async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1);
@ -133,12 +149,12 @@ describe("MessageChannel", function () {
it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback);
await sendMessage(channelA, utf8ToBytes(m), callback);
}
for (const m of messagesB) {
await channelB.sendMessage(utf8ToBytes(m), (message) => {
channelA.receiveMessage(message);
return Promise.resolve({ success: true });
await sendMessage(channelB, utf8ToBytes(m), async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
}
const timestampAfter = (channelA as any).lamportTimestamp;
@ -148,20 +164,20 @@ describe("MessageChannel", function () {
it("should maintain proper timestamps if all messages received", async () => {
let timestamp = 0;
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
timestamp++;
channelB.receiveMessage(message);
await receiveMessage(channelB, message);
expect((channelB as any).lamportTimestamp).to.equal(timestamp);
return Promise.resolve({ success: true });
return { success: true };
});
}
for (const m of messagesB) {
await channelB.sendMessage(utf8ToBytes(m), (message) => {
await sendMessage(channelB, utf8ToBytes(m), async (message) => {
timestamp++;
channelA.receiveMessage(message);
await receiveMessage(channelA, message);
expect((channelA as any).lamportTimestamp).to.equal(timestamp);
return Promise.resolve({ success: true });
return { success: true };
});
}
@ -174,28 +190,32 @@ describe("MessageChannel", function () {
it("should add received messages to bloom filter", async () => {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
await receiveMessage(channelB, message);
const bloomFilter = getBloomFilter(channelB);
expect(bloomFilter.lookup(message.messageId)).to.equal(true);
return Promise.resolve({ success: true });
return { success: true };
});
}
});
it("should add to incoming buffer if dependencies are not met", async () => {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback);
await sendMessage(channelA, utf8ToBytes(m), callback);
}
let receivedMessage: Message | null = null;
const timestampBefore = (channelB as any).lamportTimestamp;
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
receivedMessage = message;
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
});
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
receivedMessage = message;
await receiveMessage(channelB, message);
return { success: true };
}
);
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1);
@ -227,33 +247,50 @@ describe("MessageChannel", function () {
it("should mark all messages in causal history as acknowledged", async () => {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
await receiveMessage(channelB, message);
return { success: true };
});
}
await channelA.processTasks();
await channelB.processTasks();
let notInHistory: Message | null = null;
await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => {
notInHistory = message;
return Promise.resolve({ success: true });
});
await sendMessage(
channelA,
utf8ToBytes("not-in-history"),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
await channelA.processTasks();
await channelB.processTasks();
expect((channelA as any).outgoingBuffer.length).to.equal(
messagesA.length + 1
);
await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelA.receiveMessage(message);
return Promise.resolve({ success: true });
});
await sendMessage(
channelB,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelA, message);
return { success: true };
}
);
await channelA.processTasks();
await channelB.processTasks();
// Since messagesA are in causal history of channel B's message
// they should be gone from channel A's outgoing buffer
// and notInHistory should still be in the outgoing buffer
// Channel B only includes the last causalHistorySize messages in its causal history
// Since B received message-1, message-2, and not-in-history (3 messages),
// and causalHistorySize is 3, it will only include the last 2 in its causal history
// So message-1 won't be acknowledged, only message-2 and not-in-history
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(1);
expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId);
// The remaining message should be message-1 (not acknowledged)
expect(outgoingBuffer[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
});
it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
@ -268,23 +305,24 @@ describe("MessageChannel", function () {
const messages = [...messagesA, ...messagesB.slice(0, -1)];
// Send messages to be received by channel B
for (const m of messages) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
await receiveMessage(channelB, message);
return { success: true };
});
}
// Send messages not received by channel B
for (const m of unacknowledgedMessages) {
await channelA.sendMessage(utf8ToBytes(m), callback);
await sendMessage(channelA, utf8ToBytes(m), callback);
}
// Channel B sends a message to channel A
await channelB.sendMessage(
await sendMessage(
channelB,
utf8ToBytes(messagesB[messagesB.length - 1]),
(message) => {
channelA.receiveMessage(message);
return Promise.resolve({ success: true });
async (message) => {
await receiveMessage(channelA, message);
return { success: true };
}
);
@ -316,9 +354,9 @@ describe("MessageChannel", function () {
// in the bloom filter as before, which should mark them as fully acknowledged in channel A
for (let i = 1; i < acknowledgementCount; i++) {
// Send messages until acknowledgement count is reached
await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => {
channelA.receiveMessage(message);
return Promise.resolve({ success: true });
await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
}
@ -349,13 +387,17 @@ describe("MessageChannel", function () {
it("should detect messages with missing dependencies", async () => {
const causalHistorySize = (channelA as any).causalHistorySize;
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback);
await sendMessage(channelA, utf8ToBytes(m), callback);
}
await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => {
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
});
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1);
@ -373,18 +415,29 @@ describe("MessageChannel", function () {
it("should deliver messages after dependencies are met", async () => {
const causalHistorySize = (channelA as any).causalHistorySize;
const sentMessages = new Array<Message>();
// First, send messages from A but DON'T deliver them to B yet
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
sentMessages.push(message);
return Promise.resolve({ success: true });
// Don't receive them at B yet - we want them to be missing dependencies
return { success: true };
});
}
await channelA.processTasks();
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
});
// Now send a message from A to B that depends on messagesA
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
await channelA.processTasks();
await channelB.processTasks();
// Message should be in incoming buffer waiting for dependencies
const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
expect(missingMessages[0].messageId).to.equal(
@ -394,10 +447,13 @@ describe("MessageChannel", function () {
let incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1);
sentMessages.forEach((m) => {
channelB.receiveMessage(m);
});
// Now deliver the missing dependencies
for (const m of sentMessages) {
await receiveMessage(channelB, m);
}
await channelB.processTasks();
// Sweep should now deliver the waiting message
const missingMessages2 = channelB.sweepIncomingBuffer();
expect(missingMessages2.length).to.equal(0);
@ -414,13 +470,17 @@ describe("MessageChannel", function () {
});
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback);
await sendMessage(channelA, utf8ToBytes(m), callback);
}
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
channelC.receiveMessage(message);
return Promise.resolve({ success: true });
});
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelC, message);
return { success: true };
}
);
const missingMessages = channelC.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize);
@ -444,10 +504,10 @@ describe("MessageChannel", function () {
it("should partition messages based on acknowledgement status", async () => {
const unacknowledgedMessages: Message[] = [];
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
unacknowledgedMessages.push(message);
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
await receiveMessage(channelB, message);
return { success: true };
});
}
@ -459,14 +519,15 @@ describe("MessageChannel", function () {
// Make sure messages sent by channel A are not in causal history
const causalHistorySize = (channelA as any).causalHistorySize;
for (const m of messagesB.slice(0, causalHistorySize)) {
await channelB.sendMessage(utf8ToBytes(m), callback);
await sendMessage(channelB, utf8ToBytes(m), callback);
}
await channelB.sendMessage(
await sendMessage(
channelB,
utf8ToBytes(messagesB[causalHistorySize]),
(message) => {
channelA.receiveMessage(message);
return Promise.resolve({ success: true });
async (message) => {
await receiveMessage(channelA, message);
return { success: true };
}
);
@ -486,9 +547,9 @@ describe("MessageChannel", function () {
});
it("should be sent with empty content", async () => {
await channelA.sendSyncMessage((message) => {
await channelA.sendSyncMessage(async (message) => {
expect(message.content?.length).to.equal(0);
return Promise.resolve(true);
return true;
});
});
@ -513,10 +574,10 @@ describe("MessageChannel", function () {
it("should be delivered but not added to local log or bloom filter", async () => {
const timestampBefore = (channelB as any).lamportTimestamp;
let expectedTimestamp: number | undefined;
await channelA.sendSyncMessage((message) => {
await channelA.sendSyncMessage(async (message) => {
expectedTimestamp = message.lamportTimestamp;
channelB.receiveMessage(message);
return Promise.resolve(true);
await receiveMessage(channelB, message);
return true;
});
const timestampAfter = (channelB as any).lamportTimestamp;
expect(timestampAfter).to.equal(expectedTimestamp);
@ -536,15 +597,15 @@ describe("MessageChannel", function () {
it("should update ack status of messages in outgoing buffer", async () => {
for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => {
channelB.receiveMessage(message);
return Promise.resolve({ success: true });
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
await receiveMessage(channelB, message);
return { success: true };
});
}
await channelB.sendSyncMessage((message) => {
channelA.receiveMessage(message);
return Promise.resolve(true);
await sendMessage(channelB, new Uint8Array(), async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
const causalHistorySize = (channelA as any).causalHistorySize;
@ -560,9 +621,9 @@ describe("MessageChannel", function () {
channelA = new MessageChannel(channelId);
});
it("should be sent without a timestamp, causal history, or bloom filter", () => {
it("should be sent without a timestamp, causal history, or bloom filter", async () => {
const timestampBefore = (channelA as any).lamportTimestamp;
channelA.sendEphemeralMessage(new Uint8Array(), (message) => {
await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => {
expect(message.lamportTimestamp).to.equal(undefined);
expect(message.causalHistory).to.deep.equal([]);
expect(message.bloomFilter).to.equal(undefined);
@ -577,33 +638,36 @@ describe("MessageChannel", function () {
});
it("should be delivered immediately if received", async () => {
let deliveredMessageId: string | undefined;
let sentMessage: Message | undefined;
const channelB = new MessageChannel(channelId);
const channelB = new MessageChannel(channelId, {
deliveredMessageCallback: (messageId) => {
deliveredMessageId = messageId;
}
});
// Track initial state
const localHistoryBefore = (channelB as any).localHistory.length;
const incomingBufferBefore = (channelB as any).incomingBuffer.length;
const timestampBefore = (channelB as any).lamportTimestamp;
const waitForMessageDelivered = new Promise<string>((resolve) => {
channelB.addEventListener(
MessageChannelEvent.MessageDelivered,
(event) => {
resolve(event.detail);
}
);
channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => {
sentMessage = message;
channelB.receiveMessage(message);
await channelA.sendEphemeralMessage(
utf8ToBytes(messagesA[0]),
async (message) => {
// Ephemeral messages should have no timestamp
expect(message.lamportTimestamp).to.be.undefined;
await receiveMessage(channelB, message);
return true;
});
});
}
);
await channelA.processTasks();
await channelB.processTasks();
const eventMessageId = await waitForMessageDelivered;
expect(deliveredMessageId).to.equal(sentMessage?.messageId);
expect(eventMessageId).to.equal(sentMessage?.messageId);
// Verify ephemeral message behavior:
// 1. Not added to local history
expect((channelB as any).localHistory.length).to.equal(
localHistoryBefore
);
// 2. Not added to incoming buffer
expect((channelB as any).incomingBuffer.length).to.equal(
incomingBufferBefore
);
// 3. Doesn't update lamport timestamp
expect((channelB as any).lamportTimestamp).to.equal(timestampBefore);
});
});
});

View File

@ -1,20 +1,18 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256";
import { bytesToHex } from "@noble/hashes/utils";
import { proto_sds_message } from "@waku/proto";
import { Logger } from "@waku/utils";
import { DefaultBloomFilter } from "./bloom.js";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
export enum MessageChannelEvent {
MessageDelivered = "messageDelivered"
}
type MessageChannelEvents = {
[MessageChannelEvent.MessageDelivered]: CustomEvent<string>;
};
export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import {
ChannelId,
HistoryEntry,
Message,
MessageChannelEvent,
MessageChannelEvents
} from "./events.js";
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
capacity: 10000,
@ -24,27 +22,46 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes
const log = new Logger("sds:message-channel");
interface MessageChannelOptions {
causalHistorySize?: number;
receivedMessageTimeoutEnabled?: boolean;
receivedMessageTimeout?: number;
deliveredMessageCallback?: (messageId: string) => void;
}
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
private lamportTimestamp: number;
private filter: DefaultBloomFilter;
private outgoingBuffer: Message[];
private acknowledgements: Map<string, number>;
private incomingBuffer: Message[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
private channelId: ChannelId;
private causalHistorySize: number;
private acknowledgementCount: number;
private timeReceived: Map<string, number>;
private receivedMessageTimeoutEnabled: boolean;
private receivedMessageTimeout: number;
private deliveredMessageCallback?: (messageId: string) => void;
private tasks: Task[] = [];
private handlers: Handlers = {
[Command.Send]: async (
params: ParamsByAction[Command.Send]
): Promise<void> => {
await this._sendMessage(params.payload, params.callback);
},
[Command.Receive]: async (
params: ParamsByAction[Command.Receive]
): Promise<void> => {
this._receiveMessage(params.message);
},
[Command.SendEphemeral]: async (
params: ParamsByAction[Command.SendEphemeral]
): Promise<void> => {
await this._sendEphemeralMessage(params.payload, params.callback);
}
};
public constructor(
channelId: ChannelId,
@ -66,7 +83,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
options.receivedMessageTimeoutEnabled ?? false;
this.receivedMessageTimeout =
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
this.deliveredMessageCallback = options.deliveredMessageCallback;
}
public static getMessageId(payload: Uint8Array): string {
@ -74,20 +90,59 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
/**
* Send a message to the SDS channel.
* Processes all queued tasks sequentially to ensure proper message ordering.
*
* Increments the lamport timestamp, constructs a `Message` object
* with the given payload, and adds it to the outgoing buffer.
* This method should be called periodically by the library consumer to execute
* queued send/receive operations in the correct sequence.
*
* If the callback is successful, the message is also added to
* the bloom filter and message history. In the context of
* Waku, this likely means the message was published via
* light push or relay.
* @example
* ```typescript
* const channel = new MessageChannel("my-channel");
*
* See https://rfc.vac.dev/vac/raw/sds/#send-message
* // Queue some operations
* await channel.sendMessage(payload, callback);
* channel.receiveMessage(incomingMessage);
*
* @param payload - The payload to send.
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
* // Process all queued operations
* await channel.processTasks();
* ```
*
* @throws Will emit a 'taskError' event if any task fails, but continues processing remaining tasks
*/
public async processTasks(): Promise<void> {
while (this.tasks.length > 0) {
const item = this.tasks.shift();
if (!item) {
continue;
}
await this.executeTask(item);
}
}
/**
* Queues a message to be sent on this channel.
*
* The message will be processed sequentially when processTasks() is called.
* This ensures proper lamport timestamp ordering and causal history tracking.
*
* @param payload - The message content as a byte array
* @param callback - Optional callback function called after the message is processed
* @returns Promise that resolves when the message is queued (not sent)
*
* @example
* ```typescript
* const channel = new MessageChannel("chat-room");
* const message = new TextEncoder().encode("Hello, world!");
*
* await channel.sendMessage(message, async (processedMessage) => {
* console.log("Message processed:", processedMessage.messageId);
* return { success: true };
* });
*
* // Actually send the message
* await channel.processTasks();
* ```
*/
public async sendMessage(
payload: Uint8Array,
@ -96,36 +151,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
retrievalHint?: Uint8Array;
}>
): Promise<void> {
this.lamportTimestamp++;
const messageId = MessageChannel.getMessageId(payload);
const message: Message = {
messageId,
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: payload
};
this.outgoingBuffer.push(message);
if (callback) {
const { success, retrievalHint } = await callback(message);
if (success) {
this.filter.insert(messageId);
this.localHistory.push({
timestamp: this.lamportTimestamp,
historyEntry: {
messageId,
retrievalHint
}
});
this.tasks.push({
command: Command.Send,
params: {
payload,
callback
}
}
});
}
/**
@ -141,72 +173,58 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* @param payload - The payload to send.
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public sendEphemeralMessage(
public async sendEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => boolean
): void {
const message: Message = {
messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId,
content: payload,
lamportTimestamp: undefined,
causalHistory: [],
bloomFilter: undefined
};
if (callback) {
callback(message);
}
callback?: (message: Message) => Promise<boolean>
): Promise<void> {
this.tasks.push({
command: Command.SendEphemeral,
params: {
payload,
callback
}
});
}
/**
* Process a received SDS message for this channel.
* Queues a received message for processing.
*
* Review the acknowledgement status of messages in the outgoing buffer
* by inspecting the received message's bloom filter and causal history.
* Add the received message to the bloom filter.
* If the local history contains every message in the received message's
* causal history, deliver the message. Otherwise, add the message to the
* incoming buffer.
* The message will be processed when processTasks() is called, ensuring
* proper dependency resolution and causal ordering.
*
* See https://rfc.vac.dev/vac/raw/sds/#receive-message
* @param message - The message to receive and process
*
* @param message - The received SDS message.
* @example
* ```typescript
* const channel = new MessageChannel("chat-room");
*
* // Receive a message from the network
* channel.receiveMessage(incomingMessage);
*
* // Process the received message
* await channel.processTasks();
* ```
*/
public receiveMessage(message: Message): void {
if (!message.lamportTimestamp) {
// Messages with no timestamp are ephemeral messages and should be delivered immediately
this.deliverMessage(message);
return;
}
// review ack status
this.reviewAckStatus(message);
// add to bloom filter (skip for messages with empty content)
if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId);
}
// verify causal history
const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === historyEntry.messageId
)
);
if (!dependenciesMet) {
this.incomingBuffer.push(message);
this.timeReceived.set(message.messageId, Date.now());
} else {
this.deliverMessage(message);
}
this.tasks.push({
command: Command.Receive,
params: {
message
}
});
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
/**
* Processes messages in the incoming buffer, delivering those with satisfied dependencies.
*
* @returns Array of history entries for messages still missing dependencies
*/
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[];
missing: HistoryEntry[];
missing: Set<HistoryEntry>;
}>(
({ buffer, missing }, message) => {
// Check each message for missing dependencies
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
@ -215,9 +233,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
)
);
if (missingDependencies.length === 0) {
// Any message with no missing dependencies is delivered
// and removed from the buffer (implicitly by not adding it to the new incoming buffer)
this.deliverMessage(message);
this.safeSendEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
});
return { buffer, missing };
}
@ -232,18 +254,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return { buffer, missing };
}
}
// Any message with missing dependencies stays in the buffer
// and the missing message IDs are returned for processing.
missingDependencies.forEach((dependency) => {
missing.add(dependency);
});
return {
buffer: buffer.concat(message),
missing: missing.concat(missingDependencies)
missing
};
},
{ buffer: new Array<Message>(), missing: new Array<HistoryEntry>() }
{ buffer: new Array<Message>(), missing: new Set<HistoryEntry>() }
);
// Update the incoming buffer to only include messages with no missing dependencies
this.incomingBuffer = buffer;
return missing;
this.safeSendEvent(MessageChannelEvent.MissedMessages, {
detail: Array.from(missing)
});
return Array.from(missing);
}
// https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep
@ -251,7 +278,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
unacknowledged: Message[];
possiblyAcknowledged: Message[];
} {
// Partition all messages in the outgoing buffer into unacknowledged and possibly acknowledged messages
return this.outgoingBuffer.reduce<{
unacknowledged: Message[];
possiblyAcknowledged: Message[];
@ -285,7 +311,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
*
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public sendSyncMessage(
public async sendSyncMessage(
callback?: (message: Message) => Promise<boolean>
): Promise<boolean> {
this.lamportTimestamp++;
@ -304,15 +330,165 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
};
if (callback) {
return callback(message);
try {
await callback(message);
this.safeSendEvent(MessageChannelEvent.SyncSent, {
detail: message
});
return true;
} catch (error) {
log.error("Callback execution failed in sendSyncMessage:", error);
throw error;
}
}
return false;
}
private _receiveMessage(message: Message): void {
const isDuplicate =
message.content &&
message.content.length > 0 &&
this.timeReceived.has(message.messageId);
if (isDuplicate) {
return;
}
if (!message.lamportTimestamp) {
this.deliverMessage(message);
return;
}
if (message.content?.length === 0) {
this.safeSendEvent(MessageChannelEvent.SyncReceived, {
detail: message
});
} else {
this.safeSendEvent(MessageChannelEvent.MessageReceived, {
detail: message
});
}
this.reviewAckStatus(message);
if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId);
}
const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === historyEntry.messageId
)
);
if (!dependenciesMet) {
this.incomingBuffer.push(message);
this.timeReceived.set(message.messageId, Date.now());
} else {
this.deliverMessage(message);
this.safeSendEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
});
}
}
private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
try {
const handler = this.handlers[item.command];
await handler(item.params as ParamsByAction[A]);
} catch (error) {
log.error(`Task execution failed for command ${item.command}:`, error);
this.dispatchEvent(
new CustomEvent("taskError", {
detail: { command: item.command, error, params: item.params }
})
);
}
}
private safeSendEvent<T extends MessageChannelEvent>(
event: T,
eventInit?: CustomEventInit
): void {
try {
this.dispatchEvent(new CustomEvent(event, eventInit));
} catch (error) {
log.error(`Failed to dispatch event ${event}:`, error);
}
}
private async _sendMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
): Promise<void> {
this.lamportTimestamp++;
const messageId = MessageChannel.getMessageId(payload);
const message: Message = {
messageId,
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: payload
};
this.outgoingBuffer.push(message);
if (callback) {
try {
const { success, retrievalHint } = await callback(message);
if (success) {
this.filter.insert(messageId);
this.localHistory.push({
timestamp: this.lamportTimestamp,
historyEntry: {
messageId,
retrievalHint
}
});
this.timeReceived.set(messageId, Date.now());
this.safeSendEvent(MessageChannelEvent.MessageSent, {
detail: message
});
}
} catch (error) {
log.error("Callback execution failed in _sendMessage:", error);
throw error;
}
}
}
private async _sendEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
): Promise<void> {
const message: Message = {
messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId,
content: payload,
lamportTimestamp: undefined,
causalHistory: [],
bloomFilter: undefined
};
if (callback) {
try {
await callback(message);
} catch (error) {
log.error("Callback execution failed in _sendEphemeralMessage:", error);
throw error;
}
}
return Promise.resolve(false);
}
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message, retrievalHint?: Uint8Array): void {
this.notifyDeliveredMessage(message.messageId);
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
if (messageLamportTimestamp > this.lamportTimestamp) {
this.lamportTimestamp = messageLamportTimestamp;
@ -352,17 +528,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// to determine the acknowledgement status of messages in the outgoing buffer.
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
private reviewAckStatus(receivedMessage: Message): void {
// the participant MUST mark all messages in the received causal_history as acknowledged.
receivedMessage.causalHistory.forEach(({ messageId }) => {
this.outgoingBuffer = this.outgoingBuffer.filter(
({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId
({ messageId: outgoingMessageId }) => {
if (outgoingMessageId !== messageId) {
return true;
}
this.safeSendEvent(MessageChannelEvent.MessageAcknowledged, {
detail: messageId
});
return false;
}
);
this.acknowledgements.delete(messageId);
if (!this.filter.lookup(messageId)) {
this.filter.insert(messageId);
}
});
// the participant MUST mark all messages included in the bloom_filter as possibly acknowledged
if (!receivedMessage.bloomFilter) {
return;
}
@ -380,6 +562,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
if (count < this.acknowledgementCount) {
this.acknowledgements.set(message.messageId, count);
this.safeSendEvent(MessageChannelEvent.PartialAcknowledgement, {
detail: {
messageId: message.messageId,
count
}
});
return true;
}
this.acknowledgements.delete(message.messageId);
@ -391,15 +579,4 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private getAcknowledgementCount(): number {
return 2;
}
private notifyDeliveredMessage(messageId: string): void {
if (this.deliveredMessageCallback) {
this.deliveredMessageCallback(messageId);
}
this.dispatchEvent(
new CustomEvent(MessageChannelEvent.MessageDelivered, {
detail: messageId
})
);
}
}