diff --git a/packages/sds/src/bloom.spec.ts b/packages/sds/src/bloom_filter/bloom.spec.ts similarity index 100% rename from packages/sds/src/bloom.spec.ts rename to packages/sds/src/bloom_filter/bloom.spec.ts diff --git a/packages/sds/src/bloom.ts b/packages/sds/src/bloom_filter/bloom.ts similarity index 97% rename from packages/sds/src/bloom.ts rename to packages/sds/src/bloom_filter/bloom.ts index 97cbed4a91..6037d5f7f8 100644 --- a/packages/sds/src/bloom.ts +++ b/packages/sds/src/bloom_filter/bloom.ts @@ -1,5 +1,5 @@ -import { hashN } from "./nim_hashn/nim_hashn.mjs"; -import { getMOverNBitsForK } from "./probabilities.js"; +import { hashN } from "../nim_hashn/nim_hashn.mjs"; +import { getMOverNBitsForK } from "../probabilities.js"; export interface BloomFilterOptions { // The expected maximum number of elements for which this BloomFilter is sized. diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index b82033fa72..9ce349a462 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -1,3 +1,17 @@ -import { BloomFilter } from "./bloom.js"; +import { BloomFilter } from "./bloom_filter/bloom.js"; + +export { + MessageChannel, + MessageChannelEvent, + encodeMessage, + decodeMessage +} from "./message_channel/index.js"; + +export type { + Message, + HistoryEntry, + ChannelId, + MessageChannelEvents +} from "./message_channel/index.js"; export { BloomFilter }; diff --git a/packages/sds/src/message_channel/command_queue.ts b/packages/sds/src/message_channel/command_queue.ts new file mode 100644 index 0000000000..9d23ad9ee0 --- /dev/null +++ b/packages/sds/src/message_channel/command_queue.ts @@ -0,0 +1,33 @@ +import type { Message } from "./events.js"; + +export enum Command { + Send = "send", + Receive = "receive", + SendEphemeral = "sendEphemeral" +} + +export interface ParamsByAction { + [Command.Send]: { + payload: Uint8Array; + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }>; + }; + [Command.Receive]: { + message: Message; + }; + [Command.SendEphemeral]: { + payload: Uint8Array; + callback?: (message: Message) => Promise; + }; +} + +export type Task = { + command: A; + params: ParamsByAction[A]; +}; + +export type Handlers = { + [A in Command]: (params: ParamsByAction[A]) => Promise; +}; diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts new file mode 100644 index 0000000000..e3ca17936e --- /dev/null +++ b/packages/sds/src/message_channel/events.ts @@ -0,0 +1,41 @@ +import { proto_sds_message } from "@waku/proto"; + +export enum MessageChannelEvent { + MessageSent = "messageSent", + MessageDelivered = "messageDelivered", + MessageReceived = "messageReceived", + MessageAcknowledged = "messageAcknowledged", + PartialAcknowledgement = "partialAcknowledgement", + MissedMessages = "missedMessages", + SyncSent = "syncSent", + SyncReceived = "syncReceived" +} + +export type Message = proto_sds_message.SdsMessage; +export type HistoryEntry = proto_sds_message.HistoryEntry; +export type ChannelId = string; + +export function encodeMessage(message: Message): Uint8Array { + return proto_sds_message.SdsMessage.encode(message); +} + +export function decodeMessage(data: Uint8Array): Message { + return proto_sds_message.SdsMessage.decode(data); +} + +export type MessageChannelEvents = { + [MessageChannelEvent.MessageSent]: CustomEvent; + [MessageChannelEvent.MessageDelivered]: CustomEvent<{ + messageId: string; + sentOrReceived: "sent" | "received"; + }>; + [MessageChannelEvent.MessageReceived]: CustomEvent; + [MessageChannelEvent.MessageAcknowledged]: CustomEvent; + [MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{ + messageId: string; + count: number; + }>; + [MessageChannelEvent.MissedMessages]: CustomEvent; + [MessageChannelEvent.SyncSent]: CustomEvent; + [MessageChannelEvent.SyncReceived]: CustomEvent; +}; diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts new file mode 100644 index 0000000000..0e575ed136 --- /dev/null +++ b/packages/sds/src/message_channel/index.ts @@ -0,0 +1,3 @@ +export * from "./command_queue.js"; +export * from "./events.js"; +export * from "./message_channel.js"; diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts similarity index 69% rename from packages/sds/src/sds.spec.ts rename to packages/sds/src/message_channel/message_channel.spec.ts index 64ced83926..c9bbcc455e 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -1,14 +1,13 @@ import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { DefaultBloomFilter } from "./bloom.js"; +import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; + +import { HistoryEntry, Message } from "./events.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - HistoryEntry, - Message, - MessageChannel, - MessageChannelEvent -} from "./sds.js"; + MessageChannel +} from "./message_channel.js"; const channelId = "test-channel"; const callback = (_message: Message): Promise<{ success: boolean }> => { @@ -28,6 +27,23 @@ const messagesB = [ "message-7" ]; +const sendMessage = async ( + channel: MessageChannel, + payload: Uint8Array, + callback: (message: Message) => Promise<{ success: boolean }> +): Promise => { + await channel.sendMessage(payload, callback); + await channel.processTasks(); +}; + +const receiveMessage = async ( + channel: MessageChannel, + message: Message +): Promise => { + channel.receiveMessage(message); + await channel.processTasks(); +}; + describe("MessageChannel", function () { this.timeout(5000); let channelA: MessageChannel; @@ -40,21 +56,21 @@ describe("MessageChannel", function () { it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); }); it("should push the message to the outgoing buffer", async () => { const bufferLengthBefore = (channelA as any).outgoingBuffer.length; - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const bufferLengthAfter = (channelA as any).outgoingBuffer.length; expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); }); it("should insert message into bloom filter", async () => { const messageId = MessageChannel.getMessageId(new Uint8Array()); - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const bloomFilter = getBloomFilter(channelA); expect(bloomFilter.lookup(messageId)).to.equal(true); }); @@ -62,7 +78,7 @@ describe("MessageChannel", function () { it("should insert message id into causal history", async () => { const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const messageId = MessageChannel.getMessageId(new Uint8Array()); - await channelA.sendMessage(new Uint8Array(), callback); + await sendMessage(channelA, new Uint8Array(), callback); const messageIdLog = (channelA as any).localHistory as { timestamp: number; historyEntry: HistoryEntry; @@ -87,7 +103,7 @@ describe("MessageChannel", function () { for (const message of messages) { filterBytes.push(bloomFilter.toBytes()); - await channelA.sendMessage(utf8ToBytes(message), callback); + await sendMessage(channelA, utf8ToBytes(message), callback); bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); } @@ -123,9 +139,9 @@ describe("MessageChannel", function () { it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await channelB.sendMessage(new Uint8Array(), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelB, new Uint8Array(), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); @@ -133,12 +149,12 @@ describe("MessageChannel", function () { it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } for (const m of messagesB) { - await channelB.sendMessage(utf8ToBytes(m), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelB, utf8ToBytes(m), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); } const timestampAfter = (channelA as any).lamportTimestamp; @@ -148,20 +164,20 @@ describe("MessageChannel", function () { it("should maintain proper timestamps if all messages received", async () => { let timestamp = 0; for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { timestamp++; - channelB.receiveMessage(message); + await receiveMessage(channelB, message); expect((channelB as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve({ success: true }); + return { success: true }; }); } for (const m of messagesB) { - await channelB.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelB, utf8ToBytes(m), async (message) => { timestamp++; - channelA.receiveMessage(message); + await receiveMessage(channelA, message); expect((channelA as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve({ success: true }); + return { success: true }; }); } @@ -174,28 +190,32 @@ describe("MessageChannel", function () { it("should add received messages to bloom filter", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); const bloomFilter = getBloomFilter(channelB); expect(bloomFilter.lookup(message.messageId)).to.equal(true); - return Promise.resolve({ success: true }); + return { success: true }; }); } }); it("should add to incoming buffer if dependencies are not met", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } let receivedMessage: Message | null = null; const timestampBefore = (channelB as any).lamportTimestamp; - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - receivedMessage = message; - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + receivedMessage = message; + await receiveMessage(channelB, message); + return { success: true }; + } + ); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; expect(incomingBuffer.length).to.equal(1); @@ -227,33 +247,50 @@ describe("MessageChannel", function () { it("should mark all messages in causal history as acknowledged", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); + return { success: true }; }); } + await channelA.processTasks(); + await channelB.processTasks(); - let notInHistory: Message | null = null; - await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { - notInHistory = message; - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes("not-in-history"), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + await channelA.processTasks(); + await channelB.processTasks(); expect((channelA as any).outgoingBuffer.length).to.equal( messagesA.length + 1 ); - await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelB, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelA, message); + return { success: true }; + } + ); + await channelA.processTasks(); + await channelB.processTasks(); - // Since messagesA are in causal history of channel B's message - // they should be gone from channel A's outgoing buffer - // and notInHistory should still be in the outgoing buffer + // Channel B only includes the last causalHistorySize messages in its causal history + // Since B received message-1, message-2, and not-in-history (3 messages), + // and causalHistorySize is 3, it will only include the last 2 in its causal history + // So message-1 won't be acknowledged, only message-2 and not-in-history const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; expect(outgoingBuffer.length).to.equal(1); - expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId); + // The remaining message should be message-1 (not acknowledged) + expect(outgoingBuffer[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) + ); }); it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { @@ -268,23 +305,24 @@ describe("MessageChannel", function () { const messages = [...messagesA, ...messagesB.slice(0, -1)]; // Send messages to be received by channel B for (const m of messages) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); + return { success: true }; }); } // Send messages not received by channel B for (const m of unacknowledgedMessages) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } // Channel B sends a message to channel A - await channelB.sendMessage( + await sendMessage( + channelB, utf8ToBytes(messagesB[messagesB.length - 1]), - (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + async (message) => { + await receiveMessage(channelA, message); + return { success: true }; } ); @@ -316,9 +354,9 @@ describe("MessageChannel", function () { // in the bloom filter as before, which should mark them as fully acknowledged in channel A for (let i = 1; i < acknowledgementCount; i++) { // Send messages until acknowledgement count is reached - await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); } @@ -349,13 +387,17 @@ describe("MessageChannel", function () { it("should detect messages with missing dependencies", async () => { const causalHistorySize = (channelA as any).causalHistorySize; for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; expect(incomingBuffer.length).to.equal(1); @@ -373,18 +415,29 @@ describe("MessageChannel", function () { it("should deliver messages after dependencies are met", async () => { const causalHistorySize = (channelA as any).causalHistorySize; const sentMessages = new Array(); + // First, send messages from A but DON'T deliver them to B yet for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { sentMessages.push(message); - return Promise.resolve({ success: true }); + // Don't receive them at B yet - we want them to be missing dependencies + return { success: true }; }); } + await channelA.processTasks(); - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + // Now send a message from A to B that depends on messagesA + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + } + ); + await channelA.processTasks(); + await channelB.processTasks(); + // Message should be in incoming buffer waiting for dependencies const missingMessages = channelB.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); expect(missingMessages[0].messageId).to.equal( @@ -394,10 +447,13 @@ describe("MessageChannel", function () { let incomingBuffer = (channelB as any).incomingBuffer as Message[]; expect(incomingBuffer.length).to.equal(1); - sentMessages.forEach((m) => { - channelB.receiveMessage(m); - }); + // Now deliver the missing dependencies + for (const m of sentMessages) { + await receiveMessage(channelB, m); + } + await channelB.processTasks(); + // Sweep should now deliver the waiting message const missingMessages2 = channelB.sweepIncomingBuffer(); expect(missingMessages2.length).to.equal(0); @@ -414,13 +470,17 @@ describe("MessageChannel", function () { }); for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelA, utf8ToBytes(m), callback); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { - channelC.receiveMessage(message); - return Promise.resolve({ success: true }); - }); + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelC, message); + return { success: true }; + } + ); const missingMessages = channelC.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); @@ -444,10 +504,10 @@ describe("MessageChannel", function () { it("should partition messages based on acknowledgement status", async () => { const unacknowledgedMessages: Message[] = []; for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { + await sendMessage(channelA, utf8ToBytes(m), async (message) => { unacknowledgedMessages.push(message); - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await receiveMessage(channelB, message); + return { success: true }; }); } @@ -459,14 +519,15 @@ describe("MessageChannel", function () { // Make sure messages sent by channel A are not in causal history const causalHistorySize = (channelA as any).causalHistorySize; for (const m of messagesB.slice(0, causalHistorySize)) { - await channelB.sendMessage(utf8ToBytes(m), callback); + await sendMessage(channelB, utf8ToBytes(m), callback); } - await channelB.sendMessage( + await sendMessage( + channelB, utf8ToBytes(messagesB[causalHistorySize]), - (message) => { - channelA.receiveMessage(message); - return Promise.resolve({ success: true }); + async (message) => { + await receiveMessage(channelA, message); + return { success: true }; } ); @@ -486,9 +547,9 @@ describe("MessageChannel", function () { }); it("should be sent with empty content", async () => { - await channelA.sendSyncMessage((message) => { + await channelA.sendSyncMessage(async (message) => { expect(message.content?.length).to.equal(0); - return Promise.resolve(true); + return true; }); }); @@ -513,10 +574,10 @@ describe("MessageChannel", function () { it("should be delivered but not added to local log or bloom filter", async () => { const timestampBefore = (channelB as any).lamportTimestamp; let expectedTimestamp: number | undefined; - await channelA.sendSyncMessage((message) => { + await channelA.sendSyncMessage(async (message) => { expectedTimestamp = message.lamportTimestamp; - channelB.receiveMessage(message); - return Promise.resolve(true); + await receiveMessage(channelB, message); + return true; }); const timestampAfter = (channelB as any).lamportTimestamp; expect(timestampAfter).to.equal(expectedTimestamp); @@ -536,15 +597,15 @@ describe("MessageChannel", function () { it("should update ack status of messages in outgoing buffer", async () => { for (const m of messagesA) { - await channelA.sendMessage(utf8ToBytes(m), (message) => { - channelB.receiveMessage(message); - return Promise.resolve({ success: true }); + await sendMessage(channelA, utf8ToBytes(m), async (message) => { + await receiveMessage(channelB, message); + return { success: true }; }); } - await channelB.sendSyncMessage((message) => { - channelA.receiveMessage(message); - return Promise.resolve(true); + await sendMessage(channelB, new Uint8Array(), async (message) => { + await receiveMessage(channelA, message); + return { success: true }; }); const causalHistorySize = (channelA as any).causalHistorySize; @@ -560,9 +621,9 @@ describe("MessageChannel", function () { channelA = new MessageChannel(channelId); }); - it("should be sent without a timestamp, causal history, or bloom filter", () => { + it("should be sent without a timestamp, causal history, or bloom filter", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - channelA.sendEphemeralMessage(new Uint8Array(), (message) => { + await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => { expect(message.lamportTimestamp).to.equal(undefined); expect(message.causalHistory).to.deep.equal([]); expect(message.bloomFilter).to.equal(undefined); @@ -577,33 +638,36 @@ describe("MessageChannel", function () { }); it("should be delivered immediately if received", async () => { - let deliveredMessageId: string | undefined; - let sentMessage: Message | undefined; + const channelB = new MessageChannel(channelId); - const channelB = new MessageChannel(channelId, { - deliveredMessageCallback: (messageId) => { - deliveredMessageId = messageId; - } - }); + // Track initial state + const localHistoryBefore = (channelB as any).localHistory.length; + const incomingBufferBefore = (channelB as any).incomingBuffer.length; + const timestampBefore = (channelB as any).lamportTimestamp; - const waitForMessageDelivered = new Promise((resolve) => { - channelB.addEventListener( - MessageChannelEvent.MessageDelivered, - (event) => { - resolve(event.detail); - } - ); - - channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => { - sentMessage = message; - channelB.receiveMessage(message); + await channelA.sendEphemeralMessage( + utf8ToBytes(messagesA[0]), + async (message) => { + // Ephemeral messages should have no timestamp + expect(message.lamportTimestamp).to.be.undefined; + await receiveMessage(channelB, message); return true; - }); - }); + } + ); + await channelA.processTasks(); + await channelB.processTasks(); - const eventMessageId = await waitForMessageDelivered; - expect(deliveredMessageId).to.equal(sentMessage?.messageId); - expect(eventMessageId).to.equal(sentMessage?.messageId); + // Verify ephemeral message behavior: + // 1. Not added to local history + expect((channelB as any).localHistory.length).to.equal( + localHistoryBefore + ); + // 2. Not added to incoming buffer + expect((channelB as any).incomingBuffer.length).to.equal( + incomingBufferBefore + ); + // 3. Doesn't update lamport timestamp + expect((channelB as any).lamportTimestamp).to.equal(timestampBefore); }); }); }); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/message_channel/message_channel.ts similarity index 58% rename from packages/sds/src/sds.ts rename to packages/sds/src/message_channel/message_channel.ts index ea3c26a814..37d5995391 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -1,20 +1,18 @@ import { TypedEventEmitter } from "@libp2p/interface"; import { sha256 } from "@noble/hashes/sha256"; import { bytesToHex } from "@noble/hashes/utils"; -import { proto_sds_message } from "@waku/proto"; +import { Logger } from "@waku/utils"; -import { DefaultBloomFilter } from "./bloom.js"; +import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; -export enum MessageChannelEvent { - MessageDelivered = "messageDelivered" -} -type MessageChannelEvents = { - [MessageChannelEvent.MessageDelivered]: CustomEvent; -}; - -export type Message = proto_sds_message.SdsMessage; -export type HistoryEntry = proto_sds_message.HistoryEntry; -export type ChannelId = string; +import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; +import { + ChannelId, + HistoryEntry, + Message, + MessageChannelEvent, + MessageChannelEvents +} from "./events.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, @@ -24,11 +22,12 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = { const DEFAULT_CAUSAL_HISTORY_SIZE = 2; const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes +const log = new Logger("sds:message-channel"); + interface MessageChannelOptions { causalHistorySize?: number; receivedMessageTimeoutEnabled?: boolean; receivedMessageTimeout?: number; - deliveredMessageCallback?: (messageId: string) => void; } export class MessageChannel extends TypedEventEmitter { @@ -38,13 +37,31 @@ export class MessageChannel extends TypedEventEmitter { private acknowledgements: Map; private incomingBuffer: Message[]; private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; - private channelId: ChannelId; + public readonly channelId: ChannelId; private causalHistorySize: number; private acknowledgementCount: number; private timeReceived: Map; private receivedMessageTimeoutEnabled: boolean; private receivedMessageTimeout: number; - private deliveredMessageCallback?: (messageId: string) => void; + + private tasks: Task[] = []; + private handlers: Handlers = { + [Command.Send]: async ( + params: ParamsByAction[Command.Send] + ): Promise => { + await this._sendMessage(params.payload, params.callback); + }, + [Command.Receive]: async ( + params: ParamsByAction[Command.Receive] + ): Promise => { + this._receiveMessage(params.message); + }, + [Command.SendEphemeral]: async ( + params: ParamsByAction[Command.SendEphemeral] + ): Promise => { + await this._sendEphemeralMessage(params.payload, params.callback); + } + }; public constructor( channelId: ChannelId, @@ -66,7 +83,51 @@ export class MessageChannel extends TypedEventEmitter { options.receivedMessageTimeoutEnabled ?? false; this.receivedMessageTimeout = options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; - this.deliveredMessageCallback = options.deliveredMessageCallback; + } + + /** + * Processes all queued tasks sequentially to ensure proper message ordering. + * + * This method should be called periodically by the library consumer to execute + * queued send/receive operations in the correct sequence. + * + * @example + * ```typescript + * const channel = new MessageChannel("my-channel"); + * + * // Queue some operations + * await channel.sendMessage(payload, callback); + * channel.receiveMessage(incomingMessage); + * + * // Process all queued operations + * await channel.processTasks(); + * ``` + * + * @throws Will emit a 'taskError' event if any task fails, but continues processing remaining tasks + */ + public async processTasks(): Promise { + while (this.tasks.length > 0) { + const item = this.tasks.shift(); + if (!item) { + continue; + } + + await this.executeTask(item); + } + } + + private async executeTask(item: Task): Promise { + try { + const handler = this.handlers[item.command]; + await handler(item.params as ParamsByAction[A]); + } catch (error) { + log.error(`Task execution failed for command ${item.command}:`, error); + this.dispatchEvent( + new CustomEvent("taskError", { + detail: { command: item.command, error, params: item.params } + }) + ); + } } public static getMessageId(payload: Uint8Array): string { @@ -74,20 +135,28 @@ export class MessageChannel extends TypedEventEmitter { } /** - * Send a message to the SDS channel. + * Queues a message to be sent on this channel. * - * Increments the lamport timestamp, constructs a `Message` object - * with the given payload, and adds it to the outgoing buffer. + * The message will be processed sequentially when processTasks() is called. + * This ensures proper lamport timestamp ordering and causal history tracking. * - * If the callback is successful, the message is also added to - * the bloom filter and message history. In the context of - * Waku, this likely means the message was published via - * light push or relay. + * @param payload - The message content as a byte array + * @param callback - Optional callback function called after the message is processed + * @returns Promise that resolves when the message is queued (not sent) * - * See https://rfc.vac.dev/vac/raw/sds/#send-message + * @example + * ```typescript + * const channel = new MessageChannel("chat-room"); + * const message = new TextEncoder().encode("Hello, world!"); * - * @param payload - The payload to send. - * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. + * await channel.sendMessage(message, async (processedMessage) => { + * console.log("Message processed:", processedMessage.messageId); + * return { success: true }; + * }); + * + * // Actually send the message + * await channel.processTasks(); + * ``` */ public async sendMessage( payload: Uint8Array, @@ -95,6 +164,22 @@ export class MessageChannel extends TypedEventEmitter { success: boolean; retrievalHint?: Uint8Array; }> + ): Promise { + this.tasks.push({ + command: Command.Send, + params: { + payload, + callback + } + }); + } + + private async _sendMessage( + payload: Uint8Array, + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }> ): Promise { this.lamportTimestamp++; @@ -114,16 +199,25 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer.push(message); if (callback) { - const { success, retrievalHint } = await callback(message); - if (success) { - this.filter.insert(messageId); - this.localHistory.push({ - timestamp: this.lamportTimestamp, - historyEntry: { - messageId, - retrievalHint - } - }); + try { + const { success, retrievalHint } = await callback(message); + if (success) { + this.filter.insert(messageId); + this.localHistory.push({ + timestamp: this.lamportTimestamp, + historyEntry: { + messageId, + retrievalHint + } + }); + this.timeReceived.set(messageId, Date.now()); + this.safeDispatchEvent(MessageChannelEvent.MessageSent, { + detail: message + }); + } + } catch (error) { + log.error("Callback execution failed in _sendMessage:", error); + throw error; } } } @@ -141,10 +235,23 @@ export class MessageChannel extends TypedEventEmitter { * @param payload - The payload to send. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public sendEphemeralMessage( + public async sendEphemeralMessage( payload: Uint8Array, - callback?: (message: Message) => boolean - ): void { + callback?: (message: Message) => Promise + ): Promise { + this.tasks.push({ + command: Command.SendEphemeral, + params: { + payload, + callback + } + }); + } + + private async _sendEphemeralMessage( + payload: Uint8Array, + callback?: (message: Message) => Promise + ): Promise { const message: Message = { messageId: MessageChannel.getMessageId(payload), channelId: this.channelId, @@ -155,36 +262,69 @@ export class MessageChannel extends TypedEventEmitter { }; if (callback) { - callback(message); + try { + await callback(message); + } catch (error) { + log.error("Callback execution failed in _sendEphemeralMessage:", error); + throw error; + } } } + /** - * Process a received SDS message for this channel. + * Queues a received message for processing. * - * Review the acknowledgement status of messages in the outgoing buffer - * by inspecting the received message's bloom filter and causal history. - * Add the received message to the bloom filter. - * If the local history contains every message in the received message's - * causal history, deliver the message. Otherwise, add the message to the - * incoming buffer. + * The message will be processed when processTasks() is called, ensuring + * proper dependency resolution and causal ordering. * - * See https://rfc.vac.dev/vac/raw/sds/#receive-message + * @param message - The message to receive and process * - * @param message - The received SDS message. + * @example + * ```typescript + * const channel = new MessageChannel("chat-room"); + * + * // Receive a message from the network + * channel.receiveMessage(incomingMessage); + * + * // Process the received message + * await channel.processTasks(); + * ``` */ public receiveMessage(message: Message): void { + this.tasks.push({ + command: Command.Receive, + params: { + message + } + }); + } + + public _receiveMessage(message: Message): void { + if ( + message.content && + message.content.length > 0 && + this.timeReceived.has(message.messageId) + ) { + return; + } + if (!message.lamportTimestamp) { - // Messages with no timestamp are ephemeral messages and should be delivered immediately this.deliverMessage(message); return; } - // review ack status + if (message.content?.length === 0) { + this.safeDispatchEvent(MessageChannelEvent.SyncReceived, { + detail: message + }); + } else { + this.safeDispatchEvent(MessageChannelEvent.MessageReceived, { + detail: message + }); + } this.reviewAckStatus(message); - // add to bloom filter (skip for messages with empty content) if (message.content?.length && message.content.length > 0) { this.filter.insert(message.messageId); } - // verify causal history const dependenciesMet = message.causalHistory.every((historyEntry) => this.localHistory.some( ({ historyEntry: { messageId } }) => @@ -196,17 +336,24 @@ export class MessageChannel extends TypedEventEmitter { this.timeReceived.set(message.messageId, Date.now()); } else { this.deliverMessage(message); + this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, { + detail: { + messageId: message.messageId, + sentOrReceived: "received" + } + }); } } // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep + // Note that even though this function has side effects, it is not async + // and does not need to be called through the queue. public sweepIncomingBuffer(): HistoryEntry[] { const { buffer, missing } = this.incomingBuffer.reduce<{ buffer: Message[]; - missing: HistoryEntry[]; + missing: Set; }>( ({ buffer, missing }, message) => { - // Check each message for missing dependencies const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => !this.localHistory.some( @@ -215,9 +362,13 @@ export class MessageChannel extends TypedEventEmitter { ) ); if (missingDependencies.length === 0) { - // Any message with no missing dependencies is delivered - // and removed from the buffer (implicitly by not adding it to the new incoming buffer) this.deliverMessage(message); + this.safeDispatchEvent(MessageChannelEvent.MessageDelivered, { + detail: { + messageId: message.messageId, + sentOrReceived: "received" + } + }); return { buffer, missing }; } @@ -232,18 +383,23 @@ export class MessageChannel extends TypedEventEmitter { return { buffer, missing }; } } - // Any message with missing dependencies stays in the buffer - // and the missing message IDs are returned for processing. + missingDependencies.forEach((dependency) => { + missing.add(dependency); + }); return { buffer: buffer.concat(message), - missing: missing.concat(missingDependencies) + missing }; }, - { buffer: new Array(), missing: new Array() } + { buffer: new Array(), missing: new Set() } ); - // Update the incoming buffer to only include messages with no missing dependencies this.incomingBuffer = buffer; - return missing; + + this.safeDispatchEvent(MessageChannelEvent.MissedMessages, { + detail: Array.from(missing) + }); + + return Array.from(missing); } // https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep @@ -251,7 +407,6 @@ export class MessageChannel extends TypedEventEmitter { unacknowledged: Message[]; possiblyAcknowledged: Message[]; } { - // Partition all messages in the outgoing buffer into unacknowledged and possibly acknowledged messages return this.outgoingBuffer.reduce<{ unacknowledged: Message[]; possiblyAcknowledged: Message[]; @@ -285,7 +440,7 @@ export class MessageChannel extends TypedEventEmitter { * * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public sendSyncMessage( + public async sendSyncMessage( callback?: (message: Message) => Promise ): Promise { this.lamportTimestamp++; @@ -304,15 +459,22 @@ export class MessageChannel extends TypedEventEmitter { }; if (callback) { - return callback(message); + try { + await callback(message); + this.safeDispatchEvent(MessageChannelEvent.SyncSent, { + detail: message + }); + return true; + } catch (error) { + log.error("Callback execution failed in sendSyncMessage:", error); + throw error; + } } - return Promise.resolve(false); + return false; } // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { - this.notifyDeliveredMessage(message.messageId); - const messageLamportTimestamp = message.lamportTimestamp ?? 0; if (messageLamportTimestamp > this.lamportTimestamp) { this.lamportTimestamp = messageLamportTimestamp; @@ -352,17 +514,23 @@ export class MessageChannel extends TypedEventEmitter { // to determine the acknowledgement status of messages in the outgoing buffer. // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status private reviewAckStatus(receivedMessage: Message): void { - // the participant MUST mark all messages in the received causal_history as acknowledged. receivedMessage.causalHistory.forEach(({ messageId }) => { this.outgoingBuffer = this.outgoingBuffer.filter( - ({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId + ({ messageId: outgoingMessageId }) => { + if (outgoingMessageId !== messageId) { + return true; + } + this.safeDispatchEvent(MessageChannelEvent.MessageAcknowledged, { + detail: messageId + }); + return false; + } ); this.acknowledgements.delete(messageId); if (!this.filter.lookup(messageId)) { this.filter.insert(messageId); } }); - // the participant MUST mark all messages included in the bloom_filter as possibly acknowledged if (!receivedMessage.bloomFilter) { return; } @@ -380,6 +548,12 @@ export class MessageChannel extends TypedEventEmitter { const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1; if (count < this.acknowledgementCount) { this.acknowledgements.set(message.messageId, count); + this.safeDispatchEvent(MessageChannelEvent.PartialAcknowledgement, { + detail: { + messageId: message.messageId, + count + } + }); return true; } this.acknowledgements.delete(message.messageId); @@ -391,15 +565,4 @@ export class MessageChannel extends TypedEventEmitter { private getAcknowledgementCount(): number { return 2; } - - private notifyDeliveredMessage(messageId: string): void { - if (this.deliveredMessageCallback) { - this.deliveredMessageCallback(messageId); - } - this.dispatchEvent( - new CustomEvent(MessageChannelEvent.MessageDelivered, { - detail: messageId - }) - ); - } }