fix: revert "chore: adding observers for message logging (#2800)" (#2815)

This commit is contained in:
gabrielmer 2024-06-17 13:14:05 +02:00 committed by GitHub
parent 3b27aee820
commit 93e9ba22aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 66 deletions

View File

@ -225,6 +225,21 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic): if node.wakuRelay.isSubscribed(topic):
return return
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msg_hash = topic.computeMessageHash(msg).to0xHex()
notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil(): if node.wakuFilter.isNil():
return return
@ -240,6 +255,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
let defaultHandler = proc( let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} = ): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg) await filterHandler(topic, msg)
await archiveHandler(topic, msg) await archiveHandler(topic, msg)
@ -376,61 +392,6 @@ proc startRelay*(node: WakuNode) {.async.} =
info "relay started successfully" info "relay started successfully"
proc generateRelayObserver(node: WakuNode): PubSubObserver =
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
for msg in msgs.messages:
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
warn "Error generating message id",
my_peer_id = node.peerId,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue
let msg_id_short = shortLog(msg_id)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
my_peer_id = node.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue
let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()
if onRecv:
notice "received relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len
let msgSizeKB = wakuMessage.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
else:
notice "sent relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = msg.topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = true)
proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
discard
return PubSubObserver(onRecv: onRecv, onSend: onSend)
proc mountRelay*( proc mountRelay*(
node: WakuNode, node: WakuNode,
pubsubTopics: seq[string] = @[], pubsubTopics: seq[string] = @[],
@ -451,11 +412,6 @@ proc mountRelay*(
node.wakuRelay = initRes.value node.wakuRelay = initRes.value
# register relay observers for logging
debug "Registering Relay observers"
let observerLogger = node.generateRelayObserver()
node.wakuRelay.addObserver(observerLogger)
## Add peer exchange handler ## Add peer exchange handler
if peerExchangeHandler.isSome(): if peerExchangeHandler.isSome():
node.wakuRelay.parameters.enablePX = true node.wakuRelay.parameters.enablePX = true

View File

@ -180,9 +180,6 @@ proc addValidator*(
) {.gcsafe.} = ) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage)) w.wakuValidators.add((handler, errorMessage))
proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
procCall GossipSub(w).addObserver(observer)
method start*(w: WakuRelay) {.async, base.} = method start*(w: WakuRelay) {.async, base.} =
debug "start" debug "start"
await procCall GossipSub(w).start() await procCall GossipSub(w).start()