mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 23:43:07 +00:00
Add traces for benchmarking
This commit is contained in:
parent
5ea532bc80
commit
0ecd0363cb
@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
|
|||||||
else:
|
else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
|
from std/times import getTime, toUnixFloat, `-`
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[hashes, options, sugar, tables, strutils, sequtils, os],
|
std/[hashes, options, sugar, tables, strutils, sequtils, os],
|
||||||
chronos, chronicles, metrics,
|
chronos, chronicles, metrics,
|
||||||
@ -47,7 +49,6 @@ import
|
|||||||
./config,
|
./config,
|
||||||
./peer_manager
|
./peer_manager
|
||||||
|
|
||||||
|
|
||||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||||
declarePublicHistogram waku_histogram_message_size, "message size histogram in kB",
|
declarePublicHistogram waku_histogram_message_size, "message size histogram in kB",
|
||||||
buckets = [0.0, 5.0, 15.0, 50.0, 100.0, 300.0, 700.0, 1000.0, Inf]
|
buckets = [0.0, 5.0, 15.0, 50.0, 100.0, 300.0, 700.0, 1000.0, Inf]
|
||||||
@ -214,6 +215,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
|
|||||||
payloadSizeBytes=msg.payload.len
|
payloadSizeBytes=msg.payload.len
|
||||||
|
|
||||||
let msgSizeKB = msg.payload.len/1000
|
let msgSizeKB = msg.payload.len/1000
|
||||||
|
echo "nwaku msg received: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
|
|
||||||
waku_node_messages.inc(labelValues = ["relay"])
|
waku_node_messages.inc(labelValues = ["relay"])
|
||||||
waku_histogram_message_size.observe(msgSizeKB)
|
waku_histogram_message_size.observe(msgSizeKB)
|
||||||
@ -342,7 +344,7 @@ proc publish*(
|
|||||||
pubsubTopic=pubsubTopic,
|
pubsubTopic=pubsubTopic,
|
||||||
hash=pubsubTopic.computeMessageHash(message).to0xHex(),
|
hash=pubsubTopic.computeMessageHash(message).to0xHex(),
|
||||||
publishTime=getNowInNanosecondTime()
|
publishTime=getNowInNanosecondTime()
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc startRelay*(node: WakuNode) {.async.} =
|
proc startRelay*(node: WakuNode) {.async.} =
|
||||||
@ -961,10 +963,10 @@ proc lightpushPublish*(node: WakuNode, pubsubTopic: Option[PubsubTopic], message
|
|||||||
return err(msg)
|
return err(msg)
|
||||||
|
|
||||||
let publishRes = await node.lightpushPublish(pubsubTopic, message, peer=peerOpt.get())
|
let publishRes = await node.lightpushPublish(pubsubTopic, message, peer=peerOpt.get())
|
||||||
|
|
||||||
if publishRes.isErr():
|
if publishRes.isErr():
|
||||||
error "failed to publish message", error=publishRes.error
|
error "failed to publish message", error=publishRes.error
|
||||||
|
|
||||||
return publishRes
|
return publishRes
|
||||||
|
|
||||||
|
|
||||||
@ -988,7 +990,7 @@ proc mountRlnRelay*(node: WakuNode,
|
|||||||
# register rln validator as default validator
|
# register rln validator as default validator
|
||||||
debug "Registering RLN validator"
|
debug "Registering RLN validator"
|
||||||
node.wakuRelay.addValidator(validator, "RLN validation failed")
|
node.wakuRelay.addValidator(validator, "RLN validation failed")
|
||||||
|
|
||||||
node.wakuRlnRelay = rlnRelay
|
node.wakuRlnRelay = rlnRelay
|
||||||
|
|
||||||
## Waku peer-exchange
|
## Waku peer-exchange
|
||||||
|
|||||||
@ -3,6 +3,9 @@ when (NimMajor, NimMinor) < (1, 4):
|
|||||||
else:
|
else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
|
from std/times import getTime, toUnixFloat, `-`
|
||||||
|
import std/[sysrand, os]
|
||||||
|
|
||||||
import
|
import
|
||||||
std/sequtils,
|
std/sequtils,
|
||||||
stew/[byteutils, results],
|
stew/[byteutils, results],
|
||||||
@ -108,6 +111,11 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
|
|||||||
return resp.get()
|
return resp.get()
|
||||||
|
|
||||||
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (pubsubTopic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
|
router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (pubsubTopic: string, contentBody: Option[ContentBody]) -> RestApiResponse:
|
||||||
|
let msgSizeInKb = parseInt(getEnv("MSG_SIZE_KB"))
|
||||||
|
echo "Hardcoding message size to: ", msgSizeInKb, " Kbytes"
|
||||||
|
let randomBytes = urandom(msgSizeInKb * 1000)
|
||||||
|
|
||||||
|
echo "nwaku api call: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
if pubsubTopic.isErr():
|
if pubsubTopic.isErr():
|
||||||
return RestApiResponse.badRequest()
|
return RestApiResponse.badRequest()
|
||||||
let pubSubTopic = pubsubTopic.get()
|
let pubSubTopic = pubsubTopic.get()
|
||||||
@ -127,22 +135,29 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes
|
|||||||
var message: WakuMessage = reqWakuMessage.toWakuMessage(version = 0).valueOr:
|
var message: WakuMessage = reqWakuMessage.toWakuMessage(version = 0).valueOr:
|
||||||
return RestApiResponse.badRequest($error)
|
return RestApiResponse.badRequest($error)
|
||||||
|
|
||||||
|
message.payload = randomBytes
|
||||||
|
|
||||||
# if RLN is mounted, append the proof to the message
|
# if RLN is mounted, append the proof to the message
|
||||||
if not node.wakuRlnRelay.isNil():
|
if not node.wakuRlnRelay.isNil():
|
||||||
|
echo "nwaku api before proof: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
# append the proof to the message
|
# append the proof to the message
|
||||||
node.wakuRlnRelay.appendRLNProof(message,
|
node.wakuRlnRelay.appendRLNProof(message,
|
||||||
float64(getTime().toUnix())).isOkOr:
|
float64(getTime().toUnix())).isOkOr:
|
||||||
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
|
return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message")
|
||||||
|
echo "nwaku api after proof: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
|
|
||||||
|
echo "nwaku api before validate: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
|
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
|
||||||
return RestApiResponse.badRequest("Failed to publish: " & error)
|
return RestApiResponse.badRequest("Failed to publish: " & error)
|
||||||
|
echo "nwaku api after validate: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
|
|
||||||
# if we reach here its either a non-RLN message or a RLN message with a valid proof
|
# if we reach here its either a non-RLN message or a RLN message with a valid proof
|
||||||
debug "Publishing message", pubSubTopic=pubSubTopic, rln=not node.wakuRlnRelay.isNil()
|
debug "Publishing message", pubSubTopic=pubSubTopic, rln=not node.wakuRlnRelay.isNil()
|
||||||
|
echo "nwaku api before publish: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
if not (waitFor node.publish(some(pubSubTopic), message).withTimeout(futTimeout)):
|
if not (waitFor node.publish(some(pubSubTopic), message).withTimeout(futTimeout)):
|
||||||
error "Failed to publish message to topic", pubSubTopic=pubSubTopic
|
error "Failed to publish message to topic", pubSubTopic=pubSubTopic
|
||||||
return RestApiResponse.internalServerError("Failed to publish: timedout")
|
return RestApiResponse.internalServerError("Failed to publish: timedout")
|
||||||
|
echo "nwaku api after publish: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
return RestApiResponse.ok()
|
return RestApiResponse.ok()
|
||||||
|
|
||||||
# Autosharding API
|
# Autosharding API
|
||||||
|
|||||||
@ -7,6 +7,8 @@ when (NimMajor, NimMinor) < (1, 4):
|
|||||||
else:
|
else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
|
from std/times import getTime, toUnixFloat, `-`
|
||||||
|
|
||||||
import
|
import
|
||||||
std/strformat,
|
std/strformat,
|
||||||
stew/results,
|
stew/results,
|
||||||
@ -209,10 +211,12 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} =
|
|||||||
let msg = msgRes.get()
|
let msg = msgRes.get()
|
||||||
|
|
||||||
# now sequentially validate the message
|
# now sequentially validate the message
|
||||||
|
echo "nwaku validators start: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
for (validator, _) in w.wakuValidators:
|
for (validator, _) in w.wakuValidators:
|
||||||
let validatorRes = await validator(pubsubTopic, msg)
|
let validatorRes = await validator(pubsubTopic, msg)
|
||||||
if validatorRes != ValidationResult.Accept:
|
if validatorRes != ValidationResult.Accept:
|
||||||
return validatorRes
|
return validatorRes
|
||||||
|
echo "nwaku validators end: ", int64(getTime().toUnixFloat()*1_000_000_000)
|
||||||
return ValidationResult.Accept
|
return ValidationResult.Accept
|
||||||
return wrappedValidator
|
return wrappedValidator
|
||||||
|
|
||||||
@ -234,7 +238,7 @@ proc validateMessage*(w: WakuRelay, pubsubTopic: string, msg: WakuMessage):
|
|||||||
else:
|
else:
|
||||||
return err("Validator failed")
|
return err("Validator failed")
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler =
|
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler =
|
||||||
debug "subscribe", pubsubTopic=pubsubTopic
|
debug "subscribe", pubsubTopic=pubsubTopic
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user