mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-02 13:53:12 +00:00
feat: persistent history for SDS
This commit is contained in:
parent
f2ad23ad43
commit
dad9cffdbe
@ -7,7 +7,6 @@ import { DefaultBloomFilter } from "../bloom_filter/bloom.js";
|
||||
|
||||
import { Command, Handlers, ParamsByAction, Task } from "./command_queue.js";
|
||||
import { MessageChannelEvent, MessageChannelEvents } from "./events.js";
|
||||
import { MemLocalHistory } from "./mem_local_history.js";
|
||||
import {
|
||||
ChannelId,
|
||||
ContentMessage,
|
||||
@ -21,6 +20,7 @@ import {
|
||||
ParticipantId,
|
||||
SyncMessage
|
||||
} from "./message.js";
|
||||
import { PersistentHistory } from "./persistent_history.js";
|
||||
import { RepairConfig, RepairManager } from "./repair/repair.js";
|
||||
|
||||
export const DEFAULT_BLOOM_FILTER_OPTIONS = {
|
||||
@ -106,7 +106,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
channelId: ChannelId,
|
||||
senderId: ParticipantId,
|
||||
options: MessageChannelOptions = {},
|
||||
localHistory: ILocalHistory = new MemLocalHistory()
|
||||
localHistory?: ILocalHistory
|
||||
) {
|
||||
super();
|
||||
this.channelId = channelId;
|
||||
@ -117,7 +117,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
this.outgoingBuffer = [];
|
||||
this.possibleAcks = new Map();
|
||||
this.incomingBuffer = [];
|
||||
this.localHistory = localHistory;
|
||||
const resolvedLocalHistory =
|
||||
localHistory ?? new PersistentHistory({ channelId: this.channelId });
|
||||
this.localHistory = resolvedLocalHistory;
|
||||
this.causalHistorySize =
|
||||
options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE;
|
||||
// TODO: this should be determined based on the bloom filter parameters and number of hashes
|
||||
|
||||
62
packages/sds/src/message_channel/persistent_history.spec.ts
Normal file
62
packages/sds/src/message_channel/persistent_history.spec.ts
Normal file
@ -0,0 +1,62 @@
|
||||
import { expect } from "chai";
|
||||
|
||||
import { ContentMessage } from "./message.js";
|
||||
import { HistoryStorage, PersistentHistory } from "./persistent_history.js";
|
||||
|
||||
class MemoryStorage implements HistoryStorage {
|
||||
private readonly store = new Map<string, string>();
|
||||
|
||||
public getItem(key: string): string | null {
|
||||
return this.store.get(key) ?? null;
|
||||
}
|
||||
|
||||
public setItem(key: string, value: string): void {
|
||||
this.store.set(key, value);
|
||||
}
|
||||
|
||||
public removeItem(key: string): void {
|
||||
this.store.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
const channelId = "channel-1";
|
||||
|
||||
const createMessage = (id: string, timestamp: number): ContentMessage => {
|
||||
return new ContentMessage(
|
||||
id,
|
||||
channelId,
|
||||
"sender",
|
||||
[],
|
||||
BigInt(timestamp),
|
||||
undefined,
|
||||
new Uint8Array([timestamp]),
|
||||
undefined
|
||||
);
|
||||
};
|
||||
|
||||
describe("PersistentHistory", () => {
|
||||
it("persists and restores messages", () => {
|
||||
const storage = new MemoryStorage();
|
||||
const history = new PersistentHistory({ channelId, storage });
|
||||
|
||||
history.push(createMessage("msg-1", 1));
|
||||
history.push(createMessage("msg-2", 2));
|
||||
|
||||
const restored = new PersistentHistory({ channelId, storage });
|
||||
|
||||
expect(restored.length).to.equal(2);
|
||||
expect(restored.slice(0).map((msg) => msg.messageId)).to.deep.equal([
|
||||
"msg-1",
|
||||
"msg-2"
|
||||
]);
|
||||
});
|
||||
|
||||
it("behaves like memory history when storage is unavailable", () => {
|
||||
const history = new PersistentHistory({ channelId, storage: undefined });
|
||||
|
||||
history.push(createMessage("msg-3", 3));
|
||||
|
||||
expect(history.length).to.equal(1);
|
||||
expect(history.slice(0)[0].messageId).to.equal("msg-3");
|
||||
});
|
||||
});
|
||||
177
packages/sds/src/message_channel/persistent_history.ts
Normal file
177
packages/sds/src/message_channel/persistent_history.ts
Normal file
@ -0,0 +1,177 @@
|
||||
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
|
||||
|
||||
import { MemLocalHistory } from "./mem_local_history.js";
|
||||
import { ChannelId, ContentMessage, HistoryEntry } from "./message.js";
|
||||
|
||||
export interface HistoryStorage {
|
||||
getItem(key: string): string | null;
|
||||
setItem(key: string, value: string): void;
|
||||
removeItem(key: string): void;
|
||||
}
|
||||
|
||||
export interface PersistentHistoryOptions {
|
||||
channelId: ChannelId;
|
||||
storage?: HistoryStorage;
|
||||
storageKey?: string;
|
||||
}
|
||||
|
||||
type StoredHistoryEntry = {
|
||||
messageId: string;
|
||||
retrievalHint?: string;
|
||||
};
|
||||
|
||||
type StoredContentMessage = {
|
||||
messageId: string;
|
||||
channelId: string;
|
||||
senderId: string;
|
||||
lamportTimestamp: string;
|
||||
causalHistory: StoredHistoryEntry[];
|
||||
bloomFilter?: string;
|
||||
content: string;
|
||||
retrievalHint?: string;
|
||||
};
|
||||
|
||||
const HISTORY_STORAGE_PREFIX = "waku:sds:history:";
|
||||
|
||||
/**
|
||||
* Persists the SDS local history in a browser/localStorage compatible backend.
|
||||
*
|
||||
* If no storage backend is available, this behaves like {@link MemLocalHistory}.
|
||||
*/
|
||||
export class PersistentHistory extends MemLocalHistory {
|
||||
private readonly storage?: HistoryStorage;
|
||||
private readonly storageKey: string;
|
||||
|
||||
public constructor(options: PersistentHistoryOptions) {
|
||||
super();
|
||||
this.storage = options.storage ?? getDefaultHistoryStorage();
|
||||
this.storageKey =
|
||||
options.storageKey ?? `${HISTORY_STORAGE_PREFIX}${options.channelId}`;
|
||||
this.restore();
|
||||
}
|
||||
|
||||
public override push(...items: ContentMessage[]): number {
|
||||
const length = super.push(...items);
|
||||
this.persist();
|
||||
return length;
|
||||
}
|
||||
|
||||
private persist(): void {
|
||||
if (!this.storage) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const payload = JSON.stringify(
|
||||
this.slice(0).map(serializeContentMessage)
|
||||
);
|
||||
this.storage.setItem(this.storageKey, payload);
|
||||
} catch {
|
||||
// Ignore persistence errors (e.g. quota exceeded).
|
||||
}
|
||||
}
|
||||
|
||||
private restore(): void {
|
||||
if (!this.storage) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const raw = this.storage.getItem(this.storageKey);
|
||||
if (!raw) {
|
||||
return;
|
||||
}
|
||||
|
||||
const stored = JSON.parse(raw) as StoredContentMessage[];
|
||||
const messages = stored
|
||||
.map(deserializeContentMessage)
|
||||
.filter((message): message is ContentMessage => Boolean(message));
|
||||
if (messages.length) {
|
||||
super.push(...messages);
|
||||
}
|
||||
} catch {
|
||||
try {
|
||||
this.storage.removeItem(this.storageKey);
|
||||
} catch {
|
||||
// Ignore cleanup errors.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const getDefaultHistoryStorage = (): HistoryStorage | undefined => {
|
||||
try {
|
||||
if (typeof localStorage === "undefined") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const probeKey = `${HISTORY_STORAGE_PREFIX}__probe__`;
|
||||
localStorage.setItem(probeKey, probeKey);
|
||||
localStorage.removeItem(probeKey);
|
||||
return localStorage;
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
|
||||
const serializeHistoryEntry = (entry: HistoryEntry): StoredHistoryEntry => ({
|
||||
messageId: entry.messageId,
|
||||
retrievalHint: entry.retrievalHint
|
||||
? bytesToHex(entry.retrievalHint)
|
||||
: undefined
|
||||
});
|
||||
|
||||
const deserializeHistoryEntry = (entry: StoredHistoryEntry): HistoryEntry => ({
|
||||
messageId: entry.messageId,
|
||||
retrievalHint: entry.retrievalHint
|
||||
? hexToBytes(entry.retrievalHint)
|
||||
: undefined
|
||||
});
|
||||
|
||||
const serializeContentMessage = (
|
||||
message: ContentMessage
|
||||
): StoredContentMessage => ({
|
||||
messageId: message.messageId,
|
||||
channelId: message.channelId,
|
||||
senderId: message.senderId,
|
||||
lamportTimestamp: message.lamportTimestamp.toString(),
|
||||
causalHistory: message.causalHistory.map(serializeHistoryEntry),
|
||||
bloomFilter: toHex(message.bloomFilter),
|
||||
content: bytesToHex(new Uint8Array(message.content)),
|
||||
retrievalHint: toHex(message.retrievalHint)
|
||||
});
|
||||
|
||||
const deserializeContentMessage = (
|
||||
record: StoredContentMessage
|
||||
): ContentMessage | undefined => {
|
||||
try {
|
||||
const content = hexToBytes(record.content);
|
||||
return new ContentMessage(
|
||||
record.messageId,
|
||||
record.channelId,
|
||||
record.senderId,
|
||||
record.causalHistory.map(deserializeHistoryEntry),
|
||||
BigInt(record.lamportTimestamp),
|
||||
fromHex(record.bloomFilter),
|
||||
content,
|
||||
fromHex(record.retrievalHint)
|
||||
);
|
||||
} catch {
|
||||
return undefined;
|
||||
}
|
||||
};
|
||||
|
||||
const toHex = (
|
||||
data?: Uint8Array | Uint8Array<ArrayBufferLike>
|
||||
): string | undefined => {
|
||||
if (!data || data.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
return bytesToHex(data instanceof Uint8Array ? data : new Uint8Array(data));
|
||||
};
|
||||
|
||||
const fromHex = (value?: string): Uint8Array | undefined => {
|
||||
if (!value) {
|
||||
return undefined;
|
||||
}
|
||||
return hexToBytes(value);
|
||||
};
|
||||
Loading…
x
Reference in New Issue
Block a user