feat: add offline message persistence to MessageManager

This commit is contained in:
Ashis Kumar Naik 2025-07-02 08:24:07 +05:30
parent 5e7ad5cecd
commit afbafc4374

View File

@ -1,6 +1,3 @@
//TODO: perhaps store all messages in an indexed DB? (helpful when Waku is down)
// with a `isPublished` flag to indicate if the message has been sent to the network
import { createDecoder, createLightNode, HealthStatus, HealthStatusChangeEvents, LightNode } from "@waku/sdk";
import { BOOTSTRAP_NODES } from "./constants";
import StoreManager from "./store";
@ -10,6 +7,7 @@ import { CellCache } from "./types";
import { OpchanMessage } from "@/types";
import { EphemeralProtocolsManager } from "./lightpush_filter";
import { NETWORK_CONFIG } from "./constants";
import { db } from "../storage/db";
export type HealthChangeCallback = (isReady: boolean) => void;
@ -95,7 +93,16 @@ class MessageManager {
private setIsReady(isReady: boolean) {
if (this._isReady !== isReady) {
const wasOffline = !this._isReady;
this._isReady = isReady;
// If we just came back online, sync the outbox
if (isReady && wasOffline) {
this.syncOutbox().catch(err =>
console.warn("Failed to sync outbox after coming online:", err)
);
}
// Notify all health listeners
this.healthListeners.forEach(listener => listener(isReady));
}
@ -166,9 +173,16 @@ class MessageManager {
}
public async sendMessage(message: OpchanMessage) {
await this.ephemeralProtocolsManager.sendMessage(message);
//TODO: should we update the cache here? or just from store/filter?
this.updateCache(message);
if (this._isReady) {
// If we're online, send the message immediately
await this.ephemeralProtocolsManager.sendMessage(message);
this.updateCache(message);
} else {
// If we're offline, add to outbox
await db.addToOutbox(message);
// Still update cache for immediate UI feedback
this.updateCache(message);
}
}
public async subscribeToMessages(types: MessageType[] = [MessageType.CELL, MessageType.POST, MessageType.COMMENT, MessageType.VOTE, MessageType.MODERATE]) {
@ -181,27 +195,53 @@ class MessageManager {
return { messages: result, subscription };
}
private updateCache(message: OpchanMessage) {
private async updateCache(message: OpchanMessage) {
switch (message.type) {
case MessageType.CELL:
this.messageCache.cells[message.id] = message;
case MessageType.CELL: {
// Check for conflicts and resolve by timestamp (newer wins)
const existingCell = this.messageCache.cells[message.id];
if (!existingCell || message.timestamp >= existingCell.timestamp) {
this.messageCache.cells[message.id] = message;
await db.cells.put(message).catch(err => console.warn("Failed to persist cell to IndexedDB:", err));
}
break;
case MessageType.POST:
this.messageCache.posts[message.id] = message;
}
case MessageType.POST: {
// Check for conflicts and resolve by timestamp (newer wins)
const existingPost = this.messageCache.posts[message.id];
if (!existingPost || message.timestamp >= existingPost.timestamp) {
this.messageCache.posts[message.id] = message;
await db.posts.put(message).catch(err => console.warn("Failed to persist post to IndexedDB:", err));
}
break;
case MessageType.COMMENT:
this.messageCache.comments[message.id] = message;
}
case MessageType.COMMENT: {
// Check for conflicts and resolve by timestamp (newer wins)
const existingComment = this.messageCache.comments[message.id];
if (!existingComment || message.timestamp >= existingComment.timestamp) {
this.messageCache.comments[message.id] = message;
await db.comments.put(message).catch(err => console.warn("Failed to persist comment to IndexedDB:", err));
}
break;
}
case MessageType.VOTE: {
// For votes, we use a composite key of targetId + author to handle multiple votes from same user
const voteKey = `${message.targetId}:${message.author}`;
this.messageCache.votes[voteKey] = message;
const existingVote = this.messageCache.votes[voteKey];
if (!existingVote || message.timestamp >= existingVote.timestamp) {
this.messageCache.votes[voteKey] = message;
await db.votes.put(message).catch(err => console.warn("Failed to persist vote to IndexedDB:", err));
}
break;
}
case MessageType.MODERATE: {
// Type guard for ModerateMessage
const modMsg = message as ModerateMessage;
this.messageCache.moderations[modMsg.targetId] = modMsg;
const existingModeration = this.messageCache.moderations[modMsg.targetId];
if (!existingModeration || modMsg.timestamp >= existingModeration.timestamp) {
this.messageCache.moderations[modMsg.targetId] = modMsg;
await db.moderations.put(modMsg).catch(err => console.warn("Failed to persist moderation to IndexedDB:", err));
}
break;
}
default:
@ -210,6 +250,67 @@ class MessageManager {
break;
}
}
/**
* Hydrate the message cache from IndexedDB on startup
* This allows the UI to display cached data immediately
*/
public async hydrateFromStorage(): Promise<void> {
try {
const cachedData = await db.hydrateMessageCache();
this.messageCache.cells = cachedData.cells;
this.messageCache.posts = cachedData.posts;
this.messageCache.comments = cachedData.comments;
this.messageCache.votes = cachedData.votes;
this.messageCache.moderations = cachedData.moderations;
console.log("Message cache hydrated from IndexedDB");
} catch (err) {
console.warn("Failed to hydrate message cache from IndexedDB:", err);
}
}
/**
* Sync the outbox - send all unpublished messages
* Called automatically when coming back online
*/
private async syncOutbox(): Promise<void> {
try {
const unpublishedMessages = await db.getUnpublishedMessages();
console.log(`Syncing ${unpublishedMessages.length} messages from outbox`);
for (const outboxItem of unpublishedMessages) {
try {
// Send the message via Waku
await this.ephemeralProtocolsManager.sendMessage(outboxItem.data);
// Mark as published in the database
await db.markAsPublished(outboxItem.id);
console.log(`Successfully synced message ${outboxItem.id}`);
} catch (err) {
console.warn(`Failed to sync message ${outboxItem.id}:`, err);
// Don't mark as published if sending failed
}
}
// Clean up published messages from outbox
await db.clearOutbox();
} catch (err) {
console.warn("Failed to sync outbox:", err);
}
}
/**
* Get the count of pending messages in the outbox
*/
public async getOutboxCount(): Promise<number> {
try {
return await db.getOutboxCount();
} catch (err) {
console.warn("Failed to get outbox count:", err);
return 0;
}
}
}
const messageManager = await MessageManager.create();