From dc5155056b2f8583ffc4340701466f4820501c4a Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Tue, 12 Aug 2025 10:47:52 +1000 Subject: [PATCH] feat!: SDS improvements and fixes (#2539) * introduce `MessageId` type # Conflicts: # packages/sds/src/message_channel/message_channel.ts * fix: own messages are not used for ack * fix: own messages are not used for ack * doc: long term solution is SDS protocol change * SDS: renaming to match message function * SDS: introduce `Message` class for easier encoding/decoding # Conflicts: # packages/sds/src/message_channel/events.ts # packages/sds/src/message_channel/message_channel.ts * SDS Message is a class now * SDS: it's "possibly" not "partially" acknowledged. * SDS: TODO * SDS: fix tests * SDS: make logs start with `waku` * SDS: add bloom filter test # Conflicts: # packages/sds/src/message_channel/events.spec.ts * SDS: improve naming * SDS: improve naming Messages are not "sent" or received, but pushed for processing in local queues. * SDS: sync message should not be delivered * SDS: renaming from earlier * SDS: remove useless variable * SDS: Fix comment * SDS: sync messages do not get "delivered" * SDS: acks * SDS: simplify delivered event * SDS: improve event naming * SDS: fix comment * SDS: make task error an official event * SDS: Mark messages that are irretrievably lost * SDS: remove default for irretrievable and simplify config * SDS: typo on sync event * SDS: add and user sender id * SDS: resent message never get ack'd * SDS: fix cylic dependencies * SDS: helpful logs * SDS: avoid duplicate history entries * SDS: export options --- .cspell.json | 1 + packages/proto/src/generated/sds_message.ts | 11 + packages/proto/src/lib/sds_message.proto | 4 +- packages/sds/src/index.ts | 12 +- .../sds/src/message_channel/events.spec.ts | 22 +- packages/sds/src/message_channel/events.ts | 80 +++-- .../message_channel/message_channel.spec.ts | 196 ++++++++--- .../src/message_channel/message_channel.ts | 319 +++++++++++------- 8 files changed, 431 insertions(+), 214 deletions(-) diff --git a/.cspell.json b/.cspell.json index f4e425d684..e8582e38b0 100644 --- a/.cspell.json +++ b/.cspell.json @@ -4,6 +4,7 @@ "language": "en", "words": [ "abortable", + "acks", "Addrs", "ahadns", "Alives", diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index 017b8caa75..20ef8746ae 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -81,6 +81,7 @@ export namespace HistoryEntry { } export interface SdsMessage { + senderId: string messageId: string channelId: string lamportTimestamp?: number @@ -99,6 +100,11 @@ export namespace SdsMessage { w.fork() } + if ((obj.senderId != null && obj.senderId !== '')) { + w.uint32(10) + w.string(obj.senderId) + } + if ((obj.messageId != null && obj.messageId !== '')) { w.uint32(18) w.string(obj.messageId) @@ -136,6 +142,7 @@ export namespace SdsMessage { } }, (reader, length, opts = {}) => { const obj: any = { + senderId: '', messageId: '', channelId: '', causalHistory: [] @@ -147,6 +154,10 @@ export namespace SdsMessage { const tag = reader.uint32() switch (tag >>> 3) { + case 1: { + obj.senderId = reader.string() + break + } case 2: { obj.messageId = reader.string() break diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index c75c8d7447..5344a0d33a 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -6,11 +6,11 @@ message HistoryEntry { } message SdsMessage { - // 1 Reserved for sender/participant id + string sender_id = 1; // Participant ID of the message sender string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes content = 20; // Actual content of the message -} \ No newline at end of file +} diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index 9ce349a462..f80280732b 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -3,15 +3,15 @@ import { BloomFilter } from "./bloom_filter/bloom.js"; export { MessageChannel, MessageChannelEvent, - encodeMessage, - decodeMessage + MessageChannelOptions } from "./message_channel/index.js"; -export type { +export { Message, - HistoryEntry, - ChannelId, - MessageChannelEvents + type HistoryEntry, + type ChannelId, + type MessageChannelEvents, + type SenderId } from "./message_channel/index.js"; export { BloomFilter }; diff --git a/packages/sds/src/message_channel/events.spec.ts b/packages/sds/src/message_channel/events.spec.ts index eb3ce23d18..18508208c0 100644 --- a/packages/sds/src/message_channel/events.spec.ts +++ b/packages/sds/src/message_channel/events.spec.ts @@ -2,7 +2,7 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; -import { decodeMessage, encodeMessage, Message } from "./events.js"; +import { Message } from "./events.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS } from "./message_channel.js"; describe("Message serialization", () => { @@ -12,16 +12,18 @@ describe("Message serialization", () => { const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); bloomFilter.insert(messageId); - const message: Message = { - messageId: "123", - channelId: "my-channel", - causalHistory: [], - lamportTimestamp: 0, - bloomFilter: bloomFilter.toBytes() - }; + const message = new Message( + "123", + "my-channel", + "me", + [], + 0, + bloomFilter.toBytes(), + undefined + ); - const bytes = encodeMessage(message); - const decMessage = decodeMessage(bytes); + const bytes = message.encode(); + const decMessage = Message.decode(bytes); const decBloomFilter = DefaultBloomFilter.fromBytes( decMessage!.bloomFilter!, diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index 546f6aafad..055269108d 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -1,42 +1,72 @@ import { proto_sds_message } from "@waku/proto"; export enum MessageChannelEvent { - MessageSent = "messageSent", - MessageDelivered = "messageDelivered", - MessageReceived = "messageReceived", - MessageAcknowledged = "messageAcknowledged", - PartialAcknowledgement = "partialAcknowledgement", - MissedMessages = "missedMessages", - SyncSent = "syncSent", - SyncReceived = "syncReceived" + OutMessageSent = "sds:out:message-sent", + InMessageDelivered = "sds:in:message-delivered", + InMessageReceived = "sds:in:message-received", + OutMessageAcknowledged = "sds:out:message-acknowledged", + OutMessagePossiblyAcknowledged = "sds:out:message-possibly-acknowledged", + InMessageMissing = "sds:in:message-missing", + OutSyncSent = "sds:out:sync-sent", + InSyncReceived = "sds:in:sync-received", + InMessageIrretrievablyLost = "sds:in:message-irretrievably-lost", + ErrorTask = "sds:error-task" } export type MessageId = string; -export type Message = proto_sds_message.SdsMessage; export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; +export type SenderId = string; -export function encodeMessage(message: Message): Uint8Array { - return proto_sds_message.SdsMessage.encode(message); -} +export class Message implements proto_sds_message.SdsMessage { + public constructor( + public messageId: string, + public channelId: string, + public senderId: string, + public causalHistory: proto_sds_message.HistoryEntry[], + public lamportTimestamp?: number | undefined, + public bloomFilter?: Uint8Array | undefined, + public content?: Uint8Array | undefined + ) {} -export function decodeMessage(data: Uint8Array): Message { - return proto_sds_message.SdsMessage.decode(data); + public encode(): Uint8Array { + return proto_sds_message.SdsMessage.encode(this); + } + + public static decode(data: Uint8Array): Message { + const { + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content + } = proto_sds_message.SdsMessage.decode(data); + return new Message( + messageId, + channelId, + senderId, + causalHistory, + lamportTimestamp, + bloomFilter, + content + ); + } } export type MessageChannelEvents = { - [MessageChannelEvent.MessageSent]: CustomEvent; - [MessageChannelEvent.MessageDelivered]: CustomEvent<{ - messageId: MessageId; - sentOrReceived: "sent" | "received"; - }>; - [MessageChannelEvent.MessageReceived]: CustomEvent; - [MessageChannelEvent.MessageAcknowledged]: CustomEvent; - [MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{ + [MessageChannelEvent.OutMessageSent]: CustomEvent; + [MessageChannelEvent.InMessageDelivered]: CustomEvent; + [MessageChannelEvent.InMessageReceived]: CustomEvent; + [MessageChannelEvent.OutMessageAcknowledged]: CustomEvent; + [MessageChannelEvent.OutMessagePossiblyAcknowledged]: CustomEvent<{ messageId: MessageId; count: number; }>; - [MessageChannelEvent.MissedMessages]: CustomEvent; - [MessageChannelEvent.SyncSent]: CustomEvent; - [MessageChannelEvent.SyncReceived]: CustomEvent; + [MessageChannelEvent.InMessageMissing]: CustomEvent; + [MessageChannelEvent.InMessageIrretrievablyLost]: CustomEvent; + [MessageChannelEvent.OutSyncSent]: CustomEvent; + [MessageChannelEvent.InSyncReceived]: CustomEvent; + [MessageChannelEvent.ErrorTask]: CustomEvent; }; diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index e541da8d34..f290a9b99d 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -3,7 +3,12 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; -import { HistoryEntry, Message, MessageId } from "./events.js"; +import { + HistoryEntry, + Message, + MessageChannelEvent, + MessageId +} from "./events.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, MessageChannel @@ -32,7 +37,7 @@ const sendMessage = async ( payload: Uint8Array, callback: (message: Message) => Promise<{ success: boolean }> ): Promise => { - await channel.sendMessage(payload, callback); + await channel.pushOutgoingMessage(payload, callback); await channel.processTasks(); }; @@ -40,7 +45,7 @@ const receiveMessage = async ( channel: MessageChannel, message: Message ): Promise => { - channel.receiveMessage(message); + channel.pushIncomingMessage(message); await channel.processTasks(); }; @@ -51,7 +56,7 @@ describe("MessageChannel", function () { describe("sending a message ", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice"); }); it("should increase lamport timestamp", async () => { @@ -133,13 +138,13 @@ describe("MessageChannel", function () { describe("receiving a message", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); - channelB = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice"); + channelB = new MessageChannel(channelId, "bob"); }); it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await sendMessage(channelB, new Uint8Array(), async (message) => { + await sendMessage(channelB, utf8ToBytes("message"), async (message) => { await receiveMessage(channelA, message); return { success: true }; }); @@ -241,8 +246,10 @@ describe("MessageChannel", function () { describe("reviewing ack status", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); - channelB = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice", { + causalHistorySize: 2 + }); + channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should mark all messages in causal history as acknowledged", async () => { @@ -309,7 +316,7 @@ describe("MessageChannel", function () { }); it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { - const acknowledgementCount = (channelA as any).acknowledgementCount; + const possibleAcksThreshold = (channelA as any).possibleAcksThreshold; const causalHistorySize = (channelA as any).causalHistorySize; @@ -341,8 +348,8 @@ describe("MessageChannel", function () { } ); - const acknowledgements: ReadonlyMap = (channelA as any) - .acknowledgements; + const possibleAcks: ReadonlyMap = (channelA as any) + .possibleAcks; // Other than the message IDs which were included in causal history, // the remaining messages sent by channel A should be considered possibly acknowledged // for having been included in the bloom filter sent from channel B @@ -350,24 +357,24 @@ describe("MessageChannel", function () { if (expectedAcknowledgementsSize <= 0) { throw new Error("expectedAcknowledgementsSize must be greater than 0"); } - expect(acknowledgements.size).to.equal(expectedAcknowledgementsSize); + expect(possibleAcks.size).to.equal(expectedAcknowledgementsSize); // Channel B only included the last N messages in causal history messages.slice(0, -causalHistorySize).forEach((m) => { expect( - acknowledgements.get(MessageChannel.getMessageId(utf8ToBytes(m))) + possibleAcks.get(MessageChannel.getMessageId(utf8ToBytes(m))) ).to.equal(1); }); // Messages that never reached channel B should not be acknowledged unacknowledgedMessages.forEach((m) => { expect( - acknowledgements.has(MessageChannel.getMessageId(utf8ToBytes(m))) + possibleAcks.has(MessageChannel.getMessageId(utf8ToBytes(m))) ).to.equal(false); }); // When channel C sends more messages, it will include all the same messages // in the bloom filter as before, which should mark them as fully acknowledged in channel A - for (let i = 1; i < acknowledgementCount; i++) { + for (let i = 1; i < possibleAcksThreshold; i++) { // Send messages until acknowledgement count is reached await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => { await receiveMessage(channelA, message); @@ -375,8 +382,8 @@ describe("MessageChannel", function () { }); } - // No more partial acknowledgements should be in channel A - expect(acknowledgements.size).to.equal(0); + // No more possible acknowledgements should be in channel A + expect(possibleAcks.size).to.equal(0); // Messages that were not acknowledged should still be in the outgoing buffer expect((channelA as any).outgoingBuffer.length).to.equal( @@ -400,17 +407,64 @@ describe("MessageChannel", function () { }); } - const acknowledgements: ReadonlyMap = (channelA as any) - .acknowledgements; + const possibleAcks: ReadonlyMap = (channelA as any) + .possibleAcks; - expect(acknowledgements.size).to.equal(0); + expect(possibleAcks.size).to.equal(0); + }); + + it("First message is missed, then re-sent, should be ack'd", async () => { + const firstMessage = utf8ToBytes("first message"); + const firstMessageId = MessageChannel.getMessageId(firstMessage); + console.log("firstMessage", firstMessageId); + let messageAcked = false; + channelA.addEventListener( + MessageChannelEvent.OutMessageAcknowledged, + (event) => { + if (firstMessageId === event.detail) { + messageAcked = true; + } + } + ); + + await sendMessage(channelA, firstMessage, callback); + + const secondMessage = utf8ToBytes("second message"); + await sendMessage(channelA, secondMessage, async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + }); + + const thirdMessage = utf8ToBytes("third message"); + await sendMessage(channelB, thirdMessage, async (message) => { + await receiveMessage(channelA, message); + return { success: true }; + }); + + expect(messageAcked).to.be.false; + + // Now, A resends first message, and B is receiving it. + await sendMessage(channelA, firstMessage, async (message) => { + await receiveMessage(channelB, message); + return { success: true }; + }); + + // And be sends a sync message + await channelB.pushOutgoingSyncMessage(async (message) => { + await receiveMessage(channelA, message); + return true; + }); + + expect(messageAcked).to.be.true; }); }); describe("Sweeping incoming buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); - channelB = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice", { + causalHistorySize: 2 + }); + channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should detect messages with missing dependencies", async () => { @@ -490,12 +544,54 @@ describe("MessageChannel", function () { expect(incomingBuffer.length).to.equal(0); }); + it("should mark a message as irretrievably lost if timeout is exceeded", async () => { + // Create a channel with very very short timeout + const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + timeoutToMarkMessageIrretrievableMs: 10 + }); + + for (const m of messagesA) { + await sendMessage(channelA, utf8ToBytes(m), callback); + } + + let irretrievablyLost = false; + const messageToBeLostId = MessageChannel.getMessageId( + utf8ToBytes(messagesA[0]) + ); + channelC.addEventListener( + MessageChannelEvent.InMessageIrretrievablyLost, + (event) => { + for (const hist of event.detail) { + if (hist.messageId === messageToBeLostId) { + irretrievablyLost = true; + } + } + } + ); + + await sendMessage( + channelA, + utf8ToBytes(messagesB[0]), + async (message) => { + await receiveMessage(channelC, message); + return { success: true }; + } + ); + + channelC.sweepIncomingBuffer(); + + await new Promise((resolve) => setTimeout(resolve, 20)); + + channelC.sweepIncomingBuffer(); + + expect(irretrievablyLost).to.be.true; + }); + it("should remove messages without delivering if timeout is exceeded", async () => { const causalHistorySize = (channelA as any).causalHistorySize; // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, { - receivedMessageTimeoutEnabled: true, - receivedMessageTimeout: 10 + const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + timeoutToMarkMessageIrretrievableMs: 10 }); for (const m of messagesA) { @@ -526,15 +622,15 @@ describe("MessageChannel", function () { describe("Sweeping outgoing buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); - channelB = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice", { + causalHistorySize: 2 + }); + channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should partition messages based on acknowledgement status", async () => { - const unacknowledgedMessages: Message[] = []; for (const m of messagesA) { await sendMessage(channelA, utf8ToBytes(m), async (message) => { - unacknowledgedMessages.push(message); await receiveMessage(channelB, message); return { success: true }; }); @@ -571,19 +667,21 @@ describe("MessageChannel", function () { describe("Sync messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); - channelB = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice", { + causalHistorySize: 2 + }); + channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should be sent with empty content", async () => { - await channelA.sendSyncMessage(async (message) => { + await channelA.pushOutgoingSyncMessage(async (message) => { expect(message.content?.length).to.equal(0); return true; }); }); it("should not be added to outgoing buffer, bloom filter, or local log", async () => { - await channelA.sendSyncMessage(); + await channelA.pushOutgoingSyncMessage(); const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; expect(outgoingBuffer.length).to.equal(0); @@ -600,17 +698,14 @@ describe("MessageChannel", function () { expect(localLog.length).to.equal(0); }); - it("should be delivered but not added to local log or bloom filter", async () => { + it("should not be delivered", async () => { const timestampBefore = (channelB as any).lamportTimestamp; - let expectedTimestamp: number | undefined; - await channelA.sendSyncMessage(async (message) => { - expectedTimestamp = message.lamportTimestamp; + await channelA.pushOutgoingSyncMessage(async (message) => { await receiveMessage(channelB, message); return true; }); const timestampAfter = (channelB as any).lamportTimestamp; - expect(timestampAfter).to.equal(expectedTimestamp); - expect(timestampAfter).to.be.greaterThan(timestampBefore); + expect(timestampAfter).to.equal(timestampBefore); const localLog = (channelB as any).localHistory as { timestamp: number; @@ -647,17 +742,20 @@ describe("MessageChannel", function () { describe("Ephemeral messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId); + channelA = new MessageChannel(channelId, "alice"); }); it("should be sent without a timestamp, causal history, or bloom filter", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => { - expect(message.lamportTimestamp).to.equal(undefined); - expect(message.causalHistory).to.deep.equal([]); - expect(message.bloomFilter).to.equal(undefined); - return true; - }); + await channelA.pushOutgoingEphemeralMessage( + new Uint8Array(), + async (message) => { + expect(message.lamportTimestamp).to.equal(undefined); + expect(message.causalHistory).to.deep.equal([]); + expect(message.bloomFilter).to.equal(undefined); + return true; + } + ); const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; expect(outgoingBuffer.length).to.equal(0); @@ -667,14 +765,14 @@ describe("MessageChannel", function () { }); it("should be delivered immediately if received", async () => { - const channelB = new MessageChannel(channelId); + const channelB = new MessageChannel(channelId, "bob"); // Track initial state const localHistoryBefore = (channelB as any).localHistory.length; const incomingBufferBefore = (channelB as any).incomingBuffer.length; const timestampBefore = (channelB as any).lamportTimestamp; - await channelA.sendEphemeralMessage( + await channelA.pushOutgoingEphemeralMessage( utf8ToBytes(messagesA[0]), async (message) => { // Ephemeral messages should have no timestamp diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index b53e6caa4f..d44471d68c 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,12 +7,13 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { - ChannelId, - HistoryEntry, + type ChannelId, + type HistoryEntry, Message, MessageChannelEvent, MessageChannelEvents, - type MessageId + type MessageId, + type SenderId } from "./events.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -21,72 +22,81 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = { }; const DEFAULT_CAUSAL_HISTORY_SIZE = 2; -const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes +const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; -const log = new Logger("sds:message-channel"); +const log = new Logger("waku:sds:message-channel"); -interface MessageChannelOptions { +export interface MessageChannelOptions { causalHistorySize?: number; - receivedMessageTimeoutEnabled?: boolean; - receivedMessageTimeout?: number; + /** + * The time in milliseconds after which a message dependencies that could not + * be resolved is marked as irretrievable. + * Disabled if undefined or `0`. + * + * @default undefined because it is coupled to processTask calls frequency + */ + timeoutToMarkMessageIrretrievableMs?: number; + /** + * How many possible acks does it take to consider it a definitive ack. + */ + possibleAcksThreshold?: number; } export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; + public readonly senderId: SenderId; private lamportTimestamp: number; private filter: DefaultBloomFilter; private outgoingBuffer: Message[]; - private acknowledgements: Map; + private possibleAcks: Map; private incomingBuffer: Message[]; private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; private timeReceived: Map; - // TODO: To be removed once sender id is added to SDS protocol - private outgoingMessages: Set; private readonly causalHistorySize: number; - private readonly acknowledgementCount: number; - private readonly receivedMessageTimeoutEnabled: boolean; - private readonly receivedMessageTimeout: number; + private readonly possibleAcksThreshold: number; + private readonly timeoutToMarkMessageIrretrievableMs?: number; private tasks: Task[] = []; private handlers: Handlers = { [Command.Send]: async ( params: ParamsByAction[Command.Send] ): Promise => { - await this._sendMessage(params.payload, params.callback); + await this._pushOutgoingMessage(params.payload, params.callback); }, [Command.Receive]: async ( params: ParamsByAction[Command.Receive] ): Promise => { - this._receiveMessage(params.message); + this._pushIncomingMessage(params.message); }, [Command.SendEphemeral]: async ( params: ParamsByAction[Command.SendEphemeral] ): Promise => { - await this._sendEphemeralMessage(params.payload, params.callback); + await this._pushOutgoingEphemeralMessage(params.payload, params.callback); } }; public constructor( channelId: ChannelId, + senderId: SenderId, options: MessageChannelOptions = {} ) { super(); this.channelId = channelId; + this.senderId = senderId; this.lamportTimestamp = 0; this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); this.outgoingBuffer = []; - this.acknowledgements = new Map(); + this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = []; this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; - this.acknowledgementCount = this.getAcknowledgementCount(); + // TODO: this should be determined based on the bloom filter parameters and number of hashes + this.possibleAcksThreshold = + options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD; this.timeReceived = new Map(); - this.receivedMessageTimeoutEnabled = - options.receivedMessageTimeoutEnabled ?? false; - this.receivedMessageTimeout = - options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; - this.outgoingMessages = new Set(); + this.timeoutToMarkMessageIrretrievableMs = + options.timeoutToMarkMessageIrretrievableMs; } public static getMessageId(payload: Uint8Array): MessageId { @@ -104,8 +114,8 @@ export class MessageChannel extends TypedEventEmitter { * const channel = new MessageChannel("my-channel"); * * // Queue some operations - * await channel.sendMessage(payload, callback); - * channel.receiveMessage(incomingMessage); + * await channel.pushOutgoingMessage(payload, callback); + * channel.pushIncomingMessage(incomingMessage); * * // Process all queued operations * await channel.processTasks(); @@ -139,7 +149,7 @@ export class MessageChannel extends TypedEventEmitter { * const channel = new MessageChannel("chat-room"); * const message = new TextEncoder().encode("Hello, world!"); * - * await channel.sendMessage(message, async (processedMessage) => { + * await channel.pushOutgoingMessage(message, async (processedMessage) => { * console.log("Message processed:", processedMessage.messageId); * return { success: true }; * }); @@ -148,9 +158,9 @@ export class MessageChannel extends TypedEventEmitter { * await channel.processTasks(); * ``` */ - public async sendMessage( + public async pushOutgoingMessage( payload: Uint8Array, - callback?: (message: Message) => Promise<{ + callback?: (processedMessage: Message) => Promise<{ success: boolean; retrievalHint?: Uint8Array; }> @@ -177,9 +187,9 @@ export class MessageChannel extends TypedEventEmitter { * @param payload - The payload to send. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public async sendEphemeralMessage( + public async pushOutgoingEphemeralMessage( payload: Uint8Array, - callback?: (message: Message) => Promise + callback?: (processedMessage: Message) => Promise ): Promise { this.tasks.push({ command: Command.SendEphemeral, @@ -203,13 +213,13 @@ export class MessageChannel extends TypedEventEmitter { * const channel = new MessageChannel("chat-room"); * * // Receive a message from the network - * channel.receiveMessage(incomingMessage); + * channel.pushIncomingMessage(incomingMessage); * * // Process the received message * await channel.processTasks(); * ``` */ - public receiveMessage(message: Message): void { + public pushIncomingMessage(message: Message): void { this.tasks.push({ command: Command.Receive, params: { @@ -229,6 +239,12 @@ export class MessageChannel extends TypedEventEmitter { missing: Set; }>( ({ buffer, missing }, message) => { + log.info( + this.senderId, + "sweeping incoming buffer", + message.messageId, + message.causalHistory.map((ch) => ch.messageId) + ); const missingDependencies = message.causalHistory.filter( (messageHistoryEntry) => !this.localHistory.some( @@ -237,24 +253,31 @@ export class MessageChannel extends TypedEventEmitter { ) ); if (missingDependencies.length === 0) { - this.deliverMessage(message); - this.safeSendEvent(MessageChannelEvent.MessageDelivered, { - detail: { - messageId: message.messageId, - sentOrReceived: "received" - } - }); + if (this.deliverMessage(message)) { + this.safeSendEvent(MessageChannelEvent.InMessageDelivered, { + detail: message.messageId + }); + } return { buffer, missing }; } + log.info( + this.senderId, + message.messageId, + "is missing dependencies", + missingDependencies.map((ch) => ch.messageId) + ); // Optionally, if a message has not been received after a predetermined amount of time, - // it is marked as irretrievably lost (implicitly by removing it from the buffer without delivery) - if (this.receivedMessageTimeoutEnabled) { + // its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery) + if (this.timeoutToMarkMessageIrretrievableMs) { const timeReceived = this.timeReceived.get(message.messageId); if ( timeReceived && - Date.now() - timeReceived > this.receivedMessageTimeout + Date.now() - timeReceived > this.timeoutToMarkMessageIrretrievableMs ) { + this.safeSendEvent(MessageChannelEvent.InMessageIrretrievablyLost, { + detail: Array.from(missingDependencies) + }); return { buffer, missing }; } } @@ -270,7 +293,7 @@ export class MessageChannel extends TypedEventEmitter { ); this.incomingBuffer = buffer; - this.safeSendEvent(MessageChannelEvent.MissedMessages, { + this.safeSendEvent(MessageChannelEvent.InMessageMissing, { detail: Array.from(missing) }); @@ -287,7 +310,7 @@ export class MessageChannel extends TypedEventEmitter { possiblyAcknowledged: Message[]; }>( ({ unacknowledged, possiblyAcknowledged }, message) => { - if (this.acknowledgements.has(message.messageId)) { + if (this.possibleAcks.has(message.messageId)) { return { unacknowledged, possiblyAcknowledged: possiblyAcknowledged.concat(message) @@ -315,40 +338,44 @@ export class MessageChannel extends TypedEventEmitter { * * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public async sendSyncMessage( + public async pushOutgoingSyncMessage( callback?: (message: Message) => Promise ): Promise { this.lamportTimestamp++; const emptyMessage = new Uint8Array(); - const message: Message = { - messageId: MessageChannel.getMessageId(emptyMessage), - channelId: this.channelId, - lamportTimestamp: this.lamportTimestamp, - causalHistory: this.localHistory + const message = new Message( + MessageChannel.getMessageId(emptyMessage), + this.channelId, + this.senderId, + this.localHistory .slice(-this.causalHistorySize) .map(({ historyEntry }) => historyEntry), - bloomFilter: this.filter.toBytes(), - content: emptyMessage - }; + this.lamportTimestamp, + this.filter.toBytes(), + emptyMessage + ); if (callback) { try { await callback(message); - this.safeSendEvent(MessageChannelEvent.SyncSent, { + this.safeSendEvent(MessageChannelEvent.OutSyncSent, { detail: message }); return true; } catch (error) { - log.error("Callback execution failed in sendSyncMessage:", error); + log.error( + "Callback execution failed in pushOutgoingSyncMessage:", + error + ); throw error; } } return false; } - private _receiveMessage(message: Message): void { + private _pushIncomingMessage(message: Message): void { const isDuplicate = message.content && message.content.length > 0 && @@ -358,25 +385,22 @@ export class MessageChannel extends TypedEventEmitter { return; } - const isOwnOutgoingMessage = - message.content && - message.content.length > 0 && - this.outgoingMessages.has(MessageChannel.getMessageId(message.content)); - + const isOwnOutgoingMessage = this.senderId === message.senderId; if (isOwnOutgoingMessage) { return; } + // Ephemeral messages SHOULD be delivered immediately if (!message.lamportTimestamp) { this.deliverMessage(message); return; } if (message.content?.length === 0) { - this.safeSendEvent(MessageChannelEvent.SyncReceived, { + this.safeSendEvent(MessageChannelEvent.InSyncReceived, { detail: message }); } else { - this.safeSendEvent(MessageChannelEvent.MessageReceived, { + this.safeSendEvent(MessageChannelEvent.InMessageReceived, { detail: message }); } @@ -384,23 +408,30 @@ export class MessageChannel extends TypedEventEmitter { if (message.content?.length && message.content.length > 0) { this.filter.insert(message.messageId); } - const dependenciesMet = message.causalHistory.every((historyEntry) => - this.localHistory.some( - ({ historyEntry: { messageId } }) => - messageId === historyEntry.messageId - ) + + const missingDependencies = message.causalHistory.filter( + (messageHistoryEntry) => + !this.localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === messageHistoryEntry.messageId + ) ); - if (!dependenciesMet) { + + if (missingDependencies.length > 0) { this.incomingBuffer.push(message); this.timeReceived.set(message.messageId, Date.now()); + log.info( + this.senderId, + message.messageId, + "is missing dependencies", + missingDependencies.map((ch) => ch.messageId) + ); } else { - this.deliverMessage(message); - this.safeSendEvent(MessageChannelEvent.MessageDelivered, { - detail: { - messageId: message.messageId, - sentOrReceived: "received" - } - }); + if (this.deliverMessage(message)) { + this.safeSendEvent(MessageChannelEvent.InMessageDelivered, { + detail: message.messageId + }); + } } } @@ -415,6 +446,9 @@ export class MessageChannel extends TypedEventEmitter { detail: { command: item.command, error, params: item.params } }) ); + this.safeSendEvent(MessageChannelEvent.ErrorTask, { + detail: { command: item.command, error, params: item.params } + }); } } @@ -429,7 +463,7 @@ export class MessageChannel extends TypedEventEmitter { } } - private async _sendMessage( + private async _pushOutgoingMessage( payload: Uint8Array, callback?: (message: Message) => Promise<{ success: boolean; @@ -440,20 +474,30 @@ export class MessageChannel extends TypedEventEmitter { const messageId = MessageChannel.getMessageId(payload); - this.outgoingMessages.add(messageId); + // if same message id is in the outgoing buffer, + // it means it's a retry, and we need to resend the same message + // to ensure we do not create a cyclic dependency of any sort. - const message: Message = { - messageId, - channelId: this.channelId, - lamportTimestamp: this.lamportTimestamp, - causalHistory: this.localHistory - .slice(-this.causalHistorySize) - .map(({ historyEntry }) => historyEntry), - bloomFilter: this.filter.toBytes(), - content: payload - }; + let message = this.outgoingBuffer.find( + (m: Message) => m.messageId === messageId + ); - this.outgoingBuffer.push(message); + // It's a new message + if (!message) { + message = new Message( + messageId, + this.channelId, + this.senderId, + this.localHistory + .slice(-this.causalHistorySize) + .map(({ historyEntry }) => historyEntry), + this.lamportTimestamp, + this.filter.toBytes(), + payload + ); + + this.outgoingBuffer.push(message); + } if (callback) { try { @@ -468,55 +512,80 @@ export class MessageChannel extends TypedEventEmitter { } }); this.timeReceived.set(messageId, Date.now()); - this.safeSendEvent(MessageChannelEvent.MessageSent, { + this.safeSendEvent(MessageChannelEvent.OutMessageSent, { detail: message }); } } catch (error) { - log.error("Callback execution failed in _sendMessage:", error); + log.error("Callback execution failed in _pushOutgoingMessage:", error); throw error; } } } - private async _sendEphemeralMessage( + private async _pushOutgoingEphemeralMessage( payload: Uint8Array, callback?: (message: Message) => Promise ): Promise { - const message: Message = { - messageId: MessageChannel.getMessageId(payload), - channelId: this.channelId, - content: payload, - lamportTimestamp: undefined, - causalHistory: [], - bloomFilter: undefined - }; + const message = new Message( + MessageChannel.getMessageId(payload), + this.channelId, + this.senderId, + [], + undefined, + undefined, + payload + ); if (callback) { try { await callback(message); } catch (error) { - log.error("Callback execution failed in _sendEphemeralMessage:", error); + log.error( + "Callback execution failed in _pushOutgoingEphemeralMessage:", + error + ); throw error; } } } + /** + * Return true if the message was "delivered" + * + * @param message + * @param retrievalHint + * @private + */ // See https://rfc.vac.dev/vac/raw/sds/#deliver-message - private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { - const messageLamportTimestamp = message.lamportTimestamp ?? 0; - if (messageLamportTimestamp > this.lamportTimestamp) { - this.lamportTimestamp = messageLamportTimestamp; - } - + private deliverMessage( + message: Message, + retrievalHint?: Uint8Array + ): boolean { if ( message.content?.length === 0 || message.lamportTimestamp === undefined ) { // Messages with empty content are sync messages. // Messages with no timestamp are ephemeral messages. + // They do not need to be "delivered". // They are not added to the local log or bloom filter. - return; + return false; + } + + log.info(this.senderId, "delivering message", message.messageId); + if (message.lamportTimestamp > this.lamportTimestamp) { + this.lamportTimestamp = message.lamportTimestamp; + } + + // Check if the entry is already present + const existingHistoryEntry = this.localHistory.find( + ({ historyEntry }) => historyEntry.messageId === message.messageId + ); + + // The history entry is already present, no need to re-add + if (existingHistoryEntry) { + return true; } // The participant MUST insert the message ID into its local log, @@ -525,7 +594,7 @@ export class MessageChannel extends TypedEventEmitter { // the participant MUST follow the Resolve Conflicts procedure. // https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts this.localHistory.push({ - timestamp: messageLamportTimestamp, + timestamp: message.lamportTimestamp, historyEntry: { messageId: message.messageId, retrievalHint @@ -537,25 +606,36 @@ export class MessageChannel extends TypedEventEmitter { } return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId); }); + return true; } // For each received message (including sync messages), inspect the causal history and bloom filter // to determine the acknowledgement status of messages in the outgoing buffer. // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status private reviewAckStatus(receivedMessage: Message): void { + log.info( + this.senderId, + "reviewing ack status using:", + receivedMessage.causalHistory.map((ch) => ch.messageId) + ); + log.info( + this.senderId, + "current outgoing buffer:", + this.outgoingBuffer.map((b) => b.messageId) + ); receivedMessage.causalHistory.forEach(({ messageId }) => { this.outgoingBuffer = this.outgoingBuffer.filter( ({ messageId: outgoingMessageId }) => { if (outgoingMessageId !== messageId) { return true; } - this.safeSendEvent(MessageChannelEvent.MessageAcknowledged, { + this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, { detail: messageId }); return false; } ); - this.acknowledgements.delete(messageId); + this.possibleAcks.delete(messageId); if (!this.filter.lookup(messageId)) { this.filter.insert(messageId); } @@ -574,10 +654,10 @@ export class MessageChannel extends TypedEventEmitter { // If a message appears as possibly acknowledged in multiple received bloom filters, // the participant MAY mark it as acknowledged based on probabilistic grounds, // taking into account the bloom filter size and hash number. - const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1; - if (count < this.acknowledgementCount) { - this.acknowledgements.set(message.messageId, count); - this.safeSendEvent(MessageChannelEvent.PartialAcknowledgement, { + const count = (this.possibleAcks.get(message.messageId) ?? 0) + 1; + if (count < this.possibleAcksThreshold) { + this.possibleAcks.set(message.messageId, count); + this.safeSendEvent(MessageChannelEvent.OutMessagePossiblyAcknowledged, { detail: { messageId: message.messageId, count @@ -585,13 +665,8 @@ export class MessageChannel extends TypedEventEmitter { }); return true; } - this.acknowledgements.delete(message.messageId); + this.possibleAcks.delete(message.messageId); return false; }); } - - // TODO: this should be determined based on the bloom filter parameters and number of hashes - private getAcknowledgementCount(): number { - return 2; - } }