chore: simplify encoders/decoders

This commit is contained in:
Danish Arora 2025-08-29 16:30:19 +05:30
parent 907b5f8c5f
commit 991e5cedc5
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
5 changed files with 109 additions and 77 deletions

View File

@ -336,6 +336,7 @@ export const moderatePost = async (
const modMsg: ModerateMessage = {
type: MessageType.MODERATE,
id: uuidv4(),
cellId,
targetType: 'post',
targetId: postId,
@ -386,6 +387,7 @@ export const moderateComment = async (
const modMsg: ModerateMessage = {
type: MessageType.MODERATE,
id: uuidv4(),
cellId,
targetType: 'comment',
targetId: commentId,
@ -433,6 +435,7 @@ export const moderateUser = async (
const modMsg: ModerateMessage = {
type: MessageType.MODERATE,
id: uuidv4(),
cellId,
targetType: 'user',
targetId: userAddress,

View File

@ -1,70 +1,105 @@
import { createDecoder, createEncoder } from '@waku/sdk';
import {
IDecodedMessage,
IDecoder,
IEncoder,
LightNode,
} from '@waku/sdk';
import { MessageType } from './types';
import { CellMessage, PostMessage, CommentMessage, VoteMessage } from './types';
import { CONTENT_TOPICS, NETWORK_CONFIG } from './constants';
import {
CellMessage,
PostMessage,
CommentMessage,
VoteMessage,
} from './types';
import { CONTENT_TOPICS } from './constants';
import { OpchanMessage } from '@/types/forum';
// Create the sharded pubsub topic
const PUBSUB_TOPIC = `/waku/2/rs/${NETWORK_CONFIG.clusterId}/0`;
export class CodecManager {
private encoders: Map<MessageType, IEncoder> = new Map();
private decoders: Map<MessageType, IDecoder<IDecodedMessage>> = new Map();
export const encoders = {
[MessageType.CELL]: createEncoder({
contentTopic: CONTENT_TOPICS[MessageType.CELL],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.POST]: createEncoder({
contentTopic: CONTENT_TOPICS[MessageType.POST],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.COMMENT]: createEncoder({
contentTopic: CONTENT_TOPICS[MessageType.COMMENT],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.VOTE]: createEncoder({
contentTopic: CONTENT_TOPICS[MessageType.VOTE],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
}),
[MessageType.MODERATE]: createEncoder({
contentTopic: CONTENT_TOPICS[MessageType.MODERATE],
routingInfo: { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }
})
}
constructor(private node: LightNode) {
this.encoders = new Map(
Object.values(MessageType).map((type) => [
type,
this.node.createEncoder({ contentTopic: CONTENT_TOPICS[type] }),
])
);
export const decoders = {
[MessageType.CELL]: createDecoder(CONTENT_TOPICS[MessageType.CELL], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.POST]: createDecoder(CONTENT_TOPICS[MessageType.POST], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.COMMENT]: createDecoder(CONTENT_TOPICS[MessageType.COMMENT], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.VOTE]: createDecoder(CONTENT_TOPICS[MessageType.VOTE], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC }),
[MessageType.MODERATE]: createDecoder(CONTENT_TOPICS[MessageType.MODERATE], { clusterId: NETWORK_CONFIG.clusterId, shardId: 0, pubsubTopic: PUBSUB_TOPIC })
}
/**
* Encode a message object into a Uint8Array for transmission
*/
export function encodeMessage(message: OpchanMessage): Uint8Array {
const messageJson = JSON.stringify(message);
return new TextEncoder().encode(messageJson);
}
/**
* Decode a message from a Uint8Array based on its type
*/
export function decodeMessage(payload: Uint8Array): OpchanMessage {
const messageJson = new TextDecoder().decode(payload);
const message = JSON.parse(messageJson) as OpchanMessage;
switch(message.type) {
case MessageType.CELL:
return message as CellMessage;
case MessageType.POST:
return message as PostMessage;
case MessageType.COMMENT:
return message as CommentMessage;
case MessageType.VOTE:
return message as VoteMessage;
default:
throw new Error(`Unknown message type: ${message}`);
this.decoders = new Map(
Object.values(MessageType).map((type) => [
type,
this.node.createDecoder({ contentTopic: CONTENT_TOPICS[type] }),
])
);
}
}
/**
* Encode a message for transmission
*/
encodeMessage(message: OpchanMessage): Uint8Array {
const messageJson = JSON.stringify(message);
return new TextEncoder().encode(messageJson);
}
/**
* Decode a received message
*/
decodeMessage(payload: Uint8Array): OpchanMessage {
const messageJson = new TextDecoder().decode(payload);
const message = JSON.parse(messageJson) as OpchanMessage;
switch (message.type) {
case MessageType.CELL:
return message as CellMessage;
case MessageType.POST:
return message as PostMessage;
case MessageType.COMMENT:
return message as CommentMessage;
case MessageType.VOTE:
return message as VoteMessage;
default:
throw new Error(`Unknown message type: ${message}`);
}
}
/**
* Get encoder for a specific message type
*/
getEncoder(messageType: MessageType): IEncoder {
const encoder = this.encoders.get(messageType);
if (!encoder) {
throw new Error(`No encoder found for message type: ${messageType}`);
}
return encoder;
}
/**
* Get decoder for a specific message type
*/
getDecoder(
messageType: MessageType
): IDecoder<IDecodedMessage> {
const decoder = this.decoders.get(messageType);
if (!decoder) {
throw new Error(`No decoder found for message type: ${messageType}`);
}
return decoder;
}
/**
* Get all decoders for subscribing to multiple message types
*/
getAllDecoders(): IDecoder<IDecodedMessage>[] {
return Array.from(this.decoders.values());
}
/**
* Get decoders for specific message types
*/
getDecoders(
messageTypes: MessageType[]
): IDecoder<IDecodedMessage>[] {
return messageTypes.map((type) => this.getDecoder(type));
}
}

View File

@ -11,12 +11,6 @@ export const CONTENT_TOPICS: Record<MessageType, string> = {
[MessageType.MODERATE]: '/opchan-sds/1/moderate/proto'
};
export const NETWORK_CONFIG = {
// contentTopics: Object.values(CONTENT_TOPICS),
clusterId: 1,
shards: [0,1,2,3,4,5,6,7]
}
/**
* Bootstrap nodes for the Waku network
* These are public Waku nodes that our node will connect to on startup

View File

@ -6,7 +6,6 @@ import { CommentCache, MessageType, VoteCache, ModerateMessage } from "./types";
import { PostCache } from "./types";
import { CellCache } from "./types";
import { OpchanMessage } from "@/types/forum";
import { NETWORK_CONFIG } from "./constants";
import { ReliableMessageManager } from "./reliable_channel";
export type HealthChangeCallback = (isReady: boolean, health: HealthStatus) => void;
@ -37,7 +36,6 @@ class MessageManager {
public static async create(): Promise<MessageManager> {
const node = await createLightNode({
defaultBootstrap: true,
networkConfig: NETWORK_CONFIG,
autoStart: true,
});

View File

@ -1,6 +1,6 @@
import { IDecodedMessage, LightNode, ReliableChannel, ReliableChannelEvent } from "@waku/sdk";
import { MessageType } from "./types";
import { decodeMessage, decoders, encodeMessage, encoders } from "./codec";
import { CodecManager } from "./codec";
import { generateStringId } from "@/lib/utils";
import { OpchanMessage } from "@/types/forum";
@ -18,15 +18,17 @@ export class ReliableMessageManager {
private channels: Map<MessageType, ReliableChannel<IDecodedMessage>> = new Map();
private messageCallbacks: Map<string, MessageStatusCallback> = new Map();
private incomingMessageCallbacks: IncomingMessageCallback[] = [];
private codecManager: CodecManager;
constructor(node: LightNode) {
this.codecManager = new CodecManager(node);
this.initializeChannels(node);
}
private async initializeChannels(node: LightNode) {
for (const type of Object.values(MessageType)) {
const encoder = encoders[type];
const decoder = decoders[type];
const encoder = this.codecManager.getEncoder(type);
const decoder = this.codecManager.getDecoder(type);
const senderId = generateStringId();
const channelId = `opchan-${type}`; // Unique channel ID for each message type
@ -45,7 +47,7 @@ export class ReliableMessageManager {
try {
const wakuMessage = event.detail;
if (wakuMessage.payload) {
const opchanMessage = decodeMessage(wakuMessage.payload);
const opchanMessage = this.codecManager.decodeMessage(wakuMessage.payload);
this.incomingMessageCallbacks.forEach(callback => {
@ -92,7 +94,7 @@ export class ReliableMessageManager {
throw new Error(`No reliable channel for message type: ${message.type}`);
}
const encodedMessage = encodeMessage(message);
const encodedMessage = this.codecManager.encodeMessage(message);
const messageId = ReliableChannel.getMessageId(encodedMessage);
// Store callback for this message