feat: add example for measuring ratio of messages sent to received

This commit is contained in:
Arseniy Klempner 2024-05-14 21:41:05 -07:00
parent d62e3579fd
commit 36564f2ba6
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
11 changed files with 8689 additions and 0 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

View File

@ -0,0 +1,48 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta content="width=device-width, initial-scale=1.0" name="viewport" />
<title>Sent Received Message Ratio</title>
<link rel="apple-touch-icon" href="./favicon.png" />
<link rel="manifest" href="./manifest.json" />
<link rel="icon" href="./favicon.ico" />
<style>
#container {
display: flex;
height: 100vh;
}
#sender,
#receiver {
flex: 1;
display: flex;
flex-direction: column;
align-items: center;
justify-content: flex-start;
}
</style>
</head>
<body>
<div>
<h3>Waku Dogfooding App</h3>
<div id="runningScreen" style="display: none">
<label for="wallet">Wallet Address:</label>
<span id="wallet"></span>
<br />
<label for="numSent">Messages Sent:</label>
<span id="numSent">0</span>
</div>
</div>
<div id="container">
<div id="sender">
<h3>Sent</h3>
<div id="messagesSent"></div>
</div>
<div id="receiver">
<h3>Received</h3>
<div id="messagesReceived"></div>
</div>
</div>
<script src="./index.js"></script>
</body>
</html>

View File

@ -0,0 +1,19 @@
{
"name": "Sent Received Message Ratio",
"description": "Example meant to run across two browsers to test ratio of messages sent to messages received",
"icons": [
{
"src": "favicon.ico",
"sizes": "64x64 32x32 24x24 16x16",
"type": "image/x-icon"
},
{
"src": "favicon.png",
"type": "image/png",
"sizes": "192x192"
}
],
"display": "standalone",
"theme_color": "#ffffff",
"background_color": "#ffffff"
}

8256
examples/dogfooding/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,25 @@
{
"name": "waku-dogfooding",
"version": "0.0.1",
"private": true,
"scripts": {
"build": "webpack --config webpack.config.js",
"start": "webpack-dev-server"
},
"dependencies": {
"@libp2p/peer-id": "^4.1.2",
"@waku/sdk": "0.0.26-16e9116.0",
"protobufjs": "^7.3.0"
},
"devDependencies": {
"@types/node": "^20.12.11",
"copy-webpack-plugin": "^11.0.0",
"eslint": "^8",
"eslint-config-next": "13.5.6",
"ts-loader": "^9.5.1",
"typescript": "^5.4.5",
"webpack": "^5.74.0",
"webpack-cli": "^4.10.0",
"webpack-dev-server": "^4.11.1"
}
}

View File

