mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-07 00:03:07 +00:00
fix: switch to conditionally constructed from conditionally executed
This commit is contained in:
parent
8cf03d3eb4
commit
3288d288dc
@ -53,7 +53,12 @@ export interface MessageChannelOptions {
|
||||
*/
|
||||
possibleAcksThreshold?: number;
|
||||
/**
|
||||
* SDS-R repair configuration. If not provided, repair is enabled with default settings.
|
||||
* Whether to enable SDS-R repair protocol.
|
||||
* @default true
|
||||
*/
|
||||
enableRepair?: boolean;
|
||||
/**
|
||||
* SDS-R repair configuration. Only used if enableRepair is true.
|
||||
*/
|
||||
repairConfig?: RepairConfig;
|
||||
}
|
||||
@ -76,7 +81,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
private readonly causalHistorySize: number;
|
||||
private readonly possibleAcksThreshold: number;
|
||||
private readonly timeoutForLostMessagesMs?: number;
|
||||
private readonly repairManager: RepairManager;
|
||||
private readonly repairManager?: RepairManager;
|
||||
|
||||
private tasks: Task[] = [];
|
||||
private handlers: Handlers = {
|
||||
@ -120,13 +125,17 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
options.possibleAcksThreshold ?? DEFAULT_POSSIBLE_ACKS_THRESHOLD;
|
||||
this.timeReceived = new Map();
|
||||
this.timeoutForLostMessagesMs = options.timeoutForLostMessagesMs;
|
||||
this.repairManager = new RepairManager(
|
||||
senderId,
|
||||
options.repairConfig,
|
||||
(event: string, detail: unknown) => {
|
||||
this.safeSendEvent(event as MessageChannelEvent, { detail });
|
||||
}
|
||||
);
|
||||
|
||||
// Only construct RepairManager if repair is enabled (default: true)
|
||||
if (options.enableRepair ?? true) {
|
||||
this.repairManager = new RepairManager(
|
||||
senderId,
|
||||
options.repairConfig,
|
||||
(event: string, detail: unknown) => {
|
||||
this.safeSendEvent(event as MessageChannelEvent, { detail });
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public static getMessageId(payload: Uint8Array): MessageId {
|
||||
@ -381,9 +390,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
public async sweepRepairIncomingBuffer(
|
||||
callback?: (message: Message) => Promise<boolean>
|
||||
): Promise<Message[]> {
|
||||
const repairsToSend = this.repairManager.sweepIncomingBuffer(
|
||||
this.localHistory
|
||||
);
|
||||
const repairsToSend =
|
||||
this.repairManager?.sweepIncomingBuffer(this.localHistory) ?? [];
|
||||
|
||||
if (callback) {
|
||||
for (const message of repairsToSend) {
|
||||
@ -426,9 +434,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
this.lamportTimestamp = lamportTimestampIncrement(this.lamportTimestamp);
|
||||
|
||||
// Get repair requests to include in sync message (SDS-R)
|
||||
const repairRequests = this.repairManager.getRepairRequests(
|
||||
MAX_REPAIR_REQUESTS_PER_MESSAGE
|
||||
);
|
||||
const repairRequests =
|
||||
this.repairManager?.getRepairRequests(MAX_REPAIR_REQUESTS_PER_MESSAGE) ??
|
||||
[];
|
||||
|
||||
const message = new SyncMessage(
|
||||
// does not need to be secure randomness
|
||||
@ -542,7 +550,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
}
|
||||
|
||||
// SDS-R: Handle received message in repair manager
|
||||
this.repairManager.markMessageReceived(message.messageId);
|
||||
this.repairManager?.markMessageReceived(message.messageId);
|
||||
|
||||
// SDS-R: Process incoming repair requests
|
||||
if (message.repairRequest && message.repairRequest.length > 0) {
|
||||
@ -554,7 +562,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
}
|
||||
});
|
||||
|
||||
this.repairManager.processIncomingRepairRequests(
|
||||
this.repairManager?.processIncomingRepairRequests(
|
||||
message.repairRequest,
|
||||
this.localHistory
|
||||
);
|
||||
@ -582,7 +590,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
);
|
||||
|
||||
// SDS-R: Track missing dependencies in repair manager
|
||||
this.repairManager.markDependenciesMissing(missingDependencies);
|
||||
this.repairManager?.markDependenciesMissing(missingDependencies);
|
||||
|
||||
this.safeSendEvent(MessageChannelEvent.InMessageMissing, {
|
||||
detail: Array.from(missingDependencies)
|
||||
@ -648,9 +656,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
log.info(this.senderId, "sending new message", messageId);
|
||||
|
||||
// Get repair requests to include in the message (SDS-R)
|
||||
const repairRequests = this.repairManager.getRepairRequests(
|
||||
MAX_REPAIR_REQUESTS_PER_MESSAGE
|
||||
);
|
||||
const repairRequests =
|
||||
this.repairManager?.getRepairRequests(
|
||||
MAX_REPAIR_REQUESTS_PER_MESSAGE
|
||||
) ?? [];
|
||||
|
||||
message = new ContentMessage(
|
||||
messageId,
|
||||
|
||||
@ -37,8 +37,6 @@ export interface RepairConfig {
|
||||
numResponseGroups?: number;
|
||||
/** Maximum buffer size for repair requests */
|
||||
bufferSize?: number;
|
||||
/** Whether repair is enabled */
|
||||
enabled?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -48,8 +46,7 @@ export const DEFAULT_REPAIR_CONFIG: Required<RepairConfig> = {
|
||||
tMin: 30000, // 30 seconds
|
||||
tMax: 120000, // 120 seconds
|
||||
numResponseGroups: 1, // Recommendation is 1 group per PARTICIPANTS_PER_RESPONSE_GROUP participants
|
||||
bufferSize: 1000,
|
||||
enabled: true
|
||||
bufferSize: 1000
|
||||
};
|
||||
|
||||
/**
|
||||
@ -141,10 +138,6 @@ export class RepairManager {
|
||||
missingEntries: HistoryEntry[],
|
||||
currentTime = Date.now()
|
||||
): void {
|
||||
if (!this.config.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const entry of missingEntries) {
|
||||
// Calculate when to request this repair
|
||||
const tReq = this.calculateTReq(entry.messageId, currentTime);
|
||||
@ -198,10 +191,6 @@ export class RepairManager {
|
||||
maxRequests = 3,
|
||||
currentTime = Date.now()
|
||||
): HistoryEntry[] {
|
||||
if (!this.config.enabled) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return this.outgoingBuffer.getEligible(currentTime, maxRequests);
|
||||
}
|
||||
|
||||
@ -214,10 +203,6 @@ export class RepairManager {
|
||||
localHistory: ILocalHistory,
|
||||
currentTime = Date.now()
|
||||
): void {
|
||||
if (!this.config.enabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const request of requests) {
|
||||
// Remove from our own outgoing buffer (someone else is requesting it)
|
||||
this.outgoingBuffer.remove(request.messageId);
|
||||
@ -278,10 +263,6 @@ export class RepairManager {
|
||||
maxRequests = 3,
|
||||
currentTime = Date.now()
|
||||
): HistoryEntry[] {
|
||||
if (!this.config.enabled) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return this.getRepairRequests(maxRequests, currentTime);
|
||||
}
|
||||
|
||||
@ -293,10 +274,6 @@ export class RepairManager {
|
||||
localHistory: ILocalHistory,
|
||||
currentTime = Date.now()
|
||||
): Message[] {
|
||||
if (!this.config.enabled) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const ready = this.incomingBuffer.getReady(currentTime);
|
||||
const messages: Message[] = [];
|
||||
|
||||
@ -321,13 +298,6 @@ export class RepairManager {
|
||||
this.incomingBuffer.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if repair is enabled
|
||||
*/
|
||||
public get isEnabled(): boolean {
|
||||
return this.config.enabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update number of response groups (e.g., when participants change)
|
||||
*/
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user