fix: more buffer fixes

This commit is contained in:
jm-clius 2025-10-23 14:36:48 +01:00
parent febd7cbd2e
commit 5580f2f1e9
No known key found for this signature in database
GPG Key ID: 5FCD9D5211B952DA
3 changed files with 40 additions and 11 deletions

View File

@ -38,7 +38,7 @@ describe("OutgoingRepairBuffer", () => {
expect(items[0].tReq).to.equal(1000); // Should keep original
});
it("should evict oldest entry when buffer is full", () => {
it("should evict furthest entry when buffer is full", () => {
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
@ -47,13 +47,13 @@ describe("OutgoingRepairBuffer", () => {
buffer.add(entry2, 2000);
buffer.add(entry1, 1000);
buffer.add(entry3, 3000);
buffer.add(entry4, 1500); // Should evict msg1 (oldest T_req)
buffer.add(entry4, 1500); // Should evict msg3 (furthest T_req = 3000)
const items = buffer.getItems();
expect(items).to.have.lengthOf(3);
expect(buffer.has("msg1")).to.be.false; // msg1 should be evicted
expect(buffer.has("msg3")).to.be.false; // msg3 should be evicted (furthest T_req)
expect(buffer.has("msg1")).to.be.true;
expect(buffer.has("msg2")).to.be.true;
expect(buffer.has("msg3")).to.be.true;
expect(buffer.has("msg4")).to.be.true;
});
@ -69,11 +69,23 @@ describe("OutgoingRepairBuffer", () => {
const eligible = buffer.getEligible(1500, 3);
expect(eligible).to.have.lengthOf(1);
expect(eligible[0].messageId).to.equal("msg1");
});
const eligible2 = buffer.getEligible(2500, 3);
expect(eligible2).to.have.lengthOf(2);
expect(eligible2[0].messageId).to.equal("msg1");
expect(eligible2[1].messageId).to.equal("msg2");
it("should get multiple eligible entries at later time", () => {
const entry1: HistoryEntry = { messageId: "msg1" };
const entry2: HistoryEntry = { messageId: "msg2" };
const entry3: HistoryEntry = { messageId: "msg3" };
// Create new buffer for second test since getEligible marks entries as requested
const buffer2 = new OutgoingRepairBuffer(3);
buffer2.add(entry1, 1000);
buffer2.add(entry2, 2000);
buffer2.add(entry3, 3000);
const eligible = buffer2.getEligible(2500, 3);
expect(eligible).to.have.lengthOf(2);
expect(eligible[0].messageId).to.equal("msg1");
expect(eligible[1].messageId).to.equal("msg2");
});
it("should respect maxRequests limit", () => {

View File

@ -55,10 +55,10 @@ export class OutgoingRepairBuffer {
// Check buffer size limit
if (this.items.length >= this.maxSize) {
// Evict oldest T_req entry (first in sorted array since we want to evict oldest)
const evicted = this.items.shift()!;
// Evict furthest T_req entry (last in sorted array) to preserve most urgent repairs
const evicted = this.items.pop()!;
log.warn(
`Buffer full, evicted oldest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`
`Buffer full, evicted furthest entry ${evicted.entry.messageId} with T_req ${evicted.tReq}`
);
}

View File

@ -315,6 +315,23 @@ export class RepairManager {
* Update number of response groups (e.g., when participants change)
*/
public updateResponseGroups(numParticipants: number): void {
if (
numParticipants < 0 ||
!Number.isFinite(numParticipants) ||
!Number.isInteger(numParticipants)
) {
throw new Error(
`Invalid numParticipants: ${numParticipants}. Must be a non-negative integer.`
);
}
if (numParticipants > Number.MAX_SAFE_INTEGER) {
log.warn(
`numParticipants ${numParticipants} exceeds MAX_SAFE_INTEGER, using MAX_SAFE_INTEGER`
);
numParticipants = Number.MAX_SAFE_INTEGER;
}
// Per spec: num_response_groups = max(1, num_participants / 128)
this.config.numResponseGroups = Math.max(
1,