feat: integrate sds-r with message channels

This commit is contained in:
jm-clius 2025-10-03 16:44:08 +01:00
parent 942e040def
commit c2a1f4ca82
No known key found for this signature in database
GPG Key ID: 5FCD9D5211B952DA
12 changed files with 434 additions and 174 deletions

View File

@ -13,6 +13,7 @@ import type { Uint8ArrayList } from 'uint8arraylist'
export interface HistoryEntry {
messageId: string
retrievalHint?: Uint8Array
senderId?: string
}
export namespace HistoryEntry {
@ -35,6 +36,11 @@ export namespace HistoryEntry {
w.bytes(obj.retrievalHint)
}
if (obj.senderId != null) {
w.uint32(26)
w.string(obj.senderId)
}
if (opts.lengthDelimited !== false) {
w.ldelim()
}
@ -57,6 +63,10 @@ export namespace HistoryEntry {
obj.retrievalHint = reader.bytes()
break
}
case 3: {
obj.senderId = reader.string()
break
}
default: {
reader.skipType(tag & 7)
break
@ -87,6 +97,7 @@ export interface SdsMessage {
lamportTimestamp?: bigint
causalHistory: HistoryEntry[]
bloomFilter?: Uint8Array
repairRequest: HistoryEntry[]
content?: Uint8Array
}
@ -132,6 +143,13 @@ export namespace SdsMessage {
w.bytes(obj.bloomFilter)
}
if (obj.repairRequest != null) {
for (const value of obj.repairRequest) {
w.uint32(106)
HistoryEntry.codec().encode(value, w)
}
}
if (obj.content != null) {
w.uint32(162)
w.bytes(obj.content)
@ -145,7 +163,8 @@ export namespace SdsMessage {
senderId: '',
messageId: '',
channelId: '',
causalHistory: []
causalHistory: [],
repairRequest: []
}
const end = length == null ? reader.len : reader.pos + length
@ -184,6 +203,16 @@ export namespace SdsMessage {
obj.bloomFilter = reader.bytes()
break
}
case 13: {
if (opts.limits?.repairRequest != null && obj.repairRequest.length === opts.limits.repairRequest) {
throw new MaxLengthError('Decode error - map field "repairRequest" had too many elements')
}
obj.repairRequest.push(HistoryEntry.codec().decode(reader, reader.uint32(), {
limits: opts.limits?.repairRequest$
}))
break
}
case 20: {
obj.content = reader.bytes()
break

View File

@ -3,6 +3,8 @@ syntax = "proto3";
message HistoryEntry {
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; for example, a Waku deterministic message hash or routing payload hash
optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension
}
message SdsMessage {
@ -12,5 +14,8 @@ message SdsMessage {
optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension.
optional bytes content = 20; // Actual content of the message
}

View File

@ -14,7 +14,7 @@ export {
type HistoryEntry,
type ChannelId,
type MessageChannelEvents,
type SenderId,
type ParticipantId,
type MessageId
} from "./message_channel/index.js";

View File

@ -8,7 +8,7 @@ export {
HistoryEntry,
Message,
MessageId,
SenderId,
ParticipantId,
SyncMessage,
isContentMessage,
isEphemeralMessage,

View File

@ -44,6 +44,7 @@ describe("Message serialization", () => {
[{ messageId: depMessageId, retrievalHint: depRetrievalHint }],
0n,
undefined,
undefined,
undefined
);
@ -54,6 +55,29 @@ describe("Message serialization", () => {
{ messageId: depMessageId, retrievalHint: depRetrievalHint }
]);
});
it("Repair Request", () => {
  // Round-trip an SDS message whose repairRequest carries a fully populated
  // history entry (id + retrieval hint + original sender, per SDS-R) and
  // verify the entry survives encode/decode intact.
  const expectedEntry = {
    messageId: "missing-message",
    retrievalHint: utf8ToBytes("missing-retrieval"),
    senderId: "original-sender"
  };
  const message = new Message(
    "123",
    "my-channel",
    "me",
    [],
    0n,
    undefined,
    undefined,
    [{ ...expectedEntry }]
  );
  const decMessage = Message.decode(message.encode());
  expect(decMessage!.repairRequest).to.deep.equal([expectedEntry]);
});
});
describe("ContentMessage comparison with < operator", () => {

View File

@ -4,7 +4,7 @@ import { Logger } from "@waku/utils";
export type MessageId = string;
export type HistoryEntry = proto_sds_message.HistoryEntry;
export type ChannelId = string;
export type SenderId = string;
export type ParticipantId = string;
const log = new Logger("sds:message");
@ -17,6 +17,7 @@ export class Message implements proto_sds_message.SdsMessage {
public lamportTimestamp?: bigint | undefined,
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
public content?: Uint8Array<ArrayBufferLike> | undefined,
public repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
@ -38,7 +39,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
lamportTimestamp,
bloomFilter,
content
content,
repairRequest
} = proto_sds_message.SdsMessage.decode(data);
if (testContentMessage({ lamportTimestamp, content })) {
@ -49,7 +51,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
lamportTimestamp!,
bloomFilter,
content!
content!,
repairRequest
);
}
@ -61,7 +64,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
undefined,
bloomFilter,
content!
content!,
repairRequest
);
}
@ -73,7 +77,8 @@ export class Message implements proto_sds_message.SdsMessage {
causalHistory,
lamportTimestamp!,
bloomFilter,
undefined
undefined,
repairRequest
);
}
log.error(
@ -97,6 +102,7 @@ export class SyncMessage extends Message {
public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: undefined,
public override repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
@ -110,6 +116,7 @@ export class SyncMessage extends Message {
lamportTimestamp,
bloomFilter,
content,
repairRequest,
retrievalHint
);
}
@ -141,6 +148,7 @@ export class EphemeralMessage extends Message {
public lamportTimestamp: undefined,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
public override repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
@ -157,6 +165,7 @@ export class EphemeralMessage extends Message {
lamportTimestamp,
bloomFilter,
content,
repairRequest,
retrievalHint
);
}
@ -189,6 +198,7 @@ export class ContentMessage extends Message {
public lamportTimestamp: bigint,
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
public content: Uint8Array<ArrayBufferLike>,
public override repairRequest: proto_sds_message.HistoryEntry[] = [],
/**
* Not encoded, set after it is sent, used to include in follow-up messages
*/
@ -205,6 +215,7 @@ export class ContentMessage extends Message {
lamportTimestamp,
bloomFilter,
content,
repairRequest,
retrievalHint
);
}

View File

@ -162,7 +162,8 @@ describe("MessageChannel", function () {
.slice(-causalHistorySize - 1, -1)
.map((message) => ({
messageId: MessageChannel.getMessageId(utf8ToBytes(message)),
retrievalHint: undefined
retrievalHint: undefined,
senderId: "alice"
}));
expect(causalHistory).to.deep.equal(expectedCausalHistory);
});
@ -298,6 +299,7 @@ describe("MessageChannel", function () {
1n,
undefined,
payload,
undefined,
testRetrievalHint
),
testRetrievalHint

View File

@ -18,9 +18,10 @@ import {
isSyncMessage,
Message,
MessageId,
SenderId,
ParticipantId,
SyncMessage
} from "./message.js";
import { RepairConfig, RepairManager } from "./repair/repair.js";
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
capacity: 10000,
@ -46,6 +47,10 @@ export interface MessageChannelOptions {
* How many possible acks does it take to consider it a definitive ack.
*/
possibleAcksThreshold?: number;
/**
* SDS-R repair configuration. If not provided, repair is enabled with default settings.
*/
repairConfig?: RepairConfig;
}
export type ILocalHistory = Pick<
@ -55,7 +60,7 @@ export type ILocalHistory = Pick<
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
public readonly senderId: SenderId;
public readonly senderId: ParticipantId;
private lamportTimestamp: bigint;
private filter: DefaultBloomFilter;
private outgoingBuffer: ContentMessage[];
@ -66,6 +71,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private readonly causalHistorySize: number;
private readonly possibleAcksThreshold: number;
private readonly timeoutForLostMessagesMs?: number;
private readonly repairManager: RepairManager;
private tasks: Task[] = [];
private handlers: Handlers = {
@ -88,7 +94,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public constructor(
channelId: ChannelId,
senderId: SenderId,
senderId: ParticipantId,
options: MessageChannelOptions = {},
localHistory: ILocalHistory = new MemLocalHistory()
) {
@ -109,6 +115,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
this.timeReceived = new Map();
this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs;
this.repairManager = new RepairManager(senderId, options.repairConfig);
}
public static getMessageId(payload: Uint8Array): MessageId {
@ -355,6 +362,38 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
);
}
/**
 * Sweep repair incoming buffer and rebroadcast messages ready for repair.
 * Per SDS-R spec: periodically check for repair responses that are due.
 *
 * @param callback - optional callback used to rebroadcast each due repair
 * message; a rejected callback is logged and sweeping continues with the
 * next message (best-effort, no retry here)
 * @returns the repair messages that were due this sweep (each one was also
 * passed to `callback` when a callback was provided)
 */
public async sweepRepairIncomingBuffer(
callback?: (message: Message) => Promise<boolean>
): Promise<Message[]> {
// Entries whose response time has elapsed, resolved against local history
// by the repair manager.
const repairsToSend = this.repairManager.sweepIncomingBuffer(
this.localHistory
);
if (callback) {
for (const message of repairsToSend) {
try {
await callback(message);
log.info(
this.senderId,
"repair message rebroadcast",
message.messageId
);
} catch (error) {
// Swallow per-message failures so one bad rebroadcast does not
// prevent the remaining due repairs from being sent.
log.error("Failed to rebroadcast repair message:", error);
}
}
}
return repairsToSend;
}
/**
* Send a sync message to the SDS channel.
*
@ -369,6 +408,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
callback?: (message: SyncMessage) => Promise<boolean>
): Promise<boolean> {
this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);
// Get repair requests to include in sync message (SDS-R)
const repairRequests = this.repairManager.getRepairRequests(3);
const message = new SyncMessage(
// does not need to be secure randomness
`sync-${Math.random().toString(36).substring(2)}`,
@ -376,18 +419,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.senderId,
this.localHistory
.slice(-this.causalHistorySize)
.map(({ messageId, retrievalHint }) => {
return { messageId, retrievalHint };
.map(({ messageId, retrievalHint, senderId }) => {
return { messageId, retrievalHint, senderId };
}),
this.lamportTimestamp,
this.filter.toBytes(),
undefined
undefined,
repairRequests
);
if (!message.causalHistory || message.causalHistory.length === 0) {
if (
(!message.causalHistory || message.causalHistory.length === 0) &&
repairRequests.length === 0
) {
log.info(
this.senderId,
"no causal history in sync message, aborting sending"
"no causal history and no repair requests in sync message, aborting sending"
);
return false;
}
@ -464,6 +511,18 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
detail: message
});
}
// SDS-R: Handle received message in repair manager
this.repairManager.onMessageReceived(message.messageId);
// SDS-R: Process incoming repair requests
if (message.repairRequest && message.repairRequest.length > 0) {
this.repairManager.processIncomingRepairRequests(
message.repairRequest,
this.localHistory
);
}
this.reviewAckStatus(message);
if (isContentMessage(message)) {
this.filter.insert(message.messageId);
@ -487,6 +546,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
missingDependencies.map((ch) => ch.messageId)
);
// SDS-R: Track missing dependencies in repair manager
this.repairManager.onMissingDependencies(missingDependencies);
this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
detail: Array.from(missingDependencies)
});
@ -549,18 +611,23 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// It's a new message
if (!message) {
log.info(this.senderId, "sending new message", messageId);
// Get repair requests to include in the message (SDS-R)
const repairRequests = this.repairManager.getRepairRequests(3);
message = new ContentMessage(
messageId,
this.channelId,
this.senderId,
this.localHistory
.slice(-this.causalHistorySize)
.map(({ messageId, retrievalHint }) => {
return { messageId, retrievalHint };
.map(({ messageId, retrievalHint, senderId }) => {
return { messageId, retrievalHint, senderId };
}),
this.lamportTimestamp,
this.filter.toBytes(),
payload
payload,
repairRequests
);
this.outgoingBuffer.push(message);
@ -657,6 +724,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
this.localHistory.push(message);
return true;
}

View File

@ -1,10 +1,8 @@
import { expect } from "chai";
import {
IncomingRepairBuffer,
OutgoingRepairBuffer,
RepairHistoryEntry
} from "./buffers.js";
import type { HistoryEntry } from "../message.js";
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
describe("OutgoingRepairBuffer", () => {
let buffer: OutgoingRepairBuffer;
@ -14,9 +12,9 @@ describe("OutgoingRepairBuffer", () => {
});
it("should add entries and maintain sorted order", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
@ -30,21 +28,21 @@ describe("OutgoingRepairBuffer", () => {
});
it("should not update T_req if message already exists", () => {
const entry: RepairHistoryEntry = { messageId: "msg1" };
const entry: HistoryEntry = { messageId: "msg1" };
buffer.add(entry, 1000);
buffer.add(entry, 2000); // Try to add again with different T_req
const items = buffer.getItems();
expect(items).to.have.lengthOf(1);
expect(items[0].tReq).to.equal(1000); // Should keep original
});
it("should evict oldest entry when buffer is full", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry4: RepairHistoryEntry = { messageId: "msg4" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
const entry4: HistoryEntry = { messageId: "msg4" };
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
@ -60,9 +58,9 @@ describe("OutgoingRepairBuffer", () => {
});
it("should get eligible entries based on current time", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
@ -79,9 +77,9 @@ describe("OutgoingRepairBuffer", () => {
});
it("should respect maxRequests limit", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
@ -94,12 +92,12 @@ describe("OutgoingRepairBuffer", () => {
});
it("should remove entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
expect(buffer.size).to.equal(2);
buffer.remove("msg1");
expect(buffer.size).to.equal(1);
@ -109,7 +107,7 @@ describe("OutgoingRepairBuffer", () => {
it("should handle retrieval hint and sender_id", () => {
const hint = new Uint8Array([1, 2, 3]);
const entry: RepairHistoryEntry = {
const entry: HistoryEntry = {
messageId: "msg1",
retrievalHint: hint,
senderId: "sender1"
@ -130,9 +128,9 @@ describe("IncomingRepairBuffer", () => {
});
it("should add entries and maintain sorted order", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
@ -146,21 +144,21 @@ describe("IncomingRepairBuffer", () => {
});
it("should ignore duplicate entries", () => {
const entry: RepairHistoryEntry = { messageId: "msg1" };
const entry: HistoryEntry = { messageId: "msg1" };
buffer.add(entry, 1000);
buffer.add(entry, 500); // Try to add again with earlier T_resp
const items = buffer.getItems();
expect(items).to.have.lengthOf(1);
expect(items[0].tResp).to.equal(1000); // Should keep original
});
it("should evict furthest entry when buffer is full", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry4: RepairHistoryEntry = { messageId: "msg4" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
const entry4: HistoryEntry = { messageId: "msg4" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
@ -176,9 +174,9 @@ describe("IncomingRepairBuffer", () => {
});
it("should get and remove ready entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry3: RepairHistoryEntry = { messageId: "msg3" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
@ -187,7 +185,7 @@ describe("IncomingRepairBuffer", () => {
const ready = buffer.getReady(1500);
expect(ready).to.have.lengthOf(1);
expect(ready[0].messageId).to.equal("msg1");
// Entry should be removed from buffer
expect(buffer.size).to.equal(2);
expect(buffer.has("msg1")).to.be.false;
@ -195,19 +193,19 @@ describe("IncomingRepairBuffer", () => {
const ready2 = buffer.getReady(2500);
expect(ready2).to.have.lengthOf(1);
expect(ready2[0].messageId).to.equal("msg2");
expect(buffer.size).to.equal(1);
expect(buffer.has("msg2")).to.be.false;
expect(buffer.has("msg3")).to.be.true;
});
it("should remove entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
expect(buffer.size).to.equal(2);
buffer.remove("msg1");
expect(buffer.size).to.equal(1);
@ -216,14 +214,14 @@ describe("IncomingRepairBuffer", () => {
});
it("should clear all entries", () => {
const entry1: RepairHistoryEntry = { messageId: "msg1" };
const entry2: RepairHistoryEntry = { messageId: "msg2" };
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
buffer.add(entry1, 1000);
buffer.add(entry2, 2000);
expect(buffer.size).to.equal(2);
buffer.clear();
expect(buffer.size).to.equal(0);
});
});
});

View File

@ -1,22 +1,15 @@
import { Logger } from "@waku/utils";
import _ from "lodash";
const log = new Logger("sds:repair:buffers");
import type { HistoryEntry } from "../message.js";
/**
* Extended HistoryEntry that includes sender_id for SDS-R
*/
export interface RepairHistoryEntry {
messageId: string;
retrievalHint?: Uint8Array;
senderId?: string; // Original sender's ID for repair calculations
}
const log = new Logger("sds:repair:buffers");
/**
* Entry in the outgoing repair buffer with request timing
*/
interface OutgoingBufferEntry {
entry: RepairHistoryEntry;
entry: HistoryEntry;
tReq: number; // Timestamp when this repair request should be sent
}
@ -24,7 +17,7 @@ interface OutgoingBufferEntry {
* Entry in the incoming repair buffer with response timing
*/
interface IncomingBufferEntry {
entry: RepairHistoryEntry;
entry: HistoryEntry;
tResp: number; // Timestamp when we should respond with this repair
}
@ -45,7 +38,7 @@ export class OutgoingRepairBuffer {
* Add a missing message to the outgoing repair request buffer
* If message already exists, it is not updated (keeps original T_req)
*/
public add(entry: RepairHistoryEntry, tReq: number): void {
public add(entry: HistoryEntry, tReq: number): void {
const messageId = entry.messageId;
// Check if already exists - do NOT update T_req per spec
@ -84,8 +77,8 @@ export class OutgoingRepairBuffer {
* Get eligible repair requests (where T_req <= currentTime)
* Returns up to maxRequests entries from the front of the sorted array
*/
public getEligible(currentTime: number, maxRequests = 3): RepairHistoryEntry[] {
const eligible: RepairHistoryEntry[] = [];
public getEligible(currentTime: number, maxRequests = 3): HistoryEntry[] {
const eligible: HistoryEntry[] = [];
// Iterate from front of sorted array (earliest T_req first)
for (const item of this.items) {
@ -124,7 +117,7 @@ export class OutgoingRepairBuffer {
/**
* Get all entries (for testing/debugging)
*/
public getAll(): RepairHistoryEntry[] {
public getAll(): HistoryEntry[] {
return this.items.map(item => item.entry);
}
@ -153,7 +146,7 @@ export class IncomingRepairBuffer {
* Add a repair request that we can fulfill
* If message already exists, it is ignored (not updated)
*/
public add(entry: RepairHistoryEntry, tResp: number): void {
public add(entry: HistoryEntry, tResp: number): void {
const messageId = entry.messageId;
// Check if already exists - ignore per spec
@ -192,8 +185,8 @@ export class IncomingRepairBuffer {
* Get repairs ready to be sent (where T_resp <= currentTime)
* Removes and returns ready entries
*/
public getReady(currentTime: number): RepairHistoryEntry[] {
const ready: RepairHistoryEntry[] = [];
public getReady(currentTime: number): HistoryEntry[] {
const ready: HistoryEntry[] = [];
const remaining: IncomingBufferEntry[] = [];
for (const item of this.items) {
@ -236,7 +229,7 @@ export class IncomingRepairBuffer {
/**
* Get all entries (for testing/debugging)
*/
public getAll(): RepairHistoryEntry[] {
public getAll(): HistoryEntry[] {
return this.items.map(item => item.entry);
}

View File

@ -1,12 +1,10 @@
import { Logger } from "@waku/utils";
import type { HistoryEntry } from "../message.js";
import { Message } from "../message.js";
import type { ILocalHistory } from "../message_channel.js";
import {
IncomingRepairBuffer,
OutgoingRepairBuffer,
RepairHistoryEntry
} from "./buffers.js";
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
import {
bigintToNumber,
calculateXorDistance,
@ -57,10 +55,10 @@ export class RepairManager {
constructor(participantId: ParticipantId, config: RepairConfig = {}) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
log.info(`RepairManager initialized for participant ${participantId}`);
}
@ -94,7 +92,7 @@ export class RepairManager {
/**
* Determine if this participant is in the response group for a message
* Per spec: (hash(participant_id, message_id) % num_response_groups) ==
* Per spec: (hash(participant_id, message_id) % num_response_groups) ==
* (hash(sender_id, message_id) % num_response_groups)
*/
public isInResponseGroup(
@ -112,9 +110,10 @@ export class RepairManager {
return true;
}
const participantGroup = combinedHash(this.participantId, messageId) % numGroups;
const participantGroup =
combinedHash(this.participantId, messageId) % numGroups;
const senderGroup = combinedHash(senderId, messageId) % numGroups;
return participantGroup === senderGroup;
}
@ -123,7 +122,7 @@ export class RepairManager {
* Called when causal dependencies are detected as missing
*/
public onMissingDependencies(
missingEntries: RepairHistoryEntry[],
missingEntries: HistoryEntry[],
currentTime = Date.now()
): void {
if (!this.config.enabled) {
@ -133,10 +132,10 @@ export class RepairManager {
for (const entry of missingEntries) {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);
// Add to outgoing buffer
this.outgoingBuffer.add(entry, tReq);
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
@ -151,7 +150,7 @@ export class RepairManager {
// Remove from both buffers as we no longer need to request or respond
this.outgoingBuffer.remove(messageId);
this.incomingBuffer.remove(messageId);
log.info(`Removed ${messageId} from repair buffers after receipt`);
}
@ -162,7 +161,7 @@ export class RepairManager {
public getRepairRequests(
maxRequests = 3,
currentTime = Date.now()
): RepairHistoryEntry[] {
): HistoryEntry[] {
if (!this.config.enabled) {
return [];
}
@ -175,8 +174,8 @@ export class RepairManager {
* Adds to incoming buffer if we can fulfill and are in response group
*/
public processIncomingRepairRequests(
requests: RepairHistoryEntry[],
localHistory: Map<string, Message>,
requests: HistoryEntry[],
localHistory: ILocalHistory,
currentTime = Date.now()
): void {
if (!this.config.enabled) {
@ -186,16 +185,23 @@ export class RepairManager {
for (const request of requests) {
// Remove from our own outgoing buffer (someone else is requesting it)
this.outgoingBuffer.remove(request.messageId);
// Check if we have this message
if (!localHistory.has(request.messageId)) {
log.info(`Cannot fulfill repair for ${request.messageId} - not in local history`);
const message = localHistory.find(
(m) => m.messageId === request.messageId
);
if (!message) {
log.info(
`Cannot fulfill repair for ${request.messageId} - not in local history`
);
continue;
}
// Check if we're in the response group
if (!request.senderId) {
log.warn(`Cannot determine response group for ${request.messageId} - missing sender_id`);
log.warn(
`Cannot determine response group for ${request.messageId} - missing sender_id`
);
continue;
}
@ -205,11 +211,15 @@ export class RepairManager {
}
// Calculate when to respond
const tResp = this.calculateTResp(request.senderId, request.messageId, currentTime);
const tResp = this.calculateTResp(
request.senderId,
request.messageId,
currentTime
);
// Add to incoming buffer
this.incomingBuffer.add(request, tResp);
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
@ -223,7 +233,7 @@ export class RepairManager {
public sweepOutgoingBuffer(
maxRequests = 3,
currentTime = Date.now()
): RepairHistoryEntry[] {
): HistoryEntry[] {
if (!this.config.enabled) {
return [];
}
@ -236,7 +246,7 @@ export class RepairManager {
* Returns messages that should be rebroadcast
*/
public sweepIncomingBuffer(
localHistory: Map<string, Message>,
localHistory: ILocalHistory,
currentTime = Date.now()
): Message[] {
if (!this.config.enabled) {
@ -247,7 +257,7 @@ export class RepairManager {
const messages: Message[] = [];
for (const entry of ready) {
const message = localHistory.get(entry.messageId);
const message = localHistory.find((m) => m.messageId === entry.messageId);
if (message) {
messages.push(message);
log.info(`Sending repair for ${entry.messageId}`);
@ -279,7 +289,12 @@ export class RepairManager {
*/
public updateResponseGroups(numParticipants: number): void {
// Per spec: num_response_groups = max(1, num_participants / 128)
this.config.numResponseGroups = Math.max(1, Math.floor(numParticipants / 128));
log.info(`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`);
this.config.numResponseGroups = Math.max(
1,
Math.floor(numParticipants / 128)
);
log.info(
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
);
}
}
}

231
sds.md
View File

@ -58,6 +58,9 @@ other participants using the corresponding message ID.
* **Participant ID:**
Each participant has a globally unique, immutable ID
visible to other participants in the communication.
* **Sender ID:**
The *Participant ID* of the original sender of a message,
often coupled with a *Message ID*.
## Wire protocol
@ -75,15 +78,20 @@ syntax = "proto3";
message HistoryEntry {
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; for example, a Waku deterministic message hash or routing payload hash
optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension
}
message Message {
string sender_id = 1; // Participant ID of the message sender
string message_id = 2; // Unique identifier of the message
string channel_id = 3; // Identifier of the channel to which the message belongs
optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension.
optional bytes content = 20; // Actual content of the message
}
```
@ -111,7 +119,11 @@ Its importance is expected to increase once a p2p retrieval mechanism is added t
Each participant MUST maintain:
* A Lamport timestamp for each channel of communication,
initialized to current epoch time in nanosecond resolution.
initialized to current epoch time in millisecond resolution.
The Lamport timestamp is increased as described in the [protocol steps](#protocol-steps)
to maintain a logical ordering of events while staying close to the current epoch time.
This allows the messages from new joiners to be correctly ordered with other recent messages,
without these new participants first having to synchronize past messages to discover the current Lamport timestamp.
* A bloom filter for received message IDs per channel.
The bloom filter SHOULD be rolled over and
recomputed once it reaches a predefined capacity of message IDs.
the `lamport_timestamp`, `causal_history` and `bloom_filter` fields.
Before broadcasting a message:
* the participant MUST set its local Lamport timestamp
to the maximum between the current value + `1`
and the current epoch time in milliseconds.
In other words, the local Lamport timestamp is set to `max(timeNowInMs, current_lamport_timestamp + 1)`.
* the participant MUST include the increased Lamport timestamp in the message's `lamport_timestamp` field.
* the participant MUST determine the preceding few message IDs in the local history
and include these in an ordered list in the `causal_history` field.
The number of message IDs to include in the `causal_history` depends on the application.
participants SHOULD periodically send sync messages to maintain state.
These sync messages:
* MUST be sent with empty content
* MUST include a Lamport timestamp increased to `max(timeNowInMs, current_lamport_timestamp + 1)`,
where `timeNowInMs` is the current epoch time in milliseconds.
* MUST include causal history and bloom filter according to regular message rules
* MUST NOT be added to the unacknowledged outgoing buffer
* MUST NOT be included in causal histories of subsequent messages
Upon reception,
ephemeral messages SHOULD be delivered immediately without buffering for causal dependencies
or including in the local log.
### SDS Repair (SDS-R)

SDS Repair (SDS-R) is an optional extension module for SDS,
allowing participants in a communication to collectively repair any gaps in causal history (missing messages)
preferably over a limited time window.
Since SDS-R acts as coordinated rebroadcasting of missing messages,
which involves all participants of the communication,
it is most appropriate in a limited use case for repairing relatively recent missed dependencies.
It is not meant to replace mechanisms for long-term consistency,
such as peer-to-peer syncing or the use of a high-availability centralised cache (Store node).

#### SDS-R message fields

SDS-R adds the following fields to SDS messages:

* `sender_id` in `HistoryEntry`:
the original message sender's participant ID.
This is used to determine the group of participants who will respond to a repair request.
* `repair_request` in `Message`:
a capped list of history entries missing for the message sender
and for which it's requesting a repair.

#### SDS-R participant state

SDS-R adds the following to each participant state:

* Outgoing **repair request buffer**:
a list of locally missing `HistoryEntry`s
each mapped to a future request timestamp, `T_req`,
after which this participant will request a repair if at that point the missing dependency has not been repaired yet.
`T_req` is computed as a pseudorandom backoff from the timestamp when the dependency was detected missing.
[Determining `T_req`](#determine-t_req) is described below.
We RECOMMEND that the outgoing repair request buffer be chronologically ordered in ascending order of `T_req`.
* Incoming **repair request buffer**:
a list of locally available `HistoryEntry`s
that were requested for repair by a remote participant
AND for which this participant might be an eligible responder,
each mapped to a future response timestamp, `T_resp`,
after which this participant will rebroadcast the corresponding requested `Message` if at that point no other participant had rebroadcast the `Message`.
`T_resp` is computed as a pseudorandom backoff from the timestamp when the repair was first requested.
[Determining `T_resp`](#determine-t_resp) is described below.
We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific repair request.
* Augmented local history log:
for each message ID kept in the local log for which the participant could be a repair responder,
the full SDS `Message` must be cached rather than just the message ID,
in case this participant is called upon to rebroadcast the message.
We describe below how a participant can [determine if they're an eligible responder](#determine-response-group) for a specific message.

**_Note:_** The required state can likely be significantly reduced in future by simply requiring that a responding participant should _reconstruct_ the original `Message` when rebroadcasting, rather than the simpler, but heavier, requirement of caching the entire received `Message` content in local history.

#### SDS-R global state

For a specific channel (that is, within a specific SDS-controlled communication)
the following SDS-R configuration state SHOULD be common for all participants in the conversation:

* `T_min`: the _minimum_ time period to wait before a missing causal entry can be repaired.
We RECOMMEND a value of at least 30 seconds.
* `T_max`: the _maximum_ time period over which missing causal entries can be repaired.
We RECOMMEND a value of between 120 and 600 seconds.

Furthermore, to avoid a broadcast storm with multiple participants responding to a repair request,
participants in a single channel MAY be divided into discrete response groups.
Participants will only respond to a repair request if they are in the response group for that request.
The global `num_response_groups` variable configures the number of response groups for this communication.
Its use is described below.
A reasonable default value for `num_response_groups` is one response group for every `128` participants.
In other words, if the (roughly) expected number of participants is expressed as `num_participants`, then
`num_response_groups = num_participants div 128 + 1`.
In other words, if there are fewer than 128 participants in a communication,
they will all belong to the same response group.

We RECOMMEND that the global state variables `T_min`, `T_max` and `num_response_groups` be set _statically_ for a specific SDS-R application,
based on expected number of group participants and volume of traffic.

**_Note:_** Future versions of this protocol will recommend dynamic global SDS-R variables, based on the current number of participants.

#### SDS-R send message

SDS-R adds the following steps when sending a message:

Before broadcasting a message,
* the participant SHOULD populate the `repair_request` field in the message
with _eligible_ entries from the outgoing repair request buffer.
An entry is eligible to be included in a `repair_request`
if its corresponding request timestamp, `T_req`, has expired (in other words, `T_req <= current_time`).
The maximum number of repair request entries to include is up to the application.
We RECOMMEND that this quota be filled by the eligible entries from the outgoing repair request buffer with the lowest `T_req`.
We RECOMMEND a maximum of 3 entries.
If there are no eligible entries in the buffer, this optional field MUST be left unset.

#### SDS-R receive message

On receiving a message,
* the participant MUST remove entries matching the received message ID from its _outgoing_ repair request buffer.
This ensures that the participant does not request repairs for dependencies that have now been met.
* the participant MUST remove entries matching the received message ID from its _incoming_ repair request buffer.
This ensures that the participant does not respond to repair requests that another participant has already responded to.
* the participant SHOULD check for any unmet causal dependencies that do not yet have a corresponding entry in its outgoing repair request buffer.
For each such dependency, the participant SHOULD add a new entry against a unique `T_req` timestamp.
It MUST compute the `T_req` for each such HistoryEntry according to the steps outlined in [_Determine T_req_](#determine-t_req).
* for each item in the `repair_request` field:
  - the participant MUST remove entries matching the repair message ID from its own outgoing repair request buffer.
This limits the number of participants that will request a common missing dependency.
  - if the participant has the requested `Message` in its local history _and_ is an eligible responder for the repair request,
it SHOULD add the request to its incoming repair request buffer against a unique `T_resp` timestamp for that entry.
It MUST compute the `T_resp` for each such repair request according to the steps outlined in [_Determine T_resp_](#determine-t_resp).
It MUST determine if it's an eligible responder for a repair request according to the steps outlined in [_Determine response group_](#determine-response-group).

#### Determine T_req

A participant determines the repair request timestamp, `T_req`,
for a missing `HistoryEntry` as follows:

```
T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
```

where `current_time` is the current timestamp,
`participant_id` is the participant's _own_ participant ID (not the `sender_id` in the missing `HistoryEntry`),
`message_id` is the missing `HistoryEntry`'s message ID,
and `T_min` and `T_max` are as set out in [SDS-R global state](#sds-r-global-state).

This allows `T_req` to be pseudorandomly and linearly distributed as a backoff of between `T_min` and `T_max` from current time.

> **_Note:_** placing `T_req` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.

#### Determine T_resp

A participant determines the repair response timestamp, `T_resp`,
for a `HistoryEntry` that it could repair as follows:

```
distance = hash(participant_id) XOR hash(sender_id)
T_resp = current_time + distance*hash(message_id) % T_max
```

where `current_time` is the current timestamp,
`participant_id` is the participant's _own_ (local) participant ID,
`sender_id` is the requested `HistoryEntry` sender ID,
`message_id` is the requested `HistoryEntry` message ID,
and `T_max` is as set out in [SDS-R global state](#sds-r-global-state).

We first calculate the logical `distance` between the local `participant_id` and the original `sender_id`.
If this participant is the original sender, the `distance` will be `0`.
It should then be clear that the original participant will have a response backoff time of `0`, making it the most likely responder.
The `T_resp` values for other eligible participants will be pseudorandomly and linearly distributed as a backoff of up to `T_max` from current time.

> **_Note:_** placing `T_resp` values on an exponential backoff curve will likely be more appropriate and is left for a future improvement.

#### Determine response group

Given a message with `sender_id` and `message_id`,
a participant with `participant_id` is in the response group for that message if

```
hash(participant_id, message_id) % num_response_groups == hash(sender_id, message_id) % num_response_groups
```

where `num_response_groups` is as set out in [SDS-R global state](#sds-r-global-state).

This ensures that a participant will always be in the response group for its own published messages.
It also allows participants to determine immediately on first reception of a message or a history entry
if they are in the associated response group.

#### SDS-R incoming repair request buffer sweep

An SDS-R participant MUST periodically check if there are any incoming requests in the *incoming repair request buffer* that are due for a response.
For each item in the buffer,
the participant SHOULD broadcast the corresponding `Message` from local history
if its corresponding response timestamp, `T_resp`, has expired (in other words, `T_resp <= current_time`).

#### SDS-R Periodic Sync Message

If the participant is due to send a periodic sync message,
it SHOULD send the message according to [SDS-R send message](#sds-r-send-message)
if there are any eligible items in the outgoing repair request buffer,
regardless of whether other participants have also recently broadcast a Periodic Sync message.
## Copyright