feat(sds)_: add command queue architecture and improve message handling

- Introduce command queue system for sequential task processing
- Add comprehensive event system for message lifecycle tracking
- Restructure codebase with separate bloom_filter directory
- Export encode/decode helpers for SDS proto messages
- Use Set for deduplication in missing message detection
- Fix sync message handling for empty content messages
- Always emit MissedMessages event even with empty array
- Improve duplicate message detection logic
This commit is contained in:
Arseniy Klempner 2025-06-03 14:46:12 -07:00
parent 4997440225
commit a2c3b2e6aa
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
8 changed files with 527 additions and 209 deletions

View File

@ -1,5 +1,5 @@
import { hashN } from "./nim_hashn/nim_hashn.mjs"; import { hashN } from "../nim_hashn/nim_hashn.mjs";
import { getMOverNBitsForK } from "./probabilities.js"; import { getMOverNBitsForK } from "../probabilities.js";
export interface BloomFilterOptions { export interface BloomFilterOptions {
// The expected maximum number of elements for which this BloomFilter is sized. // The expected maximum number of elements for which this BloomFilter is sized.

View File

@ -1,3 +1,17 @@
import { BloomFilter } from "./bloom.js"; import { BloomFilter } from "./bloom_filter/bloom.js";
export {
MessageChannel,
MessageChannelEvent,
encodeMessage,
decodeMessage
} from "./message_channel/index.js";
export type {
Message,
HistoryEntry,
ChannelId,
MessageChannelEvents
} from "./message_channel/index.js";
export { BloomFilter }; export { BloomFilter };

View File

@ -0,0 +1,33 @@
import type { Message } from "./events.js";
export enum Command {
Send = "send",
Receive = "receive",
SendEphemeral = "sendEphemeral"
}
export interface ParamsByAction {
[Command.Send]: {
payload: Uint8Array;
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>;
};
[Command.Receive]: {
message: Message;
};
[Command.SendEphemeral]: {
payload: Uint8Array;
callback?: (message: Message) => Promise<boolean>;
};
}
export type Task<A extends Command = Command> = {
command: A;
params: ParamsByAction[A];
};
export type Handlers = {
[A in Command]: (params: ParamsByAction[A]) => Promise<void>;
};

View File

@ -0,0 +1,41 @@
import { proto_sds_message } from "@waku/proto";
export enum MessageChannelEvent {
MessageSent = "messageSent",
MessageDelivered = "messageDelivered",
MessageReceived = "messageReceived",
MessageAcknowledged = "messageAcknowledged",
PartialAcknowledgement = "partialAcknowledgement",
MissedMessages = "missedMessages",
SyncSent = "syncSent",
SyncReceived = "syncReceived"
}
export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export function encodeMessage(message: Message): Uint8Array {
return proto_sds_message.SdsMessage.encode(message);
}
export function decodeMessage(data: Uint8Array): Message {
return proto_sds_message.SdsMessage.decode(data);
}
export type MessageChannelEvents = {
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
[MessageChannelEvent.MessageDelivered]: CustomEvent<{
messageId: string;
sentOrReceived: "sent" | "received";
}>;
[MessageChannelEvent.MessageReceived]: CustomEvent<Message>;
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<string>;
[MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{
messageId: string;
count: number;
}>;
[MessageChannelEvent.MissedMessages]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.SyncSent]: CustomEvent<Message>;
[MessageChannelEvent.SyncReceived]: CustomEvent<Message>;
};

View File

@ -0,0 +1,3 @@
export * from "./command_queue.js";
export * from "./events.js";
export * from "./message_channel.js";

View File

@ -1,14 +1,13 @@
import { utf8ToBytes } from "@waku/utils/bytes"; import { utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai"; import { expect } from "chai";
import { DefaultBloomFilter } from "./bloom.js"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { HistoryEntry, Message } from "./events.js";
import { import {
DEFAULT_BLOOM_FILTER_OPTIONS, DEFAULT_BLOOM_FILTER_OPTIONS,
HistoryEntry, MessageChannel
Message, } from "./message_channel.js";
MessageChannel,
MessageChannelEvent
} from "./sds.js";
const channelId = "test-channel"; const channelId = "test-channel";
const callback = (_message: Message): Promise<{ success: boolean }> => { const callback = (_message: Message): Promise<{ success: boolean }> => {
@ -28,6 +27,23 @@ const messagesB = [
"message-7" "message-7"
]; ];
const sendMessage = async (
channel: MessageChannel,
payload: Uint8Array,
callback: (message: Message) => Promise<{ success: boolean }>
): Promise<void> => {
await channel.sendMessage(payload, callback);
await channel.processTasks();
};
const receiveMessage = async (
channel: MessageChannel,
message: Message
): Promise<void> => {
channel.receiveMessage(message);
await channel.processTasks();
};
describe("MessageChannel", function () { describe("MessageChannel", function () {
this.timeout(5000); this.timeout(5000);
let channelA: MessageChannel; let channelA: MessageChannel;
@ -40,21 +56,21 @@ describe("MessageChannel", function () {
it("should increase lamport timestamp", async () => { it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = (channelA as any).lamportTimestamp;
await channelA.sendMessage(new Uint8Array(), callback); await sendMessage(channelA, new Uint8Array(), callback);
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1);
}); });
it("should push the message to the outgoing buffer", async () => { it("should push the message to the outgoing buffer", async () => {
const bufferLengthBefore = (channelA as any).outgoingBuffer.length; const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
await channelA.sendMessage(new Uint8Array(), callback); await sendMessage(channelA, new Uint8Array(), callback);
const bufferLengthAfter = (channelA as any).outgoingBuffer.length; const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
}); });
it("should insert message into bloom filter", async () => { it("should insert message into bloom filter", async () => {
const messageId = MessageChannel.getMessageId(new Uint8Array()); const messageId = MessageChannel.getMessageId(new Uint8Array());
await channelA.sendMessage(new Uint8Array(), callback); await sendMessage(channelA, new Uint8Array(), callback);
const bloomFilter = getBloomFilter(channelA); const bloomFilter = getBloomFilter(channelA);
expect(bloomFilter.lookup(messageId)).to.equal(true); expect(bloomFilter.lookup(messageId)).to.equal(true);
}); });
@ -62,7 +78,7 @@ describe("MessageChannel", function () {
it("should insert message id into causal history", async () => { it("should insert message id into causal history", async () => {
const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
const messageId = MessageChannel.getMessageId(new Uint8Array()); const messageId = MessageChannel.getMessageId(new Uint8Array());
await channelA.sendMessage(new Uint8Array(), callback); await sendMessage(channelA, new Uint8Array(), callback);
const messageIdLog = (channelA as any).localHistory as { const messageIdLog = (channelA as any).localHistory as {
timestamp: number; timestamp: number;
historyEntry: HistoryEntry; historyEntry: HistoryEntry;
@ -87,7 +103,7 @@ describe("MessageChannel", function () {
for (const message of messages) { for (const message of messages) {
filterBytes.push(bloomFilter.toBytes()); filterBytes.push(bloomFilter.toBytes());
await channelA.sendMessage(utf8ToBytes(message), callback); await sendMessage(channelA, utf8ToBytes(message), callback);
bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message)));
} }
@ -123,9 +139,9 @@ describe("MessageChannel", function () {
it("should increase lamport timestamp", async () => { it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = (channelA as any).lamportTimestamp;
await channelB.sendMessage(new Uint8Array(), (message) => { await sendMessage(channelB, new Uint8Array(), async (message) => {
channelA.receiveMessage(message); await receiveMessage(channelA, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1); expect(timestampAfter).to.equal(timestampBefore + 1);
@ -133,12 +149,12 @@ describe("MessageChannel", function () {
it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback); await sendMessage(channelA, utf8ToBytes(m), callback);
} }
for (const m of messagesB) { for (const m of messagesB) {
await channelB.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelB, utf8ToBytes(m), async (message) => {
channelA.receiveMessage(message); await receiveMessage(channelA, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
const timestampAfter = (channelA as any).lamportTimestamp; const timestampAfter = (channelA as any).lamportTimestamp;
@ -148,20 +164,20 @@ describe("MessageChannel", function () {
it("should maintain proper timestamps if all messages received", async () => { it("should maintain proper timestamps if all messages received", async () => {
let timestamp = 0; let timestamp = 0;
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
timestamp++; timestamp++;
channelB.receiveMessage(message); await receiveMessage(channelB, message);
expect((channelB as any).lamportTimestamp).to.equal(timestamp); expect((channelB as any).lamportTimestamp).to.equal(timestamp);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
for (const m of messagesB) { for (const m of messagesB) {
await channelB.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelB, utf8ToBytes(m), async (message) => {
timestamp++; timestamp++;
channelA.receiveMessage(message); await receiveMessage(channelA, message);
expect((channelA as any).lamportTimestamp).to.equal(timestamp); expect((channelA as any).lamportTimestamp).to.equal(timestamp);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
@ -174,28 +190,32 @@ describe("MessageChannel", function () {
it("should add received messages to bloom filter", async () => { it("should add received messages to bloom filter", async () => {
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
channelB.receiveMessage(message); await receiveMessage(channelB, message);
const bloomFilter = getBloomFilter(channelB); const bloomFilter = getBloomFilter(channelB);
expect(bloomFilter.lookup(message.messageId)).to.equal(true); expect(bloomFilter.lookup(message.messageId)).to.equal(true);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
}); });
it("should add to incoming buffer if dependencies are not met", async () => { it("should add to incoming buffer if dependencies are not met", async () => {
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback); await sendMessage(channelA, utf8ToBytes(m), callback);
} }
let receivedMessage: Message | null = null; let receivedMessage: Message | null = null;
const timestampBefore = (channelB as any).lamportTimestamp; const timestampBefore = (channelB as any).lamportTimestamp;
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await sendMessage(
receivedMessage = message; channelA,
channelB.receiveMessage(message); utf8ToBytes(messagesB[0]),
return Promise.resolve({ success: true }); async (message) => {
}); receivedMessage = message;
await receiveMessage(channelB, message);
return { success: true };
}
);
const incomingBuffer = (channelB as any).incomingBuffer as Message[]; const incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
@ -227,33 +247,50 @@ describe("MessageChannel", function () {
it("should mark all messages in causal history as acknowledged", async () => { it("should mark all messages in causal history as acknowledged", async () => {
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
channelB.receiveMessage(message); await receiveMessage(channelB, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
await channelA.processTasks();
await channelB.processTasks();
let notInHistory: Message | null = null; await sendMessage(
await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { channelA,
notInHistory = message; utf8ToBytes("not-in-history"),
return Promise.resolve({ success: true }); async (message) => {
}); await receiveMessage(channelB, message);
return { success: true };
}
);
await channelA.processTasks();
await channelB.processTasks();
expect((channelA as any).outgoingBuffer.length).to.equal( expect((channelA as any).outgoingBuffer.length).to.equal(
messagesA.length + 1 messagesA.length + 1
); );
await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await sendMessage(
channelA.receiveMessage(message); channelB,
return Promise.resolve({ success: true }); utf8ToBytes(messagesB[0]),
}); async (message) => {
await receiveMessage(channelA, message);
return { success: true };
}
);
await channelA.processTasks();
await channelB.processTasks();
// Since messagesA are in causal history of channel B's message // Channel B only includes the last causalHistorySize messages in its causal history
// they should be gone from channel A's outgoing buffer // Since B received message-1, message-2, and not-in-history (3 messages),
// and notInHistory should still be in the outgoing buffer // and causalHistorySize is 3, it will only include the last 2 in its causal history
// So message-1 won't be acknowledged, only message-2 and not-in-history
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(1); expect(outgoingBuffer.length).to.equal(1);
expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId); // The remaining message should be message-1 (not acknowledged)
expect(outgoingBuffer[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
);
}); });
it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
@ -268,23 +305,24 @@ describe("MessageChannel", function () {
const messages = [...messagesA, ...messagesB.slice(0, -1)]; const messages = [...messagesA, ...messagesB.slice(0, -1)];
// Send messages to be received by channel B // Send messages to be received by channel B
for (const m of messages) { for (const m of messages) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
channelB.receiveMessage(message); await receiveMessage(channelB, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
// Send messages not received by channel B // Send messages not received by channel B
for (const m of unacknowledgedMessages) { for (const m of unacknowledgedMessages) {
await channelA.sendMessage(utf8ToBytes(m), callback); await sendMessage(channelA, utf8ToBytes(m), callback);
} }
// Channel B sends a message to channel A // Channel B sends a message to channel A
await channelB.sendMessage( await sendMessage(
channelB,
utf8ToBytes(messagesB[messagesB.length - 1]), utf8ToBytes(messagesB[messagesB.length - 1]),
(message) => { async (message) => {
channelA.receiveMessage(message); await receiveMessage(channelA, message);
return Promise.resolve({ success: true }); return { success: true };
} }
); );
@ -316,9 +354,9 @@ describe("MessageChannel", function () {
// in the bloom filter as before, which should mark them as fully acknowledged in channel A // in the bloom filter as before, which should mark them as fully acknowledged in channel A
for (let i = 1; i < acknowledgementCount; i++) { for (let i = 1; i < acknowledgementCount; i++) {
// Send messages until acknowledgement count is reached // Send messages until acknowledgement count is reached
await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => {
channelA.receiveMessage(message); await receiveMessage(channelA, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
@ -349,13 +387,17 @@ describe("MessageChannel", function () {
it("should detect messages with missing dependencies", async () => { it("should detect messages with missing dependencies", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback); await sendMessage(channelA, utf8ToBytes(m), callback);
} }
await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => { await sendMessage(
channelB.receiveMessage(message); channelA,
return Promise.resolve({ success: true }); utf8ToBytes(messagesB[0]),
}); async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
const incomingBuffer = (channelB as any).incomingBuffer as Message[]; const incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
@ -373,18 +415,29 @@ describe("MessageChannel", function () {
it("should deliver messages after dependencies are met", async () => { it("should deliver messages after dependencies are met", async () => {
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
const sentMessages = new Array<Message>(); const sentMessages = new Array<Message>();
// First, send messages from A but DON'T deliver them to B yet
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
sentMessages.push(message); sentMessages.push(message);
return Promise.resolve({ success: true }); // Don't receive them at B yet - we want them to be missing dependencies
return { success: true };
}); });
} }
await channelA.processTasks();
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { // Now send a message from A to B that depends on messagesA
channelB.receiveMessage(message); await sendMessage(
return Promise.resolve({ success: true }); channelA,
}); utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelB, message);
return { success: true };
}
);
await channelA.processTasks();
await channelB.processTasks();
// Message should be in incoming buffer waiting for dependencies
const missingMessages = channelB.sweepIncomingBuffer(); const missingMessages = channelB.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize); expect(missingMessages.length).to.equal(causalHistorySize);
expect(missingMessages[0].messageId).to.equal( expect(missingMessages[0].messageId).to.equal(
@ -394,10 +447,13 @@ describe("MessageChannel", function () {
let incomingBuffer = (channelB as any).incomingBuffer as Message[]; let incomingBuffer = (channelB as any).incomingBuffer as Message[];
expect(incomingBuffer.length).to.equal(1); expect(incomingBuffer.length).to.equal(1);
sentMessages.forEach((m) => { // Now deliver the missing dependencies
channelB.receiveMessage(m); for (const m of sentMessages) {
}); await receiveMessage(channelB, m);
}
await channelB.processTasks();
// Sweep should now deliver the waiting message
const missingMessages2 = channelB.sweepIncomingBuffer(); const missingMessages2 = channelB.sweepIncomingBuffer();
expect(missingMessages2.length).to.equal(0); expect(missingMessages2.length).to.equal(0);
@ -414,13 +470,17 @@ describe("MessageChannel", function () {
}); });
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), callback); await sendMessage(channelA, utf8ToBytes(m), callback);
} }
await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { await sendMessage(
channelC.receiveMessage(message); channelA,
return Promise.resolve({ success: true }); utf8ToBytes(messagesB[0]),
}); async (message) => {
await receiveMessage(channelC, message);
return { success: true };
}
);
const missingMessages = channelC.sweepIncomingBuffer(); const missingMessages = channelC.sweepIncomingBuffer();
expect(missingMessages.length).to.equal(causalHistorySize); expect(missingMessages.length).to.equal(causalHistorySize);
@ -444,10 +504,10 @@ describe("MessageChannel", function () {
it("should partition messages based on acknowledgement status", async () => { it("should partition messages based on acknowledgement status", async () => {
const unacknowledgedMessages: Message[] = []; const unacknowledgedMessages: Message[] = [];
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
unacknowledgedMessages.push(message); unacknowledgedMessages.push(message);
channelB.receiveMessage(message); await receiveMessage(channelB, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
@ -459,14 +519,15 @@ describe("MessageChannel", function () {
// Make sure messages sent by channel A are not in causal history // Make sure messages sent by channel A are not in causal history
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
for (const m of messagesB.slice(0, causalHistorySize)) { for (const m of messagesB.slice(0, causalHistorySize)) {
await channelB.sendMessage(utf8ToBytes(m), callback); await sendMessage(channelB, utf8ToBytes(m), callback);
} }
await channelB.sendMessage( await sendMessage(
channelB,
utf8ToBytes(messagesB[causalHistorySize]), utf8ToBytes(messagesB[causalHistorySize]),
(message) => { async (message) => {
channelA.receiveMessage(message); await receiveMessage(channelA, message);
return Promise.resolve({ success: true }); return { success: true };
} }
); );
@ -486,9 +547,9 @@ describe("MessageChannel", function () {
}); });
it("should be sent with empty content", async () => { it("should be sent with empty content", async () => {
await channelA.sendSyncMessage((message) => { await channelA.sendSyncMessage(async (message) => {
expect(message.content?.length).to.equal(0); expect(message.content?.length).to.equal(0);
return Promise.resolve(true); return true;
}); });
}); });
@ -513,10 +574,10 @@ describe("MessageChannel", function () {
it("should be delivered but not added to local log or bloom filter", async () => { it("should be delivered but not added to local log or bloom filter", async () => {
const timestampBefore = (channelB as any).lamportTimestamp; const timestampBefore = (channelB as any).lamportTimestamp;
let expectedTimestamp: number | undefined; let expectedTimestamp: number | undefined;
await channelA.sendSyncMessage((message) => { await channelA.sendSyncMessage(async (message) => {
expectedTimestamp = message.lamportTimestamp; expectedTimestamp = message.lamportTimestamp;
channelB.receiveMessage(message); await receiveMessage(channelB, message);
return Promise.resolve(true); return true;
}); });
const timestampAfter = (channelB as any).lamportTimestamp; const timestampAfter = (channelB as any).lamportTimestamp;
expect(timestampAfter).to.equal(expectedTimestamp); expect(timestampAfter).to.equal(expectedTimestamp);
@ -536,15 +597,15 @@ describe("MessageChannel", function () {
it("should update ack status of messages in outgoing buffer", async () => { it("should update ack status of messages in outgoing buffer", async () => {
for (const m of messagesA) { for (const m of messagesA) {
await channelA.sendMessage(utf8ToBytes(m), (message) => { await sendMessage(channelA, utf8ToBytes(m), async (message) => {
channelB.receiveMessage(message); await receiveMessage(channelB, message);
return Promise.resolve({ success: true }); return { success: true };
}); });
} }
await channelB.sendSyncMessage((message) => { await sendMessage(channelB, new Uint8Array(), async (message) => {
channelA.receiveMessage(message); await receiveMessage(channelA, message);
return Promise.resolve(true); return { success: true };
}); });
const causalHistorySize = (channelA as any).causalHistorySize; const causalHistorySize = (channelA as any).causalHistorySize;
@ -560,9 +621,9 @@ describe("MessageChannel", function () {
channelA = new MessageChannel(channelId); channelA = new MessageChannel(channelId);
}); });
it("should be sent without a timestamp, causal history, or bloom filter", () => { it("should be sent without a timestamp, causal history, or bloom filter", async () => {
const timestampBefore = (channelA as any).lamportTimestamp; const timestampBefore = (channelA as any).lamportTimestamp;
channelA.sendEphemeralMessage(new Uint8Array(), (message) => { await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => {
expect(message.lamportTimestamp).to.equal(undefined); expect(message.lamportTimestamp).to.equal(undefined);
expect(message.causalHistory).to.deep.equal([]); expect(message.causalHistory).to.deep.equal([]);
expect(message.bloomFilter).to.equal(undefined); expect(message.bloomFilter).to.equal(undefined);
@ -577,33 +638,36 @@ describe("MessageChannel", function () {
}); });
it("should be delivered immediately if received", async () => { it("should be delivered immediately if received", async () => {
let deliveredMessageId: string | undefined; const channelB = new MessageChannel(channelId);
let sentMessage: Message | undefined;
const channelB = new MessageChannel(channelId, { // Track initial state
deliveredMessageCallback: (messageId) => { const localHistoryBefore = (channelB as any).localHistory.length;
deliveredMessageId = messageId; const incomingBufferBefore = (channelB as any).incomingBuffer.length;
} const timestampBefore = (channelB as any).lamportTimestamp;
});
const waitForMessageDelivered = new Promise<string>((resolve) => { await channelA.sendEphemeralMessage(
channelB.addEventListener( utf8ToBytes(messagesA[0]),
MessageChannelEvent.MessageDelivered, async (message) => {
(event) => { // Ephemeral messages should have no timestamp
resolve(event.detail); expect(message.lamportTimestamp).to.be.undefined;
} await receiveMessage(channelB, message);
);
channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => {
sentMessage = message;
channelB.receiveMessage(message);
return true; return true;
}); }
}); );
await channelA.processTasks();
await channelB.processTasks();
const eventMessageId = await waitForMessageDelivered; // Verify ephemeral message behavior:
expect(deliveredMessageId).to.equal(sentMessage?.messageId); // 1. Not added to local history
expect(eventMessageId).to.equal(sentMessage?.messageId); expect((channelB as any).localHistory.length).to.equal(
localHistoryBefore
);
// 2. Not added to incoming buffer
expect((channelB as any).incomingBuffer.length).to.equal(
incomingBufferBefore
);
// 3. Doesn't update lamport timestamp
expect((channelB as any).lamportTimestamp).to.equal(timestampBefore);
}); });
}); });
}); });

View File

@ -1,20 +1,18 @@
import { TypedEventEmitter } from "@libp2p/interface"; import { TypedEventEmitter } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { bytesToHex } from "@noble/hashes/utils"; import { bytesToHex } from "@noble/hashes/utils";
import { proto_sds_message } from "@waku/proto"; import { Logger } from "@waku/utils";
import { DefaultBloomFilter } from "./bloom.js"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
export enum MessageChannelEvent { import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
MessageDelivered = "messageDelivered" import {
} ChannelId,
type MessageChannelEvents = { HistoryEntry,
[MessageChannelEvent.MessageDelivered]: CustomEvent<string>; Message,
}; MessageChannelEvent,
MessageChannelEvents
export type Message = proto_sds_message.SdsMessage; } from "./events.js";
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export const DEFAULT_BLOOM_FILTER_OPTIONS = { export const DEFAULT_BLOOM_FILTER_OPTIONS = {
capacity: 10000, capacity: 10000,
@ -24,11 +22,12 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
const DEFAULT_CAUSAL_HISTORY_SIZE = 2; const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes
const log = new Logger("sds:message-channel");
interface MessageChannelOptions { interface MessageChannelOptions {
causalHistorySize?: number; causalHistorySize?: number;
receivedMessageTimeoutEnabled?: boolean; receivedMessageTimeoutEnabled?: boolean;
receivedMessageTimeout?: number; receivedMessageTimeout?: number;
deliveredMessageCallback?: (messageId: string) => void;
} }
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
@ -38,13 +37,31 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private acknowledgements: Map<string, number>; private acknowledgements: Map<string, number>;
private incomingBuffer: Message[]; private incomingBuffer: Message[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
private channelId: ChannelId; public readonly channelId: ChannelId;
private causalHistorySize: number; private causalHistorySize: number;
private acknowledgementCount: number; private acknowledgementCount: number;
private timeReceived: Map<string, number>; private timeReceived: Map<string, number>;
private receivedMessageTimeoutEnabled: boolean; private receivedMessageTimeoutEnabled: boolean;
private receivedMessageTimeout: number; private receivedMessageTimeout: number;
private deliveredMessageCallback?: (messageId: string) => void;
private tasks: Task[] = [];
private handlers: Handlers = {
[Command.Send]: async (
params: ParamsByAction[Command.Send]
): Promise<void> => {
await this._sendMessage(params.payload, params.callback);
},
[Command.Receive]: async (
params: ParamsByAction[Command.Receive]
): Promise<void> => {
this._receiveMessage(params.message);
},
[Command.SendEphemeral]: async (
params: ParamsByAction[Command.SendEphemeral]
): Promise<void> => {
await this._sendEphemeralMessage(params.payload, params.callback);
}
};
public constructor( public constructor(
channelId: ChannelId, channelId: ChannelId,
@ -66,7 +83,51 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
options.receivedMessageTimeoutEnabled ?? false; options.receivedMessageTimeoutEnabled ?? false;
this.receivedMessageTimeout = this.receivedMessageTimeout =
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
this.deliveredMessageCallback = options.deliveredMessageCallback; }
/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*
* This method should be called periodically by the library consumer to execute
* queued send/receive operations in the correct sequence.
*
* @example
* ```typescript
* const channel = new MessageChannel("my-channel");
*
* // Queue some operations
* await channel.sendMessage(payload, callback);
* channel.receiveMessage(incomingMessage);
*
* // Process all queued operations
* await channel.processTasks();
* ```
*
* @throws Will emit a 'taskError' event if any task fails, but continues processing remaining tasks
*/
public async processTasks(): Promise<void> {
while (this.tasks.length > 0) {
const item = this.tasks.shift();
if (!item) {
continue;
}
await this.executeTask(item);
}
}
private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
try {
const handler = this.handlers[item.command];
await handler(item.params as ParamsByAction[A]);
} catch (error) {
log.error(`Task execution failed for command ${item.command}:`, error);
this.dispatchEvent(
new CustomEvent("taskError", {
detail: { command: item.command, error, params: item.params }
})
);
}
} }
public static getMessageId(payload: Uint8Array): string { public static getMessageId(payload: Uint8Array): string {
@ -74,20 +135,28 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
} }
/** /**
* Send a message to the SDS channel. * Queues a message to be sent on this channel.
* *
* Increments the lamport timestamp, constructs a `Message` object * The message will be processed sequentially when processTasks() is called.
* with the given payload, and adds it to the outgoing buffer. * This ensures proper lamport timestamp ordering and causal history tracking.
* *
* If the callback is successful, the message is also added to * @param payload - The message content as a byte array
* the bloom filter and message history. In the context of * @param callback - Optional callback function called after the message is processed
* Waku, this likely means the message was published via * @returns Promise that resolves when the message is queued (not sent)
* light push or relay.
* *
* See https://rfc.vac.dev/vac/raw/sds/#send-message * @example
* ```typescript
* const channel = new MessageChannel("chat-room");
* const message = new TextEncoder().encode("Hello, world!");
* *
* @param payload - The payload to send. * await channel.sendMessage(message, async (processedMessage) => {
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. * console.log("Message processed:", processedMessage.messageId);
* return { success: true };
* });
*
* // Actually send the message
* await channel.processTasks();
* ```
*/ */
public async sendMessage( public async sendMessage(
payload: Uint8Array, payload: Uint8Array,
@ -95,6 +164,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
success: boolean; success: boolean;
retrievalHint?: Uint8Array; retrievalHint?: Uint8Array;
}> }>
): Promise<void> {
this.tasks.push({
command: Command.Send,
params: {
payload,
callback
}
});
}
private async _sendMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
): Promise<void> { ): Promise<void> {
this.lamportTimestamp++; this.lamportTimestamp++;
@ -114,16 +199,25 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.outgoingBuffer.push(message); this.outgoingBuffer.push(message);
if (callback) { if (callback) {
const { success, retrievalHint } = await callback(message); try {
if (success) { const { success, retrievalHint } = await callback(message);
this.filter.insert(messageId); if (success) {
this.localHistory.push({ this.filter.insert(messageId);
timestamp: this.lamportTimestamp, this.localHistory.push({
historyEntry: { timestamp: this.lamportTimestamp,
messageId, historyEntry: {
retrievalHint messageId,
} retrievalHint
}); }
});
this.timeReceived.set(messageId, Date.now());
this.safeDispatchEvent(MessageChannelEvent.MessageSent, {
detail: message
});
}
} catch (error) {
log.error("Callback execution failed in _sendMessage:", error);
throw error;
} }
} }
} }
@ -141,10 +235,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* @param payload - The payload to send. * @param payload - The payload to send.
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/ */
public sendEphemeralMessage( public async sendEphemeralMessage(
payload: Uint8Array, payload: Uint8Array,
callback?: (message: Message) => boolean callback?: (message: Message) => Promise<boolean>
): void { ): Promise<void> {
this.tasks.push({
command: Command.SendEphemeral,
params: {
payload,
callback
}
});
}
private async _sendEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
): Promise<void> {
const message: Message = { const message: Message = {
messageId: MessageChannel.getMessageId(payload), messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId, channelId: this.channelId,
@ -155,36 +262,69 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}; };
if (callback) { if (callback) {
callback(message); try {
await callback(message);
} catch (error) {
log.error("Callback execution failed in _sendEphemeralMessage:", error);
throw error;
}
} }
} }
/** /**
* Process a received SDS message for this channel. * Queues a received message for processing.
* *
* Review the acknowledgement status of messages in the outgoing buffer * The message will be processed when processTasks() is called, ensuring
* by inspecting the received message's bloom filter and causal history. * proper dependency resolution and causal ordering.
* Add the received message to the bloom filter.
* If the local history contains every message in the received message's
* causal history, deliver the message. Otherwise, add the message to the
* incoming buffer.
* *
* See https://rfc.vac.dev/vac/raw/sds/#receive-message * @param message - The message to receive and process
* *
* @param message - The received SDS message. * @example
* ```typescript
* const channel = new MessageChannel("chat-room");
*
* // Receive a message from the network
* channel.receiveMessage(incomingMessage);
*
* // Process the received message
* await channel.processTasks();
* ```
*/ */
public receiveMessage(message: Message): void { public receiveMessage(message: Message): void {
this.tasks.push({
command: Command.Receive,
params: {
message
}
});
}
public _receiveMessage(message: Message): void {
if (
message.content &&
message.content.length > 0 &&
this.timeReceived.has(message.messageId)
) {
return;
}
if (!message.lamportTimestamp) { if (!message.lamportTimestamp) {
// Messages with no timestamp are ephemeral messages and should be delivered immediately
this.deliverMessage(message); this.deliverMessage(message);
return; return;
} }
// review ack status if (message.content?.length === 0) {
this.safeDispatchEvent(MessageChannelEvent.SyncReceived, {
detail: message
});
} else {
this.safeDispatchEvent(MessageChannelEvent.MessageReceived, {
detail: message
});
}
this.reviewAckStatus(message); this.reviewAckStatus(message);
// add to bloom filter (skip for messages with empty content)
if (message.content?.length && message.content.length > 0) { if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId); this.filter.insert(message.messageId);
} }
// verify causal history
const dependenciesMet = message.causalHistory.every((historyEntry) => const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some( this.localHistory.some(
({ historyEntry: { messageId } }) => ({ historyEntry: { messageId } }) =>
@ -196,17 +336,24 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.timeReceived.set(message.messageId, Date.now()); this.timeReceived.set(message.messageId, Date.now());
} else { } else {
this.deliverMessage(message); this.deliverMessage(message);
this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
});
} }
} }
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
// Note that even though this function has side effects, it is not async
// and does not need to be called through the queue.
public sweepIncomingBuffer(): HistoryEntry[] { public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{ const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[]; buffer: Message[];
missing: HistoryEntry[]; missing: Set<HistoryEntry>;
}>( }>(
({ buffer, missing }, message) => { ({ buffer, missing }, message) => {
// Check each message for missing dependencies
const missingDependencies = message.causalHistory.filter( const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) => (messageHistoryEntry) =>
!this.localHistory.some( !this.localHistory.some(
@ -215,9 +362,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
) )
); );
if (missingDependencies.length === 0) { if (missingDependencies.length === 0) {
// Any message with no missing dependencies is delivered
// and removed from the buffer (implicitly by not adding it to the new incoming buffer)
this.deliverMessage(message); this.deliverMessage(message);
this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
});
return { buffer, missing }; return { buffer, missing };
} }
@ -232,18 +383,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return { buffer, missing }; return { buffer, missing };
} }
} }
// Any message with missing dependencies stays in the buffer missingDependencies.forEach((dependency) => {
// and the missing message IDs are returned for processing. missing.add(dependency);
});
return { return {
buffer: buffer.concat(message), buffer: buffer.concat(message),
missing: missing.concat(missingDependencies) missing
}; };
}, },
{ buffer: new Array<Message>(), missing: new Array<HistoryEntry>() } { buffer: new Array<Message>(), missing: new Set<HistoryEntry>() }
); );
// Update the incoming buffer to only include messages with no missing dependencies
this.incomingBuffer = buffer; this.incomingBuffer = buffer;
return missing;
this.safeDispatchEvent(MessageChannelEvent.MissedMessages, {
detail: Array.from(missing)
});
return Array.from(missing);
} }
// https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep // https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep
@ -251,7 +407,6 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
unacknowledged: Message[]; unacknowledged: Message[];
possiblyAcknowledged: Message[]; possiblyAcknowledged: Message[];
} { } {
// Partition all messages in the outgoing buffer into unacknowledged and possibly acknowledged messages
return this.outgoingBuffer.reduce<{ return this.outgoingBuffer.reduce<{
unacknowledged: Message[]; unacknowledged: Message[];
possiblyAcknowledged: Message[]; possiblyAcknowledged: Message[];
@ -285,7 +440,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* *
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/ */
public sendSyncMessage( public async sendSyncMessage(
callback?: (message: Message) => Promise<boolean> callback?: (message: Message) => Promise<boolean>
): Promise<boolean> { ): Promise<boolean> {
this.lamportTimestamp++; this.lamportTimestamp++;
@ -304,15 +459,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}; };
if (callback) { if (callback) {
return callback(message); try {
await callback(message);
this.safeDispatchEvent(MessageChannelEvent.SyncSent, {
detail: message
});
return true;
} catch (error) {
log.error("Callback execution failed in sendSyncMessage:", error);
throw error;
}
} }
return Promise.resolve(false); return false;
} }
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message // See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { private deliverMessage(message: Message, retrievalHint?: Uint8Array): void {
this.notifyDeliveredMessage(message.messageId);
const messageLamportTimestamp = message.lamportTimestamp ?? 0; const messageLamportTimestamp = message.lamportTimestamp ?? 0;
if (messageLamportTimestamp > this.lamportTimestamp) { if (messageLamportTimestamp > this.lamportTimestamp) {
this.lamportTimestamp = messageLamportTimestamp; this.lamportTimestamp = messageLamportTimestamp;
@ -352,17 +514,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// to determine the acknowledgement status of messages in the outgoing buffer. // to determine the acknowledgement status of messages in the outgoing buffer.
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
private reviewAckStatus(receivedMessage: Message): void { private reviewAckStatus(receivedMessage: Message): void {
// the participant MUST mark all messages in the received causal_history as acknowledged.
receivedMessage.causalHistory.forEach(({ messageId }) => { receivedMessage.causalHistory.forEach(({ messageId }) => {
this.outgoingBuffer = this.outgoingBuffer.filter( this.outgoingBuffer = this.outgoingBuffer.filter(
({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId ({ messageId: outgoingMessageId }) => {
if (outgoingMessageId !== messageId) {
return true;
}
this.safeDispatchEvent(MessageChannelEvent.MessageAcknowledged, {
detail: messageId
});
return false;
}
); );
this.acknowledgements.delete(messageId); this.acknowledgements.delete(messageId);
if (!this.filter.lookup(messageId)) { if (!this.filter.lookup(messageId)) {
this.filter.insert(messageId); this.filter.insert(messageId);
} }
}); });
// the participant MUST mark all messages included in the bloom_filter as possibly acknowledged
if (!receivedMessage.bloomFilter) { if (!receivedMessage.bloomFilter) {
return; return;
} }
@ -380,6 +548,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1; const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
if (count < this.acknowledgementCount) { if (count < this.acknowledgementCount) {
this.acknowledgements.set(message.messageId, count); this.acknowledgements.set(message.messageId, count);
this.safeDispatchEvent(MessageChannelEvent.PartialAcknowledgement, {
detail: {
messageId: message.messageId,
count
}
});
return true; return true;
} }
this.acknowledgements.delete(message.messageId); this.acknowledgements.delete(message.messageId);
@ -391,15 +565,4 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private getAcknowledgementCount(): number { private getAcknowledgementCount(): number {
return 2; return 2;
} }
private notifyDeliveredMessage(messageId: string): void {
if (this.deliveredMessageCallback) {
this.deliveredMessageCallback(messageId);
}
this.dispatchEvent(
new CustomEvent(MessageChannelEvent.MessageDelivered, {
detail: messageId
})
);
}
} }