Merge bc59717cf1b53a301711d1ebbbf0d160eed1c1b2 into 3e3c5111b7fc19dc397a9b498c8dc7e078bbc6d7

This commit is contained in:
Danish Arora 2025-11-28 16:16:25 +00:00 committed by GitHub
commit ebb9fb67b8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 159 additions and 204 deletions

View File

@ -9,23 +9,23 @@ describe("MemLocalHistory", () => {
const hist = new MemLocalHistory({ maxSize: maxSize }); const hist = new MemLocalHistory({ maxSize: maxSize });
hist.push( hist.addMessages(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
); );
expect(hist.length).to.eq(1); expect(hist.size).to.eq(1);
hist.push( hist.addMessages(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])) new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2]))
); );
expect(hist.length).to.eq(2); expect(hist.size).to.eq(2);
hist.push( hist.addMessages(
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
); );
expect(hist.length).to.eq(2); expect(hist.size).to.eq(2);
expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); expect(hist.hasMessage("1")).to.eq(false);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); expect(hist.hasMessage("2")).to.eq(true);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); expect(hist.hasMessage("3")).to.eq(true);
}); });
it("Cap max size when a pushed array is exceeding the cap", () => { it("Cap max size when a pushed array is exceeding the cap", () => {
@ -33,18 +33,18 @@ describe("MemLocalHistory", () => {
const hist = new MemLocalHistory({ maxSize: maxSize }); const hist = new MemLocalHistory({ maxSize: maxSize });
hist.push( hist.addMessages(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
); );
expect(hist.length).to.eq(1); expect(hist.size).to.eq(1);
hist.push( hist.addMessages(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])), new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])),
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
); );
expect(hist.length).to.eq(2); expect(hist.size).to.eq(2);
expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); expect(hist.hasMessage("1")).to.eq(false);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); expect(hist.hasMessage("2")).to.eq(true);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); expect(hist.hasMessage("3")).to.eq(true);
}); });
}); });

View File

