From 907b5f8c5fd94364e1d41c60696a8b88259296a7 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 29 Aug 2025 16:16:20 +0530 Subject: [PATCH] chore: handle redundant messages --- src/lib/waku/index.ts | 73 ++++++++++++++++++++--------------------- src/lib/waku/network.ts | 34 ++++++++++++------- src/lib/waku/types.ts | 5 +-- 3 files changed, 59 insertions(+), 53 deletions(-) diff --git a/src/lib/waku/index.ts b/src/lib/waku/index.ts index f75f536..9aaa894 100644 --- a/src/lib/waku/index.ts +++ b/src/lib/waku/index.ts @@ -17,6 +17,7 @@ class MessageManager { private _isReady: boolean = false; private _currentHealth: HealthStatus = HealthStatus.Unhealthy; private healthListeners: Set = new Set(); + private processedMessageIds: Set = new Set(); // Track processed message IDs public readonly messageCache: { @@ -38,7 +39,6 @@ class MessageManager { defaultBootstrap: true, networkConfig: NETWORK_CONFIG, autoStart: true, - // bootstrapPeers: BOOTSTRAP_NODES[42], }); return new MessageManager(node); @@ -97,7 +97,14 @@ class MessageManager { // Set up listener for incoming reliable messages this.reliableMessageManager.addIncomingMessageListener({ onMessage: (message) => { - console.log("Received reliable message:", message); + // Check if we've already processed this exact message + const messageKey = `${message.type}:${message.id}:${message.timestamp}`; + if (this.processedMessageIds.has(messageKey)) { + console.log(`Received message ${messageKey} but it has already been processed`); + return; + } + + this.processedMessageIds.add(messageKey); this.updateCache(message); } }); @@ -155,46 +162,24 @@ class MessageManager { }; } - /** - * Waits for the node to achieve at least minimally healthy status - * @param timeoutMs Maximum time to wait in milliseconds - * @returns Promise that resolves when healthy or rejects on timeout - */ - public async waitForRemotePeer(timeoutMs: number = 15000): Promise { - if (this._isReady) return true; - - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - reject(new Error(`Timed out waiting for healthy network connection after ${timeoutMs}ms`)); - }, timeoutMs); - - const checkHandler = (isReady: boolean, health: HealthStatus) => { - if (isReady && (health === HealthStatus.MinimallyHealthy || health === HealthStatus.SufficientlyHealthy)) { - clearTimeout(timeout); - this.healthListeners.delete(checkHandler); - resolve(true); - } - }; - - // Add temporary listener for health status - this.healthListeners.add(checkHandler); - }); - } + public async sendMessage(message: OpchanMessage) { if (!this.reliableMessageManager) { throw new Error("Reliable message manager not initialized"); } + // Track this message as processed (optimistic) + const messageKey = `${message.type}:${message.id}:${message.timestamp}`; + this.processedMessageIds.add(messageKey); + // Use reliable channel with status tracking const messageId = await this.reliableMessageManager.sendMessage(message, { - onSent: (id) => console.log(`Message ${id} sent ✓`), - onAcknowledged: (id) => console.log(`Message ${id} acknowledged ✓✓`), + onSent: (id) => console.log(`Message ${id} sent`), + onAcknowledged: (id) => console.log(`Message ${id} acknowledged`), onError: (id, error) => console.error(`Message ${id} failed:`, error) }); - console.log(`Sent reliable message with ID: ${messageId}`); - // Update local cache immediately for optimistic UI this.updateCache(message); @@ -204,28 +189,42 @@ class MessageManager { private updateCache(message: OpchanMessage) { switch (message.type) { case MessageType.CELL: - this.messageCache.cells[message.id] = message; + if (!this.messageCache.cells[message.id] || + this.messageCache.cells[message.id].timestamp !== message.timestamp) { + this.messageCache.cells[message.id] = message; + } break; case MessageType.POST: - this.messageCache.posts[message.id] = message; + if (!this.messageCache.posts[message.id] || + this.messageCache.posts[message.id].timestamp !== message.timestamp) { + this.messageCache.posts[message.id] = message; + } break; case MessageType.COMMENT: - this.messageCache.comments[message.id] = message; + if (!this.messageCache.comments[message.id] || + this.messageCache.comments[message.id].timestamp !== message.timestamp) { + this.messageCache.comments[message.id] = message; + } break; case MessageType.VOTE: { // For votes, we use a composite key of targetId + author to handle multiple votes from same user const voteKey = `${message.targetId}:${message.author}`; - this.messageCache.votes[voteKey] = message; + if (!this.messageCache.votes[voteKey] || + this.messageCache.votes[voteKey].timestamp !== message.timestamp) { + this.messageCache.votes[voteKey] = message; + } break; } case MessageType.MODERATE: { // Type guard for ModerateMessage const modMsg = message as ModerateMessage; - this.messageCache.moderations[modMsg.targetId] = modMsg; + if (!this.messageCache.moderations[modMsg.targetId] || + this.messageCache.moderations[modMsg.targetId].timestamp !== modMsg.timestamp) { + this.messageCache.moderations[modMsg.targetId] = modMsg; + } break; } default: - // TypeScript should ensure we don't reach this case with proper OpchanMessage types console.warn("Received message with unknown type"); break; } diff --git a/src/lib/waku/network.ts b/src/lib/waku/network.ts index c223b1b..026301e 100644 --- a/src/lib/waku/network.ts +++ b/src/lib/waku/network.ts @@ -15,13 +15,16 @@ export const refreshData = async ( ): Promise => { try { toast({ title: 'Refreshing data', description: 'SDS handles message syncing automatically...' }); + if (!isNetworkConnected) { - try { - await messageManager.waitForRemotePeer(10000); - } catch (err) { - console.warn('Could not connect to peer during refresh:', err); - } + toast({ + title: 'Network disconnected', + description: 'Unable to refresh data. Please wait for network connection to be restored.', + variant: 'destructive' + }); + return; } + updateStateFromCache(); toast({ title: 'Data refreshed', description: 'Your view has been updated with the latest messages.' }); } catch (err) { @@ -38,17 +41,24 @@ export const initializeNetwork = async ( ): Promise => { try { toast({ title: 'Loading data', description: 'Connecting to the Waku network...' }); - try { - await messageManager.waitForRemotePeer(15000); - } catch (err) { - toast({ title: 'Connection timeout', description: 'Could not connect to any peers. Some features may be unavailable.', variant: 'destructive' }); - console.warn('Timeout connecting to peer:', err); - } + + // Load data from cache immediately - health monitoring will handle network status updateStateFromCache(); + + // Check current network status and provide appropriate feedback + if (messageManager.isReady) { + toast({ title: 'Connected', description: 'Successfully connected to Waku network.' }); + } else { + toast({ + title: 'Connecting...', + description: 'Establishing network connection. You can view cached data while we connect.', + variant: 'default' + }); + } } catch (err) { console.error('Error loading forum data:', err); setError('Failed to load forum data. Please try again later.'); - toast({ title: 'Connection error', description: 'Failed to connect to Waku network. Please try refreshing.', variant: 'destructive' }); + toast({ title: 'Load error', description: 'Failed to load forum data. Please try refreshing.', variant: 'destructive' }); } }; diff --git a/src/lib/waku/types.ts b/src/lib/waku/types.ts index 477aff7..14f1f53 100644 --- a/src/lib/waku/types.ts +++ b/src/lib/waku/types.ts @@ -13,6 +13,7 @@ export enum MessageType { * Base interface for all message types */ export interface BaseMessage { + id: string; type: MessageType; timestamp: number | Date; author: string; @@ -25,7 +26,6 @@ export interface BaseMessage { */ export interface CellMessage extends BaseMessage { type: MessageType.CELL; - id: string; name: string; description: string; icon?: string; @@ -36,7 +36,6 @@ export interface CellMessage extends BaseMessage { */ export interface PostMessage extends BaseMessage { type: MessageType.POST; - id: string; cellId: string; title: string; content: string; @@ -47,7 +46,6 @@ export interface PostMessage extends BaseMessage { */ export interface CommentMessage extends BaseMessage { type: MessageType.COMMENT; - id: string; postId: string; content: string; } @@ -57,7 +55,6 @@ export interface CommentMessage extends BaseMessage { */ export interface VoteMessage extends BaseMessage { type: MessageType.VOTE; - id: string; targetId: string; // ID of the post or comment being voted on value: 1 | -1; }