feat: initial SDS implementaiton

This commit is contained in:
Danish Arora 2025-08-29 15:43:10 +05:30
parent fe17cbc57b
commit 4b96bedcdd
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
6 changed files with 255 additions and 140 deletions

View File

@ -1,10 +1,10 @@
import { WalletService } from '../wallets/index';
import { WalletService } from '../identity/wallets/index';
import { UseAppKitAccountReturn } from '@reown/appkit/react';
import { AppKit } from '@reown/appkit';
import { OrdinalAPI } from '../ordinal';
import { OrdinalAPI } from '../identity/ordinal';
import { CryptoService, DelegationDuration } from './CryptoService';
import { EVerificationStatus, User } from '@/types/forum';
import { WalletInfo } from '../wallets/ReOwnWalletService';
import { WalletInfo } from '../identity/wallets/ReOwnWalletService';
export interface AuthResult {
success: boolean;
@ -31,7 +31,7 @@ export interface AuthServiceInterface {
clearStoredUser(): void;
// Wallet info
getWalletInfo(): Promise<WalletInfo>;
getWalletInfo(): Promise<WalletInfo | null>;
}
export class AuthService implements AuthServiceInterface {
@ -152,8 +152,6 @@ export class AuthService implements AuthServiceInterface {
async disconnectWallet(): Promise<void> {
// Clear any existing delegations when disconnecting
this.cryptoService.clearDelegation();
this.walletService.clearDelegation('bitcoin');
this.walletService.clearDelegation('ethereum');
// Clear stored user data
this.clearStoredUser();
@ -298,7 +296,7 @@ export class AuthService implements AuthServiceInterface {
/**
* Get current wallet info
*/
async getWalletInfo() {
async getWalletInfo(): Promise<WalletInfo | null> {
// Use the wallet service to get detailed wallet info including ENS
return await this.walletService.getWalletInfo();
}

View File

@ -4,50 +4,38 @@ import { CellMessage, PostMessage, CommentMessage, VoteMessage } from './types'
import { CONTENT_TOPICS, NETWORK_CONFIG } from './constants';
import { OpchanMessage } from '@/types/forum';
// Create the sharded pubsub topic
const PUBSUB_TOPIC = `/waku/2/rs/${NETWORK_CONFIG.clusterId}/0`;
export const encoders = {
[MessageType.CELL]: createEncoder({
contentTopic: CONTENT_TOPICS['cell'],
pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0}
contentTopic: CONTENT_TOPICS[MessageType.CELL],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.POST]: createEncoder({
contentTopic: CONTENT_TOPICS['post'],
pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0}
contentTopic: CONTENT_TOPICS[MessageType.POST],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.COMMENT]: createEncoder({
contentTopic: CONTENT_TOPICS['comment'],
pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0}
contentTopic: CONTENT_TOPICS[MessageType.COMMENT],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.VOTE]: createEncoder({
contentTopic: CONTENT_TOPICS['vote'],
pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0}
contentTopic: CONTENT_TOPICS[MessageType.VOTE],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.MODERATE]: createEncoder({
contentTopic: CONTENT_TOPICS['moderate'],
pubsubTopicShardInfo: {clusterId: NETWORK_CONFIG.clusterId, shard: 0}
contentTopic: CONTENT_TOPICS[MessageType.MODERATE],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
})
}
export const decoders = {
[MessageType.CELL]: createDecoder(CONTENT_TOPICS['cell'], {
clusterId: NETWORK_CONFIG.clusterId,
shard: 0
}),
[MessageType.POST]: createDecoder(CONTENT_TOPICS['post'], {
clusterId: NETWORK_CONFIG.clusterId,
shard: 0
}),
[MessageType.COMMENT]: createDecoder(CONTENT_TOPICS['comment'], {
clusterId: NETWORK_CONFIG.clusterId,
shard: 0
}),
[MessageType.VOTE]: createDecoder(CONTENT_TOPICS['vote'], {
clusterId: NETWORK_CONFIG.clusterId,
shard: 0
}),
[MessageType.MODERATE]: createDecoder(CONTENT_TOPICS['moderate'], {
clusterId: NETWORK_CONFIG.clusterId,
shard: 0
})
[MessageType.CELL]: createDecoder(CONTENT_TOPICS[MessageType.CELL], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.POST]: createDecoder(CONTENT_TOPICS[MessageType.POST], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.COMMENT]: createDecoder(CONTENT_TOPICS[MessageType.COMMENT], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.VOTE]: createDecoder(CONTENT_TOPICS[MessageType.VOTE], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.MODERATE]: createDecoder(CONTENT_TOPICS[MessageType.MODERATE], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC })
}
/**

View File

@ -1,25 +1,24 @@
//TODO: perhaps store all messages in an indexed DB? (helpful when Waku is down)
// with a `isPublished` flag to indicate if the message has been sent to the network
import { createLightNode, LightNode } from "@waku/sdk";
import { createLightNode, LightNode, WakuEvent, HealthStatus } from "@waku/sdk";
import StoreManager from "./store";
import { CommentCache, MessageType, VoteCache, ModerateMessage } from "./types";
import { PostCache } from "./types";
import { CellCache } from "./types";
import { OpchanMessage } from "@/types/forum";
import { EphemeralProtocolsManager } from "./lightpush_filter";
import { NETWORK_CONFIG } from "./constants";
import { ReliableMessageManager } from "./reliable_channel";
export type HealthChangeCallback = (isReady: boolean) => void;
export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void;
class MessageManager {
private node: LightNode;
//TODO: implement SDS?
private ephemeralProtocolsManager: EphemeralProtocolsManager;
private reliableMessageManager: ReliableMessageManager | null = null;
private storeManager: StoreManager;
private _isReady: boolean = false;
private _currentHealth: HealthStatus = HealthStatus.Unhealthy;
private healthListeners: Set<HealthChangeCallback> = new Set();
private peerCheckInterval: NodeJS.Timeout | null = null;
public readonly messageCache: {
@ -48,55 +47,84 @@ class MessageManager {
}
public async stop() {
if (this.peerCheckInterval) {
clearInterval(this.peerCheckInterval);
this.peerCheckInterval = null;
if (this.reliableMessageManager) {
this.reliableMessageManager.cleanup();
this.reliableMessageManager = null;
}
await this.node.stop();
this.setIsReady(false);
}
private constructor(node: LightNode) {
this.node = node;
this.ephemeralProtocolsManager = new EphemeralProtocolsManager(node);
this.storeManager = new StoreManager(node);
// Start peer monitoring
this.startPeerMonitoring();
this.setupHealthMonitoring();
}
/**
* Start monitoring connected peers to determine node health
* Runs every 1 second to check if we have at least one peer
* Set up health monitoring using Waku's built-in health events
*/
private startPeerMonitoring() {
// Initial peer check
this.checkPeers();
// Regular peer checking
this.peerCheckInterval = setInterval(() => {
this.checkPeers();
}, 1000);
private setupHealthMonitoring() {
this.node.events.addEventListener(WakuEvent.Health, (event) => {
const health = event.detail;
this._currentHealth = health;
console.log(`Waku health status: ${health}`);
if (health === HealthStatus.SufficientlyHealthy) {
console.log("Waku is sufficiently healthy - initializing reliable messaging");
this.setIsReady(true);
this.initializeReliableManager();
} else if (health === HealthStatus.MinimallyHealthy) {
console.log("Waku is minimally healthy - may have issues sending/receiving messages");
this.setIsReady(true);
this.initializeReliableManager();
} else {
console.log("Waku is unhealthy - disconnected from network");
this.setIsReady(false);
this.cleanupReliableManager();
}
});
}
/**
* Check if we have connected peers and update ready state
*/
private async checkPeers() {
private async initializeReliableManager() {
// Only initialize if not already initialized
if (this.reliableMessageManager) {
return;
}
try {
const peers = await this.node.getConnectedPeers();
this.setIsReady(peers.length >= 1);
} catch (err) {
console.error("Error checking peers:", err);
this.setIsReady(false);
this.reliableMessageManager = new ReliableMessageManager(this.node);
// Set up listener for incoming reliable messages
this.reliableMessageManager.addIncomingMessageListener({
onMessage: (message) => {
console.log("Received reliable message:", message);
this.updateCache(message);
}
});
console.log("Reliable message manager initialized successfully");
} catch (error) {
console.error("Failed to initialize reliable message manager:", error);
}
}
private cleanupReliableManager() {
if (this.reliableMessageManager) {
console.log("Cleaning up reliable message manager due to health status");
this.reliableMessageManager.cleanup();
this.reliableMessageManager = null;
}
}
private setIsReady(isReady: boolean) {
if (this._isReady !== isReady) {
this._isReady = isReady;
// Notify all health listeners
this.healthListeners.forEach(listener => listener(isReady));
// Notify all health listeners with both ready state and health status
this.healthListeners.forEach(listener => listener(isReady, this._currentHealth));
}
}
@ -107,6 +135,13 @@ class MessageManager {
return this._isReady;
}
/**
* Returns the current Waku health status
*/
public get currentHealth(): HealthStatus {
return this._currentHealth;
}
/**
* Subscribe to health status changes
* @param callback Function to call when health status changes
@ -116,7 +151,7 @@ class MessageManager {
this.healthListeners.add(callback);
// Immediately call with current status
callback(this._isReady);
callback(this._isReady, this._currentHealth);
// Return unsubscribe function
return () => {
@ -125,31 +160,28 @@ class MessageManager {
}
/**
* Waits for the node to connect to at least one peer
* Waits for the node to achieve at least minimally healthy status
* @param timeoutMs Maximum time to wait in milliseconds
* @returns Promise that resolves when connected or rejects on timeout
* @returns Promise that resolves when healthy or rejects on timeout
*/
public async waitForRemotePeer(timeoutMs: number = 15000): Promise<boolean> {
if (this._isReady) return true;
return new Promise<boolean>((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error(`Timed out waiting for remote peer after ${timeoutMs}ms`));
reject(new Error(`Timed out waiting for healthy network connection after ${timeoutMs}ms`));
}, timeoutMs);
const checkHandler = (isReady: boolean) => {
if (isReady) {
const checkHandler = (isReady: boolean, health: HealthStatus) => {
if (isReady && (health === HealthStatus.MinimallyHealthy || health === HealthStatus.SufficientlyHealthy)) {
clearTimeout(timeout);
this.healthListeners.delete(checkHandler);
resolve(true);
}
};
// Add temporary listener for peer connection
// Add temporary listener for health status
this.healthListeners.add(checkHandler);
// Also do an immediate check in case we already have peers
this.checkPeers();
});
}
@ -165,19 +197,23 @@ class MessageManager {
}
public async sendMessage(message: OpchanMessage) {
await this.ephemeralProtocolsManager.sendMessage(message);
//TODO: should we update the cache here? or just from store/filter?
this.updateCache(message);
}
public async subscribeToMessages(types: MessageType[] = [MessageType.CELL, MessageType.POST, MessageType.COMMENT, MessageType.VOTE, MessageType.MODERATE]) {
const { result, subscription } = await this.ephemeralProtocolsManager.subscribeToMessages(types);
for (const message of result) {
this.updateCache(message);
if (!this.reliableMessageManager) {
throw new Error("Reliable message manager not initialized");
}
// Use reliable channel with status tracking
const messageId = await this.reliableMessageManager.sendMessage(message, {
onSent: (id) => console.log(`Message ${id} sent ✓`),
onAcknowledged: (id) => console.log(`Message ${id} acknowledged ✓✓`),
onError: (id, error) => console.error(`Message ${id} failed:`, error)
});
return { messages: result, subscription };
console.log(`Sent reliable message with ID: ${messageId}`);
// Update local cache immediately for optimistic UI
this.updateCache(message);
return messageId;
}
private updateCache(message: OpchanMessage) {

View File

@ -1,43 +0,0 @@
import { LightNode } from "@waku/sdk";
import { CellMessage, CommentMessage, MessageType, PostMessage, VoteMessage, ModerateMessage } from "./types";
import { OpchanMessage } from "@/types/forum";
import { encodeMessage, encoders, decoders, decodeMessage } from "./codec";
export class EphemeralProtocolsManager {
private node: LightNode;
constructor(node: LightNode) {
this.node = node;
}
public async sendMessage(message: OpchanMessage) {
const encodedMessage = encodeMessage(message);
const result = await this.node.lightPush.send(encoders[message.type], {
payload: encodedMessage
});
return result;
}
public async subscribeToMessages(types: MessageType[]) {
const result: (CellMessage | PostMessage | CommentMessage | VoteMessage | ModerateMessage)[] = [];
const subscription = await this.node.filter.subscribe(Object.values(decoders), async (message) => {
const {payload} = message;
const decodedMessage = decodeMessage(payload);
if (types.includes(decodedMessage.type)) {
result.push(decodedMessage);
}
});
if (subscription.error) {
throw new Error(subscription.error);
}
if (subscription.results.successes.length === 0) {
throw new Error("No successes");
}
return {result, subscription};
}
}

View File

@ -1,4 +1,5 @@
import messageManager from '@/lib/waku';
import { HealthStatus } from '@waku/sdk';
export type ToastFunction = (props: {
title: string;
@ -44,8 +45,7 @@ export const initializeNetwork = async (
toast({ title: 'Connection timeout', description: 'Could not connect to any peers. Some features may be unavailable.', variant: 'destructive' });
console.warn('Timeout connecting to peer:', err);
}
await messageManager.queryStore();
await messageManager.subscribeToMessages();
// await messageManager.queryStore();
updateStateFromCache();
} catch (err) {
console.error('Error loading forum data:', err);
@ -81,10 +81,13 @@ export const monitorNetworkHealth = (
toast: ToastFunction,
): { unsubscribe: () => void } => {
setIsNetworkConnected(messageManager.isReady);
const unsubscribe = messageManager.onHealthChange((isReady) => {
const unsubscribe = messageManager.onHealthChange((isReady, health) => {
setIsNetworkConnected(isReady);
if (isReady) {
toast({ title: 'Network connected', description: 'Connected to the Waku network' });
if (health === HealthStatus.SufficientlyHealthy) {
toast({ title: 'Network connected', description: 'Connected to the Waku network with excellent connectivity' });
} else if (health === HealthStatus.MinimallyHealthy) {
toast({ title: 'Network connected', description: 'Connected to Waku network. Some features may be limited.', variant: 'default' });
} else {
toast({ title: 'Network disconnected', description: 'Lost connection to the Waku network', variant: 'destructive' });
}

View File

@ -0,0 +1,133 @@
import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk";
import { MessageType } from "./types";
import { decodeMessage, decoders, encodeMessage, encoders } from "./codec";
import { generateStringId } from "@/lib/utils";
import { OpchanMessage } from "@/types/forum";
export interface MessageStatusCallback {
onSent?: (messageId: string) => void;
onAcknowledged?: (messageId: string) => void;
onError?: (messageId: string, error: string) => void;
}
export interface IncomingMessageCallback {
onMessage: (message: OpchanMessage) => void;
}
export class ReliableMessageManager {
private channels: Map<MessageType, ReliableChannel<IDecodedMessage>> = new Map();
private messageCallbacks: Map<string, MessageStatusCallback> = new Map();
private incomingMessageCallbacks: IncomingMessageCallback[] = [];
constructor(node: LightNode) {
this.initializeChannels(node);
}
private async initializeChannels(node: LightNode) {
for (const type of Object.values(MessageType)) {
const encoder = encoders[type];
const decoder = decoders[type];
const senderId = generateStringId();
const channelId = `opchan-${type}`; // Unique channel ID for each message type
try {
const channel = await ReliableChannel.create(node, channelId, senderId, encoder, decoder);
this.channels.set(type, channel);
this.setupChannelListeners(channel, type);
} catch (error) {
console.error(`Failed to create reliable channel for ${type}:`, error);
}
}
}
private setupChannelListeners(channel: ReliableChannel<IDecodedMessage>, type: MessageType) {
channel.addEventListener(ReliableChannelEvent.InMessageReceived, (event) => {
try {
const wakuMessage = event.detail;
if (wakuMessage.payload) {
const opchanMessage = decodeMessage(wakuMessage.payload);
this.incomingMessageCallbacks.forEach(callback => {
callback.onMessage(opchanMessage);
});
}
} catch (error) {
console.error(`Failed to process incoming message for ${type}:`, error);
}
});
// Listen for outgoing message status updates
channel.addEventListener(ReliableChannelEvent.OutMessageSent, (event) => {
const messageId = event.detail;
const callback = this.messageCallbacks.get(messageId);
if (callback?.onSent) {
callback.onSent(messageId);
}
});
channel.addEventListener(ReliableChannelEvent.OutMessageAcknowledged, (event) => {
const messageId = event.detail;
const callback = this.messageCallbacks.get(messageId);
if (callback?.onAcknowledged) {
callback.onAcknowledged(messageId);
}
});
channel.addEventListener(ReliableChannelEvent.OutMessageIrrecoverableError, (event) => {
const messageId = event.detail.messageId;
const error = event.detail.error;
const callback = this.messageCallbacks.get(messageId);
if (callback?.onError) {
callback.onError(messageId, error?.toString() || 'Unknown error');
}
// Clean up callback after error
this.messageCallbacks.delete(messageId);
});
}
public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise<string> {
const channel = this.channels.get(message.type);
if (!channel) {
throw new Error(`No reliable channel for message type: ${message.type}`);
}
const encodedMessage = encodeMessage(message);
const messageId = ReliableChannel.getMessageId(encodedMessage);
// Store callback for this message
if (statusCallback) {
this.messageCallbacks.set(messageId, statusCallback);
}
try {
await channel.send(encodedMessage);
return messageId;
} catch (error) {
// Clean up callback on immediate send failure
this.messageCallbacks.delete(messageId);
throw error;
}
}
public addIncomingMessageListener(callback: IncomingMessageCallback) {
this.incomingMessageCallbacks.push(callback);
}
public removeIncomingMessageListener(callback: IncomingMessageCallback) {
const index = this.incomingMessageCallbacks.indexOf(callback);
if (index > -1) {
this.incomingMessageCallbacks.splice(index, 1);
}
}
public getChannelStatus(type: MessageType): boolean {
return this.channels.has(type);
}
public cleanup() {
this.messageCallbacks.clear();
this.incomingMessageCallbacks.length = 0;
this.channels.clear();
}
}