mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
feat: add SDS-Repair (SDS-R) to the SDS implementation (#2698)
* wip * feat: integrate sds-r with message channels * feat: add SDS-R events * fix: fixed buffer handling incoming and outgoing * fix: more buffer fixes * fix: remove some magic numbers * fix: buffer optimisation, backwards compatible senderId * fix: fix implementation guide, remove unrelated claude file * fix: further buffer optimisations * fix: linting errors * fix: suggestions from code review Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com> Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> * fix: remove implementation guide * fix: build errors, remove override, improve buffer * fix: consistent use of MessageId and ParticipantId * fix: switch to conditionally constructed from conditionally executed --------- Co-authored-by: fryorcraken <commits@fryorcraken.xyz> Co-authored-by: Sasha <118575614+weboko@users.noreply.github.com> Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
This commit is contained in:
parent
115cdd28fe
commit
5334a7fcc9
@ -13,6 +13,7 @@ import type { Uint8ArrayList } from 'uint8arraylist'
|
||||
export interface HistoryEntry {
|
||||
messageId: string
|
||||
retrievalHint?: Uint8Array
|
||||
senderId?: string
|
||||
}
|
||||
|
||||
export namespace HistoryEntry {
|
||||
@ -35,6 +36,11 @@ export namespace HistoryEntry {
|
||||
w.bytes(obj.retrievalHint)
|
||||
}
|
||||
|
||||
if (obj.senderId != null) {
|
||||
w.uint32(26)
|
||||
w.string(obj.senderId)
|
||||
}
|
||||
|
||||
if (opts.lengthDelimited !== false) {
|
||||
w.ldelim()
|
||||
}
|
||||
@ -57,6 +63,10 @@ export namespace HistoryEntry {
|
||||
obj.retrievalHint = reader.bytes()
|
||||
break
|
||||
}
|
||||
case 3: {
|
||||
obj.senderId = reader.string()
|
||||
break
|
||||
}
|
||||
default: {
|
||||
reader.skipType(tag & 7)
|
||||
break
|
||||
@ -87,6 +97,7 @@ export interface SdsMessage {
|
||||
lamportTimestamp?: bigint
|
||||
causalHistory: HistoryEntry[]
|
||||
bloomFilter?: Uint8Array
|
||||
repairRequest: HistoryEntry[]
|
||||
content?: Uint8Array
|
||||
}
|
||||
|
||||
@ -132,6 +143,13 @@ export namespace SdsMessage {
|
||||
w.bytes(obj.bloomFilter)
|
||||
}
|
||||
|
||||
if (obj.repairRequest != null) {
|
||||
for (const value of obj.repairRequest) {
|
||||
w.uint32(106)
|
||||
HistoryEntry.codec().encode(value, w)
|
||||
}
|
||||
}
|
||||
|
||||
if (obj.content != null) {
|
||||
w.uint32(162)
|
||||
w.bytes(obj.content)
|
||||
@ -145,7 +163,8 @@ export namespace SdsMessage {
|
||||
senderId: '',
|
||||
messageId: '',
|
||||
channelId: '',
|
||||
causalHistory: []
|
||||
causalHistory: [],
|
||||
repairRequest: []
|
||||
}
|
||||
|
||||
const end = length == null ? reader.len : reader.pos + length
|
||||
@ -184,6 +203,16 @@ export namespace SdsMessage {
|
||||
obj.bloomFilter = reader.bytes()
|
||||
break
|
||||
}
|
||||
case 13: {
|
||||
if (opts.limits?.repairRequest != null && obj.repairRequest.length === opts.limits.repairRequest) {
|
||||
throw new MaxLengthError('Decode error - map field "repairRequest" had too many elements')
|
||||
}
|
||||
|
||||
obj.repairRequest.push(HistoryEntry.codec().decode(reader, reader.uint32(), {
|
||||
limits: opts.limits?.repairRequest$
|
||||
}))
|
||||
break
|
||||
}
|
||||
case 20: {
|
||||
obj.content = reader.bytes()
|
||||
break
|
||||
|
||||
@ -3,6 +3,8 @@ syntax = "proto3";
|
||||
message HistoryEntry {
|
||||
string message_id = 1; // Unique identifier of the SDS message, as defined in `Message`
|
||||
optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash
|
||||
|
||||
optional string sender_id = 3; // Participant ID of original message sender. Only populated if using optional SDS Repair extension
|
||||
}
|
||||
|
||||
message SdsMessage {
|
||||
@ -12,5 +14,8 @@ message SdsMessage {
|
||||
optional uint64 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel
|
||||
repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included.
|
||||
optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel
|
||||
|
||||
repeated HistoryEntry repair_request = 13; // Capped list of history entries missing from sender's causal history. Only populated if using the optional SDS Repair extension.
|
||||
|
||||
optional bytes content = 20; // Actual content of the message
|
||||
}
|
||||
|
||||
@ -14,8 +14,14 @@ export {
|
||||
type HistoryEntry,
|
||||
type ChannelId,
|
||||
type MessageChannelEvents,
|
||||
type SenderId,
|
||||
type ParticipantId,
|
||||
type MessageId
|
||||
} from "./message_channel/index.js";
|
||||
|
||||
/**
|
||||
* @deprecated Use ParticipantId instead. SenderId has been renamed to ParticipantId
|
||||
* to better reflect that it represents a channel participant, not just a message sender.
|
||||
*/
|
||||
export type { ParticipantId as SenderId } from "./message_channel/index.js";
|
||||
|
||||
export { BloomFilter };
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { HistoryEntry, Message, MessageId } from "./message.js";
|
||||
import { HistoryEntry, Message, MessageId, ParticipantId } from "./message.js";
|
||||
|
||||
export enum MessageChannelEvent {
|
||||
OutMessageSent = "sds:out:message-sent",
|
||||
@ -10,7 +10,13 @@ export enum MessageChannelEvent {
|
||||
OutSyncSent = "sds:out:sync-sent",
|
||||
InSyncReceived = "sds:in:sync-received",
|
||||
InMessageLost = "sds:in:message-irretrievably-lost",
|
||||
ErrorTask = "sds:error-task"
|
||||
ErrorTask = "sds:error-task",
|
||||
// SDS-R Repair Events
|
||||
RepairRequestQueued = "sds:repair:request-queued",
|
||||
RepairRequestSent = "sds:repair:request-sent",
|
||||
RepairRequestReceived = "sds:repair:request-received",
|
||||
RepairResponseQueued = "sds:repair:response-queued",
|
||||
RepairResponseSent = "sds:repair:response-sent"
|
||||
}
|
||||
|
||||
export type MessageChannelEvents = {
|
||||
@ -26,5 +32,24 @@ export type MessageChannelEvents = {
|
||||
[MessageChannelEvent.InMessageLost]: CustomEvent<HistoryEntry[]>;
|
||||
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
|
||||
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
|
||||
[MessageChannelEvent.ErrorTask]: CustomEvent<any>;
|
||||
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
|
||||
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
|
||||
messageId: MessageId;
|
||||
tReq: number;
|
||||
}>;
|
||||
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
|
||||
messageIds: MessageId[];
|
||||
carrierMessageId: MessageId;
|
||||
}>;
|
||||
[MessageChannelEvent.RepairRequestReceived]: CustomEvent<{
|
||||
messageIds: MessageId[];
|
||||
fromSenderId?: ParticipantId;
|
||||
}>;
|
||||
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
|
||||
messageId: MessageId;
|
||||
tResp: number;
|
||||
}>;
|
||||
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
|
||||
messageId: MessageId;
|
||||
}>;
|
||||
};
|
||||
|
||||
@ -8,7 +8,7 @@ export {
|
||||
HistoryEntry,
|
||||
Message,
|
||||
MessageId,
|
||||
SenderId,
|
||||
ParticipantId,
|
||||
SyncMessage,
|
||||
isContentMessage,
|
||||
isEphemeralMessage,
|
||||
|
||||
@ -44,6 +44,7 @@ describe("Message serialization", () => {
|
||||
[{ messageId: depMessageId, retrievalHint: depRetrievalHint }],
|
||||
0n,
|
||||
undefined,
|
||||
undefined,
|
||||
undefined
|
||||
);
|
||||
|
||||
@ -54,6 +55,39 @@ describe("Message serialization", () => {
|
||||
{ messageId: depMessageId, retrievalHint: depRetrievalHint }
|
||||
]);
|
||||
});
|
||||
|
||||
it("Repair Request", () => {
|
||||
const repairMessageId = "missing-message";
|
||||
const repairRetrievalHint = utf8ToBytes("missing-retrieval");
|
||||
const repairSenderId = "original-sender";
|
||||
const message = new Message(
|
||||
"123",
|
||||
"my-channel",
|
||||
"me",
|
||||
[],
|
||||
0n,
|
||||
undefined,
|
||||
undefined,
|
||||
[
|
||||
{
|
||||
messageId: repairMessageId,
|
||||
retrievalHint: repairRetrievalHint,
|
||||
senderId: repairSenderId
|
||||
}
|
||||
]
|
||||
);
|
||||
|
||||
const bytes = message.encode();
|
||||
const decMessage = Message.decode(bytes);
|
||||
|
||||
expect(decMessage!.repairRequest).to.deep.equal([
|
||||
{
|
||||
messageId: repairMessageId,
|
||||
retrievalHint: repairRetrievalHint,
|
||||
senderId: repairSenderId
|
||||
}
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ContentMessage comparison with < operator", () => {
|
||||
|
||||
@ -4,19 +4,20 @@ import { Logger } from "@waku/utils";
|
||||
export type MessageId = string;
|
||||
export type HistoryEntry = proto_sds_message.HistoryEntry;
|
||||
export type ChannelId = string;
|
||||
export type SenderId = string;
|
||||
export type ParticipantId = string;
|
||||
|
||||
const log = new Logger("sds:message");
|
||||
|
||||
export class Message implements proto_sds_message.SdsMessage {
|
||||
public constructor(
|
||||
public messageId: string,
|
||||
public messageId: MessageId,
|
||||
public channelId: string,
|
||||
public senderId: string,
|
||||
public senderId: ParticipantId,
|
||||
public causalHistory: proto_sds_message.HistoryEntry[],
|
||||
public lamportTimestamp?: bigint | undefined,
|
||||
public bloomFilter?: Uint8Array<ArrayBufferLike> | undefined,
|
||||
public content?: Uint8Array<ArrayBufferLike> | undefined,
|
||||
public repairRequest: proto_sds_message.HistoryEntry[] = [],
|
||||
/**
|
||||
* Not encoded, set after it is sent, used to include in follow-up messages
|
||||
*/
|
||||
@ -38,7 +39,8 @@ export class Message implements proto_sds_message.SdsMessage {
|
||||
causalHistory,
|
||||
lamportTimestamp,
|
||||
bloomFilter,
|
||||
content
|
||||
content,
|
||||
repairRequest
|
||||
} = proto_sds_message.SdsMessage.decode(data);
|
||||
|
||||
if (testContentMessage({ lamportTimestamp, content })) {
|
||||
@ -49,7 +51,8 @@ export class Message implements proto_sds_message.SdsMessage {
|
||||
causalHistory,
|
||||
lamportTimestamp!,
|
||||
bloomFilter,
|
||||
content!
|
||||
content!,
|
||||
repairRequest
|
||||
);
|
||||
}
|
||||
|
||||
@ -61,7 +64,8 @@ export class Message implements proto_sds_message.SdsMessage {
|
||||
causalHistory,
|
||||
undefined,
|
||||
bloomFilter,
|
||||
content!
|
||||
content!,
|
||||
repairRequest
|
||||
);
|
||||
}
|
||||
|
||||
@ -73,7 +77,8 @@ export class Message implements proto_sds_message.SdsMessage {
|
||||
causalHistory,
|
||||
lamportTimestamp!,
|
||||
bloomFilter,
|
||||
undefined
|
||||
undefined,
|
||||
repairRequest
|
||||
);
|
||||
}
|
||||
log.error(
|
||||
@ -90,13 +95,14 @@ export class Message implements proto_sds_message.SdsMessage {
|
||||
|
||||
export class SyncMessage extends Message {
|
||||
public constructor(
|
||||
public messageId: string,
|
||||
public messageId: MessageId,
|
||||
public channelId: string,
|
||||
public senderId: string,
|
||||
public senderId: ParticipantId,
|
||||
public causalHistory: proto_sds_message.HistoryEntry[],
|
||||
public lamportTimestamp: bigint,
|
||||
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
|
||||
public content: undefined,
|
||||
public repairRequest: proto_sds_message.HistoryEntry[] = [],
|
||||
/**
|
||||
* Not encoded, set after it is sent, used to include in follow-up messages
|
||||
*/
|
||||
@ -110,6 +116,7 @@ export class SyncMessage extends Message {
|
||||
lamportTimestamp,
|
||||
bloomFilter,
|
||||
content,
|
||||
repairRequest,
|
||||
retrievalHint
|
||||
);
|
||||
}
|
||||
@ -134,13 +141,14 @@ export function isSyncMessage(
|
||||
|
||||
export class EphemeralMessage extends Message {
|
||||
public constructor(
|
||||
public messageId: string,
|
||||
public messageId: MessageId,
|
||||
public channelId: string,
|
||||
public senderId: string,
|
||||
public senderId: ParticipantId,
|
||||
public causalHistory: proto_sds_message.HistoryEntry[],
|
||||
public lamportTimestamp: undefined,
|
||||
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
|
||||
public content: Uint8Array<ArrayBufferLike>,
|
||||
public repairRequest: proto_sds_message.HistoryEntry[] = [],
|
||||
/**
|
||||
* Not encoded, set after it is sent, used to include in follow-up messages
|
||||
*/
|
||||
@ -157,6 +165,7 @@ export class EphemeralMessage extends Message {
|
||||
lamportTimestamp,
|
||||
bloomFilter,
|
||||
content,
|
||||
repairRequest,
|
||||
retrievalHint
|
||||
);
|
||||
}
|
||||
@ -182,13 +191,14 @@ function testEphemeralMessage(message: {
|
||||
|
||||
export class ContentMessage extends Message {
|
||||
public constructor(
|
||||
public messageId: string,
|
||||
public messageId: MessageId,
|
||||
public channelId: string,
|
||||
public senderId: string,
|
||||
public senderId: ParticipantId,
|
||||
public causalHistory: proto_sds_message.HistoryEntry[],
|
||||
public lamportTimestamp: bigint,
|
||||
public bloomFilter: Uint8Array<ArrayBufferLike> | undefined,
|
||||
public content: Uint8Array<ArrayBufferLike>,
|
||||
public repairRequest: proto_sds_message.HistoryEntry[] = [],
|
||||
/**
|
||||
* Not encoded, set after it is sent, used to include in follow-up messages
|
||||
*/
|
||||
@ -205,6 +215,7 @@ export class ContentMessage extends Message {
|
||||
lamportTimestamp,
|
||||
bloomFilter,
|
||||
content,
|
||||
repairRequest,
|
||||
retrievalHint
|
||||
);
|
||||
}
|
||||
|
||||
@ -162,7 +162,8 @@ describe("MessageChannel", function () {
|
||||
.slice(-causalHistorySize - 1, -1)
|
||||
.map((message) => ({
|
||||
messageId: MessageChannel.getMessageId(utf8ToBytes(message)),
|
||||
retrievalHint: undefined
|
||||
retrievalHint: undefined,
|
||||
senderId: "alice"
|
||||
}));
|
||||
expect(causalHistory).to.deep.equal(expectedCausalHistory);
|
||||
});
|
||||
@ -298,6 +299,7 @@ describe("MessageChannel", function () {
|
||||
1n,
|
||||
undefined,
|
||||
payload,
|
||||
undefined,
|
||||
testRetrievalHint
|
||||
),
|
||||
testRetrievalHint
|
||||
|
||||
@ -18,15 +18,21 @@ import {
|
||||
isSyncMessage,
|
||||
Message,
|
||||
MessageId,
|
||||
SenderId,
|
||||
ParticipantId,
|
||||
SyncMessage
|
||||
} from "./message.js";
|
||||
import { RepairConfig, RepairManager } from "./repair/repair.js";
|
||||
|
||||
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
|
||||
capacity: 10000,
|
||||
errorRate: 0.001
|
||||
};
|
||||
|
||||
/**
|
||||
* Maximum number of repair requests to include in a single message
|
||||
*/
|
||||
const MAX_REPAIR_REQUESTS_PER_MESSAGE = 3;
|
||||
|
||||
const DEFAULT_CAUSAL_HISTORY_SIZE = 200;
|
||||
const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2;
|
||||
|
||||
@ -46,6 +52,15 @@ export interface MessageChannelOptions {
|
||||
* How many possible acks does it take to consider it a definitive ack.
|
||||
*/
|
||||
possibleAcksThreshold?: number;
|
||||
/**
|
||||
* Whether to enable SDS-R repair protocol.
|
||||
* @default true
|
||||
*/
|
||||
enableRepair?: boolean;
|
||||
/**
|
||||
* SDS-R repair configuration. Only used if enableRepair is true.
|
||||
*/
|
||||
repairConfig?: RepairConfig;
|
||||
}
|
||||
|
||||
export type ILocalHistory = Pick<
|
||||
@ -55,7 +70,7 @@ export type ILocalHistory = Pick<
|
||||
|
||||
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
public readonly channelId: ChannelId;
|
||||
public readonly senderId: SenderId;
|
||||
public readonly senderId: ParticipantId;
|
||||
private lamportTimestamp: bigint;
|
||||
private filter: DefaultBloomFilter;
|
||||
private outgoingBuffer: ContentMessage[];
|
||||
@ -66,6 +81,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
private readonly causalHistorySize: number;
|
||||
private readonly possibleAcksThreshold: number;
|
||||
private readonly timeoutForLostMessagesMs?: number;
|
||||
private readonly repairManager?: RepairManager;
|
||||
|
||||
private tasks: Task[] = [];
|
||||
private handlers: Handlers = {
|
||||
@ -88,7 +104,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
|
||||
public constructor(
|
||||
channelId: ChannelId,
|
||||
senderId: SenderId,
|
||||
senderId: ParticipantId,
|
||||
options: MessageChannelOptions = {},
|
||||
localHistory: ILocalHistory = new MemLocalHistory()
|
||||
) {
|
||||
@ -109,6 +125,17 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
|
||||
this.timeReceived = new Map();
|
||||
this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs;
|
||||
|
||||
// Only construct RepairManager if repair is enabled (default: true)
|
||||
if (options.enableRepair ?? true) {
|
||||
this.repairManager = new RepairManager(
|
||||
senderId,
|
||||
options.repairConfig,
|
||||
(event: string, detail: unknown) => {
|
||||
this.safeSendEvent(event as MessageChannelEvent, { detail });
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public static getMessageId(payload: Uint8Array): MessageId {
|
||||
@ -272,9 +299,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
);
|
||||
const missingDependencies = message.causalHistory.filter(
|
||||
(messageHistoryEntry) =>
|
||||
!this.localHistory.some(
|
||||
({ messageId }) => messageId === messageHistoryEntry.messageId
|
||||
)
|
||||
!this.isMessageAvailable(messageHistoryEntry.messageId)
|
||||
);
|
||||
if (missingDependencies.length === 0) {
|
||||
if (isContentMessage(message) && this.deliverMessage(message)) {
|
||||
@ -355,6 +380,44 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweep repair incoming buffer and rebroadcast messages ready for repair.
|
||||
* Per SDS-R spec: periodically check for repair responses that are due.
|
||||
*
|
||||
* @param callback - callback to rebroadcast the message
|
||||
* @returns Promise that resolves when all ready repairs have been sent
|
||||
*/
|
||||
public async sweepRepairIncomingBuffer(
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<Message[]> {
|
||||
const repairsToSend =
|
||||
this.repairManager?.sweepIncomingBuffer(this.localHistory) ?? [];
|
||||
|
||||
if (callback) {
|
||||
for (const message of repairsToSend) {
|
||||
try {
|
||||
await callback(message);
|
||||
log.info(
|
||||
this.senderId,
|
||||
"repair message rebroadcast",
|
||||
message.messageId
|
||||
);
|
||||
|
||||
// Emit RepairResponseSent event
|
||||
this.safeSendEvent(MessageChannelEvent.RepairResponseSent, {
|
||||
detail: {
|
||||
messageId: message.messageId
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
log.error("Failed to rebroadcast repair message:", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return repairsToSend;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a sync message to the SDS channel.
|
||||
*
|
||||
@ -369,6 +432,12 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
callback?: (message: SyncMessage) => Promise<boolean>
|
||||
): Promise<boolean> {
|
||||
this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);
|
||||
|
||||
// Get repair requests to include in sync message (SDS-R)
|
||||
const repairRequests =
|
||||
this.repairManager?.getRepairRequests(MAX_REPAIR_REQUESTS_PER_MESSAGE) ??
|
||||
[];
|
||||
|
||||
const message = new SyncMessage(
|
||||
// does not need to be secure randomness
|
||||
`sync-${Math.random().toString(36).substring(2)}`,
|
||||
@ -376,18 +445,22 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
this.senderId,
|
||||
this.localHistory
|
||||
.slice(-this.causalHistorySize)
|
||||
.map(({ messageId, retrievalHint }) => {
|
||||
return { messageId, retrievalHint };
|
||||
.map(({ messageId, retrievalHint, senderId }) => {
|
||||
return { messageId, retrievalHint, senderId };
|
||||
}),
|
||||
this.lamportTimestamp,
|
||||
this.filter.toBytes(),
|
||||
undefined
|
||||
undefined,
|
||||
repairRequests
|
||||
);
|
||||
|
||||
if (!message.causalHistory || message.causalHistory.length === 0) {
|
||||
if (
|
||||
(!message.causalHistory || message.causalHistory.length === 0) &&
|
||||
repairRequests.length === 0
|
||||
) {
|
||||
log.info(
|
||||
this.senderId,
|
||||
"no causal history in sync message, aborting sending"
|
||||
"no causal history and no repair requests in sync message, aborting sending"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
@ -399,6 +472,17 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
this.safeSendEvent(MessageChannelEvent.OutSyncSent, {
|
||||
detail: message
|
||||
});
|
||||
|
||||
// Emit RepairRequestSent event if repair requests were included
|
||||
if (repairRequests.length > 0) {
|
||||
this.safeSendEvent(MessageChannelEvent.RepairRequestSent, {
|
||||
detail: {
|
||||
messageIds: repairRequests.map((r) => r.messageId),
|
||||
carrierMessageId: message.messageId
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
log.error(
|
||||
@ -464,6 +548,26 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
detail: message
|
||||
});
|
||||
}
|
||||
|
||||
// SDS-R: Handle received message in repair manager
|
||||
this.repairManager?.markMessageReceived(message.messageId);
|
||||
|
||||
// SDS-R: Process incoming repair requests
|
||||
if (message.repairRequest && message.repairRequest.length > 0) {
|
||||
// Emit RepairRequestReceived event
|
||||
this.safeSendEvent(MessageChannelEvent.RepairRequestReceived, {
|
||||
detail: {
|
||||
messageIds: message.repairRequest.map((r) => r.messageId),
|
||||
fromSenderId: message.senderId
|
||||
}
|
||||
});
|
||||
|
||||
this.repairManager?.processIncomingRepairRequests(
|
||||
message.repairRequest,
|
||||
this.localHistory
|
||||
);
|
||||
}
|
||||
|
||||
this.reviewAckStatus(message);
|
||||
if (isContentMessage(message)) {
|
||||
this.filter.insert(message.messageId);
|
||||
@ -471,9 +575,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
|
||||
const missingDependencies = message.causalHistory.filter(
|
||||
(messageHistoryEntry) =>
|
||||
!this.localHistory.some(
|
||||
({ messageId }) => messageId === messageHistoryEntry.messageId
|
||||
)
|
||||
!this.isMessageAvailable(messageHistoryEntry.messageId)
|
||||
);
|
||||
|
||||
if (missingDependencies.length > 0) {
|
||||
@ -487,6 +589,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
missingDependencies.map((ch) => ch.messageId)
|
||||
);
|
||||
|
||||
// SDS-R: Track missing dependencies in repair manager
|
||||
this.repairManager?.markDependenciesMissing(missingDependencies);
|
||||
|
||||
this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
|
||||
detail: Array.from(missingDependencies)
|
||||
});
|
||||
@ -549,18 +654,26 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
// It's a new message
|
||||
if (!message) {
|
||||
log.info(this.senderId, "sending new message", messageId);
|
||||
|
||||
// Get repair requests to include in the message (SDS-R)
|
||||
const repairRequests =
|
||||
this.repairManager?.getRepairRequests(
|
||||
MAX_REPAIR_REQUESTS_PER_MESSAGE
|
||||
) ?? [];
|
||||
|
||||
message = new ContentMessage(
|
||||
messageId,
|
||||
this.channelId,
|
||||
this.senderId,
|
||||
this.localHistory
|
||||
.slice(-this.causalHistorySize)
|
||||
.map(({ messageId, retrievalHint }) => {
|
||||
return { messageId, retrievalHint };
|
||||
.map(({ messageId, retrievalHint, senderId }) => {
|
||||
return { messageId, retrievalHint, senderId };
|
||||
}),
|
||||
this.lamportTimestamp,
|
||||
this.filter.toBytes(),
|
||||
payload
|
||||
payload,
|
||||
repairRequests
|
||||
);
|
||||
|
||||
this.outgoingBuffer.push(message);
|
||||
@ -616,6 +729,26 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a message is available (either in localHistory or incomingBuffer)
|
||||
* This prevents treating messages as "missing" when they've already been received
|
||||
* but are waiting in the incoming buffer for their dependencies.
|
||||
*
|
||||
* @param messageId - The ID of the message to check
|
||||
* @private
|
||||
*/
|
||||
private isMessageAvailable(messageId: MessageId): boolean {
|
||||
// Check if in local history
|
||||
if (this.localHistory.some((m) => m.messageId === messageId)) {
|
||||
return true;
|
||||
}
|
||||
// Check if in incoming buffer (already received, waiting for dependencies)
|
||||
if (this.incomingBuffer.some((m) => m.messageId === messageId)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if the message was "delivered"
|
||||
*
|
||||
@ -657,6 +790,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
}
|
||||
|
||||
this.localHistory.push(message);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
239
packages/sds/src/message_channel/repair/buffers.spec.ts
Normal file
239
packages/sds/src/message_channel/repair/buffers.spec.ts
Normal file
@ -0,0 +1,239 @@
|
||||
import { expect } from "chai";
|
||||
|
||||
import type { HistoryEntry } from "../message.js";
|
||||
|
||||
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
|
||||
|
||||
describe("OutgoingRepairBuffer", () => {
|
||||
let buffer: OutgoingRepairBuffer;
|
||||
|
||||
beforeEach(() => {
|
||||
buffer = new OutgoingRepairBuffer(3); // Small buffer for testing
|
||||
});
|
||||
|
||||
it("should add entries and maintain sorted order", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry3, 3000);
|
||||
|
||||
const items = buffer.getItems();
|
||||
expect(items).to.have.lengthOf(3);
|
||||
expect(items[0].tReq).to.equal(1000);
|
||||
expect(items[1].tReq).to.equal(2000);
|
||||
expect(items[2].tReq).to.equal(3000);
|
||||
});
|
||||
|
||||
it("should not update T_req if message already exists", () => {
|
||||
const entry: HistoryEntry = { messageId: "msg1" };
|
||||
|
||||
buffer.add(entry, 1000);
|
||||
buffer.add(entry, 2000); // Try to add again with different T_req
|
||||
|
||||
const items = buffer.getItems();
|
||||
expect(items).to.have.lengthOf(1);
|
||||
expect(items[0].tReq).to.equal(1000); // Should keep original
|
||||
});
|
||||
|
||||
it("should evict furthest entry when buffer is full", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
const entry4: HistoryEntry = { messageId: "msg4" };
|
||||
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry3, 3000);
|
||||
buffer.add(entry4, 1500); // Should evict msg3 (furthest T_req = 3000)
|
||||
|
||||
const items = buffer.getItems();
|
||||
expect(items).to.have.lengthOf(3);
|
||||
expect(buffer.has("msg3")).to.be.false; // msg3 should be evicted (furthest T_req)
|
||||
expect(buffer.has("msg1")).to.be.true;
|
||||
expect(buffer.has("msg2")).to.be.true;
|
||||
expect(buffer.has("msg4")).to.be.true;
|
||||
});
|
||||
|
||||
it("should get eligible entries based on current time", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry3, 3000);
|
||||
|
||||
const eligible = buffer.getEligible(1500, 3);
|
||||
expect(eligible).to.have.lengthOf(1);
|
||||
expect(eligible[0].messageId).to.equal("msg1");
|
||||
});
|
||||
|
||||
it("should get multiple eligible entries at later time", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
|
||||
// Create new buffer for second test since getEligible marks entries as requested
|
||||
const buffer2 = new OutgoingRepairBuffer(3);
|
||||
buffer2.add(entry1, 1000);
|
||||
buffer2.add(entry2, 2000);
|
||||
buffer2.add(entry3, 3000);
|
||||
|
||||
const eligible = buffer2.getEligible(2500, 3);
|
||||
expect(eligible).to.have.lengthOf(2);
|
||||
expect(eligible[0].messageId).to.equal("msg1");
|
||||
expect(eligible[1].messageId).to.equal("msg2");
|
||||
});
|
||||
|
||||
it("should respect maxRequests limit", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry3, 3000);
|
||||
|
||||
const eligible = buffer.getEligible(5000, 2); // All are eligible but limit to 2
|
||||
expect(eligible).to.have.lengthOf(2);
|
||||
expect(eligible[0].messageId).to.equal("msg1");
|
||||
expect(eligible[1].messageId).to.equal("msg2");
|
||||
});
|
||||
|
||||
it("should remove entries", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
|
||||
expect(buffer.size).to.equal(2);
|
||||
buffer.remove("msg1");
|
||||
expect(buffer.size).to.equal(1);
|
||||
expect(buffer.has("msg1")).to.be.false;
|
||||
expect(buffer.has("msg2")).to.be.true;
|
||||
});
|
||||
|
||||
it("should handle retrieval hint and sender_id", () => {
|
||||
const hint = new Uint8Array([1, 2, 3]);
|
||||
const entry: HistoryEntry = {
|
||||
messageId: "msg1",
|
||||
retrievalHint: hint,
|
||||
senderId: "sender1"
|
||||
};
|
||||
|
||||
buffer.add(entry, 1000);
|
||||
const all = buffer.getAll();
|
||||
expect(all[0].retrievalHint).to.deep.equal(hint);
|
||||
expect(all[0].senderId).to.equal("sender1");
|
||||
});
|
||||
});
|
||||
|
||||
describe("IncomingRepairBuffer", () => {
|
||||
let buffer: IncomingRepairBuffer;
|
||||
|
||||
beforeEach(() => {
|
||||
buffer = new IncomingRepairBuffer(3); // Small buffer for testing
|
||||
});
|
||||
|
||||
it("should add entries and maintain sorted order", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry3, 3000);
|
||||
|
||||
const items = buffer.getItems();
|
||||
expect(items).to.have.lengthOf(3);
|
||||
expect(items[0].tResp).to.equal(1000);
|
||||
expect(items[1].tResp).to.equal(2000);
|
||||
expect(items[2].tResp).to.equal(3000);
|
||||
});
|
||||
|
||||
it("should ignore duplicate entries", () => {
|
||||
const entry: HistoryEntry = { messageId: "msg1" };
|
||||
|
||||
buffer.add(entry, 1000);
|
||||
buffer.add(entry, 500); // Try to add again with earlier T_resp
|
||||
|
||||
const items = buffer.getItems();
|
||||
expect(items).to.have.lengthOf(1);
|
||||
expect(items[0].tResp).to.equal(1000); // Should keep original
|
||||
});
|
||||
|
||||
it("should evict furthest entry when buffer is full", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
const entry4: HistoryEntry = { messageId: "msg4" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry3, 3000);
|
||||
buffer.add(entry4, 1500); // Should evict msg3 (furthest T_resp)
|
||||
|
||||
const items = buffer.getItems();
|
||||
expect(items).to.have.lengthOf(3);
|
||||
expect(buffer.has("msg3")).to.be.false; // msg3 should be evicted
|
||||
expect(buffer.has("msg1")).to.be.true;
|
||||
expect(buffer.has("msg2")).to.be.true;
|
||||
expect(buffer.has("msg4")).to.be.true;
|
||||
});
|
||||
|
||||
it("should get and remove ready entries", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
const entry3: HistoryEntry = { messageId: "msg3" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
buffer.add(entry3, 3000);
|
||||
|
||||
const ready = buffer.getReady(1500);
|
||||
expect(ready).to.have.lengthOf(1);
|
||||
expect(ready[0].messageId).to.equal("msg1");
|
||||
|
||||
// Entry should be removed from buffer
|
||||
expect(buffer.size).to.equal(2);
|
||||
expect(buffer.has("msg1")).to.be.false;
|
||||
|
||||
const ready2 = buffer.getReady(2500);
|
||||
expect(ready2).to.have.lengthOf(1);
|
||||
expect(ready2[0].messageId).to.equal("msg2");
|
||||
|
||||
expect(buffer.size).to.equal(1);
|
||||
expect(buffer.has("msg2")).to.be.false;
|
||||
expect(buffer.has("msg3")).to.be.true;
|
||||
});
|
||||
|
||||
it("should remove entries", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
|
||||
expect(buffer.size).to.equal(2);
|
||||
buffer.remove("msg1");
|
||||
expect(buffer.size).to.equal(1);
|
||||
expect(buffer.has("msg1")).to.be.false;
|
||||
expect(buffer.has("msg2")).to.be.true;
|
||||
});
|
||||
|
||||
it("should clear all entries", () => {
|
||||
const entry1: HistoryEntry = { messageId: "msg1" };
|
||||
const entry2: HistoryEntry = { messageId: "msg2" };
|
||||
|
||||
buffer.add(entry1, 1000);
|
||||
buffer.add(entry2, 2000);
|
||||
|
||||
expect(buffer.size).to.equal(2);
|
||||
buffer.clear();
|
||||
expect(buffer.size).to.equal(0);
|
||||
});
|
||||
});
|
||||
277
packages/sds/src/message_channel/repair/buffers.ts
Normal file
277
packages/sds/src/message_channel/repair/buffers.ts
Normal file
@ -0,0 +1,277 @@
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import type { HistoryEntry, MessageId } from "../message.js";
|
||||
|
||||
const log = new Logger("sds:repair:buffers");
|
||||
|
||||
/**
|
||||
* Entry in the outgoing repair buffer with request timing
|
||||
*/
|
||||
interface OutgoingBufferEntry {
|
||||
entry: HistoryEntry;
|
||||
tReq: number; // Timestamp when this repair request should be sent
|
||||
requested: boolean; // Whether this repair has already been requested by the local node
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry in the incoming repair buffer with response timing
|
||||
*/
|
||||
interface IncomingBufferEntry {
|
||||
entry: HistoryEntry;
|
||||
tResp: number; // Timestamp when we should respond with this repair
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer for outgoing repair requests (messages we need)
|
||||
* Maintains a sorted array by T_req for efficient retrieval of eligible entries
|
||||
*/
|
||||
export class OutgoingRepairBuffer {
|
||||
// Sorted array by T_req (ascending - earliest first)
|
||||
private items: OutgoingBufferEntry[] = [];
|
||||
private readonly maxSize: number;
|
||||
|
||||
public constructor(maxSize = 1000) {
|
||||
this.maxSize = maxSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a missing message to the outgoing repair request buffer
|
||||
* If message already exists, it is not updated (keeps original T_req)
|
||||
* @returns true if the entry was added, false if it already existed
|
||||
*/
|
||||
public add(entry: HistoryEntry, tReq: number): boolean {
|
||||
const messageId = entry.messageId;
|
||||
|
||||
// Check if already exists - do NOT update T_req per spec
|
||||
if (this.has(messageId)) {
|
||||
log.info(
|
||||
`Message ${messageId} already in outgoing buffer, keeping original T_req`
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check buffer size limit
|
||||
if (this.items.length >= this.maxSize) {
|
||||
// Evict furthest T_req entry (last in sorted array) to preserve repairs that need to be sent the soonest
|
||||
const evicted = this.items.pop()!;
|
||||
log.warn(
|
||||
`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`
|
||||
);
|
||||
}
|
||||
|
||||
// Add new entry and re-sort
|
||||
const newEntry: OutgoingBufferEntry = { entry, tReq, requested: false };
|
||||
const combined = [...this.items, newEntry];
|
||||
|
||||
// Sort by T_req (ascending)
|
||||
combined.sort((a, b) => a.tReq - b.tReq);
|
||||
|
||||
this.items = combined;
|
||||
log.info(`Added ${messageId} to outgoing buffer with T_req: ${tReq}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a message from the buffer (e.g., when received)
|
||||
*/
|
||||
public remove(messageId: MessageId): void {
|
||||
this.items = this.items.filter(
|
||||
(item) => item.entry.messageId !== messageId
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get eligible repair requests (where T_req <= currentTime)
|
||||
* Returns up to maxRequests entries from the front of the sorted array
|
||||
* Marks returned entries as requested but keeps them in buffer until received
|
||||
*/
|
||||
public getEligible(
|
||||
currentTime: number = Date.now(),
|
||||
maxRequests = 3
|
||||
): HistoryEntry[] {
|
||||
const eligible: HistoryEntry[] = [];
|
||||
|
||||
// Iterate from front of sorted array (earliest T_req first)
|
||||
for (const item of this.items) {
|
||||
// Since array is sorted, once we hit an item with tReq > currentTime,
|
||||
// all remaining items also have tReq > currentTime
|
||||
if (item.tReq > currentTime) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Only return items that haven't been requested yet
|
||||
if (!item.requested && eligible.length < maxRequests) {
|
||||
eligible.push(item.entry);
|
||||
// Mark as requested so we don't request it again
|
||||
item.requested = true;
|
||||
log.info(
|
||||
`Repair request for ${item.entry.messageId} is eligible and marked as requested`
|
||||
);
|
||||
}
|
||||
|
||||
// If we've found enough eligible items, exit early
|
||||
if (eligible.length >= maxRequests) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return eligible;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a message is in the buffer
|
||||
*/
|
||||
public has(messageId: MessageId): boolean {
|
||||
return this.items.some((item) => item.entry.messageId === messageId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current buffer size
|
||||
*/
|
||||
public get size(): number {
|
||||
return this.items.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all entries
|
||||
*/
|
||||
public clear(): void {
|
||||
this.items = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all entries (for testing/debugging)
|
||||
*/
|
||||
public getAll(): HistoryEntry[] {
|
||||
return this.items.map((item) => item.entry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get items array directly (for testing)
|
||||
*/
|
||||
public getItems(): OutgoingBufferEntry[] {
|
||||
return [...this.items];
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Buffer for incoming repair requests (repairs we need to send)
|
||||
* Maintains a sorted array by T_resp for efficient retrieval of ready entries
|
||||
*/
|
||||
export class IncomingRepairBuffer {
|
||||
// Sorted array by T_resp (ascending - earliest first)
|
||||
private items: IncomingBufferEntry[] = [];
|
||||
private readonly maxSize: number;
|
||||
|
||||
public constructor(maxSize = 1000) {
|
||||
this.maxSize = maxSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a repair request that we can fulfill
|
||||
* If message already exists, it is ignored (not updated)
|
||||
* @returns true if the entry was added, false if it already existed
|
||||
*/
|
||||
public add(entry: HistoryEntry, tResp: number): boolean {
|
||||
const messageId = entry.messageId;
|
||||
|
||||
// Check if already exists - ignore per spec
|
||||
if (this.has(messageId)) {
|
||||
log.info(`Message ${messageId} already in incoming buffer, ignoring`);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check buffer size limit
|
||||
if (this.items.length >= this.maxSize) {
|
||||
// Evict furthest T_resp entry (last in sorted array)
|
||||
const evicted = this.items.pop()!;
|
||||
log.warn(
|
||||
`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}`
|
||||
);
|
||||
}
|
||||
|
||||
// Add new entry and re-sort
|
||||
const newEntry: IncomingBufferEntry = { entry, tResp };
|
||||
const combined = [...this.items, newEntry];
|
||||
|
||||
// Sort by T_resp (ascending)
|
||||
combined.sort((a, b) => a.tResp - b.tResp);
|
||||
|
||||
this.items = combined;
|
||||
log.info(`Added ${messageId} to incoming buffer with T_resp: ${tResp}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a message from the buffer
|
||||
*/
|
||||
public remove(messageId: MessageId): void {
|
||||
this.items = this.items.filter(
|
||||
(item) => item.entry.messageId !== messageId
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get repairs ready to be sent (where T_resp <= currentTime)
|
||||
* Removes and returns ready entries
|
||||
*/
|
||||
public getReady(currentTime: number): HistoryEntry[] {
|
||||
// Find cutoff point - first item with tResp > currentTime
|
||||
// Since array is sorted, all items before this are ready
|
||||
let cutoff = 0;
|
||||
for (let i = 0; i < this.items.length; i++) {
|
||||
if (this.items[i].tResp > currentTime) {
|
||||
cutoff = i;
|
||||
break;
|
||||
}
|
||||
// If we reach the end, all items are ready
|
||||
cutoff = i + 1;
|
||||
}
|
||||
|
||||
// Extract ready items and log them
|
||||
const ready = this.items.slice(0, cutoff).map((item) => {
|
||||
log.info(`Repair for ${item.entry.messageId} is ready to be sent`);
|
||||
return item.entry;
|
||||
});
|
||||
|
||||
// Keep only items after cutoff
|
||||
this.items = this.items.slice(cutoff);
|
||||
|
||||
return ready;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a message is in the buffer
|
||||
*/
|
||||
public has(messageId: MessageId): boolean {
|
||||
return this.items.some((item) => item.entry.messageId === messageId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current buffer size
|
||||
*/
|
||||
public get size(): number {
|
||||
return this.items.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all entries
|
||||
*/
|
||||
public clear(): void {
|
||||
this.items = [];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all entries (for testing/debugging)
|
||||
*/
|
||||
public getAll(): HistoryEntry[] {
|
||||
return this.items.map((item) => item.entry);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get items array directly (for testing)
|
||||
*/
|
||||
public getItems(): IncomingBufferEntry[] {
|
||||
return [...this.items];
|
||||
}
|
||||
}
|
||||
331
packages/sds/src/message_channel/repair/repair.ts
Normal file
331
packages/sds/src/message_channel/repair/repair.ts
Normal file
@ -0,0 +1,331 @@
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import type { HistoryEntry, MessageId } from "../message.js";
|
||||
import { Message } from "../message.js";
|
||||
import type { ILocalHistory } from "../message_channel.js";
|
||||
|
||||
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
|
||||
import {
|
||||
bigintToNumber,
|
||||
calculateXorDistance,
|
||||
combinedHash,
|
||||
hashString,
|
||||
ParticipantId
|
||||
} from "./utils.js";
|
||||
|
||||
const log = new Logger("sds:repair:manager");
|
||||
|
||||
/**
|
||||
* Per SDS-R spec: One response group per 128 participants
|
||||
*/
|
||||
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;
|
||||
|
||||
/**
|
||||
* Event emitter callback for repair events
|
||||
*/
|
||||
export type RepairEventEmitter = (event: string, detail: unknown) => void;
|
||||
|
||||
/**
|
||||
* Configuration for SDS-R repair protocol
|
||||
*/
|
||||
export interface RepairConfig {
|
||||
/** Minimum wait time before requesting repair (milliseconds) */
|
||||
tMin?: number;
|
||||
/** Maximum wait time for repair window (milliseconds) */
|
||||
tMax?: number;
|
||||
/** Number of response groups for load distribution */
|
||||
numResponseGroups?: number;
|
||||
/** Maximum buffer size for repair requests */
|
||||
bufferSize?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default configuration values based on spec recommendations
|
||||
*/
|
||||
export const DEFAULT_REPAIR_CONFIG: Required<RepairConfig> = {
|
||||
tMin: 30000, // 30 seconds
|
||||
tMax: 120000, // 120 seconds
|
||||
numResponseGroups: 1, // Recommendation is 1 group per PARTICIPANTS_PER_RESPONSE_GROUP participants
|
||||
bufferSize: 1000
|
||||
};
|
||||
|
||||
/**
|
||||
* Manager for SDS-R repair protocol
|
||||
* Handles repair request/response timing and coordination
|
||||
*/
|
||||
export class RepairManager {
|
||||
private readonly participantId: ParticipantId;
|
||||
private readonly config: Required<RepairConfig>;
|
||||
private readonly outgoingBuffer: OutgoingRepairBuffer;
|
||||
private readonly incomingBuffer: IncomingRepairBuffer;
|
||||
private readonly eventEmitter?: RepairEventEmitter;
|
||||
|
||||
public constructor(
|
||||
participantId: ParticipantId,
|
||||
config: RepairConfig = {},
|
||||
eventEmitter?: RepairEventEmitter
|
||||
) {
|
||||
this.participantId = participantId;
|
||||
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
|
||||
this.eventEmitter = eventEmitter;
|
||||
|
||||
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
|
||||
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
|
||||
|
||||
log.info(`RepairManager initialized for participant ${participantId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate T_req - when to request repair for a missing message
|
||||
* Per spec: T_req = current_time + hash(participant_id, message_id) % (T_max - T_min) + T_min
|
||||
*/
|
||||
public calculateTReq(messageId: MessageId, currentTime = Date.now()): number {
|
||||
const hash = combinedHash(this.participantId, messageId);
|
||||
const range = BigInt(this.config.tMax - this.config.tMin);
|
||||
const offset = bigintToNumber(hash % range) + this.config.tMin;
|
||||
return currentTime + offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate T_resp - when to respond with a repair
|
||||
* Per spec: T_resp = current_time + (distance * hash(message_id)) % T_max
|
||||
* where distance = participant_id XOR sender_id
|
||||
*/
|
||||
public calculateTResp(
|
||||
senderId: ParticipantId,
|
||||
messageId: MessageId,
|
||||
currentTime = Date.now()
|
||||
): number {
|
||||
const distance = calculateXorDistance(this.participantId, senderId);
|
||||
const messageHash = hashString(messageId);
|
||||
const product = distance * messageHash;
|
||||
const offset = bigintToNumber(product % BigInt(this.config.tMax));
|
||||
return currentTime + offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if this participant is in the response group for a message
|
||||
* Per spec: (hash(participant_id, message_id) % num_response_groups) ==
|
||||
* (hash(sender_id, message_id) % num_response_groups)
|
||||
*/
|
||||
public isInResponseGroup(
|
||||
senderId: ParticipantId,
|
||||
messageId: MessageId
|
||||
): boolean {
|
||||
if (!senderId) {
|
||||
// Cannot determine response group without sender_id
|
||||
return false;
|
||||
}
|
||||
|
||||
const numGroups = BigInt(this.config.numResponseGroups);
|
||||
if (numGroups <= BigInt(1)) {
|
||||
// Single group, everyone is in it
|
||||
return true;
|
||||
}
|
||||
|
||||
const participantGroup =
|
||||
combinedHash(this.participantId, messageId) % numGroups;
|
||||
const senderGroup = combinedHash(senderId, messageId) % numGroups;
|
||||
|
||||
return participantGroup === senderGroup;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle missing dependencies by adding them to outgoing repair buffer
|
||||
* Called when causal dependencies are detected as missing
|
||||
*/
|
||||
public markDependenciesMissing(
|
||||
missingEntries: HistoryEntry[],
|
||||
currentTime = Date.now()
|
||||
): void {
|
||||
for (const entry of missingEntries) {
|
||||
// Calculate when to request this repair
|
||||
const tReq = this.calculateTReq(entry.messageId, currentTime);
|
||||
|
||||
// Add to outgoing buffer - only log and emit event if actually added
|
||||
const wasAdded = this.outgoingBuffer.add(entry, tReq);
|
||||
|
||||
if (wasAdded) {
|
||||
log.info(
|
||||
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
|
||||
);
|
||||
|
||||
// Emit event
|
||||
this.eventEmitter?.("RepairRequestQueued", {
|
||||
messageId: entry.messageId,
|
||||
tReq
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle receipt of a message - remove from repair buffers
|
||||
* Called when a message is successfully received
|
||||
*/
|
||||
public markMessageReceived(messageId: MessageId): void {
|
||||
// Remove from both buffers as we no longer need to request or respond
|
||||
const wasInOutgoing = this.outgoingBuffer.has(messageId);
|
||||
const wasInIncoming = this.incomingBuffer.has(messageId);
|
||||
|
||||
if (wasInOutgoing) {
|
||||
this.outgoingBuffer.remove(messageId);
|
||||
log.info(
|
||||
`Removed ${messageId} from outgoing repair buffer after receipt`
|
||||
);
|
||||
}
|
||||
|
||||
if (wasInIncoming) {
|
||||
this.incomingBuffer.remove(messageId);
|
||||
log.info(
|
||||
`Removed ${messageId} from incoming repair buffer after receipt`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get repair requests that are eligible to be sent
|
||||
* Returns up to maxRequests entries where T_req <= currentTime
|
||||
*/
|
||||
public getRepairRequests(
|
||||
maxRequests = 3,
|
||||
currentTime = Date.now()
|
||||
): HistoryEntry[] {
|
||||
return this.outgoingBuffer.getEligible(currentTime, maxRequests);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process incoming repair requests from other participants
|
||||
* Adds to incoming buffer if we can fulfill and are in response group
|
||||
*/
|
||||
public processIncomingRepairRequests(
|
||||
requests: HistoryEntry[],
|
||||
localHistory: ILocalHistory,
|
||||
currentTime = Date.now()
|
||||
): void {
|
||||
for (const request of requests) {
|
||||
// Remove from our own outgoing buffer (someone else is requesting it)
|
||||
this.outgoingBuffer.remove(request.messageId);
|
||||
|
||||
// Check if we have this message
|
||||
const message = localHistory.find(
|
||||
(m) => m.messageId === request.messageId
|
||||
);
|
||||
if (!message) {
|
||||
log.info(
|
||||
`Cannot fulfill repair for ${request.messageId} - not in local history`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if we're in the response group
|
||||
if (!request.senderId) {
|
||||
log.warn(
|
||||
`Cannot determine response group for ${request.messageId} - missing sender_id`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!this.isInResponseGroup(request.senderId, request.messageId)) {
|
||||
log.info(`Not in response group for ${request.messageId}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Calculate when to respond
|
||||
const tResp = this.calculateTResp(
|
||||
request.senderId,
|
||||
request.messageId,
|
||||
currentTime
|
||||
);
|
||||
|
||||
// Add to incoming buffer - only log and emit event if actually added
|
||||
const wasAdded = this.incomingBuffer.add(request, tResp);
|
||||
|
||||
if (wasAdded) {
|
||||
log.info(
|
||||
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
|
||||
);
|
||||
|
||||
// Emit event
|
||||
this.eventEmitter?.("RepairResponseQueued", {
|
||||
messageId: request.messageId,
|
||||
tResp
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweep outgoing buffer for repairs that should be requested
|
||||
* Returns entries where T_req <= currentTime
|
||||
*/
|
||||
public sweepOutgoingBuffer(
|
||||
maxRequests = 3,
|
||||
currentTime = Date.now()
|
||||
): HistoryEntry[] {
|
||||
return this.getRepairRequests(maxRequests, currentTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweep incoming buffer for repairs ready to be sent
|
||||
* Returns messages that should be rebroadcast
|
||||
*/
|
||||
public sweepIncomingBuffer(
|
||||
localHistory: ILocalHistory,
|
||||
currentTime = Date.now()
|
||||
): Message[] {
|
||||
const ready = this.incomingBuffer.getReady(currentTime);
|
||||
const messages: Message[] = [];
|
||||
|
||||
for (const entry of ready) {
|
||||
const message = localHistory.find((m) => m.messageId === entry.messageId);
|
||||
if (message) {
|
||||
messages.push(message);
|
||||
log.info(`Sending repair for ${entry.messageId}`);
|
||||
} else {
|
||||
log.warn(`Message ${entry.messageId} no longer in local history`);
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all buffers
|
||||
*/
|
||||
public clear(): void {
|
||||
this.outgoingBuffer.clear();
|
||||
this.incomingBuffer.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update number of response groups (e.g., when participants change)
|
||||
*/
|
||||
public updateResponseGroups(numParticipants: number): void {
|
||||
if (
|
||||
numParticipants < 0 ||
|
||||
!Number.isFinite(numParticipants) ||
|
||||
!Number.isInteger(numParticipants)
|
||||
) {
|
||||
throw new Error(
|
||||
`Invalid numParticipants: ${numParticipants}. Must be a positive integer.`
|
||||
);
|
||||
}
|
||||
|
||||
if (numParticipants > Number.MAX_SAFE_INTEGER) {
|
||||
log.warn(
|
||||
`numParticipants ${numParticipants} exceeds MAX_SAFE_INTEGER, using MAX_SAFE_INTEGER`
|
||||
);
|
||||
numParticipants = Number.MAX_SAFE_INTEGER;
|
||||
}
|
||||
|
||||
// Per spec: num_response_groups = max(1, num_participants / PARTICIPANTS_PER_RESPONSE_GROUP)
|
||||
this.config.numResponseGroups = Math.max(
|
||||
1,
|
||||
Math.floor(numParticipants / PARTICIPANTS_PER_RESPONSE_GROUP)
|
||||
);
|
||||
log.info(
|
||||
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
|
||||
);
|
||||
}
|
||||
}
|
||||
80
packages/sds/src/message_channel/repair/utils.ts
Normal file
80
packages/sds/src/message_channel/repair/utils.ts
Normal file
@ -0,0 +1,80 @@
|
||||
import { sha256 } from "@noble/hashes/sha2";
|
||||
import { bytesToHex } from "@waku/utils/bytes";
|
||||
|
||||
import type { MessageId } from "../message.js";
|
||||
|
||||
/**
|
||||
* ParticipantId can be a string or converted to a numeric representation for XOR operations
|
||||
*/
|
||||
export type ParticipantId = string;
|
||||
|
||||
/**
|
||||
* Compute SHA256 hash and convert to integer for modulo operations
|
||||
* Uses first 8 bytes of hash for the integer conversion
|
||||
*/
|
||||
export function hashToInteger(input: string): bigint {
|
||||
const hashBytes = sha256(new TextEncoder().encode(input));
|
||||
// Use first 8 bytes for a 64-bit integer
|
||||
const view = new DataView(hashBytes.buffer, 0, 8);
|
||||
return view.getBigUint64(0, false); // big-endian
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute combined hash for (participantId, messageId) and convert to integer
|
||||
* This is used for T_req calculations and response group membership
|
||||
*/
|
||||
export function combinedHash(
|
||||
participantId: ParticipantId,
|
||||
messageId: MessageId
|
||||
): bigint {
|
||||
const combined = `${participantId}${messageId}`;
|
||||
return hashToInteger(combined);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert ParticipantId to numeric representation for XOR operations
|
||||
* TODO: Not per spec, further review needed
|
||||
* The spec assumes participant IDs support XOR natively, but we're using
|
||||
* SHA256 hash to ensure consistent numeric representation for string IDs
|
||||
*/
|
||||
export function participantIdToNumeric(participantId: ParticipantId): bigint {
|
||||
return hashToInteger(participantId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate XOR distance between two participant IDs
|
||||
* Used for T_resp calculations where distance affects response timing
|
||||
*/
|
||||
export function calculateXorDistance(
|
||||
participantId1: ParticipantId,
|
||||
participantId2: ParticipantId
|
||||
): bigint {
|
||||
const numeric1 = participantIdToNumeric(participantId1);
|
||||
const numeric2 = participantIdToNumeric(participantId2);
|
||||
return numeric1 ^ numeric2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to convert bigint to number for timing calculations
|
||||
* Ensures the result fits in JavaScript's number range
|
||||
*/
|
||||
export function bigintToNumber(value: bigint): number {
|
||||
// For timing calculations, we modulo by MAX_SAFE_INTEGER to ensure it fits
|
||||
const maxSafe = BigInt(Number.MAX_SAFE_INTEGER);
|
||||
return Number(value % maxSafe);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate hash for a single string (used for message_id in T_resp)
|
||||
*/
|
||||
export function hashString(input: string): bigint {
|
||||
return hashToInteger(input);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a hash result to hex string for debugging/logging
|
||||
*/
|
||||
export function hashToHex(input: string): string {
|
||||
const hashBytes = sha256(new TextEncoder().encode(input));
|
||||
return bytesToHex(hashBytes);
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user