feat: add SDS-R events

This commit is contained in:
jm-clius 2025-10-10 12:22:27 +01:00
parent c2a1f4ca82
commit 0334f72137
No known key found for this signature in database
GPG Key ID: 5FCD9D5211B952DA
3 changed files with 85 additions and 5 deletions

View File

@ -10,7 +10,13 @@ export enum MessageChannelEvent {
OutSyncSent = "sds:out:sync-sent",
InSyncReceived = "sds:in:sync-received",
InMessageLost = "sds:in:message-irretrievably-lost",
ErrorTask = "sds:error-task"
ErrorTask = "sds:error-task",
// SDS-R Repair Events
RepairRequestQueued = "sds:repair:request-queued",
RepairRequestSent = "sds:repair:request-sent",
RepairRequestReceived = "sds:repair:request-received",
RepairResponseQueued = "sds:repair:response-queued",
RepairResponseSent = "sds:repair:response-sent"
}
export type MessageChannelEvents = {
@ -26,5 +32,24 @@ export type MessageChannelEvents = {
[MessageChannelEvent.InMessageLost]: CustomEvent<HistoryEntry[]>;
[MessageChannelEvent.OutSyncSent]: CustomEvent<Message>;
[MessageChannelEvent.InSyncReceived]: CustomEvent<Message>;
[MessageChannelEvent.ErrorTask]: CustomEvent<any>;
[MessageChannelEvent.ErrorTask]: CustomEvent<unknown>;
[MessageChannelEvent.RepairRequestQueued]: CustomEvent<{
messageId: MessageId;
tReq: number;
}>;
[MessageChannelEvent.RepairRequestSent]: CustomEvent<{
messageIds: MessageId[];
carrierMessageId: MessageId;
}>;
[MessageChannelEvent.RepairRequestReceived]: CustomEvent<{
messageIds: MessageId[];
fromSenderId?: string;
}>;
[MessageChannelEvent.RepairResponseQueued]: CustomEvent<{
messageId: MessageId;
tResp: number;
}>;
[MessageChannelEvent.RepairResponseSent]: CustomEvent<{
messageId: MessageId;
}>;
};

View File

@ -115,7 +115,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
this.timeReceived = new Map();
this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs;
this.repairManager = new RepairManager(senderId, options.repairConfig);
this.repairManager = new RepairManager(
senderId,
options.repairConfig,
(event: string, detail: unknown) => {
this.safeSendEvent(event as MessageChannelEvent, { detail });
}
);
}
public static getMessageId(payload: Uint8Array): MessageId {
@ -385,6 +391,13 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
"repair message rebroadcast",
message.messageId
);
// Emit RepairResponseSent event
this.safeSendEvent(MessageChannelEvent.RepairResponseSent, {
detail: {
messageId: message.messageId
}
});
} catch (error) {
log.error("Failed to rebroadcast repair message:", error);
}
@ -446,6 +459,17 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.safeSendEvent(MessageChannelEvent.OutSyncSent, {
detail: message
});
// Emit RepairRequestSent event if repair requests were included
if (repairRequests.length > 0) {
this.safeSendEvent(MessageChannelEvent.RepairRequestSent, {
detail: {
messageIds: repairRequests.map((r) => r.messageId),
carrierMessageId: message.messageId
}
});
}
return true;
} catch (error) {
log.error(
@ -517,6 +541,14 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
// SDS-R: Process incoming repair requests
if (message.repairRequest && message.repairRequest.length > 0) {
// Emit RepairRequestReceived event
this.safeSendEvent(MessageChannelEvent.RepairRequestReceived, {
detail: {
messageIds: message.repairRequest.map((r) => r.messageId),
fromSenderId: message.senderId
}
});
this.repairManager.processIncomingRepairRequests(
message.repairRequest,
this.localHistory

View File

@ -15,6 +15,11 @@ import {
const log = new Logger("sds:repair:manager");
/**
* Event emitter callback for repair events
*/
export type RepairEventEmitter = (event: string, detail: unknown) => void;
/**
* Configuration for SDS-R repair protocol
*/
@ -51,10 +56,16 @@ export class RepairManager {
private readonly config: Required<RepairConfig>;
private readonly outgoingBuffer: OutgoingRepairBuffer;
private readonly incomingBuffer: IncomingRepairBuffer;
private readonly eventEmitter?: RepairEventEmitter;
constructor(participantId: ParticipantId, config: RepairConfig = {}) {
public constructor(
participantId: ParticipantId,
config: RepairConfig = {},
eventEmitter?: RepairEventEmitter
) {
this.participantId = participantId;
this.config = { ...DEFAULT_REPAIR_CONFIG, ...config };
this.eventEmitter = eventEmitter;
this.outgoingBuffer = new OutgoingRepairBuffer(this.config.bufferSize);
this.incomingBuffer = new IncomingRepairBuffer(this.config.bufferSize);
@ -105,7 +116,7 @@ export class RepairManager {
}
const numGroups = BigInt(this.config.numResponseGroups);
if (numGroups <= 1n) {
if (numGroups <= BigInt(1)) {
// Single group, everyone is in it
return true;
}
@ -139,6 +150,12 @@ export class RepairManager {
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
}
}
@ -223,6 +240,12 @@ export class RepairManager {
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
}
}