chore: logging received message info via onValidated observer (#2973)

This commit is contained in:
gabrielmer 2024-08-19 14:13:28 +02:00 committed by GitHub
parent e51ffe0759
commit e8bce67d76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 60 additions and 27 deletions

View File

@ -266,14 +266,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msg_hash = topic.computeMessageHash(msg).to0xHex()
notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])

View File

@ -161,6 +161,9 @@ proc installRelayApiHandlers*(
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)
# Log for message tracking purposes
logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true)
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message",
pubSubTopic = pubSubTopic, rln = not node.wakuRlnRelay.isNil()
@ -272,6 +275,9 @@ proc installRelayApiHandlers*(
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)
# Log for message tracking purposes
logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true)
# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message",
contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil()

View File

@ -151,7 +151,36 @@ proc initProtocolHandler(w: WakuRelay) =
w.handler = handler
w.codec = WakuRelayCodec
proc initRelayMetricObserver(w: WakuRelay) =
proc logMessageInfo*(
w: WakuRelay,
remotePeerId: string,
topic: string,
msg_id_short: string,
msg: WakuMessage,
onRecv: bool,
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()
if onRecv:
notice "received relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = remotePeerId,
topic = topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
else:
notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = remotePeerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
proc initRelayObservers(w: WakuRelay) =
proc decodeRpcMessageInfo(
peer: PubSubPeer, msg: Message
): Result[
@ -179,20 +208,6 @@ proc initRelayMetricObserver(w: WakuRelay) =
let msgSize = msg.data.len + msg.topic.len
return ok((msg_id_short, msg.topic, wakuMessage, msgSize))
proc logMessageInfo(
peer: PubSubPeer, topic: string, msg_id_short: string, msg: WakuMessage
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()
notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
proc updateMetrics(
peer: PubSubPeer,
pubsub_topic: string,
@ -208,18 +223,38 @@ proc initRelayMetricObserver(w: WakuRelay) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
continue
# message receive log happens in treaceHandler as this one is called before checks
# message receive log happens in onValidated observer as onRecv is called before checks
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)
discard
proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
let msg_id_short = shortLog(msgId)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "onValidated: failed decoding to Waku Message",
my_peer_id = w.switch.peerInfo.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
pubsub_topic = msg.topic,
error = $error
return
logMessageInfo(
w, shortLog(peer.peerId), msg.topic, msg_id_short, wakuMessage, onRecv = true
)
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
warn "onSend: failed decoding RPC info",
my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId
continue
logMessageInfo(peer, topic, msg_id_short, wakuMessage)
logMessageInfo(
w, shortLog(peer.peerId), topic, msg_id_short, wakuMessage, onRecv = false
)
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)
let administrativeObserver = PubSubObserver(onRecv: onRecv, onSend: onSend)
let administrativeObserver =
PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated)
w.addObserver(administrativeObserver)
@ -243,7 +278,7 @@ proc new*(
procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
w.initRelayMetricObserver()
w.initRelayObservers()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())