diff --git a/src/lib/services/AuthService.ts b/src/lib/services/AuthService.ts index 34802e9..d64e0e4 100644 --- a/src/lib/services/AuthService.ts +++ b/src/lib/services/AuthService.ts @@ -1,10 +1,10 @@ -import { WalletService } from '../wallets/index'; +import { WalletService } from '../identity/wallets/index'; import { UseAppKitAccountReturn } from '@reown/appkit/react'; import { AppKit } from '@reown/appkit'; -import { OrdinalAPI } from '../ordinal'; +import { OrdinalAPI } from '../identity/ordinal'; import { CryptoService, DelegationDuration } from './CryptoService'; import { EVerificationStatus, User } from '@/types/forum'; -import { WalletInfo } from '../wallets/ReOwnWalletService'; +import { WalletInfo } from '../identity/wallets/ReOwnWalletService'; export interface AuthResult { success: boolean; @@ -31,7 +31,7 @@ export interface AuthServiceInterface { clearStoredUser(): void; // Wallet info - getWalletInfo(): Promise; + getWalletInfo(): Promise; } export class AuthService implements AuthServiceInterface { @@ -152,8 +152,6 @@ export class AuthService implements AuthServiceInterface { async disconnectWallet(): Promise { // Clear any existing delegations when disconnecting this.cryptoService.clearDelegation(); - this.walletService.clearDelegation('bitcoin'); - this.walletService.clearDelegation('ethereum'); // Clear stored user data this.clearStoredUser(); @@ -298,7 +296,7 @@ export class AuthService implements AuthServiceInterface { /** * Get current wallet info */ - async getWalletInfo() { + async getWalletInfo(): Promise { // Use the wallet service to get detailed wallet info including ENS return await this.walletService.getWalletInfo(); } diff --git a/src/lib/waku/codec.ts b/src/lib/waku/codec.ts index ae84f12..bde1e28 100644 --- a/src/lib/waku/codec.ts +++ b/src/lib/waku/codec.ts @@ -4,50 +4,38 @@ import { CellMessage, PostMessage, CommentMessage, VoteMessage } from './types' import { CONTENT_TOPICS, NETWORK_CONFIG } from './constants'; import { OpchanMessage } from '@/types/forum'; +// Create the sharded pubsub topic +const PUBSUB_TOPIC = `/waku/2/rs/${NETWORK_CONFIG.clusterId}/0`; + export const encoders = { [MessageType.CELL]: createEncoder({ - contentTopic: CONTENT_TOPICS['cell'], - pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0} + contentTopic: CONTENT_TOPICS[MessageType.CELL], + routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC } }), [MessageType.POST]: createEncoder({ - contentTopic: CONTENT_TOPICS['post'], - pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0} + contentTopic: CONTENT_TOPICS[MessageType.POST], + routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC } }), [MessageType.COMMENT]: createEncoder({ - contentTopic: CONTENT_TOPICS['comment'], - pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0} + contentTopic: CONTENT_TOPICS[MessageType.COMMENT], + routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC } }), [MessageType.VOTE]: createEncoder({ - contentTopic: CONTENT_TOPICS['vote'], - pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0} + contentTopic: CONTENT_TOPICS[MessageType.VOTE], + routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC } }), [MessageType.MODERATE]: createEncoder({ - contentTopic: CONTENT_TOPICS['moderate'], - pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0} + contentTopic: CONTENT_TOPICS[MessageType.MODERATE], + routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC } }) } export const decoders = { - [MessageType.CELL]: createDecoder(CONTENT_TOPICS['cell'], { - clusterId: NETWORK_CONFIG.clusterId, - shard: 0 - }), - [MessageType.POST]: createDecoder(CONTENT_TOPICS['post'], { - clusterId: NETWORK_CONFIG.clusterId, - shard: 0 - }), - [MessageType.COMMENT]: createDecoder(CONTENT_TOPICS['comment'], { - clusterId: NETWORK_CONFIG.clusterId, - shard: 0 - }), - [MessageType.VOTE]: createDecoder(CONTENT_TOPICS['vote'], { - clusterId: NETWORK_CONFIG.clusterId, - shard: 0 - }), - [MessageType.MODERATE]: createDecoder(CONTENT_TOPICS['moderate'], { - clusterId: NETWORK_CONFIG.clusterId, - shard: 0 - }) + [MessageType.CELL]: createDecoder(CONTENT_TOPICS[MessageType.CELL], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }), + [MessageType.POST]: createDecoder(CONTENT_TOPICS[MessageType.POST], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }), + [MessageType.COMMENT]: createDecoder(CONTENT_TOPICS[MessageType.COMMENT], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }), + [MessageType.VOTE]: createDecoder(CONTENT_TOPICS[MessageType.VOTE], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }), + [MessageType.MODERATE]: createDecoder(CONTENT_TOPICS[MessageType.MODERATE], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }) } /** diff --git a/src/lib/waku/index.ts b/src/lib/waku/index.ts index a32e05c..ec9318d 100644 --- a/src/lib/waku/index.ts +++ b/src/lib/waku/index.ts @@ -1,25 +1,24 @@ //TODO: perhaps store all messages in an indexed DB? (helpful when Waku is down) // with a `isPublished` flag to indicate if the message has been sent to the network -import { createLightNode, LightNode } from "@waku/sdk"; +import { createLightNode, LightNode, WakuEvent, HealthStatus } from "@waku/sdk"; import StoreManager from "./store"; import { CommentCache, MessageType, VoteCache, ModerateMessage } from "./types"; import { PostCache } from "./types"; import { CellCache } from "./types"; import { OpchanMessage } from "@/types/forum"; -import { EphemeralProtocolsManager } from "./lightpush_filter"; import { NETWORK_CONFIG } from "./constants"; +import { ReliableMessageManager } from "./reliable_channel"; -export type HealthChangeCallback = (isReady: boolean) => void; +export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void; class MessageManager { private node: LightNode; - //TODO: implement SDS? - private ephemeralProtocolsManager: EphemeralProtocolsManager; + private reliableMessageManager: ReliableMessageManager | null = null; private storeManager: StoreManager; private _isReady: boolean = false; + private _currentHealth: HealthStatus = HealthStatus.Unhealthy; private healthListeners: Set = new Set(); - private peerCheckInterval: NodeJS.Timeout | null = null; public readonly messageCache: { @@ -48,55 +47,84 @@ class MessageManager { } public async stop() { - if (this.peerCheckInterval) { - clearInterval(this.peerCheckInterval); - this.peerCheckInterval = null; + if (this.reliableMessageManager) { + this.reliableMessageManager.cleanup(); + this.reliableMessageManager = null; } + await this.node.stop(); this.setIsReady(false); } private constructor(node: LightNode) { this.node = node; - this.ephemeralProtocolsManager = new EphemeralProtocolsManager(node); this.storeManager = new StoreManager(node); - // Start peer monitoring - this.startPeerMonitoring(); + this.setupHealthMonitoring(); } /** - * Start monitoring connected peers to determine node health - * Runs every 1 second to check if we have at least one peer + * Set up health monitoring using Waku's built-in health events */ - private startPeerMonitoring() { - // Initial peer check - this.checkPeers(); - - // Regular peer checking - this.peerCheckInterval = setInterval(() => { - this.checkPeers(); - }, 1000); + private setupHealthMonitoring() { + this.node.events.addEventListener(WakuEvent.Health, (event) => { + const health = event.detail; + this._currentHealth = health; + + console.log(`Waku health status: ${health}`); + + if (health === HealthStatus.SufficientlyHealthy) { + console.log("Waku is sufficiently healthy - initializing reliable messaging"); + this.setIsReady(true); + this.initializeReliableManager(); + } else if (health === HealthStatus.MinimallyHealthy) { + console.log("Waku is minimally healthy - may have issues sending/receiving messages"); + this.setIsReady(true); + this.initializeReliableManager(); + } else { + console.log("Waku is unhealthy - disconnected from network"); + this.setIsReady(false); + this.cleanupReliableManager(); + } + }); } - - /** - * Check if we have connected peers and update ready state - */ - private async checkPeers() { + + private async initializeReliableManager() { + // Only initialize if not already initialized + if (this.reliableMessageManager) { + return; + } + try { - const peers = await this.node.getConnectedPeers(); - this.setIsReady(peers.length >= 1); - } catch (err) { - console.error("Error checking peers:", err); - this.setIsReady(false); + this.reliableMessageManager = new ReliableMessageManager(this.node); + + // Set up listener for incoming reliable messages + this.reliableMessageManager.addIncomingMessageListener({ + onMessage: (message) => { + console.log("Received reliable message:", message); + this.updateCache(message); + } + }); + + console.log("Reliable message manager initialized successfully"); + } catch (error) { + console.error("Failed to initialize reliable message manager:", error); + } + } + + private cleanupReliableManager() { + if (this.reliableMessageManager) { + console.log("Cleaning up reliable message manager due to health status"); + this.reliableMessageManager.cleanup(); + this.reliableMessageManager = null; } } private setIsReady(isReady: boolean) { if (this._isReady !== isReady) { this._isReady = isReady; - // Notify all health listeners - this.healthListeners.forEach(listener => listener(isReady)); + // Notify all health listeners with both ready state and health status + this.healthListeners.forEach(listener => listener(isReady, this._currentHealth)); } } @@ -107,6 +135,13 @@ class MessageManager { return this._isReady; } + /** + * Returns the current Waku health status + */ + public get currentHealth(): HealthStatus { + return this._currentHealth; + } + /** * Subscribe to health status changes * @param callback Function to call when health status changes @@ -116,7 +151,7 @@ class MessageManager { this.healthListeners.add(callback); // Immediately call with current status - callback(this._isReady); + callback(this._isReady, this._currentHealth); // Return unsubscribe function return () => { @@ -125,31 +160,28 @@ class MessageManager { } /** - * Waits for the node to connect to at least one peer + * Waits for the node to achieve at least minimally healthy status * @param timeoutMs Maximum time to wait in milliseconds - * @returns Promise that resolves when connected or rejects on timeout + * @returns Promise that resolves when healthy or rejects on timeout */ public async waitForRemotePeer(timeoutMs: number = 15000): Promise { if (this._isReady) return true; return new Promise((resolve, reject) => { const timeout = setTimeout(() => { - reject(new Error(`Timed out waiting for remote peer after ${timeoutMs}ms`)); + reject(new Error(`Timed out waiting for healthy network connection after ${timeoutMs}ms`)); }, timeoutMs); - const checkHandler = (isReady: boolean) => { - if (isReady) { + const checkHandler = (isReady: boolean, health: HealthStatus) => { + if (isReady && (health === HealthStatus.MinimallyHealthy || health === HealthStatus.SufficientlyHealthy)) { clearTimeout(timeout); this.healthListeners.delete(checkHandler); resolve(true); } }; - // Add temporary listener for peer connection + // Add temporary listener for health status this.healthListeners.add(checkHandler); - - // Also do an immediate check in case we already have peers - this.checkPeers(); }); } @@ -165,19 +197,23 @@ class MessageManager { } public async sendMessage(message: OpchanMessage) { - await this.ephemeralProtocolsManager.sendMessage(message); - //TODO: should we update the cache here? or just from store/filter? - this.updateCache(message); - } - - public async subscribeToMessages(types: MessageType[] = [MessageType.CELL, MessageType.POST, MessageType.COMMENT, MessageType.VOTE, MessageType.MODERATE]) { - const { result, subscription } = await this.ephemeralProtocolsManager.subscribeToMessages(types); - - for (const message of result) { - this.updateCache(message); + if (!this.reliableMessageManager) { + throw new Error("Reliable message manager not initialized"); } + + // Use reliable channel with status tracking + const messageId = await this.reliableMessageManager.sendMessage(message, { + onSent: (id) => console.log(`Message ${id} sent ✓`), + onAcknowledged: (id) => console.log(`Message ${id} acknowledged ✓✓`), + onError: (id, error) => console.error(`Message ${id} failed:`, error) + }); - return { messages: result, subscription }; + console.log(`Sent reliable message with ID: ${messageId}`); + + // Update local cache immediately for optimistic UI + this.updateCache(message); + + return messageId; } private updateCache(message: OpchanMessage) { diff --git a/src/lib/waku/lightpush_filter.ts b/src/lib/waku/lightpush_filter.ts deleted file mode 100644 index 8fb28d7..0000000 --- a/src/lib/waku/lightpush_filter.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { LightNode } from "@waku/sdk"; -import { CellMessage, CommentMessage, MessageType, PostMessage, VoteMessage, ModerateMessage } from "./types"; -import { OpchanMessage } from "@/types/forum"; -import { encodeMessage, encoders, decoders, decodeMessage } from "./codec"; - -export class EphemeralProtocolsManager { - private node: LightNode; - - constructor(node: LightNode) { - this.node = node; - } - - public async sendMessage(message: OpchanMessage) { - const encodedMessage = encodeMessage(message); - const result = await this.node.lightPush.send(encoders[message.type], { - payload: encodedMessage - }); - return result; - } - - public async subscribeToMessages(types: MessageType[]) { - const result: (CellMessage | PostMessage | CommentMessage | VoteMessage | ModerateMessage)[] = []; - - const subscription = await this.node.filter.subscribe(Object.values(decoders), async (message) => { - const {payload} = message; - - const decodedMessage = decodeMessage(payload); - if (types.includes(decodedMessage.type)) { - result.push(decodedMessage); - } - }); - - if (subscription.error) { - throw new Error(subscription.error); - } - - if (subscription.results.successes.length === 0) { - throw new Error("No successes"); - } - - return {result, subscription}; - } -} \ No newline at end of file diff --git a/src/lib/waku/network.ts b/src/lib/waku/network.ts index 0dcaa33..a974af8 100644 --- a/src/lib/waku/network.ts +++ b/src/lib/waku/network.ts @@ -1,4 +1,5 @@ import messageManager from '@/lib/waku'; +import { HealthStatus } from '@waku/sdk'; export type ToastFunction = (props: { title: string; @@ -44,8 +45,7 @@ export const initializeNetwork = async ( toast({ title: 'Connection timeout', description: 'Could not connect to any peers. Some features may be unavailable.', variant: 'destructive' }); console.warn('Timeout connecting to peer:', err); } - await messageManager.queryStore(); - await messageManager.subscribeToMessages(); + // await messageManager.queryStore(); updateStateFromCache(); } catch (err) { console.error('Error loading forum data:', err); @@ -81,10 +81,13 @@ export const monitorNetworkHealth = ( toast: ToastFunction, ): { unsubscribe: () => void } => { setIsNetworkConnected(messageManager.isReady); - const unsubscribe = messageManager.onHealthChange((isReady) => { + const unsubscribe = messageManager.onHealthChange((isReady, health) => { setIsNetworkConnected(isReady); - if (isReady) { - toast({ title: 'Network connected', description: 'Connected to the Waku network' }); + + if (health === HealthStatus.SufficientlyHealthy) { + toast({ title: 'Network connected', description: 'Connected to the Waku network with excellent connectivity' }); + } else if (health === HealthStatus.MinimallyHealthy) { + toast({ title: 'Network connected', description: 'Connected to Waku network. Some features may be limited.', variant: 'default' }); } else { toast({ title: 'Network disconnected', description: 'Lost connection to the Waku network', variant: 'destructive' }); } diff --git a/src/lib/waku/reliable_channel.ts b/src/lib/waku/reliable_channel.ts new file mode 100644 index 0000000..4cbc279 --- /dev/null +++ b/src/lib/waku/reliable_channel.ts @@ -0,0 +1,133 @@ +import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk"; +import { MessageType } from "./types"; +import { decodeMessage, decoders, encodeMessage, encoders } from "./codec"; +import { generateStringId } from "@/lib/utils"; +import { OpchanMessage } from "@/types/forum"; + +export interface MessageStatusCallback { + onSent?: (messageId: string) => void; + onAcknowledged?: (messageId: string) => void; + onError?: (messageId: string, error: string) => void; +} + +export interface IncomingMessageCallback { + onMessage: (message: OpchanMessage) => void; +} + +export class ReliableMessageManager { + private channels: Map> = new Map(); + private messageCallbacks: Map = new Map(); + private incomingMessageCallbacks: IncomingMessageCallback[] = []; + + constructor(node: LightNode) { + this.initializeChannels(node); + } + + private async initializeChannels(node: LightNode) { + for (const type of Object.values(MessageType)) { + const encoder = encoders[type]; + const decoder = decoders[type]; + const senderId = generateStringId(); + const channelId = `opchan-${type}`; // Unique channel ID for each message type + + try { + const channel = await ReliableChannel.create(node, channelId, senderId, encoder, decoder); + this.channels.set(type, channel); + this.setupChannelListeners(channel, type); + } catch (error) { + console.error(`Failed to create reliable channel for ${type}:`, error); + } + } + } + + private setupChannelListeners(channel: ReliableChannel, type: MessageType) { + channel.addEventListener(ReliableChannelEvent.InMessageReceived, (event) => { + try { + const wakuMessage = event.detail; + if (wakuMessage.payload) { + const opchanMessage = decodeMessage(wakuMessage.payload); + + + this.incomingMessageCallbacks.forEach(callback => { + callback.onMessage(opchanMessage); + }); + } + } catch (error) { + console.error(`Failed to process incoming message for ${type}:`, error); + } + }); + + // Listen for outgoing message status updates + channel.addEventListener(ReliableChannelEvent.OutMessageSent, (event) => { + const messageId = event.detail; + const callback = this.messageCallbacks.get(messageId); + if (callback?.onSent) { + callback.onSent(messageId); + } + }); + + channel.addEventListener(ReliableChannelEvent.OutMessageAcknowledged, (event) => { + const messageId = event.detail; + const callback = this.messageCallbacks.get(messageId); + if (callback?.onAcknowledged) { + callback.onAcknowledged(messageId); + } + }); + + channel.addEventListener(ReliableChannelEvent.OutMessageIrrecoverableError, (event) => { + const messageId = event.detail.messageId; + const error = event.detail.error; + const callback = this.messageCallbacks.get(messageId); + if (callback?.onError) { + callback.onError(messageId, error?.toString() || 'Unknown error'); + } + // Clean up callback after error + this.messageCallbacks.delete(messageId); + }); + } + + public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise { + const channel = this.channels.get(message.type); + if (!channel) { + throw new Error(`No reliable channel for message type: ${message.type}`); + } + + const encodedMessage = encodeMessage(message); + const messageId = ReliableChannel.getMessageId(encodedMessage); + + // Store callback for this message + if (statusCallback) { + this.messageCallbacks.set(messageId, statusCallback); + } + + try { + await channel.send(encodedMessage); + return messageId; + } catch (error) { + // Clean up callback on immediate send failure + this.messageCallbacks.delete(messageId); + throw error; + } + } + + public addIncomingMessageListener(callback: IncomingMessageCallback) { + this.incomingMessageCallbacks.push(callback); + } + + public removeIncomingMessageListener(callback: IncomingMessageCallback) { + const index = this.incomingMessageCallbacks.indexOf(callback); + if (index > -1) { + this.incomingMessageCallbacks.splice(index, 1); + } + } + + public getChannelStatus(type: MessageType): boolean { + return this.channels.has(type); + } + + public cleanup() { + this.messageCallbacks.clear(); + this.incomingMessageCallbacks.length = 0; + this.channels.clear(); + } +} \ No newline at end of file