chore: address comments, reduce diff

This commit is contained in:
Danish Arora 2025-12-09 16:59:52 -05:00
parent 2ee8b08975
commit 41f8fdf8cf
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
5 changed files with 36 additions and 67 deletions

View File

@ -1,13 +1,13 @@
import { expect } from "chai";
import { MemLocalHistory } from "./mem_local_history.js";
import { LocalHistory } from "./local_history.js";
import { ContentMessage } from "./message.js";
describe("MemLocalHistory", () => {
describe("LocalHistory", () => {
it("Cap max size when messages are pushed one at a time", () => {
const maxSize = 2;
const hist = new MemLocalHistory({ maxSize: maxSize });
const hist = new LocalHistory({ maxSize });
hist.push(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
@ -31,7 +31,7 @@ describe("MemLocalHistory", () => {
it("Cap max size when a pushed array is exceeding the cap", () => {
const maxSize = 2;
const hist = new MemLocalHistory({ maxSize: maxSize });
const hist = new LocalHistory({ maxSize });
hist.push(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))

View File

@ -2,6 +2,7 @@ import { Logger } from "@waku/utils";
import _ from "lodash";
import { type ChannelId, ContentMessage, isContentMessage } from "./message.js";
import { ILocalHistory } from "./message_channel.js";
import { PersistentStorage } from "./persistent_storage.js";
export const DEFAULT_MAX_LENGTH = 10_000;
@ -19,44 +20,15 @@ export const DEFAULT_MAX_LENGTH = 10_000;
* If an array of items longer than `maxLength` is pushed, dropping will happen
* at next push.
*/
export interface ILocalHistory {
length: number;
push(...items: ContentMessage[]): number;
some(
predicate: (
value: ContentMessage,
index: number,
array: ContentMessage[]
) => unknown,
thisArg?: any
): boolean;
slice(start?: number, end?: number): ContentMessage[];
find(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): ContentMessage | undefined;
findIndex(
predicate: (
value: ContentMessage,
index: number,
obj: ContentMessage[]
) => unknown,
thisArg?: any
): number;
}
export type MemLocalHistoryOptions = {
export type LocalHistoryOptions = {
storage?: ChannelId | PersistentStorage;
maxSize?: number;
};
const log = new Logger("sds:local-history");
export class MemLocalHistory implements ILocalHistory {
export class LocalHistory implements ILocalHistory {
private items: ContentMessage[] = [];
private readonly storage?: PersistentStorage;
private readonly maxSize: number;
@ -68,7 +40,7 @@ export class MemLocalHistory implements ILocalHistory {
* - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage.
* - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH.
*/
public constructor(opts: MemLocalHistoryOptions = {}) {
public constructor(opts: LocalHistoryOptions = {}) {
const { storage, maxSize } = opts;
this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH;
if (storage instanceof PersistentStorage) {

View File

@ -4,8 +4,7 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { MessageChannelEvent } from "./events.js";
import { MemLocalHistory } from "./mem_local_history.js";
import { ILocalHistory } from "./mem_local_history.js";
import { LocalHistory } from "./local_history.js";
import {
ContentMessage,
HistoryEntry,
@ -25,7 +24,7 @@ const callback = (_message: Message): Promise<{ success: boolean }> => {
};
/**
* Test helper to create a MessageChannel with MemLocalHistory.
* Test helper to create a MessageChannel with LocalHistory.
* This avoids localStorage pollution in tests and tests core functionality.
*/
const createTestChannel = (
@ -33,12 +32,7 @@ const createTestChannel = (
senderId: string,
options: MessageChannelOptions = {}
): MessageChannel => {
return new MessageChannel(
channelId,
senderId,
options,
new MemLocalHistory()
);
return new MessageChannel(channelId, senderId, options, new LocalHistory());
};
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
@ -117,11 +111,11 @@ describe("MessageChannel", function () {
const expectedTimestamp = channelA["lamportTimestamp"] + 1n;
const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback);
const messageIdLog = channelA["localHistory"] as ILocalHistory;
const messageIdLog = channelA["localHistory"] as LocalHistory;
expect(messageIdLog.length).to.equal(1);
expect(
messageIdLog.some(
(log) =>
(log: ContentMessage) =>
log.lamportTimestamp === expectedTimestamp &&
log.messageId === messageId
)
@ -138,7 +132,7 @@ describe("MessageChannel", function () {
return { success: true, retrievalHint: testRetrievalHint };
});
const localHistory = channelA["localHistory"] as ILocalHistory;
const localHistory = channelA["localHistory"] as LocalHistory;
expect(localHistory.length).to.equal(1);
// Find the message in local history

View File

@ -7,7 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js";
import { LocalHistory } from "./local_history.js";
import {
ChannelId,
ContentMessage,
@ -23,8 +23,6 @@ import {
} from "./message.js";
import { RepairConfig, RepairManager } from "./repair/repair.js";
export type { ILocalHistory };
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
capacity: 10000,
errorRate: 0.001
@ -40,6 +38,11 @@ const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2;
const log = new Logger("sds:message-channel");
export type ILocalHistory = Pick<
Array<ContentMessage>,
"some" | "push" | "slice" | "find" | "length" | "findIndex"
>;
export interface MessageChannelOptions {
causalHistorySize?: number;
/**
@ -115,7 +118,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.possibleAcks = new Map();
this.incomingBuffer = [];
this.localHistory =
localHistory ?? new MemLocalHistory({ storage: channelId });
localHistory ?? new LocalHistory({ storage: channelId });
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
// TODO: this should be determined based on the bloom filter parameters and number of hashes

View File

@ -1,6 +1,6 @@
import { expect } from "chai";
import { MemLocalHistory } from "./mem_local_history.js";
import { LocalHistory } from "./local_history.js";
import { ContentMessage } from "./message.js";
import { HistoryStorage, PersistentStorage } from "./persistent_storage.js";
@ -14,11 +14,11 @@ describe("PersistentStorage", () => {
expect(persistentStorage).to.not.be.undefined;
const history1 = new MemLocalHistory({ storage: persistentStorage });
const history1 = new LocalHistory({ storage: persistentStorage });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
const history2 = new MemLocalHistory({ storage: persistentStorage });
const history2 = new LocalHistory({ storage: persistentStorage });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
@ -28,13 +28,13 @@ describe("PersistentStorage", () => {
});
it("uses in-memory only when no storage is provided", () => {
const history = new MemLocalHistory({ maxSize: 100 });
const history = new LocalHistory({ maxSize: 100 });
history.push(createMessage("msg-3", 3));
expect(history.length).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3");
const history2 = new MemLocalHistory({ maxSize: 100 });
const history2 = new LocalHistory({ maxSize: 100 });
expect(history2.length).to.equal(0);
});
@ -44,7 +44,7 @@ describe("PersistentStorage", () => {
storage.setItem("waku:sds:history:channel-1", "{ invalid json }");
const persistentStorage = PersistentStorage.create(channelId, storage);
const history = new MemLocalHistory({ storage: persistentStorage });
const history = new LocalHistory({ storage: persistentStorage });
expect(history.length).to.equal(0);
@ -58,8 +58,8 @@ describe("PersistentStorage", () => {
const storage1 = PersistentStorage.create("channel-1", storage);
const storage2 = PersistentStorage.create("channel-2", storage);
const history1 = new MemLocalHistory({ storage: storage1 });
const history2 = new MemLocalHistory({ storage: storage2 });
const history1 = new LocalHistory({ storage: storage1 });
const history2 = new LocalHistory({ storage: storage2 });
history1.push(createMessage("msg-1", 1));
history2.push(createMessage("msg-2", 2));
@ -77,7 +77,7 @@ describe("PersistentStorage", () => {
it("saves messages after each push", () => {
const storage = new MemoryStorage();
const persistentStorage = PersistentStorage.create(channelId, storage);
const history = new MemLocalHistory({ storage: persistentStorage });
const history = new LocalHistory({ storage: persistentStorage });
expect(storage.getItem("waku:sds:history:channel-1")).to.be.null;
@ -93,14 +93,14 @@ describe("PersistentStorage", () => {
it("loads messages on initialization", () => {
const storage = new MemoryStorage();
const persistentStorage1 = PersistentStorage.create(channelId, storage);
const history1 = new MemLocalHistory({ storage: persistentStorage1 });
const history1 = new LocalHistory({ storage: persistentStorage1 });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
history1.push(createMessage("msg-3", 3));
const persistentStorage2 = PersistentStorage.create(channelId, storage);
const history2 = new MemLocalHistory({ storage: persistentStorage2 });
const history2 = new LocalHistory({ storage: persistentStorage2 });
expect(history2.length).to.equal(3);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
@ -134,11 +134,11 @@ describe("PersistentStorage", () => {
it("persists and restores messages with channelId", () => {
const testChannelId = `test-${Date.now()}`;
const history1 = new MemLocalHistory({ storage: testChannelId });
const history1 = new LocalHistory({ storage: testChannelId });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
const history2 = new MemLocalHistory({ storage: testChannelId });
const history2 = new LocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
@ -152,11 +152,11 @@ describe("PersistentStorage", () => {
it("auto-uses localStorage when channelId is provided", () => {
const testChannelId = `auto-storage-${Date.now()}`;
const history = new MemLocalHistory({ storage: testChannelId });
const history = new LocalHistory({ storage: testChannelId });
history.push(createMessage("msg-auto-1", 1));
history.push(createMessage("msg-auto-2", 2));
const history2 = new MemLocalHistory({ storage: testChannelId });
const history2 = new LocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
"msg-auto-1",