chore: dogfood subscription and peer managment rework (#111)

This commit is contained in:
Sasha 2025-01-23 11:59:38 +01:00 committed by GitHub
parent cb5031047a
commit 836b1e8920
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 1538 additions and 1206 deletions

File diff suppressed because it is too large Load Diff

View File

@ -7,16 +7,15 @@
"start": "webpack-dev-server"
},
"dependencies": {
"@libp2p/crypto": "^4.0.1",
"@libp2p/peer-id-factory": "^4.2.4",
"@libp2p/crypto": "^5.0.5",
"@multiformats/multiaddr": "^12.3.1",
"@waku/sdk": "0.0.29-11bf1c1.0",
"libp2p": "^1.8.3",
"@waku/sdk": "0.0.30-c43cec2.0",
"libp2p": "^2.1.10",
"protobufjs": "^7.3.0",
"uint8arrays": "^5.1.0"
},
"devDependencies": {
"@libp2p/interface": "^1.7.0",
"@libp2p/interface": "^2.1.3",
"@types/node": "^20.12.11",
"copy-webpack-plugin": "^11.0.0",
"eslint": "^8",

View File

@ -6,8 +6,7 @@ import {
LightNode,
utils,
} from "@waku/sdk";
import { createFromPrivKey } from "@libp2p/peer-id-factory";
import { unmarshalPrivateKey, generateKeyPairFromSeed } from "@libp2p/crypto/keys";
import { generateKeyPairFromSeed } from "@libp2p/crypto/keys";
import { fromString } from "uint8arrays";
import { Type, Field } from "protobufjs";
@ -16,10 +15,17 @@ import {
TelemetryPushFilter,
TelemetryType,
} from "./telemetry_client";
import { generateRandomNumber, sha256, buildExtraData, DEFAULT_EXTRA_DATA_STR } from "./util";
import {
generateRandomNumber,
sha256,
buildExtraData,
DEFAULT_EXTRA_DATA_STR,
} from "./util";
const DEFAULT_CONTENT_TOPIC = "/js-waku-examples/1/message-ratio/utf8";
const DEFAULT_PUBSUB_TOPIC = utils.contentTopicToPubsubTopic(DEFAULT_CONTENT_TOPIC);
const DEFAULT_PUBSUB_TOPIC = utils.contentTopicToPubsubTopic(
DEFAULT_CONTENT_TOPIC
);
const TELEMETRY_URL =
process.env.TELEMETRY_URL || "http://localhost:8080/waku-metrics";
@ -33,34 +39,34 @@ const sequenceCompletedEvent = new CustomEvent("sequenceCompleted");
const messageSentEvent = new CustomEvent("messageSent");
const messageReceivedEvent = new CustomEvent("messageReceived");
const wakuNode = async (): Promise<LightNode> => {
async function wakuNode(): Promise<LightNode> {
let seed = localStorage.getItem("seed");
if (!seed) {
seed = (await sha256(generateRandomNumber())).slice(0, 32)
seed = (await sha256(generateRandomNumber())).slice(0, 32);
localStorage.setItem("seed", seed);
}
const privateKey = await generateKeyPairFromSeed("Ed25519", fromString(seed));
return await createLightNode({
return createLightNode({
networkConfig: {
contentTopics: [DEFAULT_CONTENT_TOPIC],
},
numPeersToUse: 2,
defaultBootstrap: true,
libp2p: {
peerId: await createFromPrivKey(await unmarshalPrivateKey(privateKey.bytes))
}
privateKey,
},
});
};
}
export async function app(telemetryClient: TelemetryClient) {
const node = await wakuNode();
(window as any).waku = node;
console.log("DEBUG: your peer ID is:", node.peerId.toString());
console.log("DEBUG: your peer ID is:", node.libp2p.peerId.toString());
await node.start();
await node.waitForPeers();
@ -83,7 +89,7 @@ export async function app(telemetryClient: TelemetryClient) {
timestamp,
createdAt: timestamp,
seenTimestamp: timestamp,
peerId: peerId,
peerId,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DEFAULT_PUBSUB_TOPIC,
ephemeral: false,
@ -104,91 +110,37 @@ export async function app(telemetryClient: TelemetryClient) {
const sendMessage = async () => {
try {
// TODO(weboko): replace with @waku/message-hash ideally
const messageHash = await sha256(`${sequenceHash}-${sequenceIndex}-${sequenceTotal}`);
const messageHash = await sha256(
`${sequenceHash}-${sequenceIndex}-${sequenceTotal}`
);
const timestamp = Math.floor(new Date().getTime() / 1000);
const message = ProtoSequencedMessage.create({
hash: messageHash,
seqHash: sequenceHash,
total: sequenceTotal,
index: sequenceIndex,
sender: peerId,
});
const payload = ProtoSequencedMessage.encode(message).finish();
const result = await node.lightPush.send(encoder, {
payload,
timestamp: new Date(),
}, {autoRetry: true });
console.log("DEBUG: light push successes: ", result.successes.length, result.successes.map(p => p.toString()));
console.log("DEBUG: light push failures: ", result.failures.length, result.failures.map(f => ({ error: f.error, peerId: f?.peerId?.toString()})));
const result = await node.lightPush.send(
encoder,
{
payload,
timestamp: new Date(),
},
{ autoRetry: true }
);
const successEvents = result
.successes
.map(async (peerId) => {
const extraData = await buildExtraData(node, peerId.toString());
return {
type: TelemetryType.LIGHT_PUSH_FILTER,
protocol: "lightPush",
timestamp: timestamp,
createdAt: timestamp,
seenTimestamp: timestamp,
peerId: peerId.toString(),
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DEFAULT_PUBSUB_TOPIC,
ephemeral: false,
messageHash: messageHash,
errorMessage: "",
extraData,
};
});
const failureEvents = result
.failures
.map(async (fail) => {
const extraData = await buildExtraData(node, fail?.peerId?.toString());
return {
type: TelemetryType.LIGHT_PUSH_FILTER,
protocol: "lightPush",
timestamp: timestamp,
createdAt: timestamp,
seenTimestamp: timestamp,
peerId: fail?.peerId?.toString(),
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DEFAULT_PUBSUB_TOPIC,
ephemeral: false,
messageHash: messageHash,
errorMessage: fail.error.toString(),
extraData,
};
});
const events = await Promise.all([
...successEvents,
...failureEvents,
]);
console.log("DEBUG: light push successes: ", result.successes.length);
console.log(
"DEBUG: light push failures: ",
result.failures.length
);
if (events.length > 0) {
telemetryClient.push<TelemetryPushFilter>(events);
}
// Increment sequence
sequenceIndex++;
if (result.successes.length > 0) {
// Update ui
const messageElement = document.createElement("div");
const messagesSent = document.getElementById("messagesSent");
messageElement.textContent = `Message: ${messageHash} ${sequenceIndex} of ${sequenceTotal}`;
messagesSent.insertBefore(messageElement, messagesSent.firstChild);
messagesSent.insertBefore(
document.createElement("br"),
messagesSent.firstChild
);
document.dispatchEvent(messageSentEvent);
// Increment sequence
sequenceIndex++;
}
if (sequenceIndex < sequenceTotal) {
setTimeout(sendMessage, period); // Schedule the next send
} else {
@ -205,116 +157,23 @@ export async function app(telemetryClient: TelemetryClient) {
const startFilterSubscription = async () => {
const decoder = createDecoder(DEFAULT_CONTENT_TOPIC);
const messagesReceived = document.getElementById("messagesReceived");
const subscriptionCallback = async (message: DecodedMessage) => {
const decodedMessage: any = ProtoSequencedMessage.decode(
message.payload
);
// Don't bother reporting messages sent by this same node
if (decodedMessage.sender === peerId) {
return;
}
const extraData = await buildExtraData(node, decodedMessage.sender);
const timestamp = Math.floor(new Date().getTime() / 1000);
telemetryClient.push<TelemetryPushFilter>([
{
type: TelemetryType.LIGHT_PUSH_FILTER,
protocol: "filter",
timestamp,
createdAt: Math.floor(message.timestamp.getTime() / 1000),
seenTimestamp: timestamp,
peerId: peerId,
contentTopic: message.contentTopic,
pubsubTopic: message.pubsubTopic,
ephemeral: message.ephemeral,
messageHash: decodedMessage.hash,
errorMessage: "",
extraData,
},
]);
const messageElement = document.createElement("div");
messageElement.textContent = `Message: ${decodedMessage.hash} ${decodedMessage.index} of ${decodedMessage.total}`;
messagesReceived.appendChild(messageElement);
messagesReceived.appendChild(document.createElement("br"));
messageElement.textContent = `Message: ${decodedMessage.hash}`;
document.dispatchEvent(messageReceivedEvent);
};
const result = await node.filter.subscribe(decoder, subscriptionCallback, {}, { enableLightPushFilterCheck: true });
let errorEvent = [];
if (result.error) {
const timestamp = Math.floor(new Date().getTime() / 1000);
errorEvent.push({
type: TelemetryType.LIGHT_PUSH_FILTER,
protocol: "filterCreateSubscription",
timestamp,
createdAt: timestamp,
seenTimestamp: timestamp,
peerId: peerId,
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DEFAULT_PUBSUB_TOPIC,
ephemeral: false,
messageHash: await sha256(generateRandomNumber()),
errorMessage: result.error,
extraData: DEFAULT_EXTRA_DATA_STR,
});
}
const failEvents = result.results.failures.map(async (fail) => {
const extraData = await buildExtraData(node, fail?.peerId?.toString());
const timestamp = Math.floor(new Date().getTime() / 1000);
return {
type: TelemetryType.LIGHT_PUSH_FILTER,
protocol: "filterCreateSubscription",
timestamp,
createdAt: timestamp,
seenTimestamp: timestamp,
peerId: fail?.peerId?.toString(),
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DEFAULT_PUBSUB_TOPIC,
ephemeral: false,
messageHash: await sha256(generateRandomNumber()),
errorMessage: fail.error,
extraData,
};
});
const successEvents = result.results.successes.map(async (peerId) => {
const extraData = await buildExtraData(node, peerId.toString());
const timestamp = Math.floor(new Date().getTime() / 1000);
return {
type: TelemetryType.LIGHT_PUSH_FILTER,
protocol: "filterCreateSubscription",
timestamp,
createdAt: timestamp,
seenTimestamp: timestamp,
peerId: peerId.toString(),
contentTopic: DEFAULT_CONTENT_TOPIC,
pubsubTopic: DEFAULT_PUBSUB_TOPIC,
ephemeral: false,
messageHash: await sha256(generateRandomNumber()),
errorMessage: "",
extraData,
};
});
const resolvedEvents = await Promise.all([
...failEvents,
...successEvents,
]);
const events = [
...errorEvent,
...resolvedEvents,
];
if (events.length > 0) {
telemetryClient.push<TelemetryPushFilter>(events);
}
await node.filter.subscribe(decoder, subscriptionCallback);
};
return {
@ -330,39 +189,11 @@ export async function app(telemetryClient: TelemetryClient) {
telemetryClient
);
const peerIDBlock = document.getElementById("peerID");
peerIDBlock.innerText = node.libp2p.peerId.toString();
const runningScreen = document.getElementById("runningScreen");
runningScreen.style.display = "block";
await telemetryClient.start();
startFilterSubscription();
let sentMessagesCount = 0;
const sentMessagesCounter = document.getElementById(
"numSent"
) as HTMLSpanElement;
document.addEventListener("messageSent", () => {
sentMessagesCount++;
sentMessagesCounter.textContent = sentMessagesCount.toString();
});
document.addEventListener(sequenceCompletedEvent.type, () =>
startLightPushSequence(10, 3000)
);
let receivedMessagesCount = 0;
const receivedMessagesCounter = document.getElementById(
"numReceived"
) as HTMLSpanElement;
document.addEventListener("messageReceived", () => {
receivedMessagesCount++;
receivedMessagesCounter.textContent = receivedMessagesCount.toString();
});
function startSequence() {
const numMessages = Math.floor(Math.random() * 16) + 5;
const messagePeriod = Math.floor(Math.random() * 2001) + 5_000;
startLightPushSequence(numMessages, messagePeriod);
}
document.addEventListener(sequenceCompletedEvent.type, () => startSequence());
startSequence();
startLightPushSequence(10, 3000);
})();