This commit is contained in:
Danish Arora 2025-11-27 00:08:35 -05:00
parent 3e3c5111b7
commit 055d7fecca
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
6 changed files with 136 additions and 180 deletions

View File

@ -9,23 +9,23 @@ describe("MemLocalHistory", () => {
const hist = new MemLocalHistory({ maxSize: maxSize }); const hist = new MemLocalHistory({ maxSize: maxSize });
hist.push( hist.addMessages(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
); );
expect(hist.length).to.eq(1); expect(hist.size).to.eq(1);
hist.push( hist.addMessages(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])) new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2]))
); );
expect(hist.length).to.eq(2); expect(hist.size).to.eq(2);
hist.push( hist.addMessages(
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
); );
expect(hist.length).to.eq(2); expect(hist.size).to.eq(2);
expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); expect(hist.hasMessage("1")).to.eq(false);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); expect(hist.hasMessage("2")).to.eq(true);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); expect(hist.hasMessage("3")).to.eq(true);
}); });
it("Cap max size when a pushed array is exceeding the cap", () => { it("Cap max size when a pushed array is exceeding the cap", () => {
@ -33,18 +33,18 @@ describe("MemLocalHistory", () => {
const hist = new MemLocalHistory({ maxSize: maxSize }); const hist = new MemLocalHistory({ maxSize: maxSize });
hist.push( hist.addMessages(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
); );
expect(hist.length).to.eq(1); expect(hist.size).to.eq(1);
hist.push( hist.addMessages(
new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])), new ContentMessage("2", "c", "a", [], 2n, undefined, new Uint8Array([2])),
new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3])) new ContentMessage("3", "c", "a", [], 3n, undefined, new Uint8Array([3]))
); );
expect(hist.length).to.eq(2); expect(hist.size).to.eq(2);
expect(hist.findIndex((m) => m.messageId === "1")).to.eq(-1); expect(hist.hasMessage("1")).to.eq(false);
expect(hist.findIndex((m) => m.messageId === "2")).to.not.eq(-1); expect(hist.hasMessage("2")).to.eq(true);
expect(hist.findIndex((m) => m.messageId === "3")).to.not.eq(-1); expect(hist.hasMessage("3")).to.eq(true);
}); });
}); });

View File

