chore: adding observers for message logging (#2800)

This commit is contained in:
gabrielmer 2024-06-13 18:35:56 +02:00 committed by GitHub
parent 15d578ad87
commit b52286524b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 63 additions and 14 deletions

View File

@ -224,19 +224,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic):
return
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = topic.computeMessageHash(msg).to0xHex(),
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
@ -252,7 +239,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
@ -389,6 +375,61 @@ proc startRelay*(node: WakuNode) {.async.} =
info "relay started successfully"
proc generateRelayObserver(node: WakuNode): PubSubObserver =
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
for msg in msgs.messages:
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
warn "Error generating message id",
my_peer_id = node.peerId,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue
let msg_id_short = shortLog(msg_id)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
my_peer_id = node.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue
let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()
if onRecv:
notice "received relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len
let msgSizeKB = wakuMessage.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
else:
notice "sent relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = msg.topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = true)
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
discard
return PubSubObserver(onRecv: onRecv, onSend: onSend)
proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
@ -409,6 +450,11 @@ proc mountRelay*(
node.wakuRelay = initRes.value
# register relay observers for logging
debug "Registering Relay observers"
let observerLogger = node.generateRelayObserver()
node.wakuRelay.addObserver(observerLogger)
## Add peer exchange handler
if peerExchangeHandler.isSome():
node.wakuRelay.parameters.enablePX = true

View File

@ -180,6 +180,9 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
procCall GossipSub(w).addObserver(observer)
method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()