From fd357f914240e0ff3f4df9718047a2a1846736b9 Mon Sep 17 00:00:00 2001 From: jm-clius Date: Fri, 10 Oct 2025 19:41:28 +0100 Subject: [PATCH] feat: integrate sds-r within reliable channels SDK --- .../src/reliable_channel/reliable_channel.ts | 145 ++++++++++++++++-- .../src/message_channel/message_channel.ts | 25 +++ .../sds/src/message_channel/repair/repair.ts | 68 ++++++++ 3 files changed, 229 insertions(+), 9 deletions(-) diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..03418a72ea 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -36,10 +36,13 @@ import { RetryManager } from "./retry_manager.js"; const log = new Logger("sdk:reliable-channel"); const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds +const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000; // 10 seconds when repairs pending const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds const DEFAULT_MAX_RETRY_ATTEMPTS = 10; const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000; +const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000; +const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000; // 2 minutes const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [ LightPushError.ENCODE_FAILED, @@ -78,6 +81,7 @@ export type ReliableChannelOptions = MessageChannelOptions & { /** * How often store queries are done to retrieve missing messages. + * Only applies when retrievalStrategy includes Store ('both' or 'store-only'). * * @default 10,000 (10 seconds) */ @@ -111,6 +115,25 @@ export type ReliableChannelOptions = MessageChannelOptions & { * @default 1000 (1 second) */ processTaskMinElapseMs?: number; + + /** + * Strategy for retrieving missing messages. + * - 'both': Use SDS-R peer repair and Store queries (default) + * - 'sds-r-only': Only use SDS-R peer repair + * - 'store-only': Only use Store queries (legacy behavior) + * - 'none': No automatic retrieval + * + * @default 'both' + */ + retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none"; + + /** + * How long to wait for SDS-R repair before falling back to Store. + * Only applies when retrievalStrategy is 'both'. + * + * @default 120,000 (2 minutes - matches SDS-R T_max) + */ + sdsrFallbackTimeoutMs?: number; }; /** @@ -145,11 +168,22 @@ export class ReliableChannel< private syncTimeout: ReturnType | undefined; private sweepInBufInterval: ReturnType | undefined; private readonly sweepInBufIntervalMs: number; + private sweepRepairInterval: ReturnType | undefined; private processTaskTimeout: ReturnType | undefined; private readonly retryManager: RetryManager | undefined; private readonly missingMessageRetriever?: MissingMessageRetriever; private readonly queryOnConnect?: QueryOnConnect; private readonly processTaskMinElapseMs: number; + private readonly retrievalStrategy: + | "both" + | "sds-r-only" + | "store-only" + | "none"; + private readonly sdsrFallbackTimeoutMs: number; + private readonly missingMessageTimeouts: Map< + string, + ReturnType + >; private _started: boolean; private constructor( @@ -214,7 +248,13 @@ export class ReliableChannel< this.processTaskMinElapseMs = options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS; - if (this._retrieve) { + this.retrievalStrategy = options?.retrievalStrategy ?? "both"; + this.sdsrFallbackTimeoutMs = + options?.sdsrFallbackTimeoutMs ?? DEFAULT_SDSR_FALLBACK_TIMEOUT_MS; + this.missingMessageTimeouts = new Map(); + + // Only enable Store retrieval based on strategy + if (this._retrieve && this.shouldUseStore()) { this.missingMessageRetriever = new MissingMessageRetriever( this.decoder, options?.retrieveFrequencyMs, @@ -418,6 +458,13 @@ export class ReliableChannel< // missing messages or the status of previous outgoing messages this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint); + // Cancel Store fallback timeout if message was retrieved + const timeout = this.missingMessageTimeouts.get(sdsMessage.messageId); + if (timeout) { + clearTimeout(timeout); + this.missingMessageTimeouts.delete(sdsMessage.messageId); + } + this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId); if (sdsMessage.content && sdsMessage.content.length > 0) { @@ -478,6 +525,12 @@ export class ReliableChannel< this.setupEventListeners(); this.restartSync(); this.startSweepIncomingBufferLoop(); + + // Only start repair sweep if SDS-R is enabled + if (this.shouldUseSdsR()) { + this.startRepairSweepLoop(); + } + if (this._retrieve) { this.missingMessageRetriever?.start(); this.queryOnConnect?.start(); @@ -490,6 +543,8 @@ export class ReliableChannel< this._started = false; this.stopSync(); this.stopSweepIncomingBufferLoop(); + this.stopRepairSweepLoop(); + this.clearMissingMessageTimeouts(); this.missingMessageRetriever?.stop(); this.queryOnConnect?.stop(); // TODO unsubscribe @@ -512,18 +567,67 @@ export class ReliableChannel< if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval); } + private startRepairSweepLoop(): void { + this.stopRepairSweepLoop(); + this.sweepRepairInterval = setInterval(() => { + void this.messageChannel + .sweepRepairIncomingBuffer(async (message) => { + // Rebroadcast the repair message + const wakuMessage = { payload: message.encode() }; + const result = await this._send(this.encoder, wakuMessage); + return result.failures.length === 0; + }) + .catch((err) => { + log.error("error encountered when sweeping repair buffer", err); + }); + }, DEFAULT_SWEEP_REPAIR_INTERVAL_MS); + } + + private stopRepairSweepLoop(): void { + if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval); + } + + private clearMissingMessageTimeouts(): void { + for (const timeout of this.missingMessageTimeouts.values()) { + clearTimeout(timeout); + } + this.missingMessageTimeouts.clear(); + } + + private shouldUseStore(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "store-only" + ); + } + + private shouldUseSdsR(): boolean { + return ( + this.retrievalStrategy === "both" || + this.retrievalStrategy === "sds-r-only" + ); + } + private restartSync(multiplier: number = 1): void { if (this.syncTimeout) { clearTimeout(this.syncTimeout); } if (this.syncMinIntervalMs) { - const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier; + // Adaptive sync: use shorter interval when repairs are pending + const hasPendingRepairs = + this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests(); + const baseInterval = hasPendingRepairs + ? DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS + : this.syncMinIntervalMs; + + const timeoutMs = this.random() * baseInterval * multiplier; this.syncTimeout = setTimeout(() => { void this.sendSyncMessage(); // Always restart a sync, no matter whether the message was sent. - // Set a multiplier so we wait a bit longer to not hog the conversation - void this.restartSync(2); + // Use smaller multiplier when repairs pending to send more frequently + const nextMultiplier = hasPendingRepairs ? 1.2 : 2; + void this.restartSync(nextMultiplier); }, timeoutMs); } } @@ -669,12 +773,35 @@ export class ReliableChannel< MessageChannelEvent.InMessageMissing, (event) => { for (const { messageId, retrievalHint } of event.detail) { - if (retrievalHint && this.missingMessageRetriever) { - this.missingMessageRetriever.addMissingMessage( - messageId, - retrievalHint - ); + // Coordinated retrieval strategy + if (this.retrievalStrategy === "both") { + // SDS-R will attempt first, schedule Store fallback + // Note: missingMessageRetriever only exists if Store protocol is available + if (retrievalHint && this.missingMessageRetriever) { + const timeout = setTimeout(() => { + // Still missing after SDS-R timeout, try Store + log.info( + `Message ${messageId} not retrieved via SDS-R, falling back to Store` + ); + this.missingMessageRetriever?.addMissingMessage( + messageId, + retrievalHint + ); + this.missingMessageTimeouts.delete(messageId); + }, this.sdsrFallbackTimeoutMs); + + this.missingMessageTimeouts.set(messageId, timeout); + } + } else if (this.retrievalStrategy === "store-only") { + // Immediate Store retrieval + if (retrievalHint && this.missingMessageRetriever) { + this.missingMessageRetriever.addMissingMessage( + messageId, + retrievalHint + ); + } } + // For 'sds-r-only' and 'none', SDS-R handles it or nothing happens } } ); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index f72ba52579..0ee8159e0c 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -142,6 +142,31 @@ export class MessageChannel extends TypedEventEmitter { return bytesToHex(sha256(payload)); } + /** + * Check if there are pending repair requests that need to be sent. + * Useful for adaptive sync intervals - increase frequency when repairs pending. + */ + public hasPendingRepairRequests(currentTime = Date.now()): boolean { + if (!this.repairManager.isEnabled) { + return false; + } + + const nextRequestTime = this.repairManager.getNextRequestTime(); + return nextRequestTime !== undefined && nextRequestTime <= currentTime; + } + + /** + * Get repair statistics for monitoring/debugging. + */ + public getRepairStats(): { + pendingRequests: number; + pendingResponses: number; + nextRequestTime?: number; + nextResponseTime?: number; + } { + return this.repairManager.getStats(); + } + /** * Processes all queued tasks sequentially to ensure proper message ordering. * diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 4207483165..5065e0323e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -328,4 +328,72 @@ export class RepairManager { `Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants` ); } + + /** + * Check if there are any pending outgoing repair requests + */ + public hasPendingRequests(): boolean { + return this.outgoingBuffer.size > 0; + } + + /** + * Get count of pending repair requests + */ + public getPendingRequestCount(): number { + return this.outgoingBuffer.size; + } + + /** + * Get count of pending repair responses + */ + public getPendingResponseCount(): number { + return this.incomingBuffer.size; + } + + /** + * Get next scheduled repair request time (earliest T_req) + */ + public getNextRequestTime(): number | undefined { + const items = this.outgoingBuffer.getItems(); + return items.length > 0 ? items[0].tReq : undefined; + } + + /** + * Get next scheduled repair response time (earliest T_resp) + */ + public getNextResponseTime(): number | undefined { + const items = this.incomingBuffer.getItems(); + return items.length > 0 ? items[0].tResp : undefined; + } + + /** + * Check if a specific message has a pending repair request + */ + public isPendingRequest(messageId: string): boolean { + return this.outgoingBuffer.has(messageId); + } + + /** + * Check if we have a pending response for a message + */ + public isPendingResponse(messageId: string): boolean { + return this.incomingBuffer.has(messageId); + } + + /** + * Get stats for monitoring/debugging + */ + public getStats(): { + pendingRequests: number; + pendingResponses: number; + nextRequestTime?: number; + nextResponseTime?: number; + } { + return { + pendingRequests: this.getPendingRequestCount(), + pendingResponses: this.getPendingResponseCount(), + nextRequestTime: this.getNextRequestTime(), + nextResponseTime: this.getNextResponseTime() + }; + } }