feat: Implement message cache population in MessageManager

This commit is contained in:
Danish Arora 2025-04-16 14:08:26 +05:30
parent 9f8bb87bc1
commit 95401cdb5b
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
8 changed files with 2045 additions and 4 deletions

16
.gitignore vendored
View File

@ -1,3 +1,7 @@
README-task-master.md
.cursor
scripts
# Logs
logs
*.log
@ -22,3 +26,15 @@ dist-ssr
*.njsproj
*.sln
*.sw?
# Added by Claude Task Master
dev-debug.log
# Dependency directories
node_modules/
# Environment variables
.env
.vscode
# OS specific
# Task files
tasks.json
tasks/

1605
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -40,6 +40,7 @@
"@radix-ui/react-toggle-group": "^1.1.0",
"@radix-ui/react-tooltip": "^1.1.4",
"@tanstack/react-query": "^5.56.2",
"@waku/sdk": "^0.0.30",
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"cmdk": "^1.0.0",

51
src/lib/waku/codec.ts Normal file
View File

@ -0,0 +1,51 @@
import { MessageType } from './types';
import { OpchanMessage, CellMessage, PostMessage, CommentMessage, VoteMessage } from './types';
/**
* Encode a message object into a Uint8Array for transmission
*/
export function encodeMessage(message: OpchanMessage): Uint8Array {
// Convert the message to a JSON string
const messageJson = JSON.stringify(message);
// Convert the string to a Uint8Array
return new TextEncoder().encode(messageJson);
}
/**
* Decode a message from a Uint8Array based on its type
*/
export function decodeMessage(payload: Uint8Array, type?: MessageType): OpchanMessage {
// Convert the Uint8Array to a string
const messageJson = new TextDecoder().decode(payload);
// Parse the JSON string to an object
const message = JSON.parse(messageJson) as OpchanMessage;
// Validate the message type if specified
if (type && message.type !== type) {
throw new Error(`Expected message of type ${type}, but got ${message.type}`);
}
// Return the decoded message
return message;
}
/**
* Type-specific decoders
*/
export function decodeCellMessage(payload: Uint8Array): CellMessage {
return decodeMessage(payload, MessageType.CELL) as CellMessage;
}
export function decodePostMessage(payload: Uint8Array): PostMessage {
return decodeMessage(payload, MessageType.POST) as PostMessage;
}
export function decodeCommentMessage(payload: Uint8Array): CommentMessage {
return decodeMessage(payload, MessageType.COMMENT) as CommentMessage;
}
export function decodeVoteMessage(payload: Uint8Array): VoteMessage {
return decodeMessage(payload, MessageType.VOTE) as VoteMessage;
}

30
src/lib/waku/constants.ts Normal file
View File

@ -0,0 +1,30 @@
import { MessageType } from "./types";
import type { QueryRequestParams } from '@waku/sdk'
/**
* Content topics for different message types
*/
export const CONTENT_TOPICS: Record<MessageType, string> = {
[MessageType.CELL]: '/opchan/1/cell/proto',
[MessageType.POST]: '/opchan/1/post/proto',
[MessageType.COMMENT]: '/opchan/1/comment/proto',
[MessageType.VOTE]: '/opchan/1/vote/proto'
};
/**
* Bootstrap nodes for the Waku network
* These are public Waku nodes that our node will connect to on startup
*/
export const BOOTSTRAP_NODES = [
'/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd2Hz8tHPeV4y',
'/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ',
'/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/443/wss/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS'
];
// Default store query options
// export const DEFAULT_STORE_QUERY_OPTIONS: QueryRequestParams = {
// contentTopics: [CONTENT_TOPICS[MessageType.CELL], CONTENT_TOPICS[MessageType.POST], CONTENT_TOPICS[MessageType.COMMENT], CONTENT_TOPICS[MessageType.VOTE]],
// includeData: true,
// paginationForward: false,
// pubsubTopic: ""
// };

108
src/lib/waku/index.ts Normal file
View File

