From c64bcff475c845ba91aaa62cc0be816a92671249 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 29 Aug 2025 17:08:04 +0530 Subject: [PATCH] chore(lib/waku): improve structuring --- src/lib/forum/__tests__/relevance.test.ts | 2 +- src/lib/forum/actions.ts | 2 +- src/lib/forum/relevance.ts | 2 +- src/lib/forum/transformers.ts | 2 +- src/lib/waku/{codec.ts => CodecManager.ts} | 4 +- src/lib/waku/constants.ts | 2 +- src/lib/waku/core/ReliableMessaging.ts | 111 ++++++++ src/lib/waku/core/WakuNodeManager.ts | 85 ++++++ src/lib/waku/index.ts | 303 +++++++-------------- src/lib/waku/reliable_channel.ts | 135 --------- src/lib/waku/services/CacheService.ts | 85 ++++++ src/lib/waku/services/MessageService.ts | 80 ++++++ src/types/forum.ts | 2 +- src/{lib/waku/types.ts => types/waku.ts} | 0 14 files changed, 462 insertions(+), 353 deletions(-) rename src/lib/waku/{codec.ts => CodecManager.ts} (97%) create mode 100644 src/lib/waku/core/ReliableMessaging.ts create mode 100644 src/lib/waku/core/WakuNodeManager.ts delete mode 100644 src/lib/waku/reliable_channel.ts create mode 100644 src/lib/waku/services/CacheService.ts create mode 100644 src/lib/waku/services/MessageService.ts rename src/{lib/waku/types.ts => types/waku.ts} (100%) diff --git a/src/lib/forum/__tests__/relevance.test.ts b/src/lib/forum/__tests__/relevance.test.ts index 369119d..4996421 100644 --- a/src/lib/forum/__tests__/relevance.test.ts +++ b/src/lib/forum/__tests__/relevance.test.ts @@ -1,6 +1,6 @@ import { RelevanceCalculator } from '../relevance'; import { Post, Comment, User, UserVerificationStatus, EVerificationStatus } from '@/types/forum'; -import { VoteMessage, MessageType } from '@/lib/waku/types'; +import { VoteMessage, MessageType } from '@/types/waku'; import { expect, describe, beforeEach, it } from 'vitest'; describe('RelevanceCalculator', () => { diff --git a/src/lib/forum/actions.ts b/src/lib/forum/actions.ts index dda9058..3c692ac 100644 --- a/src/lib/forum/actions.ts +++ b/src/lib/forum/actions.ts @@ -6,7 +6,7 @@ import { PostMessage, VoteMessage, ModerateMessage, -} from '@/lib/waku/types'; +} from '@/types/waku'; import { Cell, Comment, Post, User } from '@/types/forum'; import { transformCell, transformComment, transformPost } from './transformers'; import { MessageService, AuthService, CryptoService } from '@/lib/services'; diff --git a/src/lib/forum/relevance.ts b/src/lib/forum/relevance.ts index bcdbc0e..8c8ab1c 100644 --- a/src/lib/forum/relevance.ts +++ b/src/lib/forum/relevance.ts @@ -1,5 +1,5 @@ import { Post, Comment, Cell, User, RelevanceScoreDetails, UserVerificationStatus } from '@/types/forum'; -import { VoteMessage } from '@/lib/waku/types'; +import { VoteMessage } from '@/types/waku'; export class RelevanceCalculator { private static readonly BASE_SCORES = { diff --git a/src/lib/forum/transformers.ts b/src/lib/forum/transformers.ts index f134151..2eb62bb 100644 --- a/src/lib/forum/transformers.ts +++ b/src/lib/forum/transformers.ts @@ -1,5 +1,5 @@ import { Cell, Post, Comment, OpchanMessage } from '@/types/forum'; -import { CellMessage, CommentMessage, PostMessage, VoteMessage } from '@/lib/waku/types'; +import { CellMessage, CommentMessage, PostMessage, VoteMessage } from '@/types/waku'; import messageManager from '@/lib/waku'; import { RelevanceCalculator } from './relevance'; import { UserVerificationStatus } from '@/types/forum'; diff --git a/src/lib/waku/codec.ts b/src/lib/waku/CodecManager.ts similarity index 97% rename from src/lib/waku/codec.ts rename to src/lib/waku/CodecManager.ts index 441419f..14d16a0 100644 --- a/src/lib/waku/codec.ts +++ b/src/lib/waku/CodecManager.ts @@ -4,13 +4,13 @@ import { IEncoder, LightNode, } from '@waku/sdk'; -import { MessageType } from './types'; +import { MessageType } from '../../types/waku'; import { CellMessage, PostMessage, CommentMessage, VoteMessage, -} from './types'; +} from '../../types/waku'; import { CONTENT_TOPICS } from './constants'; import { OpchanMessage } from '@/types/forum'; diff --git a/src/lib/waku/constants.ts b/src/lib/waku/constants.ts index b8907b0..6c37097 100644 --- a/src/lib/waku/constants.ts +++ b/src/lib/waku/constants.ts @@ -1,4 +1,4 @@ -import { MessageType } from "./types"; +import { MessageType } from "../../types/waku"; /** * Content topics for different message types diff --git a/src/lib/waku/core/ReliableMessaging.ts b/src/lib/waku/core/ReliableMessaging.ts new file mode 100644 index 0000000..9b28fbb --- /dev/null +++ b/src/lib/waku/core/ReliableMessaging.ts @@ -0,0 +1,111 @@ +import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk"; +import { MessageType } from "../../../types/waku"; +import { CodecManager } from "../CodecManager"; +import { generateStringId } from "@/lib/utils"; +import { OpchanMessage } from "@/types/forum"; + +export interface MessageStatusCallback { + onSent?: (messageId: string) => void; + onAcknowledged?: (messageId: string) => void; + onError?: (messageId: string, error: string) => void; +} + +export type IncomingMessageCallback = (message: OpchanMessage) => void; + +export class ReliableMessaging { + private channels: Map> = new Map(); + private messageCallbacks: Map = new Map(); + private incomingMessageCallbacks: Set = new Set(); + private codecManager: CodecManager; + + constructor(node: LightNode) { + this.codecManager = new CodecManager(node); + this.initializeChannels(node); + } + + private async initializeChannels(node: LightNode): Promise { + for (const type of Object.values(MessageType)) { + const encoder = this.codecManager.getEncoder(type); + const decoder = this.codecManager.getDecoder(type); + const senderId = generateStringId(); + const channelId = `opchan-${type}`; + + try { + const channel = await ReliableChannel.create(node, channelId, senderId, encoder, decoder); + this.channels.set(type, channel); + this.setupChannelListeners(channel, type); + } catch (error) { + console.error(`Failed to create reliable channel for ${type}:`, error); + } + } + } + + private setupChannelListeners(channel: ReliableChannel, type: MessageType): void { + channel.addEventListener(ReliableChannelEvent.InMessageReceived, (event) => { + try { + const wakuMessage = event.detail; + if (wakuMessage.payload) { + const opchanMessage = this.codecManager.decodeMessage(wakuMessage.payload); + this.incomingMessageCallbacks.forEach(callback => callback(opchanMessage)); + } + } catch (error) { + console.error(`Failed to process incoming message for ${type}:`, error); + } + }); + + channel.addEventListener(ReliableChannelEvent.OutMessageSent, (event) => { + const messageId = event.detail; + this.messageCallbacks.get(messageId)?.onSent?.(messageId); + }); + + channel.addEventListener(ReliableChannelEvent.OutMessageAcknowledged, (event) => { + const messageId = event.detail; + this.messageCallbacks.get(messageId)?.onAcknowledged?.(messageId); + }); + + channel.addEventListener(ReliableChannelEvent.OutMessageIrrecoverableError, (event) => { + const messageId = event.detail.messageId; + const error = event.detail.error; + const callback = this.messageCallbacks.get(messageId); + + if (callback?.onError) { + callback.onError(messageId, error?.toString() || 'Unknown error'); + } + + this.messageCallbacks.delete(messageId); + }); + } + + public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise { + const channel = this.channels.get(message.type); + if (!channel) { + throw new Error(`No reliable channel for message type: ${message.type}`); + } + + const encodedMessage = this.codecManager.encodeMessage(message); + const messageId = ReliableChannel.getMessageId(encodedMessage); + + if (statusCallback) { + this.messageCallbacks.set(messageId, statusCallback); + } + + try { + await channel.send(encodedMessage); + return messageId; + } catch (error) { + this.messageCallbacks.delete(messageId); + throw error; + } + } + + public onMessage(callback: IncomingMessageCallback): () => void { + this.incomingMessageCallbacks.add(callback); + return () => this.incomingMessageCallbacks.delete(callback); + } + + public cleanup(): void { + this.messageCallbacks.clear(); + this.incomingMessageCallbacks.clear(); + this.channels.clear(); + } +} diff --git a/src/lib/waku/core/WakuNodeManager.ts b/src/lib/waku/core/WakuNodeManager.ts new file mode 100644 index 0000000..5430b2c --- /dev/null +++ b/src/lib/waku/core/WakuNodeManager.ts @@ -0,0 +1,85 @@ +import { createLightNode, LightNode, WakuEvent, HealthStatus } from "@waku/sdk"; + +export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void; + +export class WakuNodeManager { + private node: LightNode | null = null; + private _isReady: boolean = false; + private _currentHealth: HealthStatus = HealthStatus.Unhealthy; + private healthListeners: Set = new Set(); + + public static async create(): Promise { + const manager = new WakuNodeManager(); + await manager.initialize(); + return manager; + } + + private async initialize(): Promise { + this.node = await createLightNode({ + defaultBootstrap: true, + autoStart: true, + }); + this.setupHealthMonitoring(); + } + + private setupHealthMonitoring(): void { + if (!this.node) return; + + this.node.events.addEventListener(WakuEvent.Health, (event) => { + const health = event.detail; + this._currentHealth = health; + + console.log(`Waku health status: ${health}`); + + const wasReady = this._isReady; + this._isReady = health === HealthStatus.SufficientlyHealthy || health === HealthStatus.MinimallyHealthy; + + if (wasReady !== this._isReady) { + this.notifyHealthChange(); + } + }); + } + + private notifyHealthChange(): void { + this.healthListeners.forEach(listener => listener(this._isReady, this._currentHealth)); + } + + public getNode(): LightNode { + if (!this.node) { + throw new Error("Node not initialized"); + } + return this.node; + } + + public async stop(): Promise { + this.healthListeners.clear(); + if (this.node) { + await this.node.stop(); + this.node = null; + } + } + + public get isInitialized(): boolean { + return this.node !== null; + } + + public get isReady(): boolean { + return this._isReady; + } + + public get currentHealth(): HealthStatus { + return this._currentHealth; + } + + public onHealthChange(callback: HealthChangeCallback): () => void { + this.healthListeners.add(callback); + + // Immediately call with current status + callback(this._isReady, this._currentHealth); + + // Return unsubscribe function + return () => { + this.healthListeners.delete(callback); + }; + } +} diff --git a/src/lib/waku/index.ts b/src/lib/waku/index.ts index 2202385..5a9d377 100644 --- a/src/lib/waku/index.ts +++ b/src/lib/waku/index.ts @@ -1,232 +1,115 @@ -//TODO: perhaps store all messages in an indexed DB? (helpful when Waku is down) -// with a `isPublished` flag to indicate if the message has been sent to the network - -import { createLightNode, LightNode, WakuEvent, HealthStatus } from "@waku/sdk"; -import { CommentCache, MessageType, VoteCache, ModerateMessage } from "./types"; -import { PostCache } from "./types"; -import { CellCache } from "./types"; +import { HealthStatus } from "@waku/sdk"; import { OpchanMessage } from "@/types/forum"; -import { ReliableMessageManager } from "./reliable_channel"; +import { WakuNodeManager, HealthChangeCallback } from "./core/WakuNodeManager"; +import { CacheService } from "./services/CacheService"; +import { MessageService, MessageStatusCallback } from "./services/MessageService"; +import { ReliableMessaging } from "./core/ReliableMessaging"; -export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void; +export type { HealthChangeCallback, MessageStatusCallback }; class MessageManager { - private node: LightNode; - private reliableMessageManager: ReliableMessageManager | null = null; - private _isReady: boolean = false; - private _currentHealth: HealthStatus = HealthStatus.Unhealthy; - private healthListeners: Set = new Set(); - private processedMessageIds: Set = new Set(); // Track processed message IDs + private nodeManager: WakuNodeManager | null = null; + private cacheService: CacheService; + private messageService: MessageService | null = null; + private reliableMessaging: ReliableMessaging | null = null; + constructor() { + this.cacheService = new CacheService(); + } - public readonly messageCache: { - cells: CellCache; - posts: PostCache; - comments: CommentCache; - votes: VoteCache; - moderations: { [targetId: string]: ModerateMessage }; - } = { - cells: {}, - posts: {}, - comments: {}, - votes: {}, - moderations: {} - } + public static async create(): Promise { + const manager = new MessageManager(); + await manager.initialize(); + return manager; + } - public static async create(): Promise { - const node = await createLightNode({ - defaultBootstrap: true, - autoStart: true, - }); - - return new MessageManager(node); - } - - public async stop() { - if (this.reliableMessageManager) { - this.reliableMessageManager.cleanup(); - this.reliableMessageManager = null; + private async initialize(): Promise { + try { + this.nodeManager = await WakuNodeManager.create(); + + // Now create message service with proper dependencies + this.messageService = new MessageService(this.cacheService, this.reliableMessaging, this.nodeManager); + + // Set up health-based reliable messaging initialization + this.nodeManager.onHealthChange((isReady) => { + if (isReady && !this.reliableMessaging) { + this.initializeReliableMessaging(); + } else if (!isReady && this.reliableMessaging) { + this.cleanupReliableMessaging(); } - - await this.node.stop(); - this.setIsReady(false); + }); + + } catch (error) { + console.error("Failed to initialize MessageManager:", error); + throw error; + } + } + + private async initializeReliableMessaging(): Promise { + if (!this.nodeManager || this.reliableMessaging) { + return; } - private constructor(node: LightNode) { - this.node = node; - this.setupHealthMonitoring(); + try { + console.log("Initializing reliable messaging..."); + this.reliableMessaging = new ReliableMessaging(this.nodeManager.getNode()); + this.messageService?.updateReliableMessaging(this.reliableMessaging); + console.log("Reliable messaging initialized successfully"); + } catch (error) { + console.error("Failed to initialize reliable messaging:", error); + } + } + + private cleanupReliableMessaging(): void { + if (this.reliableMessaging) { + console.log("Cleaning up reliable messaging due to health status"); + this.reliableMessaging.cleanup(); + this.reliableMessaging = null; + this.messageService?.updateReliableMessaging(null); } + } - /** - * Set up health monitoring using Waku's built-in health events - */ - private setupHealthMonitoring() { - this.node.events.addEventListener(WakuEvent.Health, (event) => { - const health = event.detail; - this._currentHealth = health; - - console.log(`Waku health status: ${health}`); - - if (health === HealthStatus.SufficientlyHealthy) { - console.log("Waku is sufficiently healthy - initializing reliable messaging"); - this.setIsReady(true); - this.initializeReliableManager(); - } else if (health === HealthStatus.MinimallyHealthy) { - console.log("Waku is minimally healthy - may have issues sending/receiving messages"); - this.setIsReady(true); - this.initializeReliableManager(); - } else { - console.log("Waku is unhealthy - disconnected from network"); - this.setIsReady(false); - this.cleanupReliableManager(); - } - }); + public async stop(): Promise { + this.cleanupReliableMessaging(); + this.messageService?.cleanup(); + await this.nodeManager?.stop(); + } + + public get isReady(): boolean { + return this.nodeManager?.isReady ?? false; + } + + public get currentHealth(): HealthStatus { + return this.nodeManager?.currentHealth ?? HealthStatus.Unhealthy; + } + + public onHealthChange(callback: HealthChangeCallback): () => void { + if (!this.nodeManager) { + throw new Error("Node manager not initialized"); } + return this.nodeManager.onHealthChange(callback); + } - private async initializeReliableManager() { - // Only initialize if not already initialized - if (this.reliableMessageManager) { - return; - } - - try { - this.reliableMessageManager = new ReliableMessageManager(this.node); - - // Set up listener for incoming reliable messages - this.reliableMessageManager.addIncomingMessageListener({ - onMessage: (message) => { - // Check if we've already processed this exact message - const messageKey = `${message.type}:${message.id}:${message.timestamp}`; - if (this.processedMessageIds.has(messageKey)) { - console.log(`Received message ${messageKey} but it has already been processed`); - return; - } - - this.processedMessageIds.add(messageKey); - this.updateCache(message); - } - }); - - console.log("Reliable message manager initialized successfully"); - } catch (error) { - console.error("Failed to initialize reliable message manager:", error); - } + public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise { + if (!this.messageService) { + throw new Error("MessageManager not fully initialized"); } + return this.messageService.sendMessage(message, statusCallback); + } - private cleanupReliableManager() { - if (this.reliableMessageManager) { - console.log("Cleaning up reliable message manager due to health status"); - this.reliableMessageManager.cleanup(); - this.reliableMessageManager = null; - } + public onMessageReceived(callback: (message: OpchanMessage) => void): () => void { + if (!this.messageService) { + throw new Error("MessageManager not fully initialized"); } + return this.messageService.onMessageReceived(callback); + } - private setIsReady(isReady: boolean) { - if (this._isReady !== isReady) { - this._isReady = isReady; - // Notify all health listeners with both ready state and health status - this.healthListeners.forEach(listener => listener(isReady, this._currentHealth)); - } - } - - /** - * Returns whether the node is currently healthy and ready for use - */ - public get isReady(): boolean { - return this._isReady; - } - - /** - * Returns the current Waku health status - */ - public get currentHealth(): HealthStatus { - return this._currentHealth; - } - - /** - * Subscribe to health status changes - * @param callback Function to call when health status changes - * @returns Function to unsubscribe - */ - public onHealthChange(callback: HealthChangeCallback): () => void { - this.healthListeners.add(callback); - - // Immediately call with current status - callback(this._isReady, this._currentHealth); - - // Return unsubscribe function - return () => { - this.healthListeners.delete(callback); - }; - } - - - - public async sendMessage(message: OpchanMessage) { - if (!this.reliableMessageManager) { - throw new Error("Reliable message manager not initialized"); - } - - // Track this message as processed (optimistic) - const messageKey = `${message.type}:${message.id}:${message.timestamp}`; - this.processedMessageIds.add(messageKey); - - // Use reliable channel with status tracking - const messageId = await this.reliableMessageManager.sendMessage(message, { - onSent: (id) => console.log(`Message ${id} sent`), - onAcknowledged: (id) => console.log(`Message ${id} acknowledged`), - onError: (id, error) => console.error(`Message ${id} failed:`, error) - }); - - // Update local cache immediately for optimistic UI - this.updateCache(message); - - return messageId; - } - - private updateCache(message: OpchanMessage) { - switch (message.type) { - case MessageType.CELL: - if (!this.messageCache.cells[message.id] || - this.messageCache.cells[message.id].timestamp !== message.timestamp) { - this.messageCache.cells[message.id] = message; - } - break; - case MessageType.POST: - if (!this.messageCache.posts[message.id] || - this.messageCache.posts[message.id].timestamp !== message.timestamp) { - this.messageCache.posts[message.id] = message; - } - break; - case MessageType.COMMENT: - if (!this.messageCache.comments[message.id] || - this.messageCache.comments[message.id].timestamp !== message.timestamp) { - this.messageCache.comments[message.id] = message; - } - break; - case MessageType.VOTE: { - // For votes, we use a composite key of targetId + author to handle multiple votes from same user - const voteKey = `${message.targetId}:${message.author}`; - if (!this.messageCache.votes[voteKey] || - this.messageCache.votes[voteKey].timestamp !== message.timestamp) { - this.messageCache.votes[voteKey] = message; - } - break; - } - case MessageType.MODERATE: { - // Type guard for ModerateMessage - const modMsg = message as ModerateMessage; - if (!this.messageCache.moderations[modMsg.targetId] || - this.messageCache.moderations[modMsg.targetId].timestamp !== modMsg.timestamp) { - this.messageCache.moderations[modMsg.targetId] = modMsg; - } - break; - } - default: - console.warn("Received message with unknown type"); - break; - } + public get messageCache() { + if (!this.messageService) { + throw new Error("MessageManager not fully initialized"); } + return this.messageService.messageCache; + } } const messageManager = await MessageManager.create(); diff --git a/src/lib/waku/reliable_channel.ts b/src/lib/waku/reliable_channel.ts deleted file mode 100644 index 6348a62..0000000 --- a/src/lib/waku/reliable_channel.ts +++ /dev/null @@ -1,135 +0,0 @@ -import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk"; -import { MessageType } from "./types"; -import { CodecManager } from "./codec"; -import { generateStringId } from "@/lib/utils"; -import { OpchanMessage } from "@/types/forum"; - -export interface MessageStatusCallback { - onSent?: (messageId: string) => void; - onAcknowledged?: (messageId: string) => void; - onError?: (messageId: string, error: string) => void; -} - -export interface IncomingMessageCallback { - onMessage: (message: OpchanMessage) => void; -} - -export class ReliableMessageManager { - private channels: Map> = new Map(); - private messageCallbacks: Map = new Map(); - private incomingMessageCallbacks: IncomingMessageCallback[] = []; - private codecManager: CodecManager; - - constructor(node: LightNode) { - this.codecManager = new CodecManager(node); - this.initializeChannels(node); - } - - private async initializeChannels(node: LightNode) { - for (const type of Object.values(MessageType)) { - const encoder = this.codecManager.getEncoder(type); - const decoder = this.codecManager.getDecoder(type); - const senderId = generateStringId(); - const channelId = `opchan-${type}`; // Unique channel ID for each message type - - try { - const channel = await ReliableChannel.create(node, channelId, senderId, encoder, decoder); - this.channels.set(type, channel); - this.setupChannelListeners(channel, type); - } catch (error) { - console.error(`Failed to create reliable channel for ${type}:`, error); - } - } - } - - private setupChannelListeners(channel: ReliableChannel, type: MessageType) { - channel.addEventListener(ReliableChannelEvent.InMessageReceived, (event) => { - try { - const wakuMessage = event.detail; - if (wakuMessage.payload) { - const opchanMessage = this.codecManager.decodeMessage(wakuMessage.payload); - - - this.incomingMessageCallbacks.forEach(callback => { - callback.onMessage(opchanMessage); - }); - } - } catch (error) { - console.error(`Failed to process incoming message for ${type}:`, error); - } - }); - - // Listen for outgoing message status updates - channel.addEventListener(ReliableChannelEvent.OutMessageSent, (event) => { - const messageId = event.detail; - const callback = this.messageCallbacks.get(messageId); - if (callback?.onSent) { - callback.onSent(messageId); - } - }); - - channel.addEventListener(ReliableChannelEvent.OutMessageAcknowledged, (event) => { - const messageId = event.detail; - const callback = this.messageCallbacks.get(messageId); - if (callback?.onAcknowledged) { - callback.onAcknowledged(messageId); - } - }); - - channel.addEventListener(ReliableChannelEvent.OutMessageIrrecoverableError, (event) => { - const messageId = event.detail.messageId; - const error = event.detail.error; - const callback = this.messageCallbacks.get(messageId); - if (callback?.onError) { - callback.onError(messageId, error?.toString() || 'Unknown error'); - } - // Clean up callback after error - this.messageCallbacks.delete(messageId); - }); - } - - public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise { - const channel = this.channels.get(message.type); - if (!channel) { - throw new Error(`No reliable channel for message type: ${message.type}`); - } - - const encodedMessage = this.codecManager.encodeMessage(message); - const messageId = ReliableChannel.getMessageId(encodedMessage); - - // Store callback for this message - if (statusCallback) { - this.messageCallbacks.set(messageId, statusCallback); - } - - try { - await channel.send(encodedMessage); - return messageId; - } catch (error) { - // Clean up callback on immediate send failure - this.messageCallbacks.delete(messageId); - throw error; - } - } - - public addIncomingMessageListener(callback: IncomingMessageCallback) { - this.incomingMessageCallbacks.push(callback); - } - - public removeIncomingMessageListener(callback: IncomingMessageCallback) { - const index = this.incomingMessageCallbacks.indexOf(callback); - if (index > -1) { - this.incomingMessageCallbacks.splice(index, 1); - } - } - - public getChannelStatus(type: MessageType): boolean { - return this.channels.has(type); - } - - public cleanup() { - this.messageCallbacks.clear(); - this.incomingMessageCallbacks.length = 0; - this.channels.clear(); - } -} \ No newline at end of file diff --git a/src/lib/waku/services/CacheService.ts b/src/lib/waku/services/CacheService.ts new file mode 100644 index 0000000..ebaf5cd --- /dev/null +++ b/src/lib/waku/services/CacheService.ts @@ -0,0 +1,85 @@ +import { MessageType, CellCache, PostCache, CommentCache, VoteCache, ModerateMessage } from "../../../types/waku"; +import { OpchanMessage } from "@/types/forum"; + +export interface MessageCache { + cells: CellCache; + posts: PostCache; + comments: CommentCache; + votes: VoteCache; + moderations: { [targetId: string]: ModerateMessage }; +} + +export class CacheService { + private processedMessageIds: Set = new Set(); + + public readonly cache: MessageCache = { + cells: {}, + posts: {}, + comments: {}, + votes: {}, + moderations: {} + }; + + public updateCache(message: OpchanMessage): boolean { + // Check if we've already processed this exact message + const messageKey = `${message.type}:${message.id}:${message.timestamp}`; + if (this.processedMessageIds.has(messageKey)) { + return false; // Already processed + } + + this.processedMessageIds.add(messageKey); + this.storeMessage(message); + return true; // Newly processed + } + + private storeMessage(message: OpchanMessage): void { + switch (message.type) { + case MessageType.CELL: + if (!this.cache.cells[message.id] || + this.cache.cells[message.id].timestamp !== message.timestamp) { + this.cache.cells[message.id] = message; + } + break; + case MessageType.POST: + if (!this.cache.posts[message.id] || + this.cache.posts[message.id].timestamp !== message.timestamp) { + this.cache.posts[message.id] = message; + } + break; + case MessageType.COMMENT: + if (!this.cache.comments[message.id] || + this.cache.comments[message.id].timestamp !== message.timestamp) { + this.cache.comments[message.id] = message; + } + break; + case MessageType.VOTE: { + const voteKey = `${message.targetId}:${message.author}`; + if (!this.cache.votes[voteKey] || + this.cache.votes[voteKey].timestamp !== message.timestamp) { + this.cache.votes[voteKey] = message; + } + break; + } + case MessageType.MODERATE: { + const modMsg = message as ModerateMessage; + if (!this.cache.moderations[modMsg.targetId] || + this.cache.moderations[modMsg.targetId].timestamp !== modMsg.timestamp) { + this.cache.moderations[modMsg.targetId] = modMsg; + } + break; + } + default: + console.warn("Received message with unknown type"); + break; + } + } + + public clear(): void { + this.processedMessageIds.clear(); + this.cache.cells = {}; + this.cache.posts = {}; + this.cache.comments = {}; + this.cache.votes = {}; + this.cache.moderations = {}; + } +} diff --git a/src/lib/waku/services/MessageService.ts b/src/lib/waku/services/MessageService.ts new file mode 100644 index 0000000..daff2a0 --- /dev/null +++ b/src/lib/waku/services/MessageService.ts @@ -0,0 +1,80 @@ +import { OpchanMessage } from "@/types/forum"; +import { CacheService } from "./CacheService"; +import { ReliableMessaging, MessageStatusCallback } from "../core/ReliableMessaging"; +import { WakuNodeManager } from "../core/WakuNodeManager"; + +export type MessageReceivedCallback = (message: OpchanMessage) => void; +export type { MessageStatusCallback }; + +export class MessageService { + private messageReceivedCallbacks: Set = new Set(); + + constructor( + private cacheService: CacheService, + private reliableMessaging: ReliableMessaging | null, + private nodeManager: WakuNodeManager + ) { + this.setupMessageHandling(); + } + + private setupMessageHandling(): void { + if (this.reliableMessaging) { + this.reliableMessaging.onMessage((message) => { + const isNew = this.cacheService.updateCache(message); + if (isNew) { + this.messageReceivedCallbacks.forEach(callback => callback(message)); + } + }); + } + } + + public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise { + if (!this.reliableMessaging) { + throw new Error("Reliable messaging not initialized"); + } + + if (!this.nodeManager.isReady) { + throw new Error("Network not ready"); + } + + // Update cache optimistically + this.cacheService.updateCache(message); + + // Send via reliable messaging with status tracking + const messageId = await this.reliableMessaging.sendMessage(message, { + onSent: (id) => { + console.log(`Message ${id} sent`); + statusCallback?.onSent?.(id); + }, + onAcknowledged: (id) => { + console.log(`Message ${id} acknowledged`); + statusCallback?.onAcknowledged?.(id); + }, + onError: (id, error) => { + console.error(`Message ${id} failed:`, error); + statusCallback?.onError?.(id, error); + } + }); + + return messageId; + } + + public onMessageReceived(callback: MessageReceivedCallback): () => void { + this.messageReceivedCallbacks.add(callback); + return () => this.messageReceivedCallbacks.delete(callback); + } + + public updateReliableMessaging(reliableMessaging: ReliableMessaging | null): void { + this.reliableMessaging = reliableMessaging; + this.setupMessageHandling(); + } + + public get messageCache() { + return this.cacheService.cache; + } + + public cleanup(): void { + this.messageReceivedCallbacks.clear(); + this.reliableMessaging?.cleanup(); + } +} diff --git a/src/types/forum.ts b/src/types/forum.ts index e8b2312..8e48cc6 100644 --- a/src/types/forum.ts +++ b/src/types/forum.ts @@ -1,4 +1,4 @@ -import { CellMessage, CommentMessage, PostMessage, VoteMessage, ModerateMessage } from "@/lib/waku/types"; +import { CellMessage, CommentMessage, PostMessage, VoteMessage, ModerateMessage } from "@/types/waku"; export type OpchanMessage = CellMessage | PostMessage | CommentMessage | VoteMessage | ModerateMessage; diff --git a/src/lib/waku/types.ts b/src/types/waku.ts similarity index 100% rename from src/lib/waku/types.ts rename to src/types/waku.ts