From dad9cffdbed0aa52b866987b640ae6cc022e966d Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 14 Nov 2025 16:14:41 -0500 Subject: [PATCH 01/22] feat: persistent history for SDS --- .../src/message_channel/message_channel.ts | 8 +- .../persistent_history.spec.ts | 62 ++++++ .../src/message_channel/persistent_history.ts | 177 ++++++++++++++++++ 3 files changed, 244 insertions(+), 3 deletions(-) create mode 100644 packages/sds/src/message_channel/persistent_history.spec.ts create mode 100644 packages/sds/src/message_channel/persistent_history.ts diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index eccc1435b9..fa01083869 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,7 +7,6 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; -import { MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, @@ -21,6 +20,7 @@ import { ParticipantId, SyncMessage } from "./message.js"; +import { PersistentHistory } from "./persistent_history.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -106,7 +106,7 @@ export class MessageChannel extends TypedEventEmitter { channelId: ChannelId, senderId: ParticipantId, options: MessageChannelOptions = {}, - localHistory: ILocalHistory = new MemLocalHistory() + localHistory?: ILocalHistory ) { super(); this.channelId = channelId; @@ -117,7 +117,9 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.possibleAcks = new Map(); this.incomingBuffer = []; - this.localHistory = localHistory; + const resolvedLocalHistory = + localHistory ?? new PersistentHistory({ channelId: this.channelId }); + this.localHistory = resolvedLocalHistory; this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_history.spec.ts b/packages/sds/src/message_channel/persistent_history.spec.ts new file mode 100644 index 0000000000..941473dd94 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_history.spec.ts @@ -0,0 +1,62 @@ +import { expect } from "chai"; + +import { ContentMessage } from "./message.js"; +import { HistoryStorage, PersistentHistory } from "./persistent_history.js"; + +class MemoryStorage implements HistoryStorage { + private readonly store = new Map(); + + public getItem(key: string): string | null { + return this.store.get(key) ?? null; + } + + public setItem(key: string, value: string): void { + this.store.set(key, value); + } + + public removeItem(key: string): void { + this.store.delete(key); + } +} + +const channelId = "channel-1"; + +const createMessage = (id: string, timestamp: number): ContentMessage => { + return new ContentMessage( + id, + channelId, + "sender", + [], + BigInt(timestamp), + undefined, + new Uint8Array([timestamp]), + undefined + ); +}; + +describe("PersistentHistory", () => { + it("persists and restores messages", () => { + const storage = new MemoryStorage(); + const history = new PersistentHistory({ channelId, storage }); + + history.push(createMessage("msg-1", 1)); + history.push(createMessage("msg-2", 2)); + + const restored = new PersistentHistory({ channelId, storage }); + + expect(restored.length).to.equal(2); + expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + }); + + it("behaves like memory history when storage is unavailable", () => { + const history = new PersistentHistory({ channelId, storage: undefined }); + + history.push(createMessage("msg-3", 3)); + + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); + }); +}); diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts new file mode 100644 index 0000000000..735ce403f8 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -0,0 +1,177 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; + +import { MemLocalHistory } from "./mem_local_history.js"; +import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; + +export interface HistoryStorage { + getItem(key: string): string | null; + setItem(key: string, value: string): void; + removeItem(key: string): void; +} + +export interface PersistentHistoryOptions { + channelId: ChannelId; + storage?: HistoryStorage; + storageKey?: string; +} + +type StoredHistoryEntry = { + messageId: string; + retrievalHint?: string; +}; + +type StoredContentMessage = { + messageId: string; + channelId: string; + senderId: string; + lamportTimestamp: string; + causalHistory: StoredHistoryEntry[]; + bloomFilter?: string; + content: string; + retrievalHint?: string; +}; + +const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; + +/** + * Persists the SDS local history in a browser/localStorage compatible backend. + * + * If no storage backend is available, this behaves like {@link MemLocalHistory}. + */ +export class PersistentHistory extends MemLocalHistory { + private readonly storage?: HistoryStorage; + private readonly storageKey: string; + + public constructor(options: PersistentHistoryOptions) { + super(); + this.storage = options.storage ?? getDefaultHistoryStorage(); + this.storageKey = + options.storageKey ?? `${HISTORY_STORAGE_PREFIX}${options.channelId}`; + this.restore(); + } + + public override push(...items: ContentMessage[]): number { + const length = super.push(...items); + this.persist(); + return length; + } + + private persist(): void { + if (!this.storage) { + return; + } + try { + const payload = JSON.stringify( + this.slice(0).map(serializeContentMessage) + ); + this.storage.setItem(this.storageKey, payload); + } catch { + // Ignore persistence errors (e.g. quota exceeded). + } + } + + private restore(): void { + if (!this.storage) { + return; + } + + try { + const raw = this.storage.getItem(this.storageKey); + if (!raw) { + return; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + const messages = stored + .map(deserializeContentMessage) + .filter((message): message is ContentMessage => Boolean(message)); + if (messages.length) { + super.push(...messages); + } + } catch { + try { + this.storage.removeItem(this.storageKey); + } catch { + // Ignore cleanup errors. + } + } + } +} + +export const getDefaultHistoryStorage = (): HistoryStorage | undefined => { + try { + if (typeof localStorage === "undefined") { + return undefined; + } + + const probeKey = `${HISTORY_STORAGE_PREFIX}__probe__`; + localStorage.setItem(probeKey, probeKey); + localStorage.removeItem(probeKey); + return localStorage; + } catch { + return undefined; + } +}; + +const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({ + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? bytesToHex(entry.retrievalHint) + : undefined +}); + +const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({ + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? hexToBytes(entry.retrievalHint) + : undefined +}); + +const serializeContentMessage = ( + message: ContentMessage +): StoredContentMessage => ({ + messageId: message.messageId, + channelId: message.channelId, + senderId: message.senderId, + lamportTimestamp: message.lamportTimestamp.toString(), + causalHistory: message.causalHistory.map(serializeHistoryEntry), + bloomFilter: toHex(message.bloomFilter), + content: bytesToHex(new Uint8Array(message.content)), + retrievalHint: toHex(message.retrievalHint) +}); + +const deserializeContentMessage = ( + record: StoredContentMessage +): ContentMessage | undefined => { + try { + const content = hexToBytes(record.content); + return new ContentMessage( + record.messageId, + record.channelId, + record.senderId, + record.causalHistory.map(deserializeHistoryEntry), + BigInt(record.lamportTimestamp), + fromHex(record.bloomFilter), + content, + fromHex(record.retrievalHint) + ); + } catch { + return undefined; + } +}; + +const toHex = ( + data?: Uint8Array | Uint8Array +): string | undefined => { + if (!data || data.length === 0) { + return undefined; + } + return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); +}; + +const fromHex = (value?: string): Uint8Array | undefined => { + if (!value) { + return undefined; + } + return hexToBytes(value); +}; From 454dc4a93ba2d2ade1b6f9a3974890b08ed78847 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 20 Nov 2025 16:07:35 -0500 Subject: [PATCH 02/22] refactor: simplify localHistory initialization by removing intermediate constant --- packages/sds/src/message_channel/message_channel.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index fa01083869..e55ac045d4 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -117,9 +117,8 @@ export class MessageChannel extends TypedEventEmitter { this.outgoingBuffer = []; this.possibleAcks = new Map(); this.incomingBuffer = []; - const resolvedLocalHistory = + this.localHistory = localHistory ?? new PersistentHistory({ channelId: this.channelId }); - this.localHistory = resolvedLocalHistory; this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes From ca24c09688750e4773b4d1931e1fea1d7dfd8543 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 20 Nov 2025 16:11:12 -0500 Subject: [PATCH 03/22] feat(history): introduce `ILocalHistory` interface and refactor `PersistentHistory` to use composition over inheritance for history management. --- .../src/message_channel/mem_local_history.ts | 32 ++++++++++- .../src/message_channel/persistent_history.ts | 57 ++++++++++++++++--- 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index fa62bfb9ae..67e0b73ae3 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -17,7 +17,37 @@ export const DEFAULT_MAX_LENGTH = 10_000; * If an array of items longer than `maxLength` is pushed, dropping will happen * at next push. */ -export class MemLocalHistory { +export interface ILocalHistory { + length: number; + push(...items: ContentMessage[]): number; + some( + predicate: ( + value: ContentMessage, + index: number, + array: ContentMessage[] + ) => unknown, + thisArg?: any + ): boolean; + slice(start?: number, end?: number): ContentMessage[]; + find( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): ContentMessage | undefined; + findIndex( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): number; +} + +export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; /** diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index 735ce403f8..1a578554b2 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -1,6 +1,6 @@ import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; -import { MemLocalHistory } from "./mem_local_history.js"; +import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; export interface HistoryStorage { @@ -38,31 +38,73 @@ const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; * * If no storage backend is available, this behaves like {@link MemLocalHistory}. */ -export class PersistentHistory extends MemLocalHistory { +export class PersistentHistory implements ILocalHistory { private readonly storage?: HistoryStorage; private readonly storageKey: string; + private readonly memory: MemLocalHistory; public constructor(options: PersistentHistoryOptions) { - super(); + this.memory = new MemLocalHistory(); this.storage = options.storage ?? getDefaultHistoryStorage(); this.storageKey = options.storageKey ?? `${HISTORY_STORAGE_PREFIX}${options.channelId}`; this.restore(); } - public override push(...items: ContentMessage[]): number { - const length = super.push(...items); + public get length(): number { + return this.memory.length; + } + + public push(...items: ContentMessage[]): number { + const length = this.memory.push(...items); this.persist(); return length; } + public some( + predicate: ( + value: ContentMessage, + index: number, + array: ContentMessage[] + ) => unknown, + thisArg?: any + ): boolean { + return this.memory.some(predicate, thisArg); + } + + public slice(start?: number, end?: number): ContentMessage[] { + return this.memory.slice(start, end); + } + + public find( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): ContentMessage | undefined { + return this.memory.find(predicate, thisArg); + } + + public findIndex( + predicate: ( + value: ContentMessage, + index: number, + obj: ContentMessage[] + ) => unknown, + thisArg?: any + ): number { + return this.memory.findIndex(predicate, thisArg); + } + private persist(): void { if (!this.storage) { return; } try { const payload = JSON.stringify( - this.slice(0).map(serializeContentMessage) + this.memory.slice(0).map(serializeContentMessage) ); this.storage.setItem(this.storageKey, payload); } catch { @@ -86,7 +128,7 @@ export class PersistentHistory extends MemLocalHistory { .map(deserializeContentMessage) .filter((message): message is ContentMessage => Boolean(message)); if (messages.length) { - super.push(...messages); + this.memory.push(...messages); } } catch { try { @@ -153,6 +195,7 @@ const deserializeContentMessage = ( BigInt(record.lamportTimestamp), fromHex(record.bloomFilter), content, + [], fromHex(record.retrievalHint) ); } catch { From 032c3f5faf574caa403fb92130dc0b8d7f0b19bc Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 20 Nov 2025 16:12:39 -0500 Subject: [PATCH 04/22] refactor: rename `storageKey` to `storageKeyPrefix` and update storage key generation. --- packages/sds/src/message_channel/persistent_history.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index 1a578554b2..3b198d9edb 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -12,7 +12,7 @@ export interface HistoryStorage { export interface PersistentHistoryOptions { channelId: ChannelId; storage?: HistoryStorage; - storageKey?: string; + storageKeyPrefix?: string; } type StoredHistoryEntry = { @@ -46,8 +46,7 @@ export class PersistentHistory implements ILocalHistory { public constructor(options: PersistentHistoryOptions) { this.memory = new MemLocalHistory(); this.storage = options.storage ?? getDefaultHistoryStorage(); - this.storageKey = - options.storageKey ?? `${HISTORY_STORAGE_PREFIX}${options.channelId}`; + this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.storageKeyPrefix}:${options.channelId}`; this.restore(); } From b8ee363cb186aa0f073145b34f027000c87e9051 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 20 Nov 2025 16:13:44 -0500 Subject: [PATCH 05/22] refactor: rename persistence methods from `persist`/`restore` to `save`/`load` --- packages/sds/src/message_channel/persistent_history.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index 3b198d9edb..2f8979a623 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -47,7 +47,7 @@ export class PersistentHistory implements ILocalHistory { this.memory = new MemLocalHistory(); this.storage = options.storage ?? getDefaultHistoryStorage(); this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.storageKeyPrefix}:${options.channelId}`; - this.restore(); + this.load(); } public get length(): number { @@ -56,7 +56,7 @@ export class PersistentHistory implements ILocalHistory { public push(...items: ContentMessage[]): number { const length = this.memory.push(...items); - this.persist(); + this.save(); return length; } @@ -97,7 +97,7 @@ export class PersistentHistory implements ILocalHistory { return this.memory.findIndex(predicate, thisArg); } - private persist(): void { + private save(): void { if (!this.storage) { return; } @@ -111,7 +111,7 @@ export class PersistentHistory implements ILocalHistory { } } - private restore(): void { + private load(): void { if (!this.storage) { return; } From 322240b2d0a204430a69386356b36cbd3a94d960 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 20 Nov 2025 16:18:29 -0500 Subject: [PATCH 06/22] refactor: Remove silent error suppression for persistent history storage operations. --- .../src/message_channel/persistent_history.ts | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index 2f8979a623..d9de3e592b 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -101,14 +101,11 @@ export class PersistentHistory implements ILocalHistory { if (!this.storage) { return; } - try { - const payload = JSON.stringify( - this.memory.slice(0).map(serializeContentMessage) - ); - this.storage.setItem(this.storageKey, payload); - } catch { - // Ignore persistence errors (e.g. quota exceeded). - } + + const payload = JSON.stringify( + this.memory.slice(0).map(serializeContentMessage) + ); + this.storage.setItem(this.storageKey, payload); } private load(): void { @@ -130,11 +127,7 @@ export class PersistentHistory implements ILocalHistory { this.memory.push(...messages); } } catch { - try { - this.storage.removeItem(this.storageKey); - } catch { - // Ignore cleanup errors. - } + this.storage.removeItem(this.storageKey); } } } From 15f1a22f2e2bd7e01ac72908a5db860cd8cacbbc Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Thu, 20 Nov 2025 16:26:27 -0500 Subject: [PATCH 07/22] feat: remove `storageKeyPrefix` and `getDefaultHistoryStorage`, simplifying history storage initialization to default to `localStorage` --- .../src/message_channel/persistent_history.ts | 20 ++----------------- 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index d9de3e592b..d68cae183e 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -12,7 +12,6 @@ export interface HistoryStorage { export interface PersistentHistoryOptions { channelId: ChannelId; storage?: HistoryStorage; - storageKeyPrefix?: string; } type StoredHistoryEntry = { @@ -45,8 +44,8 @@ export class PersistentHistory implements ILocalHistory { public constructor(options: PersistentHistoryOptions) { this.memory = new MemLocalHistory(); - this.storage = options.storage ?? getDefaultHistoryStorage(); - this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.storageKeyPrefix}:${options.channelId}`; + this.storage = options.storage || localStorage; + this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`; this.load(); } @@ -132,21 +131,6 @@ export class PersistentHistory implements ILocalHistory { } } -export const getDefaultHistoryStorage = (): HistoryStorage | undefined => { - try { - if (typeof localStorage === "undefined") { - return undefined; - } - - const probeKey = `${HISTORY_STORAGE_PREFIX}__probe__`; - localStorage.setItem(probeKey, probeKey); - localStorage.removeItem(probeKey); - return localStorage; - } catch { - return undefined; - } -}; - const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({ messageId: entry.messageId, retrievalHint: entry.retrievalHint From 581430ab43689433be81ff84e7bcaba1cc0ebd34 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 21 Nov 2025 13:52:20 -0500 Subject: [PATCH 08/22] feat: Conditionally set default history storage to localStorage for improved compatibility. --- packages/sds/src/message_channel/persistent_history.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index d68cae183e..7806476592 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -31,6 +31,8 @@ type StoredContentMessage = { }; const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; +const DEFAULT_HISTORY_STORAGE: HistoryStorage | undefined = + typeof localStorage !== "undefined" ? localStorage : undefined; /** * Persists the SDS local history in a browser/localStorage compatible backend. @@ -44,7 +46,7 @@ export class PersistentHistory implements ILocalHistory { public constructor(options: PersistentHistoryOptions) { this.memory = new MemLocalHistory(); - this.storage = options.storage || localStorage; + this.storage = options.storage ?? DEFAULT_HISTORY_STORAGE; this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`; this.load(); } From d52ea9fd2c1a942ecc916fd013291aec7274482f Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 19:11:01 -0500 Subject: [PATCH 09/22] tests: use memLocalHistory for message_channel.spec.ts --- .../message_channel/message_channel.spec.ts | 63 +++++++++++++------ 1 file changed, 43 insertions(+), 20 deletions(-) diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index ea1629250c..2ba2758452 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -4,6 +4,7 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { MessageChannelEvent } from "./events.js"; +import { MemLocalHistory } from "./mem_local_history.js"; import { ContentMessage, HistoryEntry, @@ -22,6 +23,28 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { return Promise.resolve({ success: true }); }; +/** + * Test helper to create a MessageChannel with MemLocalHistory. + * This avoids localStorage pollution in tests and tests core functionality. + */ +const createTestChannel = ( + channelId: string, + senderId: string, + options: { + causalHistorySize?: number; + possibleAcksThreshold?: number; + timeoutForLostMessagesMs?: number; + enableRepair?: boolean; + } = {} +): MessageChannel => { + return new MessageChannel( + channelId, + senderId, + options, + new MemLocalHistory() + ); +}; + const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { return channel["filter"] as DefaultBloomFilter; }; @@ -68,7 +91,7 @@ describe("MessageChannel", function () { describe("sending a message ", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); + channelA = createTestChannel(channelId, "alice"); }); it("should increase lamport timestamp", async () => { @@ -171,8 +194,8 @@ describe("MessageChannel", function () { describe("receiving a message", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); - channelB = new MessageChannel(channelId, "bob"); + channelA = createTestChannel(channelId, "alice"); + channelB = createTestChannel(channelId, "bob"); }); it("should increase lamport timestamp", async () => { @@ -187,8 +210,8 @@ describe("MessageChannel", function () { // TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648 it.skip("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { - const testChannelA = new MessageChannel(channelId, "alice"); - const testChannelB = new MessageChannel(channelId, "bob"); + const testChannelA = createTestChannel(channelId, "alice"); + const testChannelB = createTestChannel(channelId, "bob"); const timestampBefore = testChannelA["lamportTimestamp"]; @@ -452,10 +475,10 @@ describe("MessageChannel", function () { describe("reviewing ack status", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should mark all messages in causal history as acknowledged", async () => { @@ -661,10 +684,10 @@ describe("MessageChannel", function () { describe("Sweeping incoming buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should detect messages with missing dependencies", async () => { @@ -746,7 +769,7 @@ describe("MessageChannel", function () { it("should mark a message as irretrievably lost if timeout is exceeded", async () => { // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -789,7 +812,7 @@ describe("MessageChannel", function () { let lostMessages: HistoryEntry[] = []; // Create a channel with very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -853,7 +876,7 @@ describe("MessageChannel", function () { it("should remove messages without delivering if timeout is exceeded", async () => { const causalHistorySize = channelA["causalHistorySize"]; // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { timeoutForLostMessagesMs: 10 }); @@ -1043,10 +1066,10 @@ describe("MessageChannel", function () { describe("Sweeping outgoing buffer", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); }); it("should partition messages based on acknowledgement status", async () => { @@ -1088,10 +1111,10 @@ describe("MessageChannel", function () { describe("Sync messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice", { + channelA = createTestChannel(channelId, "alice", { causalHistorySize: 2 }); - channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 }); + channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 }); const message = utf8ToBytes("first message in channel"); channelA["localHistory"].push( new ContentMessage( @@ -1115,7 +1138,7 @@ describe("MessageChannel", function () { }); it("should not be sent when there is no history", async () => { - const channelC = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { causalHistorySize: 2 }); const res = await channelC.pushOutgoingSyncMessage(async (_msg) => { @@ -1160,7 +1183,7 @@ describe("MessageChannel", function () { }); it("should update ack status of messages in outgoing buffer", async () => { - const channelC = new MessageChannel(channelId, "carol", { + const channelC = createTestChannel(channelId, "carol", { causalHistorySize: 2 }); for (const m of messagesA) { @@ -1185,7 +1208,7 @@ describe("MessageChannel", function () { describe("Ephemeral messages", () => { beforeEach(() => { - channelA = new MessageChannel(channelId, "alice"); + channelA = createTestChannel(channelId, "alice"); }); it("should be sent without a timestamp, causal history, or bloom filter", async () => { @@ -1208,7 +1231,7 @@ describe("MessageChannel", function () { }); it("should be delivered immediately if received", async () => { - const channelB = new MessageChannel(channelId, "bob"); + const channelB = createTestChannel(channelId, "bob"); // Track initial state const localHistoryBefore = channelB["localHistory"].length; From d6412e5536b0babfd50c6e4eb8b2cb8019ee06e9 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 19:30:36 -0500 Subject: [PATCH 10/22] test: add unit tests for localStorage persistence and error handling in message channels --- .../message_channel/message_channel.spec.ts | 56 +++++++++++++++++++ .../persistent_history.spec.ts | 37 ++++++++++++ 2 files changed, 93 insertions(+) diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 2ba2758452..461d1ba9c3 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -1259,4 +1259,60 @@ describe("MessageChannel", function () { expect(channelB["lamportTimestamp"]).to.equal(timestampBefore); }); }); + + describe("Default localStorage persistence", () => { + it("should restore messages from localStorage on channel recreation", async () => { + const persistentChannelId = "persistent-channel"; + + const channel1 = new MessageChannel(persistentChannelId, "alice"); + + await sendMessage(channel1, utf8ToBytes("msg-1"), callback); + await sendMessage(channel1, utf8ToBytes("msg-2"), callback); + + expect(channel1["localHistory"].length).to.equal(2); + + // Recreate channel with same storage - should load history + const channel2 = new MessageChannel(persistentChannelId, "alice"); + + expect(channel2["localHistory"].length).to.equal(2); + expect( + channel2["localHistory"].slice(0).map((m) => m.messageId) + ).to.deep.equal([ + MessageChannel.getMessageId(utf8ToBytes("msg-1")), + MessageChannel.getMessageId(utf8ToBytes("msg-2")) + ]); + }); + + it("should include persisted messages in causal history after restart", async () => { + const persistentChannelId = "persistent-causal"; + + const channel1 = new MessageChannel(persistentChannelId, "alice", { + causalHistorySize: 2 + }); + + await sendMessage(channel1, utf8ToBytes("msg-1"), callback); + await sendMessage(channel1, utf8ToBytes("msg-2"), callback); + await sendMessage(channel1, utf8ToBytes("msg-3"), callback); + + const channel2 = new MessageChannel(persistentChannelId, "alice", { + causalHistorySize: 2 + }); + + let capturedMessage: ContentMessage | null = null; + await sendMessage(channel2, utf8ToBytes("msg-4"), async (message) => { + capturedMessage = message; + return { success: true }; + }); + + expect(capturedMessage).to.not.be.null; + expect(capturedMessage!.causalHistory).to.have.lengthOf(2); + // Should reference the last 2 messages (msg-2 and msg-3) + expect(capturedMessage!.causalHistory[0].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes("msg-2")) + ); + expect(capturedMessage!.causalHistory[1].messageId).to.equal( + MessageChannel.getMessageId(utf8ToBytes("msg-3")) + ); + }); + }); }); diff --git a/packages/sds/src/message_channel/persistent_history.spec.ts b/packages/sds/src/message_channel/persistent_history.spec.ts index 941473dd94..7ee1c7914f 100644 --- a/packages/sds/src/message_channel/persistent_history.spec.ts +++ b/packages/sds/src/message_channel/persistent_history.spec.ts @@ -59,4 +59,41 @@ describe("PersistentHistory", () => { expect(history.length).to.equal(1); expect(history.slice(0)[0].messageId).to.equal("msg-3"); }); + + it("handles corrupt data in storage gracefully", () => { + const storage = new MemoryStorage(); + storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); + + const history = new PersistentHistory({ channelId, storage }); + expect(history.length).to.equal(0); + + // Local history should be empty + expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); + }); + + it("isolates history by channel ID", () => { + const storage = new MemoryStorage(); + + const history1 = new PersistentHistory({ + channelId: "channel-1", + storage + }); + const history2 = new PersistentHistory({ + channelId: "channel-2", + storage + }); + + history1.push(createMessage("msg-1", 1)); + history2.push(createMessage("msg-2", 2)); + + // Each channel should only see its own messages + expect(history1.length).to.equal(1); + expect(history1.slice(0)[0].messageId).to.equal("msg-1"); + + expect(history2.length).to.equal(1); + expect(history2.slice(0)[0].messageId).to.equal("msg-2"); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; + }); }); From ad4973aa201193c803a8c3e0e0f5415e62145b18 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 20:00:06 -0500 Subject: [PATCH 11/22] test: local storage specific test --- .../sds/src/message_channel/message_channel.spec.ts | 9 ++++++++- packages/sds/src/message_channel/persistent_history.ts | 10 ++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 461d1ba9c3..38b1881dad 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -1260,7 +1260,14 @@ describe("MessageChannel", function () { }); }); - describe("Default localStorage persistence", () => { + describe("localStorage persistence", function () { + // LocalStorage specific tests (browser) + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + it("should restore messages from localStorage on channel recreation", async () => { const persistentChannelId = "persistent-channel"; diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts index 7806476592..d575e1e3e7 100644 --- a/packages/sds/src/message_channel/persistent_history.ts +++ b/packages/sds/src/message_channel/persistent_history.ts @@ -1,8 +1,11 @@ import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; +import { Logger } from "@waku/utils"; import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; +const log = new Logger("sds:persistent-history"); + export interface HistoryStorage { getItem(key: string): string | null; setItem(key: string, value: string): void; @@ -48,6 +51,13 @@ export class PersistentHistory implements ILocalHistory { this.memory = new MemLocalHistory(); this.storage = options.storage ?? DEFAULT_HISTORY_STORAGE; this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`; + + if (!this.storage) { + log.warn( + "Storage backend unavailable (localStorage not found). Falling back to in-memory history. Messages will not persist across sessions." + ); + } + this.load(); } From 503c348b79701c2be95a580466781bc3d67659d6 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 21:12:59 -0500 Subject: [PATCH 12/22] feat: Introduce `PersistentStorage` for message history management and refactor `MemLocalHistory` to support optional persistent storage. --- packages/sds/src/message_channel/index.ts | 5 + .../message_channel/mem_local_history.spec.ts | 4 +- .../src/message_channel/mem_local_history.ts | 48 +++- .../message_channel/message_channel.spec.ts | 2 +- .../src/message_channel/message_channel.ts | 9 +- .../persistent_history.spec.ts | 99 --------- .../src/message_channel/persistent_history.ts | 208 ------------------ .../persistent_storage.spec.ts | 198 +++++++++++++++++ .../src/message_channel/persistent_storage.ts | 173 +++++++++++++++ .../sds/src/message_channel/repair/repair.ts | 2 +- 10 files changed, 424 insertions(+), 324 deletions(-) delete mode 100644 packages/sds/src/message_channel/persistent_history.spec.ts delete mode 100644 packages/sds/src/message_channel/persistent_history.ts create mode 100644 packages/sds/src/message_channel/persistent_storage.spec.ts create mode 100644 packages/sds/src/message_channel/persistent_storage.ts diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts index 7a8e279c2a..5aad874bed 100644 --- a/packages/sds/src/message_channel/index.ts +++ b/packages/sds/src/message_channel/index.ts @@ -14,3 +14,8 @@ export { isEphemeralMessage, isSyncMessage } from "./message.js"; +export { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; +export { + PersistentStorage, + type HistoryStorage +} from "./persistent_storage.js"; diff --git a/packages/sds/src/message_channel/mem_local_history.spec.ts b/packages/sds/src/message_channel/mem_local_history.spec.ts index 7fd1f567a6..d57e46493a 100644 --- a/packages/sds/src/message_channel/mem_local_history.spec.ts +++ b/packages/sds/src/message_channel/mem_local_history.spec.ts @@ -7,7 +7,7 @@ describe("MemLocalHistory", () => { it("Cap max size when messages are pushed one at a time", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new MemLocalHistory({ maxSize: maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) @@ -31,7 +31,7 @@ describe("MemLocalHistory", () => { it("Cap max size when a pushed array is exceeding the cap", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new MemLocalHistory({ maxSize: maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 67e0b73ae3..8f715a89e0 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,6 +1,7 @@ import _ from "lodash"; -import { ContentMessage, isContentMessage } from "./message.js"; +import { ChannelId, ContentMessage, isContentMessage } from "./message.js"; +import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -49,13 +50,31 @@ export interface ILocalHistory { export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; + private readonly storage?: PersistentStorage; + private readonly maxSize: number; /** - * Construct a new in-memory local history + * Construct a new in-memory local history. * - * @param maxLength The maximum number of message to store. + * @param opts Configuration object. + * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. + * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ - public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {} + public constructor( + opts: { storage?: ChannelId | PersistentStorage; maxSize?: number } = {} + ) { + const { storage, maxSize } = opts; + this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; + if (storage instanceof PersistentStorage) { + this.storage = storage; + } else if (typeof storage === "string") { + this.storage = PersistentStorage.create(storage); + } else { + this.storage = undefined; + } + + this.load(); + } public get length(): number { return this.items.length; @@ -77,11 +96,13 @@ export class MemLocalHistory implements ILocalHistory { this.items = _.uniqBy(combinedItems, "messageId"); // Let's drop older messages if max length is reached - if (this.length > this.maxLength) { - const numItemsToRemove = this.length - this.maxLength; + if (this.length > this.maxSize) { + const numItemsToRemove = this.length - this.maxSize; this.items.splice(0, numItemsToRemove); } + this.save(); + return this.items.length; } @@ -129,4 +150,19 @@ export class MemLocalHistory implements ILocalHistory { ); } } + + private save(): void { + this.storage?.save(this.items); + } + + private load(): void { + if (!this.storage) { + return; + } + + const messages = this.storage.load(); + if (messages.length > 0) { + this.items = messages; + } + } } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 38b1881dad..f66330c478 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -5,6 +5,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { MessageChannelEvent } from "./events.js"; import { MemLocalHistory } from "./mem_local_history.js"; +import { ILocalHistory } from "./mem_local_history.js"; import { ContentMessage, HistoryEntry, @@ -14,7 +15,6 @@ import { } from "./message.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - ILocalHistory, MessageChannel } from "./message_channel.js"; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index e55ac045d4..30f744a10c 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,6 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; +import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, @@ -20,7 +21,6 @@ import { ParticipantId, SyncMessage } from "./message.js"; -import { PersistentHistory } from "./persistent_history.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -63,11 +63,6 @@ export interface MessageChannelOptions { repairConfig?: RepairConfig; } -export type ILocalHistory = Pick< - Array, - "some" | "push" | "slice" | "find" | "length" | "findIndex" ->; - export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: ParticipantId; @@ -118,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter { this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = - localHistory ?? new PersistentHistory({ channelId: this.channelId }); + localHistory ?? new MemLocalHistory({ storage: channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_history.spec.ts b/packages/sds/src/message_channel/persistent_history.spec.ts deleted file mode 100644 index 7ee1c7914f..0000000000 --- a/packages/sds/src/message_channel/persistent_history.spec.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { expect } from "chai"; - -import { ContentMessage } from "./message.js"; -import { HistoryStorage, PersistentHistory } from "./persistent_history.js"; - -class MemoryStorage implements HistoryStorage { - private readonly store = new Map(); - - public getItem(key: string): string | null { - return this.store.get(key) ?? null; - } - - public setItem(key: string, value: string): void { - this.store.set(key, value); - } - - public removeItem(key: string): void { - this.store.delete(key); - } -} - -const channelId = "channel-1"; - -const createMessage = (id: string, timestamp: number): ContentMessage => { - return new ContentMessage( - id, - channelId, - "sender", - [], - BigInt(timestamp), - undefined, - new Uint8Array([timestamp]), - undefined - ); -}; - -describe("PersistentHistory", () => { - it("persists and restores messages", () => { - const storage = new MemoryStorage(); - const history = new PersistentHistory({ channelId, storage }); - - history.push(createMessage("msg-1", 1)); - history.push(createMessage("msg-2", 2)); - - const restored = new PersistentHistory({ channelId, storage }); - - expect(restored.length).to.equal(2); - expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([ - "msg-1", - "msg-2" - ]); - }); - - it("behaves like memory history when storage is unavailable", () => { - const history = new PersistentHistory({ channelId, storage: undefined }); - - history.push(createMessage("msg-3", 3)); - - expect(history.length).to.equal(1); - expect(history.slice(0)[0].messageId).to.equal("msg-3"); - }); - - it("handles corrupt data in storage gracefully", () => { - const storage = new MemoryStorage(); - storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); - - const history = new PersistentHistory({ channelId, storage }); - expect(history.length).to.equal(0); - - // Local history should be empty - expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); - }); - - it("isolates history by channel ID", () => { - const storage = new MemoryStorage(); - - const history1 = new PersistentHistory({ - channelId: "channel-1", - storage - }); - const history2 = new PersistentHistory({ - channelId: "channel-2", - storage - }); - - history1.push(createMessage("msg-1", 1)); - history2.push(createMessage("msg-2", 2)); - - // Each channel should only see its own messages - expect(history1.length).to.equal(1); - expect(history1.slice(0)[0].messageId).to.equal("msg-1"); - - expect(history2.length).to.equal(1); - expect(history2.slice(0)[0].messageId).to.equal("msg-2"); - - expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; - expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; - }); -}); diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts deleted file mode 100644 index d575e1e3e7..0000000000 --- a/packages/sds/src/message_channel/persistent_history.ts +++ /dev/null @@ -1,208 +0,0 @@ -import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; -import { Logger } from "@waku/utils"; - -import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; -import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; - -const log = new Logger("sds:persistent-history"); - -export interface HistoryStorage { - getItem(key: string): string | null; - setItem(key: string, value: string): void; - removeItem(key: string): void; -} - -export interface PersistentHistoryOptions { - channelId: ChannelId; - storage?: HistoryStorage; -} - -type StoredHistoryEntry = { - messageId: string; - retrievalHint?: string; -}; - -type StoredContentMessage = { - messageId: string; - channelId: string; - senderId: string; - lamportTimestamp: string; - causalHistory: StoredHistoryEntry[]; - bloomFilter?: string; - content: string; - retrievalHint?: string; -}; - -const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; -const DEFAULT_HISTORY_STORAGE: HistoryStorage | undefined = - typeof localStorage !== "undefined" ? localStorage : undefined; - -/** - * Persists the SDS local history in a browser/localStorage compatible backend. - * - * If no storage backend is available, this behaves like {@link MemLocalHistory}. - */ -export class PersistentHistory implements ILocalHistory { - private readonly storage?: HistoryStorage; - private readonly storageKey: string; - private readonly memory: MemLocalHistory; - - public constructor(options: PersistentHistoryOptions) { - this.memory = new MemLocalHistory(); - this.storage = options.storage ?? DEFAULT_HISTORY_STORAGE; - this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`; - - if (!this.storage) { - log.warn( - "Storage backend unavailable (localStorage not found). Falling back to in-memory history. Messages will not persist across sessions." - ); - } - - this.load(); - } - - public get length(): number { - return this.memory.length; - } - - public push(...items: ContentMessage[]): number { - const length = this.memory.push(...items); - this.save(); - return length; - } - - public some( - predicate: ( - value: ContentMessage, - index: number, - array: ContentMessage[] - ) => unknown, - thisArg?: any - ): boolean { - return this.memory.some(predicate, thisArg); - } - - public slice(start?: number, end?: number): ContentMessage[] { - return this.memory.slice(start, end); - } - - public find( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): ContentMessage | undefined { - return this.memory.find(predicate, thisArg); - } - - public findIndex( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): number { - return this.memory.findIndex(predicate, thisArg); - } - - private save(): void { - if (!this.storage) { - return; - } - - const payload = JSON.stringify( - this.memory.slice(0).map(serializeContentMessage) - ); - this.storage.setItem(this.storageKey, payload); - } - - private load(): void { - if (!this.storage) { - return; - } - - try { - const raw = this.storage.getItem(this.storageKey); - if (!raw) { - return; - } - - const stored = JSON.parse(raw) as StoredContentMessage[]; - const messages = stored - .map(deserializeContentMessage) - .filter((message): message is ContentMessage => Boolean(message)); - if (messages.length) { - this.memory.push(...messages); - } - } catch { - this.storage.removeItem(this.storageKey); - } - } -} - -const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({ - messageId: entry.messageId, - retrievalHint: entry.retrievalHint - ? bytesToHex(entry.retrievalHint) - : undefined -}); - -const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({ - messageId: entry.messageId, - retrievalHint: entry.retrievalHint - ? hexToBytes(entry.retrievalHint) - : undefined -}); - -const serializeContentMessage = ( - message: ContentMessage -): StoredContentMessage => ({ - messageId: message.messageId, - channelId: message.channelId, - senderId: message.senderId, - lamportTimestamp: message.lamportTimestamp.toString(), - causalHistory: message.causalHistory.map(serializeHistoryEntry), - bloomFilter: toHex(message.bloomFilter), - content: bytesToHex(new Uint8Array(message.content)), - retrievalHint: toHex(message.retrievalHint) -}); - -const deserializeContentMessage = ( - record: StoredContentMessage -): ContentMessage | undefined => { - try { - const content = hexToBytes(record.content); - return new ContentMessage( - record.messageId, - record.channelId, - record.senderId, - record.causalHistory.map(deserializeHistoryEntry), - BigInt(record.lamportTimestamp), - fromHex(record.bloomFilter), - content, - [], - fromHex(record.retrievalHint) - ); - } catch { - return undefined; - } -}; - -const toHex = ( - data?: Uint8Array | Uint8Array -): string | undefined => { - if (!data || data.length === 0) { - return undefined; - } - return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); -}; - -const fromHex = (value?: string): Uint8Array | undefined => { - if (!value) { - return undefined; - } - return hexToBytes(value); -}; diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts new file mode 100644 index 0000000000..ec7547dab0 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -0,0 +1,198 @@ +import { expect } from "chai"; + +import { MemLocalHistory } from "./mem_local_history.js"; +import { ContentMessage } from "./message.js"; +import { HistoryStorage, PersistentStorage } from "./persistent_storage.js"; + +const channelId = "channel-1"; + +describe("PersistentStorage", () => { + describe("Explicit storage", () => { + it("persists and restores messages", () => { + const storage = new MemoryStorage(); + const persistentStorage = PersistentStorage.create(channelId, storage); + + expect(persistentStorage).to.not.be.undefined; + + const history1 = new MemLocalHistory({ storage: persistentStorage }); + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + + const history2 = new MemLocalHistory({ storage: persistentStorage }); + + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + }); + + it("uses in-memory only when no storage is provided", () => { + const history = new MemLocalHistory({ maxSize: 100 }); + history.push(createMessage("msg-3", 3)); + + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); + + const history2 = new MemLocalHistory({ maxSize: 100 }); + expect(history2.length).to.equal(0); + }); + + it("handles corrupt data in storage gracefully", () => { + const storage = new MemoryStorage(); + // Corrupt data + storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); + + const persistentStorage = PersistentStorage.create(channelId, storage); + const history = new MemLocalHistory({ storage: persistentStorage }); + + expect(history.length).to.equal(0); + + // Corrupt data is not saved + expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); + }); + + it("isolates history by channel ID", () => { + const storage = new MemoryStorage(); + + const storage1 = PersistentStorage.create("channel-1", storage); + const storage2 = PersistentStorage.create("channel-2", storage); + + const history1 = new MemLocalHistory({ storage: storage1 }); + const history2 = new MemLocalHistory({ storage: storage2 }); + + history1.push(createMessage("msg-1", 1)); + history2.push(createMessage("msg-2", 2)); + + expect(history1.length).to.equal(1); + expect(history1.slice(0)[0].messageId).to.equal("msg-1"); + + expect(history2.length).to.equal(1); + expect(history2.slice(0)[0].messageId).to.equal("msg-2"); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; + }); + + it("saves messages after each push", () => { + const storage = new MemoryStorage(); + const persistentStorage = PersistentStorage.create(channelId, storage); + const history = new MemLocalHistory({ storage: persistentStorage }); + + expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; + + history.push(createMessage("msg-1", 1)); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + + const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!); + expect(saved).to.have.lengthOf(1); + expect(saved[0].messageId).to.equal("msg-1"); + }); + + it("loads messages on initialization", () => { + const storage = new MemoryStorage(); + const persistentStorage1 = PersistentStorage.create(channelId, storage); + const history1 = new MemLocalHistory({ storage: persistentStorage1 }); + + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + history1.push(createMessage("msg-3", 3)); + + const persistentStorage2 = PersistentStorage.create(channelId, storage); + const history2 = new MemLocalHistory({ storage: persistentStorage2 }); + + expect(history2.length).to.equal(3); + expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + "msg-1", + "msg-2", + "msg-3" + ]); + }); + }); + + describe("Node.js only (no localStorage)", () => { + before(function () { + if (typeof localStorage !== "undefined") { + this.skip(); + } + }); + + it("returns undefined when no storage is available", () => { + const persistentStorage = PersistentStorage.create(channelId, undefined); + + expect(persistentStorage).to.equal(undefined); + }); + }); + + describe("Browser only (localStorage)", () => { + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + it("persists and restores messages with channelId", () => { + const testChannelId = `test-${Date.now()}`; + const history1 = new MemLocalHistory({ storage: testChannelId }); + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + + const history2 = new MemLocalHistory({ storage: testChannelId }); + + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + + localStorage.removeItem(`waku:sds:history:${testChannelId}`); + }); + + it("auto-uses localStorage when channelId is provided", () => { + const testChannelId = `auto-storage-${Date.now()}`; + + const history = new MemLocalHistory({ storage: testChannelId }); + history.push(createMessage("msg-auto-1", 1)); + history.push(createMessage("msg-auto-2", 2)); + + const history2 = new MemLocalHistory({ storage: testChannelId }); + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + "msg-auto-1", + "msg-auto-2" + ]); + + localStorage.removeItem(`waku:sds:history:${testChannelId}`); + }); + }); +}); + +const createMessage = (id: string, timestamp: number): ContentMessage => { + return new ContentMessage( + id, + channelId, + "sender", + [], + BigInt(timestamp), + undefined, + new Uint8Array([timestamp]), + undefined + ); +}; + +class MemoryStorage implements HistoryStorage { + private readonly store = new Map(); + + public getItem(key: string): string | null { + return this.store.get(key) ?? null; + } + + public setItem(key: string, value: string): void { + this.store.set(key, value); + } + + public removeItem(key: string): void { + this.store.delete(key); + } +} diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/persistent_storage.ts new file mode 100644 index 0000000000..ffc0f14674 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.ts @@ -0,0 +1,173 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; +import { Logger } from "@waku/utils"; + +import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; + +const log = new Logger("sds:persistent-storage"); + +const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; + +export interface HistoryStorage { + getItem(key: string): string | null; + setItem(key: string, value: string): void; + removeItem(key: string): void; +} + +type StoredHistoryEntry = { + messageId: string; + retrievalHint?: string; +}; + +type StoredContentMessage = { + messageId: string; + channelId: string; + senderId: string; + lamportTimestamp: string; + causalHistory: StoredHistoryEntry[]; + bloomFilter?: string; + content: string; + retrievalHint?: string; +}; + +/** + * Persistent storage for message history. + */ +export class PersistentStorage { + private readonly storageKey: string; + + /** + * Creates a PersistentStorage for a channel, or returns undefined if no storage is available. + * If no storage is provided, attempts to use global localStorage (if available). + * Returns undefined if no storage is available. + */ + public static create( + channelId: ChannelId, + storage?: HistoryStorage + ): PersistentStorage | undefined { + storage = + storage ?? + (typeof localStorage !== "undefined" ? localStorage : undefined); + if (!storage) { + log.info( + `No storage available. Messages will not persist across sessions. + If you're using NodeJS, you can provide a storage backend using the storage parameter.` + ); + return undefined; + } + return new PersistentStorage(channelId, storage); + } + + private constructor( + channelId: ChannelId, + private readonly storage: HistoryStorage + ) { + this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`; + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) + ); + this.storage.setItem(this.storageKey, payload); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = this.storage.getItem(this.storageKey); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error) { + log.error("Failed to load messages from storage:", error); + this.storage.removeItem(this.storageKey); + return []; + } + } +} + +class MessageSerializer { + public static serializeContentMessage( + message: ContentMessage + ): StoredContentMessage { + return { + messageId: message.messageId, + channelId: message.channelId, + senderId: message.senderId, + lamportTimestamp: message.lamportTimestamp.toString(), + causalHistory: message.causalHistory.map((entry) => + MessageSerializer.serializeHistoryEntry(entry) + ), + bloomFilter: MessageSerializer.toHex(message.bloomFilter), + content: bytesToHex(new Uint8Array(message.content)), + retrievalHint: MessageSerializer.toHex(message.retrievalHint) + }; + } + + public static deserializeContentMessage( + record: StoredContentMessage + ): ContentMessage | undefined { + try { + const content = hexToBytes(record.content); + return new ContentMessage( + record.messageId, + record.channelId, + record.senderId, + record.causalHistory.map((entry) => + MessageSerializer.deserializeHistoryEntry(entry) + ), + BigInt(record.lamportTimestamp), + MessageSerializer.fromHex(record.bloomFilter), + content, + [], + MessageSerializer.fromHex(record.retrievalHint) + ); + } catch { + return undefined; + } + } + + public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? bytesToHex(entry.retrievalHint) + : undefined + }; + } + + public static deserializeHistoryEntry( + entry: StoredHistoryEntry + ): HistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? hexToBytes(entry.retrievalHint) + : undefined + }; + } + + private static toHex( + data?: Uint8Array | Uint8Array + ): string | undefined { + if (!data || data.length === 0) { + return undefined; + } + return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); + } + + private static fromHex(value?: string): Uint8Array | undefined { + if (!value) { + return undefined; + } + return hexToBytes(value); + } +} diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..5da41d224c 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import type { ILocalHistory } from "../mem_local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { From e54dacddd792f2442413cca6a87e5a07c3b6decc Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 21:14:18 -0500 Subject: [PATCH 13/22] chore: remove unused exports --- packages/sds/src/message_channel/index.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts index 5aad874bed..7a8e279c2a 100644 --- a/packages/sds/src/message_channel/index.ts +++ b/packages/sds/src/message_channel/index.ts @@ -14,8 +14,3 @@ export { isEphemeralMessage, isSyncMessage } from "./message.js"; -export { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; -export { - PersistentStorage, - type HistoryStorage -} from "./persistent_storage.js"; From a3202b58dd804144e3ac448cc2de257978c772af Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 21:20:01 -0500 Subject: [PATCH 14/22] chore: interface abstractions --- packages/sds/src/message_channel/mem_local_history.ts | 9 ++++++--- .../sds/src/message_channel/message_channel.spec.ts | 10 +++------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 8f715a89e0..4042ee69c4 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -48,6 +48,11 @@ export interface ILocalHistory { ): number; } +export interface MemLocalHistoryOptions { + storage?: ChannelId | PersistentStorage; + maxSize?: number; +} + export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; private readonly storage?: PersistentStorage; @@ -60,9 +65,7 @@ export class MemLocalHistory implements ILocalHistory { * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ - public constructor( - opts: { storage?: ChannelId | PersistentStorage; maxSize?: number } = {} - ) { + public constructor(opts: MemLocalHistoryOptions = {}) { const { storage, maxSize } = opts; this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; if (storage instanceof PersistentStorage) { diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index f66330c478..ed66f10abf 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -15,7 +15,8 @@ import { } from "./message.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - MessageChannel + MessageChannel, + MessageChannelOptions } from "./message_channel.js"; const channelId = "test-channel"; @@ -30,12 +31,7 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { const createTestChannel = ( channelId: string, senderId: string, - options: { - causalHistorySize?: number; - possibleAcksThreshold?: number; - timeoutForLostMessagesMs?: number; - enableRepair?: boolean; - } = {} + options: MessageChannelOptions = {} ): MessageChannel => { return new MessageChannel( channelId, From 47573224895f302ed6359f09b4fe272eb88d901f Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 21:25:53 -0500 Subject: [PATCH 15/22] chore: update interface to type as we're not implementing it --- packages/sds/src/message_channel/mem_local_history.ts | 6 +++--- packages/sds/src/message_channel/message_channel.ts | 2 ++ packages/sds/src/message_channel/repair/repair.ts | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 4042ee69c4..11fd57199b 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,6 +1,6 @@ import _ from "lodash"; -import { ChannelId, ContentMessage, isContentMessage } from "./message.js"; +import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -48,10 +48,10 @@ export interface ILocalHistory { ): number; } -export interface MemLocalHistoryOptions { +export type MemLocalHistoryOptions = { storage?: ChannelId | PersistentStorage; maxSize?: number; -} +}; export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 30f744a10c..8b23809627 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -23,6 +23,8 @@ import { } from "./message.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; +export type { ILocalHistory }; + export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, errorRate: 0.001 diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 5da41d224c..1a8a85f43e 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; -import type { ILocalHistory } from "../mem_local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; +import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { From ff8b4e5fdb3cbd80a1ea49bcc8a850246032a537 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 21:27:24 -0500 Subject: [PATCH 16/22] fix: MemLocalHistory uses optional chaining --- packages/sds/src/message_channel/mem_local_history.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 11fd57199b..78bc91fd7d 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -159,11 +159,7 @@ export class MemLocalHistory implements ILocalHistory { } private load(): void { - if (!this.storage) { - return; - } - - const messages = this.storage.load(); + const messages = this.storage?.load() ?? []; if (messages.length > 0) { this.items = messages; } From 2ee8b08975970396bb2cc4a0545fc06042de0777 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 22:40:12 -0500 Subject: [PATCH 17/22] chore: add logging to LocalHistory for storage init --- packages/sds/src/message_channel/mem_local_history.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 78bc91fd7d..109ffd8ad3 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,3 +1,4 @@ +import { Logger } from "@waku/utils"; import _ from "lodash"; import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; @@ -53,6 +54,8 @@ export type MemLocalHistoryOptions = { maxSize?: number; }; +const log = new Logger("sds:local-history"); + export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; private readonly storage?: PersistentStorage; @@ -70,10 +73,13 @@ export class MemLocalHistory implements ILocalHistory { this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; if (storage instanceof PersistentStorage) { this.storage = storage; + log.info("Using explicit persistent storage"); } else if (typeof storage === "string") { this.storage = PersistentStorage.create(storage); + log.info("Creating persistent storage for channel", storage); } else { this.storage = undefined; + log.info("Using in-memory storage"); } this.load(); From 41f8fdf8cf596f2a3d1ea84d8eeb21b77b920e4b Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 9 Dec 2025 16:59:52 -0500 Subject: [PATCH 18/22] chore: address comments, reduce diff --- ..._history.spec.ts => local_history.spec.ts} | 8 ++--- ...{mem_local_history.ts => local_history.ts} | 36 +++---------------- .../message_channel/message_channel.spec.ts | 18 ++++------ .../src/message_channel/message_channel.ts | 11 +++--- .../persistent_storage.spec.ts | 30 ++++++++-------- 5 files changed, 36 insertions(+), 67 deletions(-) rename packages/sds/src/message_channel/{mem_local_history.spec.ts => local_history.spec.ts} (87%) rename packages/sds/src/message_channel/{mem_local_history.ts => local_history.ts} (83%) diff --git a/packages/sds/src/message_channel/mem_local_history.spec.ts b/packages/sds/src/message_channel/local_history.spec.ts similarity index 87% rename from packages/sds/src/message_channel/mem_local_history.spec.ts rename to packages/sds/src/message_channel/local_history.spec.ts index d57e46493a..b22634b7ce 100644 --- a/packages/sds/src/message_channel/mem_local_history.spec.ts +++ b/packages/sds/src/message_channel/local_history.spec.ts @@ -1,13 +1,13 @@ import { expect } from "chai"; -import { MemLocalHistory } from "./mem_local_history.js"; +import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; -describe("MemLocalHistory", () => { +describe("LocalHistory", () => { it("Cap max size when messages are pushed one at a time", () => { const maxSize = 2; - const hist = new MemLocalHistory({ maxSize: maxSize }); + const hist = new LocalHistory({ maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) @@ -31,7 +31,7 @@ describe("MemLocalHistory", () => { it("Cap max size when a pushed array is exceeding the cap", () => { const maxSize = 2; - const hist = new MemLocalHistory({ maxSize: maxSize }); + const hist = new LocalHistory({ maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/local_history.ts similarity index 83% rename from packages/sds/src/message_channel/mem_local_history.ts rename to packages/sds/src/message_channel/local_history.ts index 109ffd8ad3..86d6502afa 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -2,6 +2,7 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; +import { ILocalHistory } from "./message_channel.js"; import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -19,44 +20,15 @@ export const DEFAULT_MAX_LENGTH = 10_000; * If an array of items longer than `maxLength` is pushed, dropping will happen * at next push. */ -export interface ILocalHistory { - length: number; - push(...items: ContentMessage[]): number; - some( - predicate: ( - value: ContentMessage, - index: number, - array: ContentMessage[] - ) => unknown, - thisArg?: any - ): boolean; - slice(start?: number, end?: number): ContentMessage[]; - find( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): ContentMessage | undefined; - findIndex( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): number; -} -export type MemLocalHistoryOptions = { +export type LocalHistoryOptions = { storage?: ChannelId | PersistentStorage; maxSize?: number; }; const log = new Logger("sds:local-history"); -export class MemLocalHistory implements ILocalHistory { +export class LocalHistory implements ILocalHistory { private items: ContentMessage[] = []; private readonly storage?: PersistentStorage; private readonly maxSize: number; @@ -68,7 +40,7 @@ export class MemLocalHistory implements ILocalHistory { * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ - public constructor(opts: MemLocalHistoryOptions = {}) { + public constructor(opts: LocalHistoryOptions = {}) { const { storage, maxSize } = opts; this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; if (storage instanceof PersistentStorage) { diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index ed66f10abf..0c29d02979 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -4,8 +4,7 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { MessageChannelEvent } from "./events.js"; -import { MemLocalHistory } from "./mem_local_history.js"; -import { ILocalHistory } from "./mem_local_history.js"; +import { LocalHistory } from "./local_history.js"; import { ContentMessage, HistoryEntry, @@ -25,7 +24,7 @@ const callback = (_message: Message): Promise<{ success: boolean }> => { }; /** - * Test helper to create a MessageChannel with MemLocalHistory. + * Test helper to create a MessageChannel with LocalHistory. * This avoids localStorage pollution in tests and tests core functionality. */ const createTestChannel = ( @@ -33,12 +32,7 @@ const createTestChannel = ( senderId: string, options: MessageChannelOptions = {} ): MessageChannel => { - return new MessageChannel( - channelId, - senderId, - options, - new MemLocalHistory() - ); + return new MessageChannel(channelId, senderId, options, new LocalHistory()); }; const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { @@ -117,11 +111,11 @@ describe("MessageChannel", function () { const expectedTimestamp = channelA["lamportTimestamp"] + 1n; const messageId = MessageChannel.getMessageId(payload); await sendMessage(channelA, payload, callback); - const messageIdLog = channelA["localHistory"] as ILocalHistory; + const messageIdLog = channelA["localHistory"] as LocalHistory; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( - (log) => + (log: ContentMessage) => log.lamportTimestamp === expectedTimestamp && log.messageId === messageId ) @@ -138,7 +132,7 @@ describe("MessageChannel", function () { return { success: true, retrievalHint: testRetrievalHint }; }); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(1); // Find the message in local history diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 8b23809627..6fda3a3803 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,7 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; -import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; +import { LocalHistory } from "./local_history.js"; import { ChannelId, ContentMessage, @@ -23,8 +23,6 @@ import { } from "./message.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; -export type { ILocalHistory }; - export const DEFAULT_BLOOM_FILTER_OPTIONS = { capacity: 10000, errorRate: 0.001 @@ -40,6 +38,11 @@ const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; const log = new Logger("sds:message-channel"); +export type ILocalHistory = Pick< + Array, + "some" | "push" | "slice" | "find" | "length" | "findIndex" +>; + export interface MessageChannelOptions { causalHistorySize?: number; /** @@ -115,7 +118,7 @@ export class MessageChannel extends TypedEventEmitter { this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = - localHistory ?? new MemLocalHistory({ storage: channelId }); + localHistory ?? new LocalHistory({ storage: channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index ec7547dab0..928de1c5ff 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -1,6 +1,6 @@ import { expect } from "chai"; -import { MemLocalHistory } from "./mem_local_history.js"; +import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; import { HistoryStorage, PersistentStorage } from "./persistent_storage.js"; @@ -14,11 +14,11 @@ describe("PersistentStorage", () => { expect(persistentStorage).to.not.be.undefined; - const history1 = new MemLocalHistory({ storage: persistentStorage }); + const history1 = new LocalHistory({ storage: persistentStorage }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); - const history2 = new MemLocalHistory({ storage: persistentStorage }); + const history2 = new LocalHistory({ storage: persistentStorage }); expect(history2.length).to.equal(2); expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ @@ -28,13 +28,13 @@ describe("PersistentStorage", () => { }); it("uses in-memory only when no storage is provided", () => { - const history = new MemLocalHistory({ maxSize: 100 }); + const history = new LocalHistory({ maxSize: 100 }); history.push(createMessage("msg-3", 3)); expect(history.length).to.equal(1); expect(history.slice(0)[0].messageId).to.equal("msg-3"); - const history2 = new MemLocalHistory({ maxSize: 100 }); + const history2 = new LocalHistory({ maxSize: 100 }); expect(history2.length).to.equal(0); }); @@ -44,7 +44,7 @@ describe("PersistentStorage", () => { storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); const persistentStorage = PersistentStorage.create(channelId, storage); - const history = new MemLocalHistory({ storage: persistentStorage }); + const history = new LocalHistory({ storage: persistentStorage }); expect(history.length).to.equal(0); @@ -58,8 +58,8 @@ describe("PersistentStorage", () => { const storage1 = PersistentStorage.create("channel-1", storage); const storage2 = PersistentStorage.create("channel-2", storage); - const history1 = new MemLocalHistory({ storage: storage1 }); - const history2 = new MemLocalHistory({ storage: storage2 }); + const history1 = new LocalHistory({ storage: storage1 }); + const history2 = new LocalHistory({ storage: storage2 }); history1.push(createMessage("msg-1", 1)); history2.push(createMessage("msg-2", 2)); @@ -77,7 +77,7 @@ describe("PersistentStorage", () => { it("saves messages after each push", () => { const storage = new MemoryStorage(); const persistentStorage = PersistentStorage.create(channelId, storage); - const history = new MemLocalHistory({ storage: persistentStorage }); + const history = new LocalHistory({ storage: persistentStorage }); expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; @@ -93,14 +93,14 @@ describe("PersistentStorage", () => { it("loads messages on initialization", () => { const storage = new MemoryStorage(); const persistentStorage1 = PersistentStorage.create(channelId, storage); - const history1 = new MemLocalHistory({ storage: persistentStorage1 }); + const history1 = new LocalHistory({ storage: persistentStorage1 }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); history1.push(createMessage("msg-3", 3)); const persistentStorage2 = PersistentStorage.create(channelId, storage); - const history2 = new MemLocalHistory({ storage: persistentStorage2 }); + const history2 = new LocalHistory({ storage: persistentStorage2 }); expect(history2.length).to.equal(3); expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ @@ -134,11 +134,11 @@ describe("PersistentStorage", () => { it("persists and restores messages with channelId", () => { const testChannelId = `test-${Date.now()}`; - const history1 = new MemLocalHistory({ storage: testChannelId }); + const history1 = new LocalHistory({ storage: testChannelId }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); - const history2 = new MemLocalHistory({ storage: testChannelId }); + const history2 = new LocalHistory({ storage: testChannelId }); expect(history2.length).to.equal(2); expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ @@ -152,11 +152,11 @@ describe("PersistentStorage", () => { it("auto-uses localStorage when channelId is provided", () => { const testChannelId = `auto-storage-${Date.now()}`; - const history = new MemLocalHistory({ storage: testChannelId }); + const history = new LocalHistory({ storage: testChannelId }); history.push(createMessage("msg-auto-1", 1)); history.push(createMessage("msg-auto-2", 2)); - const history2 = new MemLocalHistory({ storage: testChannelId }); + const history2 = new LocalHistory({ storage: testChannelId }); expect(history2.length).to.equal(2); expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ "msg-auto-1", From 721c494567a49c43360932570ada222b8f1a21ff Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Tue, 9 Dec 2025 17:14:14 -0500 Subject: [PATCH 19/22] chore: update word history to storage --- .../sds/src/message_channel/local_history.ts | 3 +-- .../message_channel/message_channel.spec.ts | 4 +-- .../src/message_channel/message_channel.ts | 9 ++----- .../persistent_storage.spec.ts | 22 ++++++++-------- .../src/message_channel/persistent_storage.ts | 26 +++++++++---------- .../sds/src/message_channel/repair/repair.ts | 6 ++--- 6 files changed, 31 insertions(+), 39 deletions(-) diff --git a/packages/sds/src/message_channel/local_history.ts b/packages/sds/src/message_channel/local_history.ts index 86d6502afa..ed4107a2c1 100644 --- a/packages/sds/src/message_channel/local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -2,7 +2,6 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; -import { ILocalHistory } from "./message_channel.js"; import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -28,7 +27,7 @@ export type LocalHistoryOptions = { const log = new Logger("sds:local-history"); -export class LocalHistory implements ILocalHistory { +export class LocalHistory { private items: ContentMessage[] = []; private readonly storage?: PersistentStorage; private readonly maxSize: number; diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 0c29d02979..e7e37582ad 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -318,7 +318,7 @@ describe("MessageChannel", function () { testRetrievalHint ); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(1); // Find the message in local history @@ -440,7 +440,7 @@ describe("MessageChannel", function () { ) ); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(2); // When timestamps are equal, should be ordered by messageId lexicographically diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 6fda3a3803..07dfff26f6 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -38,11 +38,6 @@ const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; const log = new Logger("sds:message-channel"); -export type ILocalHistory = Pick< - Array, - "some" | "push" | "slice" | "find" | "length" | "findIndex" ->; - export interface MessageChannelOptions { causalHistorySize?: number; /** @@ -76,7 +71,7 @@ export class MessageChannel extends TypedEventEmitter { private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; private incomingBuffer: Array; - private readonly localHistory: ILocalHistory; + private readonly localHistory: LocalHistory; private timeReceived: Map; private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; @@ -106,7 +101,7 @@ export class MessageChannel extends TypedEventEmitter { channelId: ChannelId, senderId: ParticipantId, options: MessageChannelOptions = {}, - localHistory?: ILocalHistory + localHistory?: LocalHistory ) { super(); this.channelId = channelId; diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index 928de1c5ff..d6b3f43e67 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -2,7 +2,7 @@ import { expect } from "chai"; import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; -import { HistoryStorage, PersistentStorage } from "./persistent_storage.js"; +import { IStorage, PersistentStorage } from "./persistent_storage.js"; const channelId = "channel-1"; @@ -41,7 +41,7 @@ describe("PersistentStorage", () => { it("handles corrupt data in storage gracefully", () => { const storage = new MemoryStorage(); // Corrupt data - storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); + storage.setItem("waku:sds:messages:channel-1", "{ invalid json }"); const persistentStorage = PersistentStorage.create(channelId, storage); const history = new LocalHistory({ storage: persistentStorage }); @@ -49,7 +49,7 @@ describe("PersistentStorage", () => { expect(history.length).to.equal(0); // Corrupt data is not saved - expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); + expect(storage.getItem("waku:sds:messages:channel-1")).to.equal(null); }); it("isolates history by channel ID", () => { @@ -70,8 +70,8 @@ describe("PersistentStorage", () => { expect(history2.length).to.equal(1); expect(history2.slice(0)[0].messageId).to.equal("msg-2"); - expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; - expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; + expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:messages:channel-2")).to.not.be.null; }); it("saves messages after each push", () => { @@ -79,13 +79,13 @@ describe("PersistentStorage", () => { const persistentStorage = PersistentStorage.create(channelId, storage); const history = new LocalHistory({ storage: persistentStorage }); - expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; + expect(storage.getItem("waku:sds:messages:channel-1")).to.be.null; history.push(createMessage("msg-1", 1)); - expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; - const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!); + const saved = JSON.parse(storage.getItem("waku:sds:messages:channel-1")!); expect(saved).to.have.lengthOf(1); expect(saved[0].messageId).to.equal("msg-1"); }); @@ -146,7 +146,7 @@ describe("PersistentStorage", () => { "msg-2" ]); - localStorage.removeItem(`waku:sds:history:${testChannelId}`); + localStorage.removeItem(`waku:sds:messages:${testChannelId}`); }); it("auto-uses localStorage when channelId is provided", () => { @@ -163,7 +163,7 @@ describe("PersistentStorage", () => { "msg-auto-2" ]); - localStorage.removeItem(`waku:sds:history:${testChannelId}`); + localStorage.removeItem(`waku:sds:messages:${testChannelId}`); }); }); }); @@ -181,7 +181,7 @@ const createMessage = (id: string, timestamp: number): ContentMessage => { ); }; -class MemoryStorage implements HistoryStorage { +class MemoryStorage implements IStorage { private readonly store = new Map(); public getItem(key: string): string | null { diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/persistent_storage.ts index ffc0f14674..36965f0630 100644 --- a/packages/sds/src/message_channel/persistent_storage.ts +++ b/packages/sds/src/message_channel/persistent_storage.ts @@ -5,15 +5,15 @@ import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; const log = new Logger("sds:persistent-storage"); -const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; +const STORAGE_PREFIX = "waku:sds:storage:"; -export interface HistoryStorage { +export interface IStorage { getItem(key: string): string | null; setItem(key: string, value: string): void; removeItem(key: string): void; } -type StoredHistoryEntry = { +type StoredCausalEntry = { messageId: string; retrievalHint?: string; }; @@ -23,14 +23,14 @@ type StoredContentMessage = { channelId: string; senderId: string; lamportTimestamp: string; - causalHistory: StoredHistoryEntry[]; + causalHistory: StoredCausalEntry[]; bloomFilter?: string; content: string; retrievalHint?: string; }; /** - * Persistent storage for message history. + * Persistent storage for messages. */ export class PersistentStorage { private readonly storageKey: string; @@ -42,7 +42,7 @@ export class PersistentStorage { */ public static create( channelId: ChannelId, - storage?: HistoryStorage + storage?: IStorage ): PersistentStorage | undefined { storage = storage ?? @@ -59,9 +59,9 @@ export class PersistentStorage { private constructor( channelId: ChannelId, - private readonly storage: HistoryStorage + private readonly storage: IStorage ) { - this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`; + this.storageKey = `${STORAGE_PREFIX}${channelId}`; } public save(messages: ContentMessage[]): void { @@ -104,7 +104,7 @@ class MessageSerializer { senderId: message.senderId, lamportTimestamp: message.lamportTimestamp.toString(), causalHistory: message.causalHistory.map((entry) => - MessageSerializer.serializeHistoryEntry(entry) + MessageSerializer.serializeCausalEntry(entry) ), bloomFilter: MessageSerializer.toHex(message.bloomFilter), content: bytesToHex(new Uint8Array(message.content)), @@ -122,7 +122,7 @@ class MessageSerializer { record.channelId, record.senderId, record.causalHistory.map((entry) => - MessageSerializer.deserializeHistoryEntry(entry) + MessageSerializer.deserializeCausalEntry(entry) ), BigInt(record.lamportTimestamp), MessageSerializer.fromHex(record.bloomFilter), @@ -135,7 +135,7 @@ class MessageSerializer { } } - public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry { + public static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry { return { messageId: entry.messageId, retrievalHint: entry.retrievalHint @@ -144,9 +144,7 @@ class MessageSerializer { }; } - public static deserializeHistoryEntry( - entry: StoredHistoryEntry - ): HistoryEntry { + public static deserializeCausalEntry(entry: StoredCausalEntry): HistoryEntry { return { messageId: entry.messageId, retrievalHint: entry.retrievalHint diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..108dabfbdd 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import { LocalHistory } from "../local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { @@ -183,7 +183,7 @@ export class RepairManager { */ public processIncomingRepairRequests( requests: HistoryEntry[], - localHistory: ILocalHistory, + localHistory: LocalHistory, currentTime = Date.now() ): void { for (const request of requests) { @@ -248,7 +248,7 @@ export class RepairManager { * Returns messages that should be rebroadcast */ public sweepIncomingBuffer( - localHistory: ILocalHistory, + localHistory: LocalHistory, currentTime = Date.now() ): Message[] { const ready = this.incomingBuffer.getReady(currentTime); From 8a749c453a7e53c2639e218f20474d1041cbacd7 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 10 Dec 2025 15:13:10 -0500 Subject: [PATCH 20/22] refactor: implement browser and Node.js storage solutions for message persistence, updating LocalHistory to utilize a unified Storage interface and enhancing tests for localStorage functionality --- .gitignore | 2 + packages/sds/karma.conf.cjs | 17 +- packages/sds/package.json | 3 + .../sds/src/message_channel/local_history.ts | 24 +-- .../src/message_channel/message_channel.ts | 2 +- .../persistent_storage.spec.ts | 157 +++++------------- .../src/message_channel/storage/browser.ts | 52 ++++++ .../sds/src/message_channel/storage/index.ts | 2 + .../message_serializer.ts} | 84 +--------- .../sds/src/message_channel/storage/node.ts | 62 +++++++ 10 files changed, 193 insertions(+), 212 deletions(-) create mode 100644 packages/sds/src/message_channel/storage/browser.ts create mode 100644 packages/sds/src/message_channel/storage/index.ts rename packages/sds/src/message_channel/{persistent_storage.ts => storage/message_serializer.ts} (50%) create mode 100644 packages/sds/src/message_channel/storage/node.ts diff --git a/.gitignore b/.gitignore index 188172b1d3..d655a890db 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ CLAUDE.md .env postgres-data/ packages/rln/waku-rlnv2-contract/ +/packages/**/allure-results +/packages/**/allure-results diff --git a/packages/sds/karma.conf.cjs b/packages/sds/karma.conf.cjs index 1acbc3dd2a..cd18c96f02 100644 --- a/packages/sds/karma.conf.cjs +++ b/packages/sds/karma.conf.cjs @@ -1,3 +1,16 @@ -const config = require("../../karma.conf.cjs"); +import path from "path"; -module.exports = config; +import baseConfig from "../../karma.conf.cjs"; + +export default function (config) { + baseConfig(config); + + const storageDir = path.resolve(__dirname, "src/message_channel/storage"); + + // Swap node storage for browser storage in webpack builds + config.webpack.resolve.alias = { + ...config.webpack.resolve.alias, + [path.join(storageDir, "node.ts")]: path.join(storageDir, "browser.ts"), + [path.join(storageDir, "node.js")]: path.join(storageDir, "browser.ts") + }; +} diff --git a/packages/sds/package.json b/packages/sds/package.json index 4c1155930e..e871849722 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -4,6 +4,9 @@ "description": "Scalable Data Sync implementation for the browser. Based on https://github.com/vacp2p/rfc-index/blob/main/vac/raw/sds.md", "types": "./dist/index.d.ts", "module": "./dist/index.js", + "browser": { + "./dist/message_channel/storage/index.js": "./dist/message_channel/storage/browser.js" + }, "exports": { ".": { "types": "./dist/index.d.ts", diff --git a/packages/sds/src/message_channel/local_history.ts b/packages/sds/src/message_channel/local_history.ts index ed4107a2c1..6a099ee8e9 100644 --- a/packages/sds/src/message_channel/local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; -import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; -import { PersistentStorage } from "./persistent_storage.js"; +import { ContentMessage, isContentMessage } from "./message.js"; +import { Storage } from "./storage/index.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -21,7 +21,8 @@ export const DEFAULT_MAX_LENGTH = 10_000; */ export type LocalHistoryOptions = { - storage?: ChannelId | PersistentStorage; + storagePrefix?: string; + storage?: Storage; maxSize?: number; }; @@ -29,25 +30,26 @@ const log = new Logger("sds:local-history"); export class LocalHistory { private items: ContentMessage[] = []; - private readonly storage?: PersistentStorage; + private readonly storage?: Storage; private readonly maxSize: number; /** * Construct a new in-memory local history. * * @param opts Configuration object. - * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. + * - storagePrefix: Optional prefix for persistent storage (creates Storage if provided). + * - storage: Optional explicit Storage instance. * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ public constructor(opts: LocalHistoryOptions = {}) { - const { storage, maxSize } = opts; + const { storagePrefix, storage, maxSize } = opts; this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; - if (storage instanceof PersistentStorage) { + if (storage) { this.storage = storage; - log.info("Using explicit persistent storage"); - } else if (typeof storage === "string") { - this.storage = PersistentStorage.create(storage); - log.info("Creating persistent storage for channel", storage); + log.info("Using explicit storage"); + } else if (storagePrefix) { + this.storage = new Storage(storagePrefix); + log.info("Creating storage for prefix", storagePrefix); } else { this.storage = undefined; log.info("Using in-memory storage"); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 07dfff26f6..fe460123e7 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -113,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter { this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = - localHistory ?? new LocalHistory({ storage: channelId }); + localHistory ?? new LocalHistory({ storagePrefix: channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index d6b3f43e67..b575ea7b5f 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -2,23 +2,27 @@ import { expect } from "chai"; import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; -import { IStorage, PersistentStorage } from "./persistent_storage.js"; const channelId = "channel-1"; -describe("PersistentStorage", () => { - describe("Explicit storage", () => { +describe("Storage", () => { + describe("Browser localStorage", () => { + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + afterEach(() => { + localStorage.removeItem(`waku:sds:storage:${channelId}`); + }); + it("persists and restores messages", () => { - const storage = new MemoryStorage(); - const persistentStorage = PersistentStorage.create(channelId, storage); - - expect(persistentStorage).to.not.be.undefined; - - const history1 = new LocalHistory({ storage: persistentStorage }); + const history1 = new LocalHistory({ storagePrefix: channelId }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); - const history2 = new LocalHistory({ storage: persistentStorage }); + const history2 = new LocalHistory({ storagePrefix: channelId }); expect(history2.length).to.equal(2); expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ @@ -27,39 +31,18 @@ describe("PersistentStorage", () => { ]); }); - it("uses in-memory only when no storage is provided", () => { - const history = new LocalHistory({ maxSize: 100 }); - history.push(createMessage("msg-3", 3)); - - expect(history.length).to.equal(1); - expect(history.slice(0)[0].messageId).to.equal("msg-3"); - - const history2 = new LocalHistory({ maxSize: 100 }); - expect(history2.length).to.equal(0); - }); - - it("handles corrupt data in storage gracefully", () => { - const storage = new MemoryStorage(); - // Corrupt data - storage.setItem("waku:sds:messages:channel-1", "{ invalid json }"); - - const persistentStorage = PersistentStorage.create(channelId, storage); - const history = new LocalHistory({ storage: persistentStorage }); + it("handles corrupt data gracefully", () => { + localStorage.setItem(`waku:sds:storage:${channelId}`, "{ invalid json }"); + const history = new LocalHistory({ storagePrefix: channelId }); expect(history.length).to.equal(0); - - // Corrupt data is not saved - expect(storage.getItem("waku:sds:messages:channel-1")).to.equal(null); + // Corrupt data is removed + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; }); it("isolates history by channel ID", () => { - const storage = new MemoryStorage(); - - const storage1 = PersistentStorage.create("channel-1", storage); - const storage2 = PersistentStorage.create("channel-2", storage); - - const history1 = new LocalHistory({ storage: storage1 }); - const history2 = new LocalHistory({ storage: storage2 }); + const history1 = new LocalHistory({ storagePrefix: "channel-1" }); + const history2 = new LocalHistory({ storagePrefix: "channel-2" }); history1.push(createMessage("msg-1", 1)); history2.push(createMessage("msg-2", 2)); @@ -70,37 +53,34 @@ describe("PersistentStorage", () => { expect(history2.length).to.equal(1); expect(history2.slice(0)[0].messageId).to.equal("msg-2"); - expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; - expect(storage.getItem("waku:sds:messages:channel-2")).to.not.be.null; + localStorage.removeItem("waku:sds:storage:channel-2"); }); it("saves messages after each push", () => { - const storage = new MemoryStorage(); - const persistentStorage = PersistentStorage.create(channelId, storage); - const history = new LocalHistory({ storage: persistentStorage }); + const history = new LocalHistory({ storagePrefix: channelId }); - expect(storage.getItem("waku:sds:messages:channel-1")).to.be.null; + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; history.push(createMessage("msg-1", 1)); - expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.not.be + .null; - const saved = JSON.parse(storage.getItem("waku:sds:messages:channel-1")!); + const saved = JSON.parse( + localStorage.getItem(`waku:sds:storage:${channelId}`)! + ); expect(saved).to.have.lengthOf(1); expect(saved[0].messageId).to.equal("msg-1"); }); it("loads messages on initialization", () => { - const storage = new MemoryStorage(); - const persistentStorage1 = PersistentStorage.create(channelId, storage); - const history1 = new LocalHistory({ storage: persistentStorage1 }); + const history1 = new LocalHistory({ storagePrefix: channelId }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); history1.push(createMessage("msg-3", 3)); - const persistentStorage2 = PersistentStorage.create(channelId, storage); - const history2 = new LocalHistory({ storage: persistentStorage2 }); + const history2 = new LocalHistory({ storagePrefix: channelId }); expect(history2.length).to.equal(3); expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ @@ -111,59 +91,16 @@ describe("PersistentStorage", () => { }); }); - describe("Node.js only (no localStorage)", () => { - before(function () { - if (typeof localStorage !== "undefined") { - this.skip(); - } - }); + describe("In-memory fallback", () => { + it("uses in-memory only when no storage is provided", () => { + const history = new LocalHistory({ maxSize: 100 }); + history.push(createMessage("msg-3", 3)); - it("returns undefined when no storage is available", () => { - const persistentStorage = PersistentStorage.create(channelId, undefined); + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); - expect(persistentStorage).to.equal(undefined); - }); - }); - - describe("Browser only (localStorage)", () => { - before(function () { - if (typeof localStorage === "undefined") { - this.skip(); - } - }); - - it("persists and restores messages with channelId", () => { - const testChannelId = `test-${Date.now()}`; - const history1 = new LocalHistory({ storage: testChannelId }); - history1.push(createMessage("msg-1", 1)); - history1.push(createMessage("msg-2", 2)); - - const history2 = new LocalHistory({ storage: testChannelId }); - - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ - "msg-1", - "msg-2" - ]); - - localStorage.removeItem(`waku:sds:messages:${testChannelId}`); - }); - - it("auto-uses localStorage when channelId is provided", () => { - const testChannelId = `auto-storage-${Date.now()}`; - - const history = new LocalHistory({ storage: testChannelId }); - history.push(createMessage("msg-auto-1", 1)); - history.push(createMessage("msg-auto-2", 2)); - - const history2 = new LocalHistory({ storage: testChannelId }); - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ - "msg-auto-1", - "msg-auto-2" - ]); - - localStorage.removeItem(`waku:sds:messages:${testChannelId}`); + const history2 = new LocalHistory({ maxSize: 100 }); + expect(history2.length).to.equal(0); }); }); }); @@ -180,19 +117,3 @@ const createMessage = (id: string, timestamp: number): ContentMessage => { undefined ); }; - -class MemoryStorage implements IStorage { - private readonly store = new Map(); - - public getItem(key: string): string | null { - return this.store.get(key) ?? null; - } - - public setItem(key: string, value: string): void { - this.store.set(key, value); - } - - public removeItem(key: string): void { - this.store.delete(key); - } -} diff --git a/packages/sds/src/message_channel/storage/browser.ts b/packages/sds/src/message_channel/storage/browser.ts new file mode 100644 index 0000000000..aa570bf915 --- /dev/null +++ b/packages/sds/src/message_channel/storage/browser.ts @@ -0,0 +1,52 @@ +import { Logger } from "@waku/utils"; + +import { ContentMessage } from "../message.js"; + +import { + MessageSerializer, + StoredContentMessage +} from "./message_serializer.js"; + +const log = new Logger("sds:storage"); + +const STORAGE_PREFIX = "waku:sds:storage:"; + +/** + * Browser localStorage wrapper for message persistence. + */ +export class Storage { + private readonly storageKey: string; + + public constructor(storagePrefix: string) { + this.storageKey = `${STORAGE_PREFIX}${storagePrefix}`; + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) + ); + localStorage.setItem(this.storageKey, payload); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = localStorage.getItem(this.storageKey); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error) { + log.error("Failed to load messages from storage:", error); + localStorage.removeItem(this.storageKey); + return []; + } + } +} diff --git a/packages/sds/src/message_channel/storage/index.ts b/packages/sds/src/message_channel/storage/index.ts new file mode 100644 index 0000000000..365a399450 --- /dev/null +++ b/packages/sds/src/message_channel/storage/index.ts @@ -0,0 +1,2 @@ +// Node.js implementation - swapped to browser.js via package.json browser field +export { Storage } from "./node.js"; diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/storage/message_serializer.ts similarity index 50% rename from packages/sds/src/message_channel/persistent_storage.ts rename to packages/sds/src/message_channel/storage/message_serializer.ts index 36965f0630..034d688d36 100644 --- a/packages/sds/src/message_channel/persistent_storage.ts +++ b/packages/sds/src/message_channel/storage/message_serializer.ts @@ -1,24 +1,13 @@ import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; -import { Logger } from "@waku/utils"; -import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; +import { ContentMessage, HistoryEntry } from "../message.js"; -const log = new Logger("sds:persistent-storage"); - -const STORAGE_PREFIX = "waku:sds:storage:"; - -export interface IStorage { - getItem(key: string): string | null; - setItem(key: string, value: string): void; - removeItem(key: string): void; -} - -type StoredCausalEntry = { +export type StoredCausalEntry = { messageId: string; retrievalHint?: string; }; -type StoredContentMessage = { +export type StoredContentMessage = { messageId: string; channelId: string; senderId: string; @@ -29,72 +18,7 @@ type StoredContentMessage = { retrievalHint?: string; }; -/** - * Persistent storage for messages. - */ -export class PersistentStorage { - private readonly storageKey: string; - - /** - * Creates a PersistentStorage for a channel, or returns undefined if no storage is available. - * If no storage is provided, attempts to use global localStorage (if available). - * Returns undefined if no storage is available. - */ - public static create( - channelId: ChannelId, - storage?: IStorage - ): PersistentStorage | undefined { - storage = - storage ?? - (typeof localStorage !== "undefined" ? localStorage : undefined); - if (!storage) { - log.info( - `No storage available. Messages will not persist across sessions. - If you're using NodeJS, you can provide a storage backend using the storage parameter.` - ); - return undefined; - } - return new PersistentStorage(channelId, storage); - } - - private constructor( - channelId: ChannelId, - private readonly storage: IStorage - ) { - this.storageKey = `${STORAGE_PREFIX}${channelId}`; - } - - public save(messages: ContentMessage[]): void { - try { - const payload = JSON.stringify( - messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) - ); - this.storage.setItem(this.storageKey, payload); - } catch (error) { - log.error("Failed to save messages to storage:", error); - } - } - - public load(): ContentMessage[] { - try { - const raw = this.storage.getItem(this.storageKey); - if (!raw) { - return []; - } - - const stored = JSON.parse(raw) as StoredContentMessage[]; - return stored - .map((record) => MessageSerializer.deserializeContentMessage(record)) - .filter((message): message is ContentMessage => Boolean(message)); - } catch (error) { - log.error("Failed to load messages from storage:", error); - this.storage.removeItem(this.storageKey); - return []; - } - } -} - -class MessageSerializer { +export class MessageSerializer { public static serializeContentMessage( message: ContentMessage ): StoredContentMessage { diff --git a/packages/sds/src/message_channel/storage/node.ts b/packages/sds/src/message_channel/storage/node.ts new file mode 100644 index 0000000000..dfd2f0bc03 --- /dev/null +++ b/packages/sds/src/message_channel/storage/node.ts @@ -0,0 +1,62 @@ +import { mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { dirname, join } from "node:path"; + +import { Logger } from "@waku/utils"; + +import { ContentMessage } from "../message.js"; + +import { + MessageSerializer, + StoredContentMessage +} from "./message_serializer.js"; + +const log = new Logger("sds:storage"); + +/** + * Node.js file-based storage for message persistence. + */ +export class Storage { + private readonly filePath: string; + + public constructor(storagePrefix: string, basePath: string = ".waku") { + this.filePath = join(basePath, `${storagePrefix}.json`); + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)), + null, + 2 + ); + mkdirSync(dirname(this.filePath), { recursive: true }); + writeFileSync(this.filePath, payload, "utf-8"); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = readFileSync(this.filePath, "utf-8"); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error: unknown) { + if ( + error && + typeof error === "object" && + "code" in error && + error.code !== "ENOENT" + ) { + log.error("Failed to load messages from storage:", error); + } + return []; + } + } +} From e396c3b755669579c51adea6eda1c306f9d6b502 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 10 Dec 2025 15:17:18 -0500 Subject: [PATCH 21/22] refactor: cleaner API for storage in LocalHistoryOptions --- .../sds/src/message_channel/local_history.ts | 21 +++++++++++-------- .../storage/message_serializer.ts | 6 ++++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/packages/sds/src/message_channel/local_history.ts b/packages/sds/src/message_channel/local_history.ts index 6a099ee8e9..a8d332a7ff 100644 --- a/packages/sds/src/message_channel/local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -21,8 +21,10 @@ export const DEFAULT_MAX_LENGTH = 10_000; */ export type LocalHistoryOptions = { - storagePrefix?: string; - storage?: Storage; + storage?: { + prefix?: string; + customInstance?: Storage; + }; maxSize?: number; }; @@ -42,14 +44,15 @@ export class LocalHistory { * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ public constructor(opts: LocalHistoryOptions = {}) { - const { storagePrefix, storage, maxSize } = opts; + const { storage, maxSize } = opts; + const { prefix, customInstance } = storage ?? {}; this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; - if (storage) { - this.storage = storage; - log.info("Using explicit storage"); - } else if (storagePrefix) { - this.storage = new Storage(storagePrefix); - log.info("Creating storage for prefix", storagePrefix); + if (customInstance) { + this.storage = customInstance; + log.info("Using custom storage instance", { customInstance }); + } else if (prefix) { + this.storage = new Storage(prefix); + log.info("Creating storage with prefix", { prefix }); } else { this.storage = undefined; log.info("Using in-memory storage"); diff --git a/packages/sds/src/message_channel/storage/message_serializer.ts b/packages/sds/src/message_channel/storage/message_serializer.ts index 034d688d36..49bf4c8f90 100644 --- a/packages/sds/src/message_channel/storage/message_serializer.ts +++ b/packages/sds/src/message_channel/storage/message_serializer.ts @@ -59,7 +59,7 @@ export class MessageSerializer { } } - public static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry { + private static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry { return { messageId: entry.messageId, retrievalHint: entry.retrievalHint @@ -68,7 +68,9 @@ export class MessageSerializer { }; } - public static deserializeCausalEntry(entry: StoredCausalEntry): HistoryEntry { + private static deserializeCausalEntry( + entry: StoredCausalEntry + ): HistoryEntry { return { messageId: entry.messageId, retrievalHint: entry.retrievalHint From ab6d3cf503f89cfea52b4c95d5d101ef2dd37261 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 10 Dec 2025 15:38:16 -0500 Subject: [PATCH 22/22] minor fixes --- .../sds/src/message_channel/local_history.ts | 36 +++++++++---------- .../src/message_channel/message_channel.ts | 2 +- .../persistent_storage.spec.ts | 16 ++++----- .../src/message_channel/storage/browser.ts | 4 +-- 4 files changed, 28 insertions(+), 30 deletions(-) diff --git a/packages/sds/src/message_channel/local_history.ts b/packages/sds/src/message_channel/local_history.ts index a8d332a7ff..971b52fe74 100644 --- a/packages/sds/src/message_channel/local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -6,6 +6,23 @@ import { Storage } from "./storage/index.js"; export const DEFAULT_MAX_LENGTH = 10_000; +/** + * Options for the LocalHistory constructor. + * @param storage - The storage to use for the local history. + * - prefix - The prefix for the storage. + * - customInstance - The custom storage instance to use. + * @param maxSize - The maximum number of messages to store. + */ +export type LocalHistoryOptions = { + storage?: { + prefix?: string; + customInstance?: Storage; + }; + maxSize?: number; +}; + +const log = new Logger("sds:local-history"); + /** * In-Memory implementation of a local history of messages. * @@ -19,30 +36,11 @@ export const DEFAULT_MAX_LENGTH = 10_000; * If an array of items longer than `maxLength` is pushed, dropping will happen * at next push. */ - -export type LocalHistoryOptions = { - storage?: { - prefix?: string; - customInstance?: Storage; - }; - maxSize?: number; -}; - -const log = new Logger("sds:local-history"); - export class LocalHistory { private items: ContentMessage[] = []; private readonly storage?: Storage; private readonly maxSize: number; - /** - * Construct a new in-memory local history. - * - * @param opts Configuration object. - * - storagePrefix: Optional prefix for persistent storage (creates Storage if provided). - * - storage: Optional explicit Storage instance. - * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. - */ public constructor(opts: LocalHistoryOptions = {}) { const { storage, maxSize } = opts; const { prefix, customInstance } = storage ?? {}; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index fe460123e7..6091b010dc 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -113,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter { this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = - localHistory ?? new LocalHistory({ storagePrefix: channelId }); + localHistory ?? new LocalHistory({ storage: { prefix: channelId } }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index b575ea7b5f..6109e188f7 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -18,11 +18,11 @@ describe("Storage", () => { }); it("persists and restores messages", () => { - const history1 = new LocalHistory({ storagePrefix: channelId }); + const history1 = new LocalHistory({ storage: { prefix: channelId } }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); - const history2 = new LocalHistory({ storagePrefix: channelId }); + const history2 = new LocalHistory({ storage: { prefix: channelId } }); expect(history2.length).to.equal(2); expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ @@ -34,15 +34,15 @@ describe("Storage", () => { it("handles corrupt data gracefully", () => { localStorage.setItem(`waku:sds:storage:${channelId}`, "{ invalid json }"); - const history = new LocalHistory({ storagePrefix: channelId }); + const history = new LocalHistory({ storage: { prefix: channelId } }); expect(history.length).to.equal(0); // Corrupt data is removed expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; }); it("isolates history by channel ID", () => { - const history1 = new LocalHistory({ storagePrefix: "channel-1" }); - const history2 = new LocalHistory({ storagePrefix: "channel-2" }); + const history1 = new LocalHistory({ storage: { prefix: "channel-1" } }); + const history2 = new LocalHistory({ storage: { prefix: "channel-2" } }); history1.push(createMessage("msg-1", 1)); history2.push(createMessage("msg-2", 2)); @@ -57,7 +57,7 @@ describe("Storage", () => { }); it("saves messages after each push", () => { - const history = new LocalHistory({ storagePrefix: channelId }); + const history = new LocalHistory({ storage: { prefix: channelId } }); expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; @@ -74,13 +74,13 @@ describe("Storage", () => { }); it("loads messages on initialization", () => { - const history1 = new LocalHistory({ storagePrefix: channelId }); + const history1 = new LocalHistory({ storage: { prefix: channelId } }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); history1.push(createMessage("msg-3", 3)); - const history2 = new LocalHistory({ storagePrefix: channelId }); + const history2 = new LocalHistory({ storage: { prefix: channelId } }); expect(history2.length).to.equal(3); expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ diff --git a/packages/sds/src/message_channel/storage/browser.ts b/packages/sds/src/message_channel/storage/browser.ts index aa570bf915..ae0d2a5531 100644 --- a/packages/sds/src/message_channel/storage/browser.ts +++ b/packages/sds/src/message_channel/storage/browser.ts @@ -9,7 +9,7 @@ import { const log = new Logger("sds:storage"); -const STORAGE_PREFIX = "waku:sds:storage:"; +const STORAGE_NAMESPACE = "waku:sds:storage:"; /** * Browser localStorage wrapper for message persistence. @@ -18,7 +18,7 @@ export class Storage { private readonly storageKey: string; public constructor(storagePrefix: string) { - this.storageKey = `${STORAGE_PREFIX}${storagePrefix}`; + this.storageKey = `${STORAGE_NAMESPACE}${storagePrefix}`; } public save(messages: ContentMessage[]): void {