@ -0,0 +1,108 @@
import { createDecoder, createLightNode, LightNode } from "@waku/sdk";
import { BOOTSTRAP_NODES } from "./constants";
import StoreManager from "./store";
import { CommentCache, MessageType, VoteCache } from "./types";
import { PostCache } from "./types";
import { CellCache } from "./types";
import { OpchanMessage } from "@/types";
import { EphemeralProtocolsManager } from "./lightpush_filter";
import { NETWORK_CONFIG } from "./constants";
class MessageManager {
private node: LightNode;
//TODO: implement SDS?
private ephemeralProtocolsManager: EphemeralProtocolsManager;
private storeManager: StoreManager;
public readonly messageCache: {
cells: CellCache;
posts: PostCache;
comments: CommentCache;
votes: VoteCache;
} = {
cells: {},
posts: {},
comments: {},
votes: {}
}
public static async create(): Promise<MessageManager> {
const node = await createLightNode({
defaultBootstrap: false,
networkConfig: NETWORK_CONFIG,
autoStart: true,
bootstrapPeers: BOOTSTRAP_NODES,
lightPush:{autoRetry: true, retryIntervalMs: 1000}
});
return new MessageManager(node);
}
public async stop() {
await this.node.stop();
}
private constructor(node: LightNode) {
this.node = node;
this.ephemeralProtocolsManager = new EphemeralProtocolsManager(node);
this.storeManager = new StoreManager(node);
}
private updateCache(message: OpchanMessage) {
switch (message.type) {
case MessageType.CELL:
this.messageCache.cells[message.id] = message;
break;
case MessageType.POST:
this.messageCache.posts[message.id] = message;
break;
case MessageType.COMMENT:
this.messageCache.comments[message.id] = message;
break;
case MessageType.VOTE: {
// For votes, we use a composite key of targetId + author to handle multiple votes from same user
const voteKey = `${message.targetId}:${message.author}`;
this.messageCache.votes[voteKey] = message;
break;
}
default:
// TypeScript should ensure we don't reach this case with proper OpchanMessage types
console.warn("Received message with unknown type");
break;
}
}
public async queryStore() {
const messages = await this.storeManager.queryStore();
// Populate cache from store messages
for (const message of messages) {
this.updateCache(message);
}
return messages;
}
public async sendMessage(message: OpchanMessage) {
await this.ephemeralProtocolsManager.sendMessage(message);
// Also update local cache with the message we just sent
this.updateCache(message);
}
public async subscribeToMessages(types: MessageType[] = [MessageType.CELL, MessageType.POST, MessageType.COMMENT, MessageType.VOTE]) {
const { result, subscription } = await this.ephemeralProtocolsManager.subscribeToMessages(types);
// Set up a callback that will be triggered for new messages
// New messages from the subscription will be processed directly by the ephemeralProtocolsManager
// and returned via the result array, so we just need to add them to the cache
for (const message of result) {
this.updateCache(message);
}
// Return result and subscription for any external processing
return { messages: result, subscription };
}
}
const messageManager = await MessageManager.create();
export default messageManager;

142
src/lib/waku/messages.ts Normal file
View File

