feat: integrate sds-r within reliable channels SDK

This commit is contained in:
jm-clius 2025-10-10 19:41:28 +01:00
parent 62aae5148f
commit fd357f9142
3 changed files with 229 additions and 9 deletions

View File

@ -36,10 +36,13 @@ import { RetryManager } from "./retry_manager.js";
const log = new Logger("sdk:reliable-channel");
const DEFAULT_SYNC_MIN_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS = 10 * 1000; // 10 seconds when repairs pending
const DEFAULT_RETRY_INTERVAL_MS = 30 * 1000; // 30 seconds
const DEFAULT_MAX_RETRY_ATTEMPTS = 10;
const DEFAULT_SWEEP_IN_BUF_INTERVAL_MS = 5 * 1000;
const DEFAULT_SWEEP_REPAIR_INTERVAL_MS = 10 * 1000; // 10 seconds
const DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS = 1000;
const DEFAULT_SDSR_FALLBACK_TIMEOUT_MS = 120 * 1000; // 2 minutes
const IRRECOVERABLE_SENDING_ERRORS: LightPushError[] = [
LightPushError.ENCODE_FAILED,
@ -78,6 +81,7 @@ export type ReliableChannelOptions = MessageChannelOptions & {
/**
* How often store queries are done to retrieve missing messages.
* Only applies when retrievalStrategy includes Store ('both' or 'store-only').
*
* @default 10,000 (10 seconds)
*/
@ -111,6 +115,25 @@ export type ReliableChannelOptions = MessageChannelOptions & {
* @default 1000 (1 second)
*/
processTaskMinElapseMs?: number;
/**
* Strategy for retrieving missing messages.
* - 'both': Use SDS-R peer repair and Store queries (default)
* - 'sds-r-only': Only use SDS-R peer repair
* - 'store-only': Only use Store queries (legacy behavior)
* - 'none': No automatic retrieval
*
* @default 'both'
*/
retrievalStrategy?: "both" | "sds-r-only" | "store-only" | "none";
/**
* How long to wait for SDS-R repair before falling back to Store.
* Only applies when retrievalStrategy is 'both'.
*
* @default 120,000 (2 minutes - matches SDS-R T_max)
*/
sdsrFallbackTimeoutMs?: number;
};
/**
@ -145,11 +168,22 @@ export class ReliableChannel<
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
private readonly sweepInBufIntervalMs: number;
private sweepRepairInterval: ReturnType<typeof setInterval> | undefined;
private processTaskTimeout: ReturnType<typeof setTimeout> | undefined;
private readonly retryManager: RetryManager | undefined;
private readonly missingMessageRetriever?: MissingMessageRetriever<T>;
private readonly queryOnConnect?: QueryOnConnect<T>;
private readonly processTaskMinElapseMs: number;
private readonly retrievalStrategy:
| "both"
| "sds-r-only"
| "store-only"
| "none";
private readonly sdsrFallbackTimeoutMs: number;
private readonly missingMessageTimeouts: Map<
string,
ReturnType<typeof setTimeout>
>;
private _started: boolean;
private constructor(
@ -214,7 +248,13 @@ export class ReliableChannel<
this.processTaskMinElapseMs =
options?.processTaskMinElapseMs ?? DEFAULT_PROCESS_TASK_MIN_ELAPSE_MS;
if (this._retrieve) {
this.retrievalStrategy = options?.retrievalStrategy ?? "both";
this.sdsrFallbackTimeoutMs =
options?.sdsrFallbackTimeoutMs ?? DEFAULT_SDSR_FALLBACK_TIMEOUT_MS;
this.missingMessageTimeouts = new Map();
// Only enable Store retrieval based on strategy
if (this._retrieve && this.shouldUseStore()) {
this.missingMessageRetriever = new MissingMessageRetriever(
this.decoder,
options?.retrieveFrequencyMs,
@ -418,6 +458,13 @@ export class ReliableChannel<
// missing messages or the status of previous outgoing messages
this.messageChannel.pushIncomingMessage(sdsMessage, retrievalHint);
// Cancel Store fallback timeout if message was retrieved
const timeout = this.missingMessageTimeouts.get(sdsMessage.messageId);
if (timeout) {
clearTimeout(timeout);
this.missingMessageTimeouts.delete(sdsMessage.messageId);
}
this.missingMessageRetriever?.removeMissingMessage(sdsMessage.messageId);
if (sdsMessage.content && sdsMessage.content.length > 0) {
@ -478,6 +525,12 @@ export class ReliableChannel<
this.setupEventListeners();
this.restartSync();
this.startSweepIncomingBufferLoop();
// Only start repair sweep if SDS-R is enabled
if (this.shouldUseSdsR()) {
this.startRepairSweepLoop();
}
if (this._retrieve) {
this.missingMessageRetriever?.start();
this.queryOnConnect?.start();
@ -490,6 +543,8 @@ export class ReliableChannel<
this._started = false;
this.stopSync();
this.stopSweepIncomingBufferLoop();
this.stopRepairSweepLoop();
this.clearMissingMessageTimeouts();
this.missingMessageRetriever?.stop();
this.queryOnConnect?.stop();
// TODO unsubscribe
@ -512,18 +567,67 @@ export class ReliableChannel<
if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval);
}
private startRepairSweepLoop(): void {
this.stopRepairSweepLoop();
this.sweepRepairInterval = setInterval(() => {
void this.messageChannel
.sweepRepairIncomingBuffer(async (message) => {
// Rebroadcast the repair message
const wakuMessage = { payload: message.encode() };
const result = await this._send(this.encoder, wakuMessage);
return result.failures.length === 0;
})
.catch((err) => {
log.error("error encountered when sweeping repair buffer", err);
});
}, DEFAULT_SWEEP_REPAIR_INTERVAL_MS);
}
private stopRepairSweepLoop(): void {
if (this.sweepRepairInterval) clearInterval(this.sweepRepairInterval);
}
private clearMissingMessageTimeouts(): void {
for (const timeout of this.missingMessageTimeouts.values()) {
clearTimeout(timeout);
}
this.missingMessageTimeouts.clear();
}
private shouldUseStore(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "store-only"
);
}
private shouldUseSdsR(): boolean {
return (
this.retrievalStrategy === "both" ||
this.retrievalStrategy === "sds-r-only"
);
}
private restartSync(multiplier: number = 1): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
}
if (this.syncMinIntervalMs) {
const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier;
// Adaptive sync: use shorter interval when repairs are pending
const hasPendingRepairs =
this.shouldUseSdsR() && this.messageChannel.hasPendingRepairRequests();
const baseInterval = hasPendingRepairs
? DEFAULT_SYNC_MIN_INTERVAL_WITH_REPAIRS_MS
: this.syncMinIntervalMs;
const timeoutMs = this.random() * baseInterval * multiplier;
this.syncTimeout = setTimeout(() => {
void this.sendSyncMessage();
// Always restart a sync, no matter whether the message was sent.
// Set a multiplier so we wait a bit longer to not hog the conversation
void this.restartSync(2);
// Use smaller multiplier when repairs pending to send more frequently
const nextMultiplier = hasPendingRepairs ? 1.2 : 2;
void this.restartSync(nextMultiplier);
}, timeoutMs);
}
}
@ -669,12 +773,35 @@ export class ReliableChannel<
MessageChannelEvent.InMessageMissing,
(event) => {
for (const { messageId, retrievalHint } of event.detail) {
if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage(
messageId,
retrievalHint
);
// Coordinated retrieval strategy
if (this.retrievalStrategy === "both") {
// SDS-R will attempt first, schedule Store fallback
// Note: missingMessageRetriever only exists if Store protocol is available
if (retrievalHint && this.missingMessageRetriever) {
const timeout = setTimeout(() => {
// Still missing after SDS-R timeout, try Store
log.info(
`Message ${messageId} not retrieved via SDS-R, falling back to Store`
);
this.missingMessageRetriever?.addMissingMessage(
messageId,
retrievalHint
);
this.missingMessageTimeouts.delete(messageId);
}, this.sdsrFallbackTimeoutMs);
this.missingMessageTimeouts.set(messageId, timeout);
}
} else if (this.retrievalStrategy === "store-only") {
// Immediate Store retrieval
if (retrievalHint && this.missingMessageRetriever) {
this.missingMessageRetriever.addMissingMessage(
messageId,
retrievalHint
);
}
}
// For 'sds-r-only' and 'none', SDS-R handles it or nothing happens
}
}
);

View File

@ -142,6 +142,31 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
return bytesToHex(sha256(payload));
}
/**
* Check if there are pending repair requests that need to be sent.
* Useful for adaptive sync intervals - increase frequency when repairs pending.
*/
public hasPendingRepairRequests(currentTime = Date.now()): boolean {
if (!this.repairManager.isEnabled) {
return false;
}
const nextRequestTime = this.repairManager.getNextRequestTime();
return nextRequestTime !== undefined && nextRequestTime <= currentTime;
}
/**
* Get repair statistics for monitoring/debugging.
*/
public getRepairStats(): {
pendingRequests: number;
pendingResponses: number;
nextRequestTime?: number;
nextResponseTime?: number;
} {
return this.repairManager.getStats();
}
/**
* Processes all queued tasks sequentially to ensure proper message ordering.
*

View File

@ -328,4 +328,72 @@ export class RepairManager {
`Updated response groups to ${this.config.numResponseGroups} for ${numParticipants} participants`
);
}
/**
* Check if there are any pending outgoing repair requests
*/
public hasPendingRequests(): boolean {
return this.outgoingBuffer.size > 0;
}
/**
* Get count of pending repair requests
*/
public getPendingRequestCount(): number {
return this.outgoingBuffer.size;
}
/**
* Get count of pending repair responses
*/
public getPendingResponseCount(): number {
return this.incomingBuffer.size;
}
/**
* Get next scheduled repair request time (earliest T_req)
*/
public getNextRequestTime(): number | undefined {
const items = this.outgoingBuffer.getItems();
return items.length > 0 ? items[0].tReq : undefined;
}
/**
* Get next scheduled repair response time (earliest T_resp)
*/
public getNextResponseTime(): number | undefined {
const items = this.incomingBuffer.getItems();
return items.length > 0 ? items[0].tResp : undefined;
}
/**
* Check if a specific message has a pending repair request
*/
public isPendingRequest(messageId: string): boolean {
return this.outgoingBuffer.has(messageId);
}
/**
* Check if we have a pending response for a message
*/
public isPendingResponse(messageId: string): boolean {
return this.incomingBuffer.has(messageId);
}
/**
* Get stats for monitoring/debugging
*/
public getStats(): {
pendingRequests: number;
pendingResponses: number;
nextRequestTime?: number;
nextResponseTime?: number;
} {
return {
pendingRequests: this.getPendingRequestCount(),
pendingResponses: this.getPendingResponseCount(),
nextRequestTime: this.getNextRequestTime(),
nextResponseTime: this.getNextResponseTime()
};
}
}