mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-05 23:33:08 +00:00
feat: Introduce PersistentStorage for message history management and refactor MemLocalHistory to support optional persistent storage.
This commit is contained in:
parent
ad4973aa20
commit
503c348b79
@ -14,3 +14,8 @@ export {
|
|||||||
isEphemeralMessage,
|
isEphemeralMessage,
|
||||||
isSyncMessage
|
isSyncMessage
|
||||||
} from "./message.js";
|
} from "./message.js";
|
||||||
|
export { ILocalHistory, MemLocalHistory } from "./mem_local_history.js";
|
||||||
|
export {
|
||||||
|
PersistentStorage,
|
||||||
|
type HistoryStorage
|
||||||
|
} from "./persistent_storage.js";
|
||||||
|
|||||||
@ -7,7 +7,7 @@ describe("MemLocalHistory", () => {
|
|||||||
it("Cap max size when messages are pushed one at a time", () => {
|
it("Cap max size when messages are pushed one at a time", () => {
|
||||||
const maxSize = 2;
|
const maxSize = 2;
|
||||||
|
|
||||||
const hist = new MemLocalHistory(maxSize);
|
const hist = new MemLocalHistory({ maxSize: maxSize });
|
||||||
|
|
||||||
hist.push(
|
hist.push(
|
||||||
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
|
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
|
||||||
@ -31,7 +31,7 @@ describe("MemLocalHistory", () => {
|
|||||||
it("Cap max size when a pushed array is exceeding the cap", () => {
|
it("Cap max size when a pushed array is exceeding the cap", () => {
|
||||||
const maxSize = 2;
|
const maxSize = 2;
|
||||||
|
|
||||||
const hist = new MemLocalHistory(maxSize);
|
const hist = new MemLocalHistory({ maxSize: maxSize });
|
||||||
|
|
||||||
hist.push(
|
hist.push(
|
||||||
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
|
new ContentMessage("1", "c", "a", [], 1n, undefined, new Uint8Array([1]))
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
import _ from "lodash";
|
import _ from "lodash";
|
||||||
|
|
||||||
import { ContentMessage, isContentMessage } from "./message.js";
|
import { ChannelId, ContentMessage, isContentMessage } from "./message.js";
|
||||||
|
import { PersistentStorage } from "./persistent_storage.js";
|
||||||
|
|
||||||
export const DEFAULT_MAX_LENGTH = 10_000;
|
export const DEFAULT_MAX_LENGTH = 10_000;
|
||||||
|
|
||||||
@ -49,13 +50,31 @@ export interface ILocalHistory {
|
|||||||
|
|
||||||
export class MemLocalHistory implements ILocalHistory {
|
export class MemLocalHistory implements ILocalHistory {
|
||||||
private items: ContentMessage[] = [];
|
private items: ContentMessage[] = [];
|
||||||
|
private readonly storage?: PersistentStorage;
|
||||||
|
private readonly maxSize: number;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a new in-memory local history
|
* Construct a new in-memory local history.
|
||||||
*
|
*
|
||||||
* @param maxLength The maximum number of message to store.
|
* @param opts Configuration object.
|
||||||
|
* - storage: Optional persistent storage backend for message persistence or channelId to use with PersistentStorage.
|
||||||
|
* - maxSize: The maximum number of messages to store. Optional, defaults to DEFAULT_MAX_LENGTH.
|
||||||
*/
|
*/
|
||||||
public constructor(private maxLength: number = DEFAULT_MAX_LENGTH) {}
|
public constructor(
|
||||||
|
opts: { storage?: ChannelId | PersistentStorage; maxSize?: number } = {}
|
||||||
|
) {
|
||||||
|
const { storage, maxSize } = opts;
|
||||||
|
this.maxSize = maxSize ?? DEFAULT_MAX_LENGTH;
|
||||||
|
if (storage instanceof PersistentStorage) {
|
||||||
|
this.storage = storage;
|
||||||
|
} else if (typeof storage === "string") {
|
||||||
|
this.storage = PersistentStorage.create(storage);
|
||||||
|
} else {
|
||||||
|
this.storage = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.load();
|
||||||
|
}
|
||||||
|
|
||||||
public get length(): number {
|
public get length(): number {
|
||||||
return this.items.length;
|
return this.items.length;
|
||||||
@ -77,11 +96,13 @@ export class MemLocalHistory implements ILocalHistory {
|
|||||||
this.items = _.uniqBy(combinedItems, "messageId");
|
this.items = _.uniqBy(combinedItems, "messageId");
|
||||||
|
|
||||||
// Let's drop older messages if max length is reached
|
// Let's drop older messages if max length is reached
|
||||||
if (this.length > this.maxLength) {
|
if (this.length > this.maxSize) {
|
||||||
const numItemsToRemove = this.length - this.maxLength;
|
const numItemsToRemove = this.length - this.maxSize;
|
||||||
this.items.splice(0, numItemsToRemove);
|
this.items.splice(0, numItemsToRemove);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.save();
|
||||||
|
|
||||||
return this.items.length;
|
return this.items.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,4 +150,19 @@ export class MemLocalHistory implements ILocalHistory {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private save(): void {
|
||||||
|
this.storage?.save(this.items);
|
||||||
|
}
|
||||||
|
|
||||||
|
private load(): void {
|
||||||
|
if (!this.storage) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const messages = this.storage.load();
|
||||||
|
if (messages.length > 0) {
|
||||||
|
this.items = messages;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
|
|||||||
|
|
||||||
import { MessageChannelEvent } from "./events.js";
|
import { MessageChannelEvent } from "./events.js";
|
||||||
import { MemLocalHistory } from "./mem_local_history.js";
|
import { MemLocalHistory } from "./mem_local_history.js";
|
||||||
|
import { ILocalHistory } from "./mem_local_history.js";
|
||||||
import {
|
import {
|
||||||
ContentMessage,
|
ContentMessage,
|
||||||
HistoryEntry,
|
HistoryEntry,
|
||||||
@ -14,7 +15,6 @@ import {
|
|||||||
} from "./message.js";
|
} from "./message.js";
|
||||||
import {
|
import {
|
||||||
DEFAULT_BLOOM_FILTER_OPTIONS,
|
DEFAULT_BLOOM_FILTER_OPTIONS,
|
||||||
ILocalHistory,
|
|
||||||
MessageChannel
|
MessageChannel
|
||||||
} from "./message_channel.js";
|
} from "./message_channel.js";
|
||||||
|
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
|
|||||||
|
|
||||||
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
|
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
|
||||||
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
|
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
|
||||||
|
import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js";
|
||||||
import {
|
import {
|
||||||
ChannelId,
|
ChannelId,
|
||||||
ContentMessage,
|
ContentMessage,
|
||||||
@ -20,7 +21,6 @@ import {
|
|||||||
ParticipantId,
|
ParticipantId,
|
||||||
SyncMessage
|
SyncMessage
|
||||||
} from "./message.js";
|
} from "./message.js";
|
||||||
import { PersistentHistory } from "./persistent_history.js";
|
|
||||||
import { RepairConfig, RepairManager } from "./repair/repair.js";
|
import { RepairConfig, RepairManager } from "./repair/repair.js";
|
||||||
|
|
||||||
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
|
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
|
||||||
@ -63,11 +63,6 @@ export interface MessageChannelOptions {
|
|||||||
repairConfig?: RepairConfig;
|
repairConfig?: RepairConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type ILocalHistory = Pick<
|
|
||||||
Array<ContentMessage>,
|
|
||||||
"some" | "push" | "slice" | "find" | "length" | "findIndex"
|
|
||||||
>;
|
|
||||||
|
|
||||||
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||||
public readonly channelId: ChannelId;
|
public readonly channelId: ChannelId;
|
||||||
public readonly senderId: ParticipantId;
|
public readonly senderId: ParticipantId;
|
||||||
@ -118,7 +113,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
|||||||
this.possibleAcks = new Map();
|
this.possibleAcks = new Map();
|
||||||
this.incomingBuffer = [];
|
this.incomingBuffer = [];
|
||||||
this.localHistory =
|
this.localHistory =
|
||||||
localHistory ?? new PersistentHistory({ channelId: this.channelId });
|
localHistory ?? new MemLocalHistory({ storage: channelId });
|
||||||
this.causalHistorySize =
|
this.causalHistorySize =
|
||||||
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
|
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
|
||||||
// TODO: this should be determined based on the bloom filter parameters and number of hashes
|
// TODO: this should be determined based on the bloom filter parameters and number of hashes
|
||||||
|
|||||||
@ -1,99 +0,0 @@
|
|||||||
import { expect } from "chai";
|
|
||||||
|
|
||||||
import { ContentMessage } from "./message.js";
|
|
||||||
import { HistoryStorage, PersistentHistory } from "./persistent_history.js";
|
|
||||||
|
|
||||||
class MemoryStorage implements HistoryStorage {
|
|
||||||
private readonly store = new Map<string, string>();
|
|
||||||
|
|
||||||
public getItem(key: string): string | null {
|
|
||||||
return this.store.get(key) ?? null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public setItem(key: string, value: string): void {
|
|
||||||
this.store.set(key, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
public removeItem(key: string): void {
|
|
||||||
this.store.delete(key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const channelId = "channel-1";
|
|
||||||
|
|
||||||
const createMessage = (id: string, timestamp: number): ContentMessage => {
|
|
||||||
return new ContentMessage(
|
|
||||||
id,
|
|
||||||
channelId,
|
|
||||||
"sender",
|
|
||||||
[],
|
|
||||||
BigInt(timestamp),
|
|
||||||
undefined,
|
|
||||||
new Uint8Array([timestamp]),
|
|
||||||
undefined
|
|
||||||
);
|
|
||||||
};
|
|
||||||
|
|
||||||
describe("PersistentHistory", () => {
|
|
||||||
it("persists and restores messages", () => {
|
|
||||||
const storage = new MemoryStorage();
|
|
||||||
const history = new PersistentHistory({ channelId, storage });
|
|
||||||
|
|
||||||
history.push(createMessage("msg-1", 1));
|
|
||||||
history.push(createMessage("msg-2", 2));
|
|
||||||
|
|
||||||
const restored = new PersistentHistory({ channelId, storage });
|
|
||||||
|
|
||||||
expect(restored.length).to.equal(2);
|
|
||||||
expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([
|
|
||||||
"msg-1",
|
|
||||||
"msg-2"
|
|
||||||
]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("behaves like memory history when storage is unavailable", () => {
|
|
||||||
const history = new PersistentHistory({ channelId, storage: undefined });
|
|
||||||
|
|
||||||
history.push(createMessage("msg-3", 3));
|
|
||||||
|
|
||||||
expect(history.length).to.equal(1);
|
|
||||||
expect(history.slice(0)[0].messageId).to.equal("msg-3");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("handles corrupt data in storage gracefully", () => {
|
|
||||||
const storage = new MemoryStorage();
|
|
||||||
storage.setItem("waku:sds:history:channel-1", "{ invalid json }");
|
|
||||||
|
|
||||||
const history = new PersistentHistory({ channelId, storage });
|
|
||||||
expect(history.length).to.equal(0);
|
|
||||||
|
|
||||||
// Local history should be empty
|
|
||||||
expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("isolates history by channel ID", () => {
|
|
||||||
const storage = new MemoryStorage();
|
|
||||||
|
|
||||||
const history1 = new PersistentHistory({
|
|
||||||
channelId: "channel-1",
|
|
||||||
storage
|
|
||||||
});
|
|
||||||
const history2 = new PersistentHistory({
|
|
||||||
channelId: "channel-2",
|
|
||||||
storage
|
|
||||||
});
|
|
||||||
|
|
||||||
history1.push(createMessage("msg-1", 1));
|
|
||||||
history2.push(createMessage("msg-2", 2));
|
|
||||||
|
|
||||||
// Each channel should only see its own messages
|
|
||||||
expect(history1.length).to.equal(1);
|
|
||||||
expect(history1.slice(0)[0].messageId).to.equal("msg-1");
|
|
||||||
|
|
||||||
expect(history2.length).to.equal(1);
|
|
||||||
expect(history2.slice(0)[0].messageId).to.equal("msg-2");
|
|
||||||
|
|
||||||
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
|
|
||||||
expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@ -1,208 +0,0 @@
|
|||||||
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
|
|
||||||
import { Logger } from "@waku/utils";
|
|
||||||
|
|
||||||
import { ILocalHistory, MemLocalHistory } from "./mem_local_history.js";
|
|
||||||
import { ChannelId, ContentMessage, HistoryEntry } from "./message.js";
|
|
||||||
|
|
||||||
const log = new Logger("sds:persistent-history");
|
|
||||||
|
|
||||||
export interface HistoryStorage {
|
|
||||||
getItem(key: string): string | null;
|
|
||||||
setItem(key: string, value: string): void;
|
|
||||||
removeItem(key: string): void;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface PersistentHistoryOptions {
|
|
||||||
channelId: ChannelId;
|
|
||||||
storage?: HistoryStorage;
|
|
||||||
}
|
|
||||||
|
|
||||||
type StoredHistoryEntry = {
|
|
||||||
messageId: string;
|
|
||||||
retrievalHint?: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
type StoredContentMessage = {
|
|
||||||
messageId: string;
|
|
||||||
channelId: string;
|
|
||||||
senderId: string;
|
|
||||||
lamportTimestamp: string;
|
|
||||||
causalHistory: StoredHistoryEntry[];
|
|
||||||
bloomFilter?: string;
|
|
||||||
content: string;
|
|
||||||
retrievalHint?: string;
|
|
||||||
};
|
|
||||||
|
|
||||||
const HISTORY_STORAGE_PREFIX = "waku:sds:history:";
|
|
||||||
const DEFAULT_HISTORY_STORAGE: HistoryStorage | undefined =
|
|
||||||
typeof localStorage !== "undefined" ? localStorage : undefined;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Persists the SDS local history in a browser/localStorage compatible backend.
|
|
||||||
*
|
|
||||||
* If no storage backend is available, this behaves like {@link MemLocalHistory}.
|
|
||||||
*/
|
|
||||||
export class PersistentHistory implements ILocalHistory {
|
|
||||||
private readonly storage?: HistoryStorage;
|
|
||||||
private readonly storageKey: string;
|
|
||||||
private readonly memory: MemLocalHistory;
|
|
||||||
|
|
||||||
public constructor(options: PersistentHistoryOptions) {
|
|
||||||
this.memory = new MemLocalHistory();
|
|
||||||
this.storage = options.storage ?? DEFAULT_HISTORY_STORAGE;
|
|
||||||
this.storageKey = `${HISTORY_STORAGE_PREFIX}${options.channelId}`;
|
|
||||||
|
|
||||||
if (!this.storage) {
|
|
||||||
log.warn(
|
|
||||||
"Storage backend unavailable (localStorage not found). Falling back to in-memory history. Messages will not persist across sessions."
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.load();
|
|
||||||
}
|
|
||||||
|
|
||||||
public get length(): number {
|
|
||||||
return this.memory.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
public push(...items: ContentMessage[]): number {
|
|
||||||
const length = this.memory.push(...items);
|
|
||||||
this.save();
|
|
||||||
return length;
|
|
||||||
}
|
|
||||||
|
|
||||||
public some(
|
|
||||||
predicate: (
|
|
||||||
value: ContentMessage,
|
|
||||||
index: number,
|
|
||||||
array: ContentMessage[]
|
|
||||||
) => unknown,
|
|
||||||
thisArg?: any
|
|
||||||
): boolean {
|
|
||||||
return this.memory.some(predicate, thisArg);
|
|
||||||
}
|
|
||||||
|
|
||||||
public slice(start?: number, end?: number): ContentMessage[] {
|
|
||||||
return this.memory.slice(start, end);
|
|
||||||
}
|
|
||||||
|
|
||||||
public find(
|
|
||||||
predicate: (
|
|
||||||
value: ContentMessage,
|
|
||||||
index: number,
|
|
||||||
obj: ContentMessage[]
|
|
||||||
) => unknown,
|
|
||||||
thisArg?: any
|
|
||||||
): ContentMessage | undefined {
|
|
||||||
return this.memory.find(predicate, thisArg);
|
|
||||||
}
|
|
||||||
|
|
||||||
public findIndex(
|
|
||||||
predicate: (
|
|
||||||
value: ContentMessage,
|
|
||||||
index: number,
|
|
||||||
obj: ContentMessage[]
|
|
||||||
) => unknown,
|
|
||||||
thisArg?: any
|
|
||||||
): number {
|
|
||||||
return this.memory.findIndex(predicate, thisArg);
|
|
||||||
}
|
|
||||||
|
|
||||||
private save(): void {
|
|
||||||
if (!this.storage) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const payload = JSON.stringify(
|
|
||||||
this.memory.slice(0).map(serializeContentMessage)
|
|
||||||
);
|
|
||||||
this.storage.setItem(this.storageKey, payload);
|
|
||||||
}
|
|
||||||
|
|
||||||
private load(): void {
|
|
||||||
if (!this.storage) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
const raw = this.storage.getItem(this.storageKey);
|
|
||||||
if (!raw) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const stored = JSON.parse(raw) as StoredContentMessage[];
|
|
||||||
const messages = stored
|
|
||||||
.map(deserializeContentMessage)
|
|
||||||
.filter((message): message is ContentMessage => Boolean(message));
|
|
||||||
if (messages.length) {
|
|
||||||
this.memory.push(...messages);
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
this.storage.removeItem(this.storageKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({
|
|
||||||
messageId: entry.messageId,
|
|
||||||
retrievalHint: entry.retrievalHint
|
|
||||||
? bytesToHex(entry.retrievalHint)
|
|
||||||
: undefined
|
|
||||||
});
|
|
||||||
|
|
||||||
const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({
|
|
||||||
messageId: entry.messageId,
|
|
||||||
retrievalHint: entry.retrievalHint
|
|
||||||
? hexToBytes(entry.retrievalHint)
|
|
||||||
: undefined
|
|
||||||
});
|
|
||||||
|
|
||||||
const serializeContentMessage = (
|
|
||||||
message: ContentMessage
|
|
||||||
): StoredContentMessage => ({
|
|
||||||
messageId: message.messageId,
|
|
||||||
channelId: message.channelId,
|
|
||||||
senderId: message.senderId,
|
|
||||||
lamportTimestamp: message.lamportTimestamp.toString(),
|
|
||||||
causalHistory: message.causalHistory.map(serializeHistoryEntry),
|
|
||||||
bloomFilter: toHex(message.bloomFilter),
|
|
||||||
content: bytesToHex(new Uint8Array(message.content)),
|
|
||||||
retrievalHint: toHex(message.retrievalHint)
|
|
||||||
});
|
|
||||||
|
|
||||||
const deserializeContentMessage = (
|
|
||||||
record: StoredContentMessage
|
|
||||||
): ContentMessage | undefined => {
|
|
||||||
try {
|
|
||||||
const content = hexToBytes(record.content);
|
|
||||||
return new ContentMessage(
|
|
||||||
record.messageId,
|
|
||||||
record.channelId,
|
|
||||||
record.senderId,
|
|
||||||
record.causalHistory.map(deserializeHistoryEntry),
|
|
||||||
BigInt(record.lamportTimestamp),
|
|
||||||
fromHex(record.bloomFilter),
|
|
||||||
content,
|
|
||||||
[],
|
|
||||||
fromHex(record.retrievalHint)
|
|
||||||
);
|
|
||||||
} catch {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const toHex = (
|
|
||||||
data?: Uint8Array | Uint8Array<ArrayBufferLike>
|
|
||||||
): string | undefined => {
|
|
||||||
if (!data || data.length === 0) {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data));
|
|
||||||
};
|
|
||||||
|
|
||||||
const fromHex = (value?: string): Uint8Array | undefined => {
|
|
||||||
if (!value) {
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
return hexToBytes(value);
|
|
||||||
};
|
|
||||||
198
packages/sds/src/message_channel/persistent_storage.spec.ts
Normal file
198
packages/sds/src/message_channel/persistent_storage.spec.ts
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import { MemLocalHistory } from "./mem_local_history.js";
|
||||||
|
import { ContentMessage } from "./message.js";
|
||||||
|
import { HistoryStorage, PersistentStorage } from "./persistent_storage.js";
|
||||||
|
|
||||||
|
const channelId = "channel-1";
|
||||||
|
|
||||||
|
describe("PersistentStorage", () => {
|
||||||
|
describe("Explicit storage", () => {
|
||||||
|
it("persists and restores messages", () => {
|
||||||
|
const storage = new MemoryStorage();
|
||||||
|
const persistentStorage = PersistentStorage.create(channelId, storage);
|
||||||
|
|
||||||
|
expect(persistentStorage).to.not.be.undefined;
|
||||||
|
|
||||||
|
const history1 = new MemLocalHistory({ storage: persistentStorage });
|
||||||
|
history1.push(createMessage("msg-1", 1));
|
||||||
|
history1.push(createMessage("msg-2", 2));
|
||||||
|
|
||||||
|
const history2 = new MemLocalHistory({ storage: persistentStorage });
|
||||||
|
|
||||||
|
expect(history2.length).to.equal(2);
|
||||||
|
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
|
||||||
|
"msg-1",
|
||||||
|
"msg-2"
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses in-memory only when no storage is provided", () => {
|
||||||
|
const history = new MemLocalHistory({ maxSize: 100 });
|
||||||
|
history.push(createMessage("msg-3", 3));
|
||||||
|
|
||||||
|
expect(history.length).to.equal(1);
|
||||||
|
expect(history.slice(0)[0].messageId).to.equal("msg-3");
|
||||||
|
|
||||||
|
const history2 = new MemLocalHistory({ maxSize: 100 });
|
||||||
|
expect(history2.length).to.equal(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles corrupt data in storage gracefully", () => {
|
||||||
|
const storage = new MemoryStorage();
|
||||||
|
// Corrupt data
|
||||||
|
storage.setItem("waku:sds:history:channel-1", "{ invalid json }");
|
||||||
|
|
||||||
|
const persistentStorage = PersistentStorage.create(channelId, storage);
|
||||||
|
const history = new MemLocalHistory({ storage: persistentStorage });
|
||||||
|
|
||||||
|
expect(history.length).to.equal(0);
|
||||||
|
|
||||||
|
// Corrupt data is not saved
|
||||||
|
expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("isolates history by channel ID", () => {
|
||||||
|
const storage = new MemoryStorage();
|
||||||
|
|
||||||
|
const storage1 = PersistentStorage.create("channel-1", storage);
|
||||||
|
const storage2 = PersistentStorage.create("channel-2", storage);
|
||||||
|
|
||||||
|
const history1 = new MemLocalHistory({ storage: storage1 });
|
||||||
|
const history2 = new MemLocalHistory({ storage: storage2 });
|
||||||
|
|
||||||
|
history1.push(createMessage("msg-1", 1));
|
||||||
|
history2.push(createMessage("msg-2", 2));
|
||||||
|
|
||||||
|
expect(history1.length).to.equal(1);
|
||||||
|
expect(history1.slice(0)[0].messageId).to.equal("msg-1");
|
||||||
|
|
||||||
|
expect(history2.length).to.equal(1);
|
||||||
|
expect(history2.slice(0)[0].messageId).to.equal("msg-2");
|
||||||
|
|
||||||
|
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
|
||||||
|
expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("saves messages after each push", () => {
|
||||||
|
const storage = new MemoryStorage();
|
||||||
|
const persistentStorage = PersistentStorage.create(channelId, storage);
|
||||||
|
const history = new MemLocalHistory({ storage: persistentStorage });
|
||||||
|
|
||||||
|
expect(storage.getItem("waku:sds:history:channel-1")).to.be.null;
|
||||||
|
|
||||||
|
history.push(createMessage("msg-1", 1));
|
||||||
|
|
||||||
|
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
|
||||||
|
|
||||||
|
const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!);
|
||||||
|
expect(saved).to.have.lengthOf(1);
|
||||||
|
expect(saved[0].messageId).to.equal("msg-1");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("loads messages on initialization", () => {
|
||||||
|
const storage = new MemoryStorage();
|
||||||
|
const persistentStorage1 = PersistentStorage.create(channelId, storage);
|
||||||
|
const history1 = new MemLocalHistory({ storage: persistentStorage1 });
|
||||||
|
|
||||||
|
history1.push(createMessage("msg-1", 1));
|
||||||
|
history1.push(createMessage("msg-2", 2));
|
||||||
|
history1.push(createMessage("msg-3", 3));
|
||||||
|
|
||||||
|
const persistentStorage2 = PersistentStorage.create(channelId, storage);
|
||||||
|
const history2 = new MemLocalHistory({ storage: persistentStorage2 });
|
||||||
|
|
||||||
|
expect(history2.length).to.equal(3);
|
||||||
|
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
|
||||||
|
"msg-1",
|
||||||
|
"msg-2",
|
||||||
|
"msg-3"
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Node.js only (no localStorage)", () => {
|
||||||
|
before(function () {
|
||||||
|
if (typeof localStorage !== "undefined") {
|
||||||
|
this.skip();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns undefined when no storage is available", () => {
|
||||||
|
const persistentStorage = PersistentStorage.create(channelId, undefined);
|
||||||
|
|
||||||
|
expect(persistentStorage).to.equal(undefined);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Browser only (localStorage)", () => {
|
||||||
|
before(function () {
|
||||||
|
if (typeof localStorage === "undefined") {
|
||||||
|
this.skip();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("persists and restores messages with channelId", () => {
|
||||||
|
const testChannelId = `test-${Date.now()}`;
|
||||||
|
const history1 = new MemLocalHistory({ storage: testChannelId });
|
||||||
|
history1.push(createMessage("msg-1", 1));
|
||||||
|
history1.push(createMessage("msg-2", 2));
|
||||||
|
|
||||||
|
const history2 = new MemLocalHistory({ storage: testChannelId });
|
||||||
|
|
||||||
|
expect(history2.length).to.equal(2);
|
||||||
|
expect(history2.slice(0).map((msg) => msg.messageId)).to.deep.equal([
|
||||||
|
"msg-1",
|
||||||
|
"msg-2"
|
||||||
|
]);
|
||||||
|
|
||||||
|
localStorage.removeItem(`waku:sds:history:${testChannelId}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("auto-uses localStorage when channelId is provided", () => {
|
||||||
|
const testChannelId = `auto-storage-${Date.now()}`;
|
||||||
|
|
||||||
|
const history = new MemLocalHistory({ storage: testChannelId });
|
||||||
|
history.push(createMessage("msg-auto-1", 1));
|
||||||
|
history.push(createMessage("msg-auto-2", 2));
|
||||||
|
|
||||||
|
const history2 = new MemLocalHistory({ storage: testChannelId });
|
||||||
|
expect(history2.length).to.equal(2);
|
||||||
|
expect(history2.slice(0).map((m) => m.messageId)).to.deep.equal([
|
||||||
|
"msg-auto-1",
|
||||||
|
"msg-auto-2"
|
||||||
|
]);
|
||||||
|
|
||||||
|
localStorage.removeItem(`waku:sds:history:${testChannelId}`);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const createMessage = (id: string, timestamp: number): ContentMessage => {
|
||||||
|
return new ContentMessage(
|
||||||
|
id,
|
||||||
|
channelId,
|
||||||
|
"sender",
|
||||||
|
[],
|
||||||
|
BigInt(timestamp),
|
||||||
|
undefined,
|
||||||
|
new Uint8Array([timestamp]),
|
||||||
|
undefined
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
class MemoryStorage implements HistoryStorage {
|
||||||
|
private readonly store = new Map<string, string>();
|
||||||
|
|
||||||
|
public getItem(key: string): string | null {
|
||||||
|
return this.store.get(key) ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public setItem(key: string, value: string): void {
|
||||||
|
this.store.set(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public removeItem(key: string): void {
|
||||||
|
this.store.delete(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
173
packages/sds/src/message_channel/persistent_storage.ts
Normal file
173
packages/sds/src/message_channel/persistent_storage.ts
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
|
||||||
|
import { Logger } from "@waku/utils";
|
||||||
|
|
||||||
|
import { ChannelId, ContentMessage, HistoryEntry } from "./message.js";
|
||||||
|
|
||||||
|
const log = new Logger("sds:persistent-storage");
|
||||||
|
|
||||||
|
const HISTORY_STORAGE_PREFIX = "waku:sds:history:";
|
||||||
|
|
||||||
|
export interface HistoryStorage {
|
||||||
|
getItem(key: string): string | null;
|
||||||
|
setItem(key: string, value: string): void;
|
||||||
|
removeItem(key: string): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
type StoredHistoryEntry = {
|
||||||
|
messageId: string;
|
||||||
|
retrievalHint?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type StoredContentMessage = {
|
||||||
|
messageId: string;
|
||||||
|
channelId: string;
|
||||||
|
senderId: string;
|
||||||
|
lamportTimestamp: string;
|
||||||
|
causalHistory: StoredHistoryEntry[];
|
||||||
|
bloomFilter?: string;
|
||||||
|
content: string;
|
||||||
|
retrievalHint?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Persistent storage for message history.
|
||||||
|
*/
|
||||||
|
export class PersistentStorage {
|
||||||
|
private readonly storageKey: string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a PersistentStorage for a channel, or returns undefined if no storage is available.
|
||||||
|
* If no storage is provided, attempts to use global localStorage (if available).
|
||||||
|
* Returns undefined if no storage is available.
|
||||||
|
*/
|
||||||
|
public static create(
|
||||||
|
channelId: ChannelId,
|
||||||
|
storage?: HistoryStorage
|
||||||
|
): PersistentStorage | undefined {
|
||||||
|
storage =
|
||||||
|
storage ??
|
||||||
|
(typeof localStorage !== "undefined" ? localStorage : undefined);
|
||||||
|
if (!storage) {
|
||||||
|
log.info(
|
||||||
|
`No storage available. Messages will not persist across sessions.
|
||||||
|
If you're using NodeJS, you can provide a storage backend using the storage parameter.`
|
||||||
|
);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return new PersistentStorage(channelId, storage);
|
||||||
|
}
|
||||||
|
|
||||||
|
private constructor(
|
||||||
|
channelId: ChannelId,
|
||||||
|
private readonly storage: HistoryStorage
|
||||||
|
) {
|
||||||
|
this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
public save(messages: ContentMessage[]): void {
|
||||||
|
try {
|
||||||
|
const payload = JSON.stringify(
|
||||||
|
messages.map((msg) => MessageSerializer.serializeContentMessage(msg))
|
||||||
|
);
|
||||||
|
this.storage.setItem(this.storageKey, payload);
|
||||||
|
} catch (error) {
|
||||||
|
log.error("Failed to save messages to storage:", error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public load(): ContentMessage[] {
|
||||||
|
try {
|
||||||
|
const raw = this.storage.getItem(this.storageKey);
|
||||||
|
if (!raw) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const stored = JSON.parse(raw) as StoredContentMessage[];
|
||||||
|
return stored
|
||||||
|
.map((record) => MessageSerializer.deserializeContentMessage(record))
|
||||||
|
.filter((message): message is ContentMessage => Boolean(message));
|
||||||
|
} catch (error) {
|
||||||
|
log.error("Failed to load messages from storage:", error);
|
||||||
|
this.storage.removeItem(this.storageKey);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MessageSerializer {
|
||||||
|
public static serializeContentMessage(
|
||||||
|
message: ContentMessage
|
||||||
|
): StoredContentMessage {
|
||||||
|
return {
|
||||||
|
messageId: message.messageId,
|
||||||
|
channelId: message.channelId,
|
||||||
|
senderId: message.senderId,
|
||||||
|
lamportTimestamp: message.lamportTimestamp.toString(),
|
||||||
|
causalHistory: message.causalHistory.map((entry) =>
|
||||||
|
MessageSerializer.serializeHistoryEntry(entry)
|
||||||
|
),
|
||||||
|
bloomFilter: MessageSerializer.toHex(message.bloomFilter),
|
||||||
|
content: bytesToHex(new Uint8Array(message.content)),
|
||||||
|
retrievalHint: MessageSerializer.toHex(message.retrievalHint)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static deserializeContentMessage(
|
||||||
|
record: StoredContentMessage
|
||||||
|
): ContentMessage | undefined {
|
||||||
|
try {
|
||||||
|
const content = hexToBytes(record.content);
|
||||||
|
return new ContentMessage(
|
||||||
|
record.messageId,
|
||||||
|
record.channelId,
|
||||||
|
record.senderId,
|
||||||
|
record.causalHistory.map((entry) =>
|
||||||
|
MessageSerializer.deserializeHistoryEntry(entry)
|
||||||
|
),
|
||||||
|
BigInt(record.lamportTimestamp),
|
||||||
|
MessageSerializer.fromHex(record.bloomFilter),
|
||||||
|
content,
|
||||||
|
[],
|
||||||
|
MessageSerializer.fromHex(record.retrievalHint)
|
||||||
|
);
|
||||||
|
} catch {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry {
|
||||||
|
return {
|
||||||
|
messageId: entry.messageId,
|
||||||
|
retrievalHint: entry.retrievalHint
|
||||||
|
? bytesToHex(entry.retrievalHint)
|
||||||
|
: undefined
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static deserializeHistoryEntry(
|
||||||
|
entry: StoredHistoryEntry
|
||||||
|
): HistoryEntry {
|
||||||
|
return {
|
||||||
|
messageId: entry.messageId,
|
||||||
|
retrievalHint: entry.retrievalHint
|
||||||
|
? hexToBytes(entry.retrievalHint)
|
||||||
|
: undefined
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static toHex(
|
||||||
|
data?: Uint8Array | Uint8Array<ArrayBufferLike>
|
||||||
|
): string | undefined {
|
||||||
|
if (!data || data.length === 0) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static fromHex(value?: string): Uint8Array | undefined {
|
||||||
|
if (!value) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
return hexToBytes(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,8 +1,8 @@
|
|||||||
import { Logger } from "@waku/utils";
|
import { Logger } from "@waku/utils";
|
||||||
|
|
||||||
|
import type { ILocalHistory } from "../mem_local_history.js";
|
||||||
import type { HistoryEntry, MessageId } from "../message.js";
|
import type { HistoryEntry, MessageId } from "../message.js";
|
||||||
import { Message } from "../message.js";
|
import { Message } from "../message.js";
|
||||||
import type { ILocalHistory } from "../message_channel.js";
|
|
||||||
|
|
||||||
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
|
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
|
||||||
import {
|
import {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user