From b5f71e7cf5263aea4a2af0cd7e57e3c87ad480bc Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 14 Aug 2024 20:33:21 +0530 Subject: [PATCH] chore: refactor the codebase to be more modular --- examples/dogfooding/.vscode/settings.json | 7 + examples/dogfooding/src/constants.ts | 10 + examples/dogfooding/src/index.ts | 206 +----------------- .../telemetry/client.ts} | 47 +--- .../dogfooding/src/lib/telemetry/types.ts | 45 ++++ examples/dogfooding/src/lib/waku/events.ts | 7 + examples/dogfooding/src/lib/waku/filter.ts | 41 ++++ examples/dogfooding/src/lib/waku/index.ts | 19 ++ examples/dogfooding/src/lib/waku/lightpush.ts | 96 ++++++++ examples/dogfooding/src/lib/waku/proto.ts | 7 + 10 files changed, 244 insertions(+), 241 deletions(-) create mode 100644 examples/dogfooding/.vscode/settings.json create mode 100644 examples/dogfooding/src/constants.ts rename examples/dogfooding/src/{telemetry_client.ts => lib/telemetry/client.ts} (60%) create mode 100644 examples/dogfooding/src/lib/telemetry/types.ts create mode 100644 examples/dogfooding/src/lib/waku/events.ts create mode 100644 examples/dogfooding/src/lib/waku/filter.ts create mode 100644 examples/dogfooding/src/lib/waku/index.ts create mode 100644 examples/dogfooding/src/lib/waku/lightpush.ts create mode 100644 examples/dogfooding/src/lib/waku/proto.ts diff --git a/examples/dogfooding/.vscode/settings.json b/examples/dogfooding/.vscode/settings.json new file mode 100644 index 0000000..613c00b --- /dev/null +++ b/examples/dogfooding/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "cSpell.words": [ + "multiaddr", + "multiformats", + "waku" + ] +} \ No newline at end of file diff --git a/examples/dogfooding/src/constants.ts b/examples/dogfooding/src/constants.ts new file mode 100644 index 0000000..82392eb --- /dev/null +++ b/examples/dogfooding/src/constants.ts @@ -0,0 +1,10 @@ +export const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8"; +export const TELEMETRY_URL = + process.env.TELEMETRY_URL || "http://localhost:8080/waku-metrics"; + + +export const nodes = [ + "/dns4/node-01.do-ams3.waku.test.status.im/tcp/8000/wss", + "/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/8000/wss", + "/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/8000/wss", + ]; diff --git a/examples/dogfooding/src/index.ts b/examples/dogfooding/src/index.ts index 1ed625f..8e51352 100644 --- a/examples/dogfooding/src/index.ts +++ b/examples/dogfooding/src/index.ts @@ -1,198 +1,20 @@ -import { - createLightNode, - createEncoder, - createDecoder, - DecodedMessage, - waitForRemotePeer, - LightNode, - utils -} from "@waku/sdk"; - -import { Type, Field } from "protobufjs"; -import { - TelemetryClient, - TelemetryPushError, - TelemetryPushFilter, - TelemetryType, -} from "./telemetry_client"; -import { generateRandomNumber, hashNumber } from "./util"; - -const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8"; -const TELEMETRY_URL = - process.env.TELEMETRY_URL || "http://localhost:8080/waku-metrics"; - -const ProtoSequencedMessage = new Type("SequencedMessage") - .add(new Field("hash", 1, "string")) - .add(new Field("total", 2, "uint64")) - .add(new Field("index", 3, "uint64")) - .add(new Field("sender", 4, "string")); - -const sequenceCompletedEvent = new CustomEvent("sequenceCompleted"); -const messageSentEvent = new CustomEvent("messageSent"); - -const wakuNode = async (): Promise => { - return await createLightNode({ - contentTopics: [DEFAULT_CONTENT_TOPIC], - defaultBootstrap: true, - }); -}; - -export async function app(telemetryClient: TelemetryClient) { - const node = await wakuNode(); - await node.start(); - - // TODO: https://github.com/waku-org/js-waku/issues/2079 - // Dialing bootstrap peers right on start in order to have Filter subscription initiated properly - await node.dial("/dns4/node-01.do-ams3.waku.test.status.im/tcp/8000/wss"); - await node.dial( - "/dns4/node-01.ac-cn-hongkong-c.waku.test.status.im/tcp/8000/wss" - ); - await node.dial( - "/dns4/node-01.gc-us-central1-a.waku.test.status.im/tcp/8000/wss" - ); - - await waitForRemotePeer(node); - - const peerId = node.libp2p.peerId.toString(); - const encoder = createEncoder({ - contentTopic: DEFAULT_CONTENT_TOPIC, - }); - - const startLightPushSequence = async ( - numMessages: number, - period: number = 3000 - ) => { - const sequenceHash = await hashNumber(generateRandomNumber()); - const sequenceTotal = numMessages; - let sequenceIndex = 0; - - const sendMessage = async () => { - try { - const message = ProtoSequencedMessage.create({ - hash: sequenceHash, - total: sequenceTotal, - index: sequenceIndex, - sender: peerId, - }); - const payload = ProtoSequencedMessage.encode(message).finish(); - const result = await node.lightPush.send(encoder, { - payload, - timestamp: new Date(), - }); - console.log("light push successes: ", result.successes.length); - console.log("light push failures: ", result.failures.length); - if (result.successes.length > 0) { - // Push to telemetry client - telemetryClient.push([ - { - messageType: TelemetryType.LIGHT_PUSH_FILTER, - timestamp: Math.floor(new Date().getTime() / 1000), - peerIdSender: peerId, - peerIdReporter: peerId, - sequenceHash: sequenceHash, - sequenceTotal: sequenceTotal, - sequenceIndex: sequenceIndex, - contentTopic: DEFAULT_CONTENT_TOPIC, - pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC), - }, - ]); - - // Update ui - const messageElement = document.createElement("div"); - const messagesSent = document.getElementById("messagesSent"); - messageElement.textContent = `Message: ${sequenceHash} ${sequenceIndex} of ${sequenceTotal}`; - messagesSent.insertBefore(messageElement, messagesSent.firstChild); - messagesSent.insertBefore( - document.createElement("br"), - messagesSent.firstChild - ); - - document.dispatchEvent(messageSentEvent); - - // Increment sequence - sequenceIndex++; - } - if (result.failures.length > 0) { - telemetryClient.push( - result.failures.map((failure) => ({ - messageType: TelemetryType.LIGHT_PUSH_ERROR, - timestamp: Math.floor(new Date().getTime() / 1000), - peerId: peerId, - peerIdRemote: failure.peerId?.toString(), - errorMessage: failure.error.toString(), - contentTopic: DEFAULT_CONTENT_TOPIC, - pubsubTopic: DefaultPubsubTopic, - })) - ); - } - if (sequenceIndex < sequenceTotal) { - setTimeout(sendMessage, period); // Schedule the next send - } else { - document.dispatchEvent(sequenceCompletedEvent); - } - } catch (error) { - console.error("Error sending message", error); - } - }; - - sendMessage(); // Start the recursive sending - }; - - const startFilterSubscription = async () => { - const decoder = createDecoder(DEFAULT_CONTENT_TOPIC); - - const messagesReceived = document.getElementById("messagesReceived"); - const subscriptionCallback = (message: DecodedMessage) => { - const sequencedMessage: any = ProtoSequencedMessage.decode( - message.payload - ); - - // Don't bother reporting messages sent by this same node - if (sequencedMessage.sender === peerId) { - return; - } - telemetryClient.push([ - { - messageType: TelemetryType.LIGHT_PUSH_FILTER, - timestamp: Math.floor(new Date().getTime() / 1000), - peerIdSender: sequencedMessage.sender, - peerIdReporter: peerId, - sequenceHash: sequencedMessage.hash, - sequenceTotal: sequencedMessage.total, - sequenceIndex: sequencedMessage.index, - contentTopic: DEFAULT_CONTENT_TOPIC, - pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC), - }, - ]); - - const messageElement = document.createElement("div"); - messageElement.textContent = `Message: ${sequencedMessage.hash} ${sequencedMessage.index} of ${sequencedMessage.total}`; - messagesReceived.appendChild(messageElement); - messagesReceived.appendChild(document.createElement("br")); - }; - - await node.filter.subscribe(decoder, subscriptionCallback); - }; - - return { - node, - startLightPushSequence, - startFilterSubscription, - }; -} +import { TELEMETRY_URL } from "./constants"; +import { TelemetryClient } from "./lib/telemetry/client"; +import { setupWaku } from "./lib/waku"; +import { EVENTS } from "./lib/waku/events"; +import { startFilterSubscription } from "./lib/waku/filter"; +import { startLightPushSequence } from "./lib/waku/lightpush"; (async () => { const telemetryClient = new TelemetryClient(TELEMETRY_URL, 5000); - const { node, startLightPushSequence, startFilterSubscription } = await app( - telemetryClient - ); - (window as any).waku = node; + const waku = await setupWaku(); + (window as any).waku = waku; const runningScreen = document.getElementById("runningScreen"); runningScreen.style.display = "block"; await telemetryClient.start(); - startFilterSubscription(); + startFilterSubscription(waku, telemetryClient); let sentMessagesCount = 0; const sentMessagesCounter = document.getElementById( @@ -203,12 +25,6 @@ export async function app(telemetryClient: TelemetryClient) { sentMessagesCounter.textContent = sentMessagesCount.toString(); }); - function startSequence() { - const numMessages = Math.floor(Math.random() * 16) + 5; - const messagePeriod = Math.floor(Math.random() * 2001) + 5000; - startLightPushSequence(numMessages, messagePeriod); - } - - document.addEventListener(sequenceCompletedEvent.type, () => startSequence()); - startSequence(); + document.addEventListener(EVENTS.SEQUENCE_COMPLETED.type, async () => await startLightPushSequence(waku, telemetryClient)); + await startLightPushSequence(waku, telemetryClient); })(); diff --git a/examples/dogfooding/src/telemetry_client.ts b/examples/dogfooding/src/lib/telemetry/client.ts similarity index 60% rename from examples/dogfooding/src/telemetry_client.ts rename to examples/dogfooding/src/lib/telemetry/client.ts index 588c926..1f806c1 100644 --- a/examples/dogfooding/src/telemetry_client.ts +++ b/examples/dogfooding/src/lib/telemetry/client.ts @@ -1,49 +1,4 @@ -export enum TelemetryType { - LIGHT_PUSH_FILTER = "LightPushFilter", - LIGHT_PUSH_ERROR = "LightPushError", - GENERIC = "Generic" -} - -// Top level structure of a telemetry request -export interface TelemetryRequest { - id: number; - telemetryType: TelemetryType; - telemetryData: any; // Using 'any' to represent the raw JSON data -} - -// Common to all telemetry messages -export interface TelemetryMessage { - timestamp: number; - messageType: TelemetryType; -} - -export interface TelemetryPushFilter extends TelemetryMessage { - peerIdSender: string; - peerIdReporter: string; - sequenceHash: string; - sequenceTotal: number; - sequenceIndex: number; - contentTopic: string; - pubsubTopic: string; -} - -export interface TelemetryPushError extends TelemetryMessage { - peerId: string; - errorMessage: string; - peerIdRemote?: string; - contentTopic?: string; - pubsubTopic?: string; -} - -export interface TelemetryGeneric extends TelemetryMessage { - peerId: string; - metricType: string; - contentTopic?: string; - pubsubTopic?: string; - genericData?: string; - errorMessage?: string; -} - +import { TelemetryMessage } from "./types"; export class TelemetryClient { constructor( diff --git a/examples/dogfooding/src/lib/telemetry/types.ts b/examples/dogfooding/src/lib/telemetry/types.ts new file mode 100644 index 0000000..d261059 --- /dev/null +++ b/examples/dogfooding/src/lib/telemetry/types.ts @@ -0,0 +1,45 @@ +export enum TelemetryType { + LIGHT_PUSH_FILTER = "LightPushFilter", + LIGHT_PUSH_ERROR = "LightPushError", + GENERIC = "Generic" + } + + // Top level structure of a telemetry request + export interface TelemetryRequest { + id: number; + telemetryType: TelemetryType; + telemetryData: any; // Using 'any' to represent the raw JSON data + } + + // Common to all telemetry messages + export interface TelemetryMessage { + timestamp: number; + messageType: TelemetryType; + } + + export interface TelemetryPushFilter extends TelemetryMessage { + peerIdSender: string; + peerIdReporter: string; + sequenceHash: string; + sequenceTotal: number; + sequenceIndex: number; + contentTopic: string; + pubsubTopic: string; + } + + export interface TelemetryPushError extends TelemetryMessage { + peerId: string; + errorMessage: string; + peerIdRemote?: string; + contentTopic?: string; + pubsubTopic?: string; + } + + export interface TelemetryGeneric extends TelemetryMessage { + peerId: string; + metricType: string; + contentTopic?: string; + pubsubTopic?: string; + genericData?: string; + errorMessage?: string; + } \ No newline at end of file diff --git a/examples/dogfooding/src/lib/waku/events.ts b/examples/dogfooding/src/lib/waku/events.ts new file mode 100644 index 0000000..39ca6d4 --- /dev/null +++ b/examples/dogfooding/src/lib/waku/events.ts @@ -0,0 +1,7 @@ +const SEQUENCE_COMPLETED = new CustomEvent("sequenceCompleted"); +const MESSAGE_SENT = new CustomEvent("messageSent"); + +export const EVENTS = { + SEQUENCE_COMPLETED, + MESSAGE_SENT, +} \ No newline at end of file diff --git a/examples/dogfooding/src/lib/waku/filter.ts b/examples/dogfooding/src/lib/waku/filter.ts new file mode 100644 index 0000000..9a109da --- /dev/null +++ b/examples/dogfooding/src/lib/waku/filter.ts @@ -0,0 +1,41 @@ +import { createDecoder, type LightNode, type DecodedMessage, utils } from "@waku/sdk"; +import { TelemetryClient } from "../telemetry/client"; +import { ProtoSequencedMessage } from "./proto"; +import { DEFAULT_CONTENT_TOPIC } from "../../constants"; +import { TelemetryPushFilter, TelemetryType } from "../telemetry/types"; + +export async function startFilterSubscription (waku: LightNode, telemetry: TelemetryClient) { + const decoder = createDecoder(DEFAULT_CONTENT_TOPIC); + + const messagesReceived = document.getElementById("messagesReceived"); + const subscriptionCallback = (message: DecodedMessage) => { + const sequencedMessage: any = ProtoSequencedMessage.decode( + message.payload + ); + + // Don't bother reporting messages sent by this same node + if (sequencedMessage.sender === waku.libp2p.peerId.toString()) { + return; + } + telemetry.push([ + { + messageType: TelemetryType.LIGHT_PUSH_FILTER, + timestamp: Math.floor(new Date().getTime() / 1000), + peerIdSender: sequencedMessage.sender, + peerIdReporter: waku.libp2p.peerId.toString(), + sequenceHash: sequencedMessage.hash, + sequenceTotal: sequencedMessage.total, + sequenceIndex: sequencedMessage.index, + contentTopic: DEFAULT_CONTENT_TOPIC, + pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC), + }, + ]); + + const messageElement = document.createElement("div"); + messageElement.textContent = `Message: ${sequencedMessage.hash} ${sequencedMessage.index} of ${sequencedMessage.total}`; + messagesReceived.appendChild(messageElement); + messagesReceived.appendChild(document.createElement("br")); + }; + + await waku.filter.subscribe(decoder, subscriptionCallback); + }; \ No newline at end of file diff --git a/examples/dogfooding/src/lib/waku/index.ts b/examples/dogfooding/src/lib/waku/index.ts new file mode 100644 index 0000000..c69ab12 --- /dev/null +++ b/examples/dogfooding/src/lib/waku/index.ts @@ -0,0 +1,19 @@ +import { createLightNode, waitForRemotePeer } from "@waku/sdk"; +import { nodes as bootstrap, DEFAULT_CONTENT_TOPIC } from "../../constants"; +import {multiaddr} from '@multiformats/multiaddr' + + + +export async function setupWaku() { + const waku = await createLightNode({contentTopics: [DEFAULT_CONTENT_TOPIC], defaultBootstrap: true}) + await waku.start(); + // TODO: https://github.com/waku-org/js-waku/issues/2079 + await Promise.all(getBootstrapNodes().map((node) => waku.dial(node))); + await waitForRemotePeer(waku); + + return waku; + } + + function getBootstrapNodes(nodes: string[] = bootstrap) { + return bootstrap.map((node) => multiaddr(node)); + } \ No newline at end of file diff --git a/examples/dogfooding/src/lib/waku/lightpush.ts b/examples/dogfooding/src/lib/waku/lightpush.ts new file mode 100644 index 0000000..71ab92d --- /dev/null +++ b/examples/dogfooding/src/lib/waku/lightpush.ts @@ -0,0 +1,96 @@ +import { createEncoder, utils, type LightNode } from "@waku/sdk"; +import { generateRandomNumber, hashNumber } from "../../util"; +import { ProtoSequencedMessage } from "./proto"; +import { DEFAULT_CONTENT_TOPIC } from "../../constants"; +import { TelemetryClient } from "../telemetry/client"; +import { EVENTS } from "./events"; +import { TelemetryPushError, TelemetryPushFilter, TelemetryType } from "../telemetry/types"; + +export async function startLightPushSequence ( + waku: LightNode, + telemetry: TelemetryClient, + numMessages: number = 10, + period: number = 10_000 + ) { + console.info("Starting a new lightpush sequence"); + const sequenceHash = await hashNumber(generateRandomNumber()); + const sequenceTotal = numMessages; + let sequenceIndex = 0; + + const sendMessage = async () => { + try { + const message = ProtoSequencedMessage.create({ + hash: sequenceHash, + total: sequenceTotal, + index: sequenceIndex, + sender: waku.libp2p.peerId.toString(), + }); + const payload = ProtoSequencedMessage.encode(message).finish(); + const result = await waku.lightPush.send(encoder, { + payload, + timestamp: new Date(), + }); + console.log("light push successes: ", result.successes.length); + console.log("light push failures: ", result.failures.length); + console.error(result.failures) + if (result.successes.length > 0) { + // Push to telemetry client + telemetry.push([ + { + messageType: TelemetryType.LIGHT_PUSH_FILTER, + timestamp: Math.floor(new Date().getTime() / 1000), + peerIdSender: waku.libp2p.peerId.toString(), + peerIdReporter: waku.libp2p.peerId.toString(), + sequenceHash: sequenceHash, + sequenceTotal: sequenceTotal, + sequenceIndex: sequenceIndex, + contentTopic: DEFAULT_CONTENT_TOPIC, + pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC), + }, + ]); + + // Update ui + const messageElement = document.createElement("div"); + const messagesSent = document.getElementById("messagesSent"); + messageElement.textContent = `Message: ${sequenceHash} ${sequenceIndex} of ${sequenceTotal}`; + messagesSent.insertBefore(messageElement, messagesSent.firstChild); + messagesSent.insertBefore( + document.createElement("br"), + messagesSent.firstChild + ); + + document.dispatchEvent(EVENTS.MESSAGE_SENT); + + // Increment sequence + sequenceIndex++; + } + if (result.failures.length > 0) { + telemetry.push( + result.failures.map((failure) => ({ + messageType: TelemetryType.LIGHT_PUSH_ERROR, + timestamp: Math.floor(new Date().getTime() / 1000), + peerId: waku.libp2p.peerId.toString(), + peerIdRemote: failure.peerId?.toString(), + errorMessage: failure.error.toString(), + contentTopic: DEFAULT_CONTENT_TOPIC, + pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC), + + })) + ); + } + if (sequenceIndex < sequenceTotal) { + setTimeout(sendMessage, period); // Schedule the next send + } else { + document.dispatchEvent(EVENTS.SEQUENCE_COMPLETED); + } + } catch (error) { + console.error("Error sending message", error); + } + }; + + sendMessage(); // Start the recursive sending + }; + + const encoder = createEncoder({ + contentTopic: DEFAULT_CONTENT_TOPIC, +}); \ No newline at end of file diff --git a/examples/dogfooding/src/lib/waku/proto.ts b/examples/dogfooding/src/lib/waku/proto.ts new file mode 100644 index 0000000..0d27842 --- /dev/null +++ b/examples/dogfooding/src/lib/waku/proto.ts @@ -0,0 +1,7 @@ +import { Field, Type } from "protobufjs"; + +export const ProtoSequencedMessage = new Type("SequencedMessage") + .add(new Field("hash", 1, "string")) + .add(new Field("total", 2, "uint64")) + .add(new Field("index", 3, "uint64")) + .add(new Field("sender", 4, "string")); \ No newline at end of file