fix!: SDS acknowledgements (#2549)

* SDS: change default causal history size to 200

# Conflicts:
#	packages/sds/src/index.ts
#	packages/sds/src/message_channel/message_channel.ts

* SDS: add some comments

* SDS: segregate messages types, introduce LocalHistory

* SDS: fix miss-acks

* SDS: logs and more explicit variable names

* SDS: shorten event name

* SDS: shorten var name

* SDS: move message classes to own file.

* SDS: use lodash instead of custom SortedArray implementation

* SDS: move Message to own file

* SDS: add comparison tests
This commit is contained in:
fryorcraken 2025-08-14 10:44:18 +10:00 committed by GitHub
parent de972d6694
commit c161b37d08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 516 additions and 211 deletions

3
package-lock.json generated
View File

@ -37730,7 +37730,8 @@
"@noble/hashes": "^1.7.1",
"@waku/proto": "^0.0.12",
"@waku/utils": "^0.0.25",
"chai": "^5.1.2"
"chai": "^5.1.2",
"lodash": "^4.17.21"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",

View File

@ -64,7 +64,8 @@
"@noble/hashes": "^1.7.1",
"@waku/proto": "^0.0.12",
"@waku/utils": "^0.0.25",
"chai": "^5.1.2"
"chai": "^5.1.2",
"lodash": "^4.17.21"
},
"devDependencies": {
"@rollup/plugin-commonjs": "^25.0.7",

View File

@ -3,11 +3,14 @@ import { BloomFilter } from "./bloom_filter/bloom.js";
export {
MessageChannel,
MessageChannelEvent,
MessageChannelOptions
} from "./message_channel/index.js";
export {
MessageChannelOptions,
isContentMessage,
isSyncMessage,
isEphemeralMessage,
Message,
ContentMessage,
SyncMessage,
EphemeralMessage,
type HistoryEntry,
type ChannelId,
type MessageChannelEvents,

View File

@ -1,4 +1,4 @@
import type { Message } from "./events.js";
import { ContentMessage, EphemeralMessage, Message } from "./message.js";
export enum Command {
Send = "send",
@ -9,7 +9,7 @@ export enum Command {
export interface ParamsByAction {
[Command.Send]: {
payload: Uint8Array;
callback?: (message: Message) => Promise<{
callback?: (message: ContentMessage) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>;
@ -19,7 +19,7 @@ export interface ParamsByAction {
};
[Command.SendEphemeral]: {
payload: Uint8Array;
callback?: (message: Message) => Promise<boolean>;
callback?: (message: EphemeralMessage) => Promise<boolean>;
};
}

View File

@ -1,35 +0,0 @@
import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { Message } from "./events.js";
import { DEFAULT_BLOOM_FILTER_OPTIONS } from "./message_channel.js";
describe("Message serialization", () => {
it("Bloom filter", () => {
const messageId = "first";
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
bloomFilter.insert(messageId);
const message = new Message(
"123",
"my-channel",
"me",
[],
0,
bloomFilter.toBytes(),
undefined
);
const bytes = message.encode();
const decMessage = Message.decode(bytes);
const decBloomFilter = DefaultBloomFilter.fromBytes(
decMessage!.bloomFilter!,
DEFAULT_BLOOM_FILTER_OPTIONS
);
expect(decBloomFilter.lookup(messageId)).to.be.true;
});
});

View File

@ -1,4 +1,4 @@
import { proto_sds_message } from "@waku/proto";
import { HistoryEntry, Message, MessageId } from "./message.js";
export enum MessageChannelEvent {
OutMessageSent = "sds:out:message-sent",
@ -9,52 +9,10 @@ export enum MessageChannelEvent {
InMessageMissing = "sds:in:message-missing",
OutSyncSent = "sds:out:sync-sent",
InSyncReceived = "sds:in:sync-received",
InMessageIrretrievablyLost = "sds:in:message-irretrievably-lost",
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task"
}
export type MessageId = string;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export type SenderId = string;
export class Message implements proto_sds_message.SdsMessage {
public constructor(
public messageId: string,
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp?: number | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined
) {}
public encode(): Uint8Array {
return proto_sds_message.SdsMessage.encode(this);
}
public static decode(data: Uint8Array): Message {
const {
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content
} = proto_sds_message.SdsMessage.decode(data);
return new Message(
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content
);
}
}
export type MessageChannelEvents = {
[MessageChannelEvent.OutMessageSent]: CustomEvent<Message>;
[MessageChannelEvent.InMessageDelivered]: CustomEvent<MessageId>;
@ -65,7 +23,7 @@ export type MessageChannelEvents = {
count: number;
}>;
[MessageChannelEvent.InMessageMissing]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.InMessageIrretrievablyLost]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.InMessageLost]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<any>;

View File

@ -1,3 +1,16 @@
export * from "./command_queue.js";
export * from "./events.js";
export * from "./message_channel.js";
export {
ChannelId,
ContentMessage,
EphemeralMessage,
HistoryEntry,
Message,
MessageId,
SenderId,
SyncMessage,
isContentMessage,
isEphemeralMessage,
isSyncMessage
} from "./message.js";

View File

@ -0,0 +1,66 @@
import _ from "lodash";
import { ContentMessage, isContentMessage } from "./message.js";
/**
* In-Memory implementation of a local store of messages.
*
* Messages are store in SDS chronological order:
* - messages[0] is the oldest message
* - messages[n] is the newest message
*
* Only stores content message: `message.lamportTimestamp` and `message.content` are present.
*/
export class MemLocalHistory {
private items: ContentMessage[] = [];
public get length(): number {
return this.items.length;
}
public push(...items: ContentMessage[]): number {
for (const item of items) {
this.validateMessage(item);
}
// Add new items and ensure uniqueness by messageId using sortedUniqBy
// The valueOf() method on ContentMessage enables native < operator sorting
this.items = _.sortedUniqBy([...this.items, ...items], "messageId");
return this.items.length;
}
public some(
predicate: (
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean {
return this.items.some(predicate, thisArg);
}
public slice(start?: number, end?: number): ContentMessage[] {
return this.items.slice(start, end);
}
public find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined {
return this.items.find(predicate, thisArg);
}
private validateMessage(message: ContentMessage): void {
if (!isContentMessage(message)) {
throw new Error(
"Message must have lamportTimestamp and content defined, sync and ephemeral messages cannot be stored"
);
}
}
}

View File

@ -0,0 +1,88 @@
import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { ContentMessage, Message } from "./message.js";
import { DEFAULT_BLOOM_FILTER_OPTIONS } from "./message_channel.js";
describe("Message serialization", () => {
it("Bloom filter", () => {
const messageId = "first";
const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
bloomFilter.insert(messageId);
const message = new Message(
"123",
"my-channel",
"me",
[],
0,
bloomFilter.toBytes(),
undefined
);
const bytes = message.encode();
const decMessage = Message.decode(bytes);
const decBloomFilter = DefaultBloomFilter.fromBytes(
decMessage!.bloomFilter!,
DEFAULT_BLOOM_FILTER_OPTIONS
);
expect(decBloomFilter.lookup(messageId)).to.be.true;
});
});
describe("ContentMessage comparison with < operator", () => {
it("should sort by lamportTimestamp when timestamps differ", () => {
const msgA = new ContentMessage(
"zzz", // Higher messageId
"channel",
"sender",
[],
100, // Lower timestamp
undefined,
new Uint8Array([1])
);
const msgB = new ContentMessage(
"aaa", // Lower messageId
"channel",
"sender",
[],
200, // Higher timestamp
undefined,
new Uint8Array([2])
);
// Despite msgA having higher messageId, it should be < msgB due to lower timestamp
expect(msgA < msgB).to.be.true;
expect(msgB < msgA).to.be.false;
});
it("should sort by messageId when timestamps are equal", () => {
const msgA = new ContentMessage(
"aaa", // Lower messageId
"channel",
"sender",
[],
100, // Same timestamp
undefined,
new Uint8Array([1])
);
const msgB = new ContentMessage(
"zzz", // Higher messageId
"channel",
"sender",
[],
100, // Same timestamp
undefined,
new Uint8Array([2])
);
expect(msgA < msgB).to.be.true;
expect(msgB < msgA).to.be.false;
});
});

View File

@ -0,0 +1,175 @@
import { proto_sds_message } from "@waku/proto";
export type MessageId = string;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export type SenderId = string;
export class Message implements proto_sds_message.SdsMessage {
public constructor(
public messageId: string,
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp?: number | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined,
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
public retrievalHint?: Uint8Array | undefined
) {}
public encode(): Uint8Array {
return proto_sds_message.SdsMessage.encode(this);
}
public static decode(data: Uint8Array): Message {
const {
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content
} = proto_sds_message.SdsMessage.decode(data);
return new Message(
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content
);
}
}
export class SyncMessage extends Message {
public constructor(
public messageId: string,
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: number,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: undefined,
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
public retrievalHint?: Uint8Array | undefined
) {
super(
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content,
retrievalHint
);
}
}
export function isSyncMessage(
message: Message | ContentMessage | SyncMessage | EphemeralMessage
): message is SyncMessage {
return Boolean(
"lamportTimestamp" in message &&
typeof message.lamportTimestamp === "number" &&
(message.content === undefined || message.content.length === 0)
);
}
export class EphemeralMessage extends Message {
public constructor(
public messageId: string,
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: undefined,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
public retrievalHint?: Uint8Array | undefined
) {
if (!content || !content.length) {
throw Error("Ephemeral Message must have content");
}
super(
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content,
retrievalHint
);
}
}
export function isEphemeralMessage(
message: Message | ContentMessage | SyncMessage | EphemeralMessage
): message is EphemeralMessage {
return Boolean(
message.lamportTimestamp === undefined &&
"content" in message &&
message.content &&
message.content.length
);
}
export class ContentMessage extends Message {
public constructor(
public messageId: string,
public channelId: string,
public senderId: string,
public causalHistory: proto_sds_message.HistoryEntry[],
public lamportTimestamp: number,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
public retrievalHint?: Uint8Array | undefined
) {
if (!content.length) {
throw Error("Content Message must have content");
}
super(
messageId,
channelId,
senderId,
causalHistory,
lamportTimestamp,
bloomFilter,
content,
retrievalHint
);
}
// `valueOf` is used by comparison operands such as `<`
public valueOf(): string {
// Create a sortable string representation that matches the compare logic
// Pad lamportTimestamp to ensure proper lexicographic ordering
// Use 16 digits to handle up to Number.MAX_SAFE_INTEGER (9007199254740991)
const paddedTimestamp = this.lamportTimestamp.toString().padStart(16, "0");
return `${paddedTimestamp}_${this.messageId}`;
}
}
export function isContentMessage(
message: Message | ContentMessage
): message is ContentMessage {
return Boolean(
"lamportTimestamp" in message &&
typeof message.lamportTimestamp === "number" &&
message.content &&
message.content.length
);
}

View File

@ -3,14 +3,17 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { MessageChannelEvent } from "./events.js";
import {
ContentMessage,
HistoryEntry,
Message,
MessageChannelEvent,
MessageId
} from "./events.js";
MessageId,
SyncMessage
} from "./message.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
ILocalHistory,
MessageChannel
} from "./message_channel.js";
@ -35,12 +38,20 @@ const messagesB = [
const sendMessage = async (
channel: MessageChannel,
payload: Uint8Array,
callback: (message: Message) => Promise<{ success: boolean }>
callback: (message: ContentMessage) => Promise<{ success: boolean }>
): Promise<void> => {
await channel.pushOutgoingMessage(payload, callback);
await channel.processTasks();
};
const sendSyncMessage = async (
channel: MessageChannel,
callback: (message: SyncMessage) => Promise<boolean>
): Promise<void> => {
await channel.pushOutgoingSyncMessage(callback);
await channel.processTasks();
};
const receiveMessage = async (
channel: MessageChannel,
message: Message
@ -61,39 +72,38 @@ describe("MessageChannel", function () {
it("should increase lamport timestamp", async () => {
const timestampBefore = (channelA as any).lamportTimestamp;
await sendMessage(channelA, new Uint8Array(), callback);
await sendMessage(channelA, utf8ToBytes("message"), callback);
const timestampAfter = (channelA as any).lamportTimestamp;
expect(timestampAfter).to.equal(timestampBefore + 1);
});
it("should push the message to the outgoing buffer", async () => {
const bufferLengthBefore = (channelA as any).outgoingBuffer.length;
await sendMessage(channelA, new Uint8Array(), callback);
await sendMessage(channelA, utf8ToBytes("message"), callback);
const bufferLengthAfter = (channelA as any).outgoingBuffer.length;
expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1);
});
it("should insert message into bloom filter", async () => {
const messageId = MessageChannel.getMessageId(new Uint8Array());
await sendMessage(channelA, new Uint8Array(), callback);
const payload = utf8ToBytes("message");
const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback);
const bloomFilter = getBloomFilter(channelA);
expect(bloomFilter.lookup(messageId)).to.equal(true);
});
it("should insert message id into causal history", async () => {
const payload = utf8ToBytes("message");
const expectedTimestamp = (channelA as any).lamportTimestamp + 1;
const messageId = MessageChannel.getMessageId(new Uint8Array());
await sendMessage(channelA, new Uint8Array(), callback);
const messageIdLog = (channelA as any).localHistory as {
timestamp: number;
historyEntry: HistoryEntry;
}[];
const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback);
const messageIdLog = (channelA as any).localHistory as ILocalHistory;
expect(messageIdLog.length).to.equal(1);
expect(
messageIdLog.some(
(log) =>
log.timestamp === expectedTimestamp &&
log.historyEntry.messageId === messageId
log.lamportTimestamp === expectedTimestamp &&
log.messageId === messageId
)
).to.equal(true);
});
@ -547,7 +557,7 @@ describe("MessageChannel", function () {
it("should mark a message as irretrievably lost if timeout is exceeded", async () => {
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
timeoutToMarkMessageIrretrievableMs: 10
timeoutForLostMessagesMs: 10
});
for (const m of messagesA) {
@ -558,16 +568,13 @@ describe("MessageChannel", function () {
const messageToBeLostId = MessageChannel.getMessageId(
utf8ToBytes(messagesA[0])
);
channelC.addEventListener(
MessageChannelEvent.InMessageIrretrievablyLost,
(event) => {
channelC.addEventListener(MessageChannelEvent.InMessageLost, (event) => {
for (const hist of event.detail) {
if (hist.messageId === messageToBeLostId) {
irretrievablyLost = true;
}
}
}
);
});
await sendMessage(
channelA,
@ -591,7 +598,7 @@ describe("MessageChannel", function () {
const causalHistorySize = (channelA as any).causalHistorySize;
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
timeoutToMarkMessageIrretrievableMs: 10
timeoutForLostMessagesMs: 10
});
for (const m of messagesA) {
@ -675,7 +682,7 @@ describe("MessageChannel", function () {
it("should be sent with empty content", async () => {
await channelA.pushOutgoingSyncMessage(async (message) => {
expect(message.content?.length).to.equal(0);
expect(message.content).to.be.undefined;
return true;
});
});
@ -727,9 +734,9 @@ describe("MessageChannel", function () {
});
}
await sendMessage(channelB, new Uint8Array(), async (message) => {
await sendSyncMessage(channelB, async (message) => {
await receiveMessage(channelA, message);
return { success: true };
return true;
});
const causalHistorySize = (channelA as any).causalHistorySize;

View File

@ -6,22 +6,28 @@ import { Logger } from "@waku/utils";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
import { MemLocalHistory } from "./mem_local_history.js";
import {
type ChannelId,
type HistoryEntry,
ChannelId,
ContentMessage,
EphemeralMessage,
HistoryEntry,
isContentMessage,
isEphemeralMessage,
isSyncMessage,
Message,
MessageChannelEvent,
MessageChannelEvents,
type MessageId,
type SenderId
} from "./events.js";
MessageId,
SenderId,
SyncMessage
} from "./message.js";
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
capacity: 10000,
errorRate: 0.001
};
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
const DEFAULT_CAUSAL_HISTORY_SIZE = 200;
const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2;
const log = new Logger("waku:sds:message-channel");
@ -35,26 +41,31 @@ export interface MessageChannelOptions {
*
* @default undefined because it is coupled to processTask calls frequency
*/
timeoutToMarkMessageIrretrievableMs?: number;
timeoutForLostMessagesMs?: number;
/**
* How many possible acks does it take to consider it a definitive ack.
*/
possibleAcksThreshold?: number;
}
export type ILocalHistory = Pick<
Array<ContentMessage>,
"some" | "push" | "slice" | "find" | "length"
>;
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
public readonly senderId: SenderId;
private lamportTimestamp: number;
private filter: DefaultBloomFilter;
private outgoingBuffer: Message[];
private outgoingBuffer: ContentMessage[];
private possibleAcks: Map<MessageId, number>;
private incomingBuffer: Message[];
private localHistory: { timestamp: number; historyEntry: HistoryEntry }[];
private incomingBuffer: Array<ContentMessage | SyncMessage>;
private localHistory: ILocalHistory;
private timeReceived: Map<MessageId, number>;
private readonly causalHistorySize: number;
private readonly possibleAcksThreshold: number;
private readonly timeoutToMarkMessageIrretrievableMs?: number;
private readonly timeoutForLostMessagesMs?: number;
private tasks: Task[] = [];
private handlers: Handlers = {
@ -78,7 +89,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public constructor(
channelId: ChannelId,
senderId: SenderId,
options: MessageChannelOptions = {}
options: MessageChannelOptions = {},
localHistory: ILocalHistory = new MemLocalHistory()
) {
super();
this.channelId = channelId;
@ -88,15 +100,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.outgoingBuffer = [];
this.possibleAcks = new Map();
this.incomingBuffer = [];
this.localHistory = [];
this.localHistory = localHistory;
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
// TODO: this should be determined based on the bloom filter parameters and number of hashes
this.possibleAcksThreshold =
options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
this.timeReceived = new Map();
this.timeoutToMarkMessageIrretrievableMs =
options.timeoutToMarkMessageIrretrievableMs;
this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs;
}
public static getMessageId(payload: Uint8Array): MessageId {
@ -121,7 +132,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* await channel.processTasks();
* ```
*
* @throws Will emit a 'taskError' event if any task fails, but continues processing remaining tasks
* @emits CustomEvent("taskError", { detail: { command, error, params } }
* if any task fails, but continues processing remaining tasks
*/
public async processTasks(): Promise<void> {
while (this.tasks.length > 0) {
@ -141,7 +153,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* This ensures proper lamport timestamp ordering and causal history tracking.
*
* @param payload - The message content as a byte array
* @param callback - Optional callback function called after the message is processed
* @param callback - callback function that should propagate the message
* on the routing layer; `success` should be false if sending irremediably fails,
* when set to true, the message is finalized into the channel locally.
* @returns Promise that resolves when the message is queued (not sent)
*
* @example
@ -157,14 +171,19 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* // Actually send the message
* await channel.processTasks();
* ```
*
* @throws Error if the payload is empty
*/
public async pushOutgoingMessage(
payload: Uint8Array,
callback?: (processedMessage: Message) => Promise<{
callback?: (processedMessage: ContentMessage) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
): Promise<void> {
if (!payload || !payload.length) {
throw Error("Only messages with valid payloads are allowed");
}
this.tasks.push({
command: Command.Send,
params: {
@ -235,7 +254,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
*/
public sweepIncomingBuffer(): HistoryEntry[] {
const { buffer, missing } = this.incomingBuffer.reduce<{
buffer: Message[];
buffer: Array<ContentMessage | SyncMessage>;
missing: Set<HistoryEntry>;
}>(
({ buffer, missing }, message) => {
@ -248,12 +267,11 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === messageHistoryEntry.messageId
({ messageId }) => messageId === messageHistoryEntry.messageId
)
);
if (missingDependencies.length === 0) {
if (this.deliverMessage(message)) {
if (isContentMessage(message) && this.deliverMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
detail: message.messageId
});
@ -269,13 +287,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// Optionally, if a message has not been received after a predetermined amount of time,
// its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
if (this.timeoutToMarkMessageIrretrievableMs) {
if (this.timeoutForLostMessagesMs) {
const timeReceived = this.timeReceived.get(message.messageId);
if (
timeReceived &&
Date.now() - timeReceived > this.timeoutToMarkMessageIrretrievableMs
Date.now() - timeReceived > this.timeoutForLostMessagesMs
) {
this.safeSendEvent(MessageChannelEvent.InMessageIrretrievablyLost, {
this.safeSendEvent(MessageChannelEvent.InMessageLost, {
detail: Array.from(missingDependencies)
});
return { buffer, missing };
@ -289,7 +307,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
missing
};
},
{ buffer: new Array<Message>(), missing: new Set<HistoryEntry>() }
{ buffer: new Array<ContentMessage>(), missing: new Set<HistoryEntry>() }
);
this.incomingBuffer = buffer;
@ -302,12 +320,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep
public sweepOutgoingBuffer(): {
unacknowledged: Message[];
possiblyAcknowledged: Message[];
unacknowledged: ContentMessage[];
possiblyAcknowledged: ContentMessage[];
} {
return this.outgoingBuffer.reduce<{
unacknowledged: Message[];
possiblyAcknowledged: Message[];
unacknowledged: ContentMessage[];
possiblyAcknowledged: ContentMessage[];
}>(
({ unacknowledged, possiblyAcknowledged }, message) => {
if (this.possibleAcks.has(message.messageId)) {
@ -322,8 +340,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
};
},
{
unacknowledged: new Array<Message>(),
possiblyAcknowledged: new Array<Message>()
unacknowledged: new Array<ContentMessage>(),
possiblyAcknowledged: new Array<ContentMessage>()
}
);
}
@ -339,27 +357,28 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
*/
public async pushOutgoingSyncMessage(
callback?: (message: Message) => Promise<boolean>
callback?: (message: SyncMessage) => Promise<boolean>
): Promise<boolean> {
this.lamportTimestamp++;
const emptyMessage = new Uint8Array();
const message = new Message(
MessageChannel.getMessageId(emptyMessage),
const message = new SyncMessage(
// does not need to be secure randomness
`sync-${Math.random().toString(36).substring(2)}`,
this.channelId,
this.senderId,
this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
.map(({ messageId, retrievalHint }) => {
return { messageId, retrievalHint };
}),
this.lamportTimestamp,
this.filter.toBytes(),
emptyMessage
undefined
);
if (callback) {
try {
await callback(message);
log.info(this.senderId, "sync message sent", message.messageId);
this.safeSendEvent(MessageChannelEvent.OutSyncSent, {
detail: message
});
@ -376,26 +395,41 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
private _pushIncomingMessage(message: Message): void {
log.info(this.senderId, "incoming message", message.messageId);
const isDuplicate =
message.content &&
message.content.length > 0 &&
this.timeReceived.has(message.messageId);
if (isDuplicate) {
log.info(
this.senderId,
"dropping dupe incoming message",
message.messageId
);
return;
}
const isOwnOutgoingMessage = this.senderId === message.senderId;
if (isOwnOutgoingMessage) {
log.info(this.senderId, "ignoring own incoming message");
return;
}
// Ephemeral messages SHOULD be delivered immediately
if (!message.lamportTimestamp) {
this.deliverMessage(message);
if (isEphemeralMessage(message)) {
log.info(this.senderId, "delivering ephemeral message");
return;
}
if (message.content?.length === 0) {
if (!isSyncMessage(message) && !isContentMessage(message)) {
log.error(
this.senderId,
"internal error, a message is neither sync nor ephemeral nor content, ignoring it",
message
);
return;
}
if (isSyncMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InSyncReceived, {
detail: message
});
@ -405,15 +439,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
});
}
this.reviewAckStatus(message);
if (message.content?.length && message.content.length > 0) {
if (isContentMessage(message)) {
this.filter.insert(message.messageId);
}
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
({ historyEntry: { messageId } }) =>
messageId === messageHistoryEntry.messageId
({ messageId }) => messageId === messageHistoryEntry.messageId
)
);
@ -427,7 +460,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
missingDependencies.map((ch) => ch.messageId)
);
} else {
if (this.deliverMessage(message)) {
if (isContentMessage(message) && this.deliverMessage(message)) {
this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
detail: message.messageId
});
@ -465,7 +498,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private async _pushOutgoingMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<{
callback?: (message: ContentMessage) => Promise<{
success: boolean;
retrievalHint?: Uint8Array;
}>
@ -484,33 +517,35 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// It's a new message
if (!message) {
message = new Message(
log.info(this.senderId, "sending new message", messageId);
message = new ContentMessage(
messageId,
this.channelId,
this.senderId,
this.localHistory
.slice(-this.causalHistorySize)
.map(({ historyEntry }) => historyEntry),
.map(({ messageId, retrievalHint }) => {
return { messageId, retrievalHint };
}),
this.lamportTimestamp,
this.filter.toBytes(),
payload
);
this.outgoingBuffer.push(message);
} else {
log.info(this.senderId, "resending message", messageId);
}
if (callback) {
try {
const { success, retrievalHint } = await callback(message);
if (success) {
// isContentMessage should always be true as `this.lamportTimestamp` has been
// used to create the message
if (success && isContentMessage(message)) {
message.retrievalHint = retrievalHint;
this.filter.insert(messageId);
this.localHistory.push({
timestamp: this.lamportTimestamp,
historyEntry: {
messageId,
retrievalHint
}
});
this.localHistory.push(message);
this.timeReceived.set(messageId, Date.now());
this.safeSendEvent(MessageChannelEvent.OutMessageSent, {
detail: message
@ -525,9 +560,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private async _pushOutgoingEphemeralMessage(
payload: Uint8Array,
callback?: (message: Message) => Promise<boolean>
callback?: (message: EphemeralMessage) => Promise<boolean>
): Promise<void> {
const message = new Message(
const message = new EphemeralMessage(
MessageChannel.getMessageId(payload),
this.channelId,
this.senderId,
@ -559,13 +594,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
*/
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
private deliverMessage(
message: Message,
message: ContentMessage,
retrievalHint?: Uint8Array
): boolean {
if (
message.content?.length === 0 ||
message.lamportTimestamp === undefined
) {
if (!isContentMessage(message)) {
// Messages with empty content are sync messages.
// Messages with no timestamp are ephemeral messages.
// They do not need to be "delivered".
@ -580,7 +612,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// Check if the entry is already present
const existingHistoryEntry = this.localHistory.find(
({ historyEntry }) => historyEntry.messageId === message.messageId
({ messageId }) => messageId === message.messageId
);
// The history entry is already present, no need to re-add
@ -588,24 +620,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return true;
}
// The participant MUST insert the message ID into its local log,
// based on Lamport timestamp.
// If one or more message IDs with the same Lamport timestamp already exists,
// the participant MUST follow the Resolve Conflicts procedure.
// https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts
this.localHistory.push({
timestamp: message.lamportTimestamp,
historyEntry: {
messageId: message.messageId,
retrievalHint
}
});
this.localHistory.sort((a, b) => {
if (a.timestamp !== b.timestamp) {
return a.timestamp - b.timestamp;
}
return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId);
});
message.retrievalHint = retrievalHint;
this.localHistory.push(message);
return true;
}
@ -615,7 +632,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private reviewAckStatus(receivedMessage: Message): void {
log.info(
this.senderId,
"reviewing ack status using:",
"reviewing ack status using causal history:",
receivedMessage.causalHistory.map((ch) => ch.messageId)
);
log.info(
@ -625,24 +642,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
);
receivedMessage.causalHistory.forEach(({ messageId }) => {
this.outgoingBuffer = this.outgoingBuffer.filter(
({ messageId: outgoingMessageId }) => {
if (outgoingMessageId !== messageId) {
({ messageId: bufferMessageId }) => {
if (bufferMessageId !== messageId) {
return true;
}
log.info(this.senderId, "message acknowledged", messageId);
this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, {
detail: messageId
});
return false;
}
);
this.possibleAcks.delete(messageId);
if (!this.filter.lookup(messageId)) {
this.filter.insert(messageId);
}
});
if (!receivedMessage.bloomFilter) {
return;
}
const messageBloomFilter = DefaultBloomFilter.fromBytes(
receivedMessage.bloomFilter,
this.filter.options
@ -657,15 +673,27 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const count = (this.possibleAcks.get(message.messageId) ?? 0) + 1;
if (count < this.possibleAcksThreshold) {
this.possibleAcks.set(message.messageId, count);
log.info(
this.senderId,
"message possibly acknowledged",
message.messageId,
count
);
this.safeSendEvent(MessageChannelEvent.OutMessagePossiblyAcknowledged, {
detail: {
messageId: message.messageId,
count
}
});
// Not enough possible acks to acknowledge it, keep it in buffer
return true;
}
// Enough possible acks for it to be acknowledged
this.possibleAcks.delete(message.messageId);
log.info(this.senderId, "message acknowledged", message.messageId, count);
this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, {
detail: message.messageId
});
return false;
});
}