import { TypedEventEmitter } from "@libp2p/interface";
import { sha256 } from "@noble/hashes/sha2";
import { bytesToHex } from "@noble/hashes/utils";
import { Logger } from "@waku/utils";

import { DefaultBloomFilter } from "../bloom_filter/bloom.js";

import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
import { MemLocalHistory } from "./mem_local_history.js";
import {
  ChannelId,
  ContentMessage,
  EphemeralMessage,
  HistoryEntry,
  isContentMessage,
  isEphemeralMessage,
  isSyncMessage,
  Message,
  MessageId,
  SenderId,
  SyncMessage
} from "./message.js";

export const DEFAULT_BLOOM_FILTER_OPTIONS = {
  capacity: 10000,
  errorRate: 0.001
};

const DEFAULT_CAUSAL_HISTORY_SIZE = 200;
const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2;

const log = new Logger("sds:message-channel");
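
/**
 * Options for configuring a MessageChannel instance.
 *
 * @example
 * ```typescript
 * // Illustrative sketch only: the values below are arbitrary examples,
 * // not recommended defaults.
 * const options: MessageChannelOptions = {
 *   causalHistorySize: 100,
 *   timeoutForLostMessagesMs: 30_000,
 *   possibleAcksThreshold: 3
 * };
 * ```
 */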
export interface MessageChannelOptions {
  causalHistorySize?: number;
  /**
   * The time in milliseconds after which a message whose dependencies could not
   * be resolved is marked as irretrievable.
   * Disabled if undefined or `0`.
   *
   * @default undefined, as a suitable value depends on how frequently `processTasks` is called
   */
  timeoutForLostMessagesMs?: number;
  /**
   * The number of possible acknowledgements required before a message in the
   * outgoing buffer is considered definitively acknowledged.
   */
  possibleAcksThreshold?: number;
}

export type ILocalHistory = Pick<
  Array<ContentMessage>,
  "some" | "push" | "slice" | "find" | "length" | "findIndex"
>;
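
/**
 * A message channel implementing the SDS protocol
 * (https://rfc.vac.dev/vac/raw/sds/).
 *
 * Outgoing messages carry a lamport timestamp, causal history, and a bloom
 * filter, and are buffered until acknowledged; incoming messages are buffered
 * until their causal dependencies are resolved.
 *
 * @example
 * ```typescript
 * // Illustrative sketch: identifiers and the `sendToNetwork` callback are
 * // placeholders, not part of this library.
 * const channel = new MessageChannel("my-channel", "my-sender");
 * channel.addEventListener(MessageChannelEvent.InMessageDelivered, (event) => {
 *   console.log("delivered", event.detail);
 * });
 * channel.pushOutgoingMessage(new TextEncoder().encode("hi"), sendToNetwork);
 * await channel.processTasks();
 * ```
 */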
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
  public readonly channelId: ChannelId;
  public readonly senderId: SenderId;
  // Kept as a bigint (uint64 on the wire) because it is initialized close to
  // the current time in milliseconds, which overflows int32/uint32.
  private lamportTimestamp: bigint;
  private filter: DefaultBloomFilter;
  private outgoingBuffer: ContentMessage[];
  private possibleAcks: Map<MessageId, number>;
  private incomingBuffer: Array<ContentMessage | SyncMessage>;
  private readonly localHistory: ILocalHistory;
  private localHistoryIndex: Set<MessageId>;
  private timeReceived: Map<MessageId, number>;
  private readonly causalHistorySize: number;
  private readonly possibleAcksThreshold: number;
  private readonly timeoutForLostMessagesMs?: number;

  private tasks: Task[] = [];
  private handlers: Handlers = {
    [Command.Send]: async (
      params: ParamsByAction[Command.Send]
    ): Promise<void> => {
      await this._pushOutgoingMessage(params.payload, params.callback);
    },
    [Command.Receive]: async (
      params: ParamsByAction[Command.Receive]
    ): Promise<void> => {
      this._pushIncomingMessage(params.message);
    },
    [Command.SendEphemeral]: async (
      params: ParamsByAction[Command.SendEphemeral]
    ): Promise<void> => {
      await this._pushOutgoingEphemeralMessage(params.payload, params.callback);
    }
  };

  public constructor(
    channelId: ChannelId,
    senderId: SenderId,
    options: MessageChannelOptions = {},
    localHistory: ILocalHistory = new MemLocalHistory()
  ) {
    super();
    this.channelId = channelId;
    this.senderId = senderId;
    // Initialize channel lamport timestamp to current time in milliseconds.
    this.lamportTimestamp = BigInt(Date.now());
    this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS);
    this.outgoingBuffer = [];
    this.possibleAcks = new Map();
    this.incomingBuffer = [];
    this.localHistory = localHistory;
    this.localHistoryIndex = new Set(
      localHistory.slice(0).map((msg) => msg.messageId)
    );
    this.causalHistorySize =
      options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
    // TODO: this should be determined based on the bloom filter parameters and number of hashes
    this.possibleAcksThreshold =
      options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
    this.timeReceived = new Map();
    this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs;
  }
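
  /**
   * Computes the message id as the hex-encoded SHA-256 hash of the payload.
   *
   * @example
   * ```typescript
   * // Illustrative only.
   * const id = MessageChannel.getMessageId(
   *   new TextEncoder().encode("Hello, world!")
   * );
   * ```
   */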
  public static getMessageId(payload: Uint8Array): MessageId {
    return bytesToHex(sha256(payload));
  }

  /**
   * Processes all queued tasks sequentially to ensure proper message ordering.
   *
   * This method should be called periodically by the library consumer to execute
   * queued send/receive operations in the correct sequence.
   *
   * @example
   * ```typescript
   * const channel = new MessageChannel("my-channel", "my-sender");
   *
   * // Queue some operations
   * channel.pushOutgoingMessage(payload, callback);
   * channel.pushIncomingMessage(incomingMessage, undefined);
   *
   * // Process all queued operations
   * await channel.processTasks();
   * ```
   *
   * @emits CustomEvent("taskError", { detail: { command, error, params } })
   * if any task fails, but continues processing remaining tasks
   */
  public async processTasks(): Promise<void> {
    while (this.tasks.length > 0) {
      const item = this.tasks.shift();
      if (!item) {
        continue;
      }

      await this.executeTask(item);
    }
  }

  /**
   * Queues a message to be sent on this channel.
   *
   * The message will be processed sequentially when processTasks() is called.
   * This ensures proper lamport timestamp ordering and causal history tracking.
   *
   * @param payload - The message content as a byte array
   * @param callback - Callback that should propagate the message on the routing
   * layer; it should resolve with `success: false` if sending irremediably fails.
   * When `success` is true, the message is finalized into the channel locally.
   * @returns void; the message is only queued and is sent when processTasks() is called
   *
   * @example
   * ```typescript
   * const channel = new MessageChannel("chat-room", "alice");
   * const message = new TextEncoder().encode("Hello, world!");
   *
   * channel.pushOutgoingMessage(message, async (processedMessage) => {
   *   console.log("Message processed:", processedMessage.messageId);
   *   return { success: true };
   * });
   *
   * // Actually send the message
   * await channel.processTasks();
   * ```
   *
   * @throws Error if the payload is empty
   */
  public pushOutgoingMessage(
    payload: Uint8Array,
    callback?: (processedMessage: ContentMessage) => Promise<{
      success: boolean;
      retrievalHint?: Uint8Array;
    }>
  ): void {
    if (!payload || !payload.length) {
      throw Error("Only messages with valid payloads are allowed");
    }
    this.tasks.push({
      command: Command.Send,
      params: {
        payload,
        callback
      }
    });
  }

  /**
   * Queues a short-lived message to be sent without synchronization or reliability requirements.
   *
   * The message is sent without a timestamp, causal history, or bloom filter.
   * Ephemeral messages are not added to the outgoing buffer.
   * Upon reception, ephemeral messages are delivered immediately without
   * checking for causal dependencies or adding them to the local log.
   *
   * See https://rfc.vac.dev/vac/raw/sds/#ephemeral-messages
   *
   * @param payload - The payload to send.
   * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
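   *
   * @example
   * ```typescript
   * // Illustrative sketch, given an existing `channel`.
   * const typing = new TextEncoder().encode("typing...");
   * await channel.pushOutgoingEphemeralMessage(typing, async (msg) => {
   *   // hypothetical transport call goes here
   *   return true;
   * });
   * await channel.processTasks();
   * ```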
   */
  public async pushOutgoingEphemeralMessage(
    payload: Uint8Array,
    callback?: (processedMessage: Message) => Promise<boolean>
  ): Promise<void> {
    this.tasks.push({
      command: Command.SendEphemeral,
      params: {
        payload,
        callback
      }
    });
  }

  /**
   * Queues a received message for processing.
   *
   * The message will be processed when processTasks() is called, ensuring
   * proper dependency resolution and causal ordering.
   *
   * @param message - The message to receive and process
   * @param retrievalHint - The retrieval hint for the message, provided by the transport layer
   * @example
   * ```typescript
   * const channel = new MessageChannel("chat-room", "alice");
   *
   * // Receive a message from the network
   * channel.pushIncomingMessage(incomingMessage, retrievalHint);
   *
   * // Process the received message
   * await channel.processTasks();
   * ```
   */
  public pushIncomingMessage(
    message: Message,
    retrievalHint: Uint8Array | undefined
  ): void {
    message.retrievalHint = retrievalHint;

    this.tasks.push({
      command: Command.Receive,
      params: {
        message
      }
    });
  }

  /**
   * Processes messages in the incoming buffer, delivering those with satisfied dependencies.
   *
   * @returns Array of history entries for messages still missing dependencies
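   *
   * @example
   * ```typescript
   * // Illustrative sketch: sweep periodically and hand whatever is still
   * // missing to a hypothetical `retrieveMessages` transport helper.
   * const missing = channel.sweepIncomingBuffer();
   * if (missing.length > 0) {
   *   await retrieveMessages(missing);
   * }
   * ```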
   */
  public sweepIncomingBuffer(): HistoryEntry[] {
    const { buffer, missing } = this.incomingBuffer.reduce<{
      buffer: Array<ContentMessage | SyncMessage>;
      missing: Set<HistoryEntry>;
    }>(
      ({ buffer, missing }, message) => {
        log.info(
          this.senderId,
          "sweeping incoming buffer",
          message.messageId,
          message.causalHistory.map((ch) => ch.messageId)
        );
        const missingDependencies = message.causalHistory.filter(
          (messageHistoryEntry) =>
            !this.localHistoryIndex.has(messageHistoryEntry.messageId)
        );
        if (missingDependencies.length === 0) {
          if (isContentMessage(message) && this.deliverMessage(message)) {
            this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
              detail: message.messageId
            });
          }
          return { buffer, missing };
        }
        log.info(
          this.senderId,
          "message from incoming buffer",
          message.messageId,
          "is missing dependencies",
          missingDependencies.map(({ messageId, retrievalHint }) => {
            return { messageId, retrievalHint };
          })
        );

        // Optionally, if a message has not been received after a predetermined amount of time,
        // its dependencies are marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
        if (this.timeoutForLostMessagesMs) {
          const timeReceived = this.timeReceived.get(message.messageId);
          if (
            timeReceived &&
            Date.now() - timeReceived > this.timeoutForLostMessagesMs
          ) {
            this.safeSendEvent(MessageChannelEvent.InMessageLost, {
              detail: Array.from(missingDependencies)
            });
            return { buffer, missing };
          }
        }
        missingDependencies.forEach((dependency) => {
          missing.add(dependency);
        });
        return {
          buffer: buffer.concat(message),
          missing
        };
      },
      {
        buffer: new Array<ContentMessage | SyncMessage>(),
        missing: new Set<HistoryEntry>()
      }
    );
    this.incomingBuffer = buffer;

    this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
      detail: Array.from(missing)
    });

    return Array.from(missing);
  }

  // https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep
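  /**
   * Partitions the outgoing buffer into messages that have not been
   * acknowledged at all and messages that are only possibly acknowledged
   * (seen in received bloom filters fewer times than the configured threshold).
   *
   * @example
   * ```typescript
   * // Illustrative sketch: re-queue anything still unacknowledged, using a
   * // hypothetical `sendToNetwork` callback.
   * const { unacknowledged } = channel.sweepOutgoingBuffer();
   * for (const message of unacknowledged) {
   *   channel.pushOutgoingMessage(message.content, sendToNetwork);
   * }
   * ```
   */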
  public sweepOutgoingBuffer(): {
    unacknowledged: ContentMessage[];
    possiblyAcknowledged: ContentMessage[];
  } {
    return this.outgoingBuffer.reduce<{
      unacknowledged: ContentMessage[];
      possiblyAcknowledged: ContentMessage[];
    }>(
      ({ unacknowledged, possiblyAcknowledged }, message) => {
        if (this.possibleAcks.has(message.messageId)) {
          return {
            unacknowledged,
            possiblyAcknowledged: possiblyAcknowledged.concat(message)
          };
        }
        return {
          unacknowledged: unacknowledged.concat(message),
          possiblyAcknowledged
        };
      },
      {
        unacknowledged: new Array<ContentMessage>(),
        possiblyAcknowledged: new Array<ContentMessage>()
      }
    );
  }

  /**
   * Send a sync message to the SDS channel.
   *
   * Increments the lamport timestamp and constructs a sync message
   * with an empty payload. Skips the outgoing buffer, filter, and local log.
   *
   * See https://rfc.vac.dev/vac/raw/sds/#send-sync-message
   *
   * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
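   *
   * @example
   * ```typescript
   * // Illustrative sketch: periodically send a sync message through a
   * // hypothetical `broadcast` transport helper.
   * const sent = await channel.pushOutgoingSyncMessage(async (syncMessage) => {
   *   await broadcast(syncMessage);
   *   return true;
   * });
   * ```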
   */
  public async pushOutgoingSyncMessage(
    callback?: (message: SyncMessage) => Promise<boolean>
  ): Promise<boolean> {
    this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);
    const message = new SyncMessage(
      // does not need to be secure randomness
      `sync-${Math.random().toString(36).substring(2)}`,
      this.channelId,
      this.senderId,
      this.localHistory
        .slice(-this.causalHistorySize)
        .map(({ messageId, retrievalHint }) => {
          return { messageId, retrievalHint };
        }),
      this.lamportTimestamp,
      this.filter.toBytes(),
      undefined
    );

    if (!message.causalHistory || message.causalHistory.length === 0) {
      log.info(
        this.senderId,
        "no causal history in sync message, aborting sending"
      );
      return false;
    }

    if (callback) {
      try {
        await callback(message);
        log.info(this.senderId, "sync message sent", message.messageId);
        this.safeSendEvent(MessageChannelEvent.OutSyncSent, {
          detail: message
        });
        return true;
      } catch (error) {
        log.error(
          "Callback execution failed in pushOutgoingSyncMessage:",
          error
        );
        throw error;
      }
    }
    // No callback was provided and nothing failed, so report success.
    return true;
  }

  private _pushIncomingMessage(message: Message): void {
    if (message.channelId !== this.channelId) {
      log.warn("dropping message on different channel", message.channelId);
      return;
    }

    log.info(
      `${this.senderId} incoming message ${message.messageId}`,
      `retrieval hint: ${bytesToHex(message.retrievalHint ?? new Uint8Array())}`
    );
    const isDuplicate =
      message.content &&
      message.content.length > 0 &&
      this.timeReceived.has(message.messageId);

    if (isDuplicate) {
      log.info(
        this.senderId,
        "dropping dupe incoming message",
        message.messageId
      );
      return;
    }

    const isOwnOutgoingMessage = this.senderId === message.senderId;
    if (isOwnOutgoingMessage) {
      log.info(this.senderId, "ignoring own incoming message");
      return;
    }

    // Ephemeral messages SHOULD be delivered immediately
    if (isEphemeralMessage(message)) {
      log.info(this.senderId, "delivering ephemeral message");
      return;
    }
    if (!isSyncMessage(message) && !isContentMessage(message)) {
      log.error(
        this.senderId,
        "internal error, a message is neither sync nor ephemeral nor content, ignoring it",
        message
      );
      return;
    }
    if (isSyncMessage(message)) {
      this.safeSendEvent(MessageChannelEvent.InSyncReceived, {
        detail: message
      });
    } else {
      this.safeSendEvent(MessageChannelEvent.InMessageReceived, {
        detail: message
      });
    }
    this.reviewAckStatus(message);
    if (isContentMessage(message)) {
      this.filter.insert(message.messageId);
    }

    const missingDependencies = message.causalHistory.filter(
      (messageHistoryEntry) =>
        !this.localHistoryIndex.has(messageHistoryEntry.messageId)
    );

    if (missingDependencies.length > 0) {
      this.incomingBuffer.push(message);
      this.timeReceived.set(message.messageId, Date.now());
      log.info(
        this.senderId,
        "new incoming message",
        message.messageId,
        "is missing dependencies",
        missingDependencies.map((ch) => ch.messageId)
      );

      this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
        detail: Array.from(missingDependencies)
      });
    } else {
      if (isContentMessage(message) && this.deliverMessage(message)) {
        this.safeSendEvent(MessageChannelEvent.InMessageDelivered, {
          detail: message.messageId
        });
      }
    }
  }

  private async executeTask<A extends Command>(item: Task<A>): Promise<void> {
    try {
      const handler = this.handlers[item.command];
      await handler(item.params as ParamsByAction[A]);
    } catch (error) {
      log.error(`Task execution failed for command ${item.command}:`, error);
      this.dispatchEvent(
        new CustomEvent("taskError", {
          detail: { command: item.command, error, params: item.params }
        })
      );
      this.safeSendEvent(MessageChannelEvent.ErrorTask, {
        detail: { command: item.command, error, params: item.params }
      });
    }
  }

  private safeSendEvent<T extends MessageChannelEvent>(
    event: T,
    eventInit?: CustomEventInit
  ): void {
    try {
      this.dispatchEvent(new CustomEvent(event, eventInit));
    } catch (error) {
      log.error(`Failed to dispatch event ${event}:`, error);
    }
  }

  private async _pushOutgoingMessage(
    payload: Uint8Array,
    callback?: (message: ContentMessage) => Promise<{
      success: boolean;
      retrievalHint?: Uint8Array;
    }>
  ): Promise<void> {
    this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);

    const messageId = MessageChannel.getMessageId(payload);

    // if same message id is in the outgoing buffer,
    // it means it's a retry, and we need to resend the same message
    // to ensure we do not create a cyclic dependency of any sort.
    let message = this.outgoingBuffer.find(
      (m: Message) => m.messageId === messageId
    );

    // It's a new message
    if (!message) {
      log.info(this.senderId, "sending new message", messageId);
      message = new ContentMessage(
        messageId,
        this.channelId,
        this.senderId,
        this.localHistory
          .slice(-this.causalHistorySize)
          .map(({ messageId, retrievalHint }) => {
            return { messageId, retrievalHint };
          }),
        this.lamportTimestamp,
        this.filter.toBytes(),
        payload
      );

      this.outgoingBuffer.push(message);
    } else {
      log.info(this.senderId, "resending message", messageId);
    }

    if (callback) {
      try {
        const { success, retrievalHint } = await callback(message);
        // isContentMessage should always be true as `this.lamportTimestamp` has been
        // used to create the message
        if (success && isContentMessage(message)) {
          message.retrievalHint = retrievalHint;
          this.filter.insert(messageId);
          this.localHistory.push(message);
          this.localHistoryIndex.add(messageId);
          this.timeReceived.set(messageId, Date.now());
          this.safeSendEvent(MessageChannelEvent.OutMessageSent, {
            detail: message
          });
        }
      } catch (error) {
        log.error("Callback execution failed in _pushOutgoingMessage:", error);
        throw error;
      }
    }
  }

  private async _pushOutgoingEphemeralMessage(
    payload: Uint8Array,
    callback?: (message: EphemeralMessage) => Promise<boolean>
  ): Promise<void> {
    const message = new EphemeralMessage(
      MessageChannel.getMessageId(payload),
      this.channelId,
      this.senderId,
      [],
      undefined,
      undefined,
      payload
    );

    if (callback) {
      try {
        await callback(message);
      } catch (error) {
        log.error(
          "Callback execution failed in _pushOutgoingEphemeralMessage:",
          error
        );
        throw error;
      }
    }
  }

  /**
   * Returns true if the message was "delivered".
   *
   * @param message
   * @private
   */
  // See https://rfc.vac.dev/vac/raw/sds/#deliver-message
  private deliverMessage(message: ContentMessage): boolean {
    if (!isContentMessage(message)) {
      // Messages with empty content are sync messages.
      // Messages with no timestamp are ephemeral messages.
      // They do not need to be "delivered".
      // They are not added to the local log or bloom filter.
      return false;
    }

    log.info(
      this.senderId,
      "delivering message",
      message.messageId,
      message.retrievalHint
    );
    if (message.lamportTimestamp > this.lamportTimestamp) {
      this.lamportTimestamp = message.lamportTimestamp;
    }

    // Check if the entry is already present
    const existingHistoryEntry = this.localHistory.find(
      ({ messageId }) => messageId === message.messageId
    );

    // The history entry is already present, no need to re-add
    if (existingHistoryEntry) {
      return true;
    }

    if (!message.retrievalHint) {
      log.warn("message delivered without a retrieval hint", message.messageId);
    }

    this.localHistory.push(message);
    this.localHistoryIndex.add(message.messageId);
    return true;
  }

  // For each received message (including sync messages), inspect the causal history and bloom filter
  // to determine the acknowledgement status of messages in the outgoing buffer.
  // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
  private reviewAckStatus(receivedMessage: Message): void {
    log.info(
      this.senderId,
      "reviewing ack status using causal history:",
      receivedMessage.causalHistory.map((ch) => ch.messageId)
    );
    log.info(
      this.senderId,
      "current outgoing buffer:",
      this.outgoingBuffer.map((b) => b.messageId)
    );
    receivedMessage.causalHistory.forEach(({ messageId }) => {
      this.outgoingBuffer = this.outgoingBuffer.filter(
        ({ messageId: bufferMessageId }) => {
          if (bufferMessageId !== messageId) {
            return true;
          }
          log.info(this.senderId, "message acknowledged", messageId);
          this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, {
            detail: messageId
          });
          return false;
        }
      );
    });

    if (!receivedMessage.bloomFilter) {
      return;
    }

    const messageBloomFilter = DefaultBloomFilter.fromBytes(
      receivedMessage.bloomFilter,
      this.filter.options
    );
    this.outgoingBuffer = this.outgoingBuffer.filter((message) => {
      if (!messageBloomFilter.lookup(message.messageId)) {
        return true;
      }
      // If a message appears as possibly acknowledged in multiple received bloom filters,
      // the participant MAY mark it as acknowledged based on probabilistic grounds,
      // taking into account the bloom filter size and hash number.
      const count = (this.possibleAcks.get(message.messageId) ?? 0) + 1;
      if (count < this.possibleAcksThreshold) {
        this.possibleAcks.set(message.messageId, count);
        log.info(
          this.senderId,
          "message possibly acknowledged",
          message.messageId,
          count
        );
        this.safeSendEvent(MessageChannelEvent.OutMessagePossiblyAcknowledged, {
          detail: {
            messageId: message.messageId,
            count
          }
        });
        // Not enough possible acks to acknowledge it, keep it in buffer
        return true;
      }
      // Enough possible acks for it to be acknowledged
      this.possibleAcks.delete(message.messageId);
      log.info(this.senderId, "message acknowledged", message.messageId, count);
      this.safeSendEvent(MessageChannelEvent.OutMessageAcknowledged, {
        detail: message.messageId
      });
      return false;
    });
  }
}
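
/**
 * Returns the next lamport timestamp: the greater of the incremented previous
 * timestamp and the current wall-clock time in milliseconds, which keeps the
 * clock close to real time while never going backwards.
 *
 * @example
 * ```typescript
 * // Illustrative only: after a long pause, the result snaps back to Date.now().
 * let ts = BigInt(Date.now());
 * ts = lamportTimestampIncrement(ts); // max(ts + 1n, BigInt(Date.now()))
 * ```
 */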
export function lamportTimestampIncrement(lamportTimestamp: bigint): bigint {
  const now = BigInt(Date.now());
  lamportTimestamp++;
  if (now > lamportTimestamp) {
    return now;
  }
  return lamportTimestamp;
}