refactor: implement browser and Node.js storage solutions for message persistence, updating LocalHistory to utilize a unified Storage interface and enhancing tests for localStorage functionality

This commit is contained in:
Danish Arora 2025-12-10 15:13:10 -05:00
parent 721c494567
commit 8a749c453a
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
10 changed files with 193 additions and 212 deletions

2
.gitignore vendored
View File

@ -21,3 +21,5 @@ CLAUDE.md
.env
postgres-data/
packages/rln/waku-rlnv2-contract/
/packages/**/allure-results
/packages/**/allure-results

View File

@ -1,3 +1,16 @@
const config = require("../../karma.conf.cjs");
import path from "path";
module.exports = config;
import baseConfig from "../../karma.conf.cjs";
export default function (config) {
baseConfig(config);
const storageDir = path.resolve(__dirname, "src/message_channel/storage");
// Swap node storage for browser storage in webpack builds
config.webpack.resolve.alias = {
...config.webpack.resolve.alias,
[path.join(storageDir, "node.ts")]: path.join(storageDir, "browser.ts"),
[path.join(storageDir, "node.js")]: path.join(storageDir, "browser.ts")
};
}

View File

@ -4,6 +4,9 @@
"description": "Scalable Data Sync implementation for the browser. Based on https://github.com/vacp2p/rfc-index/blob/main/vac/raw/sds.md",
"types": "./dist/index.d.ts",
"module": "./dist/index.js",
"browser": {
"./dist/message_channel/storage/index.js": "./dist/message_channel/storage/browser.js"
},
"exports": {
".": {
"types": "./dist/index.d.ts",

View File

@ -1,8 +1,8 @@
import { Logger } from "@waku/utils";
import _ from "lodash";
import { type ChannelId, ContentMessage, isContentMessage } from "./message.js";
import { PersistentStorage } from "./persistent_storage.js";
import { ContentMessage, isContentMessage } from "./message.js";
import { Storage } from "./storage/index.js";
export const DEFAULT_MAX_LENGTH = 10_000;
@ -21,7 +21,8 @@ export const DEFAULT_MAX_LENGTH = 10_000;
*/
export type LocalHistoryOptions = {
storage?: ChannelId | PersistentStorage;
storagePrefix?: string;
storage?: Storage;
maxSize?: number;
};
@ -29,25 +30,26 @@ const log = new Logger("sds:local-history");
export class LocalHistory {
private items: ContentMessage[] = [];
private readonly storage?: PersistentStorage;
private readonly storage?: Storage;
private readonly maxSize: number;
/**
* Construct a new in-memory local history.
*
* @param opts Configuration object.
* - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage.
* - storagePrefix: Optional prefix for persistent storage (creates Storage if provided).
* - storage: Optional explicit Storage instance.
* - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH.
*/
public constructor(opts: LocalHistoryOptions = {}) {
const { storage, maxSize } = opts;
const { storagePrefix, storage, maxSize } = opts;
this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH;
if (storage instanceof PersistentStorage) {
if (storage) {
this.storage = storage;
log.info("Using explicit persistent storage");
} else if (typeof storage === "string") {
this.storage = PersistentStorage.create(storage);
log.info("Creating persistent storage for channel", storage);
log.info("Using explicit storage");
} else if (storagePrefix) {
this.storage = new Storage(storagePrefix);
log.info("Creating storage for prefix", storagePrefix);
} else {
this.storage = undefined;
log.info("Using in-memory storage");

View File

@ -113,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.possibleAcks = new Map();
this.incomingBuffer = [];
this.localHistory =
localHistory ?? new LocalHistory({ storage: channelId });
localHistory ?? new LocalHistory({ storagePrefix: channelId });
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
// TODO: this should be determined based on the bloom filter parameters and number of hashes

View File

@ -2,23 +2,27 @@ import { expect } from "chai";
import { LocalHistory } from "./local_history.js";
import { ContentMessage } from "./message.js";
import { IStorage, PersistentStorage } from "./persistent_storage.js";
const channelId = "channel-1";
describe("PersistentStorage", () => {
describe("Explicit storage", () => {
describe("Storage", () => {
describe("Browser localStorage", () => {
before(function () {
if (typeof localStorage === "undefined") {
this.skip();
}
});
afterEach(() => {
localStorage.removeItem(`waku:sds:storage:${channelId}`);
});
it("persists and restores messages", () => {
const storage = new MemoryStorage();
const persistentStorage = PersistentStorage.create(channelId, storage);
expect(persistentStorage).to.not.be.undefined;
const history1 = new LocalHistory({ storage: persistentStorage });
const history1 = new LocalHistory({ storagePrefix: channelId });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
const history2 = new LocalHistory({ storage: persistentStorage });
const history2 = new LocalHistory({ storagePrefix: channelId });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
@ -27,39 +31,18 @@ describe("PersistentStorage", () => {
]);
});
it("uses in-memory only when no storage is provided", () => {
const history = new LocalHistory({ maxSize: 100 });
history.push(createMessage("msg-3", 3));
expect(history.length).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3");
const history2 = new LocalHistory({ maxSize: 100 });
expect(history2.length).to.equal(0);
});
it("handles corrupt data in storage gracefully", () => {
const storage = new MemoryStorage();
// Corrupt data
storage.setItem("waku:sds:messages:channel-1", "{ invalid json }");
const persistentStorage = PersistentStorage.create(channelId, storage);
const history = new LocalHistory({ storage: persistentStorage });
it("handles corrupt data gracefully", () => {
localStorage.setItem(`waku:sds:storage:${channelId}`, "{ invalid json }");
const history = new LocalHistory({ storagePrefix: channelId });
expect(history.length).to.equal(0);
// Corrupt data is not saved
expect(storage.getItem("waku:sds:messages:channel-1")).to.equal(null);
// Corrupt data is removed
expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null;
});
it("isolates history by channel ID", () => {
const storage = new MemoryStorage();
const storage1 = PersistentStorage.create("channel-1", storage);
const storage2 = PersistentStorage.create("channel-2", storage);
const history1 = new LocalHistory({ storage: storage1 });
const history2 = new LocalHistory({ storage: storage2 });
const history1 = new LocalHistory({ storagePrefix: "channel-1" });
const history2 = new LocalHistory({ storagePrefix: "channel-2" });
history1.push(createMessage("msg-1", 1));
history2.push(createMessage("msg-2", 2));
@ -70,37 +53,34 @@ describe("PersistentStorage", () => {
expect(history2.length).to.equal(1);
expect(history2.slice(0)[0].messageId).to.equal("msg-2");
expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null;
expect(storage.getItem("waku:sds:messages:channel-2")).to.not.be.null;
localStorage.removeItem("waku:sds:storage:channel-2");
});
it("saves messages after each push", () => {
const storage = new MemoryStorage();
const persistentStorage = PersistentStorage.create(channelId, storage);
const history = new LocalHistory({ storage: persistentStorage });
const history = new LocalHistory({ storagePrefix: channelId });
expect(storage.getItem("waku:sds:messages:channel-1")).to.be.null;
expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null;
history.push(createMessage("msg-1", 1));
expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null;
expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.not.be
.null;
const saved = JSON.parse(storage.getItem("waku:sds:messages:channel-1")!);
const saved = JSON.parse(
localStorage.getItem(`waku:sds:storage:${channelId}`)!
);
expect(saved).to.have.lengthOf(1);
expect(saved[0].messageId).to.equal("msg-1");
});
it("loads messages on initialization", () => {
const storage = new MemoryStorage();
const persistentStorage1 = PersistentStorage.create(channelId, storage);
const history1 = new LocalHistory({ storage: persistentStorage1 });
const history1 = new LocalHistory({ storagePrefix: channelId });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
history1.push(createMessage("msg-3", 3));
const persistentStorage2 = PersistentStorage.create(channelId, storage);
const history2 = new LocalHistory({ storage: persistentStorage2 });
const history2 = new LocalHistory({ storagePrefix: channelId });
expect(history2.length).to.equal(3);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
@ -111,59 +91,16 @@ describe("PersistentStorage", () => {
});
});
describe("Node.js only (no localStorage)", () => {
before(function () {
if (typeof localStorage !== "undefined") {
this.skip();
}
});
describe("In-memory fallback", () => {
it("uses in-memory only when no storage is provided", () => {
const history = new LocalHistory({ maxSize: 100 });
history.push(createMessage("msg-3", 3));
it("returns undefined when no storage is available", () => {
const persistentStorage = PersistentStorage.create(channelId, undefined);
expect(history.length).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3");
expect(persistentStorage).to.equal(undefined);
});
});
describe("Browser only (localStorage)", () => {
before(function () {
if (typeof localStorage === "undefined") {
this.skip();
}
});
it("persists and restores messages with channelId", () => {
const testChannelId = `test-${Date.now()}`;
const history1 = new LocalHistory({ storage: testChannelId });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
const history2 = new LocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
"msg-1",
"msg-2"
]);
localStorage.removeItem(`waku:sds:messages:${testChannelId}`);
});
it("auto-uses localStorage when channelId is provided", () => {
const testChannelId = `auto-storage-${Date.now()}`;
const history = new LocalHistory({ storage: testChannelId });
history.push(createMessage("msg-auto-1", 1));
history.push(createMessage("msg-auto-2", 2));
const history2 = new LocalHistory({ storage: testChannelId });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
"msg-auto-1",
"msg-auto-2"
]);
localStorage.removeItem(`waku:sds:messages:${testChannelId}`);
const history2 = new LocalHistory({ maxSize: 100 });
expect(history2.length).to.equal(0);
});
});
});
@ -180,19 +117,3 @@ const createMessage = (id: string, timestamp: number): ContentMessage => {
undefined
);
};
class MemoryStorage implements IStorage {
private readonly store = new Map<string, string>();
public getItem(key: string): string | null {
return this.store.get(key) ?? null;
}
public setItem(key: string, value: string): void {
this.store.set(key, value);
}
public removeItem(key: string): void {
this.store.delete(key);
}
}

View File

@ -0,0 +1,52 @@
import { Logger } from "@waku/utils";
import { ContentMessage } from "../message.js";
import {
MessageSerializer,
StoredContentMessage
} from "./message_serializer.js";
const log = new Logger("sds:storage");
const STORAGE_PREFIX = "waku:sds:storage:";
/**
* Browser localStorage wrapper for message persistence.
*/
export class Storage {
private readonly storageKey: string;
public constructor(storagePrefix: string) {
this.storageKey = `${STORAGE_PREFIX}${storagePrefix}`;
}
public save(messages: ContentMessage[]): void {
try {
const payload = JSON.stringify(
messages.map((msg) => MessageSerializer.serializeContentMessage(msg))
);
localStorage.setItem(this.storageKey, payload);
} catch (error) {
log.error("Failed to save messages to storage:", error);
}
}
public load(): ContentMessage[] {
try {
const raw = localStorage.getItem(this.storageKey);
if (!raw) {
return [];
}
const stored = JSON.parse(raw) as StoredContentMessage[];
return stored
.map((record) => MessageSerializer.deserializeContentMessage(record))
.filter((message): message is ContentMessage => Boolean(message));
} catch (error) {
log.error("Failed to load messages from storage:", error);
localStorage.removeItem(this.storageKey);
return [];
}
}
}

View File

@ -0,0 +1,2 @@
// Node.js implementation - swapped to browser.js via package.json browser field
export { Storage } from "./node.js";

View File

@ -1,24 +1,13 @@
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import { Logger } from "@waku/utils";
import { ChannelId, ContentMessage, HistoryEntry } from "./message.js";
import { ContentMessage, HistoryEntry } from "../message.js";
const log = new Logger("sds:persistent-storage");
const STORAGE_PREFIX = "waku:sds:storage:";
export interface IStorage {
getItem(key: string): string | null;
setItem(key: string, value: string): void;
removeItem(key: string): void;
}
type StoredCausalEntry = {
export type StoredCausalEntry = {
messageId: string;
retrievalHint?: string;
};
type StoredContentMessage = {
export type StoredContentMessage = {
messageId: string;
channelId: string;
senderId: string;
@ -29,72 +18,7 @@ type StoredContentMessage = {
retrievalHint?: string;
};
/**
* Persistent storage for messages.
*/
export class PersistentStorage {
private readonly storageKey: string;
/**
* Creates a PersistentStorage for a channel, or returns undefined if no storage is available.
* If no storage is provided, attempts to use global localStorage (if available).
* Returns undefined if no storage is available.
*/
public static create(
channelId: ChannelId,
storage?: IStorage
): PersistentStorage | undefined {
storage =
storage ??
(typeof localStorage !== "undefined" ? localStorage : undefined);
if (!storage) {
log.info(
`No storage available. Messages will not persist across sessions.
If you're using NodeJS, you can provide a storage backend using the storage parameter.`
);
return undefined;
}
return new PersistentStorage(channelId, storage);
}
private constructor(
channelId: ChannelId,
private readonly storage: IStorage
) {
this.storageKey = `${STORAGE_PREFIX}${channelId}`;
}
public save(messages: ContentMessage[]): void {
try {
const payload = JSON.stringify(
messages.map((msg) => MessageSerializer.serializeContentMessage(msg))
);
this.storage.setItem(this.storageKey, payload);
} catch (error) {
log.error("Failed to save messages to storage:", error);
}
}
public load(): ContentMessage[] {
try {
const raw = this.storage.getItem(this.storageKey);
if (!raw) {
return [];
}
const stored = JSON.parse(raw) as StoredContentMessage[];
return stored
.map((record) => MessageSerializer.deserializeContentMessage(record))
.filter((message): message is ContentMessage => Boolean(message));
} catch (error) {
log.error("Failed to load messages from storage:", error);
this.storage.removeItem(this.storageKey);
return [];
}
}
}
class MessageSerializer {
export class MessageSerializer {
public static serializeContentMessage(
message: ContentMessage
): StoredContentMessage {

View File

@ -0,0 +1,62 @@
import { mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { Logger } from "@waku/utils";
import { ContentMessage } from "../message.js";
import {
MessageSerializer,
StoredContentMessage
} from "./message_serializer.js";
const log = new Logger("sds:storage");
/**
* Node.js file-based storage for message persistence.
*/
export class Storage {
private readonly filePath: string;
public constructor(storagePrefix: string, basePath: string = ".waku") {
this.filePath = join(basePath, `${storagePrefix}.json`);
}
public save(messages: ContentMessage[]): void {
try {
const payload = JSON.stringify(
messages.map((msg) => MessageSerializer.serializeContentMessage(msg)),
null,
2
);
mkdirSync(dirname(this.filePath), { recursive: true });
writeFileSync(this.filePath, payload, "utf-8");
} catch (error) {
log.error("Failed to save messages to storage:", error);
}
}
public load(): ContentMessage[] {
try {
const raw = readFileSync(this.filePath, "utf-8");
if (!raw) {
return [];
}
const stored = JSON.parse(raw) as StoredContentMessage[];
return stored
.map((record) => MessageSerializer.deserializeContentMessage(record))
.filter((message): message is ContentMessage => Boolean(message));
} catch (error: unknown) {
if (
error &&
typeof error === "object" &&
"code" in error &&
error.code !== "ENOENT"
) {
log.error("Failed to load messages from storage:", error);
}
return [];
}
}
}