Gossipsub scoring fixes (#508)
* Fix some problems when running with full scoring
* More fixes
This commit is contained in:
parent 0959877b29
commit 1d77d37f17
@@ -151,12 +151,14 @@ type
     disconnectBadPeers*: bool

+  BackoffTable = Table[string, Table[PeerID, Moment]]
+
   GossipSub* = ref object of FloodSub
     mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
     fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
     gossipsub*: PeerTable # peers that are subscribed to a topic
     explicit*: PeerTable # directpeers that we keep alive explicitly
-    backingOff*: Table[PeerID, Moment] # explicit (always connected/forward) peers
+    backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
     lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
     gossip*: Table[string, seq[ControlIHave]] # pending gossip
     control*: Table[string, ControlMessage] # pending control messages
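The new BackoffTable nests a per-peer expiry table under each topic, so a peer can be backed off on one topic while staying eligible on others. A minimal, self-contained sketch of the write and read idioms this diff uses throughout (a plain string stands in for libp2p's PeerID, for illustration only):

import std/tables
import chronos/timer # provides Moment, Duration and 1.minutes

type BackoffTable = Table[string, Table[string, Moment]]

var backingOff: BackoffTable
let backoff = Moment.fromNow(1.minutes)

# mgetOrPut returns a mutable entry, creating it on first use;
# the trailing assignment overwrites any expiry already stored
backingOff
  .mgetOrPut("mytopic", initTable[string, Moment]())
  .mgetOrPut("peerA", backoff) = backoff

# getOrDefault yields an empty table for an unknown topic, so reads are safe
assert "peerA" in backingOff.getOrDefault("mytopic")
assert "peerA" notin backingOff.getOrDefault("othertopic")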
@@ -200,6 +202,13 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
 declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
 declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
+declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
 declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
 declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
@@ -365,11 +374,16 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
   g.grafted(p, topic)

 proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
+  let backoff = Moment.fromNow(g.parameters.pruneBackoff)
+  g.backingOff
+    .mgetOrPut(topic, initTable[PeerID, Moment]())
+    .mgetOrPut(p.peerId, backoff) = backoff
+
   g.peerStats.withValue(p.peerId, stats):
     if topic in stats.topicInfos:
       var info = stats.topicInfos[topic]
-      let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init())
-      # penalize a peer that delivered no message
-      let threshold = topicParams.meshMessageDeliveriesThreshold
-      if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
+      if topic in g.topicParams:
+        let topicParams = g.topicParams[topic]
+        # penalize a peer that delivered no message
+        let threshold = topicParams.meshMessageDeliveriesThreshold
+        if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
@@ -462,7 +476,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
        # don't pick explicit peers
        it.peerId notin g.parameters.directPeers and
        # and avoid peers we are backing off
-       it.peerId notin g.backingOff
+       it.peerId notin g.backingOff.getOrDefault(topic)
      )

     # shuffle anyway, score might not be used
@@ -507,7 +521,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
        # don't pick explicit peers
        it.peerId notin g.parameters.directPeers and
        # and avoid peers we are backing off
-       it.peerId notin g.backingOff
+       it.peerId notin g.backingOff.getOrDefault(topic)
      )

     # shuffle anyway, score might not be used
@@ -606,7 +620,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
        # don't pick explicit peers
        x.peerId notin g.parameters.directPeers and
        # and avoid peers we are backing off
-       x.peerId notin g.backingOff
+       x.peerId notin g.backingOff.getOrDefault(topic)

     # by spec, grab only 2
     if avail.len > 2:
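All three rebalanceMesh call sites above switch the backoff check from the flat table to g.backingOff.getOrDefault(topic); since getOrDefault returns an empty table for an unknown topic, the notin test needs no hasKey guard. A hedged, stand-alone mirror of the candidate filter (plain strings stand in for peer ids; the names are illustration only):

import std/[sequtils, tables]
import chronos/timer

let
  directPeers = @["direct1"] # illustration-only peer ids
  backingOff = {"mytopic": {"backed1": Moment.now()}.toTable}.toTable
  candidates = @["direct1", "backed1", "fresh1"]

let eligible = candidates.filterIt(
  # don't pick explicit peers
  it notin directPeers and
  # and avoid peers we are backing off on this topic
  it notin backingOff.getOrDefault("mytopic"))

assert eligible == @["fresh1"]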
@@ -832,6 +846,32 @@ proc updateScores(g: GossipSub) = # avoid async
       peer.score += topicScore * topicParams.topicWeight

+      # Score metrics
+      when defined(libp2p_agents_metrics):
+        let agent =
+          block:
+            if peer.shortAgent.len > 0:
+              peer.shortAgent
+            else:
+              if peer.sendConn != nil:
+                let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
+                if KnownLibP2PAgentsSeq.contains(shortAgent):
+                  peer.shortAgent = shortAgent
+                else:
+                  peer.shortAgent = "unknown"
+                peer.shortAgent
+              else:
+                "unknown"
+        libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
+        libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
+        libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
+        libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent])
+      else:
+        libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"])
+        libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"])
+        libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"])
+        libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"])
+
       # Score decay
       info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
       if info.firstMessageDeliveries < g.parameters.decayToZero:
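The let agent = block: expression above resolves a metrics label from the peer's agentVersion and caches it in peer.shortAgent, so the string work happens once per peer. Stripped of the caching, the normalisation reduces to this sketch (the agent list is an assumption for illustration, not the real KnownLibP2PAgentsSeq):

import std/strutils

const knownAgents = @["nimbus", "lighthouse", "go-libp2p"] # assumed entries, illustration only

proc toAgentLabel(agentVersion: string): string =
  # keep the part before the first '/', lowercased; unknown agents share one label
  let short = agentVersion.split("/")[0].toLowerAscii()
  if short in knownAgents: short else: "unknown"

assert toAgentLabel("nimbus/v1.0.0") == "nimbus"
assert toAgentLabel("SomeClient/2.1") == "unknown"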
@@ -857,7 +897,32 @@ proc updateScores(g: GossipSub) = # avoid async
     peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight

-    peer.score += g.colocationFactor(peer) * g.parameters.ipColocationFactorWeight
+    let colocationFactor = g.colocationFactor(peer)
+    peer.score += colocationFactor * g.parameters.ipColocationFactorWeight
+
+    # Score metrics
+    when defined(libp2p_agents_metrics):
+      let agent =
+        block:
+          if peer.shortAgent.len > 0:
+            peer.shortAgent
+          else:
+            if peer.sendConn != nil:
+              let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
+              if KnownLibP2PAgentsSeq.contains(shortAgent):
+                peer.shortAgent = shortAgent
+              else:
+                peer.shortAgent = "unknown"
+              peer.shortAgent
+            else:
+              "unknown"
+      libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
+      libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
+      libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
+    else:
+      libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"])
+      libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"])
+      libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"])
+
     # decay behaviourPenalty
     peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
@@ -876,20 +941,6 @@ proc updateScores(g: GossipSub) = # avoid async
       asyncSpawn g.disconnectPeer(peer)

     when defined(libp2p_agents_metrics):
-      let agent =
-        block:
-          if peer.shortAgent.len > 0:
-            peer.shortAgent
-          else:
-            if peer.sendConn != nil:
-              let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
-              if KnownLibP2PAgentsSeq.contains(shortAgent):
-                peer.shortAgent = shortAgent
-              else:
-                peer.shortAgent = "unknown"
-              peer.shortAgent
-            else:
-              "unknown"
       libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
     else:
       libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"])
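The duplicated agent-resolution block can be dropped here because agent is already computed earlier in the same loop iteration (previous hunk) and remains in scope. Note that when defined(...) is evaluated at compile time, so only one branch is ever compiled: builds made with -d:libp2p_agents_metrics get per-agent labels, all other builds fall back to the single "unknown" label. A trivial sketch of the mechanism:

# compiled with: nim c -d:libp2p_agents_metrics app.nim
when defined(libp2p_agents_metrics):
  const label = "per-agent"
else:
  const label = "unknown"
echo label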
@@ -899,20 +950,19 @@ proc updateScores(g: GossipSub) = # avoid async
   trace "updated scores", peers = g.peers.len

+proc handleBackingOff(t: var BackoffTable, topic: string) =
+  let now = Moment.now()
+  var expired = toSeq(t.getOrDefault(topic).pairs())
+  expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
+    now >= pair.expire
+  for (peer, _) in expired:
+    t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer)
+
 proc heartbeat(g: GossipSub) {.async.} =
   while g.heartbeatRunning:
     try:
       trace "running heartbeat", instance = cast[int](g)

-      # remove expired backoffs
-      block:
-        let now = Moment.now()
-        var expired = toSeq(g.backingOff.pairs())
-        expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
-          now >= pair.expire
-        for (peer, _) in expired:
-          g.backingOff.del(peer)
-
       # reset IWANT budget
       # reset IHAVE cap
       block:
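handleBackingOff collects the expired pairs first and deletes them afterwards, so the table is never mutated while it is iterated. The keepIf ... do form is Nim's do-notation for an anonymous predicate; it is equivalent to the inline-proc spelling, as this small sketch shows:

import std/sequtils

var xs = @[1, 5, 10]

# do-notation:
xs.keepIf do (x: int) -> bool:
  x >= 5

# the same predicate as an inline proc:
xs.keepIf(proc (x: int): bool = x >= 5)

assert xs == @[5, 10]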
@@ -925,6 +975,10 @@ proc heartbeat(g: GossipSub) {.async.} =
       var meshMetrics = MeshMetrics()

       for t in toSeq(g.topics.keys):
+        # remove expired backoffs
+        block:
+          handleBackingOff(g.backingOff, t)
+
         # prune every peer with a negative score
         # do this before rebalance
         # in order to avoid grafted -> pruned in the same cycle
@@ -1057,13 +1111,11 @@ method subscribeTopic*(g: GossipSub,
   trace "gossip peers", peers = g.gossipsub.peers(topic), topic

-proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
+proc punishInvalidMessage(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
   for t in topics:
     if t notin g.topics:
       continue

     # ensure we init a new topic if unknown
     let _ = g.topicParams.mgetOrPut(t, TopicParams.init())
     # update stats
     g.peerStats.withValue(peer.peerId, stats):
       stats[].topicInfos.withValue(t, tstats):
@@ -1092,29 +1144,40 @@ proc handleGraft(g: GossipSub,
     # It is an error to GRAFT on an explicit peer
     if peer.peerId in g.parameters.directPeers:
       # receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
-      warn "attempt to graft an explicit peer", peer=peer.id,
-                                                topicID=graft.topicID
+      warn "attempt to graft an explicit peer", peer=peer.peerId,
+                                                topic
       # and such an attempt should be logged and rejected with a PRUNE
       result.add(ControlPrune(
-        topicID: graft.topicID,
+        topicID: topic,
         peers: @[], # omitting heavy computation here as the remote did something illegal
         backoff: g.parameters.pruneBackoff.seconds.uint64))

-      g.punishPeer(peer, @[topic])
+      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
+      g.backingOff
+        .mgetOrPut(topic, initTable[PeerID, Moment]())
+        .mgetOrPut(peer.peerId, backoff) = backoff
+
+      peer.behaviourPenalty += 0.1

       continue

-    if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now():
-      trace "attempt to graft a backingOff peer", peer=peer.id,
-                                                  topicID=graft.topicID,
-                                                  expire=g.backingOff[peer.peerId]
+    if g.backingOff
+      .getOrDefault(topic)
+      .getOrDefault(peer.peerId) > Moment.now():
+      warn "attempt to graft a backingOff peer", peer=peer.peerId,
+                                                 topic
       # and such an attempt should be logged and rejected with a PRUNE
       result.add(ControlPrune(
-        topicID: graft.topicID,
+        topicID: topic,
         peers: @[], # omitting heavy computation here as the remote did something illegal
         backoff: g.parameters.pruneBackoff.seconds.uint64))

-      g.punishPeer(peer, @[topic])
+      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
+      g.backingOff
+        .mgetOrPut(topic, initTable[PeerID, Moment]())
+        .mgetOrPut(peer.peerId, backoff) = backoff
+
+      peer.behaviourPenalty += 0.1

       continue
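Each rejected GRAFT above now costs the offender 0.1 behaviourPenalty on top of the PRUNE and the refreshed backoff. Because updateScores applies the penalty quadratically (peer.score += peer.behaviourPenalty * peer.behaviourPenalty * behaviourPenaltyWeight, with a negative weight), repeat offenders are punished progressively harder. A worked sketch, with -1.0 as an assumed weight for illustration, not a library default:

let behaviourPenaltyWeight = -1.0 # assumed value for illustration
var behaviourPenalty = 0.0
for _ in 1 .. 3:                  # three illegal GRAFT attempts
  behaviourPenalty += 0.1
let scoreDelta = behaviourPenalty * behaviourPenalty * behaviourPenaltyWeight
assert abs(scoreDelta + 0.09) < 1e-9 # 0.3^2 * -1.0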
@@ -1150,18 +1213,23 @@ proc handleGraft(g: GossipSub,
 proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
   for prune in prunes:
-    trace "peer pruned topic", peer, topic = prune.topicID
+    let topic = prune.topicID
+
+    trace "peer pruned topic", peer, topic

     # add peer backoff
     if prune.backoff > 0:
-      let backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
-      let current = g.backingOff.getOrDefault(peer.peerId)
+      let
+        backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
+        current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
       if backoff > current:
-        g.backingOff[peer.peerId] = backoff
+        g.backingOff
+          .mgetOrPut(topic, initTable[PeerID, Moment]())
+          .mgetOrPut(peer.peerId, backoff) = backoff

     trace "pruning rpc received peer", peer, score = peer.score
-    g.pruned(peer, prune.topicID)
-    g.mesh.removePeer(prune.topicID, peer)
+    g.pruned(peer, topic)
+    g.mesh.removePeer(topic, peer)

     # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
     # another option could be to implement signed peer records
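The PRUNE handler above honours the remote's requested backoff plus BackoffSlackTime, and the if backoff > current guard makes the update extend-only: a remote cannot shorten a backoff it already earned. A minimal single-level sketch of that rule (the 2-second slack is an assumption for illustration):

import std/tables
import chronos/timer

const BackoffSlackTime = 2'u64 # seconds; assumed value for illustration

var backoffs: Table[string, Moment]

proc registerBackoff(t: var Table[string, Moment], peer: string, requested: uint64) =
  let backoff = Moment.fromNow(int64(requested + BackoffSlackTime).seconds)
  # extend-only: a shorter incoming interval never truncates a longer one
  if backoff > t.getOrDefault(peer):
    t[peer] = backoff

registerBackoff(backoffs, "peerA", 60)
registerBackoff(backoffs, "peerA", 10) # ignored: would shorten the backoff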
@@ -1264,14 +1332,14 @@ method rpcHandler*(g: GossipSub,
         # always validate if signature is present or required
         debug "Dropping message due to failed signature verification",
           msgId = shortLog(msgId), peer
-        g.punishPeer(peer, msg.topicIDs)
+        g.punishInvalidMessage(peer, msg.topicIDs)
         continue

       if msg.seqno.len > 0 and msg.seqno.len != 8:
         # if a seqno is present, it must be 8 bytes long
         debug "Dropping message due to invalid seqno length",
           msgId = shortLog(msgId), peer
-        g.punishPeer(peer, msg.topicIDs)
+        g.punishInvalidMessage(peer, msg.topicIDs)
         continue

       # g.anonymize needs no evaluation when receiving messages
@@ -1282,7 +1350,7 @@ method rpcHandler*(g: GossipSub,
       of ValidationResult.Reject:
         debug "Dropping message after validation, reason: reject",
           msgId = shortLog(msgId), peer
-        g.punishPeer(peer, msg.topicIDs)
+        g.punishInvalidMessage(peer, msg.topicIDs)
         continue
       of ValidationResult.Ignore:
         debug "Dropping message after validation, reason: ignore",
@@ -1410,6 +1478,8 @@ proc unsubscribe*(g: GossipSub, topic: string) =
   else:
     g.broadcast(toSeq(gpeers), msg)

+  g.topicParams.del(topic)
+
 method unsubscribeAll*(g: GossipSub, topic: string) =
   g.unsubscribe(topic)
   # finally let's remove from g.topics, do that by calling PubSub