diff --git a/packages/sds/package.json b/packages/sds/package.json index ae996f2553..a5e74c631c 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -59,6 +59,7 @@ "node": ">=20" }, "dependencies": { + "@libp2p/interface": "^2.7.0", "@noble/hashes": "^1.7.1", "@waku/message-hash": "^0.1.18", "@waku/proto": "^0.0.9", diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts index 9caba42c60..b70bb6edac 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/sds.spec.ts @@ -5,7 +5,8 @@ import { DefaultBloomFilter } from "./bloom.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, Message, - MessageChannel + MessageChannel, + MessageChannelEvent } from "./sds.js"; const channelId = "test-channel"; @@ -399,12 +400,10 @@ describe("MessageChannel", function () { it("should remove messages without delivering if timeout is exceeded", async () => { const causalHistorySize = (channelA as any).causalHistorySize; // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel( - channelId, - causalHistorySize, - true, - 10 - ); + const channelC: MessageChannel = new MessageChannel(channelId, { + receivedMessageTimeoutEnabled: true, + receivedMessageTimeout: 10 + }); for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), callback); @@ -547,4 +546,56 @@ describe("MessageChannel", function () { ); }); }); + + describe("Ephemeral messages", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + }); + + it("should be sent without a timestamp, causal history, or bloom filter", () => { + const timestampBefore = (channelA as any).lamportTimestamp; + channelA.sendEphemeralMessage(new Uint8Array(), (message) => { + expect(message.lamportTimestamp).to.equal(undefined); + expect(message.causalHistory).to.deep.equal([]); + expect(message.bloomFilter).to.equal(undefined); + return true; + }); + + const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + expect(outgoingBuffer.length).to.equal(0); + + const timestampAfter = (channelA as any).lamportTimestamp; + expect(timestampAfter).to.equal(timestampBefore); + }); + + it("should be delivered immediately if received", async () => { + let deliveredMessageId: string | undefined; + let sentMessage: Message | undefined; + + const channelB = new MessageChannel(channelId, { + deliveredMessageCallback: (messageId) => { + deliveredMessageId = messageId; + } + }); + + const waitForMessageDelivered = new Promise((resolve) => { + channelB.addEventListener( + MessageChannelEvent.MessageDelivered, + (event) => { + resolve(event.detail); + } + ); + + channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => { + sentMessage = message; + channelB.receiveMessage(message); + return true; + }); + }); + + const eventMessageId = await waitForMessageDelivered; + expect(deliveredMessageId).to.equal(sentMessage?.messageId); + expect(eventMessageId).to.equal(sentMessage?.messageId); + }); + }); }); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts index 0582085cb5..c316f88ff4 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/sds.ts @@ -1,9 +1,17 @@ +import { TypedEventEmitter } from "@libp2p/interface"; import { sha256 } from "@noble/hashes/sha256"; import { bytesToHex } from "@noble/hashes/utils"; import { proto_sds_message } from "@waku/proto"; import { DefaultBloomFilter } from "./bloom.js"; +export enum MessageChannelEvent { + MessageDelivered = "messageDelivered" +} +type MessageChannelEvents = { + [MessageChannelEvent.MessageDelivered]: CustomEvent; +}; + export type Message = proto_sds_message.SdsMessage; export type ChannelId = string; @@ -15,7 +23,14 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = { const DEFAULT_CAUSAL_HISTORY_SIZE = 2; const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes -export class MessageChannel { +interface MessageChannelOptions { + causalHistorySize?: number; + receivedMessageTimeoutEnabled?: boolean; + receivedMessageTimeout?: number; + deliveredMessageCallback?: (messageId: string) => void; +} + +export class MessageChannel extends TypedEventEmitter { private lamportTimestamp: number; private filter: DefaultBloomFilter; private outgoingBuffer: Message[]; @@ -26,13 +41,15 @@ export class MessageChannel { private causalHistorySize: number; private acknowledgementCount: number; private timeReceived: Map; + private receivedMessageTimeoutEnabled: boolean; + private receivedMessageTimeout: number; + private deliveredMessageCallback?: (messageId: string) => void; public constructor( channelId: ChannelId, - causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE, - private receivedMessageTimeoutEnabled: boolean = false, - private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT + options: MessageChannelOptions = {} ) { + super(); this.channelId = channelId; this.lamportTimestamp = 0; this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); @@ -40,9 +57,15 @@ export class MessageChannel { this.acknowledgements = new Map(); this.incomingBuffer = []; this.messageIdLog = []; - this.causalHistorySize = causalHistorySize; + this.causalHistorySize = + options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; this.acknowledgementCount = this.getAcknowledgementCount(); this.timeReceived = new Map(); + this.receivedMessageTimeoutEnabled = + options.receivedMessageTimeoutEnabled ?? false; + this.receivedMessageTimeout = + options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; + this.deliveredMessageCallback = options.deliveredMessageCallback; } public static getMessageId(payload: Uint8Array): string { @@ -95,6 +118,36 @@ export class MessageChannel { } } + /** + * Sends a short-lived message without synchronization or reliability requirements. + * + * Sends a message without a timestamp, causal history, or bloom filter. + * Ephemeral messages are not added to the outgoing buffer. + * Upon reception, ephemeral messages are delivered immediately without + * checking for causal dependencies or including in the local log. + * + * See https://rfc.vac.dev/vac/raw/sds/#ephemeral-messages + * + * @param payload - The payload to send. + * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. + */ + public sendEphemeralMessage( + payload: Uint8Array, + callback?: (message: Message) => boolean + ): void { + const message: Message = { + messageId: MessageChannel.getMessageId(payload), + channelId: this.channelId, + content: payload, + lamportTimestamp: undefined, + causalHistory: [], + bloomFilter: undefined + }; + + if (callback) { + callback(message); + } + } /** * Process a received SDS message for this channel. * @@ -110,6 +163,11 @@ export class MessageChannel { * @param message - The received SDS message. */ public receiveMessage(message: Message): void { + if (!message.lamportTimestamp) { + // Messages with no timestamp are ephemeral messages and should be delivered immediately + this.deliverMessage(message); + return; + } // review ack status this.reviewAckStatus(message); // add to bloom filter (skip for messages with empty content) @@ -241,13 +299,19 @@ export class MessageChannel { // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message): void { + this.notifyDeliveredMessage(message.messageId); + const messageLamportTimestamp = message.lamportTimestamp ?? 0; if (messageLamportTimestamp > this.lamportTimestamp) { this.lamportTimestamp = messageLamportTimestamp; } - if (message.content?.length === 0) { + if ( + message.content?.length === 0 || + message.lamportTimestamp === undefined + ) { // Messages with empty content are sync messages. + // Messages with no timestamp are ephemeral messages. // They are not added to the local log or bloom filter. return; } @@ -312,4 +376,15 @@ export class MessageChannel { private getAcknowledgementCount(): number { return 2; } + + private notifyDeliveredMessage(messageId: string): void { + if (this.deliveredMessageCallback) { + this.deliveredMessageCallback(messageId); + } + this.dispatchEvent( + new CustomEvent(MessageChannelEvent.MessageDelivered, { + detail: messageId + }) + ); + } }