feat: add sync status for messages

This commit is contained in:
Danish Arora 2025-11-14 14:37:00 -05:00
parent 78ff8b537b
commit bf7b3f20a1
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
6 changed files with 214 additions and 18 deletions

View File

@ -22,6 +22,7 @@ import {
X,
Clock,
Trash2,
Loader2,
} from 'lucide-react';
import {
DropdownMenu,
@ -30,6 +31,12 @@ import {
DropdownMenuSeparator,
DropdownMenuTrigger,
} from '@/components/ui/dropdown-menu';
import {
Tooltip,
TooltipContent,
TooltipProvider,
TooltipTrigger,
} from '@/components/ui/tooltip';
import {
AlertDialog,
AlertDialogAction,
@ -50,7 +57,7 @@ import { WakuHealthDot } from '@/components/ui/waku-health-indicator';
const Header = () => {
const { currentUser, delegationInfo } = useAuth();
const { statusMessage } = useNetwork();
const { statusMessage, syncStatus, syncDetail } = useNetwork();
const location = useLocation();
const { toast } = useToast();
@ -166,16 +173,52 @@ const Header = () => {
<span className="text-[10px] text-muted-foreground">
{statusMessage}
</span>
{syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 && (
<TooltipProvider delayDuration={200}>
<Tooltip>
<TooltipTrigger asChild>
<div className="flex items-center space-x-1 text-[10px] text-yellow-400 cursor-help">
<Loader2 className="w-3 h-3 animate-spin" />
<span>SYNCING ({syncDetail.missing})</span>
</div>
</TooltipTrigger>
<TooltipContent>
<p className="text-xs">
<strong>Syncing messages</strong>
<br />
Pending: {syncDetail.missing}
<br />
Received: {syncDetail.received}
{syncDetail.lost > 0 && (
<>
<br />
Lost: {syncDetail.lost}
</>
)}
</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
)}
{content.lastSync && (
<div className="flex items-center space-x-1 text-[10px] text-muted-foreground">
<Clock className="w-3 h-3" />
<span>
{new Date(content.lastSync).toLocaleTimeString([], {
hour: '2-digit',
minute: '2-digit',
})}
</span>
</div>
<TooltipProvider delayDuration={200}>
<Tooltip>
<TooltipTrigger asChild>
<div className="flex items-center space-x-1 text-[10px] text-muted-foreground cursor-help">
<Clock className="w-3 h-3" />
<span>
{new Date(content.lastSync).toLocaleTimeString([], {
hour: '2-digit',
minute: '2-digit',
})}
</span>
</div>
</TooltipTrigger>
<TooltipContent>
<p className="text-xs">Last message sync time</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
)}
</div>
</div>
@ -183,9 +226,26 @@ const Header = () => {
{/* Right: User Actions */}
<div className="flex items-center space-x-2 sm:space-x-3 flex-shrink-0">
{/* Network Status (Mobile) */}
<div className="lg:hidden">
<WakuHealthDot />
</div>
<TooltipProvider delayDuration={200}>
<Tooltip>
<TooltipTrigger asChild>
<div className="lg:hidden flex items-center space-x-1 cursor-help">
{syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 ? (
<Loader2 className="w-4 h-4 text-yellow-400 animate-spin" />
) : (
<WakuHealthDot />
)}
</div>
</TooltipTrigger>
<TooltipContent>
<p className="text-xs">
{syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0
? `Syncing ${syncDetail.missing} messages...`
: 'Network connected'}
</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
{/* User Status & Actions */}
{isConnected || currentUser?.verificationStatus === EVerificationStatus.ANONYMOUS ? (
@ -466,8 +526,14 @@ const Header = () => {
<div className="flex items-center space-x-2 text-[10px] uppercase tracking-[0.2em] text-muted-foreground">
<WakuHealthDot />
<span>{statusMessage}</span>
{syncStatus === 'syncing' && syncDetail && syncDetail.missing > 0 && (
<span className="text-yellow-400 flex items-center space-x-1">
<Loader2 className="w-3 h-3 animate-spin" />
<span>SYNCING ({syncDetail.missing})</span>
</span>
)}
{content.lastSync && (
<span className="ml-auto">
<span className="ml-auto" title="Last message sync">
{new Date(content.lastSync).toLocaleTimeString([], {
hour: '2-digit',
minute: '2-digit',

View File

@ -16,16 +16,19 @@ export interface MessageStatusCallback {
}
export type IncomingMessageCallback = (message: OpchanMessage) => void;
export type SyncStatusCallback = (status: 'syncing' | 'synced', detail: { received: number; missing: number; lost: number }) => void;
export class ReliableMessaging {
private channel: ReliableChannel<IDecodedMessage> | null = null;
private messageCallbacks: Map<string, MessageStatusCallback> = new Map();
private incomingMessageCallbacks: Set<IncomingMessageCallback> = new Set();
private syncStatusCallbacks: Set<SyncStatusCallback> = new Set();
private codecManager: CodecManager;
constructor(node: LightNode, config: WakuConfig) {
this.codecManager = new CodecManager(node, config);
this.initializeChannel(node, config);
}
// ===== PUBLIC METHODS =====
@ -58,9 +61,15 @@ export class ReliableMessaging {
return () => this.incomingMessageCallbacks.delete(callback);
}
public onSyncStatus(callback: SyncStatusCallback): () => void {
this.syncStatusCallbacks.add(callback);
return () => this.syncStatusCallbacks.delete(callback);
}
public cleanup(): void {
this.messageCallbacks.clear();
this.incomingMessageCallbacks.clear();
this.syncStatusCallbacks.clear();
this.channel = null;
}
@ -81,11 +90,30 @@ export class ReliableMessaging {
decoder
);
this.setupChannelListeners(this.channel);
this.setupSyncStatusListeners(this.channel);
} catch (error) {
console.error('Failed to create reliable channel:', error);
}
}
private setupSyncStatusListeners(channel: ReliableChannel<IDecodedMessage>): void {
// Check if syncStatus API is available
if (!channel.syncStatus) {
console.warn('ReliableChannel.syncStatus is not available in this SDK version');
return;
}
channel.syncStatus.addEventListener('syncing', (event) => {
const detail = event.detail;
this.syncStatusCallbacks.forEach(cb => cb('syncing', detail));
});
channel.syncStatus.addEventListener('synced', (event) => {
const detail = event.detail;
this.syncStatusCallbacks.forEach(cb => cb('synced', detail));
});
}
private setupChannelListeners(
channel: ReliableChannel<IDecodedMessage>
): void {

View File

@ -5,10 +5,10 @@ import {
MessageService,
MessageStatusCallback,
} from './services/MessageService';
import { ReliableMessaging } from './core/ReliableMessaging';
import { ReliableMessaging, SyncStatusCallback } from './core/ReliableMessaging';
import { WakuConfig } from '../../types';
export type { HealthChangeCallback, MessageStatusCallback };
export type { HealthChangeCallback, MessageStatusCallback, SyncStatusCallback };
class MessageManager {
private nodeManager: WakuNodeManager | null = null;
@ -71,6 +71,13 @@ class MessageManager {
return this.messageService.onMessageReceived(callback);
}
public onSyncStatus(callback: SyncStatusCallback): () => void {
if (!this.reliableMessaging) {
throw new Error('Reliable messaging not initialized');
}
return this.reliableMessaging.onSyncStatus(callback);
}
// ===== PRIVATE METHODS =====
private async initialize(): Promise<void> {
@ -84,9 +91,9 @@ class MessageManager {
);
// Set up health-based reliable messaging initialization
this.nodeManager.onHealthChange(isReady => {
this.nodeManager.onHealthChange(async (isReady) => {
if (isReady && !this.reliableMessaging) {
this.initializeReliableMessaging();
await this.initializeReliableMessaging();
} else if (!isReady && this.reliableMessaging) {
this.cleanupReliableMessaging();
}
@ -115,6 +122,10 @@ class MessageManager {
}
}
public getReliableMessaging(): ReliableMessaging | null {
return this.reliableMessaging;
}
private cleanupReliableMessaging(): void {
if (this.reliableMessaging) {
console.log('Cleaning up reliable messaging due to health status');
@ -131,6 +142,7 @@ export class DefaultMessageManager {
private _initPromise: Promise<MessageManager> | null = null;
private _pendingHealthSubscriptions: HealthChangeCallback[] = [];
private _pendingMessageSubscriptions: ((message: any) => void)[] = [];
private _pendingSyncStatusSubscriptions: SyncStatusCallback[] = [];
private _wakuConfig: WakuConfig | null = null;
// ===== PUBLIC METHODS =====
@ -157,6 +169,31 @@ export class DefaultMessageManager {
this._instance!.onMessageReceived(callback);
});
this._pendingMessageSubscriptions = [];
// Establish all pending sync status subscriptions
this._pendingSyncStatusSubscriptions.forEach(callback => {
try {
this._instance!.onSyncStatus(callback);
} catch (e) {
// Reliable messaging might not be ready yet, keep in pending
}
});
// Set up a listener to retry sync subscriptions when reliable messaging becomes available
const reliableMessaging = this._instance.getReliableMessaging();
if (!reliableMessaging) {
// Watch for when it becomes available
const checkInterval = setInterval(() => {
const rm = this._instance?.getReliableMessaging();
if (rm && this._pendingSyncStatusSubscriptions.length > 0) {
this.retryPendingSyncSubscriptions();
clearInterval(checkInterval);
}
}, 1000);
// Clean up after 30 seconds
setTimeout(() => clearInterval(checkInterval), 30000);
}
}
// Proxy other common methods
@ -209,6 +246,49 @@ export class DefaultMessageManager {
}
return this._instance.onMessageReceived(callback);
}
onSyncStatus(callback: SyncStatusCallback) {
if (!this._instance) {
// Queue the callback for when we're initialized
this._pendingSyncStatusSubscriptions.push(callback);
return () => {
const index = this._pendingSyncStatusSubscriptions.indexOf(callback);
if (index !== -1) {
this._pendingSyncStatusSubscriptions.splice(index, 1);
}
};
}
try {
return this._instance.onSyncStatus(callback);
} catch (e) {
// Reliable messaging not ready, queue it
this._pendingSyncStatusSubscriptions.push(callback);
return () => {
const index = this._pendingSyncStatusSubscriptions.indexOf(callback);
if (index !== -1) {
this._pendingSyncStatusSubscriptions.splice(index, 1);
}
};
}
}
// Helper to retry pending sync subscriptions when reliable messaging becomes available
private retryPendingSyncSubscriptions() {
if (!this._instance) return;
const pending = [...this._pendingSyncStatusSubscriptions];
this._pendingSyncStatusSubscriptions = [];
pending.forEach(callback => {
try {
this._instance!.onSyncStatus(callback);
} catch (e) {
// Still not ready, put it back
this._pendingSyncStatusSubscriptions.push(callback);
}
});
}
}
const messageManager = new DefaultMessageManager();

View File

@ -20,6 +20,8 @@ export function useNetwork() {
statusMessage: network.statusMessage,
issues: network.issues,
isHydrated: network.isHydrated,
syncStatus: network.syncStatus,
syncDetail: network.syncDetail,
canRefresh: true,
refresh,
} as const;

View File

@ -103,6 +103,22 @@ export const StoreWiring: React.FC = () => {
}));
});
// Wire sync status
try {
client.messageManager.onSyncStatus((status, detail) => {
setOpchanState(prev => ({
...prev,
network: {
...prev.network,
syncStatus: status,
syncDetail: detail,
},
}));
});
} catch (e) {
// Reliable messaging not ready yet
}
unsubMessages = client.messageManager.onMessageReceived(async (message: OpchanMessage) => {
// Persist, then reflect cache in store
try {

View File

@ -47,6 +47,8 @@ export interface NetworkSlice {
statusMessage: string;
issues: string[];
isHydrated: boolean;
syncStatus: 'syncing' | 'synced' | 'unknown';
syncDetail: { received: number; missing: number; lost: number } | null;
}
export interface OpchanState {
@ -87,6 +89,8 @@ const defaultState: OpchanState = {
statusMessage: 'connecting…',
issues: [],
isHydrated: false,
syncStatus: 'unknown',
syncDetail: null,
},
};