feat: incorporate sds-r into reliable channels (#2701)

* wip

* feat: integrate sds-r with message channels

* fix: fix implementation guide, remove unrelated claude file

* feat: integrate sds-r within reliable channels SDK

* fix: fix import, export

* fix: fix build errors, simplify parallel operation

* fix: sigh. this file has 9 lives

* fix: simplify more

* fix: disable repair if not part of retrieval strategy

* fix: remove dead code, simplify

* fix: improve repair loop

Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>

* chore: make retrievalStrategy mandatory argument

* chore: add repair multiplier, safer checks

---------

Co-authored-by: fryorcraken <commits@fryorcraken.xyz>
Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
This commit is contained in:
Hanno Cornelius 2025-11-21 15:03:48 +00:00 committed by GitHub
parent e5f51d7df1
commit 788f7e62c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 110 additions and 48 deletions

View File

@ -19,8 +19,8 @@ import {
MessageChannelEvent,
MessageChannelEvents,
type MessageChannelOptions,
type ParticipantId,
Message as SdsMessage,
type SenderId,
SyncMessage
} from "@waku/sds";
import { Logger } from "@waku/utils";
@ -39,9 +39,11 @@ import { ISyncStatusEvents, SyncStatus } from "./sync_status.js";
const log = new Logger("sdk:reliable-channel");
const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
const SYNC_INTERVAL_REPAIR_MULTIPLIER = 0.3; // Reduce sync interval when repairs pending
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
@ -51,6 +53,15 @@ const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
LightPushError.RLN_PROOF_GENERATION
];
/**
* Strategy for retrieving missing messages.
* - 'both': Use SDS-R peer repair and Store queries in parallel (default)
* - 'sds-r-only': Only use SDS-R peer repair
* - 'store-only': Only use Store queries (legacy behavior)
* - 'none': No automatic retrieval
*/
export type RetrievalStrategy = "both" | "sds-r-only" | "store-only" | "none";
export type ReliableChannelOptions = MessageChannelOptions & {
/**
* The minimum interval between 2 sync messages in the channel.
@ -81,6 +92,7 @@ export type ReliableChannelOptions = MessageChannelOptions & {
/**
* How often store queries are done to retrieve missing messages.
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
*
* @default 10,000 (10 seconds)
*/
@ -114,6 +126,13 @@ export type ReliableChannelOptions = MessageChannelOptions & {
* @default 1000 (1 second)
*/
processTaskMinElapseMs?: number;
/**
* Strategy for retrieving missing messages.
*
* @default 'both'
*/
retrievalStrategy?: RetrievalStrategy;
};
/**
@ -152,6 +171,7 @@ export class ReliableChannel<
private syncRandomTimeout: RandomTimeout;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
private readonly sweepInBufIntervalMs: number;
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
private readonly retryManager: RetryManager | undefined;
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
@ -165,6 +185,7 @@ export class ReliableChannel<
public messageChannel: MessageChannel,
private encoder: IEncoder,
private decoder: IDecoder<T>,
private retrievalStrategy: RetrievalStrategy,
options?: ReliableChannelOptions
) {
super();
@ -226,7 +247,8 @@ export class ReliableChannel<
this.processTaskMinElapseMs =
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
if (this._retrieve) {
// Only enable Store retrieval based on strategy
if (this._retrieve && this.shouldUseStore()) {
this.missingMessageRetriever = new MissingMessageRetriever(
this.decoder,
options?.retrieveFrequencyMs,
@ -290,17 +312,26 @@ export class ReliableChannel<
public static async create<T extends IDecodedMessage>(
node: IWaku,
channelId: ChannelId,
senderId: SenderId,
senderId: ParticipantId,
encoder: IEncoder,
decoder: IDecoder<T>,
options?: ReliableChannelOptions
): Promise<ReliableChannel<T>> {
const sdsMessageChannel = new MessageChannel(channelId, senderId, options);
// Enable SDS-R repair only if retrieval strategy uses it
const retrievalStrategy = options?.retrievalStrategy ?? "both";
const enableRepair =
retrievalStrategy === "both" || retrievalStrategy === "sds-r-only";
const sdsMessageChannel = new MessageChannel(channelId, senderId, {
...options,
enableRepair
});
const messageChannel = new ReliableChannel(
node,
sdsMessageChannel,
encoder,
decoder,
retrievalStrategy,
options
);
@ -455,6 +486,7 @@ export class ReliableChannel<
// missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
// Remove from Store retriever if message was retrieved
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
if (sdsMessage.content && sdsMessage.content.length > 0) {
@ -528,6 +560,9 @@ export class ReliableChannel<
this.setupEventListeners();
this.restartSync();
this.startSweepIncomingBufferLoop();
this.startRepairSweepLoop();
if (this._retrieve) {
this.missingMessageRetriever?.start();
this.queryOnConnect?.start();
@ -544,6 +579,7 @@ export class ReliableChannel<
this.removeAllEventListeners();
this.stopSync();
this.stopSweepIncomingBufferLoop();
this.stopRepairSweepLoop();
this.clearProcessTasks();
if (this.activePendingProcessTask) {
@ -582,8 +618,55 @@ export class ReliableChannel<
}
}
private startRepairSweepLoop(): void {
if (!this.shouldUseSdsR()) {
return;
}
this.stopRepairSweepLoop();
this.sweepRepairInterval = setInterval(() => {
void this.messageChannel
.sweepRepairIncomingBuffer(async (message) => {
// Rebroadcast the repair message
const wakuMessage = { payload: message.encode() };
const result = await this._send(this.encoder, wakuMessage);
return result.failures.length === 0;
})
.catch((err) => {
log.error("error encountered when sweeping repair buffer", err);
});
}, DEFAULT_SWEEP_REPAIR_INTERVAL_MS);
}
private stopRepairSweepLoop(): void {
if (this.sweepRepairInterval) {
clearInterval(this.sweepRepairInterval);
this.sweepInBufInterval = undefined;
}
}
private shouldUseStore(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "store-only"
);
}
private shouldUseSdsR(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "sds-r-only"
);
}
private restartSync(multiplier: number = 1): void {
this.syncRandomTimeout.restart(multiplier);
// Adaptive sync: use shorter interval when repairs are pending
const hasPendingRepairs =
this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
const effectiveMultiplier = hasPendingRepairs
? multiplier * SYNC_INTERVAL_REPAIR_MULTIPLIER
: multiplier;
this.syncRandomTimeout.restart(effectiveMultiplier);
}
private stopSync(): void {
@ -731,6 +814,8 @@ export class ReliableChannel<
);
for (const { messageId, retrievalHint } of event.detail) {
// Store retrieval (for 'both' and 'store-only' strategies)
// SDS-R repair happens automatically via RepairManager for 'both' and 'sds-r-only'
if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage(
messageId,

View File

@ -12,10 +12,8 @@ export enum MessageChannelEvent {
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task",
// SDS-R Repair Events
RepairRequestQueued = "sds:repair:request-queued",
RepairRequestSent = "sds:repair:request-sent",
RepairRequestReceived = "sds:repair:request-received",
RepairResponseQueued = "sds:repair:response-queued",
RepairResponseSent = "sds:repair:response-sent"
}
@ -33,10 +31,6 @@ export type MessageChannelEvents = {
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
messageId: MessageId;
tReq: number;
}>;
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
messageIds: MessageId[];
carrierMessageId: MessageId;
@ -45,10 +39,6 @@ export type MessageChannelEvents = {
messageIds: MessageId[];
fromSenderId?: ParticipantId;
}>;
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
messageId: MessageId;
tResp: number;
}>;
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
messageId: MessageId;
}>;

View File

@ -128,13 +128,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// Only construct RepairManager if repair is enabled (default: true)
if (options.enableRepair ?? true) {
this.repairManager = new RepairManager(
senderId,
options.repairConfig,
(event: string, detail: unknown) => {
this.safeSendEvent(event as MessageChannelEvent, { detail });
}
);
this.repairManager = new RepairManager(senderId, options.repairConfig);
}
}
@ -142,6 +136,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return bytesToHex(sha256(payload));
}
/**
* Check if there are pending repair requests that need to be sent.
* Useful for adaptive sync intervals - increase frequency when repairs pending.
*/
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
return this.repairManager?.hasRequestsReady(currentTime) ?? false;
}
/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*