@ -20,33 +20,12 @@ export const DEFAULT_MAX_LENGTH = 10_000;
* at next push. * at next push.
*/ */
export interface ILocalHistory { export interface ILocalHistory {
length: number; readonly size: number;
push(...items: ContentMessage[]): number; addMessages(...messages: ContentMessage[]): void;
some( hasMessage(messageId: string): boolean;
predicate: ( getMessage(messageId: string): ContentMessage | undefined;
value: ContentMessage, getRecentMessages(count: number): ContentMessage[];
index: number, getAllMessages(): ContentMessage[];
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean;
slice(start?: number, end?: number): ContentMessage[];
find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined;
findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): number;
} }
export type MemLocalHistoryOptions = { export type MemLocalHistoryOptions = {
@ -58,6 +37,7 @@ const log = new Logger("sds:local-history");
export class MemLocalHistory implements ILocalHistory { export class MemLocalHistory implements ILocalHistory {
private items: ContentMessage[] = []; private items: ContentMessage[] = [];
private messageIndex: Map<string, ContentMessage> = new Map();
private readonly storage?: PersistentStorage; private readonly storage?: PersistentStorage;
private readonly maxSize: number; private readonly maxSize: number;
@ -85,18 +65,18 @@ export class MemLocalHistory implements ILocalHistory {
this.load(); this.load();
} }
public get length(): number { public get size(): number {
return this.items.length; return this.items.length;
} }
public push(...items: ContentMessage[]): number { public addMessages(...messages: ContentMessage[]): void {
for (const item of items) { for (const message of messages) {
this.validateMessage(item); this.validateMessage(message);
} }
// Add new items and sort by timestamp, ensuring uniqueness by messageId // Add new items and sort by timestamp, ensuring uniqueness by messageId
// The valueOf() method on ContentMessage enables native < operator sorting // The valueOf() method on ContentMessage enables native < operator sorting
const combinedItems = [...this.items, ...items]; const combinedItems = [...this.items, ...messages];
// Sort by timestamp (using valueOf() which creates timestamp_messageId string) // Sort by timestamp (using valueOf() which creates timestamp_messageId string)
combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf())); combinedItems.sort((a, b) => a.valueOf().localeCompare(b.valueOf()));
@ -104,52 +84,41 @@ export class MemLocalHistory implements ILocalHistory {
// Remove duplicates by messageId while maintaining order // Remove duplicates by messageId while maintaining order
this.items = _.uniqBy(combinedItems, "messageId"); this.items = _.uniqBy(combinedItems, "messageId");
this.rebuildIndex();
// Let's drop older messages if max length is reached // Let's drop older messages if max length is reached
if (this.length > this.maxSize) { if (this.size > this.maxSize) {
const numItemsToRemove = this.length - this.maxSize; const numItemsToRemove = this.size - this.maxSize;
this.items.splice(0, numItemsToRemove); const removedItems = this.items.splice(0, numItemsToRemove);
for (const item of removedItems) {
this.messageIndex.delete(item.messageId);
}
} }
this.save(); this.save();
return this.items.length;
} }
public some( public hasMessage(messageId: string): boolean {
predicate: ( return this.messageIndex.has(messageId);
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean {
return this.items.some(predicate, thisArg);
} }
public slice(start?: number, end?: number): ContentMessage[] { public getRecentMessages(count: number): ContentMessage[] {
return this.items.slice(start, end); return this.items.slice(-count);
} }
public find( public getAllMessages(): ContentMessage[] {
predicate: ( return [...this.items];
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined {
return this.items.find(predicate, thisArg);
} }
public findIndex( public getMessage(messageId: string): ContentMessage | undefined {
predicate: ( return this.messageIndex.get(messageId);
value: ContentMessage, }
index: number,
obj: ContentMessage[] private rebuildIndex(): void {
) => unknown, this.messageIndex.clear();
thisArg?: any for (const message of this.items) {
): number { this.messageIndex.set(message.messageId, message);
return this.items.findIndex(predicate, thisArg); }
} }
private validateMessage(message: ContentMessage): void { private validateMessage(message: ContentMessage): void {
@ -168,6 +137,7 @@ export class MemLocalHistory implements ILocalHistory {
const messages = this.storage?.load() ?? []; const messages = this.storage?.load() ?? [];
if (messages.length > 0) { if (messages.length > 0) {
this.items = messages; this.items = messages;
this.rebuildIndex();
} }
} }
} }

View File

@ -118,14 +118,10 @@ describe("MessageChannel", function () {
const messageId = MessageChannel.getMessageId(payload); const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback); await sendMessage(channelA, payload, callback);
const messageIdLog = channelA["localHistory"] as ILocalHistory; const messageIdLog = channelA["localHistory"] as ILocalHistory;
expect(messageIdLog.length).to.equal(1); expect(messageIdLog.size).to.equal(1);
expect( const msg = messageIdLog.getMessage(messageId);
messageIdLog.some( expect(msg).to.exist;
(log) => expect(msg!.lamportTimestamp).to.equal(expectedTimestamp);
log.lamportTimestamp === expectedTimestamp &&
log.messageId === messageId
)
).to.equal(true);
}); });
it("should add sent message to localHistory with retrievalHint", async () => { it("should add sent message to localHistory with retrievalHint", async () => {
@ -139,12 +135,10 @@ describe("MessageChannel", function () {
}); });
const localHistory = channelA["localHistory"] as ILocalHistory; const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(1); expect(localHistory.size).to.equal(1);
// Find the message in local history // Find the message in local history
const historyEntry = localHistory.find( const historyEntry = localHistory.getMessage(messageId);
(entry) => entry.messageId === messageId
);
expect(historyEntry).to.exist; expect(historyEntry).to.exist;
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
}); });
@ -296,11 +290,9 @@ describe("MessageChannel", function () {
// Message should not be in local history // Message should not be in local history
const localHistory = channelB["localHistory"]; const localHistory = channelB["localHistory"];
expect( expect(localHistory.hasMessage(receivedMessage!.messageId)).to.equal(
localHistory.some( false
({ messageId }) => messageId === receivedMessage!.messageId );
)
).to.equal(false);
}); });
it("should add received message to localHistory with retrievalHint", async () => { it("should add received message to localHistory with retrievalHint", async () => {
@ -325,12 +317,10 @@ describe("MessageChannel", function () {
); );
const localHistory = channelA["localHistory"] as ILocalHistory; const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(1); expect(localHistory.size).to.equal(1);
// Find the message in local history // Find the message in local history
const historyEntry = localHistory.find( const historyEntry = localHistory.getMessage(messageId);
(entry) => entry.messageId === messageId
);
expect(historyEntry).to.exist; expect(historyEntry).to.exist;
expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint); expect(historyEntry!.retrievalHint).to.deep.equal(testRetrievalHint);
}); });
@ -379,35 +369,35 @@ describe("MessageChannel", function () {
); );
const localHistory = channelA["localHistory"]; const localHistory = channelA["localHistory"];
expect(localHistory.length).to.equal(3); expect(localHistory.size).to.equal(3);
// Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3) // Verify chronological order: message1 (ts=1), message2 (ts=2), message3 (ts=3)
const first = localHistory.findIndex( const first = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message1Id && lamportTimestamp === startTimestamp + 1n messageId === message1Id && lamportTimestamp === startTimestamp + 1n
); );
} });
);
expect(first).to.eq(0); expect(first).to.eq(0);
const second = localHistory.findIndex( const second = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message2Id && lamportTimestamp === startTimestamp + 2n messageId === message2Id && lamportTimestamp === startTimestamp + 2n
); );
} });
);
expect(second).to.eq(1); expect(second).to.eq(1);
const third = localHistory.findIndex( const third = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return ( return (
messageId === message3Id && lamportTimestamp === startTimestamp + 3n messageId === message3Id && lamportTimestamp === startTimestamp + 3n
); );
} });
);
expect(third).to.eq(2); expect(third).to.eq(2);
}); });
@ -447,24 +437,24 @@ describe("MessageChannel", function () {
); );
const localHistory = channelA["localHistory"] as ILocalHistory; const localHistory = channelA["localHistory"] as ILocalHistory;
expect(localHistory.length).to.equal(2); expect(localHistory.size).to.equal(2);
// When timestamps are equal, should be ordered by messageId lexicographically // When timestamps are equal, should be ordered by messageId lexicographically
// The valueOf() method creates "000000000000005_messageId" for comparison // The valueOf() method creates "000000000000005_messageId" for comparison
const expectedOrder = [message1Id, message2Id].sort(); const expectedOrder = [message1Id, message2Id].sort();
const first = localHistory.findIndex( const first = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[0] && lamportTimestamp == 5n; return messageId === expectedOrder[0] && lamportTimestamp == 5n;
} });
);
expect(first).to.eq(0); expect(first).to.eq(0);
const second = localHistory.findIndex( const second = localHistory
({ messageId, lamportTimestamp }) => { .getAllMessages()
.findIndex(({ messageId, lamportTimestamp }) => {
return messageId === expectedOrder[1] && lamportTimestamp == 5n; return messageId === expectedOrder[1] && lamportTimestamp == 5n;
} });
);
expect(second).to.eq(1); expect(second).to.eq(1);
}); });
}); });
@ -1112,7 +1102,7 @@ describe("MessageChannel", function () {
}); });
channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 });
const message = utf8ToBytes("first message in channel"); const message = utf8ToBytes("first message in channel");
channelA["localHistory"].push( channelA["localHistory"].addMessages(
new ContentMessage( new ContentMessage(
MessageChannel.getMessageId(message), MessageChannel.getMessageId(message),
"MyChannel", "MyChannel",
@ -1156,7 +1146,7 @@ describe("MessageChannel", function () {
).to.equal(false); ).to.equal(false);
const localLog = channelA["localHistory"]; const localLog = channelA["localHistory"];
expect(localLog.length).to.equal(1); // beforeEach adds one message expect(localLog.size).to.equal(1); // beforeEach adds one message
}); });
it("should not be delivered", async () => { it("should not be delivered", async () => {
@ -1170,7 +1160,7 @@ describe("MessageChannel", function () {
expect(timestampAfter).to.equal(timestampBefore); expect(timestampAfter).to.equal(timestampBefore);
const localLog = channelB["localHistory"]; const localLog = channelB["localHistory"];
expect(localLog.length).to.equal(0); expect(localLog.size).to.equal(0);
const bloomFilter = getBloomFilter(channelB); const bloomFilter = getBloomFilter(channelB);
expect( expect(
@ -1230,7 +1220,7 @@ describe("MessageChannel", function () {
const channelB = createTestChannel(channelId, "bob"); const channelB = createTestChannel(channelId, "bob");
// Track initial state // Track initial state
const localHistoryBefore = channelB["localHistory"].length; const localHistoryBefore = channelB["localHistory"].size;
const incomingBufferBefore = channelB["incomingBuffer"].length; const incomingBufferBefore = channelB["incomingBuffer"].length;
const timestampBefore = channelB["lamportTimestamp"]; const timestampBefore = channelB["lamportTimestamp"];
@ -1248,7 +1238,7 @@ describe("MessageChannel", function () {
// Verify ephemeral message behavior: // Verify ephemeral message behavior:
// 1. Not added to local history // 1. Not added to local history
expect(channelB["localHistory"].length).to.equal(localHistoryBefore); expect(channelB["localHistory"].size).to.equal(localHistoryBefore);
// 2. Not added to incoming buffer // 2. Not added to incoming buffer
expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore); expect(channelB["incomingBuffer"].length).to.equal(incomingBufferBefore);
// 3. Doesn't update lamport timestamp // 3. Doesn't update lamport timestamp
@ -1272,14 +1262,14 @@ describe("MessageChannel", function () {
await sendMessage(channel1, utf8ToBytes("msg-1"), callback); await sendMessage(channel1, utf8ToBytes("msg-1"), callback);
await sendMessage(channel1, utf8ToBytes("msg-2"), callback); await sendMessage(channel1, utf8ToBytes("msg-2"), callback);
expect(channel1["localHistory"].length).to.equal(2); expect(channel1["localHistory"].size).to.equal(2);
// Recreate channel with same storage - should load history // Recreate channel with same storage - should load history
const channel2 = new MessageChannel(persistentChannelId, "alice"); const channel2 = new MessageChannel(persistentChannelId, "alice");
expect(channel2["localHistory"].length).to.equal(2); expect(channel2["localHistory"].size).to.equal(2);
expect( expect(
channel2["localHistory"].slice(0).map((m) => m.messageId) channel2["localHistory"].getAllMessages().map((m) => m.messageId)
).to.deep.equal([ ).to.deep.equal([
MessageChannel.getMessageId(utf8ToBytes("msg-1")), MessageChannel.getMessageId(utf8ToBytes("msg-1")),
MessageChannel.getMessageId(utf8ToBytes("msg-2")) MessageChannel.getMessageId(utf8ToBytes("msg-2"))

View File

@ -454,7 +454,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.channelId, this.channelId,
this.senderId, this.senderId,
this.localHistory this.localHistory
.slice(-this.causalHistorySize) .getRecentMessages(this.causalHistorySize)
.map(({ messageId, retrievalHint, senderId }) => { .map(({ messageId, retrievalHint, senderId }) => {
return { messageId, retrievalHint, senderId }; return { messageId, retrievalHint, senderId };
}), }),
@ -676,7 +676,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.channelId, this.channelId,
this.senderId, this.senderId,
this.localHistory this.localHistory
.slice(-this.causalHistorySize) .getRecentMessages(this.causalHistorySize)
.map(({ messageId, retrievalHint, senderId }) => { .map(({ messageId, retrievalHint, senderId }) => {
return { messageId, retrievalHint, senderId }; return { messageId, retrievalHint, senderId };
}), }),
@ -699,7 +699,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
if (success && isContentMessage(message)) { if (success && isContentMessage(message)) {
message.retrievalHint = retrievalHint; message.retrievalHint = retrievalHint;
this.filter.insert(messageId); this.filter.insert(messageId);
this.localHistory.push(message); this.localHistory.addMessages(message);
this.timeReceived.set(messageId, Date.now()); this.timeReceived.set(messageId, Date.now());
this.safeSendEvent(MessageChannelEvent.OutMessageSent, { this.safeSendEvent(MessageChannelEvent.OutMessageSent, {
detail: message detail: message
@ -749,7 +749,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
*/ */
private isMessageAvailable(messageId: MessageId): boolean { private isMessageAvailable(messageId: MessageId): boolean {
// Check if in local history // Check if in local history
if (this.localHistory.some((m) => m.messageId === messageId)) { if (this.localHistory.hasMessage(messageId)) {
return true; return true;
} }
// Check if in incoming buffer (already received, waiting for dependencies) // Check if in incoming buffer (already received, waiting for dependencies)
@ -786,8 +786,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
} }
// Check if the entry is already present // Check if the entry is already present
const existingHistoryEntry = this.localHistory.find( const existingHistoryEntry = this.localHistory.getMessage(
({ messageId }) => messageId === message.messageId message.messageId
); );
// The history entry is already present, no need to re-add // The history entry is already present, no need to re-add
@ -799,7 +799,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
log.warn("message delivered without a retrieval hint", message.messageId); log.warn("message delivered without a retrieval hint", message.messageId);
} }
this.localHistory.push(message); this.localHistory.addMessages(message);
return true; return true;
} }

View File

@ -15,27 +15,26 @@ describe("PersistentStorage", () => {
expect(persistentStorage).to.not.be.undefined; expect(persistentStorage).to.not.be.undefined;
const history1 = new MemLocalHistory({ storage: persistentStorage }); const history1 = new MemLocalHistory({ storage: persistentStorage });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2)); history1.addMessages(createMessage("msg-2", 2));
const history2 = new MemLocalHistory({ storage: persistentStorage }); const history2 = new MemLocalHistory({ storage: persistentStorage });
expect(history2.length).to.equal(2); expect(history2.size).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ expect(
"msg-1", history2.getAllMessages().map((msg) => msg.messageId)
"msg-2" ).to.deep.equal(["msg-1", "msg-2"]);
]);
}); });
it("uses in-memory only when no storage is provided", () => { it("uses in-memory only when no storage is provided", () => {
const history = new MemLocalHistory({ maxSize: 100 }); const history = new MemLocalHistory({ maxSize: 100 });
history.push(createMessage("msg-3", 3)); history.addMessages(createMessage("msg-3", 3));
expect(history.length).to.equal(1); expect(history.size).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3"); expect(history.getAllMessages()[0].messageId).to.equal("msg-3");
const history2 = new MemLocalHistory({ maxSize: 100 }); const history2 = new MemLocalHistory({ maxSize: 100 });
expect(history2.length).to.equal(0); expect(history2.size).to.equal(0);
}); });
it("handles corrupt data in storage gracefully", () => { it("handles corrupt data in storage gracefully", () => {
@ -46,7 +45,7 @@ describe("PersistentStorage", () => {
const persistentStorage = PersistentStorage.create(channelId, storage); const persistentStorage = PersistentStorage.create(channelId, storage);
const history = new MemLocalHistory({ storage: persistentStorage }); const history = new MemLocalHistory({ storage: persistentStorage });
expect(history.length).to.equal(0); expect(history.size).to.equal(0);
// Corrupt data is not saved // Corrupt data is not saved
expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null);
@ -61,14 +60,14 @@ describe("PersistentStorage", () => {
const history1 = new MemLocalHistory({ storage: storage1 }); const history1 = new MemLocalHistory({ storage: storage1 });
const history2 = new MemLocalHistory({ storage: storage2 }); const history2 = new MemLocalHistory({ storage: storage2 });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history2.push(createMessage("msg-2", 2)); history2.addMessages(createMessage("msg-2", 2));
expect(history1.length).to.equal(1); expect(history1.size).to.equal(1);
expect(history1.slice(0)[0].messageId).to.equal("msg-1"); expect(history1.getAllMessages()[0].messageId).to.equal("msg-1");
expect(history2.length).to.equal(1); expect(history2.size).to.equal(1);
expect(history2.slice(0)[0].messageId).to.equal("msg-2"); expect(history2.getAllMessages()[0].messageId).to.equal("msg-2");
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null;
@ -81,7 +80,7 @@ describe("PersistentStorage", () => {
expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; expect(storage.getItem("waku:sds:history:channel-1")).to.be.null;
history.push(createMessage("msg-1", 1)); history.addMessages(createMessage("msg-1", 1));
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
@ -95,15 +94,15 @@ describe("PersistentStorage", () => {
const persistentStorage1 = PersistentStorage.create(channelId, storage); const persistentStorage1 = PersistentStorage.create(channelId, storage);
const history1 = new MemLocalHistory({ storage: persistentStorage1 }); const history1 = new MemLocalHistory({ storage: persistentStorage1 });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2)); history1.addMessages(createMessage("msg-2", 2));
history1.push(createMessage("msg-3", 3)); history1.addMessages(createMessage("msg-3", 3));
const persistentStorage2 = PersistentStorage.create(channelId, storage); const persistentStorage2 = PersistentStorage.create(channelId, storage);
const history2 = new MemLocalHistory({ storage: persistentStorage2 }); const history2 = new MemLocalHistory({ storage: persistentStorage2 });
expect(history2.length).to.equal(3); expect(history2.size).to.equal(3);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([
"msg-1", "msg-1",
"msg-2", "msg-2",
"msg-3" "msg-3"
@ -135,16 +134,15 @@ describe("PersistentStorage", () => {
it("persists and restores messages with channelId", () => { it("persists and restores messages with channelId", () => {
const testChannelId = `test-${Date.now()}`; const testChannelId = `test-${Date.now()}`;
const history1 = new MemLocalHistory({ storage: testChannelId }); const history1 = new MemLocalHistory({ storage: testChannelId });
history1.push(createMessage("msg-1", 1)); history1.addMessages(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2)); history1.addMessages(createMessage("msg-2", 2));
const history2 = new MemLocalHistory({ storage: testChannelId }); const history2 = new MemLocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2); expect(history2.size).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ expect(
"msg-1", history2.getAllMessages().map((msg) => msg.messageId)
"msg-2" ).to.deep.equal(["msg-1", "msg-2"]);
]);
localStorage.removeItem(`waku:sds:history:${testChannelId}`); localStorage.removeItem(`waku:sds:history:${testChannelId}`);
}); });
@ -153,12 +151,12 @@ describe("PersistentStorage", () => {
const testChannelId = `auto-storage-${Date.now()}`; const testChannelId = `auto-storage-${Date.now()}`;
const history = new MemLocalHistory({ storage: testChannelId }); const history = new MemLocalHistory({ storage: testChannelId });
history.push(createMessage("msg-auto-1", 1)); history.addMessages(createMessage("msg-auto-1", 1));
history.push(createMessage("msg-auto-2", 2)); history.addMessages(createMessage("msg-auto-2", 2));
const history2 = new MemLocalHistory({ storage: testChannelId }); const history2 = new MemLocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2); expect(history2.size).to.equal(2);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ expect(history2.getAllMessages().map((m) => m.messageId)).to.deep.equal([
"msg-auto-1", "msg-auto-1",
"msg-auto-2" "msg-auto-2"
]); ]);

View File

@ -1,8 +1,8 @@
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import type { ILocalHistory } from "../mem_local_history.js";
import type { HistoryEntry, MessageId } from "../message.js"; import type { HistoryEntry, MessageId } from "../message.js";
import { Message } from "../message.js"; import { Message } from "../message.js";
import type { ILocalHistory } from "../message_channel.js";
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
import { import {
@ -191,9 +191,7 @@ export class RepairManager {
this.outgoingBuffer.remove(request.messageId); this.outgoingBuffer.remove(request.messageId);
// Check if we have this message // Check if we have this message
const message = localHistory.find( const message = localHistory.getMessage(request.messageId);
(m) => m.messageId === request.messageId
);
if (!message) { if (!message) {
log.info( log.info(
`Cannot fulfill repair for ${request.messageId} - not in local history` `Cannot fulfill repair for ${request.messageId} - not in local history`
@ -255,7 +253,7 @@ export class RepairManager {
const messages: Message[] = []; const messages: Message[] = [];
for (const entry of ready) { for (const entry of ready) {
const message = localHistory.find((m) => m.messageId === entry.messageId); const message = localHistory.getMessage(entry.messageId);
if (message) { if (message) {
messages.push(message); messages.push(message);
log.info(`Sending repair for ${entry.messageId}`); log.info(`Sending repair for ${entry.messageId}`);