diff --git a/packages/sds/src/message_channel/local_history.ts b/packages/sds/src/message_channel/local_history.ts index 86d6502afa..ed4107a2c1 100644 --- a/packages/sds/src/message_channel/local_history.ts +++ b/packages/sds/src/message_channel/local_history.ts @@ -2,7 +2,6 @@ import { Logger } from "@waku/utils"; import _ from "lodash"; import { type ChannelId, ContentMessage, isContentMessage } from "./message.js"; -import { ILocalHistory } from "./message_channel.js"; import { PersistentStorage } from "./persistent_storage.js"; export const DEFAULT_MAX_LENGTH = 10_000; @@ -28,7 +27,7 @@ export type LocalHistoryOptions = { const log = new Logger("sds:local-history"); -export class LocalHistory implements ILocalHistory { +export class LocalHistory { private items: ContentMessage[] = []; private readonly storage?: PersistentStorage; private readonly maxSize: number; diff --git a/packages/sds/src/message_channel/message_channel.spec.ts b/packages/sds/src/message_channel/message_channel.spec.ts index 0c29d02979..e7e37582ad 100644 --- a/packages/sds/src/message_channel/message_channel.spec.ts +++ b/packages/sds/src/message_channel/message_channel.spec.ts @@ -318,7 +318,7 @@ describe("MessageChannel", function () { testRetrievalHint ); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(1); // Find the message in local history @@ -440,7 +440,7 @@ describe("MessageChannel", function () { ) ); - const localHistory = channelA["localHistory"] as ILocalHistory; + const localHistory = channelA["localHistory"] as LocalHistory; expect(localHistory.length).to.equal(2); // When timestamps are equal, should be ordered by messageId lexicographically diff --git a/packages/sds/src/message_channel/message_channel.ts b/packages/sds/src/message_channel/message_channel.ts index 6fda3a3803..07dfff26f6 100644 --- a/packages/sds/src/message_channel/message_channel.ts +++ b/packages/sds/src/message_channel/message_channel.ts @@ -38,11 +38,6 @@ const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2; const log = new Logger("sds:message-channel"); -export type ILocalHistory = Pick< - Array, - "some" | "push" | "slice" | "find" | "length" | "findIndex" ->; - export interface MessageChannelOptions { causalHistorySize?: number; /** @@ -76,7 +71,7 @@ export class MessageChannel extends TypedEventEmitter { private outgoingBuffer: ContentMessage[]; private possibleAcks: Map; private incomingBuffer: Array; - private readonly localHistory: ILocalHistory; + private readonly localHistory: LocalHistory; private timeReceived: Map; private readonly causalHistorySize: number; private readonly possibleAcksThreshold: number; @@ -106,7 +101,7 @@ export class MessageChannel extends TypedEventEmitter { channelId: ChannelId, senderId: ParticipantId, options: MessageChannelOptions = {}, - localHistory?: ILocalHistory + localHistory?: LocalHistory ) { super(); this.channelId = channelId; diff --git a/packages/sds/src/message_channel/persistent_storage.spec.ts b/packages/sds/src/message_channel/persistent_storage.spec.ts index 928de1c5ff..d6b3f43e67 100644 --- a/packages/sds/src/message_channel/persistent_storage.spec.ts +++ b/packages/sds/src/message_channel/persistent_storage.spec.ts @@ -2,7 +2,7 @@ import { expect } from "chai"; import { LocalHistory } from "./local_history.js"; import { ContentMessage } from "./message.js"; -import { HistoryStorage, PersistentStorage } from "./persistent_storage.js"; +import { IStorage, PersistentStorage } from "./persistent_storage.js"; const channelId = "channel-1"; @@ -41,7 +41,7 @@ describe("PersistentStorage", () => { it("handles corrupt data in storage gracefully", () => { const storage = new MemoryStorage(); // Corrupt data - storage.setItem("waku:sds:history:channel-1", "{ invalid json }"); + storage.setItem("waku:sds:messages:channel-1", "{ invalid json }"); const persistentStorage = PersistentStorage.create(channelId, storage); const history = new LocalHistory({ storage: persistentStorage }); @@ -49,7 +49,7 @@ describe("PersistentStorage", () => { expect(history.length).to.equal(0); // Corrupt data is not saved - expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null); + expect(storage.getItem("waku:sds:messages:channel-1")).to.equal(null); }); it("isolates history by channel ID", () => { @@ -70,8 +70,8 @@ describe("PersistentStorage", () => { expect(history2.length).to.equal(1); expect(history2.slice(0)[0].messageId).to.equal("msg-2"); - expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; - expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null; + expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:messages:channel-2")).to.not.be.null; }); it("saves messages after each push", () => { @@ -79,13 +79,13 @@ describe("PersistentStorage", () => { const persistentStorage = PersistentStorage.create(channelId, storage); const history = new LocalHistory({ storage: persistentStorage }); - expect(storage.getItem("waku:sds:history:channel-1")).to.be.null; + expect(storage.getItem("waku:sds:messages:channel-1")).to.be.null; history.push(createMessage("msg-1", 1)); - expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null; + expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null; - const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!); + const saved = JSON.parse(storage.getItem("waku:sds:messages:channel-1")!); expect(saved).to.have.lengthOf(1); expect(saved[0].messageId).to.equal("msg-1"); }); @@ -146,7 +146,7 @@ describe("PersistentStorage", () => { "msg-2" ]); - localStorage.removeItem(`waku:sds:history:${testChannelId}`); + localStorage.removeItem(`waku:sds:messages:${testChannelId}`); }); it("auto-uses localStorage when channelId is provided", () => { @@ -163,7 +163,7 @@ describe("PersistentStorage", () => { "msg-auto-2" ]); - localStorage.removeItem(`waku:sds:history:${testChannelId}`); + localStorage.removeItem(`waku:sds:messages:${testChannelId}`); }); }); }); @@ -181,7 +181,7 @@ const createMessage = (id: string, timestamp: number): ContentMessage => { ); }; -class MemoryStorage implements HistoryStorage { +class MemoryStorage implements IStorage { private readonly store = new Map(); public getItem(key: string): string | null { diff --git a/packages/sds/src/message_channel/persistent_storage.ts b/packages/sds/src/message_channel/persistent_storage.ts index ffc0f14674..36965f0630 100644 --- a/packages/sds/src/message_channel/persistent_storage.ts +++ b/packages/sds/src/message_channel/persistent_storage.ts @@ -5,15 +5,15 @@ import { ChannelId, ContentMessage, HistoryEntry } from "./message.js"; const log = new Logger("sds:persistent-storage"); -const HISTORY_STORAGE_PREFIX = "waku:sds:history:"; +const STORAGE_PREFIX = "waku:sds:storage:"; -export interface HistoryStorage { +export interface IStorage { getItem(key: string): string | null; setItem(key: string, value: string): void; removeItem(key: string): void; } -type StoredHistoryEntry = { +type StoredCausalEntry = { messageId: string; retrievalHint?: string; }; @@ -23,14 +23,14 @@ type StoredContentMessage = { channelId: string; senderId: string; lamportTimestamp: string; - causalHistory: StoredHistoryEntry[]; + causalHistory: StoredCausalEntry[]; bloomFilter?: string; content: string; retrievalHint?: string; }; /** - * Persistent storage for message history. + * Persistent storage for messages. */ export class PersistentStorage { private readonly storageKey: string; @@ -42,7 +42,7 @@ export class PersistentStorage { */ public static create( channelId: ChannelId, - storage?: HistoryStorage + storage?: IStorage ): PersistentStorage | undefined { storage = storage ?? @@ -59,9 +59,9 @@ export class PersistentStorage { private constructor( channelId: ChannelId, - private readonly storage: HistoryStorage + private readonly storage: IStorage ) { - this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`; + this.storageKey = `${STORAGE_PREFIX}${channelId}`; } public save(messages: ContentMessage[]): void { @@ -104,7 +104,7 @@ class MessageSerializer { senderId: message.senderId, lamportTimestamp: message.lamportTimestamp.toString(), causalHistory: message.causalHistory.map((entry) => - MessageSerializer.serializeHistoryEntry(entry) + MessageSerializer.serializeCausalEntry(entry) ), bloomFilter: MessageSerializer.toHex(message.bloomFilter), content: bytesToHex(new Uint8Array(message.content)), @@ -122,7 +122,7 @@ class MessageSerializer { record.channelId, record.senderId, record.causalHistory.map((entry) => - MessageSerializer.deserializeHistoryEntry(entry) + MessageSerializer.deserializeCausalEntry(entry) ), BigInt(record.lamportTimestamp), MessageSerializer.fromHex(record.bloomFilter), @@ -135,7 +135,7 @@ class MessageSerializer { } } - public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry { + public static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry { return { messageId: entry.messageId, retrievalHint: entry.retrievalHint @@ -144,9 +144,7 @@ class MessageSerializer { }; } - public static deserializeHistoryEntry( - entry: StoredHistoryEntry - ): HistoryEntry { + public static deserializeCausalEntry(entry: StoredCausalEntry): HistoryEntry { return { messageId: entry.messageId, retrievalHint: entry.retrievalHint diff --git a/packages/sds/src/message_channel/repair/repair.ts b/packages/sds/src/message_channel/repair/repair.ts index 1a8a85f43e..108dabfbdd 100644 --- a/packages/sds/src/message_channel/repair/repair.ts +++ b/packages/sds/src/message_channel/repair/repair.ts @@ -1,8 +1,8 @@ import { Logger } from "@waku/utils"; +import { LocalHistory } from "../local_history.js"; import type { HistoryEntry, MessageId } from "../message.js"; import { Message } from "../message.js"; -import type { ILocalHistory } from "../message_channel.js"; import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js"; import { @@ -183,7 +183,7 @@ export class RepairManager { */ public processIncomingRepairRequests( requests: HistoryEntry[], - localHistory: ILocalHistory, + localHistory: LocalHistory, currentTime = Date.now() ): void { for (const request of requests) { @@ -248,7 +248,7 @@ export class RepairManager { * Returns messages that should be rebroadcast */ public sweepIncomingBuffer( - localHistory: ILocalHistory, + localHistory: LocalHistory, currentTime = Date.now() ): Message[] { const ready = this.incomingBuffer.getReady(currentTime);