mirror of
https://github.com/logos-messaging/logos-messaging-js.git
synced 2026-02-02 15:23:24 +00:00
chore: update word history to storage
This commit is contained in:
parent
41f8fdf8cf
commit
721c494567
@ -2,7 +2,6 @@ import { Logger } from "@waku/utils";
|
||||
import _ from "lodash";
|
||||
|
||||
import { type ChannelId, ContentMessage, isContentMessage } from "./message.js";
|
||||
import { ILocalHistory } from "./message_channel.js";
|
||||
import { PersistentStorage } from "./persistent_storage.js";
|
||||
|
||||
export const DEFAULT_MAX_LENGTH = 10_000;
|
||||
@ -28,7 +27,7 @@ export type LocalHistoryOptions = {
|
||||
|
||||
const log = new Logger("sds:local-history");
|
||||
|
||||
export class LocalHistory implements ILocalHistory {
|
||||
export class LocalHistory {
|
||||
private items: ContentMessage[] = [];
|
||||
private readonly storage?: PersistentStorage;
|
||||
private readonly maxSize: number;
|
||||
|
||||
@ -318,7 +318,7 @@ describe("MessageChannel", function () {
|
||||
testRetrievalHint
|
||||
);
|
||||
|
||||
const localHistory = channelA["localHistory"] as ILocalHistory;
|
||||
const localHistory = channelA["localHistory"] as LocalHistory;
|
||||
expect(localHistory.length).to.equal(1);
|
||||
|
||||
// Find the message in local history
|
||||
@ -440,7 +440,7 @@ describe("MessageChannel", function () {
|
||||
)
|
||||
);
|
||||
|
||||
const localHistory = channelA["localHistory"] as ILocalHistory;
|
||||
const localHistory = channelA["localHistory"] as LocalHistory;
|
||||
expect(localHistory.length).to.equal(2);
|
||||
|
||||
// When timestamps are equal, should be ordered by messageId lexicographically
|
||||
|
||||
@ -38,11 +38,6 @@ const DEFAULT_POSSIBLE_ACKS_THRESHOLD = 2;
|
||||
|
||||
const log = new Logger("sds:message-channel");
|
||||
|
||||
export type ILocalHistory = Pick<
|
||||
Array<ContentMessage>,
|
||||
"some" | "push" | "slice" | "find" | "length" | "findIndex"
|
||||
>;
|
||||
|
||||
export interface MessageChannelOptions {
|
||||
causalHistorySize?: number;
|
||||
/**
|
||||
@ -76,7 +71,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
private outgoingBuffer: ContentMessage[];
|
||||
private possibleAcks: Map<MessageId, number>;
|
||||
private incomingBuffer: Array<ContentMessage | SyncMessage>;
|
||||
private readonly localHistory: ILocalHistory;
|
||||
private readonly localHistory: LocalHistory;
|
||||
private timeReceived: Map<MessageId, number>;
|
||||
private readonly causalHistorySize: number;
|
||||
private readonly possibleAcksThreshold: number;
|
||||
@ -106,7 +101,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> {
|
||||
channelId: ChannelId,
|
||||
senderId: ParticipantId,
|
||||
options: MessageChannelOptions = {},
|
||||
localHistory?: ILocalHistory
|
||||
localHistory?: LocalHistory
|
||||
) {
|
||||
super();
|
||||
this.channelId = channelId;
|
||||
|
||||
@ -2,7 +2,7 @@ import { expect } from "chai";
|
||||
|
||||
import { LocalHistory } from "./local_history.js";
|
||||
import { ContentMessage } from "./message.js";
|
||||
import { HistoryStorage, PersistentStorage } from "./persistent_storage.js";
|
||||
import { IStorage, PersistentStorage } from "./persistent_storage.js";
|
||||
|
||||
const channelId = "channel-1";
|
||||
|
||||
@ -41,7 +41,7 @@ describe("PersistentStorage", () => {
|
||||
it("handles corrupt data in storage gracefully", () => {
|
||||
const storage = new MemoryStorage();
|
||||
// Corrupt data
|
||||
storage.setItem("waku:sds:history:channel-1", "{ invalid json }");
|
||||
storage.setItem("waku:sds:messages:channel-1", "{ invalid json }");
|
||||
|
||||
const persistentStorage = PersistentStorage.create(channelId, storage);
|
||||
const history = new LocalHistory({ storage: persistentStorage });
|
||||
@ -49,7 +49,7 @@ describe("PersistentStorage", () => {
|
||||
expect(history.length).to.equal(0);
|
||||
|
||||
// Corrupt data is not saved
|
||||
expect(storage.getItem("waku:sds:history:channel-1")).to.equal(null);
|
||||
expect(storage.getItem("waku:sds:messages:channel-1")).to.equal(null);
|
||||
});
|
||||
|
||||
it("isolates history by channel ID", () => {
|
||||
@ -70,8 +70,8 @@ describe("PersistentStorage", () => {
|
||||
expect(history2.length).to.equal(1);
|
||||
expect(history2.slice(0)[0].messageId).to.equal("msg-2");
|
||||
|
||||
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
|
||||
expect(storage.getItem("waku:sds:history:channel-2")).to.not.be.null;
|
||||
expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null;
|
||||
expect(storage.getItem("waku:sds:messages:channel-2")).to.not.be.null;
|
||||
});
|
||||
|
||||
it("saves messages after each push", () => {
|
||||
@ -79,13 +79,13 @@ describe("PersistentStorage", () => {
|
||||
const persistentStorage = PersistentStorage.create(channelId, storage);
|
||||
const history = new LocalHistory({ storage: persistentStorage });
|
||||
|
||||
expect(storage.getItem("waku:sds:history:channel-1")).to.be.null;
|
||||
expect(storage.getItem("waku:sds:messages:channel-1")).to.be.null;
|
||||
|
||||
history.push(createMessage("msg-1", 1));
|
||||
|
||||
expect(storage.getItem("waku:sds:history:channel-1")).to.not.be.null;
|
||||
expect(storage.getItem("waku:sds:messages:channel-1")).to.not.be.null;
|
||||
|
||||
const saved = JSON.parse(storage.getItem("waku:sds:history:channel-1")!);
|
||||
const saved = JSON.parse(storage.getItem("waku:sds:messages:channel-1")!);
|
||||
expect(saved).to.have.lengthOf(1);
|
||||
expect(saved[0].messageId).to.equal("msg-1");
|
||||
});
|
||||
@ -146,7 +146,7 @@ describe("PersistentStorage", () => {
|
||||
"msg-2"
|
||||
]);
|
||||
|
||||
localStorage.removeItem(`waku:sds:history:${testChannelId}`);
|
||||
localStorage.removeItem(`waku:sds:messages:${testChannelId}`);
|
||||
});
|
||||
|
||||
it("auto-uses localStorage when channelId is provided", () => {
|
||||
@ -163,7 +163,7 @@ describe("PersistentStorage", () => {
|
||||
"msg-auto-2"
|
||||
]);
|
||||
|
||||
localStorage.removeItem(`waku:sds:history:${testChannelId}`);
|
||||
localStorage.removeItem(`waku:sds:messages:${testChannelId}`);
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -181,7 +181,7 @@ const createMessage = (id: string, timestamp: number): ContentMessage => {
|
||||
);
|
||||
};
|
||||
|
||||
class MemoryStorage implements HistoryStorage {
|
||||
class MemoryStorage implements IStorage {
|
||||
private readonly store = new Map<string, string>();
|
||||
|
||||
public getItem(key: string): string | null {
|
||||
|
||||
@ -5,15 +5,15 @@ import { ChannelId, ContentMessage, HistoryEntry } from "./message.js";
|
||||
|
||||
const log = new Logger("sds:persistent-storage");
|
||||
|
||||
const HISTORY_STORAGE_PREFIX = "waku:sds:history:";
|
||||
const STORAGE_PREFIX = "waku:sds:storage:";
|
||||
|
||||
export interface HistoryStorage {
|
||||
export interface IStorage {
|
||||
getItem(key: string): string | null;
|
||||
setItem(key: string, value: string): void;
|
||||
removeItem(key: string): void;
|
||||
}
|
||||
|
||||
type StoredHistoryEntry = {
|
||||
type StoredCausalEntry = {
|
||||
messageId: string;
|
||||
retrievalHint?: string;
|
||||
};
|
||||
@ -23,14 +23,14 @@ type StoredContentMessage = {
|
||||
channelId: string;
|
||||
senderId: string;
|
||||
lamportTimestamp: string;
|
||||
causalHistory: StoredHistoryEntry[];
|
||||
causalHistory: StoredCausalEntry[];
|
||||
bloomFilter?: string;
|
||||
content: string;
|
||||
retrievalHint?: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Persistent storage for message history.
|
||||
* Persistent storage for messages.
|
||||
*/
|
||||
export class PersistentStorage {
|
||||
private readonly storageKey: string;
|
||||
@ -42,7 +42,7 @@ export class PersistentStorage {
|
||||
*/
|
||||
public static create(
|
||||
channelId: ChannelId,
|
||||
storage?: HistoryStorage
|
||||
storage?: IStorage
|
||||
): PersistentStorage | undefined {
|
||||
storage =
|
||||
storage ??
|
||||
@ -59,9 +59,9 @@ export class PersistentStorage {
|
||||
|
||||
private constructor(
|
||||
channelId: ChannelId,
|
||||
private readonly storage: HistoryStorage
|
||||
private readonly storage: IStorage
|
||||
) {
|
||||
this.storageKey = `${HISTORY_STORAGE_PREFIX}${channelId}`;
|
||||
this.storageKey = `${STORAGE_PREFIX}${channelId}`;
|
||||
}
|
||||
|
||||
public save(messages: ContentMessage[]): void {
|
||||
@ -104,7 +104,7 @@ class MessageSerializer {
|
||||
senderId: message.senderId,
|
||||
lamportTimestamp: message.lamportTimestamp.toString(),
|
||||
causalHistory: message.causalHistory.map((entry) =>
|
||||
MessageSerializer.serializeHistoryEntry(entry)
|
||||
MessageSerializer.serializeCausalEntry(entry)
|
||||
),
|
||||
bloomFilter: MessageSerializer.toHex(message.bloomFilter),
|
||||
content: bytesToHex(new Uint8Array(message.content)),
|
||||
@ -122,7 +122,7 @@ class MessageSerializer {
|
||||
record.channelId,
|
||||
record.senderId,
|
||||
record.causalHistory.map((entry) =>
|
||||
MessageSerializer.deserializeHistoryEntry(entry)
|
||||
MessageSerializer.deserializeCausalEntry(entry)
|
||||
),
|
||||
BigInt(record.lamportTimestamp),
|
||||
MessageSerializer.fromHex(record.bloomFilter),
|
||||
@ -135,7 +135,7 @@ class MessageSerializer {
|
||||
}
|
||||
}
|
||||
|
||||
public static serializeHistoryEntry(entry: HistoryEntry): StoredHistoryEntry {
|
||||
public static serializeCausalEntry(entry: HistoryEntry): StoredCausalEntry {
|
||||
return {
|
||||
messageId: entry.messageId,
|
||||
retrievalHint: entry.retrievalHint
|
||||
@ -144,9 +144,7 @@ class MessageSerializer {
|
||||
};
|
||||
}
|
||||
|
||||
public static deserializeHistoryEntry(
|
||||
entry: StoredHistoryEntry
|
||||
): HistoryEntry {
|
||||
public static deserializeCausalEntry(entry: StoredCausalEntry): HistoryEntry {
|
||||
return {
|
||||
messageId: entry.messageId,
|
||||
retrievalHint: entry.retrievalHint
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { LocalHistory } from "../local_history.js";
|
||||
import type { HistoryEntry, MessageId } from "../message.js";
|
||||
import { Message } from "../message.js";
|
||||
import type { ILocalHistory } from "../message_channel.js";
|
||||
|
||||
import { IncomingRepairBuffer, OutgoingRepairBuffer } from "./buffers.js";
|
||||
import {
|
||||
@ -183,7 +183,7 @@ export class RepairManager {
|
||||
*/
|
||||
public processIncomingRepairRequests(
|
||||
requests: HistoryEntry[],
|
||||
localHistory: ILocalHistory,
|
||||
localHistory: LocalHistory,
|
||||
currentTime = Date.now()
|
||||
): void {
|
||||
for (const request of requests) {
|
||||
@ -248,7 +248,7 @@ export class RepairManager {
|
||||
* Returns messages that should be rebroadcast
|
||||
*/
|
||||
public sweepIncomingBuffer(
|
||||
localHistory: ILocalHistory,
|
||||
localHistory: LocalHistory,
|
||||
currentTime = Date.now()
|
||||
): Message[] {
|
||||
const ready = this.incomingBuffer.getReady(currentTime);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user