chore: refactor the codebase to be more modular

This commit is contained in:
Danish Arora 2024-08-14 20:33:21 +05:30
parent 67fe074471
commit b5f71e7cf5
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
10 changed files with 244 additions and 241 deletions

View File

@ -0,0 +1,7 @@
{
"cSpell.words": [
"multiaddr",
"multiformats",
"waku"
]
}

View File

@ -0,0 +1,10 @@
export const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8";
export const TELEMETRY_URL =
process.env.TELEMETRY_URL || "http://localhost:8080/waku-metrics";
export const nodes = [
"/dns4/node-01.do-ams3.waku.test.status.im/tcp/8000/wss",
"/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/8000/wss",
"/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/8000/wss",
];

View File

@ -1,198 +1,20 @@
import {
createLightNode,
createEncoder,
createDecoder,
DecodedMessage,
waitForRemotePeer,
LightNode,
utils
} from "@waku/sdk";
import { Type, Field } from "protobufjs";
import {
TelemetryClient,
TelemetryPushError,
TelemetryPushFilter,
TelemetryType,
} from "./telemetry_client";
import { generateRandomNumber, hashNumber } from "./util";
const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8";
const TELEMETRY_URL =
process.env.TELEMETRY_URL || "http://localhost:8080/waku-metrics";
const ProtoSequencedMessage = new Type("SequencedMessage")
.add(new Field("hash", 1, "string"))
.add(new Field("total", 2, "uint64"))
.add(new Field("index", 3, "uint64"))
.add(new Field("sender", 4, "string"));
const sequenceCompletedEvent = new CustomEvent("sequenceCompleted");
const messageSentEvent = new CustomEvent("messageSent");
const wakuNode = async (): Promise<LightNode> => {
return await createLightNode({
contentTopics: [DEFAULT_CONTENT_TOPIC],
defaultBootstrap: true,
});
};
export async function app(telemetryClient: TelemetryClient) {
const node = await wakuNode();
await node.start();
// TODO: https://github.com/waku-org/js-waku/issues/2079
// Dialing bootstrap peers right on start in order to have Filter subscription initiated properly
await node.dial("/dns4/node-01.do-ams3.waku.test.status.im/tcp/8000/wss");
await node.dial(
"/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/8000/wss"
);
await node.dial(
"/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/8000/wss"
);
await waitForRemotePeer(node);
const peerId = node.libp2p.peerId.toString();
const encoder = createEncoder({
contentTopic: DEFAULT_CONTENT_TOPIC,
});
const startLightPushSequence = async (
numMessages: number,
period: number = 3000
) => {
const sequenceHash = await hashNumber(generateRandomNumber());
const sequenceTotal = numMessages;
let sequenceIndex = 0;
const sendMessage = async () => {
try {
const message = ProtoSequencedMessage.create({
hash: sequenceHash,
total: sequenceTotal,
index: sequenceIndex,
sender: peerId,
});
const payload = ProtoSequencedMessage.encode(message).finish();
const result = await node.lightPush.send(encoder, {
payload,
timestamp: new Date(),
});
console.log("light push successes: ", result.successes.length);
console.log("light push failures: ", result.failures.length);
if (result.successes.length > 0) {
// Push to telemetry client
telemetryClient.push<TelemetryPushFilter>([
{
messageType: TelemetryType.LIGHT_PUSH_FILTER,
timestamp: Math.floor(new Date().getTime() / 1000),
peerIdSender: peerId,
peerIdReporter: peerId,
sequenceHash: sequenceHash,
sequenceTotal: sequenceTotal,
sequenceIndex: sequenceIndex,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
},
]);
// Update ui
const messageElement = document.createElement("div");
const messagesSent = document.getElementById("messagesSent");
messageElement.textContent = `Message: ${sequenceHash} ${sequenceIndex} of ${sequenceTotal}`;
messagesSent.insertBefore(messageElement, messagesSent.firstChild);
messagesSent.insertBefore(
document.createElement("br"),
messagesSent.firstChild
);
document.dispatchEvent(messageSentEvent);
// Increment sequence
sequenceIndex++;
}
if (result.failures.length > 0) {
telemetryClient.push<TelemetryPushError>(
result.failures.map((failure) => ({
messageType: TelemetryType.LIGHT_PUSH_ERROR,
timestamp: Math.floor(new Date().getTime() / 1000),
peerId: peerId,
peerIdRemote: failure.peerId?.toString(),
errorMessage: failure.error.toString(),
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DefaultPubsubTopic,
}))
);
}
if (sequenceIndex < sequenceTotal) {
setTimeout(sendMessage, period); // Schedule the next send
} else {
document.dispatchEvent(sequenceCompletedEvent);
}
} catch (error) {
console.error("Error sending message", error);
}
};
sendMessage(); // Start the recursive sending
};
const startFilterSubscription = async () => {
const decoder = createDecoder(DEFAULT_CONTENT_TOPIC);
const messagesReceived = document.getElementById("messagesReceived");
const subscriptionCallback = (message: DecodedMessage) => {
const sequencedMessage: any = ProtoSequencedMessage.decode(
message.payload
);
// Don't bother reporting messages sent by this same node
if (sequencedMessage.sender === peerId) {
return;
}
telemetryClient.push<TelemetryPushFilter>([
{
messageType: TelemetryType.LIGHT_PUSH_FILTER,
timestamp: Math.floor(new Date().getTime() / 1000),
peerIdSender: sequencedMessage.sender,
peerIdReporter: peerId,
sequenceHash: sequencedMessage.hash,
sequenceTotal: sequencedMessage.total,
sequenceIndex: sequencedMessage.index,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
},
]);
const messageElement = document.createElement("div");
messageElement.textContent = `Message: ${sequencedMessage.hash} ${sequencedMessage.index} of ${sequencedMessage.total}`;
messagesReceived.appendChild(messageElement);
messagesReceived.appendChild(document.createElement("br"));
};
await node.filter.subscribe(decoder, subscriptionCallback);
};
return {
node,
startLightPushSequence,
startFilterSubscription,
};
}
import { TELEMETRY_URL } from "./constants";
import { TelemetryClient } from "./lib/telemetry/client";
import { setupWaku } from "./lib/waku";
import { EVENTS } from "./lib/waku/events";
import { startFilterSubscription } from "./lib/waku/filter";
import { startLightPushSequence } from "./lib/waku/lightpush";
(async () => {
const telemetryClient = new TelemetryClient(TELEMETRY_URL, 5000);
const { node, startLightPushSequence, startFilterSubscription } = await app(
telemetryClient
);
(window as any).waku = node;
const waku = await setupWaku();
(window as any).waku = waku;
const runningScreen = document.getElementById("runningScreen");
runningScreen.style.display = "block";
await telemetryClient.start();
startFilterSubscription();
startFilterSubscription(waku, telemetryClient);
let sentMessagesCount = 0;
const sentMessagesCounter = document.getElementById(
@ -203,12 +25,6 @@ export async function app(telemetryClient: TelemetryClient) {
sentMessagesCounter.textContent = sentMessagesCount.toString();
});
function startSequence() {
const numMessages = Math.floor(Math.random() * 16) + 5;
const messagePeriod = Math.floor(Math.random() * 2001) + 5000;
startLightPushSequence(numMessages, messagePeriod);
}
document.addEventListener(sequenceCompletedEvent.type, () => startSequence());
startSequence();
document.addEventListener(EVENTS.SEQUENCE_COMPLETED.type, async () => await startLightPushSequence(waku, telemetryClient));
await startLightPushSequence(waku, telemetryClient);
})();

