feat: use new telemetry (#88)
This commit is contained in:
parent
50d3cac3a8
commit
a7755a7167
|
@ -5,17 +5,16 @@ import {
|
|||
DecodedMessage,
|
||||
waitForRemotePeer,
|
||||
LightNode,
|
||||
utils
|
||||
utils,
|
||||
} from "@waku/sdk";
|
||||
|
||||
import { Type, Field } from "protobufjs";
|
||||
import {
|
||||
TelemetryClient,
|
||||
TelemetryPushError,
|
||||
TelemetryPushFilter,
|
||||
TelemetryType,
|
||||
} from "./telemetry_client";
|
||||
import { generateRandomNumber, hashNumber } from "./util";
|
||||
import { generateRandomNumber, sha256 } from "./util";
|
||||
|
||||
const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8";
|
||||
const TELEMETRY_URL =
|
||||
|
@ -55,14 +54,19 @@ export async function app(telemetryClient: TelemetryClient) {
|
|||
numMessages: number,
|
||||
period: number = 3000
|
||||
) => {
|
||||
const sequenceHash = await hashNumber(generateRandomNumber());
|
||||
const sequenceHash = await sha256(generateRandomNumber());
|
||||
const sequenceTotal = numMessages;
|
||||
let sequenceIndex = 0;
|
||||
|
||||
const sendMessage = async () => {
|
||||
try {
|
||||
// TODO(weboko): replace with @waku/message-hash ideally
|
||||
const reportingHash = await sha256(`${sequenceHash}-${sequenceIndex}-${sequenceTotal}`);
|
||||
|
||||
const timestamp = Math.floor(new Date().getTime() / 1000);
|
||||
const message = ProtoSequencedMessage.create({
|
||||
hash: sequenceHash,
|
||||
hash: reportingHash,
|
||||
seqHash: sequenceHash,
|
||||
total: sequenceTotal,
|
||||
index: sequenceIndex,
|
||||
sender: peerId,
|
||||
|
@ -83,16 +87,19 @@ export async function app(telemetryClient: TelemetryClient) {
|
|||
// Push to telemetry client
|
||||
telemetryClient.push<TelemetryPushFilter>([
|
||||
{
|
||||
messageType: TelemetryType.LIGHT_PUSH_FILTER,
|
||||
timestamp: Math.floor(new Date().getTime() / 1000),
|
||||
peerIdSender: peerId,
|
||||
peerIdReporter: peerId,
|
||||
sequenceHash: sequenceHash,
|
||||
sequenceTotal: sequenceTotal,
|
||||
sequenceIndex: sequenceIndex,
|
||||
type: TelemetryType.LIGHT_PUSH_FILTER,
|
||||
protocol: "lightPush",
|
||||
timestamp: timestamp,
|
||||
createdAt: timestamp,
|
||||
seenTimestamp: timestamp,
|
||||
peerId,
|
||||
contentTopic: DEFAULT_CONTENT_TOPIC,
|
||||
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
|
||||
},
|
||||
ephemeral: false,
|
||||
messageHash: reportingHash,
|
||||
errorMessage: "",
|
||||
extraData: "",
|
||||
}
|
||||
]);
|
||||
|
||||
// Update ui
|
||||
|
@ -111,14 +118,20 @@ export async function app(telemetryClient: TelemetryClient) {
|
|||
sequenceIndex++;
|
||||
}
|
||||
if (result.failures.length > 0) {
|
||||
telemetryClient.push<TelemetryPushError>(
|
||||
telemetryClient.push<TelemetryPushFilter>(
|
||||
result.failures.map((failure) => ({
|
||||
messageType: TelemetryType.LIGHT_PUSH_ERROR,
|
||||
timestamp: Math.floor(new Date().getTime() / 1000),
|
||||
peerId: peerId,
|
||||
peerIdRemote: failure.peerId?.toString(),
|
||||
errorMessage: failure.error.toString(),
|
||||
type: TelemetryType.LIGHT_PUSH_FILTER,
|
||||
protocol: "lightPush",
|
||||
timestamp: timestamp,
|
||||
createdAt: timestamp,
|
||||
seenTimestamp: timestamp,
|
||||
peerId,
|
||||
contentTopic: DEFAULT_CONTENT_TOPIC,
|
||||
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
|
||||
ephemeral: false,
|
||||
messageHash: reportingHash,
|
||||
errorMessage: failure.error.toString(),
|
||||
extraData: "",
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
@ -140,30 +153,35 @@ export async function app(telemetryClient: TelemetryClient) {
|
|||
|
||||
const messagesReceived = document.getElementById("messagesReceived");
|
||||
const subscriptionCallback = (message: DecodedMessage) => {
|
||||
const sequencedMessage: any = ProtoSequencedMessage.decode(
|
||||
const decodedMessage: any = ProtoSequencedMessage.decode(
|
||||
message.payload
|
||||
);
|
||||
|
||||
// Don't bother reporting messages sent by this same node
|
||||
if (sequencedMessage.sender === peerId) {
|
||||
if (decodedMessage.sender === peerId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const timestamp = Math.floor(new Date().getTime() / 1000);
|
||||
telemetryClient.push<TelemetryPushFilter>([
|
||||
{
|
||||
messageType: TelemetryType.LIGHT_PUSH_FILTER,
|
||||
timestamp: Math.floor(new Date().getTime() / 1000),
|
||||
peerIdSender: sequencedMessage.sender,
|
||||
peerIdReporter: peerId,
|
||||
sequenceHash: sequencedMessage.hash,
|
||||
sequenceTotal: sequencedMessage.total,
|
||||
sequenceIndex: sequencedMessage.index,
|
||||
contentTopic: DEFAULT_CONTENT_TOPIC,
|
||||
pubsubTopic: utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC),
|
||||
type: TelemetryType.LIGHT_PUSH_FILTER,
|
||||
protocol: "filter",
|
||||
timestamp,
|
||||
createdAt: Math.floor(message.timestamp.getTime() / 1000),
|
||||
seenTimestamp: timestamp,
|
||||
peerId: decodedMessage.sender,
|
||||
contentTopic: message.contentTopic,
|
||||
pubsubTopic: message.pubsubTopic,
|
||||
ephemeral: message.ephemeral,
|
||||
messageHash: decodedMessage.hash,
|
||||
errorMessage: "",
|
||||
extraData: "",
|
||||
},
|
||||
]);
|
||||
|
||||
const messageElement = document.createElement("div");
|
||||
messageElement.textContent = `Message: ${sequencedMessage.hash} ${sequencedMessage.index} of ${sequencedMessage.total}`;
|
||||
messageElement.textContent = `Message: ${decodedMessage.hash} ${decodedMessage.index} of ${decodedMessage.total}`;
|
||||
messagesReceived.appendChild(messageElement);
|
||||
messagesReceived.appendChild(document.createElement("br"));
|
||||
|
||||
|
|
|
@ -1,50 +1,27 @@
|
|||
export enum TelemetryType {
|
||||
LIGHT_PUSH_FILTER = "LightPushFilter",
|
||||
LIGHT_PUSH_ERROR = "LightPushError",
|
||||
GENERIC = "Generic"
|
||||
}
|
||||
|
||||
// Top level structure of a telemetry request
|
||||
export interface TelemetryRequest {
|
||||
id: number;
|
||||
telemetryType: TelemetryType;
|
||||
telemetryData: any; // Using 'any' to represent the raw JSON data
|
||||
}
|
||||
|
||||
// Common to all telemetry messages
|
||||
export interface TelemetryMessage {
|
||||
interface TelemetryMessage {
|
||||
type: string;
|
||||
|
||||
timestamp: number;
|
||||
messageType: TelemetryType;
|
||||
contentTopic: string;
|
||||
pubsubTopic: string;
|
||||
peerId: string;
|
||||
errorMessage: string;
|
||||
extraData: string;
|
||||
}
|
||||
|
||||
export interface TelemetryPushFilter extends TelemetryMessage {
|
||||
peerIdSender: string;
|
||||
peerIdReporter: string;
|
||||
sequenceHash: string;
|
||||
sequenceTotal: number;
|
||||
sequenceIndex: number;
|
||||
contentTopic: string;
|
||||
pubsubTopic: string;
|
||||
type: "LightPushFilter",
|
||||
protocol: string;
|
||||
ephemeral: boolean;
|
||||
seenTimestamp: number;
|
||||
createdAt: number;
|
||||
messageHash: string;
|
||||
}
|
||||
|
||||
export interface TelemetryPushError extends TelemetryMessage {
|
||||
peerId: string;
|
||||
errorMessage: string;
|
||||
peerIdRemote?: string;
|
||||
contentTopic?: string;
|
||||
pubsubTopic?: string;
|
||||
}
|
||||
|
||||
export interface TelemetryGeneric extends TelemetryMessage {
|
||||
peerId: string;
|
||||
metricType: string;
|
||||
contentTopic?: string;
|
||||
pubsubTopic?: string;
|
||||
genericData?: string;
|
||||
errorMessage?: string;
|
||||
}
|
||||
|
||||
|
||||
export class TelemetryClient {
|
||||
constructor(
|
||||
private readonly url: string,
|
||||
|
@ -82,7 +59,7 @@ export class TelemetryClient {
|
|||
private async send<T extends TelemetryMessage>(messages: T[]) {
|
||||
const telemetryRequests = messages.map((message) => ({
|
||||
id: ++this.requestId,
|
||||
telemetryType: message.messageType.toString(),
|
||||
telemetryType: message.type.toString(),
|
||||
telemetryData: message
|
||||
}));
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ export const generateRandomNumber = (): number => {
|
|||
return Math.floor(Math.random() * 1000000);
|
||||
};
|
||||
|
||||
export const hashNumber = async (number: number): Promise<string> => {
|
||||
export const sha256 = async (number: number | string ): Promise<string> => {
|
||||
const encoder = new TextEncoder();
|
||||
const data = encoder.encode(number.toString());
|
||||
const buffer = await crypto.subtle.digest("SHA-256", data);
|
||||
|
|
Loading…
Reference in New Issue