@ -1,7 +1,12 @@
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import _ from "lodash"; import _ from "lodash";
import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; import {
type ChannelId,
ContentMessage,
type HistoryEntry,
isContentMessage
} from "./message.js";
import { PersistentStorage } from "./persistent_storage.js"; import { PersistentStorage } from "./persistent_storage.js";
export const DEFAULT_MAX_LENGTH = 10_000; export const DEFAULT_MAX_LENGTH = 10_000;
@ -20,33 +25,13 @@ export const DEFAULT_MAX_LENGTH = 10_000;
* at next push. * at next push.
*/ */
export interface ILocalHistory { export interface ILocalHistory {
length: number; readonly size: number;
push(...items: ContentMessage[]): number; addMessages(...messages: ContentMessage[]): void;
some( hasMessage(messageId: string): boolean;
predicate: ( getMessage(messageId: string): ContentMessage | undefined;
value: ContentMessage, getRecentMessages(count: number): ContentMessage[];
index: number, getAllMessages(): ContentMessage[];
array: ContentMessage[] findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[];
) => unknown,
thisArg?: any
): boolean;
slice(start?: number, end?: number): ContentMessage[];
find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined;
findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): number;
} }
export type MemLocalHistoryOptions = { export type MemLocalHistoryOptions = {
@ -58,6 +43,7 @@ const log = new Logger("sds:local-history");
export class MemLocalHistory implements ILocalHistory { export class MemLocalHistory implements ILocalHistory {
private items: ContentMessage[] = []; private items: ContentMessage[] = [];
private messageIndex: Map<string, ContentMessage> = new Map();
private readonly storage?: PersistentStorage; private readonly storage?: PersistentStorage;
private readonly maxSize: number; private readonly maxSize: number;
@ -85,18 +71,18 @@ export class MemLocalHistory implements ILocalHistory {
this.load(); this.load();
} }
public get length(): number { public get size(): number {
return this.items.length; return this.items.length;
} }
public push(...items: ContentMessage[]): number { public addMessages(...messages: ContentMessage[]): void {
for (const item of items) { for (const message of messages) {
this.validateMessage(item); this.validateMessage(message);
} }
// Add new items and sort by timestamp, ensuring uniqueness by messageId // Add new items and sort by timestamp, ensuring uniqueness by messageId
// The valueOf() method on ContentMessage enables native < operator sorting // The valueOf() method on ContentMessage enables native < operator sorting
const combinedItems = [...this.items, ...items]; const combinedItems = [...this.items, ...messages];
// Sort by timestamp (using valueOf() which creates timestamp_messageId string) // Sort by timestamp (using valueOf() which creates timestamp_messageId string)
combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf()));
@ -104,52 +90,45 @@ export class MemLocalHistory implements ILocalHistory {
// Remove duplicates by messageId while maintaining order // Remove duplicates by messageId while maintaining order
this.items = _.uniqBy(combinedItems, "messageId"); this.items = _.uniqBy(combinedItems, "messageId");
this.rebuildIndex();
// Let's drop older messages if max length is reached // Let's drop older messages if max length is reached
if (this.length > this.maxSize) { if (this.size > this.maxSize) {
const numItemsToRemove = this.length - this.maxSize; const numItemsToRemove = this.size - this.maxSize;
this.items.splice(0, numItemsToRemove); const removedItems = this.items.splice(0, numItemsToRemove);
for (const item of removedItems) {
this.messageIndex.delete(item.messageId);
}
} }
this.save(); this.save();
return this.items.length;
} }
public some( public hasMessage(messageId: string): boolean {
predicate: ( return this.messageIndex.has(messageId);
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean {
return this.items.some(predicate, thisArg);
} }
public slice(start?: number, end?: number): ContentMessage[] { public getRecentMessages(count: number): ContentMessage[] {
return this.items.slice(start, end); return this.items.slice(-count);
} }
public find( public getAllMessages(): ContentMessage[] {
predicate: ( return [...this.items];
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined {
return this.items.find(predicate, thisArg);
} }
public findIndex( public getMessage(messageId: string): ContentMessage | undefined {
predicate: ( return this.messageIndex.get(messageId);
value: ContentMessage, }
index: number,
obj: ContentMessage[] public findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] {
) => unknown, return entries.filter((entry) => !this.messageIndex.has(entry.messageId));
thisArg?: any }
): number {
return this.items.findIndex(predicate, thisArg); private rebuildIndex(): void {
this.messageIndex.clear();
for (const message of this.items) {
this.messageIndex.set(message.messageId, message);
}
} }
private validateMessage(message: ContentMessage): void { private validateMessage(message: ContentMessage): void {
@ -168,6 +147,7 @@ export class MemLocalHistory implements ILocalHistory {
const messages = this.storage?.load() ?? []; const messages = this.storage?.load() ?? [];
if (messages.length > 0) { if (messages.length > 0) {
this.items = messages; this.items = messages;
this.rebuildIndex();
} }
} }
} }

View File

@ -118,14 +118,10 @@ describe("MessageChannel", function () {
const messageId = MessageChannel.getMessageId(payload); const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback); await sendMessage(channelA, payload, callback);
const messageIdLog = channelA["localHistory"] as ILocalHistory; const messageIdLog = channelA["localHistory"] as ILocalHistory;
expect(messageIdLog.length).to.equal(1); expect(messageIdLog.size).to.equal(1);
expect( const msg = messageIdLog.getMessage(messageId);
messageIdLog.some( expect(msg).to.exist;
(log) => expect(msg!.lamportTimestamp).to.equal(expectedTimestamp);
log.lamportTimestamp === expectedTimestamp &&
log.messageId === messageId
)
).to.equal(true);
}); });
it("should add sent message to localHistory with retrievalHint", async () => { it("should add sent message to localHistory with retrievalHint", async () => {
@ -139,12 +135,10 @@ describe("MessageChannel", function () {
}); });
const localHistory = channelA["localHistory"] as ILocalHistory; const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(1); expect(localHistory.size).to.equal(1);
// Find the message in local history // Find the message in local history
const historyEntry = localHistory.find( const historyEntry = localHistory.getMessage(messageId);
(entry) => entry.messageId === messageId
);
expect(historyEntry).to.exist; expect(historyEntry).to.exist;
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
}); });
@ -296,11 +290,9 @@ describe("MessageChannel", function () {
// Message should not be in local history // Message should not be in local history
const localHistory = channelB["localHistory"]; const localHistory = channelB["localHistory"];
expect( expect(localHistory.hasMessage(receivedMessage!.messageId)).to.equal(
localHistory.some( false
({ messageId }) => messageId === receivedMessage!.messageId );
)
).to.equal(false);
}); });
it("should add received message to localHistory with retrievalHint", async () => { it("should add received message to localHistory with retrievalHint", async () => {
@ -325,12 +317,10 @@ describe("MessageChannel", function () {
); );
const localHistory = channelA["localHistory"] as ILocalHistory; const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(1); expect(localHistory.size).to.equal(1);
// Find the message in local history // Find the message in local history
const historyEntry = localHistory.find( const historyEntry = localHistory.getMessage(messageId);
(entry) => entry.messageId === messageId
);
expect(historyEntry).to.exist; expect(historyEntry).to.exist;
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
}); });
@ -379,35 +369,35 @@ describe("MessageChannel", function () {
); );
const localHistory = channelA["localHistory"]; const localHistory = channelA["localHistory"];
expect(localHistory.length).to.equal(3); expect(localHistory.size).to.equal(3);
// Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3) // Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3)
const first = localHistory.findIndex( const first = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message1Id && lamportTimestamp === startTimestamp + 1n messageId === message1Id && lamportTimestamp === startTimestamp + 1n
); );
} });
);
expect(first).to.eq(0); expect(first).to.eq(0);
const second = localHistory.findIndex( const second = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message2Id && lamportTimestamp === startTimestamp + 2n messageId === message2Id && lamportTimestamp === startTimestamp + 2n
); );
} });
);
expect(second).to.eq(1); expect(second).to.eq(1);
const third = localHistory.findIndex( const third = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message3Id && lamportTimestamp === startTimestamp + 3n messageId === message3Id && lamportTimestamp === startTimestamp + 3n
); );
} });
);
expect(third).to.eq(2); expect(third).to.eq(2);
}); });
@ -447,24 +437,24 @@ describe("MessageChannel", function () {
); );
const localHistory = channelA["localHistory"] as ILocalHistory; const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(2); expect(localHistory.size).to.equal(2);
// When timestamps are equal, should be ordered by messageId lexicographically // When timestamps are equal, should be ordered by messageId lexicographically
// The valueOf() method creates "000000000000005_messageId" for comparison // The valueOf() method creates "000000000000005_messageId" for comparison
const expectedOrder = [message1Id, message2Id].sort(); const expectedOrder = [message1Id, message2Id].sort();
const first = localHistory.findIndex( const first = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[0] && lamportTimestamp == 5n; return messageId === expectedOrder[0] && lamportTimestamp == 5n;
} });
);
expect(first).to.eq(0); expect(first).to.eq(0);
const second = localHistory.findIndex( const second = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[1] && lamportTimestamp == 5n; return messageId === expectedOrder[1] && lamportTimestamp == 5n;
} });
);
expect(second).to.eq(1); expect(second).to.eq(1);
}); });
}); });
@ -1112,7 +1102,7 @@ describe("MessageChannel", function () {
}); });
channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 });
const message = utf8ToBytes("first message in channel"); const message = utf8ToBytes("first message in channel");
channelA["localHistory"].push( channelA["localHistory"].addMessages(
new ContentMessage( new ContentMessage(
MessageChannel.getMessageId(message), MessageChannel.getMessageId(message),
"MyChannel", "MyChannel",
@ -1156,7 +1146,7 @@ describe("MessageChannel", function () {
).to.equal(false); ).to.equal(false);
const localLog = channelA["localHistory"]; const localLog = channelA["localHistory"];
expect(localLog.length).to.equal(1); // beforeEach adds one message expect(localLog.size).to.equal(1); // beforeEach adds one message
}); });
it("should not be delivered", async () => { it("should not be delivered", async () => {
@ -1170,7 +1160,7 @@ describe("MessageChannel", function () {
expect(timestampAfter).to.equal(timestampBefore); expect(timestampAfter).to.equal(timestampBefore);
const localLog = channelB["localHistory"]; const localLog = channelB["localHistory"];
expect(localLog.length).to.equal(0); expect(localLog.size).to.equal(0);
const bloomFilter = getBloomFilter(channelB); const bloomFilter = getBloomFilter(channelB);
expect( expect(
@ -1230,7 +1220,7 @@ describe("MessageChannel", function () {
const channelB = createTestChannel(channelId, "bob"); const channelB = createTestChannel(channelId, "bob");
// Track initial state // Track initial state
const localHistoryBefore = channelB["localHistory"].length; const localHistoryBefore = channelB["localHistory"].size;
const incomingBufferBefore = channelB["incomingBuffer"].length; const incomingBufferBefore = channelB["incomingBuffer"].length;
const timestampBefore = channelB["lamportTimestamp"]; const timestampBefore = channelB["lamportTimestamp"];
@ -1248,7 +1238,7 @@ describe("MessageChannel", function () {
// Verify ephemeral message behavior: // Verify ephemeral message behavior:
// 1. Not added to local history // 1. Not added to local history
expect(channelB["localHistory"].length).to.equal(localHistoryBefore); expect(channelB["localHistory"].size).to.equal(localHistoryBefore);
// 2. Not added to incoming buffer // 2. Not added to incoming buffer
expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore); expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore);
// 3. Doesn't update lamport timestamp // 3. Doesn't update lamport timestamp
@ -1272,14 +1262,14 @@ describe("MessageChannel", function () {
await sendMessage(channel1, utf8ToBytes("msg-1"), callback); await sendMessage(channel1, utf8ToBytes("msg-1"), callback);
await sendMessage(channel1, utf8ToBytes("msg-2"), callback); await sendMessage(channel1, utf8ToBytes("msg-2"), callback);
expect(channel1["localHistory"].length).to.equal(2); expect(channel1["localHistory"].size).to.equal(2);
// Recreate channel with same storage - should load history // Recreate channel with same storage - should load history
const channel2 = new MessageChannel(persistentChannelId, "alice"); const channel2 = new MessageChannel(persistentChannelId, "alice");
expect(channel2["localHistory"].length).to.equal(2); expect(channel2["localHistory"].size).to.equal(2);
expect( expect(
channel2["localHistory"].slice(0).map((m) => m.messageId) channel2["localHistory"].getAllMessages().map((m) => m.messageId)
).to.deep.equal([ ).to.deep.equal([
MessageChannel.getMessageId(utf8ToBytes("msg-1")), MessageChannel.getMessageId(utf8ToBytes("msg-1")),
MessageChannel.getMessageId(utf8ToBytes("msg-2")) MessageChannel.getMessageId(utf8ToBytes("msg-2"))

View File

@ -297,9 +297,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
message.messageId, message.messageId,
message.causalHistory.map((ch) => ch.messageId) message.causalHistory.map((ch) => ch.messageId)
); );
const missingDependencies = message.causalHistory.filter( const missingDependencies = this.findMissingDependencies(
(messageHistoryEntry) => message.causalHistory
!this.isMessageAvailable(messageHistoryEntry.messageId)
); );
if (missingDependencies.length === 0) { if (missingDependencies.length === 0) {
if (isContentMessage(message) && this.deliverMessage(message)) { if (isContentMessage(message) && this.deliverMessage(message)) {
@ -454,7 +453,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.channelId, this.channelId,
this.senderId, this.senderId,
this.localHistory this.localHistory
.slice(-this.causalHistorySize) .getRecentMessages(this.causalHistorySize)
.map(({ messageId, retrievalHint, senderId }) => { .map(({ messageId, retrievalHint, senderId }) => {
return { messageId, retrievalHint, senderId }; return { messageId, retrievalHint, senderId };
}), }),
@ -583,9 +582,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.filter.insert(message.messageId); this.filter.insert(message.messageId);
} }
const missingDependencies = message.causalHistory.filter( const missingDependencies = this.findMissingDependencies(
(messageHistoryEntry) => message.causalHistory
!this.isMessageAvailable(messageHistoryEntry.messageId)
); );
if (missingDependencies.length > 0) { if (missingDependencies.length > 0) {
@ -676,7 +674,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.channelId, this.channelId,
this.senderId, this.senderId,
this.localHistory this.localHistory
.slice(-this.causalHistorySize) .getRecentMessages(this.causalHistorySize)
.map(({ messageId, retrievalHint, senderId }) => { .map(({ messageId, retrievalHint, senderId }) => {
return { messageId, retrievalHint, senderId }; return { messageId, retrievalHint, senderId };
}), }),
@ -699,7 +697,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
if (success && isContentMessage(message)) { if (success && isContentMessage(message)) {
message.retrievalHint = retrievalHint; message.retrievalHint = retrievalHint;
this.filter.insert(messageId); this.filter.insert(messageId);
this.localHistory.push(message); this.localHistory.addMessages(message);
this.timeReceived.set(messageId, Date.now()); this.timeReceived.set(messageId, Date.now());
this.safeSendEvent(MessageChannelEvent.OutMessageSent, { this.safeSendEvent(MessageChannelEvent.OutMessageSent, {
detail: message detail: message
@ -739,24 +737,15 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
} }
} }
/** private findMissingDependencies(entries: HistoryEntry[]): HistoryEntry[] {
* Check if a message is available (either in localHistory or incomingBuffer) const missingFromHistory =
* This prevents treating messages as "missing" when they've already been received this.localHistory.findMissingDependencies(entries);
* but are waiting in the incoming buffer for their dependencies.
* const incomingIds = new Set(this.incomingBuffer.map((m) => m.messageId));
* @param messageId - The ID of the message to check
* @private return missingFromHistory.filter(
*/ (entry) => !incomingIds.has(entry.messageId)
private isMessageAvailable(messageId: MessageId): boolean { );
// Check if in local history
if (this.localHistory.some((m) => m.messageId === messageId)) {
return true;
}
// Check if in incoming buffer (already received, waiting for dependencies)
if (this.incomingBuffer.some((m) => m.messageId === messageId)) {
return true;
}
return false;
} }
/** /**
@ -786,8 +775,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
} }
// Check if the entry is already present // Check if the entry is already present
const existingHistoryEntry = this.localHistory.find( const existingHistoryEntry = this.localHistory.getMessage(
({ messageId }) => messageId === message.messageId message.messageId
); );
// The history entry is already present, no need to re-add // The history entry is already present, no need to re-add
@ -799,7 +788,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
log.warn("message delivered without a retrieval hint", message.messageId); log.warn("message delivered without a retrieval hint", message.messageId);
} }
this.localHistory.push(message); this.localHistory.addMessages(message);
return true; return true;
} }

View File

@ -15,27 +15,26 @@ describe("PersistentStorage", () => {
expect(persistentStorage).to.not.be.undefined; expect(persistentStorage).to.not.be.undefined;
const history1 = new MemLocalHistory({ storage: persistentStorage }); const history1 = new MemLocalHistory({ storage: persistentStorage });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2)); history1.addMessages(createMessage("msg-2", 2));
const history2 = new MemLocalHistory({ storage: persistentStorage }); const history2 = new MemLocalHistory({ storage: persistentStorage });
expect(history2.length).to.equal(2); expect(history2.size).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ expect(
"msg-1", history2.getAllMessages().map((msg) => msg.messageId)
"msg-2" ).to.deep.equal(["msg-1", "msg-2"]);
]);
}); });
it("uses in-memory only when no storage is provided", () => { it("uses in-memory only when no storage is provided", () => {
const history = new MemLocalHistory({ maxSize: 100 }); const history = new MemLocalHistory({ maxSize: 100 });
history.push(createMessage("msg-3", 3)); history.addMessages(createMessage("msg-3", 3));
expect(history.length).to.equal(1); expect(history.size).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3"); expect(history.getAllMessages()[0].messageId).to.equal("msg-3");
const history2 = new MemLocalHistory({ maxSize: 100 }); const history2 = new MemLocalHistory({ maxSize: 100 });
expect(history2.length).to.equal(0); expect(history2.size).to.equal(0);
}); });
it("handles corrupt data in storage gracefully", () => { it("handles corrupt data in storage gracefully", () => {
@ -46,7 +45,7 @@ describe("PersistentStorage", () => {
const persistentStorage = PersistentStorage.create(channelId, storage); const persistentStorage = PersistentStorage.create(channelId, storage);
const history = new MemLocalHistory({ storage: persistentStorage }); const history = new MemLocalHistory({ storage: persistentStorage });
expect(history.length).to.equal(0); expect(history.size).to.equal(0);
// Corrupt data is not saved // Corrupt data is not saved
expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null);
@ -61,14 +60,14 @@ describe("PersistentStorage", () => {
const history1 = new MemLocalHistory({ storage: storage1 }); const history1 = new MemLocalHistory({ storage: storage1 });
const history2 = new MemLocalHistory({ storage: storage2 }); const history2 = new MemLocalHistory({ storage: storage2 });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history2.push(createMessage("msg-2", 2)); history2.addMessages(createMessage("msg-2", 2));
expect(history1.length).to.equal(1); expect(history1.size).to.equal(1);
expect(history1.slice(0)[0].messageId).to.equal("msg-1"); expect(history1.getAllMessages()[0].messageId).to.equal("msg-1");
expect(history2.length).to.equal(1); expect(history2.size).to.equal(1);
expect(history2.slice(0)[0].messageId).to.equal("msg-2"); expect(history2.getAllMessages()[0].messageId).to.equal("msg-2");
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null;
@ -81,7 +80,7 @@ describe("PersistentStorage", () => {
expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; expect(storage.getItem("waku:sds:history:channel-1")).to.be.null;
history.push(createMessage("msg-1", 1)); history.addMessages(createMessage("msg-1", 1));
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
@ -95,15 +94,15 @@ describe("PersistentStorage", () => {
const persistentStorage1 = PersistentStorage.create(channelId, storage); const persistentStorage1 = PersistentStorage.create(channelId, storage);
const history1 = new MemLocalHistory({ storage: persistentStorage1 }); const history1 = new MemLocalHistory({ storage: persistentStorage1 });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2)); history1.addMessages(createMessage("msg-2", 2));
history1.push(createMessage("msg-3", 3)); history1.addMessages(createMessage("msg-3", 3));
const persistentStorage2 = PersistentStorage.create(channelId, storage); const persistentStorage2 = PersistentStorage.create(channelId, storage);
const history2 = new MemLocalHistory({ storage: persistentStorage2 }); const history2 = new MemLocalHistory({ storage: persistentStorage2 });
expect(history2.length).to.equal(3); expect(history2.size).to.equal(3);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([
"msg-1", "msg-1",
"msg-2", "msg-2",
"msg-3" "msg-3"
@ -135,16 +134,15 @@ describe("PersistentStorage", () => {
it("persists and restores messages with channelId", () => { it("persists and restores messages with channelId", () => {
const testChannelId = `test-${Date.now()}`; const testChannelId = `test-${Date.now()}`;
const history1 = new MemLocalHistory({ storage: testChannelId }); const history1 = new MemLocalHistory({ storage: testChannelId });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2)); history1.addMessages(createMessage("msg-2", 2));
const history2 = new MemLocalHistory({ storage: testChannelId }); const history2 = new MemLocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2); expect(history2.size).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ expect(
"msg-1", history2.getAllMessages().map((msg) => msg.messageId)
"msg-2" ).to.deep.equal(["msg-1", "msg-2"]);
]);
localStorage.removeItem(`waku:sds:history:${testChannelId}`); localStorage.removeItem(`waku:sds:history:${testChannelId}`);
}); });
@ -153,12 +151,12 @@ describe("PersistentStorage", () => {
const testChannelId = `auto-storage-${Date.now()}`; const testChannelId = `auto-storage-${Date.now()}`;
const history = new MemLocalHistory({ storage: testChannelId }); const history = new MemLocalHistory({ storage: testChannelId });
history.push(createMessage("msg-auto-1", 1)); history.addMessages(createMessage("msg-auto-1", 1));
history.push(createMessage("msg-auto-2", 2)); history.addMessages(createMessage("msg-auto-2", 2));
const history2 = new MemLocalHistory({ storage: testChannelId }); const history2 = new MemLocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2); expect(history2.size).to.equal(2);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([
"msg-auto-1", "msg-auto-1",
"msg-auto-2" "msg-auto-2"
]); ]);

View File

@ -1,8 +1,8 @@
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import type { ILocalHistory } from "../mem_local_history.js";
import type { HistoryEntry, MessageId } from "../message.js"; import type { HistoryEntry, MessageId } from "../message.js";
import { Message } from "../message.js"; import { Message } from "../message.js";
import type { ILocalHistory } from "../message_channel.js";
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
import { import {
@ -191,9 +191,7 @@ export class RepairManager {
this.outgoingBuffer.remove(request.messageId); this.outgoingBuffer.remove(request.messageId);
// Check if we have this message // Check if we have this message
const message = localHistory.find( const message = localHistory.getMessage(request.messageId);
(m) => m.messageId === request.messageId
);
if (!message) { if (!message) {
log.info( log.info(
`Cannot fulfill repair for ${request.messageId} - not in local history` `Cannot fulfill repair for ${request.messageId} - not in local history`
@ -255,7 +253,7 @@ export class RepairManager {
const messages: Message[] = []; const messages: Message[] = [];
for (const entry of ready) { for (const entry of ready) {
const message = localHistory.find((m) => m.messageId === entry.messageId); const message = localHistory.getMessage(entry.messageId);
if (message) { if (message) {
messages.push(message); messages.push(message);
log.info(`Sending repair for ${entry.messageId}`); log.info(`Sending repair for ${entry.messageId}`);