Merge ab6d3cf503f89cfea52b4c95d5d101ef2dd37261 into f2ad23ad4354fb3440ca369ed91ba4d882bbacf6

This commit is contained in:
Danish Arora 2025-12-10 20:38:26 +00:00 committed by GitHub
commit 429ec08216
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 521 additions and 54 deletions

2
.gitignore vendored
View File

@ -21,3 +21,5 @@ CLAUDE.md
.env
postgres-data/
packages/rln/waku-rlnv2-contract/
/packages/**/allure-results
/packages/**/allure-results

View File

@ -1,3 +1,16 @@
const config = require("../../karma.conf.cjs");
import path from "path";
module.exports = config;
import baseConfig from "../../karma.conf.cjs";
export default function (config) {
baseConfig(config);
const storageDir = path.resolve(__dirname, "src/message_channel/storage");
// Swap node storage for browser storage in webpack builds
config.webpack.resolve.alias = {
...config.webpack.resolve.alias,
[path.join(storageDir, "node.ts")]: path.join(storageDir, "browser.ts"),
[path.join(storageDir, "node.js")]: path.join(storageDir, "browser.ts")
};
}

View File

@ -4,6 +4,9 @@
"description": "Scalable Data Sync implementation for the browser. Based on https://github.com/vacp2p/rfc-index/blob/main/vac/raw/sds.md",
"types": "./dist/index.d.ts",
"module": "./dist/index.js",
"browser": {
"./dist/message_channel/storage/index.js": "./dist/message_channel/storage/browser.js"
},
"exports": {
".": {
"types": "./dist/index.d.ts",

View File

@ -1,13 +1,13 @@
import { expect } from "chai";
import { MemLocalHistory } from "./mem_local_history.js";
import { LocalHistory } from "./local_history.js";
import { ContentMessage } from "./message.js";
describe("MemLocalHistory", () => {
describe("LocalHistory", () => {
it("Cap max size when messages are pushed one at a time", () => {
const maxSize = 2;
const hist = new MemLocalHistory(maxSize);
const hist = new LocalHistory({ maxSize });
hist.push(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
@ -31,7 +31,7 @@ describe("MemLocalHistory", () => {
it("Cap max size when a pushed array is exceeding the cap", () => {
const maxSize = 2;
const hist = new MemLocalHistory(maxSize);
const hist = new LocalHistory({ maxSize });
hist.push(
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))

View File

@ -1,9 +1,28 @@
import { Logger } from "@waku/utils";
import _ from "lodash";
import { ContentMessage, isContentMessage } from "./message.js";
import { Storage } from "./storage/index.js";
export const DEFAULT_MAX_LENGTH = 10_000;
/**
* Options for the LocalHistory constructor.
* @param storage - The storage to use for the local history.
* - prefix - The prefix for the storage.
* - customInstance - The custom storage instance to use.
* @param maxSize - The maximum number of messages to store.
*/
export type LocalHistoryOptions = {
storage?: {
prefix?: string;
customInstance?: Storage;
};
maxSize?: number;
};
const log = new Logger("sds:local-history");
/**
* In-Memory implementation of a local history of messages.
*
@ -17,15 +36,28 @@ export const DEFAULT_MAX_LENGTH = 10_000;
* If an array of items longer than `maxLength` is pushed, dropping will happen
* at next push.
*/
export class MemLocalHistory {
export class LocalHistory {
private items: ContentMessage[] = [];
private readonly storage?: Storage;
private readonly maxSize: number;
/**
* Construct a new in-memory local history
*
* @param maxLength The maximum number of message to store.
*/
public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {}
public constructor(opts: LocalHistoryOptions = {}) {
const { storage, maxSize } = opts;
const { prefix, customInstance } = storage ?? {};
this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH;
if (customInstance) {
this.storage = customInstance;
log.info("Using custom storage instance", { customInstance });
} else if (prefix) {
this.storage = new Storage(prefix);
log.info("Creating storage with prefix", { prefix });
} else {
this.storage = undefined;
log.info("Using in-memory storage");
}
this.load();
}
public get length(): number {
return this.items.length;
@ -47,11 +79,13 @@ export class MemLocalHistory {
this.items = _.uniqBy(combinedItems, "messageId");
// Let's drop older messages if max length is reached
if (this.length > this.maxLength) {
const numItemsToRemove = this.length - this.maxLength;
if (this.length > this.maxSize) {
const numItemsToRemove = this.length - this.maxSize;
this.items.splice(0, numItemsToRemove);
}
this.save();
return this.items.length;
}
@ -99,4 +133,15 @@ export class MemLocalHistory {
);
}
}
private save(): void {
this.storage?.save(this.items);
}
private load(): void {
const messages = this.storage?.load() ?? [];
if (messages.length > 0) {
this.items = messages;
}
}
}

View File

@ -4,6 +4,7 @@ import { expect } from "chai";
import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { MessageChannelEvent } from "./events.js";
import { LocalHistory } from "./local_history.js";
import {
ContentMessage,
HistoryEntry,
@ -13,8 +14,8 @@ import {
} from "./message.js";
import {
DEFAULT_BLOOM_FILTER_OPTIONS,
ILocalHistory,
MessageChannel
MessageChannel,
MessageChannelOptions
} from "./message_channel.js";
const channelId = "test-channel";
@ -22,6 +23,18 @@ const callback = (_message: Message): Promise<{ success: boolean }> => {
return Promise.resolve({ success: true });
};
/**
* Test helper to create a MessageChannel with LocalHistory.
* This avoids localStorage pollution in tests and tests core functionality.
*/
const createTestChannel = (
channelId: string,
senderId: string,
options: MessageChannelOptions = {}
): MessageChannel => {
return new MessageChannel(channelId, senderId, options, new LocalHistory());
};
const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => {
return channel["filter"] as DefaultBloomFilter;
};
@ -68,7 +81,7 @@ describe("MessageChannel", function () {
describe("sending a message ", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice");
channelA = createTestChannel(channelId, "alice");
});
it("should increase lamport timestamp", async () => {
@ -98,11 +111,11 @@ describe("MessageChannel", function () {
const expectedTimestamp = channelA["lamportTimestamp"] + 1n;
const messageId = MessageChannel.getMessageId(payload);
await sendMessage(channelA, payload, callback);
const messageIdLog = channelA["localHistory"] as ILocalHistory;
const messageIdLog = channelA["localHistory"] as LocalHistory;
expect(messageIdLog.length).to.equal(1);
expect(
messageIdLog.some(
(log) =>
(log: ContentMessage) =>
log.lamportTimestamp === expectedTimestamp &&
log.messageId === messageId
)
@ -119,7 +132,7 @@ describe("MessageChannel", function () {
return { success: true, retrievalHint: testRetrievalHint };
});
const localHistory = channelA["localHistory"] as ILocalHistory;
const localHistory = channelA["localHistory"] as LocalHistory;
expect(localHistory.length).to.equal(1);
// Find the message in local history
@ -171,8 +184,8 @@ describe("MessageChannel", function () {
describe("receiving a message", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice");
channelB = new MessageChannel(channelId, "bob");
channelA = createTestChannel(channelId, "alice");
channelB = createTestChannel(channelId, "bob");
});
it("should increase lamport timestamp", async () => {
@ -187,8 +200,8 @@ describe("MessageChannel", function () {
// TODO: test is failing in CI, investigate in https://github.com/waku-org/js-waku/issues/2648
it.skip("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => {
const testChannelA = new MessageChannel(channelId, "alice");
const testChannelB = new MessageChannel(channelId, "bob");
const testChannelA = createTestChannel(channelId, "alice");
const testChannelB = createTestChannel(channelId, "bob");
const timestampBefore = testChannelA["lamportTimestamp"];
@ -305,7 +318,7 @@ describe("MessageChannel", function () {
testRetrievalHint
);
const localHistory = channelA["localHistory"] as ILocalHistory;
const localHistory = channelA["localHistory"] as LocalHistory;
expect(localHistory.length).to.equal(1);
// Find the message in local history
@ -427,7 +440,7 @@ describe("MessageChannel", function () {
)
);
const localHistory = channelA["localHistory"] as ILocalHistory;
const localHistory = channelA["localHistory"] as LocalHistory;
expect(localHistory.length).to.equal(2);
// When timestamps are equal, should be ordered by messageId lexicographically
@ -452,10 +465,10 @@ describe("MessageChannel", function () {
describe("reviewing ack status", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice", {
channelA = createTestChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should mark all messages in causal history as acknowledged", async () => {
@ -661,10 +674,10 @@ describe("MessageChannel", function () {
describe("Sweeping incoming buffer", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice", {
channelA = createTestChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should detect messages with missing dependencies", async () => {
@ -746,7 +759,7 @@ describe("MessageChannel", function () {
it("should mark a message as irretrievably lost if timeout is exceeded", async () => {
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
const channelC = createTestChannel(channelId, "carol", {
timeoutForLostMessagesMs: 10
});
@ -789,7 +802,7 @@ describe("MessageChannel", function () {
let lostMessages: HistoryEntry[] = [];
// Create a channel with very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
const channelC = createTestChannel(channelId, "carol", {
timeoutForLostMessagesMs: 10
});
@ -853,7 +866,7 @@ describe("MessageChannel", function () {
it("should remove messages without delivering if timeout is exceeded", async () => {
const causalHistorySize = channelA["causalHistorySize"];
// Create a channel with very very short timeout
const channelC: MessageChannel = new MessageChannel(channelId, "carol", {
const channelC = createTestChannel(channelId, "carol", {
timeoutForLostMessagesMs: 10
});
@ -1043,10 +1056,10 @@ describe("MessageChannel", function () {
describe("Sweeping outgoing buffer", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice", {
channelA = createTestChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 });
});
it("should partition messages based on acknowledgement status", async () => {
@ -1088,10 +1101,10 @@ describe("MessageChannel", function () {
describe("Sync messages", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice", {
channelA = createTestChannel(channelId, "alice", {
causalHistorySize: 2
});
channelB = new MessageChannel(channelId, "bob", { causalHistorySize: 2 });
channelB = createTestChannel(channelId, "bob", { causalHistorySize: 2 });
const message = utf8ToBytes("first message in channel");
channelA["localHistory"].push(
new ContentMessage(
@ -1115,7 +1128,7 @@ describe("MessageChannel", function () {
});
it("should not be sent when there is no history", async () => {
const channelC = new MessageChannel(channelId, "carol", {
const channelC = createTestChannel(channelId, "carol", {
causalHistorySize: 2
});
const res = await channelC.pushOutgoingSyncMessage(async (_msg) => {
@ -1160,7 +1173,7 @@ describe("MessageChannel", function () {
});
it("should update ack status of messages in outgoing buffer", async () => {
const channelC = new MessageChannel(channelId, "carol", {
const channelC = createTestChannel(channelId, "carol", {
causalHistorySize: 2
});
for (const m of messagesA) {
@ -1185,7 +1198,7 @@ describe("MessageChannel", function () {
describe("Ephemeral messages", () => {
beforeEach(() => {
channelA = new MessageChannel(channelId, "alice");
channelA = createTestChannel(channelId, "alice");
});
it("should be sent without a timestamp, causal history, or bloom filter", async () => {
@ -1208,7 +1221,7 @@ describe("MessageChannel", function () {
});
it("should be delivered immediately if received", async () => {
const channelB = new MessageChannel(channelId, "bob");
const channelB = createTestChannel(channelId, "bob");
// Track initial state
const localHistoryBefore = channelB["localHistory"].length;
@ -1236,4 +1249,67 @@ describe("MessageChannel", function () {
expect(channelB["lamportTimestamp"]).to.equal(timestampBefore);
});
});
describe("localStorage persistence", function () {
// LocalStorage specific tests (browser)
before(function () {
if (typeof localStorage === "undefined") {
this.skip();
}
});
it("should restore messages from localStorage on channel recreation", async () => {
const persistentChannelId = "persistent-channel";
const channel1 = new MessageChannel(persistentChannelId, "alice");
await sendMessage(channel1, utf8ToBytes("msg-1"), callback);
await sendMessage(channel1, utf8ToBytes("msg-2"), callback);
expect(channel1["localHistory"].length).to.equal(2);
// Recreate channel with same storage - should load history
const channel2 = new MessageChannel(persistentChannelId, "alice");
expect(channel2["localHistory"].length).to.equal(2);
expect(
channel2["localHistory"].slice(0).map((m) => m.messageId)
).to.deep.equal([
MessageChannel.getMessageId(utf8ToBytes("msg-1")),
MessageChannel.getMessageId(utf8ToBytes("msg-2"))
]);
});
it("should include persisted messages in causal history after restart", async () => {
const persistentChannelId = "persistent-causal";
const channel1 = new MessageChannel(persistentChannelId, "alice", {
causalHistorySize: 2
});
await sendMessage(channel1, utf8ToBytes("msg-1"), callback);
await sendMessage(channel1, utf8ToBytes("msg-2"), callback);
await sendMessage(channel1, utf8ToBytes("msg-3"), callback);
const channel2 = new MessageChannel(persistentChannelId, "alice", {
causalHistorySize: 2
});
let capturedMessage: ContentMessage | null = null;
await sendMessage(channel2, utf8ToBytes("msg-4"), async (message) => {
capturedMessage = message;
return { success: true };
});
expect(capturedMessage).to.not.be.null;
expect(capturedMessage!.causalHistory).to.have.lengthOf(2);
// Should reference the last 2 messages (msg-2 and msg-3)
expect(capturedMessage!.causalHistory[0].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes("msg-2"))
);
expect(capturedMessage!.causalHistory[1].messageId).to.equal(
MessageChannel.getMessageId(utf8ToBytes("msg-3"))
);
});
});
});

View File

@ -7,7 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
import { MemLocalHistory } from "./mem_local_history.js";
import { LocalHistory } from "./local_history.js";
import {
ChannelId,
ContentMessage,
@ -63,11 +63,6 @@ export interface MessageChannelOptions {
repairConfig?: RepairConfig;
}
export type ILocalHistory = Pick<
Array<ContentMessage>,
"some" | "push" | "slice" | "find" | "length" | "findIndex"
>;
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
public readonly channelId: ChannelId;
public readonly senderId: ParticipantId;
@ -76,7 +71,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
private outgoingBuffer: ContentMessage[];
private possibleAcks: Map<MessageId, number>;
private incomingBuffer: Array<ContentMessage | SyncMessage>;
private readonly localHistory: ILocalHistory;
private readonly localHistory: LocalHistory;
private timeReceived: Map<MessageId, number>;
private readonly causalHistorySize: number;
private readonly possibleAcksThreshold: number;
@ -106,7 +101,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
channelId: ChannelId,
senderId: ParticipantId,
options: MessageChannelOptions = {},
localHistory: ILocalHistory = new MemLocalHistory()
localHistory?: LocalHistory
) {
super();
this.channelId = channelId;
@ -117,7 +112,8 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
this.outgoingBuffer = [];
this.possibleAcks = new Map();
this.incomingBuffer = [];
this.localHistory = localHistory;
this.localHistory =
localHistory ?? new LocalHistory({ storage: { prefix: channelId } });
this.causalHistorySize =
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
// TODO: this should be determined based on the bloom filter parameters and number of hashes

View File

@ -0,0 +1,119 @@
import { expect } from "chai";
import { LocalHistory } from "./local_history.js";
import { ContentMessage } from "./message.js";
const channelId = "channel-1";
describe("Storage", () => {
describe("Browser localStorage", () => {
before(function () {
if (typeof localStorage === "undefined") {
this.skip();
}
});
afterEach(() => {
localStorage.removeItem(`waku:sds:storage:${channelId}`);
});
it("persists and restores messages", () => {
const history1 = new LocalHistory({ storage: { prefix: channelId } });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
const history2 = new LocalHistory({ storage: { prefix: channelId } });
expect(history2.length).to.equal(2);
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
"msg-1",
"msg-2"
]);
});
it("handles corrupt data gracefully", () => {
localStorage.setItem(`waku:sds:storage:${channelId}`, "{ invalid json }");
const history = new LocalHistory({ storage: { prefix: channelId } });
expect(history.length).to.equal(0);
// Corrupt data is removed
expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null;
});
it("isolates history by channel ID", () => {
const history1 = new LocalHistory({ storage: { prefix: "channel-1" } });
const history2 = new LocalHistory({ storage: { prefix: "channel-2" } });
history1.push(createMessage("msg-1", 1));
history2.push(createMessage("msg-2", 2));
expect(history1.length).to.equal(1);
expect(history1.slice(0)[0].messageId).to.equal("msg-1");
expect(history2.length).to.equal(1);
expect(history2.slice(0)[0].messageId).to.equal("msg-2");
localStorage.removeItem("waku:sds:storage:channel-2");
});
it("saves messages after each push", () => {
const history = new LocalHistory({ storage: { prefix: channelId } });
expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.be.null;
history.push(createMessage("msg-1", 1));
expect(localStorage.getItem(`waku:sds:storage:${channelId}`)).to.not.be
.null;
const saved = JSON.parse(
localStorage.getItem(`waku:sds:storage:${channelId}`)!
);
expect(saved).to.have.lengthOf(1);
expect(saved[0].messageId).to.equal("msg-1");
});
it("loads messages on initialization", () => {
const history1 = new LocalHistory({ storage: { prefix: channelId } });
history1.push(createMessage("msg-1", 1));
history1.push(createMessage("msg-2", 2));
history1.push(createMessage("msg-3", 3));
const history2 = new LocalHistory({ storage: { prefix: channelId } });
expect(history2.length).to.equal(3);
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
"msg-1",
"msg-2",
"msg-3"
]);
});
});
describe("In-memory fallback", () => {
it("uses in-memory only when no storage is provided", () => {
const history = new LocalHistory({ maxSize: 100 });
history.push(createMessage("msg-3", 3));
expect(history.length).to.equal(1);
expect(history.slice(0)[0].messageId).to.equal("msg-3");
const history2 = new LocalHistory({ maxSize: 100 });
expect(history2.length).to.equal(0);
});
});
});
const createMessage = (id: string, timestamp: number): ContentMessage => {
return new ContentMessage(
id,
channelId,
"sender",
[],
BigInt(timestamp),
undefined,
new Uint8Array([timestamp]),
undefined
);
};

View File

@ -1,8 +1,8 @@
import { Logger } from "@waku/utils";
import { LocalHistory } from "../local_history.js";
import type { HistoryEntry, MessageId } from "../message.js";
import { Message } from "../message.js";
import type { ILocalHistory } from "../message_channel.js";
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
import {
@ -183,7 +183,7 @@ export class RepairManager {
*/
public processIncomingRepairRequests(
requests: HistoryEntry[],
localHistory: ILocalHistory,
localHistory: LocalHistory,
currentTime = Date.now()
): void {
for (const request of requests) {
@ -248,7 +248,7 @@ export class RepairManager {
* Returns messages that should be rebroadcast
*/
public sweepIncomingBuffer(
localHistory: ILocalHistory,
localHistory: LocalHistory,
currentTime = Date.now()
): Message[] {
const ready = this.incomingBuffer.getReady(currentTime);

View File

@ -0,0 +1,52 @@
import { Logger } from "@waku/utils";
import { ContentMessage } from "../message.js";
import {
MessageSerializer,
StoredContentMessage
} from "./message_serializer.js";
const log = new Logger("sds:storage");
const STORAGE_NAMESPACE = "waku:sds:storage:";
/**
* Browser localStorage wrapper for message persistence.
*/
export class Storage {
private readonly storageKey: string;
public constructor(storagePrefix: string) {
this.storageKey = `${STORAGE_NAMESPACE}${storagePrefix}`;
}
public save(messages: ContentMessage[]): void {
try {
const payload = JSON.stringify(
messages.map((msg) => MessageSerializer.serializeContentMessage(msg))
);
localStorage.setItem(this.storageKey, payload);
} catch (error) {
log.error("Failed to save messages to storage:", error);
}
}
public load(): ContentMessage[] {
try {
const raw = localStorage.getItem(this.storageKey);
if (!raw) {
return [];
}
const stored = JSON.parse(raw) as StoredContentMessage[];
return stored
.map((record) => MessageSerializer.deserializeContentMessage(record))
.filter((message): message is ContentMessage => Boolean(message));
} catch (error) {
log.error("Failed to load messages from storage:", error);
localStorage.removeItem(this.storageKey);
return [];
}
}
}

View File

@ -0,0 +1,2 @@
// Node.js implementation - swapped to browser.js via package.json browser field
export { Storage } from "./node.js";

View File

@ -0,0 +1,97 @@
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import { ContentMessage, HistoryEntry } from "../message.js";
export type StoredCausalEntry = {
messageId: string;
retrievalHint?: string;
};
export type StoredContentMessage = {
messageId: string;
channelId: string;
senderId: string;
lamportTimestamp: string;
causalHistory: StoredCausalEntry[];
bloomFilter?: string;
content: string;
retrievalHint?: string;
};
export class MessageSerializer {
public static serializeContentMessage(
message: ContentMessage
): StoredContentMessage {
return {
messageId: message.messageId,
channelId: message.channelId,
senderId: message.senderId,
lamportTimestamp: message.lamportTimestamp.toString(),
causalHistory: message.causalHistory.map((entry) =>
MessageSerializer.serializeCausalEntry(entry)
),
bloomFilter: MessageSerializer.toHex(message.bloomFilter),
content: bytesToHex(new Uint8Array(message.content)),
retrievalHint: MessageSerializer.toHex(message.retrievalHint)
};
}
public static deserializeContentMessage(
record: StoredContentMessage
): ContentMessage | undefined {
try {
const content = hexToBytes(record.content);
return new ContentMessage(
record.messageId,
record.channelId,
record.senderId,
record.causalHistory.map((entry) =>
MessageSerializer.deserializeCausalEntry(entry)
),
BigInt(record.lamportTimestamp),
MessageSerializer.fromHex(record.bloomFilter),
content,
[],
MessageSerializer.fromHex(record.retrievalHint)
);
} catch {
return undefined;
}
}
private static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry {
return {
messageId: entry.messageId,
retrievalHint: entry.retrievalHint
? bytesToHex(entry.retrievalHint)
: undefined
};
}
private static deserializeCausalEntry(
entry: StoredCausalEntry
): HistoryEntry {
return {
messageId: entry.messageId,
retrievalHint: entry.retrievalHint
? hexToBytes(entry.retrievalHint)
: undefined
};
}
private static toHex(
data?: Uint8Array | Uint8Array<ArrayBufferLike>
): string | undefined {
if (!data || data.length === 0) {
return undefined;
}
return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data));
}
private static fromHex(value?: string): Uint8Array | undefined {
if (!value) {
return undefined;
}
return hexToBytes(value);
}
}

View File

@ -0,0 +1,62 @@
import { mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { Logger } from "@waku/utils";
import { ContentMessage } from "../message.js";
import {
MessageSerializer,
StoredContentMessage
} from "./message_serializer.js";
const log = new Logger("sds:storage");
/**
* Node.js file-based storage for message persistence.
*/
export class Storage {
private readonly filePath: string;
public constructor(storagePrefix: string, basePath: string = ".waku") {
this.filePath = join(basePath, `${storagePrefix}.json`);
}
public save(messages: ContentMessage[]): void {
try {
const payload = JSON.stringify(
messages.map((msg) => MessageSerializer.serializeContentMessage(msg)),
null,
2
);
mkdirSync(dirname(this.filePath), { recursive: true });
writeFileSync(this.filePath, payload, "utf-8");
} catch (error) {
log.error("Failed to save messages to storage:", error);
}
}
public load(): ContentMessage[] {
try {
const raw = readFileSync(this.filePath, "utf-8");
if (!raw) {
return [];
}
const stored = JSON.parse(raw) as StoredContentMessage[];
return stored
.map((record) => MessageSerializer.deserializeContentMessage(record))
.filter((message): message is ContentMessage => Boolean(message));
} catch (error: unknown) {
if (
error &&
typeof error === "object" &&
"code" in error &&
error.code !== "ENOENT"
) {
log.error("Failed to load messages from storage:", error);
}
return [];
}
}
}