From c161b37d080419dce26cb019617226d8f706f5de Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Thu, 14 Aug 2025 10:44:18 +1000 Subject: [PATCH] fix!: SDS acknowledgements (#2549) * SDS: change default causal history size to 200 # Conflicts: # packages/sds/src/index.ts # packages/sds/src/message_channel/message_channel.ts * SDS: add some comments * SDS: segregate messages types, introduce LocalHistory * SDS: fix miss-acks * SDS: logs and more explicit variable names * SDS: shorten event name * SDS: shorten var name * SDS: move message classes to own file. * SDS: use lodash instead of custom SortedArray implementation * SDS: move Message to own file * SDS: add comparison tests --- package-lock.json | 3 +- packages/sds/package.json | 3 +- packages/sds/src/index.ts | 11 +- .../sds/src/message_channel/command_queue.ts | 6 +- .../sds/src/message_channel/events.spec.ts | 35 --- packages/sds/src/message_channel/events.ts | 48 +--- packages/sds/src/message_channel/index.ts | 13 ++ .../src/message_channel/mem_local_history.ts | 66 ++++++ .../sds/src/message_channel/message.spec.ts | 88 +++++++ packages/sds/src/message_channel/message.ts | 175 ++++++++++++++ .../message_channel/message_channel.spec.ts | 65 +++--- .../src/message_channel/message_channel.ts | 214 ++++++++++-------- 12 files changed, 516 insertions(+), 211 deletions(-) delete mode 100644 packages/sds/src/message_channel/events.spec.ts create mode 100644 packages/sds/src/message_channel/mem_local_history.ts create mode 100644 packages/sds/src/message_channel/message.spec.ts create mode 100644 packages/sds/src/message_channel/message.ts diff --git a/package-lock.json b/package-lock.json index fa6c8bc191..28a537fc82 100644 --- a/package-lock.json +++ b/package-lock.json @@ -37730,7 +37730,8 @@ "@noble/hashes": "^1.7.1", "@waku/proto": "^0.0.12", "@waku/utils": "^0.0.25", - "chai": "^5.1.2" + "chai": "^5.1.2", + "lodash": "^4.17.21" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", diff --git a/packages/sds/package.json b/packages/sds/package.json index 659f4cbee9..3124f2ac08 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -64,7 +64,8 @@ "@noble/hashes": "^1.7.1", "@waku/proto": "^0.0.12", "@waku/utils": "^0.0.25", - "chai": "^5.1.2" + "chai": "^5.1.2", + "lodash": "^4.17.21" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index f80280732b..f5e4586fe0 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -3,11 +3,14 @@ import { BloomFilter } from "./bloom_filter/bloom.js"; export { MessageChannel, MessageChannelEvent, - MessageChannelOptions -} from "./message_channel/index.js"; - -export { + MessageChannelOptions, + isContentMessage, + isSyncMessage, + isEphemeralMessage, Message, + ContentMessage, + SyncMessage, + EphemeralMessage, type HistoryEntry, type ChannelId, type MessageChannelEvents, diff --git a/packages/sds/src/message_channel/command_queue.ts b/packages/sds/src/message_channel/command_queue.ts index 9d23ad9ee0..8dacafbfb5 100644 --- a/packages/sds/src/message_channel/command_queue.ts +++ b/packages/sds/src/message_channel/command_queue.ts @@ -1,4 +1,4 @@ -import type { Message } from "./events.js"; +import { ContentMessage, EphemeralMessage, Message } from "./message.js"; export enum Command { Send = "send", @@ -9,7 +9,7 @@ export enum Command { export interface ParamsByAction { [Command.Send]: { payload: Uint8Array; - callback?: (message: Message) => Promise<{ + callback?: (message: ContentMessage) => Promise<{ success: boolean; retrievalHint?: Uint8Array; }>; @@ -19,7 +19,7 @@ export interface ParamsByAction { }; [Command.SendEphemeral]: { payload: Uint8Array; - callback?: (message: Message) => Promise; + callback?: (message: EphemeralMessage) => Promise; }; } diff --git a/packages/sds/src/message_channel/events.spec.ts b/packages/sds/src/message_channel/events.spec.ts deleted file mode 100644 index 18508208c0..0000000000 --- a/packages/sds/src/message_channel/events.spec.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { expect } from "chai"; - -import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; - -import { Message } from "./events.js"; -import { DEFAULT_BLOOM_FILTER_OPTIONS } from "./message_channel.js"; - -describe("Message serialization", () => { - it("Bloom filter", () => { - const messageId = "first"; - - const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); - bloomFilter.insert(messageId); - - const message = new Message( - "123", - "my-channel", - "me", - [], - 0, - bloomFilter.toBytes(), - undefined - ); - - const bytes = message.encode(); - const decMessage = Message.decode(bytes); - - const decBloomFilter = DefaultBloomFilter.fromBytes( - decMessage!.bloomFilter!, - DEFAULT_BLOOM_FILTER_OPTIONS - ); - - expect(decBloomFilter.lookup(messageId)).to.be.true; - }); -}); diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index 055269108d..75318df210 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -1,4 +1,4 @@ -import { proto_sds_message } from "@waku/proto"; +import { HistoryEntry, Message, MessageId } from "./message.js"; export enum MessageChannelEvent { OutMessageSent = "sds:out:message-sent", @@ -9,52 +9,10 @@ export enum MessageChannelEvent { InMessageMissing = "sds:in:message-missing", OutSyncSent = "sds:out:sync-sent", InSyncReceived = "sds:in:sync-received", - InMessageIrretrievablyLost = "sds:in:message-irretrievably-lost", + InMessageLost = "sds:in:message-irretrievably-lost", ErrorTask = "sds:error-task" } -export type MessageId = string; -export type HistoryEntry = proto_sds_message.HistoryEntry; -export type ChannelId = string; -export type SenderId = string; - -export class Message implements proto_sds_message.SdsMessage { - public constructor( - public messageId: string, - public channelId: string, - public senderId: string, - public causalHistory: proto_sds_message.HistoryEntry[], - public lamportTimestamp?: number | undefined, - public bloomFilter?: Uint8Array | undefined, - public content?: Uint8Array | undefined - ) {} - - public encode(): Uint8Array { - return proto_sds_message.SdsMessage.encode(this); - } - - public static decode(data: Uint8Array): Message { - const { - messageId, - channelId, - senderId, - causalHistory, - lamportTimestamp, - bloomFilter, - content - } = proto_sds_message.SdsMessage.decode(data); - return new Message( - messageId, - channelId, - senderId, - causalHistory, - lamportTimestamp, - bloomFilter, - content - ); - } -} - export type MessageChannelEvents = { [MessageChannelEvent.OutMessageSent]: CustomEvent; [MessageChannelEvent.InMessageDelivered]: CustomEvent; @@ -65,7 +23,7 @@ export type MessageChannelEvents = { count: number; }>; [MessageChannelEvent.InMessageMissing]: CustomEvent; - [MessageChannelEvent.InMessageIrretrievablyLost]: CustomEvent; + [MessageChannelEvent.InMessageLost]: CustomEvent; [MessageChannelEvent.OutSyncSent]: CustomEvent; [MessageChannelEvent.InSyncReceived]: CustomEvent; [MessageChannelEvent.ErrorTask]: CustomEvent; diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts index 0e575ed136..53c37ae388 100644 --- a/packages/sds/src/message_channel/index.ts +++ b/packages/sds/src/message_channel/index.ts @@ -1,3 +1,16 @@ export * from "./command_queue.js"; export * from "./events.js"; export * from "./message_channel.js"; +export { + ChannelId, + ContentMessage, + EphemeralMessage, + HistoryEntry, + Message, + MessageId, + SenderId, + SyncMessage, + isContentMessage, + isEphemeralMessage, + isSyncMessage +} from "./message.js"; diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts new file mode 100644 index 0000000000..2d2f22abae --- /dev/null +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -0,0 +1,66 @@ +import _ from "lodash"; + +import { ContentMessage, isContentMessage } from "./message.js"; + +/** + * In-Memory implementation of a local store of messages. + * + * Messages are store in SDS chronological order: + * - messages[0] is the oldest message + * - messages[n] is the newest message + * + * Only stores content message: `message.lamportTimestamp` and `message.content` are present. + */ +export class MemLocalHistory { + private items: ContentMessage[] = []; + + public get length(): number { + return this.items.length; + } + + public push(...items: ContentMessage[]): number { + for (const item of items) { + this.validateMessage(item); + } + + // Add new items and ensure uniqueness by messageId using sortedUniqBy + // The valueOf() method on ContentMessage enables native < operator sorting + this.items = _.sortedUniqBy([...this.items, ...items], "messageId"); + + return this.items.length; + } + + public some( + predicate: ( + value: ContentMessage, + index: number, + array: ContentMessage[] + ) => unknown, + thisArg?: any + ): boolean { + return this.items.some(predicate, thisArg); + } + + public slice(start?: number, end?: number): ContentMessage[] { + return this.items.slice(start, end); + } + + public find( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): ContentMessage | undefined { + return this.items.find(predicate, thisArg); + } + + private validateMessage(message: ContentMessage): void { + if (!isContentMessage(message)) { + throw new Error( + "Message must have lamportTimestamp and content defined, sync and ephemeral messages cannot be stored" + ); + } + } +} diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts new file mode 100644 index 0000000000..9b4a2e0841 --- /dev/null +++ b/packages/sds/src/message_channel/message.spec.ts @@ -0,0 +1,88 @@ +import { expect } from "chai"; + +import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; + +import { ContentMessage, Message } from "./message.js"; +import { DEFAULT_BLOOM_FILTER_OPTIONS } from "./message_channel.js"; + +describe("Message serialization", () => { + it("Bloom filter", () => { + const messageId = "first"; + + const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); + bloomFilter.insert(messageId); + + const message = new Message( + "123", + "my-channel", + "me", + [], + 0, + bloomFilter.toBytes(), + undefined + ); + + const bytes = message.encode(); + const decMessage = Message.decode(bytes); + + const decBloomFilter = DefaultBloomFilter.fromBytes( + decMessage!.bloomFilter!, + DEFAULT_BLOOM_FILTER_OPTIONS + ); + + expect(decBloomFilter.lookup(messageId)).to.be.true; + }); +}); + +describe("ContentMessage comparison with < operator", () => { + it("should sort by lamportTimestamp when timestamps differ", () => { + const msgA = new ContentMessage( + "zzz", // Higher messageId + "channel", + "sender", + [], + 100, // Lower timestamp + undefined, + new Uint8Array([1]) + ); + + const msgB = new ContentMessage( + "aaa", // Lower messageId + "channel", + "sender", + [], + 200, // Higher timestamp + undefined, + new Uint8Array([2]) + ); + + // Despite msgA having higher messageId, it should be < msgB due to lower timestamp + expect(msgA < msgB).to.be.true; + expect(msgB < msgA).to.be.false; + }); + + it("should sort by messageId when timestamps are equal", () => { + const msgA = new ContentMessage( + "aaa", // Lower messageId + "channel", + "sender", + [], + 100, // Same timestamp + undefined, + new Uint8Array([1]) + ); + + const msgB = new ContentMessage( + "zzz", // Higher messageId + "channel", + "sender", + [], + 100, // Same timestamp + undefined, + new Uint8Array([2]) + ); + + expect(msgA < msgB).to.be.true; + expect(msgB < msgA).to.be.false; + }); +}); diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts new file mode 100644 index 0000000000..eeb5c732d2 --- /dev/null +++ b/packages/sds/src/message_channel/message.ts @@ -0,0 +1,175 @@ +import { proto_sds_message } from "@waku/proto"; + +export type MessageId = string; +export type HistoryEntry = proto_sds_message.HistoryEntry; +export type ChannelId = string; +export type SenderId = string; + +export class Message implements proto_sds_message.SdsMessage { + public constructor( + public messageId: string, + public channelId: string, + public senderId: string, + public causalHistory: proto_sds_message.HistoryEntry[], + public lamportTimestamp?: number | undefined, + public bloomFilter?: Uint8Array | undefined, + public content?: Uint8Array | undefined, + /** + * Not encoded, set after it is sent, used to include in follow-up messages + */ + public retrievalHint?: Uint8Array | undefined + ) {} + + public encode(): Uint8Array { + return proto_sds_message.SdsMessage.encode(this); + } + + public static decode(data: Uint8Array): Message { + const { + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content + } = proto_sds_message.SdsMessage.decode(data); + return new Message( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content + ); + } +} + +export class SyncMessage extends Message { + public constructor( + public messageId: string, + public channelId: string, + public senderId: string, + public causalHistory: proto_sds_message.HistoryEntry[], + public lamportTimestamp: number, + public bloomFilter: Uint8Array | undefined, + public content: undefined, + /** + * Not encoded, set after it is sent, used to include in follow-up messages + */ + public retrievalHint?: Uint8Array | undefined + ) { + super( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content, + retrievalHint + ); + } +} + +export function isSyncMessage( + message: Message | ContentMessage | SyncMessage | EphemeralMessage +): message is SyncMessage { + return Boolean( + "lamportTimestamp" in message && + typeof message.lamportTimestamp === "number" && + (message.content === undefined || message.content.length === 0) + ); +} + +export class EphemeralMessage extends Message { + public constructor( + public messageId: string, + public channelId: string, + public senderId: string, + public causalHistory: proto_sds_message.HistoryEntry[], + public lamportTimestamp: undefined, + public bloomFilter: Uint8Array | undefined, + public content: Uint8Array, + /** + * Not encoded, set after it is sent, used to include in follow-up messages + */ + public retrievalHint?: Uint8Array | undefined + ) { + if (!content || !content.length) { + throw Error("Ephemeral Message must have content"); + } + super( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content, + retrievalHint + ); + } +} + +export function isEphemeralMessage( + message: Message | ContentMessage | SyncMessage | EphemeralMessage +): message is EphemeralMessage { + return Boolean( + message.lamportTimestamp === undefined && + "content" in message && + message.content && + message.content.length + ); +} + +export class ContentMessage extends Message { + public constructor( + public messageId: string, + public channelId: string, + public senderId: string, + public causalHistory: proto_sds_message.HistoryEntry[], + public lamportTimestamp: number, + public bloomFilter: Uint8Array | undefined, + public content: Uint8Array, + /** + * Not encoded, set after it is sent, used to include in follow-up messages + */ + public retrievalHint?: Uint8Array | undefined + ) { + if (!content.length) { + throw Error("Content Message must have content"); + } + super( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content, + retrievalHint + ); + } + + // `valueOf` is used by comparison operands such as `<` + public valueOf(): string { + // Create a sortable string representation that matches the compare logic + // Pad lamportTimestamp to ensure proper lexicographic ordering + // Use 16 digits to handle up to Number.MAX_SAFE_INTEGER (9007199254740991) + const paddedTimestamp = this.lamportTimestamp.toString().padStart(16, "0"); + return `${paddedTimestamp}_${this.messageId}`; + } +} + +export function isContentMessage( + message: Message | ContentMessage +): message is ContentMessage { + return Boolean( + "lamportTimestamp" in message && + typeof message.lamportTimestamp === "number" && + message.content && + message.content.length + ); +} diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index f290a9b99d..624e9a97e8 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -3,14 +3,17 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; +import { MessageChannelEvent } from "./events.js"; import { + ContentMessage, HistoryEntry, Message, - MessageChannelEvent, - MessageId -} from "./events.js"; + MessageId, + SyncMessage +} from "./message.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, + ILocalHistory, MessageChannel } from "./message_channel.js"; @@ -35,12 +38,20 @@ const messagesB = [ const sendMessage = async ( channel: MessageChannel, payload: Uint8Array, - callback: (message: Message) => Promise<{ success: boolean }> + callback: (message: ContentMessage) => Promise<{ success: boolean }> ): Promise => { await channel.pushOutgoingMessage(payload, callback); await channel.processTasks(); }; +const sendSyncMessage = async ( + channel: MessageChannel, + callback: (message: SyncMessage) => Promise +): Promise => { + await channel.pushOutgoingSyncMessage(callback); + await channel.processTasks(); +}; + const receiveMessage = async ( channel: MessageChannel, message: Message @@ -61,39 +72,38 @@ describe("MessageChannel", function () { it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await sendMessage(channelA, new Uint8Array(), callback); + await sendMessage(channelA, utf8ToBytes("message"), callback); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); }); it("should push the message to the outgoing buffer", async () => { const bufferLengthBefore = (channelA as any).outgoingBuffer.length; - await sendMessage(channelA, new Uint8Array(), callback); + await sendMessage(channelA, utf8ToBytes("message"), callback); const bufferLengthAfter = (channelA as any).outgoingBuffer.length; expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); }); it("should insert message into bloom filter", async () => { - const messageId = MessageChannel.getMessageId(new Uint8Array()); - await sendMessage(channelA, new Uint8Array(), callback); + const payload = utf8ToBytes("message"); + const messageId = MessageChannel.getMessageId(payload); + await sendMessage(channelA, payload, callback); const bloomFilter = getBloomFilter(channelA); expect(bloomFilter.lookup(messageId)).to.equal(true); }); it("should insert message id into causal history", async () => { + const payload = utf8ToBytes("message"); const expectedTimestamp = (channelA as any).lamportTimestamp + 1; - const messageId = MessageChannel.getMessageId(new Uint8Array()); - await sendMessage(channelA, new Uint8Array(), callback); - const messageIdLog = (channelA as any).localHistory as { - timestamp: number; - historyEntry: HistoryEntry; - }[]; + const messageId = MessageChannel.getMessageId(payload); + await sendMessage(channelA, payload, callback); + const messageIdLog = (channelA as any).localHistory as ILocalHistory; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( (log) => - log.timestamp === expectedTimestamp && - log.historyEntry.messageId === messageId + log.lamportTimestamp === expectedTimestamp && + log.messageId === messageId ) ).to.equal(true); }); @@ -547,7 +557,7 @@ describe("MessageChannel", function () { it("should mark a message as irretrievably lost if timeout is exceeded", async () => { // Create a channel with very very short timeout const channelC: MessageChannel = new MessageChannel(channelId, "carol", { - timeoutToMarkMessageIrretrievableMs: 10 + timeoutForLostMessagesMs: 10 }); for (const m of messagesA) { @@ -558,16 +568,13 @@ describe("MessageChannel", function () { const messageToBeLostId = MessageChannel.getMessageId( utf8ToBytes(messagesA[0]) ); - channelC.addEventListener( - MessageChannelEvent.InMessageIrretrievablyLost, - (event) => { - for (const hist of event.detail) { - if (hist.messageId === messageToBeLostId) { - irretrievablyLost = true; - } + channelC.addEventListener(MessageChannelEvent.InMessageLost, (event) => { + for (const hist of event.detail) { + if (hist.messageId === messageToBeLostId) { + irretrievablyLost = true; } } - ); + }); await sendMessage( channelA, @@ -591,7 +598,7 @@ describe("MessageChannel", function () { const causalHistorySize = (channelA as any).causalHistorySize; // Create a channel with very very short timeout const channelC: MessageChannel = new MessageChannel(channelId, "carol", { - timeoutToMarkMessageIrretrievableMs: 10 + timeoutForLostMessagesMs: 10 }); for (const m of messagesA) { @@ -675,7 +682,7 @@ describe("MessageChannel", function () { it("should be sent with empty content", async () => { await channelA.pushOutgoingSyncMessage(async (message) => { - expect(message.content?.length).to.equal(0); + expect(message.content).to.be.undefined; return true; }); }); @@ -727,9 +734,9 @@ describe("MessageChannel", function () { }); } - await sendMessage(channelB, new Uint8Array(), async (message) => { + await sendSyncMessage(channelB, async (message) => { await receiveMessage(channelA, message); - return { success: true }; + return true; }); const causalHistorySize = (channelA as any).causalHistorySize; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index d44471d68c..6ae3799801 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -6,22 +6,28 @@ import { Logger } from "@waku/utils"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; +import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; +import { MemLocalHistory } from "./mem_local_history.js"; import { - type ChannelId, - type HistoryEntry, + ChannelId, + ContentMessage, + EphemeralMessage, + HistoryEntry, + isContentMessage, + isEphemeralMessage, + isSyncMessage, Message, - MessageChannelEvent, - MessageChannelEvents, - type MessageId, - type SenderId -} from "./events.js"; + MessageId, + SenderId, + SyncMessage +} from "./message.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, errorRate: 0.001 }; -const DEFAULT_CAUSAL_HISTORY_SIZE = 2; +const DEFAULT_CAUSAL_HISTORY_SIZE = 200; const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; const log = new Logger("waku:sds:message-channel"); @@ -35,26 +41,31 @@ export interface MessageChannelOptions { * * @default undefined because it is coupled to processTask calls frequency */ - timeoutToMarkMessageIrretrievableMs?: number; + timeoutForLostMessagesMs?: number; /** * How many possible acks does it take to consider it a definitive ack. */ possibleAcksThreshold?: number; } +export type ILocalHistory = Pick< + Array, + "some" | "push" | "slice" | "find" | "length" +>; + export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: SenderId; private lamportTimestamp: number; private filter: DefaultBloomFilter; - private outgoingBuffer: Message[]; + private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; - private incomingBuffer: Message[]; - private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; + private incomingBuffer: Array; + private localHistory: ILocalHistory; private timeReceived: Map; private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; - private readonly timeoutToMarkMessageIrretrievableMs?: number; + private readonly timeoutForLostMessagesMs?: number; private tasks: Task[] = []; private handlers: Handlers = { @@ -78,7 +89,8 @@ export class MessageChannel extends TypedEventEmitter { public constructor( channelId: ChannelId, senderId: SenderId, - options: MessageChannelOptions = {} + options: MessageChannelOptions = {}, + localHistory: ILocalHistory = new MemLocalHistory() ) { super(); this.channelId = channelId; @@ -88,15 +100,14 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.possibleAcks = new Map(); this.incomingBuffer = []; - this.localHistory = []; + this.localHistory = localHistory; this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes this.possibleAcksThreshold = options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD; this.timeReceived = new Map(); - this.timeoutToMarkMessageIrretrievableMs = - options.timeoutToMarkMessageIrretrievableMs; + this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs; } public static getMessageId(payload: Uint8Array): MessageId { @@ -121,7 +132,8 @@ export class MessageChannel extends TypedEventEmitter { * await channel.processTasks(); * ``` * - * @throws Will emit a 'taskError' event if any task fails, but continues processing remaining tasks + * @emits CustomEvent("taskError", { detail: { command, error, params } } + * if any task fails, but continues processing remaining tasks */ public async processTasks(): Promise { while (this.tasks.length > 0) { @@ -141,7 +153,9 @@ export class MessageChannel extends TypedEventEmitter { * This ensures proper lamport timestamp ordering and causal history tracking. * * @param payload - The message content as a byte array - * @param callback - Optional callback function called after the message is processed + * @param callback - callback function that should propagate the message + * on the routing layer; `success` should be false if sending irremediably fails, + * when set to true, the message is finalized into the channel locally. * @returns Promise that resolves when the message is queued (not sent) * * @example @@ -157,14 +171,19 @@ export class MessageChannel extends TypedEventEmitter { * // Actually send the message * await channel.processTasks(); * ``` + * + * @throws Error if the payload is empty */ public async pushOutgoingMessage( payload: Uint8Array, - callback?: (processedMessage: Message) => Promise<{ + callback?: (processedMessage: ContentMessage) => Promise<{ success: boolean; retrievalHint?: Uint8Array; }> ): Promise { + if (!payload || !payload.length) { + throw Error("Only messages with valid payloads are allowed"); + } this.tasks.push({ command: Command.Send, params: { @@ -235,7 +254,7 @@ export class MessageChannel extends TypedEventEmitter { */ public sweepIncomingBuffer(): HistoryEntry[] { const { buffer, missing } = this.incomingBuffer.reduce<{ - buffer: Message[]; + buffer: Array; missing: Set; }>( ({ buffer, missing }, message) => { @@ -248,12 +267,11 @@ export class MessageChannel extends TypedEventEmitter { const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => !this.localHistory.some( - ({ historyEntry: { messageId } }) => - messageId === messageHistoryEntry.messageId + ({ messageId }) => messageId === messageHistoryEntry.messageId ) ); if (missingDependencies.length === 0) { - if (this.deliverMessage(message)) { + if (isContentMessage(message) && this.deliverMessage(message)) { this.safeSendEvent(MessageChannelEvent.InMessageDelivered, { detail: message.messageId }); @@ -269,13 +287,13 @@ export class MessageChannel extends TypedEventEmitter { // Optionally, if a message has not been received after a predetermined amount of time, // its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery) - if (this.timeoutToMarkMessageIrretrievableMs) { + if (this.timeoutForLostMessagesMs) { const timeReceived = this.timeReceived.get(message.messageId); if ( timeReceived && - Date.now() - timeReceived > this.timeoutToMarkMessageIrretrievableMs + Date.now() - timeReceived > this.timeoutForLostMessagesMs ) { - this.safeSendEvent(MessageChannelEvent.InMessageIrretrievablyLost, { + this.safeSendEvent(MessageChannelEvent.InMessageLost, { detail: Array.from(missingDependencies) }); return { buffer, missing }; @@ -289,7 +307,7 @@ export class MessageChannel extends TypedEventEmitter { missing }; }, - { buffer: new Array(), missing: new Set() } + { buffer: new Array(), missing: new Set() } ); this.incomingBuffer = buffer; @@ -302,12 +320,12 @@ export class MessageChannel extends TypedEventEmitter { // https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep public sweepOutgoingBuffer(): { - unacknowledged: Message[]; - possiblyAcknowledged: Message[]; + unacknowledged: ContentMessage[]; + possiblyAcknowledged: ContentMessage[]; } { return this.outgoingBuffer.reduce<{ - unacknowledged: Message[]; - possiblyAcknowledged: Message[]; + unacknowledged: ContentMessage[]; + possiblyAcknowledged: ContentMessage[]; }>( ({ unacknowledged, possiblyAcknowledged }, message) => { if (this.possibleAcks.has(message.messageId)) { @@ -322,8 +340,8 @@ export class MessageChannel extends TypedEventEmitter { }; }, { - unacknowledged: new Array(), - possiblyAcknowledged: new Array() + unacknowledged: new Array(), + possiblyAcknowledged: new Array() } ); } @@ -339,27 +357,28 @@ export class MessageChannel extends TypedEventEmitter { * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ public async pushOutgoingSyncMessage( - callback?: (message: Message) => Promise + callback?: (message: SyncMessage) => Promise ): Promise { this.lamportTimestamp++; - - const emptyMessage = new Uint8Array(); - - const message = new Message( - MessageChannel.getMessageId(emptyMessage), + const message = new SyncMessage( + // does not need to be secure randomness + `sync-${Math.random().toString(36).substring(2)}`, this.channelId, this.senderId, this.localHistory .slice(-this.causalHistorySize) - .map(({ historyEntry }) => historyEntry), + .map(({ messageId, retrievalHint }) => { + return { messageId, retrievalHint }; + }), this.lamportTimestamp, this.filter.toBytes(), - emptyMessage + undefined ); if (callback) { try { await callback(message); + log.info(this.senderId, "sync message sent", message.messageId); this.safeSendEvent(MessageChannelEvent.OutSyncSent, { detail: message }); @@ -376,26 +395,41 @@ export class MessageChannel extends TypedEventEmitter { } private _pushIncomingMessage(message: Message): void { + log.info(this.senderId, "incoming message", message.messageId); const isDuplicate = message.content && message.content.length > 0 && this.timeReceived.has(message.messageId); if (isDuplicate) { + log.info( + this.senderId, + "dropping dupe incoming message", + message.messageId + ); return; } const isOwnOutgoingMessage = this.senderId === message.senderId; if (isOwnOutgoingMessage) { + log.info(this.senderId, "ignoring own incoming message"); return; } // Ephemeral messages SHOULD be delivered immediately - if (!message.lamportTimestamp) { - this.deliverMessage(message); + if (isEphemeralMessage(message)) { + log.info(this.senderId, "delivering ephemeral message"); return; } - if (message.content?.length === 0) { + if (!isSyncMessage(message) && !isContentMessage(message)) { + log.error( + this.senderId, + "internal error, a message is neither sync nor ephemeral nor content, ignoring it", + message + ); + return; + } + if (isSyncMessage(message)) { this.safeSendEvent(MessageChannelEvent.InSyncReceived, { detail: message }); @@ -405,15 +439,14 @@ export class MessageChannel extends TypedEventEmitter { }); } this.reviewAckStatus(message); - if (message.content?.length && message.content.length > 0) { + if (isContentMessage(message)) { this.filter.insert(message.messageId); } const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => !this.localHistory.some( - ({ historyEntry: { messageId } }) => - messageId === messageHistoryEntry.messageId + ({ messageId }) => messageId === messageHistoryEntry.messageId ) ); @@ -427,7 +460,7 @@ export class MessageChannel extends TypedEventEmitter { missingDependencies.map((ch) => ch.messageId) ); } else { - if (this.deliverMessage(message)) { + if (isContentMessage(message) && this.deliverMessage(message)) { this.safeSendEvent(MessageChannelEvent.InMessageDelivered, { detail: message.messageId }); @@ -465,7 +498,7 @@ export class MessageChannel extends TypedEventEmitter { private async _pushOutgoingMessage( payload: Uint8Array, - callback?: (message: Message) => Promise<{ + callback?: (message: ContentMessage) => Promise<{ success: boolean; retrievalHint?: Uint8Array; }> @@ -484,33 +517,35 @@ export class MessageChannel extends TypedEventEmitter { // It's a new message if (!message) { - message = new Message( + log.info(this.senderId, "sending new message", messageId); + message = new ContentMessage( messageId, this.channelId, this.senderId, this.localHistory .slice(-this.causalHistorySize) - .map(({ historyEntry }) => historyEntry), + .map(({ messageId, retrievalHint }) => { + return { messageId, retrievalHint }; + }), this.lamportTimestamp, this.filter.toBytes(), payload ); this.outgoingBuffer.push(message); + } else { + log.info(this.senderId, "resending message", messageId); } if (callback) { try { const { success, retrievalHint } = await callback(message); - if (success) { + // isContentMessage should always be true as `this.lamportTimestamp` has been + // used to create the message + if (success && isContentMessage(message)) { + message.retrievalHint = retrievalHint; this.filter.insert(messageId); - this.localHistory.push({ - timestamp: this.lamportTimestamp, - historyEntry: { - messageId, - retrievalHint - } - }); + this.localHistory.push(message); this.timeReceived.set(messageId, Date.now()); this.safeSendEvent(MessageChannelEvent.OutMessageSent, { detail: message @@ -525,9 +560,9 @@ export class MessageChannel extends TypedEventEmitter { private async _pushOutgoingEphemeralMessage( payload: Uint8Array, - callback?: (message: Message) => Promise + callback?: (message: EphemeralMessage) => Promise ): Promise { - const message = new Message( + const message = new EphemeralMessage( MessageChannel.getMessageId(payload), this.channelId, this.senderId, @@ -559,13 +594,10 @@ export class MessageChannel extends TypedEventEmitter { */ // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage( - message: Message, + message: ContentMessage, retrievalHint?: Uint8Array ): boolean { - if ( - message.content?.length === 0 || - message.lamportTimestamp === undefined - ) { + if (!isContentMessage(message)) { // Messages with empty content are sync messages. // Messages with no timestamp are ephemeral messages. // They do not need to be "delivered". @@ -580,7 +612,7 @@ export class MessageChannel extends TypedEventEmitter { // Check if the entry is already present const existingHistoryEntry = this.localHistory.find( - ({ historyEntry }) => historyEntry.messageId === message.messageId + ({ messageId }) => messageId === message.messageId ); // The history entry is already present, no need to re-add @@ -588,24 +620,9 @@ export class MessageChannel extends TypedEventEmitter { return true; } - // The participant MUST insert the message ID into its local log, - // based on Lamport timestamp. - // If one or more message IDs with the same Lamport timestamp already exists, - // the participant MUST follow the Resolve Conflicts procedure. - // https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts - this.localHistory.push({ - timestamp: message.lamportTimestamp, - historyEntry: { - messageId: message.messageId, - retrievalHint - } - }); - this.localHistory.sort((a, b) => { - if (a.timestamp !== b.timestamp) { - return a.timestamp - b.timestamp; - } - return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId); - }); + message.retrievalHint = retrievalHint; + + this.localHistory.push(message); return true; } @@ -615,7 +632,7 @@ export class MessageChannel extends TypedEventEmitter { private reviewAckStatus(receivedMessage: Message): void { log.info( this.senderId, - "reviewing ack status using:", + "reviewing ack status using causal history:", receivedMessage.causalHistory.map((ch) => ch.messageId) ); log.info( @@ -625,24 +642,23 @@ export class MessageChannel extends TypedEventEmitter { ); receivedMessage.causalHistory.forEach(({ messageId }) => { this.outgoingBuffer = this.outgoingBuffer.filter( - ({ messageId: outgoingMessageId }) => { - if (outgoingMessageId !== messageId) { + ({ messageId: bufferMessageId }) => { + if (bufferMessageId !== messageId) { return true; } + log.info(this.senderId, "message acknowledged", messageId); this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, { detail: messageId }); return false; } ); - this.possibleAcks.delete(messageId); - if (!this.filter.lookup(messageId)) { - this.filter.insert(messageId); - } }); + if (!receivedMessage.bloomFilter) { return; } + const messageBloomFilter = DefaultBloomFilter.fromBytes( receivedMessage.bloomFilter, this.filter.options @@ -657,15 +673,27 @@ export class MessageChannel extends TypedEventEmitter { const count = (this.possibleAcks.get(message.messageId) ?? 0) + 1; if (count < this.possibleAcksThreshold) { this.possibleAcks.set(message.messageId, count); + log.info( + this.senderId, + "message possibly acknowledged", + message.messageId, + count + ); this.safeSendEvent(MessageChannelEvent.OutMessagePossiblyAcknowledged, { detail: { messageId: message.messageId, count } }); + // Not enough possible acks to acknowledge it, keep it in buffer return true; } + // Enough possible acks for it to be acknowledged this.possibleAcks.delete(message.messageId); + log.info(this.senderId, "message acknowledged", message.messageId, count); + this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, { + detail: message.messageId + }); return false; }); }