mirror of
https://github.com/logos-messaging/OpChan.git
synced 2026-01-02 21:03:09 +00:00
chore(lib/waku): improve structuring
This commit is contained in:
parent
f720a96735
commit
c64bcff475
@ -1,6 +1,6 @@
|
||||
import { RelevanceCalculator } from '../relevance';
|
||||
import { Post, Comment, User, UserVerificationStatus, EVerificationStatus } from '@/types/forum';
|
||||
import { VoteMessage, MessageType } from '@/lib/waku/types';
|
||||
import { VoteMessage, MessageType } from '@/types/waku';
|
||||
import { expect, describe, beforeEach, it } from 'vitest';
|
||||
|
||||
describe('RelevanceCalculator', () => {
|
||||
|
||||
@ -6,7 +6,7 @@ import {
|
||||
PostMessage,
|
||||
VoteMessage,
|
||||
ModerateMessage,
|
||||
} from '@/lib/waku/types';
|
||||
} from '@/types/waku';
|
||||
import { Cell, Comment, Post, User } from '@/types/forum';
|
||||
import { transformCell, transformComment, transformPost } from './transformers';
|
||||
import { MessageService, AuthService, CryptoService } from '@/lib/services';
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { Post, Comment, Cell, User, RelevanceScoreDetails, UserVerificationStatus } from '@/types/forum';
|
||||
import { VoteMessage } from '@/lib/waku/types';
|
||||
import { VoteMessage } from '@/types/waku';
|
||||
|
||||
export class RelevanceCalculator {
|
||||
private static readonly BASE_SCORES = {
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
import { Cell, Post, Comment, OpchanMessage } from '@/types/forum';
|
||||
import { CellMessage, CommentMessage, PostMessage, VoteMessage } from '@/lib/waku/types';
|
||||
import { CellMessage, CommentMessage, PostMessage, VoteMessage } from '@/types/waku';
|
||||
import messageManager from '@/lib/waku';
|
||||
import { RelevanceCalculator } from './relevance';
|
||||
import { UserVerificationStatus } from '@/types/forum';
|
||||
|
||||
@ -4,13 +4,13 @@ import {
|
||||
IEncoder,
|
||||
LightNode,
|
||||
} from '@waku/sdk';
|
||||
import { MessageType } from './types';
|
||||
import { MessageType } from '../../types/waku';
|
||||
import {
|
||||
CellMessage,
|
||||
PostMessage,
|
||||
CommentMessage,
|
||||
VoteMessage,
|
||||
} from './types';
|
||||
} from '../../types/waku';
|
||||
import { CONTENT_TOPICS } from './constants';
|
||||
import { OpchanMessage } from '@/types/forum';
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { MessageType } from "./types";
|
||||
import { MessageType } from "../../types/waku";
|
||||
|
||||
/**
|
||||
* Content topics for different message types
|
||||
|
||||
111
src/lib/waku/core/ReliableMessaging.ts
Normal file
111
src/lib/waku/core/ReliableMessaging.ts
Normal file
@ -0,0 +1,111 @@
|
||||
import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk";
|
||||
import { MessageType } from "../../../types/waku";
|
||||
import { CodecManager } from "../CodecManager";
|
||||
import { generateStringId } from "@/lib/utils";
|
||||
import { OpchanMessage } from "@/types/forum";
|
||||
|
||||
export interface MessageStatusCallback {
|
||||
onSent?: (messageId: string) => void;
|
||||
onAcknowledged?: (messageId: string) => void;
|
||||
onError?: (messageId: string, error: string) => void;
|
||||
}
|
||||
|
||||
export type IncomingMessageCallback = (message: OpchanMessage) => void;
|
||||
|
||||
export class ReliableMessaging {
|
||||
private channels: Map<MessageType, ReliableChannel<IDecodedMessage>> = new Map();
|
||||
private messageCallbacks: Map<string, MessageStatusCallback> = new Map();
|
||||
private incomingMessageCallbacks: Set<IncomingMessageCallback> = new Set();
|
||||
private codecManager: CodecManager;
|
||||
|
||||
constructor(node: LightNode) {
|
||||
this.codecManager = new CodecManager(node);
|
||||
this.initializeChannels(node);
|
||||
}
|
||||
|
||||
private async initializeChannels(node: LightNode): Promise<void> {
|
||||
for (const type of Object.values(MessageType)) {
|
||||
const encoder = this.codecManager.getEncoder(type);
|
||||
const decoder = this.codecManager.getDecoder(type);
|
||||
const senderId = generateStringId();
|
||||
const channelId = `opchan-${type}`;
|
||||
|
||||
try {
|
||||
const channel = await ReliableChannel.create(node, channelId, senderId, encoder, decoder);
|
||||
this.channels.set(type, channel);
|
||||
this.setupChannelListeners(channel, type);
|
||||
} catch (error) {
|
||||
console.error(`Failed to create reliable channel for ${type}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private setupChannelListeners(channel: ReliableChannel<IDecodedMessage>, type: MessageType): void {
|
||||
channel.addEventListener(ReliableChannelEvent.InMessageReceived, (event) => {
|
||||
try {
|
||||
const wakuMessage = event.detail;
|
||||
if (wakuMessage.payload) {
|
||||
const opchanMessage = this.codecManager.decodeMessage(wakuMessage.payload);
|
||||
this.incomingMessageCallbacks.forEach(callback => callback(opchanMessage));
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Failed to process incoming message for ${type}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
channel.addEventListener(ReliableChannelEvent.OutMessageSent, (event) => {
|
||||
const messageId = event.detail;
|
||||
this.messageCallbacks.get(messageId)?.onSent?.(messageId);
|
||||
});
|
||||
|
||||
channel.addEventListener(ReliableChannelEvent.OutMessageAcknowledged, (event) => {
|
||||
const messageId = event.detail;
|
||||
this.messageCallbacks.get(messageId)?.onAcknowledged?.(messageId);
|
||||
});
|
||||
|
||||
channel.addEventListener(ReliableChannelEvent.OutMessageIrrecoverableError, (event) => {
|
||||
const messageId = event.detail.messageId;
|
||||
const error = event.detail.error;
|
||||
const callback = this.messageCallbacks.get(messageId);
|
||||
|
||||
if (callback?.onError) {
|
||||
callback.onError(messageId, error?.toString() || 'Unknown error');
|
||||
}
|
||||
|
||||
this.messageCallbacks.delete(messageId);
|
||||
});
|
||||
}
|
||||
|
||||
public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise<string> {
|
||||
const channel = this.channels.get(message.type);
|
||||
if (!channel) {
|
||||
throw new Error(`No reliable channel for message type: ${message.type}`);
|
||||
}
|
||||
|
||||
const encodedMessage = this.codecManager.encodeMessage(message);
|
||||
const messageId = ReliableChannel.getMessageId(encodedMessage);
|
||||
|
||||
if (statusCallback) {
|
||||
this.messageCallbacks.set(messageId, statusCallback);
|
||||
}
|
||||
|
||||
try {
|
||||
await channel.send(encodedMessage);
|
||||
return messageId;
|
||||
} catch (error) {
|
||||
this.messageCallbacks.delete(messageId);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public onMessage(callback: IncomingMessageCallback): () => void {
|
||||
this.incomingMessageCallbacks.add(callback);
|
||||
return () => this.incomingMessageCallbacks.delete(callback);
|
||||
}
|
||||
|
||||
public cleanup(): void {
|
||||
this.messageCallbacks.clear();
|
||||
this.incomingMessageCallbacks.clear();
|
||||
this.channels.clear();
|
||||
}
|
||||
}
|
||||
85
src/lib/waku/core/WakuNodeManager.ts
Normal file
85
src/lib/waku/core/WakuNodeManager.ts
Normal file
@ -0,0 +1,85 @@
|
||||
import { createLightNode, LightNode, WakuEvent, HealthStatus } from "@waku/sdk";
|
||||
|
||||
export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void;
|
||||
|
||||
export class WakuNodeManager {
|
||||
private node: LightNode | null = null;
|
||||
private _isReady: boolean = false;
|
||||
private _currentHealth: HealthStatus = HealthStatus.Unhealthy;
|
||||
private healthListeners: Set<HealthChangeCallback> = new Set();
|
||||
|
||||
public static async create(): Promise<WakuNodeManager> {
|
||||
const manager = new WakuNodeManager();
|
||||
await manager.initialize();
|
||||
return manager;
|
||||
}
|
||||
|
||||
private async initialize(): Promise<void> {
|
||||
this.node = await createLightNode({
|
||||
defaultBootstrap: true,
|
||||
autoStart: true,
|
||||
});
|
||||
this.setupHealthMonitoring();
|
||||
}
|
||||
|
||||
private setupHealthMonitoring(): void {
|
||||
if (!this.node) return;
|
||||
|
||||
this.node.events.addEventListener(WakuEvent.Health, (event) => {
|
||||
const health = event.detail;
|
||||
this._currentHealth = health;
|
||||
|
||||
console.log(`Waku health status: ${health}`);
|
||||
|
||||
const wasReady = this._isReady;
|
||||
this._isReady = health === HealthStatus.SufficientlyHealthy || health === HealthStatus.MinimallyHealthy;
|
||||
|
||||
if (wasReady !== this._isReady) {
|
||||
this.notifyHealthChange();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private notifyHealthChange(): void {
|
||||
this.healthListeners.forEach(listener => listener(this._isReady, this._currentHealth));
|
||||
}
|
||||
|
||||
public getNode(): LightNode {
|
||||
if (!this.node) {
|
||||
throw new Error("Node not initialized");
|
||||
}
|
||||
return this.node;
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
this.healthListeners.clear();
|
||||
if (this.node) {
|
||||
await this.node.stop();
|
||||
this.node = null;
|
||||
}
|
||||
}
|
||||
|
||||
public get isInitialized(): boolean {
|
||||
return this.node !== null;
|
||||
}
|
||||
|
||||
public get isReady(): boolean {
|
||||
return this._isReady;
|
||||
}
|
||||
|
||||
public get currentHealth(): HealthStatus {
|
||||
return this._currentHealth;
|
||||
}
|
||||
|
||||
public onHealthChange(callback: HealthChangeCallback): () => void {
|
||||
this.healthListeners.add(callback);
|
||||
|
||||
// Immediately call with current status
|
||||
callback(this._isReady, this._currentHealth);
|
||||
|
||||
// Return unsubscribe function
|
||||
return () => {
|
||||
this.healthListeners.delete(callback);
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -1,232 +1,115 @@
|
||||
//TODO: perhaps store all messages in an indexed DB? (helpful when Waku is down)
|
||||
// with a `isPublished` flag to indicate if the message has been sent to the network
|
||||
|
||||
import { createLightNode, LightNode, WakuEvent, HealthStatus } from "@waku/sdk";
|
||||
import { CommentCache, MessageType, VoteCache, ModerateMessage } from "./types";
|
||||
import { PostCache } from "./types";
|
||||
import { CellCache } from "./types";
|
||||
import { HealthStatus } from "@waku/sdk";
|
||||
import { OpchanMessage } from "@/types/forum";
|
||||
import { ReliableMessageManager } from "./reliable_channel";
|
||||
import { WakuNodeManager, HealthChangeCallback } from "./core/WakuNodeManager";
|
||||
import { CacheService } from "./services/CacheService";
|
||||
import { MessageService, MessageStatusCallback } from "./services/MessageService";
|
||||
import { ReliableMessaging } from "./core/ReliableMessaging";
|
||||
|
||||
export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void;
|
||||
export type { HealthChangeCallback, MessageStatusCallback };
|
||||
|
||||
class MessageManager {
|
||||
private node: LightNode;
|
||||
private reliableMessageManager: ReliableMessageManager | null = null;
|
||||
private _isReady: boolean = false;
|
||||
private _currentHealth: HealthStatus = HealthStatus.Unhealthy;
|
||||
private healthListeners: Set<HealthChangeCallback> = new Set();
|
||||
private processedMessageIds: Set<string> = new Set(); // Track processed message IDs
|
||||
private nodeManager: WakuNodeManager | null = null;
|
||||
private cacheService: CacheService;
|
||||
private messageService: MessageService | null = null;
|
||||
private reliableMessaging: ReliableMessaging | null = null;
|
||||
|
||||
constructor() {
|
||||
this.cacheService = new CacheService();
|
||||
}
|
||||
|
||||
public readonly messageCache: {
|
||||
cells: CellCache;
|
||||
posts: PostCache;
|
||||
comments: CommentCache;
|
||||
votes: VoteCache;
|
||||
moderations: { [targetId: string]: ModerateMessage };
|
||||
} = {
|
||||
cells: {},
|
||||
posts: {},
|
||||
comments: {},
|
||||
votes: {},
|
||||
moderations: {}
|
||||
}
|
||||
public static async create(): Promise<MessageManager> {
|
||||
const manager = new MessageManager();
|
||||
await manager.initialize();
|
||||
return manager;
|
||||
}
|
||||
|
||||
public static async create(): Promise<MessageManager> {
|
||||
const node = await createLightNode({
|
||||
defaultBootstrap: true,
|
||||
autoStart: true,
|
||||
});
|
||||
|
||||
return new MessageManager(node);
|
||||
}
|
||||
|
||||
public async stop() {
|
||||
if (this.reliableMessageManager) {
|
||||
this.reliableMessageManager.cleanup();
|
||||
this.reliableMessageManager = null;
|
||||
private async initialize(): Promise<void> {
|
||||
try {
|
||||
this.nodeManager = await WakuNodeManager.create();
|
||||
|
||||
// Now create message service with proper dependencies
|
||||
this.messageService = new MessageService(this.cacheService, this.reliableMessaging, this.nodeManager);
|
||||
|
||||
// Set up health-based reliable messaging initialization
|
||||
this.nodeManager.onHealthChange((isReady) => {
|
||||
if (isReady && !this.reliableMessaging) {
|
||||
this.initializeReliableMessaging();
|
||||
} else if (!isReady && this.reliableMessaging) {
|
||||
this.cleanupReliableMessaging();
|
||||
}
|
||||
|
||||
await this.node.stop();
|
||||
this.setIsReady(false);
|
||||
});
|
||||
|
||||
} catch (error) {
|
||||
console.error("Failed to initialize MessageManager:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
private async initializeReliableMessaging(): Promise<void> {
|
||||
if (!this.nodeManager || this.reliableMessaging) {
|
||||
return;
|
||||
}
|
||||
|
||||
private constructor(node: LightNode) {
|
||||
this.node = node;
|
||||
this.setupHealthMonitoring();
|
||||
try {
|
||||
console.log("Initializing reliable messaging...");
|
||||
this.reliableMessaging = new ReliableMessaging(this.nodeManager.getNode());
|
||||
this.messageService?.updateReliableMessaging(this.reliableMessaging);
|
||||
console.log("Reliable messaging initialized successfully");
|
||||
} catch (error) {
|
||||
console.error("Failed to initialize reliable messaging:", error);
|
||||
}
|
||||
}
|
||||
|
||||
private cleanupReliableMessaging(): void {
|
||||
if (this.reliableMessaging) {
|
||||
console.log("Cleaning up reliable messaging due to health status");
|
||||
this.reliableMessaging.cleanup();
|
||||
this.reliableMessaging = null;
|
||||
this.messageService?.updateReliableMessaging(null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up health monitoring using Waku's built-in health events
|
||||
*/
|
||||
private setupHealthMonitoring() {
|
||||
this.node.events.addEventListener(WakuEvent.Health, (event) => {
|
||||
const health = event.detail;
|
||||
this._currentHealth = health;
|
||||
|
||||
console.log(`Waku health status: ${health}`);
|
||||
|
||||
if (health === HealthStatus.SufficientlyHealthy) {
|
||||
console.log("Waku is sufficiently healthy - initializing reliable messaging");
|
||||
this.setIsReady(true);
|
||||
this.initializeReliableManager();
|
||||
} else if (health === HealthStatus.MinimallyHealthy) {
|
||||
console.log("Waku is minimally healthy - may have issues sending/receiving messages");
|
||||
this.setIsReady(true);
|
||||
this.initializeReliableManager();
|
||||
} else {
|
||||
console.log("Waku is unhealthy - disconnected from network");
|
||||
this.setIsReady(false);
|
||||
this.cleanupReliableManager();
|
||||
}
|
||||
});
|
||||
public async stop(): Promise<void> {
|
||||
this.cleanupReliableMessaging();
|
||||
this.messageService?.cleanup();
|
||||
await this.nodeManager?.stop();
|
||||
}
|
||||
|
||||
public get isReady(): boolean {
|
||||
return this.nodeManager?.isReady ?? false;
|
||||
}
|
||||
|
||||
public get currentHealth(): HealthStatus {
|
||||
return this.nodeManager?.currentHealth ?? HealthStatus.Unhealthy;
|
||||
}
|
||||
|
||||
public onHealthChange(callback: HealthChangeCallback): () => void {
|
||||
if (!this.nodeManager) {
|
||||
throw new Error("Node manager not initialized");
|
||||
}
|
||||
return this.nodeManager.onHealthChange(callback);
|
||||
}
|
||||
|
||||
private async initializeReliableManager() {
|
||||
// Only initialize if not already initialized
|
||||
if (this.reliableMessageManager) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
this.reliableMessageManager = new ReliableMessageManager(this.node);
|
||||
|
||||
// Set up listener for incoming reliable messages
|
||||
this.reliableMessageManager.addIncomingMessageListener({
|
||||
onMessage: (message) => {
|
||||
// Check if we've already processed this exact message
|
||||
const messageKey = `${message.type}:${message.id}:${message.timestamp}`;
|
||||
if (this.processedMessageIds.has(messageKey)) {
|
||||
console.log(`Received message ${messageKey} but it has already been processed`);
|
||||
return;
|
||||
}
|
||||
|
||||
this.processedMessageIds.add(messageKey);
|
||||
this.updateCache(message);
|
||||
}
|
||||
});
|
||||
|
||||
console.log("Reliable message manager initialized successfully");
|
||||
} catch (error) {
|
||||
console.error("Failed to initialize reliable message manager:", error);
|
||||
}
|
||||
public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise<string> {
|
||||
if (!this.messageService) {
|
||||
throw new Error("MessageManager not fully initialized");
|
||||
}
|
||||
return this.messageService.sendMessage(message, statusCallback);
|
||||
}
|
||||
|
||||
private cleanupReliableManager() {
|
||||
if (this.reliableMessageManager) {
|
||||
console.log("Cleaning up reliable message manager due to health status");
|
||||
this.reliableMessageManager.cleanup();
|
||||
this.reliableMessageManager = null;
|
||||
}
|
||||
public onMessageReceived(callback: (message: OpchanMessage) => void): () => void {
|
||||
if (!this.messageService) {
|
||||
throw new Error("MessageManager not fully initialized");
|
||||
}
|
||||
return this.messageService.onMessageReceived(callback);
|
||||
}
|
||||
|
||||
private setIsReady(isReady: boolean) {
|
||||
if (this._isReady !== isReady) {
|
||||
this._isReady = isReady;
|
||||
// Notify all health listeners with both ready state and health status
|
||||
this.healthListeners.forEach(listener => listener(isReady, this._currentHealth));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the node is currently healthy and ready for use
|
||||
*/
|
||||
public get isReady(): boolean {
|
||||
return this._isReady;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current Waku health status
|
||||
*/
|
||||
public get currentHealth(): HealthStatus {
|
||||
return this._currentHealth;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to health status changes
|
||||
* @param callback Function to call when health status changes
|
||||
* @returns Function to unsubscribe
|
||||
*/
|
||||
public onHealthChange(callback: HealthChangeCallback): () => void {
|
||||
this.healthListeners.add(callback);
|
||||
|
||||
// Immediately call with current status
|
||||
callback(this._isReady, this._currentHealth);
|
||||
|
||||
// Return unsubscribe function
|
||||
return () => {
|
||||
this.healthListeners.delete(callback);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
|
||||
public async sendMessage(message: OpchanMessage) {
|
||||
if (!this.reliableMessageManager) {
|
||||
throw new Error("Reliable message manager not initialized");
|
||||
}
|
||||
|
||||
// Track this message as processed (optimistic)
|
||||
const messageKey = `${message.type}:${message.id}:${message.timestamp}`;
|
||||
this.processedMessageIds.add(messageKey);
|
||||
|
||||
// Use reliable channel with status tracking
|
||||
const messageId = await this.reliableMessageManager.sendMessage(message, {
|
||||
onSent: (id) => console.log(`Message ${id} sent`),
|
||||
onAcknowledged: (id) => console.log(`Message ${id} acknowledged`),
|
||||
onError: (id, error) => console.error(`Message ${id} failed:`, error)
|
||||
});
|
||||
|
||||
// Update local cache immediately for optimistic UI
|
||||
this.updateCache(message);
|
||||
|
||||
return messageId;
|
||||
}
|
||||
|
||||
private updateCache(message: OpchanMessage) {
|
||||
switch (message.type) {
|
||||
case MessageType.CELL:
|
||||
if (!this.messageCache.cells[message.id] ||
|
||||
this.messageCache.cells[message.id].timestamp !== message.timestamp) {
|
||||
this.messageCache.cells[message.id] = message;
|
||||
}
|
||||
break;
|
||||
case MessageType.POST:
|
||||
if (!this.messageCache.posts[message.id] ||
|
||||
this.messageCache.posts[message.id].timestamp !== message.timestamp) {
|
||||
this.messageCache.posts[message.id] = message;
|
||||
}
|
||||
break;
|
||||
case MessageType.COMMENT:
|
||||
if (!this.messageCache.comments[message.id] ||
|
||||
this.messageCache.comments[message.id].timestamp !== message.timestamp) {
|
||||
this.messageCache.comments[message.id] = message;
|
||||
}
|
||||
break;
|
||||
case MessageType.VOTE: {
|
||||
// For votes, we use a composite key of targetId + author to handle multiple votes from same user
|
||||
const voteKey = `${message.targetId}:${message.author}`;
|
||||
if (!this.messageCache.votes[voteKey] ||
|
||||
this.messageCache.votes[voteKey].timestamp !== message.timestamp) {
|
||||
this.messageCache.votes[voteKey] = message;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case MessageType.MODERATE: {
|
||||
// Type guard for ModerateMessage
|
||||
const modMsg = message as ModerateMessage;
|
||||
if (!this.messageCache.moderations[modMsg.targetId] ||
|
||||
this.messageCache.moderations[modMsg.targetId].timestamp !== modMsg.timestamp) {
|
||||
this.messageCache.moderations[modMsg.targetId] = modMsg;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
console.warn("Received message with unknown type");
|
||||
break;
|
||||
}
|
||||
public get messageCache() {
|
||||
if (!this.messageService) {
|
||||
throw new Error("MessageManager not fully initialized");
|
||||
}
|
||||
return this.messageService.messageCache;
|
||||
}
|
||||
}
|
||||
|
||||
const messageManager = await MessageManager.create();
|
||||
|
||||
@ -1,135 +0,0 @@
|
||||
import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk";
|
||||
import { MessageType } from "./types";
|
||||
import { CodecManager } from "./codec";
|
||||
import { generateStringId } from "@/lib/utils";
|
||||
import { OpchanMessage } from "@/types/forum";
|
||||
|
||||
export interface MessageStatusCallback {
|
||||
onSent?: (messageId: string) => void;
|
||||
onAcknowledged?: (messageId: string) => void;
|
||||
onError?: (messageId: string, error: string) => void;
|
||||
}
|
||||
|
||||
export interface IncomingMessageCallback {
|
||||
onMessage: (message: OpchanMessage) => void;
|
||||
}
|
||||
|
||||
export class ReliableMessageManager {
|
||||
private channels: Map<MessageType, ReliableChannel<IDecodedMessage>> = new Map();
|
||||
private messageCallbacks: Map<string, MessageStatusCallback> = new Map();
|
||||
private incomingMessageCallbacks: IncomingMessageCallback[] = [];
|
||||
private codecManager: CodecManager;
|
||||
|
||||
constructor(node: LightNode) {
|
||||
this.codecManager = new CodecManager(node);
|
||||
this.initializeChannels(node);
|
||||
}
|
||||
|
||||
private async initializeChannels(node: LightNode) {
|
||||
for (const type of Object.values(MessageType)) {
|
||||
const encoder = this.codecManager.getEncoder(type);
|
||||
const decoder = this.codecManager.getDecoder(type);
|
||||
const senderId = generateStringId();
|
||||
const channelId = `opchan-${type}`; // Unique channel ID for each message type
|
||||
|
||||
try {
|
||||
const channel = await ReliableChannel.create(node, channelId, senderId, encoder, decoder);
|
||||
this.channels.set(type, channel);
|
||||
this.setupChannelListeners(channel, type);
|
||||
} catch (error) {
|
||||
console.error(`Failed to create reliable channel for ${type}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private setupChannelListeners(channel: ReliableChannel<IDecodedMessage>, type: MessageType) {
|
||||
channel.addEventListener(ReliableChannelEvent.InMessageReceived, (event) => {
|
||||
try {
|
||||
const wakuMessage = event.detail;
|
||||
if (wakuMessage.payload) {
|
||||
const opchanMessage = this.codecManager.decodeMessage(wakuMessage.payload);
|
||||
|
||||
|
||||
this.incomingMessageCallbacks.forEach(callback => {
|
||||
callback.onMessage(opchanMessage);
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Failed to process incoming message for ${type}:`, error);
|
||||
}
|
||||
});
|
||||
|
||||
// Listen for outgoing message status updates
|
||||
channel.addEventListener(ReliableChannelEvent.OutMessageSent, (event) => {
|
||||
const messageId = event.detail;
|
||||
const callback = this.messageCallbacks.get(messageId);
|
||||
if (callback?.onSent) {
|
||||
callback.onSent(messageId);
|
||||
}
|
||||
});
|
||||
|
||||
channel.addEventListener(ReliableChannelEvent.OutMessageAcknowledged, (event) => {
|
||||
const messageId = event.detail;
|
||||
const callback = this.messageCallbacks.get(messageId);
|
||||
if (callback?.onAcknowledged) {
|
||||
callback.onAcknowledged(messageId);
|
||||
}
|
||||
});
|
||||
|
||||
channel.addEventListener(ReliableChannelEvent.OutMessageIrrecoverableError, (event) => {
|
||||
const messageId = event.detail.messageId;
|
||||
const error = event.detail.error;
|
||||
const callback = this.messageCallbacks.get(messageId);
|
||||
if (callback?.onError) {
|
||||
callback.onError(messageId, error?.toString() || 'Unknown error');
|
||||
}
|
||||
// Clean up callback after error
|
||||
this.messageCallbacks.delete(messageId);
|
||||
});
|
||||
}
|
||||
|
||||
public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise<string> {
|
||||
const channel = this.channels.get(message.type);
|
||||
if (!channel) {
|
||||
throw new Error(`No reliable channel for message type: ${message.type}`);
|
||||
}
|
||||
|
||||
const encodedMessage = this.codecManager.encodeMessage(message);
|
||||
const messageId = ReliableChannel.getMessageId(encodedMessage);
|
||||
|
||||
// Store callback for this message
|
||||
if (statusCallback) {
|
||||
this.messageCallbacks.set(messageId, statusCallback);
|
||||
}
|
||||
|
||||
try {
|
||||
await channel.send(encodedMessage);
|
||||
return messageId;
|
||||
} catch (error) {
|
||||
// Clean up callback on immediate send failure
|
||||
this.messageCallbacks.delete(messageId);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
public addIncomingMessageListener(callback: IncomingMessageCallback) {
|
||||
this.incomingMessageCallbacks.push(callback);
|
||||
}
|
||||
|
||||
public removeIncomingMessageListener(callback: IncomingMessageCallback) {
|
||||
const index = this.incomingMessageCallbacks.indexOf(callback);
|
||||
if (index > -1) {
|
||||
this.incomingMessageCallbacks.splice(index, 1);
|
||||
}
|
||||
}
|
||||
|
||||
public getChannelStatus(type: MessageType): boolean {
|
||||
return this.channels.has(type);
|
||||
}
|
||||
|
||||
public cleanup() {
|
||||
this.messageCallbacks.clear();
|
||||
this.incomingMessageCallbacks.length = 0;
|
||||
this.channels.clear();
|
||||
}
|
||||
}
|
||||
85
src/lib/waku/services/CacheService.ts
Normal file
85
src/lib/waku/services/CacheService.ts
Normal file
@ -0,0 +1,85 @@
|
||||
import { MessageType, CellCache, PostCache, CommentCache, VoteCache, ModerateMessage } from "../../../types/waku";
|
||||
import { OpchanMessage } from "@/types/forum";
|
||||
|
||||
export interface MessageCache {
|
||||
cells: CellCache;
|
||||
posts: PostCache;
|
||||
comments: CommentCache;
|
||||
votes: VoteCache;
|
||||
moderations: { [targetId: string]: ModerateMessage };
|
||||
}
|
||||
|
||||
export class CacheService {
|
||||
private processedMessageIds: Set<string> = new Set();
|
||||
|
||||
public readonly cache: MessageCache = {
|
||||
cells: {},
|
||||
posts: {},
|
||||
comments: {},
|
||||
votes: {},
|
||||
moderations: {}
|
||||
};
|
||||
|
||||
public updateCache(message: OpchanMessage): boolean {
|
||||
// Check if we've already processed this exact message
|
||||
const messageKey = `${message.type}:${message.id}:${message.timestamp}`;
|
||||
if (this.processedMessageIds.has(messageKey)) {
|
||||
return false; // Already processed
|
||||
}
|
||||
|
||||
this.processedMessageIds.add(messageKey);
|
||||
this.storeMessage(message);
|
||||
return true; // Newly processed
|
||||
}
|
||||
|
||||
private storeMessage(message: OpchanMessage): void {
|
||||
switch (message.type) {
|
||||
case MessageType.CELL:
|
||||
if (!this.cache.cells[message.id] ||
|
||||
this.cache.cells[message.id].timestamp !== message.timestamp) {
|
||||
this.cache.cells[message.id] = message;
|
||||
}
|
||||
break;
|
||||
case MessageType.POST:
|
||||
if (!this.cache.posts[message.id] ||
|
||||
this.cache.posts[message.id].timestamp !== message.timestamp) {
|
||||
this.cache.posts[message.id] = message;
|
||||
}
|
||||
break;
|
||||
case MessageType.COMMENT:
|
||||
if (!this.cache.comments[message.id] ||
|
||||
this.cache.comments[message.id].timestamp !== message.timestamp) {
|
||||
this.cache.comments[message.id] = message;
|
||||
}
|
||||
break;
|
||||
case MessageType.VOTE: {
|
||||
const voteKey = `${message.targetId}:${message.author}`;
|
||||
if (!this.cache.votes[voteKey] ||
|
||||
this.cache.votes[voteKey].timestamp !== message.timestamp) {
|
||||
this.cache.votes[voteKey] = message;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case MessageType.MODERATE: {
|
||||
const modMsg = message as ModerateMessage;
|
||||
if (!this.cache.moderations[modMsg.targetId] ||
|
||||
this.cache.moderations[modMsg.targetId].timestamp !== modMsg.timestamp) {
|
||||
this.cache.moderations[modMsg.targetId] = modMsg;
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
console.warn("Received message with unknown type");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public clear(): void {
|
||||
this.processedMessageIds.clear();
|
||||
this.cache.cells = {};
|
||||
this.cache.posts = {};
|
||||
this.cache.comments = {};
|
||||
this.cache.votes = {};
|
||||
this.cache.moderations = {};
|
||||
}
|
||||
}
|
||||
80
src/lib/waku/services/MessageService.ts
Normal file
80
src/lib/waku/services/MessageService.ts
Normal file
@ -0,0 +1,80 @@
|
||||
import { OpchanMessage } from "@/types/forum";
|
||||
import { CacheService } from "./CacheService";
|
||||
import { ReliableMessaging, MessageStatusCallback } from "../core/ReliableMessaging";
|
||||
import { WakuNodeManager } from "../core/WakuNodeManager";
|
||||
|
||||
export type MessageReceivedCallback = (message: OpchanMessage) => void;
|
||||
export type { MessageStatusCallback };
|
||||
|
||||
export class MessageService {
|
||||
private messageReceivedCallbacks: Set<MessageReceivedCallback> = new Set();
|
||||
|
||||
constructor(
|
||||
private cacheService: CacheService,
|
||||
private reliableMessaging: ReliableMessaging | null,
|
||||
private nodeManager: WakuNodeManager
|
||||
) {
|
||||
this.setupMessageHandling();
|
||||
}
|
||||
|
||||
private setupMessageHandling(): void {
|
||||
if (this.reliableMessaging) {
|
||||
this.reliableMessaging.onMessage((message) => {
|
||||
const isNew = this.cacheService.updateCache(message);
|
||||
if (isNew) {
|
||||
this.messageReceivedCallbacks.forEach(callback => callback(message));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async sendMessage(message: OpchanMessage, statusCallback?: MessageStatusCallback): Promise<string> {
|
||||
if (!this.reliableMessaging) {
|
||||
throw new Error("Reliable messaging not initialized");
|
||||
}
|
||||
|
||||
if (!this.nodeManager.isReady) {
|
||||
throw new Error("Network not ready");
|
||||
}
|
||||
|
||||
// Update cache optimistically
|
||||
this.cacheService.updateCache(message);
|
||||
|
||||
// Send via reliable messaging with status tracking
|
||||
const messageId = await this.reliableMessaging.sendMessage(message, {
|
||||
onSent: (id) => {
|
||||
console.log(`Message ${id} sent`);
|
||||
statusCallback?.onSent?.(id);
|
||||
},
|
||||
onAcknowledged: (id) => {
|
||||
console.log(`Message ${id} acknowledged`);
|
||||
statusCallback?.onAcknowledged?.(id);
|
||||
},
|
||||
onError: (id, error) => {
|
||||
console.error(`Message ${id} failed:`, error);
|
||||
statusCallback?.onError?.(id, error);
|
||||
}
|
||||
});
|
||||
|
||||
return messageId;
|
||||
}
|
||||
|
||||
public onMessageReceived(callback: MessageReceivedCallback): () => void {
|
||||
this.messageReceivedCallbacks.add(callback);
|
||||
return () => this.messageReceivedCallbacks.delete(callback);
|
||||
}
|
||||
|
||||
public updateReliableMessaging(reliableMessaging: ReliableMessaging | null): void {
|
||||
this.reliableMessaging = reliableMessaging;
|
||||
this.setupMessageHandling();
|
||||
}
|
||||
|
||||
public get messageCache() {
|
||||
return this.cacheService.cache;
|
||||
}
|
||||
|
||||
public cleanup(): void {
|
||||
this.messageReceivedCallbacks.clear();
|
||||
this.reliableMessaging?.cleanup();
|
||||
}
|
||||
}
|
||||
@ -1,4 +1,4 @@
|
||||
import { CellMessage, CommentMessage, PostMessage, VoteMessage, ModerateMessage } from "@/lib/waku/types";
|
||||
import { CellMessage, CommentMessage, PostMessage, VoteMessage, ModerateMessage } from "@/types/waku";
|
||||
|
||||
export type OpchanMessage = CellMessage | PostMessage | CommentMessage | VoteMessage | ModerateMessage;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user