diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index c4c28f0cd..7ad421eac 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -151,12 +151,14 @@ type disconnectBadPeers*: bool + BackoffTable = Table[string, Table[PeerID, Moment]] + GossipSub* = ref object of FloodSub mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic gossipsub*: PeerTable # peers that are subscribed to a topic explicit*: PeerTable # directpeers that we keep alive explicitly - backingOff*: Table[PeerID, Moment] # explicit (always connected/forward) peers + backingOff*: BackoffTable # peers to backoff from when replenishing the mesh lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: Table[string, ControlMessage] # pending control messages @@ -200,6 +202,13 @@ declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish") declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache") declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"]) +declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"]) declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"]) declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow") declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout") @@ -365,16 +374,21 @@ proc grafted(g: GossipSub, p: PubSubPeer, topic: string) = g.grafted(p, topic) proc pruned(g: GossipSub, p: PubSubPeer, topic: string) = + let backoff = Moment.fromNow(g.parameters.pruneBackoff) + g.backingOff + .mgetOrPut(topic, initTable[PeerID, Moment]()) + .mgetOrPut(p.peerId, backoff) = backoff + g.peerStats.withValue(p.peerId, stats): if topic in stats.topicInfos: var info = stats.topicInfos[topic] - let topicParams = g.topicParams.mgetOrPut(topic, TopicParams.init()) - - # penalize a peer that delivered no message - let threshold = topicParams.meshMessageDeliveriesThreshold - if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold: - let deficit = threshold - info.meshMessageDeliveries - info.meshFailurePenalty += deficit * deficit + if topic in g.topicParams: + let topicParams = g.topicParams[topic] + # penalize a peer that delivered no message + let threshold = topicParams.meshMessageDeliveriesThreshold + if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold: + let deficit = threshold - info.meshMessageDeliveries + info.meshFailurePenalty += deficit * deficit info.inMesh = false @@ -462,7 +476,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) # don't pick explicit peers it.peerId notin g.parameters.directPeers and # and avoid peers we are backing off - it.peerId notin g.backingOff + it.peerId notin g.backingOff.getOrDefault(topic) ) # shuffle anyway, score might be not used @@ -507,7 +521,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) # don't pick explicit peers it.peerId notin g.parameters.directPeers and # and avoid peers we are backing off - it.peerId notin g.backingOff + it.peerId notin g.backingOff.getOrDefault(topic) ) # shuffle anyway, score might be not used @@ -606,7 +620,7 @@ proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) # don't pick explicit peers x.peerId notin g.parameters.directPeers and # and avoid peers we are backing off - x.peerId notin g.backingOff + x.peerId notin g.backingOff.getOrDefault(topic) # by spec, grab only 2 if avail.len > 2: @@ -832,6 +846,32 @@ proc updateScores(g: GossipSub) = # avoid async peer.score += topicScore * topicParams.topicWeight + # Score metrics + when defined(libp2p_agents_metrics): + let agent = + block: + if peer.shortAgent.len > 0: + peer.shortAgent + else: + if peer.sendConn != nil: + let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii() + if KnownLibP2PAgentsSeq.contains(shortAgent): + peer.shortAgent = shortAgent + else: + peer.shortAgent = "unknown" + peer.shortAgent + else: + "unknown" + libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent]) + libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent]) + libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent]) + libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent]) + else: + libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"]) + libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"]) + libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"]) + libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"]) + # Score decay info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay if info.firstMessageDeliveries < g.parameters.decayToZero: @@ -857,7 +897,32 @@ proc updateScores(g: GossipSub) = # avoid async peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight - peer.score += g.colocationFactor(peer) * g.parameters.ipColocationFactorWeight + let colocationFactor = g.colocationFactor(peer) + peer.score += colocationFactor * g.parameters.ipColocationFactorWeight + + # Score metrics + when defined(libp2p_agents_metrics): + let agent = + block: + if peer.shortAgent.len > 0: + peer.shortAgent + else: + if peer.sendConn != nil: + let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii() + if KnownLibP2PAgentsSeq.contains(shortAgent): + peer.shortAgent = shortAgent + else: + peer.shortAgent = "unknown" + peer.shortAgent + else: + "unknown" + libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent]) + libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent]) + libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent]) + else: + libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"]) + libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"]) + libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"]) # decay behaviourPenalty peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay @@ -876,20 +941,6 @@ proc updateScores(g: GossipSub) = # avoid async asyncSpawn g.disconnectPeer(peer) when defined(libp2p_agents_metrics): - let agent = - block: - if peer.shortAgent.len > 0: - peer.shortAgent - else: - if peer.sendConn != nil: - let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii() - if KnownLibP2PAgentsSeq.contains(shortAgent): - peer.shortAgent = shortAgent - else: - peer.shortAgent = "unknown" - peer.shortAgent - else: - "unknown" libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent]) else: libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"]) @@ -899,20 +950,19 @@ proc updateScores(g: GossipSub) = # avoid async trace "updated scores", peers = g.peers.len +proc handleBackingOff(t: var BackoffTable, topic: string) = + let now = Moment.now() + var expired = toSeq(t.getOrDefault(topic).pairs()) + expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool: + now >= pair.expire + for (peer, _) in expired: + t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer) + proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: try: trace "running heartbeat", instance = cast[int](g) - # remove expired backoffs - block: - let now = Moment.now() - var expired = toSeq(g.backingOff.pairs()) - expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool: - now >= pair.expire - for (peer, _) in expired: - g.backingOff.del(peer) - # reset IWANT budget # reset IHAVE cap block: @@ -925,6 +975,10 @@ proc heartbeat(g: GossipSub) {.async.} = var meshMetrics = MeshMetrics() for t in toSeq(g.topics.keys): + # remove expired backoffs + block: + handleBackingOff(g.backingOff, t) + # prune every negative score peer # do this before relance # in order to avoid grafted -> pruned in the same cycle @@ -1057,13 +1111,11 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic -proc punishPeer(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = +proc punishInvalidMessage(g: GossipSub, peer: PubSubPeer, topics: seq[string]) = for t in topics: if t notin g.topics: continue - # ensure we init a new topic if unknown - let _ = g.topicParams.mgetOrPut(t, TopicParams.init()) # update stats g.peerStats.withValue(peer.peerId, stats): stats[].topicInfos.withValue(t, tstats): @@ -1092,29 +1144,40 @@ proc handleGraft(g: GossipSub, # It is an error to GRAFT on a explicit peer if peer.peerId in g.parameters.directPeers: # receiving a graft from a direct peer should yield a more prominent warning (protocol violation) - warn "attempt to graft an explicit peer", peer=peer.id, - topicID=graft.topicID + warn "attempt to graft an explicit peer", peer=peer.peerId, + topic # and such an attempt should be logged and rejected with a PRUNE result.add(ControlPrune( - topicID: graft.topicID, + topicID: topic, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) - g.punishPeer(peer, @[topic]) + let backoff = Moment.fromNow(g.parameters.pruneBackoff) + g.backingOff + .mgetOrPut(topic, initTable[PeerID, Moment]()) + .mgetOrPut(peer.peerId, backoff) = backoff + + peer.behaviourPenalty += 0.1 continue - if peer.peerId in g.backingOff and g.backingOff[peer.peerId] > Moment.now(): - trace "attempt to graft a backingOff peer", peer=peer.id, - topicID=graft.topicID, - expire=g.backingOff[peer.peerId] + if g.backingOff + .getOrDefault(topic) + .getOrDefault(peer.peerId) > Moment.now(): + warn "attempt to graft a backingOff peer", peer=peer.peerId, + topic # and such an attempt should be logged and rejected with a PRUNE result.add(ControlPrune( - topicID: graft.topicID, + topicID: topic, peers: @[], # omitting heavy computation here as the remote did something illegal backoff: g.parameters.pruneBackoff.seconds.uint64)) - g.punishPeer(peer, @[topic]) + let backoff = Moment.fromNow(g.parameters.pruneBackoff) + g.backingOff + .mgetOrPut(topic, initTable[PeerID, Moment]()) + .mgetOrPut(peer.peerId, backoff) = backoff + + peer.behaviourPenalty += 0.1 continue @@ -1150,18 +1213,23 @@ proc handleGraft(g: GossipSub, proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) = for prune in prunes: - trace "peer pruned topic", peer, topic = prune.topicID + let topic = prune.topicID + + trace "peer pruned topic", peer, topic # add peer backoff if prune.backoff > 0: - let backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds) - let current = g.backingOff.getOrDefault(peer.peerId) + let + backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds) + current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId) if backoff > current: - g.backingOff[peer.peerId] = backoff + g.backingOff + .mgetOrPut(topic, initTable[PeerID, Moment]()) + .mgetOrPut(peer.peerId, backoff) = backoff trace "pruning rpc received peer", peer, score = peer.score - g.pruned(peer, prune.topicID) - g.mesh.removePeer(prune.topicID, peer) + g.pruned(peer, topic) + g.mesh.removePeer(topic, peer) # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that # another option could be to implement signed peer records @@ -1264,14 +1332,14 @@ method rpcHandler*(g: GossipSub, # always validate if signature is present or required debug "Dropping message due to failed signature verification", msgId = shortLog(msgId), peer - g.punishPeer(peer, msg.topicIDs) + g.punishInvalidMessage(peer, msg.topicIDs) continue if msg.seqno.len > 0 and msg.seqno.len != 8: # if we have seqno should be 8 bytes long debug "Dropping message due to invalid seqno length", msgId = shortLog(msgId), peer - g.punishPeer(peer, msg.topicIDs) + g.punishInvalidMessage(peer, msg.topicIDs) continue # g.anonymize needs no evaluation when receiving messages @@ -1282,7 +1350,7 @@ method rpcHandler*(g: GossipSub, of ValidationResult.Reject: debug "Dropping message after validation, reason: reject", msgId = shortLog(msgId), peer - g.punishPeer(peer, msg.topicIDs) + g.punishInvalidMessage(peer, msg.topicIDs) continue of ValidationResult.Ignore: debug "Dropping message after validation, reason: ignore", @@ -1410,6 +1478,8 @@ proc unsubscribe*(g: GossipSub, topic: string) = else: g.broadcast(toSeq(gpeers), msg) + g.topicParams.del(topic) + method unsubscribeAll*(g: GossipSub, topic: string) = g.unsubscribe(topic) # finally let's remove from g.topics, do that by calling PubSub