@ -0,0 +1,142 @@
import { MessageType } from './constants';
import { DecodedMessage } from '@waku/sdk';
import { Cell, Post, Comment } from '@/types/forum';
// Base structure for all messages
export interface WakuMessageBase {
messageType: MessageType;
timestamp: number;
sender: string; // Bitcoin address of sender
signature?: string; // Signature to verify sender
}
// Message structures for different content types
export interface CellMessage extends WakuMessageBase {
messageType: MessageType.CELL;
cellId: string;
name: string;
description: string;
icon: string;
}
export interface PostMessage extends WakuMessageBase {
messageType: MessageType.POST;
postId: string;
cellId: string;
content: string;
}
export interface CommentMessage extends WakuMessageBase {
messageType: MessageType.COMMENT;
commentId: string;
postId: string;
content: string;
}
export interface VoteMessage extends WakuMessageBase {
messageType: MessageType.VOTE;
targetId: string; // postId or commentId
isUpvote: boolean;
}
// Type for all possible messages
export type WakuMessage =
| CellMessage
| PostMessage
| CommentMessage
| VoteMessage;
// Utility functions for converting between message types and application models
export function cellToMessage(cell: Cell, sender: string): CellMessage {
return {
messageType: MessageType.CELL,
timestamp: Date.now(),
sender,
cellId: cell.id,
name: cell.name,
description: cell.description,
icon: cell.icon
};
}
export function messageToCell(message: CellMessage): Cell {
return {
id: message.cellId,
name: message.name,
description: message.description,
icon: message.icon
};
}
export function postToMessage(post: Post, sender: string): PostMessage {
return {
messageType: MessageType.POST,
timestamp: Date.now(),
sender,
postId: post.id,
cellId: post.cellId,
content: post.content
};
}
export function messageToPost(message: PostMessage): Post {
return {
id: message.postId,
cellId: message.cellId,
authorAddress: message.sender,
content: message.content,
timestamp: message.timestamp,
upvotes: [],
downvotes: []
};
}
export function commentToMessage(comment: Comment, sender: string): CommentMessage {
return {
messageType: MessageType.COMMENT,
timestamp: Date.now(),
sender,
commentId: comment.id,
postId: comment.postId,
content: comment.content
};
}
export function messageToComment(message: CommentMessage): Comment {
return {
id: message.commentId,
postId: message.postId,
authorAddress: message.sender,
content: message.content,
timestamp: message.timestamp,
upvotes: [],
downvotes: []
};
}
// Parse message from decoded waku message
export function parseMessage(decodedMessage: DecodedMessage): WakuMessage | null {
try {
if (!decodedMessage.payload) return null;
const messageString = new TextDecoder().decode(decodedMessage.payload);
const message = JSON.parse(messageString) as WakuMessage;
// Validate message has required fields
if (!message.messageType || !message.timestamp || !message.sender) {
console.error('Invalid message format:', message);
return null;
}
return message;
} catch (error) {
console.error('Error parsing message:', error);
return null;
}
}
// Serialize message to payload bytes
export function serializeMessage(message: WakuMessage): Uint8Array {
const messageString = JSON.stringify(message);
return new TextEncoder().encode(messageString);
}

96
src/lib/waku/types.ts Normal file
View File

@ -0,0 +1,96 @@
/**
* Message types for Waku communication
*/
export enum MessageType {
CELL = 'cell',
POST = 'post',
COMMENT = 'comment',
VOTE = 'vote'
}
/**
* Base interface for all message types
*/
export interface BaseMessage {
type: MessageType;
timestamp: number;
author: string;
}
/**
* Represents a cell message
*/
export interface CellMessage extends BaseMessage {
type: MessageType.CELL;
id: string;
name: string;
description: string;
}
/**
* Represents a post message
*/
export interface PostMessage extends BaseMessage {
type: MessageType.POST;
id: string;
cellId: string;
title: string;
content: string;
}
/**
* Represents a comment message
*/
export interface CommentMessage extends BaseMessage {
type: MessageType.COMMENT;
id: string;
postId: string;
parentId?: string; // Optional for nested comments
content: string;
}
/**
* Represents a vote message
*/
export interface VoteMessage extends BaseMessage {
type: MessageType.VOTE;
id: string;
targetId: string; // ID of the post or comment being voted on
value: number; // 1 for upvote, -1 for downvote
}
/**
* Union type of all possible message types
*/
export type OpchanMessage = CellMessage | PostMessage | CommentMessage | VoteMessage;
/**
* Listener function type for Waku service events
*/
export type MessageListener<T extends OpchanMessage> = (message: T) => void;
/**
* Subscription object returned when registering listeners
*/
export interface Subscription {
unsubscribe: () => void;
}
/**
* Cache objects for storing messages
*/
export interface CellCache {
[cellId: string]: CellMessage;
}
export interface PostCache {
[postId: string]: PostMessage;
}
export interface CommentCache {
[commentId: string]: CommentMessage;
}
export interface VoteCache {
[key: string]: VoteMessage; // key = targetId + authorAddress
}