mirror of
https://github.com/status-im/nim-libp2p.git
synced 2025-02-03 08:34:49 +00:00
backoff time management
This commit is contained in:
parent
ce61d84db4
commit
c6fc8dee54
@ -54,6 +54,9 @@ const
|
|||||||
const
|
const
|
||||||
GossipSubFanoutTTL* = 1.minutes
|
GossipSubFanoutTTL* = 1.minutes
|
||||||
|
|
||||||
|
const
|
||||||
|
BackoffSlackTime = 2 # seconds
|
||||||
|
|
||||||
type
|
type
|
||||||
TopicInfo* = object
|
TopicInfo* = object
|
||||||
# gossip 1.1 related
|
# gossip 1.1 related
|
||||||
@ -129,7 +132,7 @@ type
|
|||||||
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
|
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
|
||||||
gossipsub*: PeerTable # peers that are subscribed to a topic
|
gossipsub*: PeerTable # peers that are subscribed to a topic
|
||||||
explicit*: PeerTable # directpeers that we keep alive explicitly
|
explicit*: PeerTable # directpeers that we keep alive explicitly
|
||||||
backingOff*: HashSet[PeerID] # explicit (always connected/forward) peers
|
backingOff*: Table[PeerID, Moment] # explicit (always connected/forward) peers
|
||||||
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
|
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
|
||||||
gossip*: Table[string, seq[ControlIHave]] # pending gossip
|
gossip*: Table[string, seq[ControlIHave]] # pending gossip
|
||||||
control*: Table[string, ControlMessage] # pending control messages
|
control*: Table[string, ControlMessage] # pending control messages
|
||||||
@ -615,6 +618,15 @@ proc heartbeat(g: GossipSub) {.async.} =
|
|||||||
try:
|
try:
|
||||||
trace "running heartbeat", instance = cast[int](g)
|
trace "running heartbeat", instance = cast[int](g)
|
||||||
|
|
||||||
|
# remove expired backoffs
|
||||||
|
block:
|
||||||
|
let now = Moment.now()
|
||||||
|
var expired = toSeq(g.backingOff.pairs())
|
||||||
|
expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
|
||||||
|
now >= pair.expire
|
||||||
|
for (peer, _) in expired:
|
||||||
|
g.backingOff.del(peer)
|
||||||
|
|
||||||
g.updateScores()
|
g.updateScores()
|
||||||
|
|
||||||
for t in toSeq(g.topics.keys):
|
for t in toSeq(g.topics.keys):
|
||||||
@ -770,14 +782,27 @@ proc handleGraft(g: GossipSub,
|
|||||||
|
|
||||||
trace "peer grafted topic"
|
trace "peer grafted topic"
|
||||||
|
|
||||||
# TODO
|
# It is an error to GRAFT on a explicit peer
|
||||||
# # It is an error to GRAFT on a explicit peer
|
if peer.peerId in g.parameters.directPeers:
|
||||||
# if peer.peerInfo.maintain:
|
trace "attempt to graft an explicit peer", peer=peer.id,
|
||||||
# trace "attempt to graft an explicit peer", peer=peer.id,
|
topicID=graft.topicID
|
||||||
# topicID=graft.topicID
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
# # and such an attempt should be logged and rejected with a PRUNE
|
result.add(ControlPrune(
|
||||||
# result.add(ControlPrune(topicID: graft.topicID))
|
topicID: graft.topicID,
|
||||||
# continue
|
peers: @[], # omitting heavy computation here as the remote did something illegal
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
continue
|
||||||
|
|
||||||
|
if peer.peerId in g.backingOff:
|
||||||
|
trace "attempt to graft an backingOff peer", peer=peer.id,
|
||||||
|
topicID=graft.topicID,
|
||||||
|
expire=g.backingOff[peer.peerId]
|
||||||
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
|
result.add(ControlPrune(
|
||||||
|
topicID: graft.topicID,
|
||||||
|
peers: @[], # omitting heavy computation here as the remote did something illegal
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
continue
|
||||||
|
|
||||||
# If they send us a graft before they send us a subscribe, what should
|
# If they send us a graft before they send us a subscribe, what should
|
||||||
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
||||||
@ -815,8 +840,16 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
|||||||
for prune in prunes:
|
for prune in prunes:
|
||||||
trace "peer pruned topic", peer = peer.id, topic = prune.topicID
|
trace "peer pruned topic", peer = peer.id, topic = prune.topicID
|
||||||
|
|
||||||
|
# add peer backoff
|
||||||
|
if prune.backoff > 0:
|
||||||
|
let backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
|
||||||
|
let current = g.backingOff.getOrDefault(peer.peerId)
|
||||||
|
if backoff > current:
|
||||||
|
g.backingOff[peer.peerId] = backoff
|
||||||
|
|
||||||
g.pruned(peer, prune.topicID)
|
g.pruned(peer, prune.topicID)
|
||||||
g.mesh.removePeer(prune.topicID, peer)
|
g.mesh.removePeer(prune.topicID, peer)
|
||||||
|
|
||||||
when defined(libp2p_expensive_metrics):
|
when defined(libp2p_expensive_metrics):
|
||||||
libp2p_gossipsub_peers_per_topic_mesh
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
|
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
|
||||||
|
Loading…
x
Reference in New Issue
Block a user