@ -0,0 +1,194 @@
import {
createLightNode,
createEncoder,
createDecoder,
DecodedMessage,
waitForRemotePeer,
DefaultPubsubTopic,
LightNode,
} from "@waku/sdk";
import { Type, Field } from "protobufjs";
import {
TelemetryClient,
TelemetryPushFilter,
TelemetryType,
} from "./telemetry_client";
import { generateRandomNumber, hashNumber } from "./util";
const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8";
const TELEMETRY_URL = process.env.TELEMETRY_URL || "http://localhost:8080/waku-metrics";
const ProtoSequencedMessage = new Type("SequencedMessage")
.add(new Field("hash", 1, "string"))
.add(new Field("total", 2, "uint64"))
.add(new Field("index", 3, "uint64"))
.add(new Field("sender", 4, "string"));
const sequenceCompletedEvent = new CustomEvent("sequenceCompleted");
const messageSentEvent = new CustomEvent("messageSent");
const wakuNode = async (): Promise<LightNode> => {
return await createLightNode({
contentTopics: [DEFAULT_CONTENT_TOPIC],
defaultBootstrap: true
});
};
export async function app(telemetryClient: TelemetryClient) {
const node = await wakuNode();
await node.start();
await waitForRemotePeer(node);
const peerId = node.libp2p.peerId.toString();
const encoder = createEncoder({
contentTopic: DEFAULT_CONTENT_TOPIC,
});
const startLightPushSequence = async (
numMessages: number,
period: number = 3000
) => {
const sequenceHash = await hashNumber(generateRandomNumber());
const sequenceTotal = numMessages;
let sequenceIndex = 0;
const sendMessage = async () => {
try {
const message = ProtoSequencedMessage.create({
hash: sequenceHash,
total: sequenceTotal,
index: sequenceIndex,
sender: peerId,
});
const payload = ProtoSequencedMessage.encode(message).finish();
const result = await node.lightPush.send(encoder, {
payload,
timestamp: new Date(),
});
console.log("light push successes: ", result.successes.length);
console.log("light push failures: ", result.failures.length);
if (result.successes.length > 0) {
// Push to telemetry client
telemetryClient.push<TelemetryPushFilter>([
{
messageType: TelemetryType.LIGHT_PUSH_FILTER,
timestamp: Math.floor(new Date().getTime() / 1000),
peerIdSender: peerId,
peerIdReporter: peerId,
sequenceHash: sequenceHash,
sequenceTotal: sequenceTotal,
sequenceIndex: sequenceIndex,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DefaultPubsubTopic,
},
]);
// Update ui
const messageElement = document.createElement("div");
const messagesSent = document.getElementById("messagesSent");
messageElement.textContent = `Message: ${sequenceHash} ${sequenceIndex} of ${sequenceTotal}`;
messagesSent.insertBefore(messageElement, messagesSent.firstChild);
messagesSent.insertBefore(
document.createElement("br"),
messagesSent.firstChild
);
document.dispatchEvent(messageSentEvent);
// Increment sequence
sequenceIndex++;
}
if (result.failures.length > 0) {
console.error("Failed to send message", result.failures);
}
if (sequenceIndex < sequenceTotal) {
setTimeout(sendMessage, period); // Schedule the next send
} else {
document.dispatchEvent(sequenceCompletedEvent);
}
} catch (error) {
console.error("Error sending message", error);
}
};
sendMessage(); // Start the recursive sending
};
const startFilterSubscription = async () => {
const decoder = createDecoder(DEFAULT_CONTENT_TOPIC);
const messagesReceived = document.getElementById("messagesReceived");
const subscriptionCallback = (message: DecodedMessage) => {
const sequencedMessage: any = ProtoSequencedMessage.decode(
message.payload
);
// Don't bother reporting messages sent by this same node
if (sequencedMessage.sender === peerId) {
return;
}
telemetryClient.push<TelemetryPushFilter>([
{
messageType: TelemetryType.LIGHT_PUSH_FILTER,
timestamp: Math.floor(new Date().getTime() / 1000),
peerIdSender: sequencedMessage.sender,
peerIdReporter: peerId,
sequenceHash: sequencedMessage.hash,
sequenceTotal: sequencedMessage.total,
sequenceIndex: sequencedMessage.index,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DefaultPubsubTopic,
},
]);
const messageElement = document.createElement("div");
messageElement.textContent = `Message: ${sequencedMessage.hash} ${sequencedMessage.index} of ${sequencedMessage.total}`;
messagesReceived.appendChild(messageElement);
messagesReceived.appendChild(document.createElement("br"));
};
await node.filter.subscribe(decoder, subscriptionCallback);
};
return {
node,
startLightPushSequence,
startFilterSubscription,
};
}
(async () => {
const telemetryClient = new TelemetryClient(
TELEMETRY_URL,
5000
);
const { node, startLightPushSequence, startFilterSubscription } = await app(
telemetryClient
);
(window as any).waku = node;
const runningScreen = document.getElementById("runningScreen");
runningScreen.style.display = "block";
await telemetryClient.start();
startFilterSubscription();
let sentMessagesCount = 0;
const sentMessagesCounter = document.getElementById(
"numSent"
) as HTMLSpanElement;
document.addEventListener("messageSent", () => {
sentMessagesCount++;
sentMessagesCounter.textContent = sentMessagesCount.toString();
});
function startSequence() {
const numMessages = Math.floor(Math.random() * 16) + 5;
const messagePeriod = Math.floor(Math.random() * 2001) + 1000;
startLightPushSequence(numMessages, messagePeriod);
}
document.addEventListener(sequenceCompletedEvent.type, () => startSequence());
startSequence();
})();

