mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-05 14:29:29 +00:00
chore: add better e2 trace logging for relay (#1526)
This commit is contained in:
parent
0769d5fe98
commit
e9ba53bbd0
@ -38,6 +38,7 @@ import
|
|||||||
../protocol/waku_peer_exchange,
|
../protocol/waku_peer_exchange,
|
||||||
../utils/peers,
|
../utils/peers,
|
||||||
../utils/wakuenr,
|
../utils/wakuenr,
|
||||||
|
../utils/time,
|
||||||
./peer_manager/peer_manager,
|
./peer_manager/peer_manager,
|
||||||
./dnsdisc/waku_dnsdisc,
|
./dnsdisc/waku_dnsdisc,
|
||||||
./discv5/waku_discv5,
|
./discv5/waku_discv5,
|
||||||
@ -300,13 +301,16 @@ proc subscribe(node: WakuNode, topic: PubsubTopic, handler: Option[TopicHandler]
|
|||||||
|
|
||||||
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
# A default handler should be registered for all topics
|
# A default handler should be registered for all topics
|
||||||
trace "Hit default handler", topic=topic, data=data
|
|
||||||
|
|
||||||
let msg = WakuMessage.decode(data)
|
let msg = WakuMessage.decode(data)
|
||||||
if msg.isErr():
|
if msg.isErr():
|
||||||
# TODO: Add metric to track waku message decode errors
|
# TODO: Add metric to track waku message decode errors
|
||||||
return
|
return
|
||||||
|
|
||||||
|
trace "waku.relay received",
|
||||||
|
pubsubTopic=topic,
|
||||||
|
hash=MultiHash.digest("sha2-256", data).expect("valid hash").data.buffer.to0xHex(), # TODO: this could be replaced by a message UID
|
||||||
|
receivedTime=getNowInNanosecondTime()
|
||||||
|
|
||||||
# Notify mounted protocols of new message
|
# Notify mounted protocols of new message
|
||||||
if not node.wakuFilter.isNil():
|
if not node.wakuFilter.isNil():
|
||||||
@ -371,10 +375,13 @@ proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async,
|
|||||||
# TODO: Improve error handling
|
# TODO: Improve error handling
|
||||||
return
|
return
|
||||||
|
|
||||||
trace "publish", topic=topic, contentTopic=message.contentTopic
|
|
||||||
|
|
||||||
discard await node.wakuRelay.publish(topic, message)
|
discard await node.wakuRelay.publish(topic, message)
|
||||||
|
|
||||||
|
trace "waku.relay published",
|
||||||
|
pubsubTopic=topic,
|
||||||
|
hash=MultiHash.digest("sha2-256", message.encode().buffer).expect("valid hash").data.buffer.to0xHex(), # TODO: this could be replaced by a message UID
|
||||||
|
publishTime=getNowInNanosecondTime()
|
||||||
|
|
||||||
proc startRelay*(node: WakuNode) {.async.} =
|
proc startRelay*(node: WakuNode) {.async.} =
|
||||||
## Setup and start relay protocol
|
## Setup and start relay protocol
|
||||||
info "starting relay protocol"
|
info "starting relay protocol"
|
||||||
|
|||||||
@ -4,27 +4,30 @@ when (NimMajor, NimMinor) < (1, 4):
|
|||||||
else:
|
else:
|
||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/times,
|
std/times,
|
||||||
metrics
|
metrics
|
||||||
|
|
||||||
type Timestamp* = int64
|
type Timestamp* = int64
|
||||||
|
|
||||||
proc getNanosecondTime*[T](timeInSeconds: T): Timestamp =
|
proc getNanosecondTime*[T](timeInSeconds: T): Timestamp =
|
||||||
var ns = Timestamp(timeInSeconds.int64 * 1000_000_000.int64)
|
var ns = Timestamp(timeInSeconds.int64 * 1000_000_000.int64)
|
||||||
return ns
|
return ns
|
||||||
|
|
||||||
proc getMicrosecondTime*[T](timeInSeconds: T): Timestamp =
|
proc getMicrosecondTime*[T](timeInSeconds: T): Timestamp =
|
||||||
var us = Timestamp(timeInSeconds.int64 * 1000_000.int64)
|
var us = Timestamp(timeInSeconds.int64 * 1000_000.int64)
|
||||||
return us
|
return us
|
||||||
|
|
||||||
proc getMillisecondTime*[T](timeInSeconds: T): Timestamp =
|
proc getMillisecondTime*[T](timeInSeconds: T): Timestamp =
|
||||||
var ms = Timestamp(timeInSeconds.int64 * 1000.int64)
|
var ms = Timestamp(timeInSeconds.int64 * 1000.int64)
|
||||||
return ms
|
return ms
|
||||||
|
|
||||||
proc nowInUnixFloat(): float =
|
proc nowInUnixFloat(): float =
|
||||||
return getTime().toUnixFloat()
|
return getTime().toUnixFloat()
|
||||||
|
|
||||||
|
proc getNowInNanosecondTime*(): Timestamp =
|
||||||
|
return getNanosecondTime(nowInUnixFloat())
|
||||||
|
|
||||||
template nanosecondTime*(collector: Summary | Histogram, body: untyped) =
|
template nanosecondTime*(collector: Summary | Histogram, body: untyped) =
|
||||||
when defined(metrics):
|
when defined(metrics):
|
||||||
let start = nowInUnixFloat()
|
let start = nowInUnixFloat()
|
||||||
@ -39,4 +42,4 @@ template nanosecondTime*(collector: Gauge, body: untyped) =
|
|||||||
body
|
body
|
||||||
metrics.set(collector, nowInUnixFloat() - start)
|
metrics.set(collector, nowInUnixFloat() - start)
|
||||||
else:
|
else:
|
||||||
body
|
body
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user