From bf7b3f20a11e0b740d76c085f41bd09f4ab6831e Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Fri, 14 Nov 2025 14:37:00 -0500 Subject: [PATCH] feat: add sync status for messages --- app/src/components/Header.tsx | 94 ++++++++++++++++--- .../src/lib/waku/core/ReliableMessaging.ts | 28 ++++++ packages/core/src/lib/waku/index.ts | 88 ++++++++++++++++- packages/react/src/v1/hooks/useNetwork.ts | 2 + .../react/src/v1/provider/StoreWiring.tsx | 16 ++++ packages/react/src/v1/store/opchanStore.ts | 4 + 6 files changed, 214 insertions(+), 18 deletions(-) diff --git a/app/src/components/Header.tsx b/app/src/components/Header.tsx index 1327545..a90275a 100644 --- a/app/src/components/Header.tsx +++ b/app/src/components/Header.tsx @@ -22,6 +22,7 @@ import { X, Clock, Trash2, + Loader2, } from 'lucide-react'; import { DropdownMenu, @@ -30,6 +31,12 @@ import { DropdownMenuSeparator, DropdownMenuTrigger, } from '@/components/ui/dropdown-menu'; +import { + Tooltip, + TooltipContent, + TooltipProvider, + TooltipTrigger, +} from '@/components/ui/tooltip'; import { AlertDialog, AlertDialogAction, @@ -50,7 +57,7 @@ import { WakuHealthDot } from '@/components/ui/waku-health-indicator'; const Header = () => { const { currentUser, delegationInfo } = useAuth(); - const { statusMessage } = useNetwork(); + const { statusMessage, syncStatus, syncDetail } = useNetwork(); const location = useLocation(); const { toast } = useToast(); @@ -166,16 +173,52 @@ const Header = () => { {statusMessage} + {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 && ( + + + +
+ + SYNCING ({syncDetail.missing}) +
+
+ +

+ Syncing messages +
+ Pending: {syncDetail.missing} +
+ Received: {syncDetail.received} + {syncDetail.lost > 0 && ( + <> +
+ Lost: {syncDetail.lost} + + )} +

+
+
+
+ )} {content.lastSync && ( -
- - - {new Date(content.lastSync).toLocaleTimeString([], { - hour: '2-digit', - minute: '2-digit', - })} - -
+ + + +
+ + + {new Date(content.lastSync).toLocaleTimeString([], { + hour: '2-digit', + minute: '2-digit', + })} + +
+
+ +

Last message sync time

+
+
+
)} @@ -183,9 +226,26 @@ const Header = () => { {/* Right: User Actions */}
{/* Network Status (Mobile) */} -
- -
+ + + +
+ {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 ? ( + + ) : ( + + )} +
+
+ +

+ {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 + ? `Syncing ${syncDetail.missing} messages...` + : 'Network connected'} +

+
+
+
{/* User Status & Actions */} {isConnected || currentUser?.verificationStatus === EVerificationStatus.ANONYMOUS ? ( @@ -466,8 +526,14 @@ const Header = () => {
{statusMessage} + {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 && ( + + + SYNCING ({syncDetail.missing}) + + )} {content.lastSync && ( - + {new Date(content.lastSync).toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', diff --git a/packages/core/src/lib/waku/core/ReliableMessaging.ts b/packages/core/src/lib/waku/core/ReliableMessaging.ts index 24ad698..abf0eb0 100644 --- a/packages/core/src/lib/waku/core/ReliableMessaging.ts +++ b/packages/core/src/lib/waku/core/ReliableMessaging.ts @@ -16,16 +16,19 @@ export interface MessageStatusCallback { } export type IncomingMessageCallback = (message: OpchanMessage) => void; +export type SyncStatusCallback = (status: 'syncing' | 'synced', detail: { received: number; missing: number; lost: number }) => void; export class ReliableMessaging { private channel: ReliableChannel | null = null; private messageCallbacks: Map = new Map(); private incomingMessageCallbacks: Set = new Set(); + private syncStatusCallbacks: Set = new Set(); private codecManager: CodecManager; constructor(node: LightNode, config: WakuConfig) { this.codecManager = new CodecManager(node, config); this.initializeChannel(node, config); + } // ===== PUBLIC METHODS ===== @@ -58,9 +61,15 @@ export class ReliableMessaging { return () => this.incomingMessageCallbacks.delete(callback); } + public onSyncStatus(callback: SyncStatusCallback): () => void { + this.syncStatusCallbacks.add(callback); + return () => this.syncStatusCallbacks.delete(callback); + } + public cleanup(): void { this.messageCallbacks.clear(); this.incomingMessageCallbacks.clear(); + this.syncStatusCallbacks.clear(); this.channel = null; } @@ -81,11 +90,30 @@ export class ReliableMessaging { decoder ); this.setupChannelListeners(this.channel); + this.setupSyncStatusListeners(this.channel); } catch (error) { console.error('Failed to create reliable channel:', error); } } + private setupSyncStatusListeners(channel: ReliableChannel): void { + // Check if syncStatus API is available + if (!channel.syncStatus) { + console.warn('ReliableChannel.syncStatus is not available in this SDK version'); + return; + } + + channel.syncStatus.addEventListener('syncing', (event) => { + const detail = event.detail; + this.syncStatusCallbacks.forEach(cb => cb('syncing', detail)); + }); + + channel.syncStatus.addEventListener('synced', (event) => { + const detail = event.detail; + this.syncStatusCallbacks.forEach(cb => cb('synced', detail)); + }); + } + private setupChannelListeners( channel: ReliableChannel ): void { diff --git a/packages/core/src/lib/waku/index.ts b/packages/core/src/lib/waku/index.ts index 244e5f9..14148b5 100644 --- a/packages/core/src/lib/waku/index.ts +++ b/packages/core/src/lib/waku/index.ts @@ -5,10 +5,10 @@ import { MessageService, MessageStatusCallback, } from './services/MessageService'; -import { ReliableMessaging } from './core/ReliableMessaging'; +import { ReliableMessaging, SyncStatusCallback } from './core/ReliableMessaging'; import { WakuConfig } from '../../types'; -export type { HealthChangeCallback, MessageStatusCallback }; +export type { HealthChangeCallback, MessageStatusCallback, SyncStatusCallback }; class MessageManager { private nodeManager: WakuNodeManager | null = null; @@ -71,6 +71,13 @@ class MessageManager { return this.messageService.onMessageReceived(callback); } + public onSyncStatus(callback: SyncStatusCallback): () => void { + if (!this.reliableMessaging) { + throw new Error('Reliable messaging not initialized'); + } + return this.reliableMessaging.onSyncStatus(callback); + } + // ===== PRIVATE METHODS ===== private async initialize(): Promise { @@ -84,9 +91,9 @@ class MessageManager { ); // Set up health-based reliable messaging initialization - this.nodeManager.onHealthChange(isReady => { + this.nodeManager.onHealthChange(async (isReady) => { if (isReady && !this.reliableMessaging) { - this.initializeReliableMessaging(); + await this.initializeReliableMessaging(); } else if (!isReady && this.reliableMessaging) { this.cleanupReliableMessaging(); } @@ -115,6 +122,10 @@ class MessageManager { } } + public getReliableMessaging(): ReliableMessaging | null { + return this.reliableMessaging; + } + private cleanupReliableMessaging(): void { if (this.reliableMessaging) { console.log('Cleaning up reliable messaging due to health status'); @@ -131,6 +142,7 @@ export class DefaultMessageManager { private _initPromise: Promise | null = null; private _pendingHealthSubscriptions: HealthChangeCallback[] = []; private _pendingMessageSubscriptions: ((message: any) => void)[] = []; + private _pendingSyncStatusSubscriptions: SyncStatusCallback[] = []; private _wakuConfig: WakuConfig | null = null; // ===== PUBLIC METHODS ===== @@ -157,6 +169,31 @@ export class DefaultMessageManager { this._instance!.onMessageReceived(callback); }); this._pendingMessageSubscriptions = []; + + // Establish all pending sync status subscriptions + this._pendingSyncStatusSubscriptions.forEach(callback => { + try { + this._instance!.onSyncStatus(callback); + } catch (e) { + // Reliable messaging might not be ready yet, keep in pending + } + }); + + // Set up a listener to retry sync subscriptions when reliable messaging becomes available + const reliableMessaging = this._instance.getReliableMessaging(); + if (!reliableMessaging) { + // Watch for when it becomes available + const checkInterval = setInterval(() => { + const rm = this._instance?.getReliableMessaging(); + if (rm && this._pendingSyncStatusSubscriptions.length > 0) { + this.retryPendingSyncSubscriptions(); + clearInterval(checkInterval); + } + }, 1000); + + // Clean up after 30 seconds + setTimeout(() => clearInterval(checkInterval), 30000); + } } // Proxy other common methods @@ -209,6 +246,49 @@ export class DefaultMessageManager { } return this._instance.onMessageReceived(callback); } + + onSyncStatus(callback: SyncStatusCallback) { + if (!this._instance) { + // Queue the callback for when we're initialized + this._pendingSyncStatusSubscriptions.push(callback); + + return () => { + const index = this._pendingSyncStatusSubscriptions.indexOf(callback); + if (index !== -1) { + this._pendingSyncStatusSubscriptions.splice(index, 1); + } + }; + } + try { + return this._instance.onSyncStatus(callback); + } catch (e) { + // Reliable messaging not ready, queue it + this._pendingSyncStatusSubscriptions.push(callback); + return () => { + const index = this._pendingSyncStatusSubscriptions.indexOf(callback); + if (index !== -1) { + this._pendingSyncStatusSubscriptions.splice(index, 1); + } + }; + } + } + + // Helper to retry pending sync subscriptions when reliable messaging becomes available + private retryPendingSyncSubscriptions() { + if (!this._instance) return; + + const pending = [...this._pendingSyncStatusSubscriptions]; + this._pendingSyncStatusSubscriptions = []; + + pending.forEach(callback => { + try { + this._instance!.onSyncStatus(callback); + } catch (e) { + // Still not ready, put it back + this._pendingSyncStatusSubscriptions.push(callback); + } + }); + } } const messageManager = new DefaultMessageManager(); diff --git a/packages/react/src/v1/hooks/useNetwork.ts b/packages/react/src/v1/hooks/useNetwork.ts index ee5f352..6229ad6 100644 --- a/packages/react/src/v1/hooks/useNetwork.ts +++ b/packages/react/src/v1/hooks/useNetwork.ts @@ -20,6 +20,8 @@ export function useNetwork() { statusMessage: network.statusMessage, issues: network.issues, isHydrated: network.isHydrated, + syncStatus: network.syncStatus, + syncDetail: network.syncDetail, canRefresh: true, refresh, } as const; diff --git a/packages/react/src/v1/provider/StoreWiring.tsx b/packages/react/src/v1/provider/StoreWiring.tsx index 8ad0f10..31422e9 100644 --- a/packages/react/src/v1/provider/StoreWiring.tsx +++ b/packages/react/src/v1/provider/StoreWiring.tsx @@ -103,6 +103,22 @@ export const StoreWiring: React.FC = () => { })); }); + // Wire sync status + try { + client.messageManager.onSyncStatus((status, detail) => { + setOpchanState(prev => ({ + ...prev, + network: { + ...prev.network, + syncStatus: status, + syncDetail: detail, + }, + })); + }); + } catch (e) { + // Reliable messaging not ready yet + } + unsubMessages = client.messageManager.onMessageReceived(async (message: OpchanMessage) => { // Persist, then reflect cache in store try { diff --git a/packages/react/src/v1/store/opchanStore.ts b/packages/react/src/v1/store/opchanStore.ts index ee69208..4d43de8 100644 --- a/packages/react/src/v1/store/opchanStore.ts +++ b/packages/react/src/v1/store/opchanStore.ts @@ -47,6 +47,8 @@ export interface NetworkSlice { statusMessage: string; issues: string[]; isHydrated: boolean; + syncStatus: 'syncing' | 'synced' | 'unknown'; + syncDetail: { received: number; missing: number; lost: number } | null; } export interface OpchanState { @@ -87,6 +89,8 @@ const defaultState: OpchanState = { statusMessage: 'connecting…', issues: [], isHydrated: false, + syncStatus: 'unknown', + syncDetail: null, }, };