temporary logs

This commit is contained in:
Ivan Folgueira Bande 2025-04-11 13:08:05 +02:00
parent 2786ef6079
commit a4febc88a7
3 changed files with 62 additions and 26 deletions

View File

@ -260,19 +260,28 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic):
return
proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
proc traceHandler(
topic: PubsubTopic, msg: WakuMessage, msgHash: string
) {.async, gcsafe.} =
let msgSizeKB = msg.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
debug "AAAAAAA traceHandler", msg_hash = msgHash, msgSizeKB
proc filterHandler(
topic: PubsubTopic, msg: WakuMessage, msgHash: string
) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
debug "AAAAAAA filterHandler", msg_hash = msgHash
await node.wakuFilter.handleMessage(topic, msg)
proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
proc archiveHandler(
topic: PubsubTopic, msg: WakuMessage, msgHash: string
) {.async, gcsafe.} =
if not node.wakuLegacyArchive.isNil():
## we try to store with legacy archive
await node.wakuLegacyArchive.handleMessage(topic, msg)
@ -281,21 +290,27 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) =
if node.wakuArchive.isNil():
return
debug "AAAAAAA archiveHandler", msg_hash = msgHash
await node.wakuArchive.handleMessage(topic, msg)
proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
proc syncHandler(
topic: PubsubTopic, msg: WakuMessage, msgHash: string
) {.async, gcsafe.} =
if node.wakuStoreReconciliation.isNil():
return
debug "AAAAAAA syncHandler", msg_hash = msgHash
node.wakuStoreReconciliation.messageIngress(topic, msg)
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
let msgHash = computeMessageHash(topic, msg).to0xHex()
debug "AAAAAAA waku_node", msg_hash = msgHash
await traceHandler(topic, msg, msgHash)
await filterHandler(topic, msg, msgHash)
await archiveHandler(topic, msg, msgHash)
await syncHandler(topic, msg, msgHash)
discard node.wakuRelay.subscribe(topic, defaultHandler)

View File