View File

@ -0,0 +1,86 @@
export enum TelemetryType {
LIGHT_PUSH_FILTER = "LightPushFilter",
}
// Top level structure of a telemetry request
export interface TelemetryRequest {
id: number;
telemetryType: TelemetryType;
telemetryData: any; // Using 'any' to represent the raw JSON data
}
// Common to all telemetry messages
export interface TelemetryMessage {
timestamp: number;
messageType: TelemetryType;
}
export interface TelemetryPushFilter extends TelemetryMessage {
peerIdSender: string;
peerIdReporter: string;
sequenceHash: string;
sequenceTotal: number;
sequenceIndex: number;
contentTopic: string;
pubsubTopic: string;
}
export class TelemetryClient {
constructor(
private readonly url: string,
private intervalPeriod: number = 5000
) {}
private queue: TelemetryMessage[] = [];
private intervalId: NodeJS.Timeout | null = null;
private requestId = 0;
public push<T extends TelemetryMessage>(messages: T[]) {
this.queue.push(...messages);
}
public async start() {
if (!this.intervalId) {
this.intervalId = setInterval(async () => {
if (this.queue.length > 0) {
const success = await this.send(this.queue);
if (success) {
console.log("Sent ", this.queue.length, " telemetry logs");
this.queue = [];
}
}
}, this.intervalPeriod);
}
}
public stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
private async send<T extends TelemetryMessage>(messages: T[]) {
const telemetryRequests = messages.map((message) => ({
id: ++this.requestId,
telemetryType: message.messageType.toString(),
telemetryData: message
}));
try {
const res = await fetch(this.url, {
method: "POST",
body: JSON.stringify(telemetryRequests),
});
if (res.status !== 201) {
console.error("Error sending messages to telemetry service: ", res.status, res.statusText, res.json);
return false
}
return true;
} catch (e) {
console.error("Error sending messages to telemetry service: ", e);
console.error("Failed trying to send the following messages: ", telemetryRequests);
return false;
}
}
}

View File

@ -0,0 +1,12 @@
export const generateRandomNumber = (): number => {
return Math.floor(Math.random() * 1000000);
};
export const hashNumber = async (number: number): Promise<string> => {
const encoder = new TextEncoder();
const data = encoder.encode(number.toString());
const buffer = await crypto.subtle.digest("SHA-256", data);
return Array.from(new Uint8Array(buffer))
.map((b) => b.toString(16).padStart(2, "0"))
.join("");
};

View File

@ -0,0 +1,14 @@
{
"compilerOptions": {
"outDir": "./dist/",
"noImplicitAny": true,
"module": "es6",
"target": "es5",
"jsx": "react",
"allowJs": true,
"moduleResolution": "node"
},
"include": [
"./src/**/*"
]
}

View File

@ -0,0 +1,35 @@
const CopyWebpackPlugin = require("copy-webpack-plugin");
const path = require("path");
const webpack = require("webpack");
module.exports = {
entry: "./src/index.ts", // Changed from index.js to index.ts
output: {
path: path.resolve(__dirname, "build"),
filename: "index.js",
},
experiments: {
asyncWebAssembly: true,
},
resolve: {
extensions: ['.ts', '.js'], // Add .ts to the extensions
},
module: {
rules: [
{
test: /\.ts$/, // Add a rule for TypeScript files
use: 'ts-loader',
exclude: /node_modules/,
},
],
},
mode: "development",
plugins: [
new CopyWebpackPlugin({
patterns: ["index.html", "favicon.ico", "favicon.png", "manifest.json"],
}),
new webpack.DefinePlugin({
'process.env.TELEMETRY_URL': JSON.stringify(process.env.TELEMETRY_URL || "https://telemetry.status.im/waku-metrics")
}),
],
};