diff --git a/package-lock.json b/package-lock.json index 649e2211fe..1330a4bcb7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -42806,6 +42806,7 @@ "version": "0.0.2", "license": "MIT OR Apache-2.0", "dependencies": { + "@libp2p/interface": "2.7.0", "@noble/hashes": "^1.7.1", "@waku/message-hash": "^0.1.18", "@waku/proto": "^0.0.9", diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index 757756b6f9..8ad2dbef6c 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -7,11 +7,81 @@ import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, MaxLengthError, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +export interface HistoryEntry { + messageId: string + retrievalHint?: Uint8Array +} + +export namespace HistoryEntry { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.messageId != null && obj.messageId !== '')) { + w.uint32(10) + w.string(obj.messageId) + } + + if (obj.retrievalHint != null) { + w.uint32(18) + w.bytes(obj.retrievalHint) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + messageId: '' + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.messageId = reader.string() + break + } + case 2: { + obj.retrievalHint = reader.bytes() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, HistoryEntry.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): HistoryEntry => { + return decodeMessage(buf, HistoryEntry.codec(), opts) + } +} + export interface SdsMessage { messageId: string channelId: string lamportTimestamp?: number - causalHistory: string[] + causalHistory: HistoryEntry[] bloomFilter?: Uint8Array content?: Uint8Array } @@ -44,7 +114,7 @@ export namespace SdsMessage { if (obj.causalHistory != null) { for (const value of obj.causalHistory) { w.uint32(90) - w.string(value) + HistoryEntry.codec().encode(value, w) } } @@ -91,7 +161,9 @@ export namespace SdsMessage { throw new MaxLengthError('Decode error - map field "causalHistory" had too many elements') } - obj.causalHistory.push(reader.string()) + obj.causalHistory.push(HistoryEntry.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.causalHistory$ + })) break } case 12: { diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index f0396e6cc8..c75c8d7447 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -1,11 +1,16 @@ syntax = "proto3"; +message HistoryEntry { + string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` + optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash +} + message SdsMessage { // 1 Reserved for sender/participant id string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel - repeated string causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. + repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes content = 20; // Actual content of the message } \ No newline at end of file diff --git a/packages/sds/package.json b/packages/sds/package.json index a5e74c631c..545ecca746 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -59,7 +59,7 @@ "node": ">=20" }, "dependencies": { - "@libp2p/interface": "^2.7.0", + "@libp2p/interface": "2.7.0", "@noble/hashes": "^1.7.1", "@waku/message-hash": "^0.1.18", "@waku/proto": "^0.0.9", diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts index b70bb6edac..64ced83926 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/sds.spec.ts @@ -4,14 +4,15 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "./bloom.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, + HistoryEntry, Message, MessageChannel, MessageChannelEvent } from "./sds.js"; const channelId = "test-channel"; -const callback = (_message: Message): Promise => { - return Promise.resolve(true); +const callback = (_message: Message): Promise<{ success: boolean }> => { + return Promise.resolve({ success: true }); }; const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { @@ -62,15 +63,16 @@ describe("MessageChannel", function () { const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const messageId = MessageChannel.getMessageId(new Uint8Array()); await channelA.sendMessage(new Uint8Array(), callback); - const messageIdLog = (channelA as any).messageIdLog as { + const messageIdLog = (channelA as any).localHistory as { timestamp: number; - messageId: string; + historyEntry: HistoryEntry; }[]; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( (log) => - log.timestamp === expectedTimestamp && log.messageId === messageId + log.timestamp === expectedTimestamp && + log.historyEntry.messageId === messageId ) ).to.equal(true); }); @@ -100,12 +102,15 @@ describe("MessageChannel", function () { // Causal history should only contain the last N messages as defined by causalHistorySize const causalHistory = outgoingBuffer[outgoingBuffer.length - 1] - .causalHistory as string[]; + .causalHistory as HistoryEntry[]; expect(causalHistory.length).to.equal(causalHistorySize); const expectedCausalHistory = messages .slice(-causalHistorySize - 1, -1) - .map((message) => MessageChannel.getMessageId(utf8ToBytes(message))); + .map((message) => ({ + messageId: MessageChannel.getMessageId(utf8ToBytes(message)), + retrievalHint: undefined + })); expect(causalHistory).to.deep.equal(expectedCausalHistory); }); }); @@ -120,7 +125,7 @@ describe("MessageChannel", function () { const timestampBefore = (channelA as any).lamportTimestamp; await channelB.sendMessage(new Uint8Array(), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); @@ -133,7 +138,7 @@ describe("MessageChannel", function () { for (const m of messagesB) { await channelB.sendMessage(utf8ToBytes(m), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } const timestampAfter = (channelA as any).lamportTimestamp; @@ -147,7 +152,7 @@ describe("MessageChannel", function () { timestamp++; channelB.receiveMessage(message); expect((channelB as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -156,7 +161,7 @@ describe("MessageChannel", function () { timestamp++; channelA.receiveMessage(message); expect((channelA as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -173,7 +178,7 @@ describe("MessageChannel", function () { channelB.receiveMessage(message); const bloomFilter = getBloomFilter(channelB); expect(bloomFilter.lookup(message.messageId)).to.equal(true); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } }); @@ -189,7 +194,7 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { receivedMessage = message; channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; @@ -201,12 +206,15 @@ describe("MessageChannel", function () { expect(timestampAfter).to.equal(timestampBefore); // Message should not be in local history - const localHistory = (channelB as any).messageIdLog as { + const localHistory = (channelB as any).localHistory as { timestamp: number; - messageId: string; + historyEntry: HistoryEntry; }[]; expect( - localHistory.some((m) => m.messageId === receivedMessage!.messageId) + localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === receivedMessage!.messageId + ) ).to.equal(false); }); }); @@ -221,14 +229,14 @@ describe("MessageChannel", function () { for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } let notInHistory: Message | null = null; await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { notInHistory = message; - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); expect((channelA as any).outgoingBuffer.length).to.equal( @@ -237,7 +245,7 @@ describe("MessageChannel", function () { await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); // Since messagesA are in causal history of channel B's message @@ -262,7 +270,7 @@ describe("MessageChannel", function () { for (const m of messages) { await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -276,7 +284,7 @@ describe("MessageChannel", function () { utf8ToBytes(messagesB[messagesB.length - 1]), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); } ); @@ -310,7 +318,7 @@ describe("MessageChannel", function () { // Send messages until acknowledgement count is reached await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -344,9 +352,9 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(m), callback); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; @@ -357,7 +365,7 @@ describe("MessageChannel", function () { const missingMessages = channelB.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); - expect(missingMessages[0]).to.equal( + expect(missingMessages[0].messageId).to.equal( MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) ); }); @@ -368,18 +376,18 @@ describe("MessageChannel", function () { for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), (message) => { sentMessages.push(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const missingMessages = channelB.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); - expect(missingMessages[0]).to.equal( + expect(missingMessages[0].messageId).to.equal( MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) ); @@ -411,7 +419,7 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelC.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const missingMessages = channelC.sweepIncomingBuffer(); @@ -439,7 +447,7 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(m), (message) => { unacknowledgedMessages.push(message); channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -458,7 +466,7 @@ describe("MessageChannel", function () { utf8ToBytes(messagesB[causalHistorySize]), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); } ); @@ -495,7 +503,7 @@ describe("MessageChannel", function () { bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) ).to.equal(false); - const localLog = (channelA as any).messageIdLog as { + const localLog = (channelA as any).localHistory as { timestamp: number; messageId: string; }[]; @@ -514,7 +522,7 @@ describe("MessageChannel", function () { expect(timestampAfter).to.equal(expectedTimestamp); expect(timestampAfter).to.be.greaterThan(timestampBefore); - const localLog = (channelB as any).messageIdLog as { + const localLog = (channelB as any).localHistory as { timestamp: number; messageId: string; }[]; @@ -530,7 +538,7 @@ describe("MessageChannel", function () { for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts index c316f88ff4..ea3c26a814 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/sds.ts @@ -13,6 +13,7 @@ type MessageChannelEvents = { }; export type Message = proto_sds_message.SdsMessage; +export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -36,7 +37,7 @@ export class MessageChannel extends TypedEventEmitter { private outgoingBuffer: Message[]; private acknowledgements: Map; private incomingBuffer: Message[]; - private messageIdLog: { timestamp: number; messageId: string }[]; + private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; private channelId: ChannelId; private causalHistorySize: number; private acknowledgementCount: number; @@ -56,7 +57,7 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.acknowledgements = new Map(); this.incomingBuffer = []; - this.messageIdLog = []; + this.localHistory = []; this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; this.acknowledgementCount = this.getAcknowledgementCount(); @@ -90,7 +91,10 @@ export class MessageChannel extends TypedEventEmitter { */ public async sendMessage( payload: Uint8Array, - callback?: (message: Message) => Promise + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }> ): Promise { this.lamportTimestamp++; @@ -100,9 +104,9 @@ export class MessageChannel extends TypedEventEmitter { messageId, channelId: this.channelId, lamportTimestamp: this.lamportTimestamp, - causalHistory: this.messageIdLog + causalHistory: this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId }) => messageId), + .map(({ historyEntry }) => historyEntry), bloomFilter: this.filter.toBytes(), content: payload }; @@ -110,10 +114,16 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer.push(message); if (callback) { - const success = await callback(message); + const { success, retrievalHint } = await callback(message); if (success) { this.filter.insert(messageId); - this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId }); + this.localHistory.push({ + timestamp: this.lamportTimestamp, + historyEntry: { + messageId, + retrievalHint + } + }); } } } @@ -175,9 +185,10 @@ export class MessageChannel extends TypedEventEmitter { this.filter.insert(message.messageId); } // verify causal history - const dependenciesMet = message.causalHistory.every((messageId) => - this.messageIdLog.some( - ({ messageId: logMessageId }) => logMessageId === messageId + const dependenciesMet = message.causalHistory.every((historyEntry) => + this.localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === historyEntry.messageId ) ); if (!dependenciesMet) { @@ -189,17 +200,18 @@ export class MessageChannel extends TypedEventEmitter { } // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep - public sweepIncomingBuffer(): string[] { + public sweepIncomingBuffer(): HistoryEntry[] { const { buffer, missing } = this.incomingBuffer.reduce<{ buffer: Message[]; - missing: string[]; + missing: HistoryEntry[]; }>( ({ buffer, missing }, message) => { // Check each message for missing dependencies const missingDependencies = message.causalHistory.filter( - (messageId) => - !this.messageIdLog.some( - ({ messageId: logMessageId }) => logMessageId === messageId + (messageHistoryEntry) => + !this.localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === messageHistoryEntry.messageId ) ); if (missingDependencies.length === 0) { @@ -227,7 +239,7 @@ export class MessageChannel extends TypedEventEmitter { missing: missing.concat(missingDependencies) }; }, - { buffer: new Array(), missing: new Array() } + { buffer: new Array(), missing: new Array() } ); // Update the incoming buffer to only include messages with no missing dependencies this.incomingBuffer = buffer; @@ -284,9 +296,9 @@ export class MessageChannel extends TypedEventEmitter { messageId: MessageChannel.getMessageId(emptyMessage), channelId: this.channelId, lamportTimestamp: this.lamportTimestamp, - causalHistory: this.messageIdLog + causalHistory: this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId }) => messageId), + .map(({ historyEntry }) => historyEntry), bloomFilter: this.filter.toBytes(), content: emptyMessage }; @@ -298,7 +310,7 @@ export class MessageChannel extends TypedEventEmitter { } // See https://rfc.vac.dev/vac/raw/sds/#deliver-message - private deliverMessage(message: Message): void { + private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { this.notifyDeliveredMessage(message.messageId); const messageLamportTimestamp = message.lamportTimestamp ?? 0; @@ -321,15 +333,18 @@ export class MessageChannel extends TypedEventEmitter { // If one or more message IDs with the same Lamport timestamp already exists, // the participant MUST follow the Resolve Conflicts procedure. // https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts - this.messageIdLog.push({ + this.localHistory.push({ timestamp: messageLamportTimestamp, - messageId: message.messageId + historyEntry: { + messageId: message.messageId, + retrievalHint + } }); - this.messageIdLog.sort((a, b) => { + this.localHistory.sort((a, b) => { if (a.timestamp !== b.timestamp) { return a.timestamp - b.timestamp; } - return a.messageId.localeCompare(b.messageId); + return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId); }); } @@ -338,9 +353,9 @@ export class MessageChannel extends TypedEventEmitter { // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status private reviewAckStatus(receivedMessage: Message): void { // the participant MUST mark all messages in the received causal_history as acknowledged. - receivedMessage.causalHistory.forEach((messageId) => { + receivedMessage.causalHistory.forEach(({ messageId }) => { this.outgoingBuffer = this.outgoingBuffer.filter( - (msg) => msg.messageId !== messageId + ({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId ); this.acknowledgements.delete(messageId); if (!this.filter.lookup(messageId)) {