From 0334f721370c99bde4b4a10e8d0e6d152784624d Mon Sep 17 00:00:00 2001 From: jm-clius Date: Fri, 10 Oct 2025 12:22:27 +0100 Subject: [PATCH] feat: add SDS-R events --- packages/sds/src/message_channel/events.ts | 29 ++++++++++++++-- .../src/message_channel/message_channel.ts | 34 ++++++++++++++++++- .../sds/src/message_channel/repair/repair.ts | 27 +++++++++++++-- 3 files changed, 85 insertions(+), 5 deletions(-) diff --git a/packages/sds/src/message_channel/events.ts b/packages/sds/src/message_channel/events.ts index 75318df210..1fcd21c38a 100644 --- a/packages/sds/src/message_channel/events.ts +++ b/packages/sds/src/message_channel/events.ts @@ -10,7 +10,13 @@ export enum MessageChannelEvent { OutSyncSent = "sds:out:sync-sent", InSyncReceived = "sds:in:sync-received", InMessageLost = "sds:in:message-irretrievably-lost", - ErrorTask = "sds:error-task" + ErrorTask = "sds:error-task", + // SDS-R Repair Events + RepairRequestQueued = "sds:repair:request-queued", + RepairRequestSent = "sds:repair:request-sent", + RepairRequestReceived = "sds:repair:request-received", + RepairResponseQueued = "sds:repair:response-queued", + RepairResponseSent = "sds:repair:response-sent" } export type MessageChannelEvents = { @@ -26,5 +32,24 @@ export type MessageChannelEvents = { [MessageChannelEvent.InMessageLost]: CustomEvent; [MessageChannelEvent.OutSyncSent]: CustomEvent; [MessageChannelEvent.InSyncReceived]: CustomEvent; - [MessageChannelEvent.ErrorTask]: CustomEvent; + [MessageChannelEvent.ErrorTask]: CustomEvent; + [MessageChannelEvent.RepairRequestQueued]: CustomEvent<{ + messageId: MessageId; + tReq: number; + }>; + [MessageChannelEvent.RepairRequestSent]: CustomEvent<{ + messageIds: MessageId[]; + carrierMessageId: MessageId; + }>; + [MessageChannelEvent.RepairRequestReceived]: CustomEvent<{ + messageIds: MessageId[]; + fromSenderId?: string; + }>; + [MessageChannelEvent.RepairResponseQueued]: CustomEvent<{ + messageId: MessageId; + tResp: number; + }>; + [MessageChannelEvent.RepairResponseSent]: CustomEvent<{ + messageId: MessageId; + }>; }; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index f6d9fb273a..fd374fd95f 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -115,7 +115,13 @@ export class MessageChannel extends TypedEventEmitter { options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD; this.timeReceived = new Map(); this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs; - this.repairManager = new RepairManager(senderId, options.repairConfig); + this.repairManager = new RepairManager( + senderId, + options.repairConfig, + (event: string, detail: unknown) => { + this.safeSendEvent(event as MessageChannelEvent, { detail }); + } + ); } public static getMessageId(payload: Uint8Array): MessageId { @@ -385,6 +391,13 @@ export class MessageChannel extends TypedEventEmitter { "repair message rebroadcast", message.messageId ); + + // Emit RepairResponseSent event + this.safeSendEvent(MessageChannelEvent.RepairResponseSent, { + detail: { + messageId: message.messageId + } + }); } catch (error) { log.error("Failed to rebroadcast repair message:", error); } @@ -446,6 +459,17 @@ export class MessageChannel extends TypedEventEmitter { this.safeSendEvent(MessageChannelEvent.OutSyncSent, { detail: message }); + + // Emit RepairRequestSent event if repair requests were included + if (repairRequests.length > 0) { + this.safeSendEvent(MessageChannelEvent.RepairRequestSent, { + detail: { + messageIds: repairRequests.map((r) => r.messageId), + carrierMessageId: message.messageId + } + }); + } + return true; } catch (error) { log.error( @@ -517,6 +541,14 @@ export class MessageChannel extends TypedEventEmitter { // SDS-R: Process incoming repair requests if (message.repairRequest && message.repairRequest.length > 0) { + // Emit RepairRequestReceived event + this.safeSendEvent(MessageChannelEvent.RepairRequestReceived, { + detail: { + messageIds: message.repairRequest.map((r) => r.messageId), + fromSenderId: message.senderId + } + }); + this.repairManager.processIncomingRepairRequests( message.repairRequest, this.localHistory diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index a8f5e1698e..90c854c49b 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -15,6 +15,11 @@ import { const log = new Logger("sds:repair:manager"); +/** + * Event emitter callback for repair events + */ +export type RepairEventEmitter = (event: string, detail: unknown) => void; + /** * Configuration for SDS-R repair protocol */ @@ -51,10 +56,16 @@ export class RepairManager { private readonly config: Required; private readonly outgoingBuffer: OutgoingRepairBuffer; private readonly incomingBuffer: IncomingRepairBuffer; + private readonly eventEmitter?: RepairEventEmitter; - constructor(participantId: ParticipantId, config: RepairConfig = {}) { + public constructor( + participantId: ParticipantId, + config: RepairConfig = {}, + eventEmitter?: RepairEventEmitter + ) { this.participantId = participantId; this.config = { ...DEFAULT_REPAIR_CONFIG, ...config }; + this.eventEmitter = eventEmitter; this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize); this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize); @@ -105,7 +116,7 @@ export class RepairManager { } const numGroups = BigInt(this.config.numResponseGroups); - if (numGroups <= 1n) { + if (numGroups <= BigInt(1)) { // Single group, everyone is in it return true; } @@ -139,6 +150,12 @@ export class RepairManager { log.info( `Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}` ); + + // Emit event + this.eventEmitter?.("RepairRequestQueued", { + messageId: entry.messageId, + tReq + }); } } @@ -223,6 +240,12 @@ export class RepairManager { log.info( `Will respond to repair request for ${request.messageId} at T_resp=${tResp}` ); + + // Emit event + this.eventEmitter?.("RepairResponseQueued", { + messageId: request.messageId, + tResp + }); } }