diff --git a/app/src/components/Header.tsx b/app/src/components/Header.tsx
index 1327545..a90275a 100644
--- a/app/src/components/Header.tsx
+++ b/app/src/components/Header.tsx
@@ -22,6 +22,7 @@ import {
X,
Clock,
Trash2,
+ Loader2,
} from 'lucide-react';
import {
DropdownMenu,
@@ -30,6 +31,12 @@ import {
DropdownMenuSeparator,
DropdownMenuTrigger,
} from '@/components/ui/dropdown-menu';
+import {
+ Tooltip,
+ TooltipContent,
+ TooltipProvider,
+ TooltipTrigger,
+} from '@/components/ui/tooltip';
import {
AlertDialog,
AlertDialogAction,
@@ -50,7 +57,7 @@ import { WakuHealthDot } from '@/components/ui/waku-health-indicator';
const Header = () => {
const { currentUser, delegationInfo } = useAuth();
- const { statusMessage } = useNetwork();
+ const { statusMessage, syncStatus, syncDetail } = useNetwork();
const location = useLocation();
const { toast } = useToast();
@@ -166,16 +173,52 @@ const Header = () => {
{statusMessage}
+ {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 && (
+
+
+
+
+
+ SYNCING ({syncDetail.missing})
+
+
+
+
+ Syncing messages
+
+ Pending: {syncDetail.missing}
+
+ Received: {syncDetail.received}
+ {syncDetail.lost > 0 && (
+ <>
+
+ Lost: {syncDetail.lost}
+ >
+ )}
+
+
+
+
+ )}
{content.lastSync && (
-
{/* Network Status (Mobile) */}
-
-
-
+
+
+
+
+ {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 ? (
+
+ ) : (
+
+ )}
+
+
+
+
+ {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0
+ ? `Syncing ${syncDetail.missing} messages...`
+ : 'Network connected'}
+
+
+
+
{/* User Status & Actions */}
{isConnected || currentUser?.verificationStatus === EVerificationStatus.ANONYMOUS ? (
@@ -466,8 +526,14 @@ const Header = () => {
{statusMessage}
+ {syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 && (
+
+
+ SYNCING ({syncDetail.missing})
+
+ )}
{content.lastSync && (
-
+
{new Date(content.lastSync).toLocaleTimeString([], {
hour: '2-digit',
minute: '2-digit',
diff --git a/packages/core/src/lib/waku/core/ReliableMessaging.ts b/packages/core/src/lib/waku/core/ReliableMessaging.ts
index 24ad698..abf0eb0 100644
--- a/packages/core/src/lib/waku/core/ReliableMessaging.ts
+++ b/packages/core/src/lib/waku/core/ReliableMessaging.ts
@@ -16,16 +16,19 @@ export interface MessageStatusCallback {
}
export type IncomingMessageCallback = (message: OpchanMessage) => void;
+export type SyncStatusCallback = (status: 'syncing' | 'synced', detail: { received: number; missing: number; lost: number }) => void;
export class ReliableMessaging {
private channel: ReliableChannel | null = null;
private messageCallbacks: Map = new Map();
private incomingMessageCallbacks: Set = new Set();
+ private syncStatusCallbacks: Set = new Set();
private codecManager: CodecManager;
constructor(node: LightNode, config: WakuConfig) {
this.codecManager = new CodecManager(node, config);
this.initializeChannel(node, config);
+
}
// ===== PUBLIC METHODS =====
@@ -58,9 +61,15 @@ export class ReliableMessaging {
return () => this.incomingMessageCallbacks.delete(callback);
}
+ public onSyncStatus(callback: SyncStatusCallback): () => void {
+ this.syncStatusCallbacks.add(callback);
+ return () => this.syncStatusCallbacks.delete(callback);
+ }
+
public cleanup(): void {
this.messageCallbacks.clear();
this.incomingMessageCallbacks.clear();
+ this.syncStatusCallbacks.clear();
this.channel = null;
}
@@ -81,11 +90,30 @@ export class ReliableMessaging {
decoder
);
this.setupChannelListeners(this.channel);
+ this.setupSyncStatusListeners(this.channel);
} catch (error) {
console.error('Failed to create reliable channel:', error);
}
}
+ private setupSyncStatusListeners(channel: ReliableChannel): void {
+ // Check if syncStatus API is available
+ if (!channel.syncStatus) {
+ console.warn('ReliableChannel.syncStatus is not available in this SDK version');
+ return;
+ }
+
+ channel.syncStatus.addEventListener('syncing', (event) => {
+ const detail = event.detail;
+ this.syncStatusCallbacks.forEach(cb => cb('syncing', detail));
+ });
+
+ channel.syncStatus.addEventListener('synced', (event) => {
+ const detail = event.detail;
+ this.syncStatusCallbacks.forEach(cb => cb('synced', detail));
+ });
+ }
+
private setupChannelListeners(
channel: ReliableChannel
): void {
diff --git a/packages/core/src/lib/waku/index.ts b/packages/core/src/lib/waku/index.ts
index 244e5f9..14148b5 100644
--- a/packages/core/src/lib/waku/index.ts
+++ b/packages/core/src/lib/waku/index.ts
@@ -5,10 +5,10 @@ import {
MessageService,
MessageStatusCallback,
} from './services/MessageService';
-import { ReliableMessaging } from './core/ReliableMessaging';
+import { ReliableMessaging, SyncStatusCallback } from './core/ReliableMessaging';
import { WakuConfig } from '../../types';
-export type { HealthChangeCallback, MessageStatusCallback };
+export type { HealthChangeCallback, MessageStatusCallback, SyncStatusCallback };
class MessageManager {
private nodeManager: WakuNodeManager | null = null;
@@ -71,6 +71,13 @@ class MessageManager {
return this.messageService.onMessageReceived(callback);
}
+ public onSyncStatus(callback: SyncStatusCallback): () => void {
+ if (!this.reliableMessaging) {
+ throw new Error('Reliable messaging not initialized');
+ }
+ return this.reliableMessaging.onSyncStatus(callback);
+ }
+
// ===== PRIVATE METHODS =====
private async initialize(): Promise {
@@ -84,9 +91,9 @@ class MessageManager {
);
// Set up health-based reliable messaging initialization
- this.nodeManager.onHealthChange(isReady => {
+ this.nodeManager.onHealthChange(async (isReady) => {
if (isReady && !this.reliableMessaging) {
- this.initializeReliableMessaging();
+ await this.initializeReliableMessaging();
} else if (!isReady && this.reliableMessaging) {
this.cleanupReliableMessaging();
}
@@ -115,6 +122,10 @@ class MessageManager {
}
}
+ public getReliableMessaging(): ReliableMessaging | null {
+ return this.reliableMessaging;
+ }
+
private cleanupReliableMessaging(): void {
if (this.reliableMessaging) {
console.log('Cleaning up reliable messaging due to health status');
@@ -131,6 +142,7 @@ export class DefaultMessageManager {
private _initPromise: Promise | null = null;
private _pendingHealthSubscriptions: HealthChangeCallback[] = [];
private _pendingMessageSubscriptions: ((message: any) => void)[] = [];
+ private _pendingSyncStatusSubscriptions: SyncStatusCallback[] = [];
private _wakuConfig: WakuConfig | null = null;
// ===== PUBLIC METHODS =====
@@ -157,6 +169,31 @@ export class DefaultMessageManager {
this._instance!.onMessageReceived(callback);
});
this._pendingMessageSubscriptions = [];
+
+ // Establish all pending sync status subscriptions
+ this._pendingSyncStatusSubscriptions.forEach(callback => {
+ try {
+ this._instance!.onSyncStatus(callback);
+ } catch (e) {
+ // Reliable messaging might not be ready yet, keep in pending
+ }
+ });
+
+ // Set up a listener to retry sync subscriptions when reliable messaging becomes available
+ const reliableMessaging = this._instance.getReliableMessaging();
+ if (!reliableMessaging) {
+ // Watch for when it becomes available
+ const checkInterval = setInterval(() => {
+ const rm = this._instance?.getReliableMessaging();
+ if (rm && this._pendingSyncStatusSubscriptions.length > 0) {
+ this.retryPendingSyncSubscriptions();
+ clearInterval(checkInterval);
+ }
+ }, 1000);
+
+ // Clean up after 30 seconds
+ setTimeout(() => clearInterval(checkInterval), 30000);
+ }
}
// Proxy other common methods
@@ -209,6 +246,49 @@ export class DefaultMessageManager {
}
return this._instance.onMessageReceived(callback);
}
+
+ onSyncStatus(callback: SyncStatusCallback) {
+ if (!this._instance) {
+ // Queue the callback for when we're initialized
+ this._pendingSyncStatusSubscriptions.push(callback);
+
+ return () => {
+ const index = this._pendingSyncStatusSubscriptions.indexOf(callback);
+ if (index !== -1) {
+ this._pendingSyncStatusSubscriptions.splice(index, 1);
+ }
+ };
+ }
+ try {
+ return this._instance.onSyncStatus(callback);
+ } catch (e) {
+ // Reliable messaging not ready, queue it
+ this._pendingSyncStatusSubscriptions.push(callback);
+ return () => {
+ const index = this._pendingSyncStatusSubscriptions.indexOf(callback);
+ if (index !== -1) {
+ this._pendingSyncStatusSubscriptions.splice(index, 1);
+ }
+ };
+ }
+ }
+
+ // Helper to retry pending sync subscriptions when reliable messaging becomes available
+ private retryPendingSyncSubscriptions() {
+ if (!this._instance) return;
+
+ const pending = [...this._pendingSyncStatusSubscriptions];
+ this._pendingSyncStatusSubscriptions = [];
+
+ pending.forEach(callback => {
+ try {
+ this._instance!.onSyncStatus(callback);
+ } catch (e) {
+ // Still not ready, put it back
+ this._pendingSyncStatusSubscriptions.push(callback);
+ }
+ });
+ }
}
const messageManager = new DefaultMessageManager();
diff --git a/packages/react/src/v1/hooks/useNetwork.ts b/packages/react/src/v1/hooks/useNetwork.ts
index ee5f352..6229ad6 100644
--- a/packages/react/src/v1/hooks/useNetwork.ts
+++ b/packages/react/src/v1/hooks/useNetwork.ts
@@ -20,6 +20,8 @@ export function useNetwork() {
statusMessage: network.statusMessage,
issues: network.issues,
isHydrated: network.isHydrated,
+ syncStatus: network.syncStatus,
+ syncDetail: network.syncDetail,
canRefresh: true,
refresh,
} as const;
diff --git a/packages/react/src/v1/provider/StoreWiring.tsx b/packages/react/src/v1/provider/StoreWiring.tsx
index 8ad0f10..31422e9 100644
--- a/packages/react/src/v1/provider/StoreWiring.tsx
+++ b/packages/react/src/v1/provider/StoreWiring.tsx
@@ -103,6 +103,22 @@ export const StoreWiring: React.FC = () => {
}));
});
+ // Wire sync status
+ try {
+ client.messageManager.onSyncStatus((status, detail) => {
+ setOpchanState(prev => ({
+ ...prev,
+ network: {
+ ...prev.network,
+ syncStatus: status,
+ syncDetail: detail,
+ },
+ }));
+ });
+ } catch (e) {
+ // Reliable messaging not ready yet
+ }
+
unsubMessages = client.messageManager.onMessageReceived(async (message: OpchanMessage) => {
// Persist, then reflect cache in store
try {
diff --git a/packages/react/src/v1/store/opchanStore.ts b/packages/react/src/v1/store/opchanStore.ts
index ee69208..4d43de8 100644
--- a/packages/react/src/v1/store/opchanStore.ts
+++ b/packages/react/src/v1/store/opchanStore.ts
@@ -47,6 +47,8 @@ export interface NetworkSlice {
statusMessage: string;
issues: string[];
isHydrated: boolean;
+ syncStatus: 'syncing' | 'synced' | 'unknown';
+ syncDetail: { received: number; missing: number; lost: number } | null;
}
export interface OpchanState {
@@ -87,6 +89,8 @@ const defaultState: OpchanState = {
statusMessage: 'connecting…',
issues: [],
isHydrated: false,
+ syncStatus: 'unknown',
+ syncDetail: null,
},
};