diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 0a7abb4b0..3dd514f6c 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -227,10 +227,10 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = return proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - trace "waku.relay received", - peerId = node.peerId, + debug "waku.relay received", + my_peer_id = node.peerId, pubsubTopic = topic, - hash = topic.computeMessageHash(msg).to0xHex(), + msg_hash = topic.computeMessageHash(msg).to0xHex(), receivedTime = getNowInNanosecondTime(), payloadSizeBytes = msg.payload.len @@ -914,7 +914,9 @@ proc mountLightPush*( if publishedCount == 0: ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93 - debug "Lightpush request has not been published to any peers" + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + debug "Lightpush request has not been published to any peers", + msg_hash = msgHash return ok() @@ -953,16 +955,21 @@ proc lightpushPublish*( message: WakuMessage, peer: RemotePeerInfo, ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + let msgHash = pubsubTopic.computeMessageHash(message).to0xHex() if not node.wakuLightpushClient.isNil(): debug "publishing message with lightpush", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, - peer = peer.peerId + target_peer_id = peer.peerId, + msg_hash = msgHash return await node.wakuLightpushClient.publish(pubsubTopic, message, peer) if not node.wakuLightPush.isNil(): debug "publishing message with self hosted lightpush", - pubsubTopic = pubsubTopic, contentTopic = message.contentTopic + pubsubTopic = pubsubTopic, + contentTopic = message.contentTopic, + target_peer_id = peer.peerId, + msg_hash = msgHash return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message) if pubsubTopic.isSome(): diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index efbf17e0a..1829ff055 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -94,7 +94,9 @@ proc handleMessage*( let msgDigest = computeDigest(msg) + msgDigestHex = msgDigest.data.to0xHex() msgHash = computeMessageHash(pubsubTopic, msg) + msgHashHex = msgHash.to0xHex() msgTimestamp = if msg.timestamp > 0: msg.timestamp @@ -102,18 +104,27 @@ proc handleMessage*( getNanosecondTime(getTime().toUnixFloat()) trace "handling message", + msg_hash = msgHashHex, pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, msgTimestamp = msg.timestamp, usedTimestamp = msgTimestamp, - digest = toHex(msgDigest.data), - messageHash = toHex(msgHash) + digest = msgDigestHex let insertStartTime = getTime().toUnixFloat() (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr: waku_archive_errors.inc(labelValues = [insertFailure]) - debug "failed to insert message", err = error + error "failed to insert message", error = error + + debug "message archived", + msg_hash = msgHashHex, + pubsubTopic = pubsubTopic, + contentTopic = msg.contentTopic, + msgTimestamp = msg.timestamp, + usedTimestamp = msgTimestamp, + digest = msgDigestHex + let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 11dcca6c8..a072a9987 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -156,7 +156,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec): # Check that peer has not been removed from peer store - trace "no addresses for peer", peer = peer + error "no addresses for peer", peer_id = shortLog(peer) return ## TODO: Check if dial is necessary always??? @@ -164,7 +164,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = if conn.isNone(): ## We do not remove this peer, but allow the underlying peer manager ## to do so if it is deemed necessary - trace "no connection to peer", peer = peer + error "no connection to peer", peer_id = shortLog(peer) return await conn.get().writeLp(buffer) @@ -172,11 +172,15 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = proc pushToPeers( wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush ) {.async.} = + let targetPeerIds = peers.mapIt(shortLog(it)) + let msgHash = + messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() + debug "pushing message to subscribed peers", pubsubTopic = messagePush.pubsubTopic, contentTopic = messagePush.wakuMessage.contentTopic, - peers = peers, - hash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() + target_peer_ids = targetPeerIds, + msg_hash = msgHash let bufferToPublish = messagePush.encode().buffer @@ -210,7 +214,10 @@ const MessagePushTimeout = 20.seconds proc handleMessage*( wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage ) {.async.} = - trace "handling message", pubsubTopic = pubsubTopic, message = message + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + + debug "handling message", + pubsubTopic = pubsubTopic, message = message, msg_hash = msgHash let handleMessageStartTime = Moment.now() @@ -219,7 +226,7 @@ proc handleMessage*( let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) if subscribedPeers.len == 0: - trace "no subscribed peers found", + debug "no subscribed peers found", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic return @@ -228,16 +235,20 @@ proc handleMessage*( if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout( MessagePushTimeout ): - debug "timed out pushing message to peers", + error "timed out pushing message to peers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = msgHash, + numPeers = subscribedPeers.len, + target_peer_ids = subscribedPeers.mapIt(shortLog(it)) waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: debug "pushed message succesfully to all subscribers", pubsubTopic = pubsubTopic, contentTopic = message.contentTopic, - hash = pubsubTopic.computeMessageHash(message).to0xHex() + msg_hash = msgHash, + numPeers = subscribedPeers.len, + target_peer_ids = subscribedPeers.mapIt(shortLog(it)) let handleMessageDuration = Moment.now() - handleMessageStartTime @@ -247,14 +258,14 @@ proc handleMessage*( proc initProtocolHandler(wf: WakuFilter) = proc handler(conn: Connection, proto: string) {.async.} = - trace "filter subscribe request handler triggered", peerId = conn.peerId + trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId) let buf = await conn.readLp(int(DefaultMaxSubscribeSize)) let decodeRes = FilterSubscribeRequest.decode(buf) if decodeRes.isErr(): error "Failed to decode filter subscribe request", - peerId = conn.peerId, err = decodeRes.error + peer_id = conn.peerId, err = decodeRes.error waku_filter_errors.inc(labelValues = [decodeRpcFailure]) return @@ -262,7 +273,8 @@ proc initProtocolHandler(wf: WakuFilter) = let response = wf.handleSubscribeRequest(conn.peerId, request) - debug "sending filter subscribe response", peerId = conn.peerId, response = response + debug "sending filter subscribe response", + peer_id = shortLog(conn.peerId), response = response await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here return diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index f2537c58a..171b1deeb 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -9,7 +9,7 @@ else: import std/strformat, - stew/results, + stew/[results, byteutils], sequtils, chronos, chronicles, @@ -201,38 +201,54 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = ): Future[ValidationResult] {.async.} = # can be optimized by checking if the message is a WakuMessage without allocating memory # see nim-libp2p protobuf library - let msgRes = WakuMessage.decode(message.data) - if msgRes.isErr(): - trace "protocol generateOrderedValidator reject decode error", - error = msgRes.error + let msg = WakuMessage.decode(message.data).valueOr: + error "protocol generateOrderedValidator reject decode error", + pubsubTopic = pubsubTopic, error = $error return ValidationResult.Reject - let msg = msgRes.get() + + let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() # now sequentially validate the message for (validator, _) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) + if validatorRes != ValidationResult.Accept: + error "protocol generateOrderedValidator reject waku validator", + msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes + return validatorRes + return ValidationResult.Accept + return wrappedValidator proc validateMessage*( w: WakuRelay, pubsubTopic: string, msg: WakuMessage ): Future[Result[void, string]] {.async.} = let messageSizeBytes = msg.encode().buffer.len + let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex() if messageSizeBytes > w.maxMessageSize: let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes" - debug "Invalid Waku Message", error = message + error "too large Waku message", + msg_hash = msgHash, + error = message, + messageSizeBytes = messageSizeBytes, + maxMessageSize = w.maxMessageSize + return err(message) for (validator, message) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) if validatorRes != ValidationResult.Accept: if message.len > 0: + error "invalid Waku message", msg_hash = msgHash, error = message return err(message) else: - return err("Validator failed") + ## This should never happen + error "uncertain invalid Waku message", msg_hash = msgHash, error = message + return err("validator failed") + return ok() proc subscribe*( @@ -248,7 +264,7 @@ proc subscribe*( if decMsg.isErr(): # fine if triggerSelf enabled, since validators are bypassed error "failed to decode WakuMessage, validator passed a wrong message", - error = decMsg.error + pubsubTopic = pubsubTopic, error = decMsg.error let fut = newFuture[void]() fut.complete() return fut @@ -288,7 +304,9 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage ): Future[int] {.async.} = - trace "publish", pubsubTopic = pubsubTopic let data = message.encode().buffer + let msgHash = computeMessageHash(pubsubTopic, message).to0xHex() + + debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic return await procCall GossipSub(w).publish(pubsubTopic, data)