mirror of https://github.com/waku-org/nwaku.git
chore: log enhancement for message reliability analysis (#2640)
* log enhancement for message reliability analysis

The following modules are touched:
- waku_node.nim
- archive.nim
- waku_filter_v2/protocol.nim
- waku_relay/protocol.nim

Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
This commit is contained in:
parent a9e19efd7a
commit d5e0e4a9b1
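The whole change applies one structured-logging pattern with chronicles: compute the message hash once per handler via computeMessageHash(pubsubTopic, msg).to0xHex() and attach it to every log line under a shared msg_hash key, alongside normalized peer keys (my_peer_id, target_peer_id, target_peer_ids). A minimal sketch of that pattern follows; the proc name traceObserved and the waku/waku_core import path are illustrative assumptions, not code from this commit.

import chronicles
import waku/waku_core # assumed import path exposing WakuMessage, PubsubTopic, computeMessageHash, to0xHex

proc traceObserved(myPeerId: string, pubsubTopic: PubsubTopic, msg: WakuMessage) =
  ## Hypothetical helper (not part of this commit): logs a message the same way
  ## the patched protocols do, keyed by the 0x-prefixed message hash so one
  ## message can be followed across relay, lightpush, filter and archive logs.
  let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
  debug "waku message observed",
    my_peer_id = myPeerId,
    msg_hash = msgHash,
    pubsubTopic = pubsubTopic,
    contentTopic = msg.contentTopic,
    payloadSizeBytes = msg.payload.len

Since every hop logs the same msg_hash value, correlating one message across nodes and protocols reduces to searching for that hash in each node's log output.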
waku_node.nim:
@@ -227,10 +227,10 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
     return

   proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
-    trace "waku.relay received",
-      peerId = node.peerId,
+    debug "waku.relay received",
+      my_peer_id = node.peerId,
       pubsubTopic = topic,
-      hash = topic.computeMessageHash(msg).to0xHex(),
+      msg_hash = topic.computeMessageHash(msg).to0xHex(),
       receivedTime = getNowInNanosecondTime(),
       payloadSizeBytes = msg.payload.len

@@ -914,7 +914,9 @@ proc mountLightPush*(

       if publishedCount == 0:
         ## Agreed change expected to the lightpush protocol to better handle such case. https://github.com/waku-org/pm/issues/93
-        debug "Lightpush request has not been published to any peers"
+        let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
+        debug "Lightpush request has not been published to any peers",
+          msg_hash = msgHash

       return ok()

@@ -953,16 +955,21 @@ proc lightpushPublish*(
     message: WakuMessage,
     peer: RemotePeerInfo,
 ): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
+  let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
   if not node.wakuLightpushClient.isNil():
     debug "publishing message with lightpush",
       pubsubTopic = pubsubTopic,
       contentTopic = message.contentTopic,
-      peer = peer.peerId
+      target_peer_id = peer.peerId,
+      msg_hash = msgHash
     return await node.wakuLightpushClient.publish(pubsubTopic, message, peer)

   if not node.wakuLightPush.isNil():
     debug "publishing message with self hosted lightpush",
-      pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
+      pubsubTopic = pubsubTopic,
+      contentTopic = message.contentTopic,
+      target_peer_id = peer.peerId,
+      msg_hash = msgHash
     return await node.wakuLightPush.handleSelfLightPushRequest(pubsubTopic, message)

   if pubsubTopic.isSome():
archive.nim:
@@ -94,7 +94,9 @@ proc handleMessage*(

   let
     msgDigest = computeDigest(msg)
+    msgDigestHex = msgDigest.data.to0xHex()
     msgHash = computeMessageHash(pubsubTopic, msg)
+    msgHashHex = msgHash.to0xHex()
     msgTimestamp =
       if msg.timestamp > 0:
         msg.timestamp
@@ -102,18 +104,27 @@ proc handleMessage*(
         getNanosecondTime(getTime().toUnixFloat())

   trace "handling message",
+    msg_hash = msgHashHex,
     pubsubTopic = pubsubTopic,
     contentTopic = msg.contentTopic,
     msgTimestamp = msg.timestamp,
     usedTimestamp = msgTimestamp,
-    digest = toHex(msgDigest.data),
-    messageHash = toHex(msgHash)
+    digest = msgDigestHex

   let insertStartTime = getTime().toUnixFloat()

   (await self.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgTimestamp)).isOkOr:
     waku_archive_errors.inc(labelValues = [insertFailure])
-    debug "failed to insert message", err = error
+    error "failed to insert message", error = error
+
+  debug "message archived",
+    msg_hash = msgHashHex,
+    pubsubTopic = pubsubTopic,
+    contentTopic = msg.contentTopic,
+    msgTimestamp = msg.timestamp,
+    usedTimestamp = msgTimestamp,
+    digest = msgDigestHex

   let insertDuration = getTime().toUnixFloat() - insertStartTime
   waku_archive_insert_duration_seconds.observe(insertDuration)

waku_filter_v2/protocol.nim:
@@ -156,7 +156,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =

   if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec):
     # Check that peer has not been removed from peer store
-    trace "no addresses for peer", peer = peer
+    error "no addresses for peer", peer_id = shortLog(peer)
     return

   ## TODO: Check if dial is necessary always???
@@ -164,7 +164,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
   if conn.isNone():
     ## We do not remove this peer, but allow the underlying peer manager
     ## to do so if it is deemed necessary
-    trace "no connection to peer", peer = peer
+    error "no connection to peer", peer_id = shortLog(peer)
     return

   await conn.get().writeLp(buffer)
@@ -172,11 +172,15 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
 proc pushToPeers(
     wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
 ) {.async.} =
+  let targetPeerIds = peers.mapIt(shortLog(it))
+  let msgHash =
+    messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
+
   debug "pushing message to subscribed peers",
     pubsubTopic = messagePush.pubsubTopic,
     contentTopic = messagePush.wakuMessage.contentTopic,
-    peers = peers,
-    hash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()
+    target_peer_ids = targetPeerIds,
+    msg_hash = msgHash

   let bufferToPublish = messagePush.encode().buffer

@@ -210,7 +214,10 @@ const MessagePushTimeout = 20.seconds
 proc handleMessage*(
     wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage
 ) {.async.} =
-  trace "handling message", pubsubTopic = pubsubTopic, message = message
+  let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
+
+  debug "handling message",
+    pubsubTopic = pubsubTopic, message = message, msg_hash = msgHash

   let handleMessageStartTime = Moment.now()

|
@ -219,7 +226,7 @@ proc handleMessage*(
|
||||||
let subscribedPeers =
|
let subscribedPeers =
|
||||||
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
|
wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
|
||||||
if subscribedPeers.len == 0:
|
if subscribedPeers.len == 0:
|
||||||
trace "no subscribed peers found",
|
debug "no subscribed peers found",
|
||||||
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
|
pubsubTopic = pubsubTopic, contentTopic = message.contentTopic
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@@ -228,16 +235,20 @@ proc handleMessage*(
   if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(
     MessagePushTimeout
   ):
-    debug "timed out pushing message to peers",
+    error "timed out pushing message to peers",
       pubsubTopic = pubsubTopic,
       contentTopic = message.contentTopic,
-      hash = pubsubTopic.computeMessageHash(message).to0xHex()
+      msg_hash = msgHash,
+      numPeers = subscribedPeers.len,
+      target_peer_ids = subscribedPeers.mapIt(shortLog(it))
     waku_filter_errors.inc(labelValues = [pushTimeoutFailure])
   else:
     debug "pushed message succesfully to all subscribers",
       pubsubTopic = pubsubTopic,
       contentTopic = message.contentTopic,
-      hash = pubsubTopic.computeMessageHash(message).to0xHex()
+      msg_hash = msgHash,
+      numPeers = subscribedPeers.len,
+      target_peer_ids = subscribedPeers.mapIt(shortLog(it))

   let
     handleMessageDuration = Moment.now() - handleMessageStartTime
|
||||||
|
|
||||||
proc initProtocolHandler(wf: WakuFilter) =
|
proc initProtocolHandler(wf: WakuFilter) =
|
||||||
proc handler(conn: Connection, proto: string) {.async.} =
|
proc handler(conn: Connection, proto: string) {.async.} =
|
||||||
trace "filter subscribe request handler triggered", peerId = conn.peerId
|
trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)
|
||||||
|
|
||||||
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
|
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
|
||||||
|
|
||||||
let decodeRes = FilterSubscribeRequest.decode(buf)
|
let decodeRes = FilterSubscribeRequest.decode(buf)
|
||||||
if decodeRes.isErr():
|
if decodeRes.isErr():
|
||||||
error "Failed to decode filter subscribe request",
|
error "Failed to decode filter subscribe request",
|
||||||
peerId = conn.peerId, err = decodeRes.error
|
peer_id = conn.peerId, err = decodeRes.error
|
||||||
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -262,7 +273,8 @@ proc initProtocolHandler(wf: WakuFilter) =
|
||||||
|
|
||||||
let response = wf.handleSubscribeRequest(conn.peerId, request)
|
let response = wf.handleSubscribeRequest(conn.peerId, request)
|
||||||
|
|
||||||
debug "sending filter subscribe response", peerId = conn.peerId, response = response
|
debug "sending filter subscribe response",
|
||||||
|
peer_id = shortLog(conn.peerId), response = response
|
||||||
|
|
||||||
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
|
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
|
||||||
return
|
return
|
||||||
|
|
|
waku_relay/protocol.nim:
@@ -9,7 +9,7 @@ else:

 import
   std/strformat,
-  stew/results,
+  stew/[results, byteutils],
   sequtils,
   chronos,
   chronicles,
|
@ -201,38 +201,54 @@ proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} =
|
||||||
): Future[ValidationResult] {.async.} =
|
): Future[ValidationResult] {.async.} =
|
||||||
# can be optimized by checking if the message is a WakuMessage without allocating memory
|
# can be optimized by checking if the message is a WakuMessage without allocating memory
|
||||||
# see nim-libp2p protobuf library
|
# see nim-libp2p protobuf library
|
||||||
let msgRes = WakuMessage.decode(message.data)
|
let msg = WakuMessage.decode(message.data).valueOr:
|
||||||
if msgRes.isErr():
|
error "protocol generateOrderedValidator reject decode error",
|
||||||
trace "protocol generateOrderedValidator reject decode error",
|
pubsubTopic = pubsubTopic, error = $error
|
||||||
error = msgRes.error
|
|
||||||
return ValidationResult.Reject
|
return ValidationResult.Reject
|
||||||
let msg = msgRes.get()
|
|
||||||
|
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
|
||||||
|
|
||||||
# now sequentially validate the message
|
# now sequentially validate the message
|
||||||
for (validator, _) in w.wakuValidators:
|
for (validator, _) in w.wakuValidators:
|
||||||
let validatorRes = await validator(pubsubTopic, msg)
|
let validatorRes = await validator(pubsubTopic, msg)
|
||||||
|
|
||||||
if validatorRes != ValidationResult.Accept:
|
if validatorRes != ValidationResult.Accept:
|
||||||
|
error "protocol generateOrderedValidator reject waku validator",
|
||||||
|
msg_hash = msgHash, pubsubTopic = pubsubTopic, validatorRes = validatorRes
|
||||||
|
|
||||||
return validatorRes
|
return validatorRes
|
||||||
|
|
||||||
return ValidationResult.Accept
|
return ValidationResult.Accept
|
||||||
|
|
||||||
return wrappedValidator
|
return wrappedValidator
|
||||||
|
|
||||||
proc validateMessage*(
|
proc validateMessage*(
|
||||||
w: WakuRelay, pubsubTopic: string, msg: WakuMessage
|
w: WakuRelay, pubsubTopic: string, msg: WakuMessage
|
||||||
): Future[Result[void, string]] {.async.} =
|
): Future[Result[void, string]] {.async.} =
|
||||||
let messageSizeBytes = msg.encode().buffer.len
|
let messageSizeBytes = msg.encode().buffer.len
|
||||||
|
let msgHash = computeMessageHash(pubsubTopic, msg).to0xHex()
|
||||||
|
|
||||||
if messageSizeBytes > w.maxMessageSize:
|
if messageSizeBytes > w.maxMessageSize:
|
||||||
let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
|
let message = fmt"Message size exceeded maximum of {w.maxMessageSize} bytes"
|
||||||
debug "Invalid Waku Message", error = message
|
error "too large Waku message",
|
||||||
|
msg_hash = msgHash,
|
||||||
|
error = message,
|
||||||
|
messageSizeBytes = messageSizeBytes,
|
||||||
|
maxMessageSize = w.maxMessageSize
|
||||||
|
|
||||||
return err(message)
|
return err(message)
|
||||||
|
|
||||||
for (validator, message) in w.wakuValidators:
|
for (validator, message) in w.wakuValidators:
|
||||||
let validatorRes = await validator(pubsubTopic, msg)
|
let validatorRes = await validator(pubsubTopic, msg)
|
||||||
if validatorRes != ValidationResult.Accept:
|
if validatorRes != ValidationResult.Accept:
|
||||||
if message.len > 0:
|
if message.len > 0:
|
||||||
|
error "invalid Waku message", msg_hash = msgHash, error = message
|
||||||
return err(message)
|
return err(message)
|
||||||
else:
|
else:
|
||||||
return err("Validator failed")
|
## This should never happen
|
||||||
|
error "uncertain invalid Waku message", msg_hash = msgHash, error = message
|
||||||
|
return err("validator failed")
|
||||||
|
|
||||||
return ok()
|
return ok()
|
||||||
|
|
||||||
proc subscribe*(
|
proc subscribe*(
|
||||||
|
@ -248,7 +264,7 @@ proc subscribe*(
|
||||||
if decMsg.isErr():
|
if decMsg.isErr():
|
||||||
# fine if triggerSelf enabled, since validators are bypassed
|
# fine if triggerSelf enabled, since validators are bypassed
|
||||||
error "failed to decode WakuMessage, validator passed a wrong message",
|
error "failed to decode WakuMessage, validator passed a wrong message",
|
||||||
error = decMsg.error
|
pubsubTopic = pubsubTopic, error = decMsg.error
|
||||||
let fut = newFuture[void]()
|
let fut = newFuture[void]()
|
||||||
fut.complete()
|
fut.complete()
|
||||||
return fut
|
return fut
|
||||||
|
@@ -288,7 +304,9 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler)
 proc publish*(
     w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage
 ): Future[int] {.async.} =
-  trace "publish", pubsubTopic = pubsubTopic
   let data = message.encode().buffer
+  let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
+
+  debug "start publish Waku message", msg_hash = msgHash, pubsubTopic = pubsubTopic

   return await procCall GossipSub(w).publish(pubsubTopic, data)