diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 8d1ff8d79..1a9d88041 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -392,6 +392,61 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" +proc generateRelayObserver(node: WakuNode): PubSubObserver = + proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) = + for msg in msgs.messages: + let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr: + warn "Error generating message id", + my_peer_id = node.peerId, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error + continue + + let msg_id_short = shortLog(msg_id) + + let wakuMessage = WakuMessage.decode(msg.data).valueOr: + warn "Error decoding to Waku Message", + my_peer_id = node.peerId, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error + continue + + let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() + + if onRecv: + notice "received relay message", + my_peer_id = node.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + topic = msg.topic, + receivedTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + let msgSizeKB = wakuMessage.payload.len / 1000 + waku_node_messages.inc(labelValues = ["relay"]) + waku_histogram_message_size.observe(msgSizeKB) + else: + notice "sent relay message", + my_peer_id = node.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + to_peer_id = peer.peerId, + topic = msg.topic, + sentTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + discard + + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = + logMessageInfo(peer, msgs, onRecv = false) + + return PubSubObserver(onRecv: onRecv, onSend: onSend) + proc mountRelay*( node: WakuNode, pubsubTopics: seq[string] = @[], @@ -412,6 +467,11 @@ proc mountRelay*( node.wakuRelay = initRes.value + # register relay observers for logging + debug "Registering Relay observers" + let observerLogger = node.generateRelayObserver() + node.wakuRelay.addObserver(observerLogger) + ## Add peer exchange handler if peerExchangeHandler.isSome(): node.wakuRelay.parameters.enablePX = true diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 03d5b596e..ca59a6899 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -180,6 +180,9 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) +proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = + procCall GossipSub(w).addObserver(observer) + method start*(w: WakuRelay) {.async, base.} = debug "start" await procCall GossipSub(w).start()