From afbafc43741d462572adddb6dfc625d29dbbd2dc Mon Sep 17 00:00:00 2001 From: Ashis Kumar Naik Date: Wed, 2 Jul 2025 08:24:07 +0530 Subject: [PATCH] feat: add offline message persistence to MessageManager --- src/lib/waku/index.ts | 131 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 116 insertions(+), 15 deletions(-) diff --git a/src/lib/waku/index.ts b/src/lib/waku/index.ts index 0ac654f..7119729 100644 --- a/src/lib/waku/index.ts +++ b/src/lib/waku/index.ts @@ -1,6 +1,3 @@ -//TODO: perhaps store all messages in an indexed DB? (helpful when Waku is down) -// with a `isPublished` flag to indicate if the message has been sent to the network - import { createDecoder, createLightNode, HealthStatus, HealthStatusChangeEvents, LightNode } from "@waku/sdk"; import { BOOTSTRAP_NODES } from "./constants"; import StoreManager from "./store"; @@ -10,6 +7,7 @@ import { CellCache } from "./types"; import { OpchanMessage } from "@/types"; import { EphemeralProtocolsManager } from "./lightpush_filter"; import { NETWORK_CONFIG } from "./constants"; +import { db } from "../storage/db"; export type HealthChangeCallback = (isReady: boolean) => void; @@ -95,7 +93,16 @@ class MessageManager { private setIsReady(isReady: boolean) { if (this._isReady !== isReady) { + const wasOffline = !this._isReady; this._isReady = isReady; + + // If we just came back online, sync the outbox + if (isReady && wasOffline) { + this.syncOutbox().catch(err => + console.warn("Failed to sync outbox after coming online:", err) + ); + } + // Notify all health listeners this.healthListeners.forEach(listener => listener(isReady)); } @@ -166,9 +173,16 @@ class MessageManager { } public async sendMessage(message: OpchanMessage) { - await this.ephemeralProtocolsManager.sendMessage(message); - //TODO: should we update the cache here? or just from store/filter? - this.updateCache(message); + if (this._isReady) { + // If we're online, send the message immediately + await this.ephemeralProtocolsManager.sendMessage(message); + this.updateCache(message); + } else { + // If we're offline, add to outbox + await db.addToOutbox(message); + // Still update cache for immediate UI feedback + this.updateCache(message); + } } public async subscribeToMessages(types: MessageType[] = [MessageType.CELL, MessageType.POST, MessageType.COMMENT, MessageType.VOTE, MessageType.MODERATE]) { @@ -181,27 +195,53 @@ class MessageManager { return { messages: result, subscription }; } - private updateCache(message: OpchanMessage) { + private async updateCache(message: OpchanMessage) { switch (message.type) { - case MessageType.CELL: - this.messageCache.cells[message.id] = message; + case MessageType.CELL: { + // Check for conflicts and resolve by timestamp (newer wins) + const existingCell = this.messageCache.cells[message.id]; + if (!existingCell || message.timestamp >= existingCell.timestamp) { + this.messageCache.cells[message.id] = message; + await db.cells.put(message).catch(err => console.warn("Failed to persist cell to IndexedDB:", err)); + } break; - case MessageType.POST: - this.messageCache.posts[message.id] = message; + } + case MessageType.POST: { + // Check for conflicts and resolve by timestamp (newer wins) + const existingPost = this.messageCache.posts[message.id]; + if (!existingPost || message.timestamp >= existingPost.timestamp) { + this.messageCache.posts[message.id] = message; + await db.posts.put(message).catch(err => console.warn("Failed to persist post to IndexedDB:", err)); + } break; - case MessageType.COMMENT: - this.messageCache.comments[message.id] = message; + } + case MessageType.COMMENT: { + // Check for conflicts and resolve by timestamp (newer wins) + const existingComment = this.messageCache.comments[message.id]; + if (!existingComment || message.timestamp >= existingComment.timestamp) { + this.messageCache.comments[message.id] = message; + await db.comments.put(message).catch(err => console.warn("Failed to persist comment to IndexedDB:", err)); + } break; + } case MessageType.VOTE: { // For votes, we use a composite key of targetId + author to handle multiple votes from same user const voteKey = `${message.targetId}:${message.author}`; - this.messageCache.votes[voteKey] = message; + const existingVote = this.messageCache.votes[voteKey]; + if (!existingVote || message.timestamp >= existingVote.timestamp) { + this.messageCache.votes[voteKey] = message; + await db.votes.put(message).catch(err => console.warn("Failed to persist vote to IndexedDB:", err)); + } break; } case MessageType.MODERATE: { // Type guard for ModerateMessage const modMsg = message as ModerateMessage; - this.messageCache.moderations[modMsg.targetId] = modMsg; + const existingModeration = this.messageCache.moderations[modMsg.targetId]; + if (!existingModeration || modMsg.timestamp >= existingModeration.timestamp) { + this.messageCache.moderations[modMsg.targetId] = modMsg; + await db.moderations.put(modMsg).catch(err => console.warn("Failed to persist moderation to IndexedDB:", err)); + } break; } default: @@ -210,6 +250,67 @@ class MessageManager { break; } } + + /** + * Hydrate the message cache from IndexedDB on startup + * This allows the UI to display cached data immediately + */ + public async hydrateFromStorage(): Promise { + try { + const cachedData = await db.hydrateMessageCache(); + this.messageCache.cells = cachedData.cells; + this.messageCache.posts = cachedData.posts; + this.messageCache.comments = cachedData.comments; + this.messageCache.votes = cachedData.votes; + this.messageCache.moderations = cachedData.moderations; + console.log("Message cache hydrated from IndexedDB"); + } catch (err) { + console.warn("Failed to hydrate message cache from IndexedDB:", err); + } + } + + /** + * Sync the outbox - send all unpublished messages + * Called automatically when coming back online + */ + private async syncOutbox(): Promise { + try { + const unpublishedMessages = await db.getUnpublishedMessages(); + console.log(`Syncing ${unpublishedMessages.length} messages from outbox`); + + for (const outboxItem of unpublishedMessages) { + try { + // Send the message via Waku + await this.ephemeralProtocolsManager.sendMessage(outboxItem.data); + + // Mark as published in the database + await db.markAsPublished(outboxItem.id); + + console.log(`Successfully synced message ${outboxItem.id}`); + } catch (err) { + console.warn(`Failed to sync message ${outboxItem.id}:`, err); + // Don't mark as published if sending failed + } + } + + // Clean up published messages from outbox + await db.clearOutbox(); + } catch (err) { + console.warn("Failed to sync outbox:", err); + } + } + + /** + * Get the count of pending messages in the outbox + */ + public async getOutboxCount(): Promise { + try { + return await db.getOutboxCount(); + } catch (err) { + console.warn("Failed to get outbox count:", err); + return 0; + } + } } const messageManager = await MessageManager.create();