View File

@ -20,11 +20,6 @@ const log = new Logger("sds:repair:manager");
*/
const PARTICIPANTS_PER_RESPONSE_GROUP = 128;
/**
* Event emitter callback for repair events
*/
export type RepairEventEmitter = (event: string, detail: unknown) => void;
/**
* Configuration for SDS-R repair protocol
*/
@ -58,16 +53,10 @@ export class RepairManager {
private readonly config: Required<RepairConfig>;
private readonly outgoingBuffer: OutgoingRepairBuffer;
private readonly incomingBuffer: IncomingRepairBuffer;
private readonly eventEmitter?: RepairEventEmitter;
public constructor(
participantId: ParticipantId,
config: RepairConfig = {},
eventEmitter?: RepairEventEmitter
) {
public constructor(participantId: ParticipantId, config: RepairConfig = {}) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.eventEmitter = eventEmitter;
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
@ -142,19 +131,13 @@ export class RepairManager {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);
// Add to outgoing buffer - only log and emit event if actually added
// Add to outgoing buffer - only log if actually added
const wasAdded = this.outgoingBuffer.add(entry, tReq);
if (wasAdded) {
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
}
}
}
@ -238,19 +221,13 @@ export class RepairManager {
currentTime
);
// Add to incoming buffer - only log and emit event if actually added
// Add to incoming buffer - only log if actually added
const wasAdded = this.incomingBuffer.add(request, tResp);
if (wasAdded) {
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
}
}
}
@ -328,4 +305,12 @@ export class RepairManager {
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
);
}
/**
* Check if there are repair requests ready to be sent
*/
public hasRequestsReady(currentTime = Date.now()): boolean {
const items = this.outgoingBuffer.getItems();
return items.length > 0 && items[0].tReq <= currentTime;
}
}