chore: handle redundant messages

This commit is contained in:
Danish Arora 2025-08-29 16:16:20 +05:30
parent 2b9b93c88f
commit 907b5f8c5f
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
3 changed files with 59 additions and 53 deletions

View File

@ -17,6 +17,7 @@ class MessageManager {
private _isReady: boolean = false;
private _currentHealth: HealthStatus = HealthStatus.Unhealthy;
private healthListeners: Set<HealthChangeCallback> = new Set();
private processedMessageIds: Set<string> = new Set(); // Track processed message IDs
public readonly messageCache: {
@ -38,7 +39,6 @@ class MessageManager {
defaultBootstrap: true,
networkConfig: NETWORK_CONFIG,
autoStart: true,
// bootstrapPeers: BOOTSTRAP_NODES[42],
});
return new MessageManager(node);
@ -97,7 +97,14 @@ class MessageManager {
// Set up listener for incoming reliable messages
this.reliableMessageManager.addIncomingMessageListener({
onMessage: (message) => {
console.log("Received reliable message:", message);
// Check if we've already processed this exact message
const messageKey = `${message.type}:${message.id}:${message.timestamp}`;
if (this.processedMessageIds.has(messageKey)) {
console.log(`Received message ${messageKey} but it has already been processed`);
return;
}
this.processedMessageIds.add(messageKey);
this.updateCache(message);
}
});
@ -155,46 +162,24 @@ class MessageManager {
};
}
/**
* Waits for the node to achieve at least minimally healthy status
* @param timeoutMs Maximum time to wait in milliseconds
* @returns Promise that resolves when healthy or rejects on timeout
*/
public async waitForRemotePeer(timeoutMs: number = 15000): Promise<boolean> {
if (this._isReady) return true;
return new Promise<boolean>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Timed out waiting for healthy network connection after ${timeoutMs}ms`));
}, timeoutMs);
const checkHandler = (isReady: boolean, health: HealthStatus) => {
if (isReady && (health === HealthStatus.MinimallyHealthy || health === HealthStatus.SufficientlyHealthy)) {
clearTimeout(timeout);
this.healthListeners.delete(checkHandler);
resolve(true);
}
};
// Add temporary listener for health status
this.healthListeners.add(checkHandler);
});
}
public async sendMessage(message: OpchanMessage) {
if (!this.reliableMessageManager) {
throw new Error("Reliable message manager not initialized");
}
// Track this message as processed (optimistic)
const messageKey = `${message.type}:${message.id}:${message.timestamp}`;
this.processedMessageIds.add(messageKey);
// Use reliable channel with status tracking
const messageId = await this.reliableMessageManager.sendMessage(message, {
onSent: (id) => console.log(`Message ${id} sent ✓`),
onAcknowledged: (id) => console.log(`Message ${id} acknowledged ✓✓`),
onSent: (id) => console.log(`Message ${id} sent`),
onAcknowledged: (id) => console.log(`Message ${id} acknowledged`),
onError: (id, error) => console.error(`Message ${id} failed:`, error)
});
console.log(`Sent reliable message with ID: ${messageId}`);
// Update local cache immediately for optimistic UI
this.updateCache(message);
@ -204,28 +189,42 @@ class MessageManager {
private updateCache(message: OpchanMessage) {
switch (message.type) {
case MessageType.CELL:
this.messageCache.cells[message.id] = message;
if (!this.messageCache.cells[message.id] ||
this.messageCache.cells[message.id].timestamp !== message.timestamp) {
this.messageCache.cells[message.id] = message;
}
break;
case MessageType.POST:
this.messageCache.posts[message.id] = message;
if (!this.messageCache.posts[message.id] ||
this.messageCache.posts[message.id].timestamp !== message.timestamp) {
this.messageCache.posts[message.id] = message;
}
break;
case MessageType.COMMENT:
this.messageCache.comments[message.id] = message;
if (!this.messageCache.comments[message.id] ||
this.messageCache.comments[message.id].timestamp !== message.timestamp) {
this.messageCache.comments[message.id] = message;
}
break;
case MessageType.VOTE: {
// For votes, we use a composite key of targetId + author to handle multiple votes from same user
const voteKey = `${message.targetId}:${message.author}`;
this.messageCache.votes[voteKey] = message;
if (!this.messageCache.votes[voteKey] ||
this.messageCache.votes[voteKey].timestamp !== message.timestamp) {
this.messageCache.votes[voteKey] = message;
}
break;
}
case MessageType.MODERATE: {
// Type guard for ModerateMessage
const modMsg = message as ModerateMessage;
this.messageCache.moderations[modMsg.targetId] = modMsg;
if (!this.messageCache.moderations[modMsg.targetId] ||
this.messageCache.moderations[modMsg.targetId].timestamp !== modMsg.timestamp) {
this.messageCache.moderations[modMsg.targetId] = modMsg;
}
break;
}
default:
// TypeScript should ensure we don't reach this case with proper OpchanMessage types
console.warn("Received message with unknown type");
break;
}

View File

@ -15,13 +15,16 @@ export const refreshData = async (
): Promise<void> => {
try {
toast({ title: 'Refreshing data', description: 'SDS handles message syncing automatically...' });
if (!isNetworkConnected) {
try {
await messageManager.waitForRemotePeer(10000);
} catch (err) {
console.warn('Could not connect to peer during refresh:', err);
}
toast({
title: 'Network disconnected',
description: 'Unable to refresh data. Please wait for network connection to be restored.',
variant: 'destructive'
});
return;
}
updateStateFromCache();
toast({ title: 'Data refreshed', description: 'Your view has been updated with the latest messages.' });
} catch (err) {
@ -38,17 +41,24 @@ export const initializeNetwork = async (
): Promise<void> => {
try {
toast({ title: 'Loading data', description: 'Connecting to the Waku network...' });
try {
await messageManager.waitForRemotePeer(15000);
} catch (err) {
toast({ title: 'Connection timeout', description: 'Could not connect to any peers. Some features may be unavailable.', variant: 'destructive' });
console.warn('Timeout connecting to peer:', err);
}
// Load data from cache immediately - health monitoring will handle network status
updateStateFromCache();
// Check current network status and provide appropriate feedback
if (messageManager.isReady) {
toast({ title: 'Connected', description: 'Successfully connected to Waku network.' });
} else {
toast({
title: 'Connecting...',
description: 'Establishing network connection. You can view cached data while we connect.',
variant: 'default'
});
}
} catch (err) {
console.error('Error loading forum data:', err);
setError('Failed to load forum data. Please try again later.');
toast({ title: 'Connection error', description: 'Failed to connect to Waku network. Please try refreshing.', variant: 'destructive' });
toast({ title: 'Load error', description: 'Failed to load forum data. Please try refreshing.', variant: 'destructive' });
}
};

View File

@ -13,6 +13,7 @@ export enum MessageType {
* Base interface for all message types
*/
export interface BaseMessage {
id: string;
type: MessageType;
timestamp: number | Date;
author: string;
@ -25,7 +26,6 @@ export interface BaseMessage {
*/
export interface CellMessage extends BaseMessage {
type: MessageType.CELL;
id: string;
name: string;
description: string;
icon?: string;
@ -36,7 +36,6 @@ export interface CellMessage extends BaseMessage {
*/
export interface PostMessage extends BaseMessage {
type: MessageType.POST;
id: string;
cellId: string;
title: string;
content: string;
@ -47,7 +46,6 @@ export interface PostMessage extends BaseMessage {
*/
export interface CommentMessage extends BaseMessage {
type: MessageType.COMMENT;
id: string;
postId: string;
content: string;
}
@ -57,7 +55,6 @@ export interface CommentMessage extends BaseMessage {
*/
export interface VoteMessage extends BaseMessage {
type: MessageType.VOTE;
id: string;
targetId: string; // ID of the post or comment being voted on
value: 1 | -1;
}