diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index a544bdc80..6bcebec39 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -260,19 +260,28 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isSubscribed(topic): return - proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + proc traceHandler( + topic: PubsubTopic, msg: WakuMessage, msgHash: string + ) {.async, gcsafe.} = let msgSizeKB = msg.payload.len / 1000 waku_node_messages.inc(labelValues = ["relay"]) waku_histogram_message_size.observe(msgSizeKB) - proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + debug "AAAAAAA traceHandler", msg_hash = msgHash, msgSizeKB + + proc filterHandler( + topic: PubsubTopic, msg: WakuMessage, msgHash: string + ) {.async, gcsafe.} = if node.wakuFilter.isNil(): return + debug "AAAAAAA filterHandler", msg_hash = msgHash await node.wakuFilter.handleMessage(topic, msg) - proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + proc archiveHandler( + topic: PubsubTopic, msg: WakuMessage, msgHash: string + ) {.async, gcsafe.} = if not node.wakuLegacyArchive.isNil(): ## we try to store with legacy archive await node.wakuLegacyArchive.handleMessage(topic, msg) @@ -281,21 +290,27 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) = if node.wakuArchive.isNil(): return + debug "AAAAAAA archiveHandler", msg_hash = msgHash await node.wakuArchive.handleMessage(topic, msg) - proc syncHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + proc syncHandler( + topic: PubsubTopic, msg: WakuMessage, msgHash: string + ) {.async, gcsafe.} = if node.wakuStoreReconciliation.isNil(): return + debug "AAAAAAA syncHandler", msg_hash = msgHash node.wakuStoreReconciliation.messageIngress(topic, msg) let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = - await traceHandler(topic, msg) - await filterHandler(topic, msg) - await archiveHandler(topic, msg) - await syncHandler(topic, msg) + let msgHash = computeMessageHash(topic, msg).to0xHex() + debug "AAAAAAA waku_node", msg_hash = msgHash + await traceHandler(topic, msg, msgHash) + await filterHandler(topic, msg, msgHash) + await archiveHandler(topic, msg, msgHash) + await syncHandler(topic, msg, msgHash) discard node.wakuRelay.subscribe(topic, defaultHandler) diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index c3a4683f7..5ffbf4e39 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -52,13 +52,13 @@ proc subscribe( ): Future[FilterSubscribeResult] {.async.} = # TODO: check if this condition is valid??? if pubsubTopic.isNone() or contentTopics.len == 0: - error "pubsubTopic and contentTopics must be specified", peerId = peerId + error "AAAAAAA pubsubTopic and contentTopics must be specified", peerId = peerId return err( FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified") ) if contentTopics.len > MaxContentTopicsPerRequest: - error "exceeds maximum content topics", peerId = peerId + error "AAAAAAA exceeds maximum content topics", peerId = peerId return err( FilterSubscribeError.badRequest( "exceeds maximum content topics: " & $MaxContentTopicsPerRequest @@ -67,13 +67,16 @@ proc subscribe( let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) - debug "subscribing peer to filter criteria", + debug "AAAAAAA subscribing peer to filter criteria", peerId = peerId, filterCriteria = filterCriteria (await wf.subscriptions.addSubscription(peerId, filterCriteria)).isOkOr: + debug "AAAAAAA service unavailable", + peerId = peerId, filterCriteria = filterCriteria, error = $error + return err(FilterSubscribeError.serviceUnavailable(error)) - debug "correct subscription", peerId = peerId + debug "AAAAAAA correct subscription", peerId = peerId ok() @@ -83,14 +86,16 @@ proc unsubscribe( pubsubTopic: Option[PubsubTopic], contentTopics: seq[ContentTopic], ): FilterSubscribeResult = + debug "AAAAAAA unsubscribing peer from filter criteria", peerId = peerId + if pubsubTopic.isNone() or contentTopics.len == 0: - error "pubsubTopic and contentTopics must be specified", peerId = peerId + error "AAAAAAA pubsubTopic and contentTopics must be specified", peerId = peerId return err( FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified") ) if contentTopics.len > MaxContentTopicsPerRequest: - error "exceeds maximum content topics", peerId = peerId + error "AAAAAAA exceeds maximum content topics", peerId = peerId return err( FilterSubscribeError.badRequest( "exceeds maximum content topics: " & $MaxContentTopicsPerRequest @@ -99,16 +104,16 @@ proc unsubscribe( let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) - debug "unsubscribing peer from filter criteria", + debug "AAAAAAA unsubscribing peer from filter criteria", peerId = peerId, filterCriteria = filterCriteria wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr: - error "failed to remove subscription", error = $error + error "AAAAAAA failed to remove subscription", error = $error return err(FilterSubscribeError.notFound()) ## Note: do not remove from peerRequestRateLimiter to prevent trick with subscribe/unsubscribe loop ## We remove only if peerManager removes the peer - debug "correct unsubscription", peerId = peerId + debug "AAAAAAA correct unsubscription", peerId = peerId ok() diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index 8d3b8084f..a899f6fab 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -85,7 +85,7 @@ proc findSubscribedPeers*( if s.isSubscribed(peer): foundPeers.add(peer) - debug "findSubscribedPeers result", + debug "AAAAAAA findSubscribedPeers result", filter_criterion = filterCriterion, subscr_set = s.subscriptions, found_peers = foundPeers @@ -94,29 +94,30 @@ proc findSubscribedPeers*( proc removePeer*(s: FilterSubscriptions, peerId: PeerID) {.async.} = ## Remove all subscriptions for a given peer - debug "removePeer", + debug "AAAAAAA removePeer", currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId s.peersSubscribed.del(peerId) - debug "removePeer after deletion", + debug "AAAAAAA removePeer after deletion", currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId proc removePeers*(s: FilterSubscriptions, peerIds: seq[PeerID]) {.async.} = ## Remove all subscriptions for a given list of peers - debug "removePeers", + debug "AAAAAAA removePeers", currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerIds = peerIds.mapIt(shortLog(it)) for peer in peerIds: await s.removePeer(peer) - debug "removePeers after deletion", + debug "AAAAAAA removePeers after deletion", currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerIds = peerIds.mapIt(shortLog(it)) proc cleanUp*(fs: FilterSubscriptions) = - debug "cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it)) + debug "AAAAAAA cleanUp", + currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it)) ## Remove all subscriptions for peers that have not been seen for a while let now = Moment.now() @@ -128,7 +129,7 @@ proc cleanUp*(fs: FilterSubscriptions) = fs.subscriptions.keepItIf(val.len > 0) - debug "after cleanUp", + debug "AAAAAAA after cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it)) proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) = @@ -142,15 +143,24 @@ proc addSubscription*( var peerData: ptr PeerData + debug "AAAAAAA addSubscription", peerId = shortLog(peerId) + s.peersSubscribed.withValue(peerId, data): if data.criteriaCount + cast[uint](filterCriteria.len) > s.maxCriteriaPerPeer: return err("peer has reached maximum number of filter criteria") data.lastSeen = Moment.now() + debug "AAAAAAA peer already subscribed", + peerID = shortLog(peerId), + criteriaCount = data.criteriaCount, + maxCriteriaPerPeer = s.maxCriteriaPerPeer + peerData = data do: ## not yet subscribed if cast[uint](s.peersSubscribed.len) >= s.maxPeers: + debug "AAAAAAA node max number susbcriptions", + peerID = shortLog(peerId), maxPeers = $(s.maxPeers) return err("node has reached maximum number of subscriptions: " & $(s.maxPeers)) let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0) @@ -162,8 +172,10 @@ proc addSubscription*( peersOfSub[].incl(peerId) peerData.criteriaCount += 1 - debug "subscription added correctly", - new_peer = shortLog(peerId), subscr_set = s.subscriptions + debug "AAAAAAA subscription added correctly", + peerId = shortLog(peerId), + subscr_set = s.subscriptions, + criteriaCount = peerData.criteriaCount return ok() @@ -172,6 +184,8 @@ proc removeSubscription*( ): Result[void, string] = ## Remove a subscription for a given peer + debug "AAAAAAA removeSubscription", peerId = shortLog(peerID) + s.peersSubscribed.withValue(peerId, peerData): peerData.lastSeen = Moment.now() for filterCriterion in filterCriteria: @@ -185,10 +199,12 @@ proc removeSubscription*( s.peersSubscribed.del(peerId) do: ## Maybe let just run through and log it as a warning + debug "AAAAAAA Peer was not subscribed to criterion", peerId = shortLog(peerID) return err("Peer was not subscribed to criterion") return ok() do: + debug "AAAAAAA Peer has no subscriptions", peerId = shortLog(peerID) return err("Peer has no subscriptions") proc setSubscriptionTimeout*(s: FilterSubscriptions, newTimeout: Duration) =