diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index eba12d4acd..86474c41a7 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -13,6 +13,7 @@ import type { Uint8ArrayList } from 'uint8arraylist' export interface HistoryEntry { messageId: string retrievalHint?: Uint8Array + senderId?: string } export namespace HistoryEntry { @@ -35,6 +36,11 @@ export namespace HistoryEntry { w.bytes(obj.retrievalHint) } + if (obj.senderId != null) { + w.uint32(26) + w.string(obj.senderId) + } + if (opts.lengthDelimited !== false) { w.ldelim() } @@ -57,6 +63,10 @@ export namespace HistoryEntry { obj.retrievalHint = reader.bytes() break } + case 3: { + obj.senderId = reader.string() + break + } default: { reader.skipType(tag & 7) break @@ -87,6 +97,7 @@ export interface SdsMessage { lamportTimestamp?: bigint causalHistory: HistoryEntry[] bloomFilter?: Uint8Array + repairRequest: HistoryEntry[] content?: Uint8Array } @@ -132,6 +143,13 @@ export namespace SdsMessage { w.bytes(obj.bloomFilter) } + if (obj.repairRequest != null) { + for (const value of obj.repairRequest) { + w.uint32(106) + HistoryEntry.codec().encode(value, w) + } + } + if (obj.content != null) { w.uint32(162) w.bytes(obj.content) @@ -145,7 +163,8 @@ export namespace SdsMessage { senderId: '', messageId: '', channelId: '', - causalHistory: [] + causalHistory: [], + repairRequest: [] } const end = length == null ? reader.len : reader.pos + length @@ -184,6 +203,16 @@ export namespace SdsMessage { obj.bloomFilter = reader.bytes() break } + case 13: { + if (opts.limits?.repairRequest != null && obj.repairRequest.length === opts.limits.repairRequest) { + throw new MaxLengthError('Decode error - map field "repairRequest" had too many elements') + } + + obj.repairRequest.push(HistoryEntry.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.repairRequest$ + })) + break + } case 20: { obj.content = reader.bytes() break diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index c38e99b084..50ca08d716 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -3,6 +3,8 @@ syntax = "proto3"; message HistoryEntry { string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash + + optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension } message SdsMessage { @@ -12,5 +14,8 @@ message SdsMessage { optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel + + repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension. 
+ optional bytes content = 20; // Actual content of the message } diff --git a/packages/sds/src/index.ts b/packages/sds/src/index.ts index 3c1fb30cb1..8841e99477 100644 --- a/packages/sds/src/index.ts +++ b/packages/sds/src/index.ts @@ -14,7 +14,7 @@ export { type HistoryEntry, type ChannelId, type MessageChannelEvents, - type SenderId, + type ParticipantId, type MessageId } from "./message_channel/index.js"; diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts index 53c37ae388..7a8e279c2a 100644 --- a/packages/sds/src/message_channel/index.ts +++ b/packages/sds/src/message_channel/index.ts @@ -8,7 +8,7 @@ export { HistoryEntry, Message, MessageId, - SenderId, + ParticipantId, SyncMessage, isContentMessage, isEphemeralMessage, diff --git a/packages/sds/src/message_channel/message.spec.ts b/packages/sds/src/message_channel/message.spec.ts index 680bf5cdb5..e2b677121c 100644 --- a/packages/sds/src/message_channel/message.spec.ts +++ b/packages/sds/src/message_channel/message.spec.ts @@ -44,6 +44,7 @@ describe("Message serialization", () => { [{ messageId: depMessageId, retrievalHint: depRetrievalHint }], 0n, undefined, + undefined, undefined ); @@ -54,6 +55,29 @@ describe("Message serialization", () => { { messageId: depMessageId, retrievalHint: depRetrievalHint } ]); }); + + it("Repair Request", () => { + const repairMessageId = "missing-message"; + const repairRetrievalHint = utf8ToBytes("missing-retrieval"); + const repairSenderId = "original-sender"; + const message = new Message( + "123", + "my-channel", + "me", + [], + 0n, + undefined, + undefined, + [{ messageId: repairMessageId, retrievalHint: repairRetrievalHint, senderId: repairSenderId }] + ); + + const bytes = message.encode(); + const decMessage = Message.decode(bytes); + + expect(decMessage!.repairRequest).to.deep.equal([ + { messageId: repairMessageId, retrievalHint: repairRetrievalHint, senderId: repairSenderId } + ]); + }); }); describe("ContentMessage comparison with < operator", () => { diff --git a/packages/sds/src/message_channel/message.ts b/packages/sds/src/message_channel/message.ts index 78b99f9006..d820a10445 100644 --- a/packages/sds/src/message_channel/message.ts +++ b/packages/sds/src/message_channel/message.ts @@ -4,7 +4,7 @@ import { Logger } from "@waku/utils"; export type MessageId = string; export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; -export type SenderId = string; +export type ParticipantId = string; const log = new Logger("sds:message"); @@ -17,6 +17,7 @@ export class Message implements proto_sds_message.SdsMessage { public lamportTimestamp?: bigint | undefined, public bloomFilter?: Uint8Array | undefined, public content?: Uint8Array | undefined, + public repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -38,7 +39,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, lamportTimestamp, bloomFilter, - content + content, + repairRequest } = proto_sds_message.SdsMessage.decode(data); if (testContentMessage({ lamportTimestamp, content })) { @@ -49,7 +51,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, lamportTimestamp!, bloomFilter, - content! + content!, + repairRequest ); } @@ -61,7 +64,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, undefined, bloomFilter, - content! 
+ content!, + repairRequest ); } @@ -73,7 +77,8 @@ export class Message implements proto_sds_message.SdsMessage { causalHistory, lamportTimestamp!, bloomFilter, - undefined + undefined, + repairRequest ); } log.error( @@ -97,6 +102,7 @@ export class SyncMessage extends Message { public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: undefined, + public override repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -110,6 +116,7 @@ export class SyncMessage extends Message { lamportTimestamp, bloomFilter, content, + repairRequest, retrievalHint ); } @@ -141,6 +148,7 @@ export class EphemeralMessage extends Message { public lamportTimestamp: undefined, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, + public override repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -157,6 +165,7 @@ export class EphemeralMessage extends Message { lamportTimestamp, bloomFilter, content, + repairRequest, retrievalHint ); } @@ -189,6 +198,7 @@ export class ContentMessage extends Message { public lamportTimestamp: bigint, public bloomFilter: Uint8Array | undefined, public content: Uint8Array, + public override repairRequest: proto_sds_message.HistoryEntry[] = [], /** * Not encoded, set after it is sent, used to include in follow-up messages */ @@ -205,6 +215,7 @@ export class ContentMessage extends Message { lamportTimestamp, bloomFilter, content, + repairRequest, retrievalHint ); } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 91184f04d8..ea1629250c 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -162,7 +162,8 @@ describe("MessageChannel", function () { .slice(-causalHistorySize - 1, -1) .map((message) => ({ messageId: MessageChannel.getMessageId(utf8ToBytes(message)), - retrievalHint: undefined + retrievalHint: undefined, + senderId: "alice" })); expect(causalHistory).to.deep.equal(expectedCausalHistory); }); @@ -298,6 +299,7 @@ describe("MessageChannel", function () { 1n, undefined, payload, + undefined, testRetrievalHint ), testRetrievalHint diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 3df21f160a..f6d9fb273a 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -18,9 +18,10 @@ import { isSyncMessage, Message, MessageId, - SenderId, + ParticipantId, SyncMessage } from "./message.js"; +import { RepairConfig, RepairManager } from "./repair/repair.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, @@ -46,6 +47,10 @@ export interface MessageChannelOptions { * How many possible acks does it take to consider it a definitive ack. */ possibleAcksThreshold?: number; + /** + * SDS-R repair configuration. If not provided, repair is enabled with default settings. 
+ */ + repairConfig?: RepairConfig; } export type ILocalHistory = Pick< @@ -55,7 +60,7 @@ export type ILocalHistory = Pick< export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; - public readonly senderId: SenderId; + public readonly senderId: ParticipantId; private lamportTimestamp: bigint; private filter: DefaultBloomFilter; private outgoingBuffer: ContentMessage[]; @@ -66,6 +71,7 @@ export class MessageChannel extends TypedEventEmitter { private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; private readonly timeoutForLostMessagesMs?: number; + private readonly repairManager: RepairManager; private tasks: Task[] = []; private handlers: Handlers = { @@ -88,7 +94,7 @@ export class MessageChannel extends TypedEventEmitter { public constructor( channelId: ChannelId, - senderId: SenderId, + senderId: ParticipantId, options: MessageChannelOptions = {}, localHistory: ILocalHistory = new MemLocalHistory() ) { @@ -109,6 +115,7 @@ export class MessageChannel extends TypedEventEmitter { options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD; this.timeReceived = new Map(); this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs; + this.repairManager = new RepairManager(senderId, options.repairConfig); } public static getMessageId(payload: Uint8Array): MessageId { @@ -355,6 +362,38 @@ export class MessageChannel extends TypedEventEmitter { ); } + /** + * Sweep repair incoming buffer and rebroadcast messages ready for repair. + * Per SDS-R spec: periodically check for repair responses that are due. + * + * @param callback - callback to rebroadcast the message + * @returns Promise that resolves when all ready repairs have been sent + */ + public async sweepRepairIncomingBuffer( + callback?: (message: Message) => Promise + ): Promise { + const repairsToSend = this.repairManager.sweepIncomingBuffer( + this.localHistory + ); + + if (callback) { + for (const message of repairsToSend) { + try { + await callback(message); + log.info( + this.senderId, + "repair message rebroadcast", + message.messageId + ); + } catch (error) { + log.error("Failed to rebroadcast repair message:", error); + } + } + } + + return repairsToSend; + } + /** * Send a sync message to the SDS channel. 
* @@ -369,6 +408,10 @@ export class MessageChannel extends TypedEventEmitter { callback?: (message: SyncMessage) => Promise ): Promise { this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp); + + // Get repair requests to include in sync message (SDS-R) + const repairRequests = this.repairManager.getRepairRequests(3); + const message = new SyncMessage( // does not need to be secure randomness `sync-${Math.random().toString(36).substring(2)}`, @@ -376,18 +419,22 @@ export class MessageChannel extends TypedEventEmitter { this.senderId, this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId, retrievalHint }) => { - return { messageId, retrievalHint }; + .map(({ messageId, retrievalHint, senderId }) => { + return { messageId, retrievalHint, senderId }; }), this.lamportTimestamp, this.filter.toBytes(), - undefined + undefined, + repairRequests ); - if (!message.causalHistory || message.causalHistory.length === 0) { + if ( + (!message.causalHistory || message.causalHistory.length === 0) && + repairRequests.length === 0 + ) { log.info( this.senderId, - "no causal history in sync message, aborting sending" + "no causal history and no repair requests in sync message, aborting sending" ); return false; } @@ -464,6 +511,18 @@ export class MessageChannel extends TypedEventEmitter { detail: message }); } + + // SDS-R: Handle received message in repair manager + this.repairManager.onMessageReceived(message.messageId); + + // SDS-R: Process incoming repair requests + if (message.repairRequest && message.repairRequest.length > 0) { + this.repairManager.processIncomingRepairRequests( + message.repairRequest, + this.localHistory + ); + } + this.reviewAckStatus(message); if (isContentMessage(message)) { this.filter.insert(message.messageId); @@ -487,6 +546,9 @@ export class MessageChannel extends TypedEventEmitter { missingDependencies.map((ch) => ch.messageId) ); + // SDS-R: Track missing dependencies in repair manager + this.repairManager.onMissingDependencies(missingDependencies); + this.safeSendEvent(MessageChannelEvent.InMessageMissing, { detail: Array.from(missingDependencies) }); @@ -549,18 +611,23 @@ export class MessageChannel extends TypedEventEmitter { // It's a new message if (!message) { log.info(this.senderId, "sending new message", messageId); + + // Get repair requests to include in the message (SDS-R) + const repairRequests = this.repairManager.getRepairRequests(3); + message = new ContentMessage( messageId, this.channelId, this.senderId, this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId, retrievalHint }) => { - return { messageId, retrievalHint }; + .map(({ messageId, retrievalHint, senderId }) => { + return { messageId, retrievalHint, senderId }; }), this.lamportTimestamp, this.filter.toBytes(), - payload + payload, + repairRequests ); this.outgoingBuffer.push(message); @@ -657,6 +724,7 @@ export class MessageChannel extends TypedEventEmitter { } this.localHistory.push(message); + return true; } diff --git a/packages/sds/src/message_channel/repair/buffers.spec.ts b/packages/sds/src/message_channel/repair/buffers.spec.ts index 449f7d3d65..972c64d73f 100644 --- a/packages/sds/src/message_channel/repair/buffers.spec.ts +++ b/packages/sds/src/message_channel/repair/buffers.spec.ts @@ -1,10 +1,8 @@ import { expect } from "chai"; -import { - IncomingRepairBuffer, - OutgoingRepairBuffer, - RepairHistoryEntry -} from "./buffers.js"; +import type { HistoryEntry } from "../message.js"; + +import { IncomingRepairBuffer, OutgoingRepairBuffer } from 
"./buffers.js"; describe("OutgoingRepairBuffer", () => { let buffer: OutgoingRepairBuffer; @@ -14,9 +12,9 @@ describe("OutgoingRepairBuffer", () => { }); it("should add entries and maintain sorted order", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; buffer.add(entry2, 2000); buffer.add(entry1, 1000); @@ -30,21 +28,21 @@ describe("OutgoingRepairBuffer", () => { }); it("should not update T_req if message already exists", () => { - const entry: RepairHistoryEntry = { messageId: "msg1" }; - + const entry: HistoryEntry = { messageId: "msg1" }; + buffer.add(entry, 1000); buffer.add(entry, 2000); // Try to add again with different T_req - + const items = buffer.getItems(); expect(items).to.have.lengthOf(1); expect(items[0].tReq).to.equal(1000); // Should keep original }); it("should evict oldest entry when buffer is full", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; - const entry4: RepairHistoryEntry = { messageId: "msg4" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + const entry4: HistoryEntry = { messageId: "msg4" }; buffer.add(entry2, 2000); buffer.add(entry1, 1000); @@ -60,9 +58,9 @@ describe("OutgoingRepairBuffer", () => { }); it("should get eligible entries based on current time", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); @@ -79,9 +77,9 @@ describe("OutgoingRepairBuffer", () => { }); it("should respect maxRequests limit", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); @@ -94,12 +92,12 @@ describe("OutgoingRepairBuffer", () => { }); it("should remove entries", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); - + expect(buffer.size).to.equal(2); buffer.remove("msg1"); expect(buffer.size).to.equal(1); @@ -109,7 +107,7 @@ describe("OutgoingRepairBuffer", () => { it("should handle retrieval hint and sender_id", () => { const hint = new Uint8Array([1, 2, 3]); - const entry: RepairHistoryEntry = { + const entry: HistoryEntry = { messageId: "msg1", retrievalHint: hint, senderId: "sender1" @@ -130,9 +128,9 @@ describe("IncomingRepairBuffer", () => { }); it("should add entries and maintain sorted order", () 
=> { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; buffer.add(entry2, 2000); buffer.add(entry1, 1000); @@ -146,21 +144,21 @@ describe("IncomingRepairBuffer", () => { }); it("should ignore duplicate entries", () => { - const entry: RepairHistoryEntry = { messageId: "msg1" }; - + const entry: HistoryEntry = { messageId: "msg1" }; + buffer.add(entry, 1000); buffer.add(entry, 500); // Try to add again with earlier T_resp - + const items = buffer.getItems(); expect(items).to.have.lengthOf(1); expect(items[0].tResp).to.equal(1000); // Should keep original }); it("should evict furthest entry when buffer is full", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; - const entry4: RepairHistoryEntry = { messageId: "msg4" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; + const entry4: HistoryEntry = { messageId: "msg4" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); @@ -176,9 +174,9 @@ describe("IncomingRepairBuffer", () => { }); it("should get and remove ready entries", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; - const entry3: RepairHistoryEntry = { messageId: "msg3" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; + const entry3: HistoryEntry = { messageId: "msg3" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); @@ -187,7 +185,7 @@ describe("IncomingRepairBuffer", () => { const ready = buffer.getReady(1500); expect(ready).to.have.lengthOf(1); expect(ready[0].messageId).to.equal("msg1"); - + // Entry should be removed from buffer expect(buffer.size).to.equal(2); expect(buffer.has("msg1")).to.be.false; @@ -195,19 +193,19 @@ describe("IncomingRepairBuffer", () => { const ready2 = buffer.getReady(2500); expect(ready2).to.have.lengthOf(1); expect(ready2[0].messageId).to.equal("msg2"); - + expect(buffer.size).to.equal(1); expect(buffer.has("msg2")).to.be.false; expect(buffer.has("msg3")).to.be.true; }); it("should remove entries", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); - + expect(buffer.size).to.equal(2); buffer.remove("msg1"); expect(buffer.size).to.equal(1); @@ -216,14 +214,14 @@ describe("IncomingRepairBuffer", () => { }); it("should clear all entries", () => { - const entry1: RepairHistoryEntry = { messageId: "msg1" }; - const entry2: RepairHistoryEntry = { messageId: "msg2" }; + const entry1: HistoryEntry = { messageId: "msg1" }; + const entry2: HistoryEntry = { messageId: "msg2" }; buffer.add(entry1, 1000); buffer.add(entry2, 2000); - + expect(buffer.size).to.equal(2); buffer.clear(); expect(buffer.size).to.equal(0); }); -}); \ No newline at end of file +}); diff --git a/packages/sds/src/message_channel/repair/buffers.ts 
b/packages/sds/src/message_channel/repair/buffers.ts index b82c01cafb..caadd97c8f 100644 --- a/packages/sds/src/message_channel/repair/buffers.ts +++ b/packages/sds/src/message_channel/repair/buffers.ts @@ -1,22 +1,15 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; -const log = new Logger("sds:repair:buffers"); +import type { HistoryEntry } from "../message.js"; -/** - * Extended HistoryEntry that includes sender_id for SDS-R - */ -export interface RepairHistoryEntry { - messageId: string; - retrievalHint?: Uint8Array; - senderId?: string; // Original sender's ID for repair calculations -} +const log = new Logger("sds:repair:buffers"); /** * Entry in the outgoing repair buffer with request timing */ interface OutgoingBufferEntry { - entry: RepairHistoryEntry; + entry: HistoryEntry; tReq: number; // Timestamp when this repair request should be sent } @@ -24,7 +17,7 @@ interface OutgoingBufferEntry { * Entry in the incoming repair buffer with response timing */ interface IncomingBufferEntry { - entry: RepairHistoryEntry; + entry: HistoryEntry; tResp: number; // Timestamp when we should respond with this repair } @@ -45,7 +38,7 @@ export class OutgoingRepairBuffer { * Add a missing message to the outgoing repair request buffer * If message already exists, it is not updated (keeps original T_req) */ - public add(entry: RepairHistoryEntry, tReq: number): void { + public add(entry: HistoryEntry, tReq: number): void { const messageId = entry.messageId; // Check if already exists - do NOT update T_req per spec @@ -84,8 +77,8 @@ export class OutgoingRepairBuffer { * Get eligible repair requests (where T_req <= currentTime) * Returns up to maxRequests entries from the front of the sorted array */ - public getEligible(currentTime: number, maxRequests = 3): RepairHistoryEntry[] { - const eligible: RepairHistoryEntry[] = []; + public getEligible(currentTime: number, maxRequests = 3): HistoryEntry[] { + const eligible: HistoryEntry[] = []; // Iterate from front of sorted array (earliest T_req first) for (const item of this.items) { @@ -124,7 +117,7 @@ export class OutgoingRepairBuffer { /** * Get all entries (for testing/debugging) */ - public getAll(): RepairHistoryEntry[] { + public getAll(): HistoryEntry[] { return this.items.map(item => item.entry); } @@ -153,7 +146,7 @@ export class IncomingRepairBuffer { * Add a repair request that we can fulfill * If message already exists, it is ignored (not updated) */ - public add(entry: RepairHistoryEntry, tResp: number): void { + public add(entry: HistoryEntry, tResp: number): void { const messageId = entry.messageId; // Check if already exists - ignore per spec @@ -192,8 +185,8 @@ export class IncomingRepairBuffer { * Get repairs ready to be sent (where T_resp <= currentTime) * Removes and returns ready entries */ - public getReady(currentTime: number): RepairHistoryEntry[] { - const ready: RepairHistoryEntry[] = []; + public getReady(currentTime: number): HistoryEntry[] { + const ready: HistoryEntry[] = []; const remaining: IncomingBufferEntry[] = []; for (const item of this.items) { @@ -236,7 +229,7 @@ export class IncomingRepairBuffer { /** * Get all entries (for testing/debugging) */ - public getAll(): RepairHistoryEntry[] { + public getAll(): HistoryEntry[] { return this.items.map(item => item.entry); } diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 112e3178ef..a8f5e1698e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ 
b/packages/sds/src/message_channel/repair/repair.ts @@ -1,12 +1,10 @@ import { Logger } from "@waku/utils"; +import type { HistoryEntry } from "../message.js"; import { Message } from "../message.js"; +import type { ILocalHistory } from "../message_channel.js"; -import { - IncomingRepairBuffer, - OutgoingRepairBuffer, - RepairHistoryEntry -} from "./buffers.js"; +import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { bigintToNumber, calculateXorDistance, @@ -57,10 +55,10 @@ export class RepairManager { constructor(participantId: ParticipantId, config: RepairConfig = {}) { this.participantId = participantId; this.config = { ...DEFAULT_REPAIR_CONFIG, ...config }; - + this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize); this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize); - + log.info(`RepairManager initialized for participant ${participantId}`); } @@ -94,7 +92,7 @@ export class RepairManager { /** * Determine if this participant is in the response group for a message - * Per spec: (hash(participant_id, message_id) % num_response_groups) == + * Per spec: (hash(participant_id, message_id) % num_response_groups) == * (hash(sender_id, message_id) % num_response_groups) */ public isInResponseGroup( @@ -112,9 +110,10 @@ export class RepairManager { return true; } - const participantGroup = combinedHash(this.participantId, messageId) % numGroups; + const participantGroup = + combinedHash(this.participantId, messageId) % numGroups; const senderGroup = combinedHash(senderId, messageId) % numGroups; - + return participantGroup === senderGroup; } @@ -123,7 +122,7 @@ export class RepairManager { * Called when causal dependencies are detected as missing */ public onMissingDependencies( - missingEntries: RepairHistoryEntry[], + missingEntries: HistoryEntry[], currentTime = Date.now() ): void { if (!this.config.enabled) { @@ -133,10 +132,10 @@ export class RepairManager { for (const entry of missingEntries) { // Calculate when to request this repair const tReq = this.calculateTReq(entry.messageId, currentTime); - + // Add to outgoing buffer this.outgoingBuffer.add(entry, tReq); - + log.info( `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` ); @@ -151,7 +150,7 @@ export class RepairManager { // Remove from both buffers as we no longer need to request or respond this.outgoingBuffer.remove(messageId); this.incomingBuffer.remove(messageId); - + log.info(`Removed ${messageId} from repair buffers after receipt`); } @@ -162,7 +161,7 @@ export class RepairManager { public getRepairRequests( maxRequests = 3, currentTime = Date.now() - ): RepairHistoryEntry[] { + ): HistoryEntry[] { if (!this.config.enabled) { return []; } @@ -175,8 +174,8 @@ export class RepairManager { * Adds to incoming buffer if we can fulfill and are in response group */ public processIncomingRepairRequests( - requests: RepairHistoryEntry[], - localHistory: Map, + requests: HistoryEntry[], + localHistory: ILocalHistory, currentTime = Date.now() ): void { if (!this.config.enabled) { @@ -186,16 +185,23 @@ export class RepairManager { for (const request of requests) { // Remove from our own outgoing buffer (someone else is requesting it) this.outgoingBuffer.remove(request.messageId); - + // Check if we have this message - if (!localHistory.has(request.messageId)) { - log.info(`Cannot fulfill repair for ${request.messageId} - not in local history`); + const message = localHistory.find( + (m) => m.messageId === request.messageId + ); + if (!message) { 
+ log.info( + `Cannot fulfill repair for ${request.messageId} - not in local history` + ); continue; } // Check if we're in the response group if (!request.senderId) { - log.warn(`Cannot determine response group for ${request.messageId} - missing sender_id`); + log.warn( + `Cannot determine response group for ${request.messageId} - missing sender_id` + ); continue; } @@ -205,11 +211,15 @@ export class RepairManager { } // Calculate when to respond - const tResp = this.calculateTResp(request.senderId, request.messageId, currentTime); - + const tResp = this.calculateTResp( + request.senderId, + request.messageId, + currentTime + ); + // Add to incoming buffer this.incomingBuffer.add(request, tResp); - + log.info( `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` ); @@ -223,7 +233,7 @@ export class RepairManager { public sweepOutgoingBuffer( maxRequests = 3, currentTime = Date.now() - ): RepairHistoryEntry[] { + ): HistoryEntry[] { if (!this.config.enabled) { return []; } @@ -236,7 +246,7 @@ export class RepairManager { * Returns messages that should be rebroadcast */ public sweepIncomingBuffer( - localHistory: Map, + localHistory: ILocalHistory, currentTime = Date.now() ): Message[] { if (!this.config.enabled) { @@ -247,7 +257,7 @@ export class RepairManager { const messages: Message[] = []; for (const entry of ready) { - const message = localHistory.get(entry.messageId); + const message = localHistory.find((m) => m.messageId === entry.messageId); if (message) { messages.push(message); log.info(`Sending repair for ${entry.messageId}`); @@ -279,7 +289,12 @@ export class RepairManager { */ public updateResponseGroups(numParticipants: number): void { // Per spec: num_response_groups = max(1, num_participants / 128) - this.config.numResponseGroups = Math.max(1, Math.floor(numParticipants / 128)); - log.info(`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`); + this.config.numResponseGroups = Math.max( + 1, + Math.floor(numParticipants / 128) + ); + log.info( + `Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants` + ); } -} \ No newline at end of file +} diff --git a/sds.md b/sds.md index 1b1a902c03..acb683ccf2 100644 --- a/sds.md +++ b/sds.md @@ -58,6 +58,9 @@ other participants using the corresponding message ID. * **Participant ID:** Each participant has a globally unique, immutable ID visible to other participants in the communication. +* **Sender ID:** +The *Participant ID* of the original sender of a message, +often coupled with a *Message ID*. ## Wire protocol @@ -75,15 +78,20 @@ syntax = "proto3"; message HistoryEntry { string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash + + optional string sender_id = 3; // Participant ID of original message sender. 
Only populated if using optional SDS Repair extension } message Message { string sender_id = 1; // Participant ID of the message sender string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs - optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel + optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel + + repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension. + optional bytes content = 20; // Actual content of the message } ``` @@ -111,7 +119,11 @@ Its importance is expected to increase once a p2p retrieval mechanism is added t Each participant MUST maintain: * A Lamport timestamp for each channel of communication, -initialized to current epoch time in nanosecond resolution. +initialized to current epoch time in millisecond resolution. +The Lamport timestamp is increased as described in the [protocol steps](#protocol-steps) +to maintain a logical ordering of events while staying close to the current epoch time. +This allows the messages from new joiners to be correctly ordered with other recent messages, +without these new participants first having to synchronize past messages to discover the current Lamport timestamp. * A bloom filter for received message IDs per channel. The bloom filter SHOULD be rolled over and recomputed once it reaches a predefined capacity of message IDs. @@ -144,8 +156,11 @@ the `lamport_timestamp`, `causal_history` and `bloom_filter` fields. Before broadcasting a message: -* the participant MUST increase its local Lamport timestamp by `1` and -include this in the `lamport_timestamp` field. +* the participant MUST set its local Lamport timestamp +to the maximum between the current value + `1` +and the current epoch time in milliseconds. +In other words the local Lamport timestamp is set to `max(timeNowInMs, current_lamport_timestamp + 1)`. +* the participant MUST include the increased Lamport timestamp in the message's `lamport_timestamp` field. * the participant MUST determine the preceding few message IDs in the local history and include these in an ordered list in the `causal_history` field. The number of message IDs to include in the `causal_history` depends on the application. @@ -250,7 +265,8 @@ participants SHOULD periodically send sync messages to maintain state. These sync messages: * MUST be sent with empty content -* MUST include an incremented Lamport timestamp +* MUST include a Lamport timestamp increased to `max(timeNowInMs, current_lamport_timestamp + 1)`, +where `timeNowInMs` is the current epoch time in milliseconds. * MUST include causal history and bloom filter according to regular message rules * MUST NOT be added to the unacknowledged outgoing buffer * MUST NOT be included in causal histories of subsequent messages @@ -281,82 +297,181 @@ Upon reception, ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies or including in the local log. -## Implementation Suggestions +### SDS Repair (SDS-R) -This section provides practical guidance based on the js-waku implementation of SDS. 
+SDS Repair (SDS-R) is an optional extension module for SDS
+that allows participants in a communication to collectively repair gaps in causal history (missing messages),
+preferably within a limited time window.
+Since SDS-R acts as coordinated rebroadcasting of missing messages,
+involving all participants of the communication,
+it is best suited to repairing relatively recent missing dependencies.
+It is not meant to replace mechanisms for long-term consistency,
+such as peer-to-peer syncing or the use of a high-availability centralised cache (Store node).

-### Default Configuration Values
+#### SDS-R message fields

-The js-waku implementation uses the following defaults:
-- **Bloom filter capacity**: 10,000 messages
-- **Bloom filter error rate**: 0.001 (0.1% false positive rate)
-- **Causal history size**: 200 message IDs
-- **Possible ACKs threshold**: 2 bloom filter hits before considering a message acknowledged
+SDS-R adds the following fields to SDS messages:
+* `sender_id` in `HistoryEntry`:
+the original message sender's participant ID.
+This is used to determine the group of participants who will respond to a repair request.
+* `repair_request` in `Message`:
+a capped list of history entries missing for the message sender
+and for which it's requesting a repair.

-With 200 messages in causal history, assuming 32-byte message IDs and 32-byte retrieval hints (e.g., Waku message hashes),
-each message carries 200 × 64 bytes = 12.8 KB of causal history overhead.
+#### SDS-R participant state

-### External Task Scheduling

-The js-waku implementation delegates periodic task scheduling to the library consumer by providing methods:
+SDS-R adds the following to each participant state:

+* Outgoing **repair request buffer**:
+a list of locally missing `HistoryEntry`s,
+each mapped to a future request timestamp, `T_req`,
+after which this participant will request a repair if at that point the missing dependency has not been repaired yet.
+`T_req` is computed as a pseudorandom backoff from the timestamp when the dependency was detected missing.
+[Determining `T_req`](#determine-t_req) is described below.
+We RECOMMEND that the outgoing repair request buffer be kept sorted in ascending order of `T_req`.
+* Incoming **repair request buffer**:
+a list of locally available `HistoryEntry`s
+that were requested for repair by a remote participant
+AND for which this participant might be an eligible responder,
+each mapped to a future response timestamp, `T_resp`,
+after which this participant will rebroadcast the corresponding requested `Message` if at that point no other participant has rebroadcast the `Message`.
+`T_resp` is computed as a pseudorandom backoff from the timestamp when the repair was first requested.
+[Determining `T_resp`](#determine-t_resp) is described below.
+We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific repair request.
+* Augmented **local history log**:
+for each message ID kept in the local log for which the participant could be a repair responder,
+the full SDS `Message` must be cached rather than just the message ID,
+in case this participant is called upon to rebroadcast the message.
+We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific message.
-- `processTasks()`: Process queued send/receive operations
-- `sweepIncomingBuffer()`: Check and deliver messages with met dependencies, returns missing dependencies
-- `sweepOutgoingBuffer()`: Return unacknowledged and possibly acknowledged messages for retry
-- `pushOutgoingSyncMessage(callback)`: Send a sync message
+**_Note:_** The required state can likely be significantly reduced in future by simply requiring that a responding participant should _reconstruct_ the original `Message` when rebroadcasting, rather than the simpler, but heavier, requirement of caching the entire received `Message` content in local history.

-The implementation does not include internal timers,
-allowing applications to integrate SDS with their existing scheduling infrastructure.
+#### SDS-R global state

-### Message Processing
+For a specific channel (that is, within a specific SDS-controlled communication),
+the following SDS-R configuration state SHOULD be common to all participants in the conversation:

-#### Handling Missing Dependencies
+* `T_min`: the _minimum_ time period to wait before a missing causal entry can be repaired.
+We RECOMMEND a value of at least 30 seconds.
+* `T_max`: the _maximum_ time period over which missing causal entries can be repaired.
+We RECOMMEND a value between 120 and 600 seconds.

-When `sweepIncomingBuffer()` returns missing dependencies,
-the implementation emits an `InMessageMissing` event with `HistoryEntry[]` containing:
-- `messageId`: The missing message identifier
-- `retrievalHint`: Optional bytes to help retrieve the message (e.g., transport-specific hash)
+Furthermore, to avoid a broadcast storm with multiple participants responding to a repair request,
+participants in a single channel MAY be divided into discrete response groups.
+Participants will only respond to a repair request if they are in the response group for that request.
+The global `num_response_groups` variable configures the number of response groups for this communication.
+Its use is described below.
+A reasonable default value for `num_response_groups` is one response group for every `128` participants.
+In other words, if the (roughly) expected number of participants is expressed as `num_participants`, then
+`num_response_groups = num_participants div 128 + 1`.
+This means that if there are fewer than 128 participants in a communication,
+they will all belong to the same response group.

-#### Timeout for Lost Messages
+We RECOMMEND that the global state variables `T_min`, `T_max` and `num_response_groups` be set _statically_ for a specific SDS-R application,
+based on the expected number of group participants and volume of traffic.

-The `timeoutForLostMessagesMs` option allows marking messages as irretrievably lost after a timeout.
-When configured, the implementation emits an `InMessageLost` event after the timeout expires.
+**_Note:_** Future versions of this protocol will recommend dynamic global SDS-R variables, based on the current number of participants.
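To make the global parameters concrete, the sketch below captures them in a small TypeScript configuration object. It is illustrative only: the names (`RepairParams`, `tMinMs`, `tMaxMs`, `defaultRepairParams`) are assumptions rather than part of the spec or of the `RepairConfig` type added elsewhere in this diff, and the response-group count follows the `num_participants div 128 + 1` rule above.

```typescript
// Illustrative sketch only; names are assumptions, not part of the spec or this diff.
interface RepairParams {
  tMinMs: number; // T_min: minimum backoff before a repair may be requested
  tMaxMs: number; // T_max: maximum window over which repairs are attempted
  numResponseGroups: number; // num_response_groups
}

// Derive defaults from a (rough) expected participant count,
// using the rule num_response_groups = num_participants div 128 + 1.
function defaultRepairParams(expectedParticipants: number): RepairParams {
  return {
    tMinMs: 30_000, // RECOMMENDED: at least 30 seconds
    tMaxMs: 300_000, // RECOMMENDED: between 120 and 600 seconds
    numResponseGroups: Math.floor(expectedParticipants / 128) + 1
  };
}
```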
-### Events Emitted
+#### SDS-R send message

-The js-waku implementation uses a `TypedEventEmitter` pattern to emit events for:
-- **Incoming messages**: received, delivered, missing dependencies, lost (after timeout)
-- **Outgoing messages**: sent, acknowledged, possibly acknowledged
-- **Sync messages**: sent, received
-- **Errors**: task execution failures
+SDS-R adds the following steps when sending a message:

-### SDK Usage: ReliableChannel
+Before broadcasting a message,
+* the participant SHOULD populate the `repair_request` field in the message
+with _eligible_ entries from the outgoing repair request buffer.
+An entry is eligible to be included in a `repair_request`
+if its corresponding request timestamp, `T_req`, has expired (in other words, `T_req <= current_time`).
+The maximum number of repair request entries to include is up to the application.
+We RECOMMEND that this quota be filled by the eligible entries from the outgoing repair request buffer with the lowest `T_req`.
+We RECOMMEND a maximum of 3 entries.
+If there are no eligible entries in the buffer, this optional field MUST be left unset.

-The SDK provides a high-level `ReliableChannel` abstraction that wraps the core SDS `MessageChannel` with automatic task scheduling and Waku protocol integration:
+#### SDS-R receive message

-#### Configuration
+On receiving a message,
+* the participant MUST remove entries matching the received message ID from its _outgoing_ repair request buffer.
+This ensures that the participant does not request repairs for dependencies that have now been met.
+* the participant MUST remove entries matching the received message ID from its _incoming_ repair request buffer.
+This ensures that the participant does not respond to repair requests that another participant has already responded to.
+* the participant SHOULD check for any unmet causal dependencies that do not yet have a corresponding entry in its outgoing repair request buffer.
+For each such dependency, the participant SHOULD add a new entry against a unique `T_req` timestamp.
+It MUST compute the `T_req` for each such `HistoryEntry` according to the steps outlined in [_Determine T_req_](#determine-t_req).
+* for each item in the `repair_request` field:
+  - the participant MUST remove entries matching the repair message ID from its own outgoing repair request buffer.
+  This limits the number of participants that will request a common missing dependency.
+  - if the participant has the requested `Message` in its local history _and_ is an eligible responder for the repair request,
+  it SHOULD add the request to its incoming repair request buffer against a unique `T_resp` timestamp for that entry.
+  It MUST compute the `T_resp` for each such repair request according to the steps outlined in [_Determine T_resp_](#determine-t_resp).
+  It MUST determine if it's an eligible responder for a repair request according to the steps outlined in [_Determine response group_](#determine-response-group).
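A condensed sketch of the receive-side handling above. It assumes buffers shaped like the `OutgoingRepairBuffer` and `IncomingRepairBuffer` introduced in this diff; the `helpers` callbacks (`tReqFor`, `tRespFor`, `isEligibleResponder`, `hasLocally`) are placeholders for the procedures defined in the following sections, not real APIs, and the import paths assume the sketch sits alongside `repair.ts`.

```typescript
import type { HistoryEntry } from "../message.js";
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";

// Sketch only: helper callbacks stand in for the procedures defined in the following sections.
function onSdsMessageReceived(
  received: { messageId: string; repairRequest?: HistoryEntry[] },
  unmetDependencies: HistoryEntry[],
  outgoing: OutgoingRepairBuffer,
  incoming: IncomingRepairBuffer,
  helpers: {
    tReqFor: (entry: HistoryEntry) => number;
    tRespFor: (entry: HistoryEntry) => number;
    isEligibleResponder: (senderId: string, messageId: string) => boolean;
    hasLocally: (messageId: string) => boolean;
  }
): void {
  // The received message satisfies any pending repair for its own message ID.
  outgoing.remove(received.messageId);
  incoming.remove(received.messageId);

  // Queue a repair request for each unmet causal dependency (duplicates keep their original T_req).
  for (const dep of unmetDependencies) {
    outgoing.add(dep, helpers.tReqFor(dep));
  }

  // Process repair requests piggybacked on the received message.
  for (const req of received.repairRequest ?? []) {
    // Another participant is already requesting this entry, so drop our own pending request.
    outgoing.remove(req.messageId);
    if (req.senderId && helpers.hasLocally(req.messageId) && helpers.isEligibleResponder(req.senderId, req.messageId)) {
      incoming.add(req, helpers.tRespFor(req));
    }
  }
}
```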
-The ReliableChannel uses these default intervals:
-- **Sync message interval**: 30 seconds minimum between sync messages (randomized backoff)
-- **Retry interval**: 30 seconds for unacknowledged messages
-- **Max retry attempts**: 10 attempts before giving up
-- **Store query interval**: 10 seconds for missing message retrieval
+#### Determine T_req

-#### Task Scheduling Implementation
+A participant determines the repair request timestamp, `T_req`,
+for a missing `HistoryEntry` as follows:

-The SDK automatically schedules SDS periodic tasks:
-- **Sync messages**: Uses exponential backoff with randomization; sent faster (0.5x multiplier) after receiving content to acknowledge others
-- **Outgoing buffer sweeps**: Triggered after each retry interval for unacknowledged messages
-- **Incoming buffer sweeps**: Performed after each incoming message and during missing message retrieval
-- **Process tasks**: Called immediately after sending/receiving messages and during sync
+```
+T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
+```

-#### Integration with Waku Protocols
+where `current_time` is the current timestamp,
+`participant_id` is the participant's _own_ participant ID (not the `sender_id` in the missing `HistoryEntry`),
+`message_id` is the missing `HistoryEntry`'s message ID,
+and `T_min` and `T_max` are as set out in [SDS-R global state](#sds-r-global-state).

-ReliableChannel integrates SDS with Waku:
-- **Sending**: Uses LightPush or Relay protocols; includes Waku message hash as retrieval hint (32 bytes)
-- **Receiving**: Subscribes via Filter protocol; unwraps SDS messages before passing to application
-- **Missing message retrieval**: Queries Store nodes using retrieval hints from causal history
-- **Query on connect**: Automatically queries Store when connecting to new peers (enabled by default)
+This allows `T_req` to be pseudorandomly and linearly distributed as a backoff between `T_min` and `T_max` from the current time.

+> **_Note:_** placing `T_req` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.

+#### Determine T_resp

+A participant determines the repair response timestamp, `T_resp`,
+for a `HistoryEntry` that it could repair as follows:

+```
+distance = hash(participant_id) XOR hash(sender_id)
+T_resp = current_time + (distance * hash(message_id)) % T_max
+```

+where `current_time` is the current timestamp,
+`participant_id` is the participant's _own_ (local) participant ID,
+`sender_id` is the requested `HistoryEntry`'s sender ID,
+`message_id` is the requested `HistoryEntry`'s message ID,
+and `T_max` is as set out in [SDS-R global state](#sds-r-global-state).

+We first calculate the logical `distance` between the local `participant_id` and the original `sender_id`.
+If this participant is the original sender, the `distance` will be `0`.
+The original sender will therefore have a response backoff of `0`, making it the most likely responder.
+The `T_resp` values for other eligible participants will be pseudorandomly and linearly distributed as a backoff of up to `T_max` from the current time.

+> **_Note:_** placing `T_resp` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.
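Both backoffs can be computed directly once a hash function is fixed; the spec does not mandate one. The sketch below is illustrative: it assumes a 32-bit FNV-1a hash over the UTF-8 concatenation of its inputs, and uses `BigInt` so the `distance * hash(message_id)` product does not lose precision.

```typescript
// Sketch only: the 32-bit FNV-1a hash is an assumption; the spec leaves the hash unspecified.
function hash32(...parts: string[]): number {
  let h = 0x811c9dc5;
  for (const byte of new TextEncoder().encode(parts.join(""))) {
    h ^= byte;
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h >>> 0;
}

// T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
function computeTReq(
  participantId: string,
  messageId: string,
  now: number,
  tMinMs: number,
  tMaxMs: number
): number {
  return now + (hash32(participantId, messageId) % (tMaxMs - tMinMs)) + tMinMs;
}

// distance = hash(participant_id) XOR hash(sender_id)
// T_resp   = current_time + (distance * hash(message_id)) % T_max
function computeTResp(
  participantId: string,
  senderId: string,
  messageId: string,
  now: number,
  tMaxMs: number
): number {
  const distance = BigInt((hash32(participantId) ^ hash32(senderId)) >>> 0);
  const backoff = (distance * BigInt(hash32(messageId))) % BigInt(tMaxMs);
  return now + Number(backoff);
}
```

If `participant_id` equals `sender_id`, `distance` is `0` and `computeTResp` returns `now`, matching the intent that the original sender is the most likely responder.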

+#### Determine response group

+Given a message with `sender_id` and `message_id`,
+a participant with `participant_id` is in the response group for that message if

+```
+hash(participant_id, message_id) % num_response_groups == hash(sender_id, message_id) % num_response_groups
+```

+where `num_response_groups` is as set out in [SDS-R global state](#sds-r-global-state).
+This ensures that a participant will always be in the response group for its own published messages.
+It also allows participants to determine immediately on first reception of a message or a history entry
+whether they are in the associated response group.

+#### SDS-R incoming repair request buffer sweep

+An SDS-R participant MUST periodically check whether any requests in the *incoming repair request buffer* are due for a response.
+For each item in the buffer,
+the participant SHOULD broadcast the corresponding `Message` from local history
+if the item's response timestamp, `T_resp`, has expired (in other words, `T_resp <= current_time`).

+#### SDS-R periodic sync message

+If the participant is due to send a periodic sync message,
+it SHOULD send the message according to [SDS-R send message](#sds-r-send-message)
+if there are any eligible items in the outgoing repair request buffer,
+regardless of whether other participants have also recently broadcast a periodic sync message.

## Copyright
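One way an application might drive the incoming-buffer sweep described above is with the `sweepRepairIncomingBuffer` method this diff adds to `MessageChannel`. The sketch below is illustrative: the 10-second interval, the `startRepairSweep` helper and the `rebroadcast` transport callback are assumptions, and it assumes the exports shown in this diff are available from the `@waku/sds` package.

```typescript
import type { Message } from "@waku/sds";
import { MessageChannel } from "@waku/sds";

// Illustrative scheduling only; the interval and transport callback are application choices.
const REPAIR_SWEEP_INTERVAL_MS = 10_000;

function startRepairSweep(
  channel: MessageChannel,
  rebroadcast: (payload: Uint8Array) => Promise<void>
): ReturnType<typeof setInterval> {
  return setInterval(() => {
    void channel.sweepRepairIncomingBuffer(async (message: Message) => {
      // Rebroadcast the cached SDS message over the application's transport,
      // e.g. Waku Relay or LightPush.
      await rebroadcast(message.encode());
    });
  }, REPAIR_SWEEP_INTERVAL_MS);
}
```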