View File

@ -1,49 +1,4 @@
export enum TelemetryType {
LIGHT_PUSH_FILTER = "LightPushFilter",
LIGHT_PUSH_ERROR = "LightPushError",
GENERIC = "Generic"
}
// Top level structure of a telemetry request
export interface TelemetryRequest {
id: number;
telemetryType: TelemetryType;
telemetryData: any; // Using 'any' to represent the raw JSON data
}
// Common to all telemetry messages
export interface TelemetryMessage {
timestamp: number;
messageType: TelemetryType;
}
export interface TelemetryPushFilter extends TelemetryMessage {
peerIdSender: string;
peerIdReporter: string;
sequenceHash: string;
sequenceTotal: number;
sequenceIndex: number;
contentTopic: string;
pubsubTopic: string;
}
export interface TelemetryPushError extends TelemetryMessage {
peerId: string;
errorMessage: string;
peerIdRemote?: string;
contentTopic?: string;
pubsubTopic?: string;
}
export interface TelemetryGeneric extends TelemetryMessage {
peerId: string;
metricType: string;
contentTopic?: string;
pubsubTopic?: string;
genericData?: string;
errorMessage?: string;
}
import { TelemetryMessage } from "./types";
export class TelemetryClient {
constructor(

View File

@ -0,0 +1,45 @@
export enum TelemetryType {
LIGHT_PUSH_FILTER = "LightPushFilter",
LIGHT_PUSH_ERROR = "LightPushError",
GENERIC = "Generic"
}
// Top level structure of a telemetry request
export interface TelemetryRequest {
id: number;
telemetryType: TelemetryType;
telemetryData: any; // Using 'any' to represent the raw JSON data
}
// Common to all telemetry messages
export interface TelemetryMessage {
timestamp: number;
messageType: TelemetryType;
}
export interface TelemetryPushFilter extends TelemetryMessage {
peerIdSender: string;
peerIdReporter: string;
sequenceHash: string;
sequenceTotal: number;
sequenceIndex: number;
contentTopic: string;
pubsubTopic: string;
}
export interface TelemetryPushError extends TelemetryMessage {
peerId: string;
errorMessage: string;
peerIdRemote?: string;
contentTopic?: string;
pubsubTopic?: string;
}
export interface TelemetryGeneric extends TelemetryMessage {
peerId: string;
metricType: string;
contentTopic?: string;
pubsubTopic?: string;
genericData?: string;
errorMessage?: string;
}

View File

@ -0,0 +1,7 @@
const SEQUENCE_COMPLETED = new CustomEvent("sequenceCompleted");
const MESSAGE_SENT = new CustomEvent("messageSent");
export const EVENTS = {
SEQUENCE_COMPLETED,
MESSAGE_SENT,
}

View File

@ -0,0 +1,41 @@
import { createDecoder, type LightNode, type DecodedMessage, utils } from "@waku/sdk";
import { TelemetryClient } from "../telemetry/client";
import { ProtoSequencedMessage } from "./proto";
import { DEFAULT_CONTENT_TOPIC } from "../../constants";
import { TelemetryPushFilter, TelemetryType } from "../telemetry/types";
export async function startFilterSubscription (waku: LightNode, telemetry: TelemetryClient) {
const decoder = createDecoder(DEFAULT_CONTENT_TOPIC);
const messagesReceived = document.getElementById("messagesReceived");
const subscriptionCallback = (message: DecodedMessage) => {
const sequencedMessage: any = ProtoSequencedMessage.decode(
message.payload
);
// Don't bother reporting messages sent by this same node
if (sequencedMessage.sender === waku.libp2p.peerId.toString()) {
return;
}
telemetry.push<TelemetryPushFilter>([
{
messageType: TelemetryType.LIGHT_PUSH_FILTER,
timestamp: Math.floor(new Date().getTime() / 1000),
peerIdSender: sequencedMessage.sender,
peerIdReporter: waku.libp2p.peerId.toString(),
sequenceHash: sequencedMessage.hash,
sequenceTotal: sequencedMessage.total,
sequenceIndex: sequencedMessage.index,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
},
]);
const messageElement = document.createElement("div");
messageElement.textContent = `Message: ${sequencedMessage.hash} ${sequencedMessage.index} of ${sequencedMessage.total}`;
messagesReceived.appendChild(messageElement);
messagesReceived.appendChild(document.createElement("br"));
};
await waku.filter.subscribe(decoder, subscriptionCallback);
};

View File

@ -0,0 +1,19 @@
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import { nodes as bootstrap, DEFAULT_CONTENT_TOPIC } from "../../constants";
import {multiaddr} from '@multiformats/multiaddr'
export async function setupWaku() {
const waku = await createLightNode({contentTopics: [DEFAULT_CONTENT_TOPIC], defaultBootstrap: true})
await waku.start();
// TODO: https://github.com/waku-org/js-waku/issues/2079
await Promise.all(getBootstrapNodes().map((node) => waku.dial(node)));
await waitForRemotePeer(waku);
return waku;
}
function getBootstrapNodes(nodes: string[] = bootstrap) {
return bootstrap.map((node) => multiaddr(node));
}

View File

@ -0,0 +1,96 @@
import { createEncoder, utils, type LightNode } from "@waku/sdk";
import { generateRandomNumber, hashNumber } from "../../util";
import { ProtoSequencedMessage } from "./proto";
import { DEFAULT_CONTENT_TOPIC } from "../../constants";
import { TelemetryClient } from "../telemetry/client";
import { EVENTS } from "./events";
import { TelemetryPushError, TelemetryPushFilter, TelemetryType } from "../telemetry/types";
export async function startLightPushSequence (
waku: LightNode,
telemetry: TelemetryClient,
numMessages: number = 10,
period: number = 10_000
) {
console.info("Starting a new lightpush sequence");
const sequenceHash = await hashNumber(generateRandomNumber());
const sequenceTotal = numMessages;
let sequenceIndex = 0;
const sendMessage = async () => {
try {
const message = ProtoSequencedMessage.create({
hash: sequenceHash,
total: sequenceTotal,
index: sequenceIndex,
sender: waku.libp2p.peerId.toString(),
});
const payload = ProtoSequencedMessage.encode(message).finish();
const result = await waku.lightPush.send(encoder, {
payload,
timestamp: new Date(),
});
console.log("light push successes: ", result.successes.length);
console.log("light push failures: ", result.failures.length);
console.error(result.failures)
if (result.successes.length > 0) {
// Push to telemetry client
telemetry.push<TelemetryPushFilter>([
{
messageType: TelemetryType.LIGHT_PUSH_FILTER,
timestamp: Math.floor(new Date().getTime() / 1000),
peerIdSender: waku.libp2p.peerId.toString(),
peerIdReporter: waku.libp2p.peerId.toString(),
sequenceHash: sequenceHash,
sequenceTotal: sequenceTotal,
sequenceIndex: sequenceIndex,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
},
]);
// Update ui
const messageElement = document.createElement("div");
const messagesSent = document.getElementById("messagesSent");
messageElement.textContent = `Message: ${sequenceHash} ${sequenceIndex} of ${sequenceTotal}`;
messagesSent.insertBefore(messageElement, messagesSent.firstChild);
messagesSent.insertBefore(
document.createElement("br"),
messagesSent.firstChild
);
document.dispatchEvent(EVENTS.MESSAGE_SENT);
// Increment sequence
sequenceIndex++;
}
if (result.failures.length > 0) {
telemetry.push<TelemetryPushError>(
result.failures.map((failure) => ({
messageType: TelemetryType.LIGHT_PUSH_ERROR,
timestamp: Math.floor(new Date().getTime() / 1000),
peerId: waku.libp2p.peerId.toString(),
peerIdRemote: failure.peerId?.toString(),
errorMessage: failure.error.toString(),
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
}))
);
}
if (sequenceIndex < sequenceTotal) {
setTimeout(sendMessage, period); // Schedule the next send
} else {
document.dispatchEvent(EVENTS.SEQUENCE_COMPLETED);
}
} catch (error) {
console.error("Error sending message", error);
}
};
sendMessage(); // Start the recursive sending
};
const encoder = createEncoder({
contentTopic: DEFAULT_CONTENT_TOPIC,
});

View File

@ -0,0 +1,7 @@
import { Field, Type } from "protobufjs";
export const ProtoSequencedMessage = new Type("SequencedMessage")
.add(new Field("hash", 1, "string"))
.add(new Field("total", 2, "uint64"))
.add(new Field("index", 3, "uint64"))
.add(new Field("sender", 4, "string"));