From 503c348b79701c2be95a580466781bc3d67659d6 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 26 Nov 2025 21:12:59 -0500 Subject: [PATCH] feat: Introduce `PersistentStorage` for message history management and refactor `MemLocalHistory` to support optional persistent storage. --- packages/sds/src/message_channel/index.ts | 5 + .../message_channel/mem_local_history.spec.ts | 4 +- .../src/message_channel/mem_local_history.ts | 48 +++- .../message_channel/message_channel.spec.ts | 2 +- .../src/message_channel/message_channel.ts | 9 +- .../persistent_history.spec.ts | 99 --------- .../src/message_channel/persistent_history.ts | 208 ------------------ .../persistent_storage.spec.ts | 198 +++++++++++++++++ .../src/message_channel/persistent_storage.ts | 173 +++++++++++++++ .../sds/src/message_channel/repair/repair.ts | 2 +- 10 files changed, 424 insertions(+), 324 deletions(-) delete mode 100644 packages/sds/src/message_channel/persistent_history.spec.ts delete mode 100644 packages/sds/src/message_channel/persistent_history.ts create mode 100644 packages/sds/src/message_channel/persistent_storage.spec.ts create mode 100644 packages/sds/src/message_channel/persistent_storage.ts diff --git a/packages/sds/src/message_channel/index.ts b/packages/sds/src/message_channel/index.ts index 7a8e279c2a..5aad874bed 100644 --- a/packages/sds/src/message_channel/index.ts +++ b/packages/sds/src/message_channel/index.ts @@ -14,3 +14,8 @@ export { isEphemeralMessage, isSyncMessage } from "./message.js"; +export { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; +export { + PersistentStorage, + type HistoryStorage +} from "./persistent_storage.js"; diff --git a/packages/sds/src/message_channel/mem_local_history.spec.ts b/packages/sds/src/message_channel/mem_local_history.spec.ts index 7fd1f567a6..d57e46493a 100644 --- a/packages/sds/src/message_channel/mem_local_history.spec.ts +++ b/packages/sds/src/message_channel/mem_local_history.spec.ts @@ -7,7 +7,7 @@ describe("MemLocalHistory", () => { it("Cap max size when messages are pushed one at a time", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new MemLocalHistory({ maxSize: maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) @@ -31,7 +31,7 @@ describe("MemLocalHistory", () => { it("Cap max size when a pushed array is exceeding the cap", () => { const maxSize = 2; - const hist = new MemLocalHistory(maxSize); + const hist = new MemLocalHistory({ maxSize: maxSize }); hist.push( new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1])) diff --git a/packages/sds/src/message_channel/mem_local_history.ts b/packages/sds/src/message_channel/mem_local_history.ts index 67e0b73ae3..8f715a89e0 100644 --- a/packages/sds/src/message_channel/mem_local_history.ts +++ b/packages/sds/src/message_channel/mem_local_history.ts @@ -1,6 +1,7 @@ import _ from "lodash"; -import { ContentMessage, isContentMessage } from "./message.js"; +import { ChannelId, ContentMessage, isContentMessage } from "./message.js"; +import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -49,13 +50,31 @@ export interface ILocalHistory { export class MemLocalHistory implements ILocalHistory { private items: ContentMessage[] = []; + private readonly storage?: PersistentStorage; + private readonly maxSize: number; /** - * Construct a new in-memory local history + * Construct a new in-memory local history. * - * @param maxLength The maximum number of message to store. + * @param opts Configuration object. + * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. + * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ - public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {} + public constructor( + opts: { storage?: ChannelId | PersistentStorage; maxSize?: number } = {} + ) { + const { storage, maxSize } = opts; + this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; + if (storage instanceof PersistentStorage) { + this.storage = storage; + } else if (typeof storage === "string") { + this.storage = PersistentStorage.create(storage); + } else { + this.storage = undefined; + } + + this.load(); + } public get length(): number { return this.items.length; @@ -77,11 +96,13 @@ export class MemLocalHistory implements ILocalHistory { this.items = _.uniqBy(combinedItems, "messageId"); // Let's drop older messages if max length is reached - if (this.length > this.maxLength) { - const numItemsToRemove = this.length - this.maxLength; + if (this.length > this.maxSize) { + const numItemsToRemove = this.length - this.maxSize; this.items.splice(0, numItemsToRemove); } + this.save(); + return this.items.length; } @@ -129,4 +150,19 @@ export class MemLocalHistory implements ILocalHistory { ); } } + + private save(): void { + this.storage?.save(this.items); + } + + private load(): void { + if (!this.storage) { + return; + } + + const messages = this.storage.load(); + if (messages.length > 0) { + this.items = messages; + } + } } diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 38b1881dad..f66330c478 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -5,6 +5,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { MessageChannelEvent } from "./events.js"; import { MemLocalHistory } from "./mem_local_history.js"; +import { ILocalHistory } from "./mem_local_history.js"; import { ContentMessage, HistoryEntry, @@ -14,7 +15,6 @@ import { } from "./message.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, - ILocalHistory, MessageChannel } from "./message_channel.js"; diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index e55ac045d4..30f744a10c 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -7,6 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js"; import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js"; import { MessageChannelEvent, MessageChannelEvents } from "./events.js"; +import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; import { ChannelId, ContentMessage, @@ -20,7 +21,6 @@ import { ParticipantId, SyncMessage } from "./message.js"; -import { PersistentHistory } from "./persistent_history.js"; import { RepairConfig, RepairManager } from "./repair/repair.js"; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -63,11 +63,6 @@ export interface MessageChannelOptions { repairConfig?: RepairConfig; } -export type ILocalHistory = Pick< - Array, - "some" | "push" | "slice" | "find" | "length" | "findIndex" ->; - export class MessageChannel extends TypedEventEmitter { public readonly channelId: ChannelId; public readonly senderId: ParticipantId; @@ -118,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter { this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = - localHistory ?? new PersistentHistory({ channelId: this.channelId }); + localHistory ?? new MemLocalHistory({ storage: channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_history.spec.ts b/packages/sds/src/message_channel/persistent_history.spec.ts deleted file mode 100644 index 7ee1c7914f..0000000000 --- a/packages/sds/src/message_channel/persistent_history.spec.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { expect } from "chai"; - -import { ContentMessage } from "./message.js"; -import { HistoryStorage, PersistentHistory } from "./persistent_history.js"; - -class MemoryStorage implements HistoryStorage { - private readonly store = new Map(); - - public getItem(key: string): string | null { - return this.store.get(key) ?? null; - } - - public setItem(key: string, value: string): void { - this.store.set(key, value); - } - - public removeItem(key: string): void { - this.store.delete(key); - } -} - -const channelId = "channel-1"; - -const createMessage = (id: string, timestamp: number): ContentMessage => { - return new ContentMessage( - id, - channelId, - "sender", - [], - BigInt(timestamp), - undefined, - new Uint8Array([timestamp]), - undefined - ); -}; - -describe("PersistentHistory", () => { - it("persists and restores messages", () => { - const storage = new MemoryStorage(); - const history = new PersistentHistory({ channelId, storage }); - - history.push(createMessage("msg-1", 1)); - history.push(createMessage("msg-2", 2)); - - const restored = new PersistentHistory({ channelId, storage }); - - expect(restored.length).to.equal(2); - expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([ - "msg-1", - "msg-2" - ]); - }); - - it("behaves like memory history when storage is unavailable", () => { - const history = new PersistentHistory({ channelId, storage: undefined }); - - history.push(createMessage("msg-3", 3)); - - expect(history.length).to.equal(1); - expect(history.slice(0)[0].messageId).to.equal("msg-3"); - }); - - it("handles corrupt data in storage gracefully", () => { - const storage = new MemoryStorage(); - storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); - - const history = new PersistentHistory({ channelId, storage }); - expect(history.length).to.equal(0); - - // Local history should be empty - expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); - }); - - it("isolates history by channel ID", () => { - const storage = new MemoryStorage(); - - const history1 = new PersistentHistory({ - channelId: "channel-1", - storage - }); - const history2 = new PersistentHistory({ - channelId: "channel-2", - storage - }); - - history1.push(createMessage("msg-1", 1)); - history2.push(createMessage("msg-2", 2)); - - // Each channel should only see its own messages - expect(history1.length).to.equal(1); - expect(history1.slice(0)[0].messageId).to.equal("msg-1"); - - expect(history2.length).to.equal(1); - expect(history2.slice(0)[0].messageId).to.equal("msg-2"); - - expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; - expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; - }); -}); diff --git a/packages/sds/src/message_channel/persistent_history.ts b/packages/sds/src/message_channel/persistent_history.ts deleted file mode 100644 index d575e1e3e7..0000000000 --- a/packages/sds/src/message_channel/persistent_history.ts +++ /dev/null @@ -1,208 +0,0 @@ -import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; -import { Logger } from "@waku/utils"; - -import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js"; -import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; - -const log = new Logger("sds:persistent-history"); - -export interface HistoryStorage { - getItem(key: string): string | null; - setItem(key: string, value: string): void; - removeItem(key: string): void; -} - -export interface PersistentHistoryOptions { - channelId: ChannelId; - storage?: HistoryStorage; -} - -type StoredHistoryEntry = { - messageId: string; - retrievalHint?: string; -}; - -type StoredContentMessage = { - messageId: string; - channelId: string; - senderId: string; - lamportTimestamp: string; - causalHistory: StoredHistoryEntry[]; - bloomFilter?: string; - content: string; - retrievalHint?: string; -}; - -const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; -const DEFAULT_HISTORY_STORAGE: HistoryStorage | undefined = - typeof localStorage !== "undefined" ? localStorage : undefined; - -/** - * Persists the SDS local history in a browser/localStorage compatible backend. - * - * If no storage backend is available, this behaves like {@link MemLocalHistory}. - */ -export class PersistentHistory implements ILocalHistory { - private readonly storage?: HistoryStorage; - private readonly storageKey: string; - private readonly memory: MemLocalHistory; - - public constructor(options: PersistentHistoryOptions) { - this.memory = new MemLocalHistory(); - this.storage = options.storage ?? DEFAULT_HISTORY_STORAGE; - this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`; - - if (!this.storage) { - log.warn( - "Storage backend unavailable (localStorage not found). Falling back to in-memory history. Messages will not persist across sessions." - ); - } - - this.load(); - } - - public get length(): number { - return this.memory.length; - } - - public push(...items: ContentMessage[]): number { - const length = this.memory.push(...items); - this.save(); - return length; - } - - public some( - predicate: ( - value: ContentMessage, - index: number, - array: ContentMessage[] - ) => unknown, - thisArg?: any - ): boolean { - return this.memory.some(predicate, thisArg); - } - - public slice(start?: number, end?: number): ContentMessage[] { - return this.memory.slice(start, end); - } - - public find( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): ContentMessage | undefined { - return this.memory.find(predicate, thisArg); - } - - public findIndex( - predicate: ( - value: ContentMessage, - index: number, - obj: ContentMessage[] - ) => unknown, - thisArg?: any - ): number { - return this.memory.findIndex(predicate, thisArg); - } - - private save(): void { - if (!this.storage) { - return; - } - - const payload = JSON.stringify( - this.memory.slice(0).map(serializeContentMessage) - ); - this.storage.setItem(this.storageKey, payload); - } - - private load(): void { - if (!this.storage) { - return; - } - - try { - const raw = this.storage.getItem(this.storageKey); - if (!raw) { - return; - } - - const stored = JSON.parse(raw) as StoredContentMessage[]; - const messages = stored - .map(deserializeContentMessage) - .filter((message): message is ContentMessage => Boolean(message)); - if (messages.length) { - this.memory.push(...messages); - } - } catch { - this.storage.removeItem(this.storageKey); - } - } -} - -const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({ - messageId: entry.messageId, - retrievalHint: entry.retrievalHint - ? bytesToHex(entry.retrievalHint) - : undefined -}); - -const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({ - messageId: entry.messageId, - retrievalHint: entry.retrievalHint - ? hexToBytes(entry.retrievalHint) - : undefined -}); - -const serializeContentMessage = ( - message: ContentMessage -): StoredContentMessage => ({ - messageId: message.messageId, - channelId: message.channelId, - senderId: message.senderId, - lamportTimestamp: message.lamportTimestamp.toString(), - causalHistory: message.causalHistory.map(serializeHistoryEntry), - bloomFilter: toHex(message.bloomFilter), - content: bytesToHex(new Uint8Array(message.content)), - retrievalHint: toHex(message.retrievalHint) -}); - -const deserializeContentMessage = ( - record: StoredContentMessage -): ContentMessage | undefined => { - try { - const content = hexToBytes(record.content); - return new ContentMessage( - record.messageId, - record.channelId, - record.senderId, - record.causalHistory.map(deserializeHistoryEntry), - BigInt(record.lamportTimestamp), - fromHex(record.bloomFilter), - content, - [], - fromHex(record.retrievalHint) - ); - } catch { - return undefined; - } -}; - -const toHex = ( - data?: Uint8Array | Uint8Array -): string | undefined => { - if (!data || data.length === 0) { - return undefined; - } - return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); -}; - -const fromHex = (value?: string): Uint8Array | undefined => { - if (!value) { - return undefined; - } - return hexToBytes(value); -}; diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts new file mode 100644 index 0000000000..ec7547dab0 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -0,0 +1,198 @@ +import { expect } from "chai"; + +import { MemLocalHistory } from "./mem_local_history.js"; +import { ContentMessage } from "./message.js"; +import { HistoryStorage, PersistentStorage } from "./persistent_storage.js"; + +const channelId = "channel-1"; + +describe("PersistentStorage", () => { + describe("Explicit storage", () => { + it("persists and restores messages", () => { + const storage = new MemoryStorage(); + const persistentStorage = PersistentStorage.create(channelId, storage); + + expect(persistentStorage).to.not.be.undefined; + + const history1 = new MemLocalHistory({ storage: persistentStorage }); + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + + const history2 = new MemLocalHistory({ storage: persistentStorage }); + + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + }); + + it("uses in-memory only when no storage is provided", () => { + const history = new MemLocalHistory({ maxSize: 100 }); + history.push(createMessage("msg-3", 3)); + + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); + + const history2 = new MemLocalHistory({ maxSize: 100 }); + expect(history2.length).to.equal(0); + }); + + it("handles corrupt data in storage gracefully", () => { + const storage = new MemoryStorage(); + // Corrupt data + storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); + + const persistentStorage = PersistentStorage.create(channelId, storage); + const history = new MemLocalHistory({ storage: persistentStorage }); + + expect(history.length).to.equal(0); + + // Corrupt data is not saved + expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); + }); + + it("isolates history by channel ID", () => { + const storage = new MemoryStorage(); + + const storage1 = PersistentStorage.create("channel-1", storage); + const storage2 = PersistentStorage.create("channel-2", storage); + + const history1 = new MemLocalHistory({ storage: storage1 }); + const history2 = new MemLocalHistory({ storage: storage2 }); + + history1.push(createMessage("msg-1", 1)); + history2.push(createMessage("msg-2", 2)); + + expect(history1.length).to.equal(1); + expect(history1.slice(0)[0].messageId).to.equal("msg-1"); + + expect(history2.length).to.equal(1); + expect(history2.slice(0)[0].messageId).to.equal("msg-2"); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; + }); + + it("saves messages after each push", () => { + const storage = new MemoryStorage(); + const persistentStorage = PersistentStorage.create(channelId, storage); + const history = new MemLocalHistory({ storage: persistentStorage }); + + expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; + + history.push(createMessage("msg-1", 1)); + + expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + + const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!); + expect(saved).to.have.lengthOf(1); + expect(saved[0].messageId).to.equal("msg-1"); + }); + + it("loads messages on initialization", () => { + const storage = new MemoryStorage(); + const persistentStorage1 = PersistentStorage.create(channelId, storage); + const history1 = new MemLocalHistory({ storage: persistentStorage1 }); + + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + history1.push(createMessage("msg-3", 3)); + + const persistentStorage2 = PersistentStorage.create(channelId, storage); + const history2 = new MemLocalHistory({ storage: persistentStorage2 }); + + expect(history2.length).to.equal(3); + expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + "msg-1", + "msg-2", + "msg-3" + ]); + }); + }); + + describe("Node.js only (no localStorage)", () => { + before(function () { + if (typeof localStorage !== "undefined") { + this.skip(); + } + }); + + it("returns undefined when no storage is available", () => { + const persistentStorage = PersistentStorage.create(channelId, undefined); + + expect(persistentStorage).to.equal(undefined); + }); + }); + + describe("Browser only (localStorage)", () => { + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + it("persists and restores messages with channelId", () => { + const testChannelId = `test-${Date.now()}`; + const history1 = new MemLocalHistory({ storage: testChannelId }); + history1.push(createMessage("msg-1", 1)); + history1.push(createMessage("msg-2", 2)); + + const history2 = new MemLocalHistory({ storage: testChannelId }); + + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ + "msg-1", + "msg-2" + ]); + + localStorage.removeItem(`waku:sds:history:${testChannelId}`); + }); + + it("auto-uses localStorage when channelId is provided", () => { + const testChannelId = `auto-storage-${Date.now()}`; + + const history = new MemLocalHistory({ storage: testChannelId }); + history.push(createMessage("msg-auto-1", 1)); + history.push(createMessage("msg-auto-2", 2)); + + const history2 = new MemLocalHistory({ storage: testChannelId }); + expect(history2.length).to.equal(2); + expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ + "msg-auto-1", + "msg-auto-2" + ]); + + localStorage.removeItem(`waku:sds:history:${testChannelId}`); + }); + }); +}); + +const createMessage = (id: string, timestamp: number): ContentMessage => { + return new ContentMessage( + id, + channelId, + "sender", + [], + BigInt(timestamp), + undefined, + new Uint8Array([timestamp]), + undefined + ); +}; + +class MemoryStorage implements HistoryStorage { + private readonly store = new Map(); + + public getItem(key: string): string | null { + return this.store.get(key) ?? null; + } + + public setItem(key: string, value: string): void { + this.store.set(key, value); + } + + public removeItem(key: string): void { + this.store.delete(key); + } +} diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/persistent_storage.ts new file mode 100644 index 0000000000..ffc0f14674 --- /dev/null +++ b/packages/sds/src/message_channel/persistent_storage.ts @@ -0,0 +1,173 @@ +import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; +import { Logger } from "@waku/utils"; + +import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; + +const log = new Logger("sds:persistent-storage"); + +const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; + +export interface HistoryStorage { + getItem(key: string): string | null; + setItem(key: string, value: string): void; + removeItem(key: string): void; +} + +type StoredHistoryEntry = { + messageId: string; + retrievalHint?: string; +}; + +type StoredContentMessage = { + messageId: string; + channelId: string; + senderId: string; + lamportTimestamp: string; + causalHistory: StoredHistoryEntry[]; + bloomFilter?: string; + content: string; + retrievalHint?: string; +}; + +/** + * Persistent storage for message history. + */ +export class PersistentStorage { + private readonly storageKey: string; + + /** + * Creates a PersistentStorage for a channel, or returns undefined if no storage is available. + * If no storage is provided, attempts to use global localStorage (if available). + * Returns undefined if no storage is available. + */ + public static create( + channelId: ChannelId, + storage?: HistoryStorage + ): PersistentStorage | undefined { + storage = + storage ?? + (typeof localStorage !== "undefined" ? localStorage : undefined); + if (!storage) { + log.info( + `No storage available. Messages will not persist across sessions. + If you're using NodeJS, you can provide a storage backend using the storage parameter.` + ); + return undefined; + } + return new PersistentStorage(channelId, storage); + } + + private constructor( + channelId: ChannelId, + private readonly storage: HistoryStorage + ) { + this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`; + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) + ); + this.storage.setItem(this.storageKey, payload); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = this.storage.getItem(this.storageKey); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error) { + log.error("Failed to load messages from storage:", error); + this.storage.removeItem(this.storageKey); + return []; + } + } +} + +class MessageSerializer { + public static serializeContentMessage( + message: ContentMessage + ): StoredContentMessage { + return { + messageId: message.messageId, + channelId: message.channelId, + senderId: message.senderId, + lamportTimestamp: message.lamportTimestamp.toString(), + causalHistory: message.causalHistory.map((entry) => + MessageSerializer.serializeHistoryEntry(entry) + ), + bloomFilter: MessageSerializer.toHex(message.bloomFilter), + content: bytesToHex(new Uint8Array(message.content)), + retrievalHint: MessageSerializer.toHex(message.retrievalHint) + }; + } + + public static deserializeContentMessage( + record: StoredContentMessage + ): ContentMessage | undefined { + try { + const content = hexToBytes(record.content); + return new ContentMessage( + record.messageId, + record.channelId, + record.senderId, + record.causalHistory.map((entry) => + MessageSerializer.deserializeHistoryEntry(entry) + ), + BigInt(record.lamportTimestamp), + MessageSerializer.fromHex(record.bloomFilter), + content, + [], + MessageSerializer.fromHex(record.retrievalHint) + ); + } catch { + return undefined; + } + } + + public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? bytesToHex(entry.retrievalHint) + : undefined + }; + } + + public static deserializeHistoryEntry( + entry: StoredHistoryEntry + ): HistoryEntry { + return { + messageId: entry.messageId, + retrievalHint: entry.retrievalHint + ? hexToBytes(entry.retrievalHint) + : undefined + }; + } + + private static toHex( + data?: Uint8Array | Uint8Array + ): string | undefined { + if (!data || data.length === 0) { + return undefined; + } + return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data)); + } + + private static fromHex(value?: string): Uint8Array | undefined { + if (!value) { + return undefined; + } + return hexToBytes(value); + } +} diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..5da41d224c 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import type { ILocalHistory } from "../mem_local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import {