mirror of https://github.com/waku-org/nwaku.git
filter subscriptions: add more debug logs
parent 3ffdaadf2e
commit 7ade979d00

@@ -1,6 +1,12 @@
 {.push raises: [].}
 
-import std/[sets, tables], chronicles, chronos, libp2p/peerid, stew/shims/sets
+import
+  std/[sets, tables, sequtils],
+  chronicles,
+  chronos,
+  libp2p/peerid,
+  libp2p/stream/connection,
+  stew/shims/sets
 import ../waku_core, ../utils/tableutils, ../common/rate_limit/setting
 
 logScope:
@@ -83,14 +89,30 @@ proc findSubscribedPeers*(
   return foundPeers
 
 proc removePeer*(s: var FilterSubscriptions, peerId: PeerID) =
+  debug "removePeer",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
+
   ## Remove all subscriptions for a given peer
   s.peersSubscribed.del(peerId)
 
+  debug "removePeer after deletion",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)), peerId = peerId
+
 proc removePeers*(s: var FilterSubscriptions, peerIds: seq[PeerID]) =
+  debug "removePeers",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
+    peerIds = peerIds.mapIt(shortLog(it))
+
   ## Remove all subscriptions for a given list of peers
   s.peersSubscribed.keepItIf(key notin peerIds)
 
+  debug "removePeers after deletion",
+    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortLog(it)),
+    peerIds = peerIds.mapIt(shortLog(it))
+
 proc cleanUp*(fs: var FilterSubscriptions) =
+  debug "cleanUp", currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
+
   ## Remove all subscriptions for peers that have not been seen for a while
   let now = Moment.now()
   fs.peersSubscribed.keepItIf(now - val.lastSeen <= fs.subscriptionTimeout)
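The hunk above follows a single pattern: log the subscription table's current peer IDs immediately before and again after each mutation, so a lingering or prematurely dropped subscription can be traced from the debug output. This is also why sequtils joins the imports: toSeq and mapIt come from it. What follows is a minimal, self-contained sketch of that pattern rather than the nwaku code itself; peer IDs are plain strings instead of libp2p PeerID, and shortId is a hypothetical stand-in for libp2p's shortLog.

# Minimal sketch of the before/after logging pattern (assumed, simplified types).
import std/[tables, sequtils]
import chronicles

type Subscriptions = object
  peersSubscribed: Table[string, int] # peerId -> number of filter criteria

proc shortId(id: string): string =
  ## Hypothetical stand-in for libp2p's shortLog(PeerID).
  result = if id.len > 8: id[0 .. 7] & ".." else: id

proc removePeer(s: var Subscriptions, peerId: string) =
  # Log the table state before the mutation ...
  debug "removePeer",
    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortId(it)), peerId = peerId
  s.peersSubscribed.del(peerId)
  # ... and again after it, so both states show up in the trace.
  debug "removePeer after deletion",
    currentPeerIds = toSeq(s.peersSubscribed.keys).mapIt(shortId(it)), peerId = peerId

when isMainModule:
  var s = Subscriptions(
    peersSubscribed: {"16Uiu2HAmPeerOne": 2, "16Uiu2HAmPeerTwo": 1}.toTable
  )
  s.removePeer("16Uiu2HAmPeerOne")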
@@ -101,6 +123,9 @@ proc cleanUp*(fs: var FilterSubscriptions) =
 
   fs.subscriptions.keepItIf(val.len > 0)
 
+  debug "after cleanUp",
+    currentPeerIds = toSeq(fs.peersSubscribed.keys).mapIt(shortLog(it))
+
 proc refreshSubscription*(s: var FilterSubscriptions, peerId: PeerID) =
   s.peersSubscribed.withValue(peerId, data):
     data.lastSeen = Moment.now()
@@ -120,7 +145,7 @@ proc addSubscription*(
   do:
     ## not yet subscribed
     if cast[uint](s.peersSubscribed.len) >= s.maxPeers:
-      return err("node has reached maximum number of subscriptions")
+      return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))
 
     let newPeerData: PeerData = (lastSeen: Moment.now(), criteriaCount: 0)
     peerData = addr(s.peersSubscribed.mgetOrPut(peerId, newPeerData))
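The final hunk only extends the error string: when the subscription table is already at capacity, the returned error now carries the configured maximum, so the limit is visible to the failing client rather than a generic refusal. Below is a rough, self-contained sketch of that branch with assumed names (Subs, addSubscription) and simplified types; it uses the nim-results package for the ok/err style the diff shows.

# Rough sketch (assumed names and types) of the capacity check whose
# error message now includes the configured maximum.
import std/tables
import results

type Subs = object
  maxPeers: uint
  peersSubscribed: Table[string, int]

proc addSubscription(s: var Subs, peerId: string): Result[void, string] =
  if uint(s.peersSubscribed.len) >= s.maxPeers:
    # The limit itself is interpolated into the error, as in the diff above.
    return err("node has reached maximum number of subscriptions: " & $(s.maxPeers))
  s.peersSubscribed[peerId] = 0
  ok()

when isMainModule:
  var s = Subs(maxPeers: 1)
  discard s.addSubscription("peerA")
  echo s.addSubscription("peerB").error # prints the message including the limit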