From 8a749c453a7e53c2639e218f20474d1041cbacd7 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 10 Dec 2025 15:13:10 -0500 Subject: [PATCH] refactor: implement browser and Node.js storage solutions for message persistence, updating LocalHistory to utilize a unified Storage interface and enhancing tests for localStorage functionality --- .gitignore | 2 + packages/sds/karma.conf.cjs | 17 +- packages/sds/package.json | 3 + .../sds/src/message_channel/local_history.ts | 24 +-- .../src/message_channel/message_channel.ts | 2 +- .../persistent_storage.spec.ts | 157 +++++------------- .../src/message_channel/storage/browser.ts | 52 ++++++ .../sds/src/message_channel/storage/index.ts | 2 + .../message_serializer.ts} | 84 +--------- .../sds/src/message_channel/storage/node.ts | 62 +++++++ 10 files changed, 193 insertions(+), 212 deletions(-) create mode 100644 packages/sds/src/message_channel/storage/browser.ts create mode 100644 packages/sds/src/message_channel/storage/index.ts rename packages/sds/src/message_channel/{persistent_storage.ts => storage/message_serializer.ts} (50%) create mode 100644 packages/sds/src/message_channel/storage/node.ts diff --git a/.gitignore b/.gitignore index 188172b1d3..d655a890db 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,5 @@ CLAUDE.md .env postgres-data/ packages/rln/waku-rlnv2-contract/ +/packages/**/allure-results +/packages/**/allure-results diff --git a/packages/sds/karma.conf.cjs b/packages/sds/karma.conf.cjs index 1acbc3dd2a..cd18c96f02 100644 --- a/packages/sds/karma.conf.cjs +++ b/packages/sds/karma.conf.cjs @@ -1,3 +1,16 @@ -const config = require("../../karma.conf.cjs"); +import path from "path"; -module.exports = config; +import baseConfig from "../../karma.conf.cjs"; + +export default function (config) { + baseConfig(config); + + const storageDir = path.resolve(__dirname, "src/message_channel/storage"); + + // Swap node storage for browser storage in webpack builds + config.webpack.resolve.alias = { + ...config.webpack.resolve.alias, + [path.join(storageDir, "node.ts")]: path.join(storageDir, "browser.ts"), + [path.join(storageDir, "node.js")]: path.join(storageDir, "browser.ts") + }; +} diff --git a/packages/sds/package.json b/packages/sds/package.json index 4c1155930e..e871849722 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -4,6 +4,9 @@ "description": "Scalable Data Sync implementation for the browser. Based on https://github.com/vacp2p/rfc-index/blob/main/vac/raw/sds.md", "types": "./dist/index.d.ts", "module": "./dist/index.js", + "browser": { + "./dist/message_channel/storage/index.js": "./dist/message_channel/storage/browser.js" + }, "exports": { ".": { "types": "./dist/index.d.ts", diff --git a/packages/sds/src/message_channel/local_history.ts b/packages/sds/src/message_channel/local_history.ts index ed4107a2c1..6a099ee8e9 100644 --- a/packages/sds/src/message_channel/local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; -import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; -import { PersistentStorage } from "./persistent_storage.js"; +import { ContentMessage, isContentMessage } from "./message.js"; +import { Storage } from "./storage/index.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -21,7 +21,8 @@ export const DEFAULT_MAX_LENGTH = 10_000; */ export type LocalHistoryOptions = { - storage?: ChannelId | PersistentStorage; + storagePrefix?: string; + storage?: Storage; maxSize?: number; }; @@ -29,25 +30,26 @@ const log = new Logger("sds:local-history"); export class LocalHistory { private items: ContentMessage[] = []; - private readonly storage?: PersistentStorage; + private readonly storage?: Storage; private readonly maxSize: number; /** * Construct a new in-memory local history. * * @param opts Configuration object. - * - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage. + * - storagePrefix: Optional prefix for persistent storage (creates Storage if provided). + * - storage: Optional explicit Storage instance. * - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH. */ public constructor(opts: LocalHistoryOptions = {}) { - const { storage, maxSize } = opts; + const { storagePrefix, storage, maxSize } = opts; this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH; - if (storage instanceof PersistentStorage) { + if (storage) { this.storage = storage; - log.info("Using explicit persistent storage"); - } else if (typeof storage === "string") { - this.storage = PersistentStorage.create(storage); - log.info("Creating persistent storage for channel", storage); + log.info("Using explicit storage"); + } else if (storagePrefix) { + this.storage = new Storage(storagePrefix); + log.info("Creating storage for prefix", storagePrefix); } else { this.storage = undefined; log.info("Using in-memory storage"); diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 07dfff26f6..fe460123e7 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -113,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter { this.possibleAcks = new Map(); this.incomingBuffer = []; this.localHistory = - localHistory ?? new LocalHistory({ storage: channelId }); + localHistory ?? new LocalHistory({ storagePrefix: channelId }); this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; // TODO: this should be determined based on the bloom filter parameters and number of hashes diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index d6b3f43e67..b575ea7b5f 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -2,23 +2,27 @@ import { expect } from "chai"; import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; -import { IStorage, PersistentStorage } from "./persistent_storage.js"; const channelId = "channel-1"; -describe("PersistentStorage", () => { - describe("Explicit storage", () => { +describe("Storage", () => { + describe("Browser localStorage", () => { + before(function () { + if (typeof localStorage === "undefined") { + this.skip(); + } + }); + + afterEach(() => { + localStorage.removeItem(`waku:sds:storage:${channelId}`); + }); + it("persists and restores messages", () => { - const storage = new MemoryStorage(); - const persistentStorage = PersistentStorage.create(channelId, storage); - - expect(persistentStorage).to.not.be.undefined; - - const history1 = new LocalHistory({ storage: persistentStorage }); + const history1 = new LocalHistory({ storagePrefix: channelId }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); - const history2 = new LocalHistory({ storage: persistentStorage }); + const history2 = new LocalHistory({ storagePrefix: channelId }); expect(history2.length).to.equal(2); expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ @@ -27,39 +31,18 @@ describe("PersistentStorage", () => { ]); }); - it("uses in-memory only when no storage is provided", () => { - const history = new LocalHistory({ maxSize: 100 }); - history.push(createMessage("msg-3", 3)); - - expect(history.length).to.equal(1); - expect(history.slice(0)[0].messageId).to.equal("msg-3"); - - const history2 = new LocalHistory({ maxSize: 100 }); - expect(history2.length).to.equal(0); - }); - - it("handles corrupt data in storage gracefully", () => { - const storage = new MemoryStorage(); - // Corrupt data - storage.setItem("waku:sds:messages:channel-1", "{ invalid json }"); - - const persistentStorage = PersistentStorage.create(channelId, storage); - const history = new LocalHistory({ storage: persistentStorage }); + it("handles corrupt data gracefully", () => { + localStorage.setItem(`waku:sds:storage:${channelId}`, "{ invalid json }"); + const history = new LocalHistory({ storagePrefix: channelId }); expect(history.length).to.equal(0); - - // Corrupt data is not saved - expect(storage.getItem("waku:sds:messages:channel-1")).to.equal(null); + // Corrupt data is removed + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; }); it("isolates history by channel ID", () => { - const storage = new MemoryStorage(); - - const storage1 = PersistentStorage.create("channel-1", storage); - const storage2 = PersistentStorage.create("channel-2", storage); - - const history1 = new LocalHistory({ storage: storage1 }); - const history2 = new LocalHistory({ storage: storage2 }); + const history1 = new LocalHistory({ storagePrefix: "channel-1" }); + const history2 = new LocalHistory({ storagePrefix: "channel-2" }); history1.push(createMessage("msg-1", 1)); history2.push(createMessage("msg-2", 2)); @@ -70,37 +53,34 @@ describe("PersistentStorage", () => { expect(history2.length).to.equal(1); expect(history2.slice(0)[0].messageId).to.equal("msg-2"); - expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; - expect(storage.getItem("waku:sds:messages:channel-2")).to.not.be.null; + localStorage.removeItem("waku:sds:storage:channel-2"); }); it("saves messages after each push", () => { - const storage = new MemoryStorage(); - const persistentStorage = PersistentStorage.create(channelId, storage); - const history = new LocalHistory({ storage: persistentStorage }); + const history = new LocalHistory({ storagePrefix: channelId }); - expect(storage.getItem("waku:sds:messages:channel-1")).to.be.null; + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null; history.push(createMessage("msg-1", 1)); - expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; + expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.not.be + .null; - const saved = JSON.parse(storage.getItem("waku:sds:messages:channel-1")!); + const saved = JSON.parse( + localStorage.getItem(`waku:sds:storage:${channelId}`)! + ); expect(saved).to.have.lengthOf(1); expect(saved[0].messageId).to.equal("msg-1"); }); it("loads messages on initialization", () => { - const storage = new MemoryStorage(); - const persistentStorage1 = PersistentStorage.create(channelId, storage); - const history1 = new LocalHistory({ storage: persistentStorage1 }); + const history1 = new LocalHistory({ storagePrefix: channelId }); history1.push(createMessage("msg-1", 1)); history1.push(createMessage("msg-2", 2)); history1.push(createMessage("msg-3", 3)); - const persistentStorage2 = PersistentStorage.create(channelId, storage); - const history2 = new LocalHistory({ storage: persistentStorage2 }); + const history2 = new LocalHistory({ storagePrefix: channelId }); expect(history2.length).to.equal(3); expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ @@ -111,59 +91,16 @@ describe("PersistentStorage", () => { }); }); - describe("Node.js only (no localStorage)", () => { - before(function () { - if (typeof localStorage !== "undefined") { - this.skip(); - } - }); + describe("In-memory fallback", () => { + it("uses in-memory only when no storage is provided", () => { + const history = new LocalHistory({ maxSize: 100 }); + history.push(createMessage("msg-3", 3)); - it("returns undefined when no storage is available", () => { - const persistentStorage = PersistentStorage.create(channelId, undefined); + expect(history.length).to.equal(1); + expect(history.slice(0)[0].messageId).to.equal("msg-3"); - expect(persistentStorage).to.equal(undefined); - }); - }); - - describe("Browser only (localStorage)", () => { - before(function () { - if (typeof localStorage === "undefined") { - this.skip(); - } - }); - - it("persists and restores messages with channelId", () => { - const testChannelId = `test-${Date.now()}`; - const history1 = new LocalHistory({ storage: testChannelId }); - history1.push(createMessage("msg-1", 1)); - history1.push(createMessage("msg-2", 2)); - - const history2 = new LocalHistory({ storage: testChannelId }); - - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([ - "msg-1", - "msg-2" - ]); - - localStorage.removeItem(`waku:sds:messages:${testChannelId}`); - }); - - it("auto-uses localStorage when channelId is provided", () => { - const testChannelId = `auto-storage-${Date.now()}`; - - const history = new LocalHistory({ storage: testChannelId }); - history.push(createMessage("msg-auto-1", 1)); - history.push(createMessage("msg-auto-2", 2)); - - const history2 = new LocalHistory({ storage: testChannelId }); - expect(history2.length).to.equal(2); - expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([ - "msg-auto-1", - "msg-auto-2" - ]); - - localStorage.removeItem(`waku:sds:messages:${testChannelId}`); + const history2 = new LocalHistory({ maxSize: 100 }); + expect(history2.length).to.equal(0); }); }); }); @@ -180,19 +117,3 @@ const createMessage = (id: string, timestamp: number): ContentMessage => { undefined ); }; - -class MemoryStorage implements IStorage { - private readonly store = new Map(); - - public getItem(key: string): string | null { - return this.store.get(key) ?? null; - } - - public setItem(key: string, value: string): void { - this.store.set(key, value); - } - - public removeItem(key: string): void { - this.store.delete(key); - } -} diff --git a/packages/sds/src/message_channel/storage/browser.ts b/packages/sds/src/message_channel/storage/browser.ts new file mode 100644 index 0000000000..aa570bf915 --- /dev/null +++ b/packages/sds/src/message_channel/storage/browser.ts @@ -0,0 +1,52 @@ +import { Logger } from "@waku/utils"; + +import { ContentMessage } from "../message.js"; + +import { + MessageSerializer, + StoredContentMessage +} from "./message_serializer.js"; + +const log = new Logger("sds:storage"); + +const STORAGE_PREFIX = "waku:sds:storage:"; + +/** + * Browser localStorage wrapper for message persistence. + */ +export class Storage { + private readonly storageKey: string; + + public constructor(storagePrefix: string) { + this.storageKey = `${STORAGE_PREFIX}${storagePrefix}`; + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) + ); + localStorage.setItem(this.storageKey, payload); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = localStorage.getItem(this.storageKey); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error) { + log.error("Failed to load messages from storage:", error); + localStorage.removeItem(this.storageKey); + return []; + } + } +} diff --git a/packages/sds/src/message_channel/storage/index.ts b/packages/sds/src/message_channel/storage/index.ts new file mode 100644 index 0000000000..365a399450 --- /dev/null +++ b/packages/sds/src/message_channel/storage/index.ts @@ -0,0 +1,2 @@ +// Node.js implementation - swapped to browser.js via package.json browser field +export { Storage } from "./node.js"; diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/storage/message_serializer.ts similarity index 50% rename from packages/sds/src/message_channel/persistent_storage.ts rename to packages/sds/src/message_channel/storage/message_serializer.ts index 36965f0630..034d688d36 100644 --- a/packages/sds/src/message_channel/persistent_storage.ts +++ b/packages/sds/src/message_channel/storage/message_serializer.ts @@ -1,24 +1,13 @@ import { bytesToHex, hexToBytes } from "@noble/hashes/utils"; -import { Logger } from "@waku/utils"; -import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; +import { ContentMessage, HistoryEntry } from "../message.js"; -const log = new Logger("sds:persistent-storage"); - -const STORAGE_PREFIX = "waku:sds:storage:"; - -export interface IStorage { - getItem(key: string): string | null; - setItem(key: string, value: string): void; - removeItem(key: string): void; -} - -type StoredCausalEntry = { +export type StoredCausalEntry = { messageId: string; retrievalHint?: string; }; -type StoredContentMessage = { +export type StoredContentMessage = { messageId: string; channelId: string; senderId: string; @@ -29,72 +18,7 @@ type StoredContentMessage = { retrievalHint?: string; }; -/** - * Persistent storage for messages. - */ -export class PersistentStorage { - private readonly storageKey: string; - - /** - * Creates a PersistentStorage for a channel, or returns undefined if no storage is available. - * If no storage is provided, attempts to use global localStorage (if available). - * Returns undefined if no storage is available. - */ - public static create( - channelId: ChannelId, - storage?: IStorage - ): PersistentStorage | undefined { - storage = - storage ?? - (typeof localStorage !== "undefined" ? localStorage : undefined); - if (!storage) { - log.info( - `No storage available. Messages will not persist across sessions. - If you're using NodeJS, you can provide a storage backend using the storage parameter.` - ); - return undefined; - } - return new PersistentStorage(channelId, storage); - } - - private constructor( - channelId: ChannelId, - private readonly storage: IStorage - ) { - this.storageKey = `${STORAGE_PREFIX}${channelId}`; - } - - public save(messages: ContentMessage[]): void { - try { - const payload = JSON.stringify( - messages.map((msg) => MessageSerializer.serializeContentMessage(msg)) - ); - this.storage.setItem(this.storageKey, payload); - } catch (error) { - log.error("Failed to save messages to storage:", error); - } - } - - public load(): ContentMessage[] { - try { - const raw = this.storage.getItem(this.storageKey); - if (!raw) { - return []; - } - - const stored = JSON.parse(raw) as StoredContentMessage[]; - return stored - .map((record) => MessageSerializer.deserializeContentMessage(record)) - .filter((message): message is ContentMessage => Boolean(message)); - } catch (error) { - log.error("Failed to load messages from storage:", error); - this.storage.removeItem(this.storageKey); - return []; - } - } -} - -class MessageSerializer { +export class MessageSerializer { public static serializeContentMessage( message: ContentMessage ): StoredContentMessage { diff --git a/packages/sds/src/message_channel/storage/node.ts b/packages/sds/src/message_channel/storage/node.ts new file mode 100644 index 0000000000..dfd2f0bc03 --- /dev/null +++ b/packages/sds/src/message_channel/storage/node.ts @@ -0,0 +1,62 @@ +import { mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { dirname, join } from "node:path"; + +import { Logger } from "@waku/utils"; + +import { ContentMessage } from "../message.js"; + +import { + MessageSerializer, + StoredContentMessage +} from "./message_serializer.js"; + +const log = new Logger("sds:storage"); + +/** + * Node.js file-based storage for message persistence. + */ +export class Storage { + private readonly filePath: string; + + public constructor(storagePrefix: string, basePath: string = ".waku") { + this.filePath = join(basePath, `${storagePrefix}.json`); + } + + public save(messages: ContentMessage[]): void { + try { + const payload = JSON.stringify( + messages.map((msg) => MessageSerializer.serializeContentMessage(msg)), + null, + 2 + ); + mkdirSync(dirname(this.filePath), { recursive: true }); + writeFileSync(this.filePath, payload, "utf-8"); + } catch (error) { + log.error("Failed to save messages to storage:", error); + } + } + + public load(): ContentMessage[] { + try { + const raw = readFileSync(this.filePath, "utf-8"); + if (!raw) { + return []; + } + + const stored = JSON.parse(raw) as StoredContentMessage[]; + return stored + .map((record) => MessageSerializer.deserializeContentMessage(record)) + .filter((message): message is ContentMessage => Boolean(message)); + } catch (error: unknown) { + if ( + error && + typeof error === "object" && + "code" in error && + error.code !== "ENOENT" + ) { + log.error("Failed to load messages from storage:", error); + } + return []; + } + } +}