@ -52,13 +52,13 @@ proc subscribe(
): Future[FilterSubscribeResult] {.async.} =
# TODO: check if this condition is valid???
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
error "AAAAAAA pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)
if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
error "AAAAAAA exceeds maximum content topics", peerId = peerId
return err(
FilterSubscribeError.badRequest(
"exceeds maximum content topics: " & $MaxContentTopicsPerRequest
@ -67,13 +67,16 @@ proc subscribe(
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
debug "subscribing peer to filter criteria",
debug "AAAAAAA subscribing peer to filter criteria",
peerId = peerId, filterCriteria = filterCriteria
(await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr:
debug "AAAAAAA service unavailable",
peerId = peerId, filterCriteria = filterCriteria, error = $error
return err(FilterSubscribeError.serviceUnavailable(error))
debug "correct subscription", peerId = peerId
debug "AAAAAAA correct subscription", peerId = peerId
ok()
@ -83,14 +86,16 @@ proc unsubscribe(
pubsubTopic: Option[PubsubTopic],
contentTopics: seq[ContentTopic],
): FilterSubscribeResult =
debug "AAAAAAA unsubscribing peer from filter criteria", peerId = peerId
if pubsubTopic.isNone() or contentTopics.len == 0:
error "pubsubTopic and contentTopics must be specified", peerId = peerId
error "AAAAAAA pubsubTopic and contentTopics must be specified", peerId = peerId
return err(
FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")
)
if contentTopics.len > MaxContentTopicsPerRequest:
error "exceeds maximum content topics", peerId = peerId
error "AAAAAAA exceeds maximum content topics", peerId = peerId
return err(
FilterSubscribeError.badRequest(
"exceeds maximum content topics: " & $MaxContentTopicsPerRequest
@ -99,16 +104,16 @@ proc unsubscribe(
let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))
debug "unsubscribing peer from filter criteria",
debug "AAAAAAA unsubscribing peer from filter criteria",
peerId = peerId, filterCriteria = filterCriteria
wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
error "failed to remove subscription", error = $error
error "AAAAAAA failed to remove subscription", error = $error
return err(FilterSubscribeError.notFound())
## Note: do not remove from peerRequestRateLimiter to prevent trick with subscribe/unsubscribe loop
## We remove only if peerManager removes the peer
debug "correct unsubscription", peerId = peerId
debug "AAAAAAA correct unsubscription", peerId = peerId
ok()

View File

@ -85,7 +85,7 @@ proc findSubscribedPeers*(
if s.isSubscribed(peer):
foundPeers.add(peer)
debug "findSubscribedPeers result",
debug "AAAAAAA findSubscribedPeers result",
filter_criterion = filterCriterion,
subscr_set = s.subscriptions,
found_peers = foundPeers
@ -94,29 +94,30 @@ proc findSubscribedPeers*(
proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} =
## Remove all subscriptions for a given peer
debug "removePeer",
debug "AAAAAAA removePeer",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
s.peersSubscribed.del(peerId)
debug "removePeer after deletion",
debug "AAAAAAA removePeer after deletion",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
proc removePeers*(s: FilterSubscriptions, peerIds: seq[PeerID]) {.async.} =
## Remove all subscriptions for a given list of peers
debug "removePeers",
debug "AAAAAAA removePeers",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
peerIds = peerIds.mapIt(shortLog(it))
for peer in peerIds:
await s.removePeer(peer)
debug "removePeers after deletion",
debug "AAAAAAA removePeers after deletion",
currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
peerIds = peerIds.mapIt(shortLog(it))
proc cleanUp*(fs: FilterSubscriptions) =
debug "cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
debug "AAAAAAA cleanUp",
currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
## Remove all subscriptions for peers that have not been seen for a while
let now = Moment.now()
@ -128,7 +129,7 @@ proc cleanUp*(fs: FilterSubscriptions) =
fs.subscriptions.keepItIf(val.len > 0)
debug "after cleanUp",
debug "AAAAAAA after cleanUp",
currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
@ -142,15 +143,24 @@ proc addSubscription*(
var peerData: ptr PeerData
debug "AAAAAAA addSubscription", peerId = shortLog(peerId)
s.peersSubscribed.withValue(peerId, data):
if data.criteriaCount + cast[uint](filterCriteria.len) > s.maxCriteriaPerPeer:
return err("peer has reached maximum number of filter criteria")
data.lastSeen = Moment.now()
debug "AAAAAAA peer already subscribed",
peerID = shortLog(peerId),
criteriaCount = data.criteriaCount,
maxCriteriaPerPeer = s.maxCriteriaPerPeer
peerData = data
do:
## not yet subscribed
if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
debug "AAAAAAA node max number susbcriptions",
peerID = shortLog(peerId), maxPeers = $(s.maxPeers)
return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))
let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
@ -162,8 +172,10 @@ proc addSubscription*(
peersOfSub[].incl(peerId)
peerData.criteriaCount += 1
debug "subscription added correctly",
new_peer = shortLog(peerId), subscr_set = s.subscriptions
debug "AAAAAAA subscription added correctly",
peerId = shortLog(peerId),
subscr_set = s.subscriptions,
criteriaCount = peerData.criteriaCount
return ok()
@ -172,6 +184,8 @@ proc removeSubscription*(
): Result[void, string] =
## Remove a subscription for a given peer
debug "AAAAAAA removeSubscription", peerId = shortLog(peerID)
s.peersSubscribed.withValue(peerId, peerData):
peerData.lastSeen = Moment.now()
for filterCriterion in filterCriteria:
@ -185,10 +199,12 @@ proc removeSubscription*(
s.peersSubscribed.del(peerId)
do:
## Maybe let just run through and log it as a warning
debug "AAAAAAA Peer was not subscribed to criterion", peerId = shortLog(peerID)
return err("Peer was not subscribed to criterion")
return ok()
do:
debug "AAAAAAA Peer has no subscriptions", peerId = shortLog(peerID)
return err("Peer has no subscriptions")
proc setSubscriptionTimeout*(s: FilterSubscriptions, newTimeout: Duration) =