feat!: SDS improvements and fixes (#2539)

* introduce `MessageId` type

# Conflicts:
#	packages/sds/src/message_channel/message_channel.ts

* fix: own messages are not used for ack

* fix: own messages are not used for ack

* doc: long term solution is SDS protocol change

* SDS: renaming to match message function

* SDS: introduce `Message` class for easier encoding/decoding

# Conflicts:
#	packages/sds/src/message_channel/events.ts
#	packages/sds/src/message_channel/message_channel.ts

* SDS Message is a class now

* SDS: it's "possibly" not "partially" acknowledged.

* SDS: TODO

* SDS: fix tests

* SDS: make logs start with `waku`

* SDS: add bloom filter test

# Conflicts:
#	packages/sds/src/message_channel/events.spec.ts

* SDS: improve naming

* SDS: improve naming

Messages are not "sent" or received, but pushed for processing in local queues.

* SDS: sync message should not be delivered

* SDS: renaming from earlier

* SDS: remove useless variable

* SDS: Fix comment

* SDS: sync messages do not get "delivered"

* SDS: acks

* SDS: simplify delivered event

* SDS: improve event naming

* SDS: fix comment

* SDS: make task error an official event

* SDS: Mark messages that are irretrievably lost

* SDS: remove default for irretrievable and simplify config

* SDS: typo on sync event

* SDS: add and user sender id

* SDS: resent message never get ack'd

* SDS: fix cylic dependencies

* SDS: helpful logs

* SDS: avoid duplicate history entries

* SDS: export options
This commit is contained in:
fryorcraken 2025-08-12 10:47:52 +10:00 committed by GitHub
parent 459fe96fe6
commit dc5155056b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 431 additions and 214 deletions

View File

@ -4,6 +4,7 @@
"language": "en",
"words": [
"abortable",
"acks",
"Addrs",
"ahadns",
"Alives",

View File

@ -81,6 +81,7 @@ export namespace HistoryEntry {
}
export interface SdsMessage {
senderId: string
messageId: string
channelId: string
lamportTimestamp?: number
@ -99,6 +100,11 @@ export namespace SdsMessage {
w.fork()
}
if ((obj.senderId != null && obj.senderId !== '')) {
w.uint32(10)
w.string(obj.senderId)
}
if ((obj.messageId != null && obj.messageId !== '')) {
w.uint32(18)
w.string(obj.messageId)
@ -136,6 +142,7 @@ export namespace SdsMessage {
}
}, (reader, length, opts = {}) => {
const obj: any = {
senderId: '',
messageId: '',
channelId: '',
causalHistory: []
@ -147,6 +154,10 @@ export namespace SdsMessage {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1: {
obj.senderId = reader.string()
break
}
case 2: {
obj.messageId = reader.string()
break

View File

@ -6,7 +6,7 @@ message HistoryEntry {
}
message SdsMessage {
// 1 Reserved for sender/participant id
string sender_id = 1; // Participant ID of the message sender
string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs
optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel

View File

@ -3,15 +3,15 @@ import { BloomFilter } from "./bloom_filter/bloom.js";
export {
MessageChannel,
MessageChannelEvent,
encodeMessage,
decodeMessage
MessageChannelOptions
} from "./message_channel/index.js";
export type {
export {
Message,
HistoryEntry,
ChannelId,
MessageChannelEvents
type HistoryEntry,
type ChannelId,
type MessageChannelEvents,
type SenderId
} from "./message_channel/index.js";
export { BloomFilter };

View File

@ -2,7 +2,7 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { decodeMessage, encodeMessage, Message } from "./events.js";
import { Message } from "./events.js";
import { DEFAULT_BLOOM_FILTER_OPTIONS } from "./message_channel.js";
describe("Message serialization", () => {
@ -12,16 +12,18 @@ describe("Message serialization", () => {
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
bloomFilter.insert(messageId);
const message: Message = {
messageId: "123",
channelId: "my-channel",
causalHistory: [],
lamportTimestamp: 0,
bloomFilter: bloomFilter.toBytes()
};
const message = new Message(
"123",
"my-channel",
"me",
[],
0,
bloomFilter.toBytes(),
undefined
);
const bytes = encodeMessage(message);
const decMessage = decodeMessage(bytes);
const bytes = message.encode();
const decMessage = Message.decode(bytes);
const decBloomFilter = DefaultBloomFilter.fromBytes(
decMessage!.bloomFilter!,

View File

@ -1,42 +1,72 @@
import { proto_sds_message } from "@waku/proto";
export enum MessageChannelEvent {
MessageSent = "messageSent",
MessageDelivered = "messageDelivered",
MessageReceived = "messageReceived",
MessageAcknowledged = "messageAcknowledged",
PartialAcknowledgement = "partialAcknowledgement",
MissedMessages = "missedMessages",
SyncSent = "syncSent",
SyncReceived = "syncReceived"
OutMessageSent = "sds:out:message-sent",
InMessageDelivered = "sds:in:message-delivered",
InMessageReceived = "sds:in:message-received",
OutMessageAcknowledged = "sds:out:message-acknowledged",
OutMessagePossiblyAcknowledged = "sds:out:message-possibly-acknowledged",
InMessageMissing = "sds:in:message-missing",
OutSyncSent = "sds:out:sync-sent",
InSyncReceived = "sds:in:sync-received",
InMessageIrretrievablyLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task"
}
export type MessageId = string;
export type Message = proto_sds_message.SdsMessage;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export type SenderId = string;
export function encodeMessage(message: Message): Uint8Array {
return proto_sds_message.SdsMessage.encode(message);
}
export class Message implements proto_sds_message.SdsMessage {
public constructor(
public messageId: string,
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp?: number | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined
) {}
export function decodeMessage(data: Uint8Array): Message {
return proto_sds_message.SdsMessage.decode(data);
public encode(): Uint8Array {
return proto_sds_message.SdsMessage.encode(this);
}
public static decode(data: Uint8Array): Message {
const {
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content
} = proto_sds_message.SdsMessage.decode(data);
return new Message(
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content
);
}
}
export type MessageChannelEvents = {
[MessageChannelEvent.MessageSent]: CustomEvent<Message>;
[MessageChannelEvent.MessageDelivered]: CustomEvent<{
messageId: MessageId;
sentOrReceived: "sent" | "received";
}>;
[MessageChannelEvent.MessageReceived]: CustomEvent<Message>;
[MessageChannelEvent.MessageAcknowledged]: CustomEvent<MessageId>;
[MessageChannelEvent.PartialAcknowledgement]: CustomEvent<{
[MessageChannelEvent.OutMessageSent]: CustomEvent<Message>;
[MessageChannelEvent.InMessageDelivered]: CustomEvent<MessageId>;
[MessageChannelEvent.InMessageReceived]: CustomEvent<Message>;
[MessageChannelEvent.OutMessageAcknowledged]: CustomEvent<MessageId>;
[MessageChannelEvent.OutMessagePossiblyAcknowledged]: CustomEvent<{
messageId: MessageId;
count: number;
}>;
[MessageChannelEvent.MissedMessages]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.SyncSent]: CustomEvent<Message>;
[MessageChannelEvent.SyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.InMessageMissing]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.InMessageIrretrievablyLost]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<any>;
};

View File

@ -3,7 +3,12 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { HistoryEntry, Message, MessageId } from "./events.js";
import {
HistoryEntry,
Message,
MessageChannelEvent,
MessageId
} from "./events.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
MessageChannel
@ -32,7 +37,7 @@ const sendMessage = async (
payload: Uint8Array,
callback: (message: Message) => Promise<{ success: boolean }>
): Promise<void> => {
await channel.sendMessage(payload, callback);
await channel.pushOutgoingMessage(payload, callback);
await channel.processTasks();
};
@ -40,7 +45,7 @@ const receiveMessage = async (
channel: MessageChannel,
message: Message
): Promise<void> => {
channel.receiveMessage(message);
channel.pushIncomingMessage(message);
await channel.processTasks();
};
@ -51,7 +56,7 @@ describe("MessageChannel", function () {
describe("sending a message ", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice");
});
it("should increase lamport timestamp", async () => {
@ -133,13 +138,13 @@ describe("MessageChannel", function () {
describe("receiving a message", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice");
channelB = new MessageChannel(channelId, "bob");
});
it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp;
await sendMessage(channelB, new Uint8Array(), async (message) => {
await sendMessage(channelB, utf8ToBytes("message"), async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
@ -241,8 +246,10 @@ describe("MessageChannel", function () {
describe("reviewing ack status", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should mark all messages in causal history as acknowledged", async () => {
@ -309,7 +316,7 @@ describe("MessageChannel", function () {
});
it("should track probabilistic acknowledgements of messages received in bloom filter", async () => {
const acknowledgementCount = (channelA as any).acknowledgementCount;
const possibleAcksThreshold = (channelA as any).possibleAcksThreshold;
const causalHistorySize = (channelA as any).causalHistorySize;
@ -341,8 +348,8 @@ describe("MessageChannel", function () {
}
);
const acknowledgements: ReadonlyMap<MessageId, number> = (channelA as any)
.acknowledgements;
const possibleAcks: ReadonlyMap<MessageId, number> = (channelA as any)
.possibleAcks;
// Other than the message IDs which were included in causal history,
// the remaining messages sent by channel A should be considered possibly acknowledged
// for having been included in the bloom filter sent from channel B
@ -350,24 +357,24 @@ describe("MessageChannel", function () {
if (expectedAcknowledgementsSize <= 0) {
throw new Error("expectedAcknowledgementsSize must be greater than 0");
}
expect(acknowledgements.size).to.equal(expectedAcknowledgementsSize);
expect(possibleAcks.size).to.equal(expectedAcknowledgementsSize);
// Channel B only included the last N messages in causal history
messages.slice(0, -causalHistorySize).forEach((m) => {
expect(
acknowledgements.get(MessageChannel.getMessageId(utf8ToBytes(m)))
possibleAcks.get(MessageChannel.getMessageId(utf8ToBytes(m)))
).to.equal(1);
});
// Messages that never reached channel B should not be acknowledged
unacknowledgedMessages.forEach((m) => {
expect(
acknowledgements.has(MessageChannel.getMessageId(utf8ToBytes(m)))
possibleAcks.has(MessageChannel.getMessageId(utf8ToBytes(m)))
).to.equal(false);
});
// When channel C sends more messages, it will include all the same messages
// in the bloom filter as before, which should mark them as fully acknowledged in channel A
for (let i = 1; i < acknowledgementCount; i++) {
for (let i = 1; i < possibleAcksThreshold; i++) {
// Send messages until acknowledgement count is reached
await sendMessage(channelB, utf8ToBytes(`x-${i}`), async (message) => {
await receiveMessage(channelA, message);
@ -375,8 +382,8 @@ describe("MessageChannel", function () {
});
}
// No more partial acknowledgements should be in channel A
expect(acknowledgements.size).to.equal(0);
// No more possible acknowledgements should be in channel A
expect(possibleAcks.size).to.equal(0);
// Messages that were not acknowledged should still be in the outgoing buffer
expect((channelA as any).outgoingBuffer.length).to.equal(
@ -400,17 +407,64 @@ describe("MessageChannel", function () {
});
}
const acknowledgements: ReadonlyMap<MessageId, number> = (channelA as any)
.acknowledgements;
const possibleAcks: ReadonlyMap<MessageId, number> = (channelA as any)
.possibleAcks;
expect(acknowledgements.size).to.equal(0);
expect(possibleAcks.size).to.equal(0);
});
it("First message is missed, then re-sent, should be ack'd", async () => {
const firstMessage = utf8ToBytes("first message");
const firstMessageId = MessageChannel.getMessageId(firstMessage);
console.log("firstMessage", firstMessageId);
let messageAcked = false;
channelA.addEventListener(
MessageChannelEvent.OutMessageAcknowledged,
(event) => {
if (firstMessageId === event.detail) {
messageAcked = true;
}
}
);
await sendMessage(channelA, firstMessage, callback);
const secondMessage = utf8ToBytes("second message");
await sendMessage(channelA, secondMessage, async (message) => {
await receiveMessage(channelB, message);
return { success: true };
});
const thirdMessage = utf8ToBytes("third message");
await sendMessage(channelB, thirdMessage, async (message) => {
await receiveMessage(channelA, message);
return { success: true };
});
expect(messageAcked).to.be.false;
// Now, A resends first message, and B is receiving it.
await sendMessage(channelA, firstMessage, async (message) => {
await receiveMessage(channelB, message);
return { success: true };
});
// And be sends a sync message
await channelB.pushOutgoingSyncMessage(async (message) => {
await receiveMessage(channelA, message);
return true;
});
expect(messageAcked).to.be.true;
});
});
describe("Sweeping incoming buffer", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should detect messages with missing dependencies", async () => {
@ -490,12 +544,54 @@ describe("MessageChannel", function () {
expect(incomingBuffer.length).to.equal(0);
});
it("should mark a message as irretrievably lost if timeout is exceeded", async () => {
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
timeoutToMarkMessageIrretrievableMs: 10
});
for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), callback);
}
let irretrievablyLost = false;
const messageToBeLostId = MessageChannel.getMessageId(
utf8ToBytes(messagesA[0])
);
channelC.addEventListener(
MessageChannelEvent.InMessageIrretrievablyLost,
(event) => {
for (const hist of event.detail) {
if (hist.messageId === messageToBeLostId) {
irretrievablyLost = true;
}
}
}
);
await sendMessage(
channelA,
utf8ToBytes(messagesB[0]),
async (message) => {
await receiveMessage(channelC, message);
return { success: true };
}
);
channelC.sweepIncomingBuffer();
await new Promise((resolve) => setTimeout(resolve, 20));
channelC.sweepIncomingBuffer();
expect(irretrievablyLost).to.be.true;
});
it("should remove messages without delivering if timeout is exceeded", async () => {
const causalHistorySize = (channelA as any).causalHistorySize;
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, {
receivedMessageTimeoutEnabled: true,
receivedMessageTimeout: 10
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
timeoutToMarkMessageIrretrievableMs: 10
});
for (const m of messagesA) {
@ -526,15 +622,15 @@ describe("MessageChannel", function () {
describe("Sweeping outgoing buffer", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should partition messages based on acknowledgement status", async () => {
const unacknowledgedMessages: Message[] = [];
for (const m of messagesA) {
await sendMessage(channelA, utf8ToBytes(m), async (message) => {
unacknowledgedMessages.push(message);
await receiveMessage(channelB, message);
return { success: true };
});
@ -571,19 +667,21 @@ describe("MessageChannel", function () {
describe("Sync messages", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelB = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should be sent with empty content", async () => {
await channelA.sendSyncMessage(async (message) => {
await channelA.pushOutgoingSyncMessage(async (message) => {
expect(message.content?.length).to.equal(0);
return true;
});
});
it("should not be added to outgoing buffer, bloom filter, or local log", async () => {
await channelA.sendSyncMessage();
await channelA.pushOutgoingSyncMessage();
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(0);
@ -600,17 +698,14 @@ describe("MessageChannel", function () {
expect(localLog.length).to.equal(0);
});
it("should be delivered but not added to local log or bloom filter", async () => {
it("should not be delivered", async () => {
const timestampBefore = (channelB as any).lamportTimestamp;
let expectedTimestamp: number | undefined;
await channelA.sendSyncMessage(async (message) => {
expectedTimestamp = message.lamportTimestamp;
await channelA.pushOutgoingSyncMessage(async (message) => {
await receiveMessage(channelB, message);
return true;
});
const timestampAfter = (channelB as any).lamportTimestamp;
expect(timestampAfter).to.equal(expectedTimestamp);
expect(timestampAfter).to.be.greaterThan(timestampBefore);
expect(timestampAfter).to.equal(timestampBefore);
const localLog = (channelB as any).localHistory as {
timestamp: number;
@ -647,17 +742,20 @@ describe("MessageChannel", function () {
describe("Ephemeral messages", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId);
channelA = new MessageChannel(channelId, "alice");
});
it("should be sent without a timestamp, causal history, or bloom filter", async () => {
const timestampBefore = (channelA as any).lamportTimestamp;
await channelA.sendEphemeralMessage(new Uint8Array(), async (message) => {
await channelA.pushOutgoingEphemeralMessage(
new Uint8Array(),
async (message) => {
expect(message.lamportTimestamp).to.equal(undefined);
expect(message.causalHistory).to.deep.equal([]);
expect(message.bloomFilter).to.equal(undefined);
return true;
});
}
);
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
expect(outgoingBuffer.length).to.equal(0);
@ -667,14 +765,14 @@ describe("MessageChannel", function () {
});
it("should be delivered immediately if received", async () => {
const channelB = new MessageChannel(channelId);
const channelB = new MessageChannel(channelId, "bob");
// Track initial state
const localHistoryBefore = (channelB as any).localHistory.length;
const incomingBufferBefore = (channelB as any).incomingBuffer.length;
const timestampBefore = (channelB as any).lamportTimestamp;
await channelA.sendEphemeralMessage(
await channelA.pushOutgoingEphemeralMessage(
utf8ToBytes(messagesA[0]),
async (message) => {
// Ephemeral messages should have no timestamp

View File

@ -7,12 +7,13 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import {
ChannelId,
HistoryEntry,
type ChannelId,
type HistoryEntry,
Message,
MessageChannelEvent,
MessageChannelEvents,
type MessageId
type MessageId,
type SenderId
} from "./events.js";
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
@ -21,72 +22,81 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
};
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes
const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2;
const log = new Logger("sds:message-channel");
const log = new Logger("waku:sds:message-channel");
interface MessageChannelOptions {
export interface MessageChannelOptions {
causalHistorySize?: number;
receivedMessageTimeoutEnabled?: boolean;
receivedMessageTimeout?: number;
/**
* The time in milliseconds after which a message dependencies that could not
* be resolved is marked as irretrievable.
* Disabled if undefined or `0`.
*
* @default undefined because it is coupled to processTask calls frequency
*/
timeoutToMarkMessageIrretrievableMs?: number;
/**
* How many possible acks does it take to consider it a definitive ack.
*/
possibleAcksThreshold?: number;
}
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
public readonly senderId: SenderId;
private lamportTimestamp: number;
private filter: DefaultBloomFilter;
private outgoingBuffer: Message[];
private acknowledgements: Map<MessageId, number>;
private possibleAcks: Map<MessageId, number>;
private incomingBuffer: Message[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
private timeReceived: Map<MessageId, number>;
// TODO: To be removed once sender id is added to SDS protocol
private outgoingMessages: Set<MessageId>;
private readonly causalHistorySize: number;
private readonly acknowledgementCount: number;
private readonly receivedMessageTimeoutEnabled: boolean;
private readonly receivedMessageTimeout: number;
private readonly possibleAcksThreshold: number;
private readonly timeoutToMarkMessageIrretrievableMs?: number;
private tasks: Task[] = [];
private handlers: Handlers = {
[Command.Send]: async (
params: ParamsByAction[Command.Send]
): Promise<void> => {
await this._sendMessage(params.payload, params.callback);
await this._pushOutgoingMessage(params.payload, params.callback);
},
[Command.Receive]: async (
params: ParamsByAction[Command.Receive]
): Promise<void> => {
this._receiveMessage(params.message);
this._pushIncomingMessage(params.message);
},
[Command.SendEphemeral]: async (
params: ParamsByAction[Command.SendEphemeral]
): Promise<void> => {
await this._sendEphemeralMessage(params.payload, params.callback);
await this._pushOutgoingEphemeralMessage(params.payload, params.callback);
}
};
public constructor(
channelId: ChannelId,
senderId: SenderId,
options: MessageChannelOptions = {}
) {
super();
this.channelId = channelId;
this.senderId = senderId;
this.lamportTimestamp = 0;
this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
this.outgoingBuffer = [];
this.acknowledgements = new Map();
this.possibleAcks = new Map();
this.incomingBuffer = [];
this.localHistory = [];
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
this.acknowledgementCount = this.getAcknowledgementCount();
// TODO: this should be determined based on the bloom filter parameters and number of hashes
this.possibleAcksThreshold =
options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
this.timeReceived = new Map();
this.receivedMessageTimeoutEnabled =
options.receivedMessageTimeoutEnabled ?? false;
this.receivedMessageTimeout =
options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT;
this.outgoingMessages = new Set();
this.timeoutToMarkMessageIrretrievableMs =
options.timeoutToMarkMessageIrretrievableMs;
}
public static getMessageId(payload: Uint8Array): MessageId {
@ -104,8 +114,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* const channel = new MessageChannel("my-channel");
*
* // Queue some operations
* await channel.sendMessage(payload, callback);
* channel.receiveMessage(incomingMessage);
* await channel.pushOutgoingMessage(payload, callback);
* channel.pushIncomingMessage(incomingMessage);
*
* // Process all queued operations
* await channel.processTasks();
@ -139,7 +149,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* const channel = new MessageChannel("chat-room");
* const message = new TextEncoder().encode("Hello, world!");
*
* await channel.sendMessage(message, async (processedMessage) => {
* await channel.pushOutgoingMessage(message, async (processedMessage) => {
* console.log("Message processed:", processedMessage.messageId);
* return { success: true };
* });
@ -148,9 +158,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* await channel.processTasks();
* ```
*/
public async sendMessage(
public async pushOutgoingMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
callback?: (processedMessage: Message) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
@ -177,9 +187,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* @param payload - The payload to send.
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public async sendEphemeralMessage(
public async pushOutgoingEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
callback?: (processedMessage: Message) => Promise<boolean>
): Promise<void> {
this.tasks.push({
command: Command.SendEphemeral,
@ -203,13 +213,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* const channel = new MessageChannel("chat-room");
*
* // Receive a message from the network
* channel.receiveMessage(incomingMessage);
* channel.pushIncomingMessage(incomingMessage);
*
* // Process the received message
* await channel.processTasks();
* ```
*/
public receiveMessage(message: Message): void {
public pushIncomingMessage(message: Message): void {
this.tasks.push({
command: Command.Receive,
params: {
@ -229,6 +239,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
missing: Set<HistoryEntry>;
}>(
({ buffer, missing }, message) => {
log.info(
this.senderId,
"sweeping incoming buffer",
message.messageId,
message.causalHistory.map((ch) => ch.messageId)
);
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
@ -237,24 +253,31 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
)
);
if (missingDependencies.length === 0) {
this.deliverMessage(message);
this.safeSendEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
if (this.deliverMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
detail: message.messageId
});
}
return { buffer, missing };
}
log.info(
this.senderId,
message.messageId,
"is missing dependencies",
missingDependencies.map((ch) => ch.messageId)
);
// Optionally, if a message has not been received after a predetermined amount of time,
// it is marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
if (this.receivedMessageTimeoutEnabled) {
// its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
if (this.timeoutToMarkMessageIrretrievableMs) {
const timeReceived = this.timeReceived.get(message.messageId);
if (
timeReceived &&
Date.now() - timeReceived > this.receivedMessageTimeout
Date.now() - timeReceived > this.timeoutToMarkMessageIrretrievableMs
) {
this.safeSendEvent(MessageChannelEvent.InMessageIrretrievablyLost, {
detail: Array.from(missingDependencies)
});
return { buffer, missing };
}
}
@ -270,7 +293,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
);
this.incomingBuffer = buffer;
this.safeSendEvent(MessageChannelEvent.MissedMessages, {
this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
detail: Array.from(missing)
});
@ -287,7 +310,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
possiblyAcknowledged: Message[];
}>(
({ unacknowledged, possiblyAcknowledged }, message) => {
if (this.acknowledgements.has(message.messageId)) {
if (this.possibleAcks.has(message.messageId)) {
return {
unacknowledged,
possiblyAcknowledged: possiblyAcknowledged.concat(message)
@ -315,40 +338,44 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
*
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public async sendSyncMessage(
public async pushOutgoingSyncMessage(
callback?: (message: Message) => Promise<boolean>
): Promise<boolean> {
this.lamportTimestamp++;
const emptyMessage = new Uint8Array();
const message: Message = {
messageId: MessageChannel.getMessageId(emptyMessage),
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.localHistory
const message = new Message(
MessageChannel.getMessageId(emptyMessage),
this.channelId,
this.senderId,
this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: emptyMessage
};
this.lamportTimestamp,
this.filter.toBytes(),
emptyMessage
);
if (callback) {
try {
await callback(message);
this.safeSendEvent(MessageChannelEvent.SyncSent, {
this.safeSendEvent(MessageChannelEvent.OutSyncSent, {
detail: message
});
return true;
} catch (error) {
log.error("Callback execution failed in sendSyncMessage:", error);
log.error(
"Callback execution failed in pushOutgoingSyncMessage:",
error
);
throw error;
}
}
return false;
}
private _receiveMessage(message: Message): void {
private _pushIncomingMessage(message: Message): void {
const isDuplicate =
message.content &&
message.content.length > 0 &&
@ -358,25 +385,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return;
}
const isOwnOutgoingMessage =
message.content &&
message.content.length > 0 &&
this.outgoingMessages.has(MessageChannel.getMessageId(message.content));
const isOwnOutgoingMessage = this.senderId === message.senderId;
if (isOwnOutgoingMessage) {
return;
}
// Ephemeral messages SHOULD be delivered immediately
if (!message.lamportTimestamp) {
this.deliverMessage(message);
return;
}
if (message.content?.length === 0) {
this.safeSendEvent(MessageChannelEvent.SyncReceived, {
this.safeSendEvent(MessageChannelEvent.InSyncReceived, {
detail: message
});
} else {
this.safeSendEvent(MessageChannelEvent.MessageReceived, {
this.safeSendEvent(MessageChannelEvent.InMessageReceived, {
detail: message
});
}
@ -384,25 +408,32 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
if (message.content?.length && message.content.length > 0) {
this.filter.insert(message.messageId);
}
const dependenciesMet = message.causalHistory.every((historyEntry) =>
this.localHistory.some(
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === historyEntry.messageId
messageId === messageHistoryEntry.messageId
)
);
if (!dependenciesMet) {
if (missingDependencies.length > 0) {
this.incomingBuffer.push(message);
this.timeReceived.set(message.messageId, Date.now());
log.info(
this.senderId,
message.messageId,
"is missing dependencies",
missingDependencies.map((ch) => ch.messageId)
);
} else {
this.deliverMessage(message);
this.safeSendEvent(MessageChannelEvent.MessageDelivered, {
detail: {
messageId: message.messageId,
sentOrReceived: "received"
}
if (this.deliverMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
detail: message.messageId
});
}
}
}
private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
try {
@ -415,6 +446,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
detail: { command: item.command, error, params: item.params }
})
);
this.safeSendEvent(MessageChannelEvent.ErrorTask, {
detail: { command: item.command, error, params: item.params }
});
}
}
@ -429,7 +463,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
}
private async _sendMessage(
private async _pushOutgoingMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
success: boolean;
@ -440,20 +474,30 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const messageId = MessageChannel.getMessageId(payload);
this.outgoingMessages.add(messageId);
// if same message id is in the outgoing buffer,
// it means it's a retry, and we need to resend the same message
// to ensure we do not create a cyclic dependency of any sort.
const message: Message = {
let message = this.outgoingBuffer.find(
(m: Message) => m.messageId === messageId
);
// It's a new message
if (!message) {
message = new Message(
messageId,
channelId: this.channelId,
lamportTimestamp: this.lamportTimestamp,
causalHistory: this.localHistory
this.channelId,
this.senderId,
this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
bloomFilter: this.filter.toBytes(),
content: payload
};
this.lamportTimestamp,
this.filter.toBytes(),
payload
);
this.outgoingBuffer.push(message);
}
if (callback) {
try {
@ -468,55 +512,80 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
});
this.timeReceived.set(messageId, Date.now());
this.safeSendEvent(MessageChannelEvent.MessageSent, {
this.safeSendEvent(MessageChannelEvent.OutMessageSent, {
detail: message
});
}
} catch (error) {
log.error("Callback execution failed in _sendMessage:", error);
log.error("Callback execution failed in _pushOutgoingMessage:", error);
throw error;
}
}
}
private async _sendEphemeralMessage(
private async _pushOutgoingEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
): Promise<void> {
const message: Message = {
messageId: MessageChannel.getMessageId(payload),
channelId: this.channelId,
content: payload,
lamportTimestamp: undefined,
causalHistory: [],
bloomFilter: undefined
};
const message = new Message(
MessageChannel.getMessageId(payload),
this.channelId,
this.senderId,
[],
undefined,
undefined,
payload
);
if (callback) {
try {
await callback(message);
} catch (error) {
log.error("Callback execution failed in _sendEphemeralMessage:", error);
log.error(
"Callback execution failed in _pushOutgoingEphemeralMessage:",
error
);
throw error;
}
}
}
/**
* Return true if the message was "delivered"
*
* @param message
* @param retrievalHint
* @private
*/
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(message: Message, retrievalHint?: Uint8Array): void {
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
if (messageLamportTimestamp > this.lamportTimestamp) {
this.lamportTimestamp = messageLamportTimestamp;
}
private deliverMessage(
message: Message,
retrievalHint?: Uint8Array
): boolean {
if (
message.content?.length === 0 ||
message.lamportTimestamp === undefined
) {
// Messages with empty content are sync messages.
// Messages with no timestamp are ephemeral messages.
// They do not need to be "delivered".
// They are not added to the local log or bloom filter.
return;
return false;
}
log.info(this.senderId, "delivering message", message.messageId);
if (message.lamportTimestamp > this.lamportTimestamp) {
this.lamportTimestamp = message.lamportTimestamp;
}
// Check if the entry is already present
const existingHistoryEntry = this.localHistory.find(
({ historyEntry }) => historyEntry.messageId === message.messageId
);
// The history entry is already present, no need to re-add
if (existingHistoryEntry) {
return true;
}
// The participant MUST insert the message ID into its local log,
@ -525,7 +594,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// the participant MUST follow the Resolve Conflicts procedure.
// https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts
this.localHistory.push({
timestamp: messageLamportTimestamp,
timestamp: message.lamportTimestamp,
historyEntry: {
messageId: message.messageId,
retrievalHint
@ -537,25 +606,36 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId);
});
return true;
}
// For each received message (including sync messages), inspect the causal history and bloom filter
// to determine the acknowledgement status of messages in the outgoing buffer.
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
private reviewAckStatus(receivedMessage: Message): void {
log.info(
this.senderId,
"reviewing ack status using:",
receivedMessage.causalHistory.map((ch) => ch.messageId)
);
log.info(
this.senderId,
"current outgoing buffer:",
this.outgoingBuffer.map((b) => b.messageId)
);
receivedMessage.causalHistory.forEach(({ messageId }) => {
this.outgoingBuffer = this.outgoingBuffer.filter(
({ messageId: outgoingMessageId }) => {
if (outgoingMessageId !== messageId) {
return true;
}
this.safeSendEvent(MessageChannelEvent.MessageAcknowledged, {
this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, {
detail: messageId
});
return false;
}
);
this.acknowledgements.delete(messageId);
this.possibleAcks.delete(messageId);
if (!this.filter.lookup(messageId)) {
this.filter.insert(messageId);
}
@ -574,10 +654,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// If a message appears as possibly acknowledged in multiple received bloom filters,
// the participant MAY mark it as acknowledged based on probabilistic grounds,
// taking into account the bloom filter size and hash number.
const count = (this.acknowledgements.get(message.messageId) ?? 0) + 1;
if (count < this.acknowledgementCount) {
this.acknowledgements.set(message.messageId, count);
this.safeSendEvent(MessageChannelEvent.PartialAcknowledgement, {
const count = (this.possibleAcks.get(message.messageId) ?? 0) + 1;
if (count < this.possibleAcksThreshold) {
this.possibleAcks.set(message.messageId, count);
this.safeSendEvent(MessageChannelEvent.OutMessagePossiblyAcknowledged, {
detail: {
messageId: message.messageId,
count
@ -585,13 +665,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
});
return true;
}
this.acknowledgements.delete(message.messageId);
this.possibleAcks.delete(message.messageId);
return false;
});
}
// TODO: this should be determined based on the bloom filter parameters and number of hashes
private getAcknowledgementCount(): number {
return 2;
}
}