fix: fixed buffer handling incoming and outgoing

This commit is contained in:
jm-clius 2025-10-23 12:09:31 +01:00
parent 0334f72137
commit febd7cbd2e
No known key found for this signature in database
GPG Key ID: 5FCD9D5211B952DA
3 changed files with 107 additions and 61 deletions

View File

@ -285,9 +285,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
);
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
({ messageId }) => messageId === messageHistoryEntry.messageId
)
!this.isMessageAvailable(messageHistoryEntry.messageId)
);
if (missingDependencies.length === 0) {
if (isContentMessage(message) && this.deliverMessage(message)) {
@ -562,9 +560,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
const missingDependencies = message.causalHistory.filter(
(messageHistoryEntry) =>
!this.localHistory.some(
({ messageId }) => messageId === messageHistoryEntry.messageId
)
!this.isMessageAvailable(messageHistoryEntry.messageId)
);
if (missingDependencies.length > 0) {
@ -715,6 +711,26 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
}
}
/**
* Check if a message is available (either in localHistory or incomingBuffer)
* This prevents treating messages as "missing" when they've already been received
* but are waiting in the incoming buffer for their dependencies.
*
* @param messageId - The ID of the message to check
* @private
*/
private isMessageAvailable(messageId: MessageId): boolean {
// Check if in local history
if (this.localHistory.some((m) => m.messageId === messageId)) {
return true;
}
// Check if in incoming buffer (already received, waiting for dependencies)
if (this.incomingBuffer.some((m) => m.messageId === messageId)) {
return true;
}
return false;
}
/**
* Return true if the message was "delivered"
*

View File

@ -1,5 +1,4 @@
import { Logger } from "@waku/utils";
import _ from "lodash";
import type { HistoryEntry } from "../message.js";
@ -11,6 +10,7 @@ const log = new Logger("sds:repair:buffers");
interface OutgoingBufferEntry {
entry: HistoryEntry;
tReq: number; // Timestamp when this repair request should be sent
requested: boolean; // Whether this repair has been requested already
}
/**
@ -30,66 +30,84 @@ export class OutgoingRepairBuffer {
private items: OutgoingBufferEntry[] = [];
private readonly maxSize: number;
constructor(maxSize = 1000) {
public constructor(maxSize = 1000) {
this.maxSize = maxSize;
}
/**
* Add a missing message to the outgoing repair request buffer
* If message already exists, it is not updated (keeps original T_req)
* @returns true if the entry was added, false if it already existed
*/
public add(entry: HistoryEntry, tReq: number): void {
public add(entry: HistoryEntry, tReq: number): boolean {
const messageId = entry.messageId;
// Check if already exists - do NOT update T_req per spec
const existingIndex = this.items.findIndex(item => item.entry.messageId === messageId);
const existingIndex = this.items.findIndex(
(item) => item.entry.messageId === messageId
);
if (existingIndex !== -1) {
log.info(`Message ${messageId} already in outgoing buffer, keeping original T_req`);
return;
log.info(
`Message ${messageId} already in outgoing buffer, keeping original T_req`
);
return false;
}
// Check buffer size limit
if (this.items.length >= this.maxSize) {
// Evict oldest T_req entry (first in sorted array since we want to evict oldest)
const evicted = this.items.shift()!;
log.warn(`Buffer full, evicted oldest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`);
log.warn(
`Buffer full, evicted oldest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`
);
}
// Add new entry and re-sort
const newEntry: OutgoingBufferEntry = { entry, tReq };
const newEntry: OutgoingBufferEntry = { entry, tReq, requested: false };
const combined = [...this.items, newEntry];
// Sort by T_req (ascending)
combined.sort((a, b) => a.tReq - b.tReq);
this.items = combined;
log.info(`Added ${messageId} to outgoing buffer with T_req: ${tReq}`);
return true;
}
/**
* Remove a message from the buffer (e.g., when received)
*/
public remove(messageId: string): void {
this.items = this.items.filter(item => item.entry.messageId !== messageId);
this.items = this.items.filter(
(item) => item.entry.messageId !== messageId
);
}
/**
* Get eligible repair requests (where T_req <= currentTime)
* Returns up to maxRequests entries from the front of the sorted array
* Marks returned entries as requested but keeps them in buffer until received
*/
public getEligible(currentTime: number, maxRequests = 3): HistoryEntry[] {
const eligible: HistoryEntry[] = [];
// Iterate from front of sorted array (earliest T_req first)
for (const item of this.items) {
if (item.tReq <= currentTime && eligible.length < maxRequests) {
// Only return items that are eligible and haven't been requested yet
if (
item.tReq <= currentTime &&
!item.requested &&
eligible.length < maxRequests
) {
eligible.push(item.entry);
} else if (item.tReq > currentTime) {
// Since array is sorted, no more eligible entries
break;
// Mark as requested so we don't request it again
item.requested = true;
log.info(
`Repair request for ${item.entry.messageId} is eligible and marked as requested`
);
}
}
return eligible;
}
@ -97,7 +115,7 @@ export class OutgoingRepairBuffer {
* Check if a message is in the buffer
*/
public has(messageId: string): boolean {
return this.items.some(item => item.entry.messageId === messageId);
return this.items.some((item) => item.entry.messageId === messageId);
}
/**
@ -118,7 +136,7 @@ export class OutgoingRepairBuffer {
* Get all entries (for testing/debugging)
*/
public getAll(): HistoryEntry[] {
return this.items.map(item => item.entry);
return this.items.map((item) => item.entry);
}
/**
@ -138,47 +156,55 @@ export class IncomingRepairBuffer {
private items: IncomingBufferEntry[] = [];
private readonly maxSize: number;
constructor(maxSize = 1000) {
public constructor(maxSize = 1000) {
this.maxSize = maxSize;
}
/**
* Add a repair request that we can fulfill
* If message already exists, it is ignored (not updated)
* @returns true if the entry was added, false if it already existed
*/
public add(entry: HistoryEntry, tResp: number): void {
public add(entry: HistoryEntry, tResp: number): boolean {
const messageId = entry.messageId;
// Check if already exists - ignore per spec
const existingIndex = this.items.findIndex(item => item.entry.messageId === messageId);
const existingIndex = this.items.findIndex(
(item) => item.entry.messageId === messageId
);
if (existingIndex !== -1) {
log.info(`Message ${messageId} already in incoming buffer, ignoring`);
return;
return false;
}
// Check buffer size limit
if (this.items.length >= this.maxSize) {
// Evict furthest T_resp entry (last in sorted array)
const evicted = this.items.pop()!;
log.warn(`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}`);
log.warn(
`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_resp ${evicted.tResp}`
);
}
// Add new entry and re-sort
const newEntry: IncomingBufferEntry = { entry, tResp };
const combined = [...this.items, newEntry];
// Sort by T_resp (ascending)
combined.sort((a, b) => a.tResp - b.tResp);
this.items = combined;
log.info(`Added ${messageId} to incoming buffer with T_resp: ${tResp}`);
return true;
}
/**
* Remove a message from the buffer
*/
public remove(messageId: string): void {
this.items = this.items.filter(item => item.entry.messageId !== messageId);
this.items = this.items.filter(
(item) => item.entry.messageId !== messageId
);
}
/**
@ -188,7 +214,7 @@ export class IncomingRepairBuffer {
public getReady(currentTime: number): HistoryEntry[] {
const ready: HistoryEntry[] = [];
const remaining: IncomingBufferEntry[] = [];
for (const item of this.items) {
if (item.tResp <= currentTime) {
ready.push(item.entry);
@ -198,10 +224,10 @@ export class IncomingRepairBuffer {
remaining.push(item);
}
}
// Keep only non-ready entries
this.items = remaining;
return ready;
}
@ -209,7 +235,7 @@ export class IncomingRepairBuffer {
* Check if a message is in the buffer
*/
public has(messageId: string): boolean {
return this.items.some(item => item.entry.messageId === messageId);
return this.items.some((item) => item.entry.messageId === messageId);
}
/**
@ -230,7 +256,7 @@ export class IncomingRepairBuffer {
* Get all entries (for testing/debugging)
*/
public getAll(): HistoryEntry[] {
return this.items.map(item => item.entry);
return this.items.map((item) => item.entry);
}
/**
@ -239,4 +265,4 @@ export class IncomingRepairBuffer {
public getItems(): IncomingBufferEntry[] {
return [...this.items];
}
}
}

View File

@ -144,18 +144,20 @@ export class RepairManager {
// Calculate when to request this repair
const tReq = this.calculateTReq(entry.messageId, currentTime);
// Add to outgoing buffer
this.outgoingBuffer.add(entry, tReq);
// Add to outgoing buffer - only log and emit event if actually added
const wasAdded = this.outgoingBuffer.add(entry, tReq);
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
if (wasAdded) {
log.info(
`Added missing dependency ${entry.messageId} to repair buffer with T_req=${tReq}`
);
// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
// Emit event
this.eventEmitter?.("RepairRequestQueued", {
messageId: entry.messageId,
tReq
});
}
}
}
@ -234,18 +236,20 @@ export class RepairManager {
currentTime
);
// Add to incoming buffer
this.incomingBuffer.add(request, tResp);
// Add to incoming buffer - only log and emit event if actually added
const wasAdded = this.incomingBuffer.add(request, tResp);
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
if (wasAdded) {
log.info(
`Will respond to repair request for ${request.messageId} at T_resp=${tResp}`
);
// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
// Emit event
this.eventEmitter?.("RepairResponseQueued", {
messageId: request.messageId,
tResp
});
}
}
}