diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim
index 31c515cf7..17fd3ccb9 100644
--- a/libp2p/protocols/pubsub/gossipsub.nim
+++ b/libp2p/protocols/pubsub/gossipsub.nim
@@ -22,199 +22,19 @@ import ./pubsub,
        ../../peerid,
        ../../utility,
        ../../switch
+
 import stew/results
 export results
+import gossipsub/[types, scoring, behavior]
+export types
+export scoring
+export behavior
+
 logScope:
   topics = "libp2p gossipsub"
 
-const
-  GossipSubCodec* = "/meshsub/1.1.0"
-  GossipSubCodec_10* = "/meshsub/1.0.0"
-
-# overlay parameters
-const
-  GossipSubD* = 6
-  GossipSubDlo* = 4
-  GossipSubDhi* = 12
-
-# gossip parameters
-const
-  GossipSubHistoryLength* = 5
-  GossipSubHistoryGossip* = 3
-
-# heartbeat interval
-  GossipSubHeartbeatInterval* = 1.seconds
-
-# fanout ttl
-const
-  GossipSubFanoutTTL* = 1.minutes
-
-# gossip parameters
-const
-  GossipBackoffPeriod* = 1.minutes
-
-const
-  BackoffSlackTime = 2 # seconds
-  IWantPeerBudget = 25 # 25 messages per second ( reset every heartbeat )
-  IHavePeerBudget = 10
-  # the max amount of IHave to expose, not by spec, but go as example
-  # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572
-  # go: https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L155
-  IHaveMaxLength = 5000
-
-type
-  TopicInfo* = object
-    # gossip 1.1 related
-    graftTime: Moment
-    meshTime: Duration
-    inMesh: bool
-    meshMessageDeliveriesActive: bool
-    firstMessageDeliveries: float64
-    meshMessageDeliveries: float64
-    meshFailurePenalty: float64
-    invalidMessageDeliveries: float64
-
-  TopicParams* = object
-    topicWeight*: float64
-
-    # p1
-    timeInMeshWeight*: float64
-    timeInMeshQuantum*: Duration
-    timeInMeshCap*: float64
-
-    # p2
-    firstMessageDeliveriesWeight*: float64
-    firstMessageDeliveriesDecay*: float64
-    firstMessageDeliveriesCap*: float64
-
-    # p3
-    meshMessageDeliveriesWeight*: float64
-    meshMessageDeliveriesDecay*: float64
-    meshMessageDeliveriesThreshold*: float64
-    meshMessageDeliveriesCap*: float64
-    meshMessageDeliveriesActivation*: Duration
-    meshMessageDeliveriesWindow*: Duration
-
-    # p3b
-    meshFailurePenaltyWeight*: float64
-    meshFailurePenaltyDecay*: float64
-
-    # p4
-    invalidMessageDeliveriesWeight*: float64
-    invalidMessageDeliveriesDecay*: float64
-
-  PeerStats* = object
-    topicInfos*: Table[string, TopicInfo]
-    expire*: Moment # updated on disconnect, to retain scores until expire
-    # the following are copies from PubSubPeer, in order to restore them on re-connection
-    score*: float64 # a copy of the score to keep in case the peer is disconnected
-    appScore*: float64 # application specific score
-    behaviourPenalty*: float64 # the eventual penalty score
-
-  GossipSubParams* = object
-    explicit: bool
-    pruneBackoff*: Duration
-    floodPublish*: bool
-    gossipFactor*: float64
-    d*: int
-    dLow*: int
-    dHigh*: int
-    dScore*: int
-    dOut*: int
-    dLazy*: int
-
-    heartbeatInterval*: Duration
-
-    historyLength*: int
-    historyGossip*: int
-
-    fanoutTTL*: Duration
-    seenTTL*: Duration
-
-    gossipThreshold*: float64
-    publishThreshold*: float64
-    graylistThreshold*: float64
-    acceptPXThreshold*: float64
-    opportunisticGraftThreshold*: float64
-    decayInterval*: Duration
-    decayToZero*: float64
-    retainScore*: Duration
-
-    appSpecificWeight*: float64
-    ipColocationFactorWeight*: float64
-    ipColocationFactorThreshold*: float64
-    behaviourPenaltyWeight*: float64
-    behaviourPenaltyDecay*: float64
-
-    directPeers*: Table[PeerId, seq[MultiAddress]]
-
-    disconnectBadPeers*: bool
-
-  BackoffTable = Table[string, Table[PeerID, Moment]]
-
-  GossipSub* = ref object of FloodSub
-    mesh*: PeerTable                          # peers that we send messages to when we are subscribed to the topic
-    fanout*: PeerTable                        # peers that we send messages to when we're not subscribed to the topic
-    gossipsub*: PeerTable                     # peers that are subscribed to a topic
-    explicit*: PeerTable                      # directpeers that we keep alive explicitly
-    backingOff*: BackoffTable                 # peers to backoff from when replenishing the mesh
-    lastFanoutPubSub*: Table[string, Moment]  # last publish time for fanout topics
-    gossip*: Table[string, seq[ControlIHave]] # pending gossip
-    control*: Table[string, ControlMessage]   # pending control messages
-    mcache*: MCache                           # messages cache
-    heartbeatFut: Future[void]                # cancellation future for heartbeat interval
-    heartbeatRunning: bool
-
-    peerStats: Table[PeerID, PeerStats]
-    parameters*: GossipSubParams
-    topicParams*: Table[string, TopicParams]
-    directPeersLoop: Future[void]
-    peersInIP: Table[MultiAddress, HashSet[PubSubPeer]]
-
-    heartbeatEvents*: seq[AsyncEvent]
-
-    randomBytes: seq[byte]
-
-  MeshMetrics = object
-    # scratch buffers for metrics
-    otherPeersPerTopicMesh: int64
-    otherPeersPerTopicFanout: int64
-    otherPeersPerTopicGossipsub: int64
-    underDlowTopics: int64
-    underDoutTopics: int64
-    underDhighAboveDlowTopics: int64
-    noPeersTopics: int64
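
Note: the constants and types removed above move verbatim into gossipsub/types.nim. For reference, a hedged sketch of configuring one topic with the TopicParams object shown above; every numeric value is an assumed example rather than a default shipped by this patch, and unlisted fields stay at their zero defaults. A full configuration should be checked with validateParameters (kept in gossipsub.nim below), which zero defaults may not pass as-is.

import chronos # Duration, seconds
import libp2p/protocols/pubsub/gossipsub # re-exports types after this patch

# Illustrative values only -- not defaults from this patch.
let params = TopicParams(
  topicWeight: 1.0,
  # p1: reward time spent in the mesh, counted in timeInMeshQuantum units, capped
  timeInMeshWeight: 0.01,
  timeInMeshQuantum: 1.seconds,
  timeInMeshCap: 10.0,
  # p2: reward being first to deliver a message, decayed every decay interval
  firstMessageDeliveriesWeight: 1.0,
  firstMessageDeliveriesDecay: 0.5,
  firstMessageDeliveriesCap: 10.0,
  # p3/p3b/p4 are penalties, so their weights would normally be negative
  meshMessageDeliveriesWeight: -1.0,
  meshFailurePenaltyWeight: -1.0,
  invalidMessageDeliveriesWeight: -1.0)
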
-
-
-# the following 3 metrics are updated only inside rebalanceMesh
-# this is the most reliable place and rebalance anyway happens every heartbeat
-declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
-  "gossipsub peers per topic in mesh",
-  labels = ["topic"])
-declareGauge(libp2p_gossipsub_peers_per_topic_fanout,
-  "gossipsub peers per topic in fanout",
-  labels = ["topic"])
-declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub,
-  "gossipsub peers per topic in gossipsub",
-  labels = ["topic"])
-
 declareCounter(libp2p_gossipsub_failed_publish, "number of failed publish")
-declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
-declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
-declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
-declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
-declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
-declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow")
-declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics without peers available")
-declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
 declareCounter(libp2p_gossipsub_invalid_topic_subscription, "number of invalid topic subscriptions that happened")
 
 proc init*(_: type[GossipSubParams]): GossipSubParams =
@@ -319,8 +139,6 @@ proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
   else:
     ok()
 
-func byScore(x,y: PubSubPeer): int = system.cmp(x.score, y.score)
-
 method init*(g: GossipSub) =
   proc handler(conn: Connection, proto: string) {.async.} =
     ## main protocol handler that gets triggered on every
@@ -340,13 +158,6 @@ method init*(g: GossipSub) =
   g.codecs &= GossipSubCodec
   g.codecs &= GossipSubCodec_10
 
-proc initPeerStats(g: GossipSub, peer: PubSubPeer, stats: PeerStats = PeerStats()) =
-  var initialStats = stats
-  initialStats.expire = Moment.now() + g.parameters.retainScore
-  g.peerStats[peer.peerId] = initialStats
-  peer.iWantBudget = IWantPeerBudget
-  peer.iHaveBudget = IHavePeerBudget
-
 method onNewPeer(g: GossipSub, peer: PubSubPeer) =
   if peer.peerId notin g.peerStats:
     g.initPeerStats(peer)
@@ -358,71 +169,6 @@ method onNewPeer(g: GossipSub, peer: PubSubPeer) =
     peer.appScore = stats.appScore
     peer.behaviourPenalty = stats.behaviourPenalty
 
-proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
-  g.peerStats.withValue(p.peerId, stats):
-    var info = stats.topicInfos.getOrDefault(topic)
-    info.graftTime = Moment.now()
-    info.meshTime = 0.seconds
-    info.inMesh = true
-    info.meshMessageDeliveriesActive = false
-
-    # mgetOrPut does not work, so we gotta do this without referencing
-    stats.topicInfos[topic] = info
-    assert(g.peerStats[p.peerId].topicInfos[topic].inMesh == true)
-
-    trace "grafted", peer=p, topic
-  do:
-    g.initPeerStats(p)
-    g.grafted(p, topic)
-
-proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
-  let backoff = Moment.fromNow(g.parameters.pruneBackoff)
-  g.backingOff
-    .mgetOrPut(topic, initTable[PeerID, Moment]())
-    .mgetOrPut(p.peerId, backoff) = backoff
-
-  g.peerStats.withValue(p.peerId, stats):
-    if topic in stats.topicInfos:
-      var info = stats.topicInfos[topic]
-      if topic in g.topicParams:
-        let topicParams = g.topicParams[topic]
-        # penalize a peer that delivered no message
-        let threshold = topicParams.meshMessageDeliveriesThreshold
-        if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
-          let deficit = threshold - info.meshMessageDeliveries
-          info.meshFailurePenalty += deficit * deficit
-
-      info.inMesh = false
-
-      # mgetOrPut does not work, so we gotta do this without referencing
-      stats.topicInfos[topic] = info
-
-      trace "pruned", peer=p, topic
-
-proc peerExchangeList(g: GossipSub, topic: string): seq[PeerInfoMsg] =
-  var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
-  peers.keepIf do (x: PubSubPeer) -> bool:
-    x.score >= 0.0
-  # by spec, larger then Dhi, but let's put some hard caps
-  peers.setLen(min(peers.len, g.parameters.dHigh * 2))
-  peers.map do (x: PubSubPeer) -> PeerInfoMsg:
-    PeerInfoMsg(peerID: x.peerId.getBytes())
-
-proc replenishFanout(g: GossipSub, topic: string) =
-  ## get fanout peers for a topic
-  logScope: topic
-  trace "about to replenish fanout"
-
-  if g.fanout.peers(topic) < g.parameters.dLow:
-    trace "replenishing fanout", peers = g.fanout.peers(topic)
-    if topic in g.gossipsub:
-      for peer in g.gossipsub[topic]:
-        if g.fanout.addPeer(topic, peer):
-          if g.fanout.peers(topic) == g.parameters.d:
-            break
-
-  trace "fanout replenished with peers", peers = g.fanout.peers(topic)
-
 method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} =
   case event.kind
   of PubSubPeerEventKind.Connected:
@@ -439,603 +185,6 @@ method onPubSubPeerEvent*(p: GossipSub, peer: PubsubPeer, event: PubSubPeerEvent) {.gcsafe.} =
 
   procCall FloodSub(p).onPubSubPeerEvent(peer, event)
 
-proc commitMetrics(metrics: var MeshMetrics) =
-  libp2p_gossipsub_under_dlow_topics.set(metrics.underDlowTopics)
-  libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
-  libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
-  libp2p_gossipsub_under_dhigh_above_dlow_topics.set(metrics.underDhighAboveDlowTopics)
-  libp2p_gossipsub_peers_per_topic_gossipsub.set(metrics.otherPeersPerTopicGossipsub, labelValues = ["other"])
-  libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
-  libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
-
-proc rebalanceMesh(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
-  logScope:
-    topic
-    mesh = g.mesh.peers(topic)
-    gossipsub = g.gossipsub.peers(topic)
-
-  trace "rebalancing mesh"
-
-  # create a mesh topic that we're subscribing to
-
-  var
-    prunes, grafts: seq[PubSubPeer]
-    npeers = g.mesh.peers(topic)
-
-  if npeers < g.parameters.dLow:
-    if not isNil(metrics):
-      inc metrics[].underDlowTopics
-
-    trace "replenishing mesh", peers = npeers
-    # replenish the mesh if we're below Dlo
-    var candidates = toSeq(
-      g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
-      g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
-    ).filterIt(
-      it.connected and
-      # avoid negative score peers
-      it.score >= 0.0 and
-      # don't pick explicit peers
-      it.peerId notin g.parameters.directPeers and
-      # and avoid peers we are backing off
-      it.peerId notin g.backingOff.getOrDefault(topic)
-    )
-
-    # shuffle anyway, score might be not used
-    shuffle(candidates)
-
-    # sort peers by score, high score first since we graft
-    candidates.sort(byScore, SortOrder.Descending)
-
-    # Graft peers so we reach a count of D
-    candidates.setLen(min(candidates.len, g.parameters.d - npeers))
-
-    trace "grafting", grafting = candidates.len
-
-    if candidates.len == 0:
-      if not isNil(metrics):
-        inc metrics[].noPeersTopics
-    else:
-      for peer in candidates:
-        if g.mesh.addPeer(topic, peer):
-          g.grafted(peer, topic)
-          g.fanout.removePeer(topic, peer)
-          grafts &= peer
-
-  else:
-    var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
-    meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
-    if meshPeers.len < g.parameters.dOut:
-      if not isNil(metrics):
-        inc metrics[].underDoutTopics
-
-      trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
-
-      var candidates = toSeq(
-        g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
-        g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
-      ).filterIt(
-        it.connected and
-        # get only outbound ones
-        it.outbound and
-        # avoid negative score peers
-        it.score >= 0.0 and
-        # don't pick explicit peers
-        it.peerId notin g.parameters.directPeers and
-        # and avoid peers we are backing off
-        it.peerId notin g.backingOff.getOrDefault(topic)
-      )
-
-      # shuffle anyway, score might be not used
-      shuffle(candidates)
-
-      # sort peers by score, high score first, we are grafting
-      candidates.sort(byScore, SortOrder.Descending)
-
-      # Graft peers so we reach a count of D
-      candidates.setLen(min(candidates.len, g.parameters.dOut))
-
-      trace "grafting outbound peers", topic, peers = candidates.len
-
-      for peer in candidates:
-        if g.mesh.addPeer(topic, peer):
-          g.grafted(peer, topic)
-          g.fanout.removePeer(topic, peer)
-          grafts &= peer
-
-
-  # get again npeers after possible grafts
-  npeers = g.mesh.peers(topic)
-  if npeers > g.parameters.dHigh:
-    if not isNil(metrics):
-      if g.knownTopics.contains(topic):
-        libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic])
-      else:
-        libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
-
-    # prune peers if we've gone over Dhi
-    prunes = toSeq(g.mesh[topic])
-    # avoid pruning peers we are currently grafting in this heartbeat
-    prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
-
-    # shuffle anyway, score might be not used
-    shuffle(prunes)
-
-    # sort peers by score (inverted), pruning, so low score peers are on top
-    prunes.sort(byScore, SortOrder.Ascending)
-
-    # keep high score peers
-    if prunes.len > g.parameters.dScore:
-      prunes.setLen(prunes.len - g.parameters.dScore)
-
-    # collect inbound/outbound info
-    var outbound: seq[PubSubPeer]
-    var inbound: seq[PubSubPeer]
-    for peer in prunes:
-      if peer.outbound:
-        outbound &= peer
-      else:
-        inbound &= peer
-
-    let
-      meshOutbound = prunes.countIt(it.outbound)
-      maxOutboundPrunes = meshOutbound - g.parameters.dOut
-
-    # ensure that there are at least D_out peers first and rebalance to g.d after that
-    outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes)))
-
-    # concat remaining outbound peers
-    prunes = inbound & outbound
-
-    let pruneLen = prunes.len - g.parameters.d
-    if pruneLen > 0:
-      # Ok we got some peers to prune,
-      # for this heartbeat let's prune those
-      shuffle(prunes)
-      prunes.setLen(pruneLen)
-
-    trace "pruning", prunes = prunes.len
-    for peer in prunes:
-      trace "pruning peer on rebalance", peer, score = peer.score
-      g.pruned(peer, topic)
-      g.mesh.removePeer(topic, peer)
-  elif npeers > g.parameters.dLow and not isNil(metrics):
-    inc metrics[].underDhighAboveDlowTopics
-
-  # opportunistic grafting, by spec mesh should not be empty...
-  if g.mesh.peers(topic) > 1:
-    var peers = toSeq(g.mesh[topic])
-    # grafting so high score has priority
-    peers.sort(byScore, SortOrder.Descending)
-    let medianIdx = peers.len div 2
-    let median = peers[medianIdx]
-    if median.score < g.parameters.opportunisticGraftThreshold:
-      trace "median score below opportunistic threshold", score = median.score
-      var avail = toSeq(
-        g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
-        g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
-      )
-
-      avail.keepIf do (x: PubSubPeer) -> bool:
-        # avoid negative score peers
-        x.score >= median.score and
-        # don't pick explicit peers
-        x.peerId notin g.parameters.directPeers and
-        # and avoid peers we are backing off
-        x.peerId notin g.backingOff.getOrDefault(topic)
-
-      # by spec, grab only 2
-      if avail.len > 2:
-        avail.setLen(2)
-
-      for peer in avail:
-        if g.mesh.addPeer(topic, peer):
-          g.grafted(peer, topic)
-          grafts &= peer
-          trace "opportunistic grafting", peer
-
-  if not isNil(metrics):
-    if g.knownTopics.contains(topic):
-      libp2p_gossipsub_peers_per_topic_gossipsub
-        .set(g.gossipsub.peers(topic).int64, labelValues = [topic])
-      libp2p_gossipsub_peers_per_topic_fanout
-        .set(g.fanout.peers(topic).int64, labelValues = [topic])
-      libp2p_gossipsub_peers_per_topic_mesh
-        .set(g.mesh.peers(topic).int64, labelValues = [topic])
-    else:
-      metrics[].otherPeersPerTopicGossipsub += g.gossipsub.peers(topic).int64
-      metrics[].otherPeersPerTopicFanout += g.fanout.peers(topic).int64
-      metrics[].otherPeersPerTopicMesh += g.mesh.peers(topic).int64
-
-  trace "mesh balanced"
-
-  # Send changes to peers after table updates to avoid stale state
-  if grafts.len > 0:
-    let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
-    g.broadcast(grafts, graft)
-  if prunes.len > 0:
-    let prune = RPCMsg(control: some(ControlMessage(
-      prune: @[ControlPrune(
-        topicID: topic,
-        peers: g.peerExchangeList(topic),
-        backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-    g.broadcast(prunes, prune)
-
-proc dropFanoutPeers(g: GossipSub) =
-  # drop peers that we haven't published to in
-  # GossipSubFanoutTTL seconds
-  let now = Moment.now()
-  for topic in toSeq(g.lastFanoutPubSub.keys):
-    let val = g.lastFanoutPubSub[topic]
-    if now > val:
-      g.fanout.del(topic)
-      g.lastFanoutPubSub.del(topic)
-      trace "dropping fanout topic", topic
-
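
Note: the over-Dhi branch of rebalanceMesh above is the subtlest part of this code: shuffle, sort ascending by score, protect the dScore best peers and the outbound quota, then trim to D. A self-contained sketch of that selection follows, with a plain tuple standing in for PubSubPeer; all names here are illustrative, and the exclusion of freshly grafted peers is omitted for brevity.

import std/[algorithm, random, sequtils]

type Peer = tuple[id: string, score: float, outbound: bool]

proc selectPrunes(mesh: seq[Peer], d, dScore, dOut: int): seq[Peer] =
  var prunes = mesh
  shuffle(prunes)                          # randomize ties when scores are unused
  prunes.sort(proc(x, y: Peer): int = cmp(x.score, y.score)) # low scores first
  if prunes.len > dScore:
    prunes.setLen(prunes.len - dScore)     # never prune the dScore highest scorers
  var outbound = prunes.filterIt(it.outbound)
  let inbound = prunes.filterIt(not it.outbound)
  # cap outbound prunes so at least dOut outbound peers stay in the mesh
  let maxOutboundPrunes = prunes.countIt(it.outbound) - dOut
  outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes)))
  prunes = inbound & outbound
  let pruneLen = prunes.len - d
  if pruneLen > 0:                         # when over D, trim the prune list so
    shuffle(prunes)                        # the mesh shrinks back toward D
    prunes.setLen(pruneLen)
  prunes

Shuffling before the sort keeps the order among equal scores random, so peers with identical (e.g. zero) scores are not always pruned in table order.
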
-proc getGossipPeers(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
-  ## gossip iHave messages to peers
-  ##
-
-  libp2p_gossipsub_cache_window_size.set(0)
-
-  trace "getting gossip peers (iHave)"
-  let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
-  for topic in topics:
-    if topic notin g.gossipsub:
-      trace "topic not in gossip array, skipping", topicID = topic
-      continue
-
-    let mids = g.mcache.window(topic)
-    if not(mids.len > 0):
-      continue
-
-    var midsSeq = toSeq(mids)
-
-    libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64)
-
-    # not in spec
-    # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
-    # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
-    if midsSeq.len > IHaveMaxLength:
-      shuffle(midsSeq)
-      midsSeq.setLen(IHaveMaxLength)
-
-    let
-      ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
-      mesh = g.mesh.getOrDefault(topic)
-      fanout = g.fanout.getOrDefault(topic)
-      gossipPeers = mesh + fanout
-    var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
-
-    allPeers.keepIf do (x: PubSubPeer) -> bool:
-      x.peerId notin g.parameters.directPeers and
-      x notin gossipPeers and
-      x.score >= g.parameters.gossipThreshold
-
-    var target = g.parameters.dLazy
-    let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
-    if factor > target:
-      target = min(factor, allPeers.len)
-
-    if target < allPeers.len:
-      shuffle(allPeers)
-      allPeers.setLen(target)
-
-    for peer in allPeers:
-      if peer notin result:
-        result[peer] = ControlMessage()
-      result[peer].ihave.add(ihave)
-
-func `/`(a, b: Duration): float64 =
-  let
-    fa = float64(a.nanoseconds)
-    fb = float64(b.nanoseconds)
-  fa / fb
-
-proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
-  when defined(libp2p_agents_metrics):
-    let agent =
-      block:
-        if peer.shortAgent.len > 0:
-          peer.shortAgent
-        else:
-          if peer.sendConn != nil:
-            let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
-            if KnownLibP2PAgentsSeq.contains(shortAgent):
-              peer.shortAgent = shortAgent
-            else:
-              peer.shortAgent = "unknown"
-            peer.shortAgent
-          else:
-            "unknown"
-    libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])
-  else:
-    libp2p_gossipsub_bad_score_disconnection.inc(labelValues = ["unknown"])
-
-  if peer.sendConn != nil:
-    try:
-      await g.switch.disconnect(peer.peerId)
-    except CancelledError:
-      raise
-    except CatchableError as exc:
-      trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
-
-proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
-  if peer.sendConn == nil:
-    trace "colocationFactor, no connection", peer
-    0.0
-  else:
-    let
-      address = peer.sendConn.observedAddr
-
-    g.peersInIP.mgetOrPut(address, initHashSet[PubSubPeer]()).incl(peer)
-    if address notin g.peersInIP:
-      g.peersInIP[address] = initHashSet[PubSubPeer]()
-      g.peersInIP[address].incl(peer)
-
-    let
-      ipPeers = g.peersInIP[address]
-      len = ipPeers.len.float64
-
-    if len > g.parameters.ipColocationFactorThreshold:
-      trace "colocationFactor over threshold", peer, address, len
-      let over = len - g.parameters.ipColocationFactorThreshold
-      over * over
-    else:
-      0.0
-
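
Note: colocationFactor above yields a quadratic penalty once the number of peers seen on one IP exceeds ipColocationFactorThreshold. The shape of that penalty as a standalone pure function (the name is illustrative, not from the patch):

# Zero up to the threshold, then the square of the excess.
func colocationPenalty(peersOnIp: int, threshold: float64): float64 =
  let count = peersOnIp.float64
  if count > threshold:
    let over = count - threshold
    over * over
  else:
    0.0

assert colocationPenalty(3, 4.0) == 0.0 # under the threshold: no penalty
assert colocationPenalty(6, 4.0) == 4.0 # two peers over the threshold: 2^2
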
-proc updateScores(g: GossipSub) = # avoid async
-  trace "updating scores", peers = g.peers.len
-
-  let now = Moment.now()
-  var evicting: seq[PeerID]
-
-  for peerId, stats in g.peerStats.mpairs:
-    let peer = g.peers.getOrDefault(peerId)
-    if isNil(peer) or not(peer.connected):
-      if now > stats.expire:
-        evicting.add(peerId)
-        trace "evicted peer from memory", peer = peerId
-      continue
-
-    trace "updating peer score", peer
-
-    var
-      n_topics = 0
-      is_grafted = 0
-
-    # Per topic
-    for topic, topicParams in g.topicParams:
-      var info = stats.topicInfos.getOrDefault(topic)
-      inc n_topics
-
-      # if weight is 0.0 avoid wasting time
-      if topicParams.topicWeight != 0.0:
-        # Scoring
-        var topicScore = 0'f64
-
-        if info.inMesh:
-          inc is_grafted
-          info.meshTime = now - info.graftTime
-          if info.meshTime > topicParams.meshMessageDeliveriesActivation:
-            info.meshMessageDeliveriesActive = true
-
-          var p1 = info.meshTime / topicParams.timeInMeshQuantum
-          if p1 > topicParams.timeInMeshCap:
-            p1 = topicParams.timeInMeshCap
-          trace "p1", peer, p1, topic, topicScore
-          topicScore += p1 * topicParams.timeInMeshWeight
-        else:
-          info.meshMessageDeliveriesActive = false
-
-        topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
-        trace "p2", peer, p2 = info.firstMessageDeliveries, topic, topicScore
-
-        if info.meshMessageDeliveriesActive:
-          if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
-            let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
-            let p3 = deficit * deficit
-            trace "p3", peer, p3, topic, topicScore
-            topicScore += p3 * topicParams.meshMessageDeliveriesWeight
-
-        topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
-        trace "p3b", peer, p3b = info.meshFailurePenalty, topic, topicScore
-
-        topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
-        trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
-
-        trace "updated peer topic's scores", peer, topic, info, topicScore
-
-        peer.score += topicScore * topicParams.topicWeight
-
-      # Score metrics
-      when defined(libp2p_agents_metrics):
-        let agent =
-          block:
-            if peer.shortAgent.len > 0:
-              peer.shortAgent
-            else:
-              if peer.sendConn != nil:
-                let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
-                if KnownLibP2PAgentsSeq.contains(shortAgent):
-                  peer.shortAgent = shortAgent
-                else:
-                  peer.shortAgent = "unknown"
-                peer.shortAgent
-              else:
-                "unknown"
-        libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
-        libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
-        libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
-        libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent])
-      else:
-        libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"])
-        libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"])
-        libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"])
-        libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"])
-
-      # Score decay
-      info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
-      if info.firstMessageDeliveries < g.parameters.decayToZero:
-        info.firstMessageDeliveries = 0
-
-      info.meshMessageDeliveries *= topicParams.meshMessageDeliveriesDecay
-      if info.meshMessageDeliveries < g.parameters.decayToZero:
-        info.meshMessageDeliveries = 0
-
-      info.meshFailurePenalty *= topicParams.meshFailurePenaltyDecay
-      if info.meshFailurePenalty < g.parameters.decayToZero:
-        info.meshFailurePenalty = 0
-
-      info.invalidMessageDeliveries *= topicParams.invalidMessageDeliveriesDecay
-      if info.invalidMessageDeliveries < g.parameters.decayToZero:
-        info.invalidMessageDeliveries = 0
-
-      # Wrap up
-      # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
-      stats.topicInfos[topic] = info
-
-    peer.score += peer.appScore * g.parameters.appSpecificWeight
-
-    peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
-
-    let colocationFactor = g.colocationFactor(peer)
-    peer.score += colocationFactor * g.parameters.ipColocationFactorWeight
-
-    # Score metrics
-    when defined(libp2p_agents_metrics):
-      let agent =
-        block:
-          if peer.shortAgent.len > 0:
-            peer.shortAgent
-          else:
-            if peer.sendConn != nil:
-              let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
-              if KnownLibP2PAgentsSeq.contains(shortAgent):
-                peer.shortAgent = shortAgent
-              else:
-                peer.shortAgent = "unknown"
-              peer.shortAgent
-            else:
-              "unknown"
-      libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
-      libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
-      libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
-    else:
-      libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"])
-      libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"])
-      libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"])
-
-    # decay behaviourPenalty
-    peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
-    if peer.behaviourPenalty < g.parameters.decayToZero:
-      peer.behaviourPenalty = 0
-
-    # copy into stats the score to keep until expired
-    stats.score = peer.score
-    stats.appScore = peer.appScore
-    stats.behaviourPenalty = peer.behaviourPenalty
-    stats.expire = Moment.now() + g.parameters.retainScore # refresh expiration
-    assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
-    trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
-
-    if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold:
-      debug "disconnecting bad score peer", peer, score = peer.score
-      asyncSpawn g.disconnectPeer(peer)
-
-    when defined(libp2p_agents_metrics):
-      libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
-    else:
-      libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"])
-
-  for peer in evicting:
-    g.peerStats.del(peer)
-
-  trace "updated scores", peers = g.peers.len
-
-proc handleBackingOff(t: var BackoffTable, topic: string) =
-  let now = Moment.now()
-  var expired = toSeq(t.getOrDefault(topic).pairs())
-  expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
-    now >= pair.expire
-  for (peer, _) in expired:
-    t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer)
-
-proc heartbeat(g: GossipSub) {.async.} =
-  while g.heartbeatRunning:
-    try:
-      trace "running heartbeat", instance = cast[int](g)
-
-      # reset IWANT budget
-      # reset IHAVE cap
-      block:
-        for peer in g.peers.values:
-          peer.iWantBudget = IWantPeerBudget
-          peer.iHaveBudget = IHavePeerBudget
-
-      g.updateScores()
-
-      var meshMetrics = MeshMetrics()
-
-      for t in toSeq(g.topics.keys):
-        # remove expired backoffs
-        block:
-          handleBackingOff(g.backingOff, t)
-
-        # prune every negative score peer
-        # do this before relance
-        # in order to avoid grafted -> pruned in the same cycle
-        let meshPeers = g.mesh.getOrDefault(t)
-        var prunes: seq[PubSubPeer]
-        for peer in meshPeers:
-          if peer.score < 0.0:
-            trace "pruning negative score peer", peer, score = peer.score
-            g.pruned(peer, t)
-            g.mesh.removePeer(t, peer)
-            prunes &= peer
-        if prunes.len > 0:
-          let prune = RPCMsg(control: some(ControlMessage(
-            prune: @[ControlPrune(
-              topicID: t,
-              peers: g.peerExchangeList(t),
-              backoff: g.parameters.pruneBackoff.seconds.uint64)])))
-          g.broadcast(prunes, prune)
-
-        # pass by ptr in order to both signal we want to update metrics
-        # and as well update the struct for each topic during this iteration
-        g.rebalanceMesh(t, addr meshMetrics)
-
-      commitMetrics(meshMetrics)
-
-      g.dropFanoutPeers()
-
-      # replenish known topics to the fanout
-      for t in toSeq(g.fanout.keys):
-        g.replenishFanout(t)
-
-      let peers = g.getGossipPeers()
-      for peer, control in peers:
-        # only ihave from here
-        for ihave in control.ihave:
-          if g.knownTopics.contains(ihave.topicID):
-            libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
-          else:
-            libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
-        g.send(peer, RPCMsg(control: some(control)))
-
-      g.mcache.shift() # shift the cache
-    except CancelledError as exc:
-      raise exc
-    except CatchableError as exc:
-      warn "exception ocurred in gossipsub heartbeat", exc = exc.msg,
-        trace = exc.getStackTrace()
-
-    for trigger in g.heartbeatEvents:
-      trace "firing heartbeat event", instance = cast[int](g)
-      trigger.fire()
-
-    await sleepAsync(g.parameters.heartbeatInterval)
-
 method unsubscribePeer*(g: GossipSub, peer: PeerID) =
   ## handle peer disconnects
   ##
@@ -1114,176 +263,6 @@ method subscribeTopic*(g: GossipSub,
 
   trace "gossip peers", peers = g.gossipsub.peers(topic), topic
 
-proc punishInvalidMessage(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
-  for t in topics:
-    if t notin g.topics:
-      continue
-
-    # update stats
-    g.peerStats.withValue(peer.peerId, stats):
-      stats[].topicInfos.withValue(t, tstats):
-        tstats[].invalidMessageDeliveries += 1
-      do: # if we have no stats populate!
-        stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
-    do: # if we have no stats populate!
-      g.initPeerStats(peer) do:
-        var stats = PeerStats()
-        stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
-        stats
-
-
-proc handleGraft(g: GossipSub,
-                 peer: PubSubPeer,
-                 grafts: seq[ControlGraft]): seq[ControlPrune] =
-  for graft in grafts:
-    let topic = graft.topicID
-    logScope:
-      peer
-      topic
-
-    trace "peer grafted topic"
-
-    # It is an error to GRAFT on a explicit peer
-    if peer.peerId in g.parameters.directPeers:
-      # receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
-      warn "attempt to graft an explicit peer", peer=peer.peerId,
-                                                topic
-      # and such an attempt should be logged and rejected with a PRUNE
-      result.add(ControlPrune(
-        topicID: topic,
-        peers: @[], # omitting heavy computation here as the remote did something illegal
-        backoff: g.parameters.pruneBackoff.seconds.uint64))
-
-      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
-      g.backingOff
-        .mgetOrPut(topic, initTable[PeerID, Moment]())
-        .mgetOrPut(peer.peerId, backoff) = backoff
-
-      peer.behaviourPenalty += 0.1
-
-      continue
-
-    if g.backingOff
-      .getOrDefault(topic)
-      .getOrDefault(peer.peerId) > Moment.now():
-      warn "attempt to graft a backingOff peer", peer=peer.peerId,
-                                                 topic
-      # and such an attempt should be logged and rejected with a PRUNE
-      result.add(ControlPrune(
-        topicID: topic,
-        peers: @[], # omitting heavy computation here as the remote did something illegal
-        backoff: g.parameters.pruneBackoff.seconds.uint64))
-
-      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
-      g.backingOff
-        .mgetOrPut(topic, initTable[PeerID, Moment]())
-        .mgetOrPut(peer.peerId, backoff) = backoff
-
-      peer.behaviourPenalty += 0.1
-
-      continue
-
-    if peer.peerId notin g.peerStats:
-      g.initPeerStats(peer)
-
-    # not in the spec exactly, but let's avoid way too low score peers
-    # other clients do it too also was an audit recommendation
-    if peer.score < g.parameters.publishThreshold:
-      continue
-
-    # If they send us a graft before they send us a subscribe, what should
-    # we do? For now, we add them to mesh but don't add them to gossipsub.
-    if topic in g.topics:
-      if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound:
-        # In the spec, there's no mention of DHi here, but implicitly, a
-        # peer will be removed from the mesh on next rebalance, so we don't want
-        # this peer to push someone else out
-        if g.mesh.addPeer(topic, peer):
-          g.grafted(peer, topic)
-          g.fanout.removePeer(topic, peer)
-        else:
-          trace "peer already in mesh"
-      else:
-        trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic)
-        result.add(ControlPrune(
-          topicID: topic,
-          peers: g.peerExchangeList(topic),
-          backoff: g.parameters.pruneBackoff.seconds.uint64))
-    else:
-      trace "peer grafting topic we're not interested in", topic
-      # gossip 1.1, we do not send a control message prune anymore
-
-proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
-  for prune in prunes:
-    let topic = prune.topicID
-
-    trace "peer pruned topic", peer, topic
-
-    # add peer backoff
-    if prune.backoff > 0:
-      let
-        backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
-        current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
-      if backoff > current:
-        g.backingOff
-          .mgetOrPut(topic, initTable[PeerID, Moment]())
-          .mgetOrPut(peer.peerId, backoff) = backoff
-
-    trace "pruning rpc received peer", peer, score = peer.score
-    g.pruned(peer, topic)
-    g.mesh.removePeer(topic, peer)
-
-    # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
-    # another option could be to implement signed peer records
-    ## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
-
-proc handleIHave(g: GossipSub,
-                 peer: PubSubPeer,
-                 ihaves: seq[ControlIHave]): ControlIWant =
-  if peer.score < g.parameters.gossipThreshold:
-    trace "ihave: ignoring low score peer", peer, score = peer.score
-  elif peer.iHaveBudget <= 0:
-    trace "ihave: ignoring out of budget peer", peer, score = peer.score
-  else:
-    var deIhaves = ihaves.deduplicate()
-    for ihave in deIhaves.mitems:
-      trace "peer sent ihave",
-        peer, topic = ihave.topicID, msgs = ihave.messageIDs
-      if ihave.topicID in g.mesh:
-        for m in ihave.messageIDs:
-          let msgId = m & g.randomBytes
-          if msgId notin g.seen:
-            if peer.iHaveBudget > 0:
-              result.messageIDs.add(m)
-              dec peer.iHaveBudget
-            else:
-              return
-
-    # shuffling result.messageIDs before sending it out to increase the likelihood
-    # of getting an answer if the peer truncates the list due to internal size restrictions.
-    shuffle(result.messageIDs)
-
-proc handleIWant(g: GossipSub,
-                 peer: PubSubPeer,
-                 iwants: seq[ControlIWant]): seq[Message] =
-  if peer.score < g.parameters.gossipThreshold:
-    trace "iwant: ignoring low score peer", peer, score = peer.score
-  elif peer.iWantBudget <= 0:
-    trace "iwant: ignoring out of budget peer", peer, score = peer.score
-  else:
-    var deIwants = iwants.deduplicate()
-    for iwant in deIwants:
-      for mid in iwant.messageIDs:
-        trace "peer sent iwant", peer, messageID = mid
-        let msg = g.mcache.get(mid)
-        if msg.isSome:
-          # avoid spam
-          if peer.iWantBudget > 0:
-            result.add(msg.get())
-            dec peer.iWantBudget
-          else:
-            return
-
 method rpcHandler*(g: GossipSub,
                    peer: PubSubPeer,
                    rpcMsg: RPCMsg) {.async.} =
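
Note: handleIHave and handleIWant above cap per-peer work through iHaveBudget/iWantBudget, which the heartbeat refills once per interval. A minimal standalone sketch of that budget pattern; the names here are illustrative, with `budget` playing the role of iWantBudget/iHaveBudget and the constant mirroring IWantPeerBudget.

type BudgetPeer = ref object
  budget: int

const PerHeartbeatBudget = 25

proc refill(p: BudgetPeer) =
  # the heartbeat resets every peer's budget once per interval
  p.budget = PerHeartbeatBudget

proc tryConsume(p: BudgetPeer): bool =
  # each serviced message ID costs one unit; once the budget is spent,
  # further requests are ignored until the next heartbeat
  if p.budget > 0:
    dec p.budget
    true
  else:
    false

let peer = BudgetPeer()
peer.refill()
var served = 0
for i in 0 ..< 100:        # a burst of 100 IWANT message IDs
  if peer.tryConsume():
    inc served
assert served == PerHeartbeatBudget
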
diff --git a/libp2p/protocols/pubsub/gossipsub/behavior.nim b/libp2p/protocols/pubsub/gossipsub/behavior.nim
new file mode 100644
index 000000000..9f8ed0987
--- /dev/null
+++ b/libp2p/protocols/pubsub/gossipsub/behavior.nim
@@ -0,0 +1,613 @@
+## Nim-LibP2P
+## Copyright (c) 2021 Status Research & Development GmbH
+## Licensed under either of
+##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+
+import std/[tables, strutils, sequtils, sets, algorithm]
+import random # for shuffle
+import chronos, chronicles, metrics
+import "."/[types, scoring]
+import ".."/[pubsubpeer, peertable, timedcache, mcache, pubsub]
+import "../rpc"/[messages]
+import "../../.."/[peerid, multiaddress, utility, switch]
+
+declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
+declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
+declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"])
+declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"])
+declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
+declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
+declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow")
+declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics without peers available")
+declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
+
+proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
+  g.peerStats.withValue(p.peerId, stats):
+    var info = stats.topicInfos.getOrDefault(topic)
+    info.graftTime = Moment.now()
+    info.meshTime = 0.seconds
+    info.inMesh = true
+    info.meshMessageDeliveriesActive = false
+
+    # mgetOrPut does not work, so we gotta do this without referencing
+    stats.topicInfos[topic] = info
+    assert(g.peerStats[p.peerId].topicInfos[topic].inMesh == true)
+
+    trace "grafted", peer=p, topic
+  do:
+    g.initPeerStats(p)
+    g.grafted(p, topic)
+
+proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) =
+  let backoff = Moment.fromNow(g.parameters.pruneBackoff)
+  g.backingOff
+    .mgetOrPut(topic, initTable[PeerID, Moment]())
+    .mgetOrPut(p.peerId, backoff) = backoff
+
+  g.peerStats.withValue(p.peerId, stats):
+    if topic in stats.topicInfos:
+      var info = stats.topicInfos[topic]
+      if topic in g.topicParams:
+        let topicParams = g.topicParams[topic]
+        # penalize a peer that delivered no message
+        let threshold = topicParams.meshMessageDeliveriesThreshold
+        if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
+          let deficit = threshold - info.meshMessageDeliveries
+          info.meshFailurePenalty += deficit * deficit
+
+      info.inMesh = false
+
+      # mgetOrPut does not work, so we gotta do this without referencing
+      stats.topicInfos[topic] = info
+
+      trace "pruned", peer=p, topic
+
+proc handleBackingOff*(t: var BackoffTable, topic: string) =
+  let now = Moment.now()
+  var expired = toSeq(t.getOrDefault(topic).pairs())
+  expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
+    now >= pair.expire
+  for (peer, _) in expired:
+    t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer)
+
+proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
+  var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
+  peers.keepIf do (x: PubSubPeer) -> bool:
+    x.score >= 0.0
+  # by spec, larger than Dhi, but let's put some hard caps
+  peers.setLen(min(peers.len, g.parameters.dHigh * 2))
+  peers.map do (x: PubSubPeer) -> PeerInfoMsg:
+    PeerInfoMsg(peerID: x.peerId.getBytes())
+
+proc handleGraft*(g: GossipSub,
+                  peer: PubSubPeer,
+                  grafts: seq[ControlGraft]): seq[ControlPrune] =
+  for graft in grafts:
+    let topic = graft.topicID
+    logScope:
+      peer
+      topic
+
+    trace "peer grafted topic"
+
+    # It is an error to GRAFT on an explicit peer
+    if peer.peerId in g.parameters.directPeers:
+      # receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
+      warn "attempt to graft an explicit peer", peer=peer.peerId,
+                                                topic
+      # and such an attempt should be logged and rejected with a PRUNE
+      result.add(ControlPrune(
+        topicID: topic,
+        peers: @[], # omitting heavy computation here as the remote did something illegal
+        backoff: g.parameters.pruneBackoff.seconds.uint64))
+
+      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
+      g.backingOff
+        .mgetOrPut(topic, initTable[PeerID, Moment]())
+        .mgetOrPut(peer.peerId, backoff) = backoff
+
+      peer.behaviourPenalty += 0.1
+
+      continue
+
+    if g.backingOff
+      .getOrDefault(topic)
+      .getOrDefault(peer.peerId) > Moment.now():
+      warn "attempt to graft a backingOff peer", peer=peer.peerId,
+                                                 topic
+      # and such an attempt should be logged and rejected with a PRUNE
+      result.add(ControlPrune(
+        topicID: topic,
+        peers: @[], # omitting heavy computation here as the remote did something illegal
+        backoff: g.parameters.pruneBackoff.seconds.uint64))
+
+      let backoff = Moment.fromNow(g.parameters.pruneBackoff)
+      g.backingOff
+        .mgetOrPut(topic, initTable[PeerID, Moment]())
+        .mgetOrPut(peer.peerId, backoff) = backoff
+
+      peer.behaviourPenalty += 0.1
+
+      continue
+
+    if peer.peerId notin g.peerStats:
+      g.initPeerStats(peer)
+
+    # not in the spec exactly, but let's avoid way too low score peers
+    # other clients do it too; it was also an audit recommendation
+    if peer.score < g.parameters.publishThreshold:
+      continue
+
+    # If they send us a graft before they send us a subscribe, what should
+    # we do? For now, we add them to mesh but don't add them to gossipsub.
+    if topic in g.topics:
+      if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound:
+        # In the spec, there's no mention of DHi here, but implicitly, a
+        # peer will be removed from the mesh on next rebalance, so we don't want
+        # this peer to push someone else out
+        if g.mesh.addPeer(topic, peer):
+          g.grafted(peer, topic)
+          g.fanout.removePeer(topic, peer)
+        else:
+          trace "peer already in mesh"
+      else:
+        trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic)
+        result.add(ControlPrune(
+          topicID: topic,
+          peers: g.peerExchangeList(topic),
+          backoff: g.parameters.pruneBackoff.seconds.uint64))
+    else:
+      trace "peer grafting topic we're not interested in", topic
+      # gossip 1.1, we do not send a control message prune anymore
+
+proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
+  for prune in prunes:
+    let topic = prune.topicID
+
+    trace "peer pruned topic", peer, topic
+
+    # add peer backoff
+    if prune.backoff > 0:
+      let
+        backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
+        current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
+      if backoff > current:
+        g.backingOff
+          .mgetOrPut(topic, initTable[PeerID, Moment]())
+          .mgetOrPut(peer.peerId, backoff) = backoff
+
+    trace "pruning rpc received peer", peer, score = peer.score
+    g.pruned(peer, topic)
+    g.mesh.removePeer(topic, peer)
+
+    # TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
+    # another option could be to implement signed peer records
+    ## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
+
+proc handleIHave*(g: GossipSub,
+                  peer: PubSubPeer,
+                  ihaves: seq[ControlIHave]): ControlIWant =
+  if peer.score < g.parameters.gossipThreshold:
+    trace "ihave: ignoring low score peer", peer, score = peer.score
+  elif peer.iHaveBudget <= 0:
+    trace "ihave: ignoring out of budget peer", peer, score = peer.score
+  else:
+    var deIhaves = ihaves.deduplicate()
+    for ihave in deIhaves.mitems:
+      trace "peer sent ihave",
+        peer, topic = ihave.topicID, msgs = ihave.messageIDs
+      if ihave.topicID in g.mesh:
+        for m in ihave.messageIDs:
+          let msgId = m & g.randomBytes
+          if msgId notin g.seen:
+            if peer.iHaveBudget > 0:
+              result.messageIDs.add(m)
+              dec peer.iHaveBudget
+            else:
+              return
+
+    # shuffling result.messageIDs before sending it out to increase the likelihood
+    # of getting an answer if the peer truncates the list due to internal size restrictions.
+    shuffle(result.messageIDs)
+
+proc handleIWant*(g: GossipSub,
+                  peer: PubSubPeer,
+                  iwants: seq[ControlIWant]): seq[Message] =
+  if peer.score < g.parameters.gossipThreshold:
+    trace "iwant: ignoring low score peer", peer, score = peer.score
+  elif peer.iWantBudget <= 0:
+    trace "iwant: ignoring out of budget peer", peer, score = peer.score
+  else:
+    var deIwants = iwants.deduplicate()
+    for iwant in deIwants:
+      for mid in iwant.messageIDs:
+        trace "peer sent iwant", peer, messageID = mid
+        let msg = g.mcache.get(mid)
+        if msg.isSome:
+          # avoid spam
+          if peer.iWantBudget > 0:
+            result.add(msg.get())
+            dec peer.iWantBudget
+          else:
+            return
+
+proc commitMetrics(metrics: var MeshMetrics) =
+  libp2p_gossipsub_under_dlow_topics.set(metrics.underDlowTopics)
+  libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
+  libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
+  libp2p_gossipsub_under_dhigh_above_dlow_topics.set(metrics.underDhighAboveDlowTopics)
+  libp2p_gossipsub_peers_per_topic_gossipsub.set(metrics.otherPeersPerTopicGossipsub, labelValues = ["other"])
+  libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
+  libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
+
+proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
+  logScope:
+    topic
+    mesh = g.mesh.peers(topic)
+    gossipsub = g.gossipsub.peers(topic)
+
+  trace "rebalancing mesh"
+
+  # create a mesh topic that we're subscribing to
+
+  var
+    prunes, grafts: seq[PubSubPeer]
+    npeers = g.mesh.peers(topic)
+
+  if npeers < g.parameters.dLow:
+    if not isNil(metrics):
+      inc metrics[].underDlowTopics
+
+    trace "replenishing mesh", peers = npeers
+    # replenish the mesh if we're below Dlo
+    var candidates = toSeq(
+      g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
+      g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
+    ).filterIt(
+      it.connected and
+      # avoid negative score peers
+      it.score >= 0.0 and
+      # don't pick explicit peers
+      it.peerId notin g.parameters.directPeers and
+      # and avoid peers we are backing off
+      it.peerId notin g.backingOff.getOrDefault(topic)
+    )
+
+    # shuffle anyway, score might not be used
+    shuffle(candidates)
+
+    # sort peers by score, high score first since we graft
+    candidates.sort(byScore, SortOrder.Descending)
+
+    # Graft peers so we reach a count of D
+    candidates.setLen(min(candidates.len, g.parameters.d - npeers))
+
+    trace "grafting", grafting = candidates.len
+
+    if candidates.len == 0:
+      if not isNil(metrics):
+        inc metrics[].noPeersTopics
+    else:
+      for peer in candidates:
+        if g.mesh.addPeer(topic, peer):
+          g.grafted(peer, topic)
+          g.fanout.removePeer(topic, peer)
+          grafts &= peer
+
+  else:
+    var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
+    meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
+    if meshPeers.len < g.parameters.dOut:
+      if not isNil(metrics):
+        inc metrics[].underDoutTopics
+
+      trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
+
+      var candidates = toSeq(
+        g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
+        g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
+      ).filterIt(
+        it.connected and
+        # get only outbound ones
+        it.outbound and
+        # avoid negative score peers
+        it.score >= 0.0 and
+        # don't pick explicit peers
+        it.peerId notin g.parameters.directPeers and
+        # and avoid peers we are backing off
+        it.peerId notin g.backingOff.getOrDefault(topic)
+      )
+
+      # shuffle anyway, score might not be used
+      shuffle(candidates)
+
+      # sort peers by score, high score first, we are grafting
+      candidates.sort(byScore, SortOrder.Descending)
+
+      # Graft peers so we reach a count of D
+      candidates.setLen(min(candidates.len, g.parameters.dOut))
+
+      trace "grafting outbound peers", topic, peers = candidates.len
+
+      for peer in candidates:
+        if g.mesh.addPeer(topic, peer):
+          g.grafted(peer, topic)
+          g.fanout.removePeer(topic, peer)
+          grafts &= peer
+
+
+  # get npeers again after possible grafts
+  npeers = g.mesh.peers(topic)
+  if npeers > g.parameters.dHigh:
+    if not isNil(metrics):
+      if g.knownTopics.contains(topic):
+        libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic])
+      else:
+        libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
+
+    # prune peers if we've gone over Dhi
+    prunes = toSeq(g.mesh[topic])
+    # avoid pruning peers we are currently grafting in this heartbeat
+    prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
+
+    # shuffle anyway, score might not be used
+    shuffle(prunes)
+
+    # sort peers by score (inverted), pruning, so low score peers are on top
+    prunes.sort(byScore, SortOrder.Ascending)
+
+    # keep high score peers
+    if prunes.len > g.parameters.dScore:
+      prunes.setLen(prunes.len - g.parameters.dScore)
+
+    # collect inbound/outbound info
+    var outbound: seq[PubSubPeer]
+    var inbound: seq[PubSubPeer]
+    for peer in prunes:
+      if peer.outbound:
+        outbound &= peer
+      else:
+        inbound &= peer
+
+    let
+      meshOutbound = prunes.countIt(it.outbound)
+      maxOutboundPrunes = meshOutbound - g.parameters.dOut
+
+    # ensure that there are at least D_out peers first and rebalance to g.d after that
+    outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes)))
+
+    # concat remaining outbound peers
+    prunes = inbound & outbound
+
+    let pruneLen = prunes.len - g.parameters.d
+    if pruneLen > 0:
+      # Ok we got some peers to prune,
+      # for this heartbeat let's prune those
+      shuffle(prunes)
+      prunes.setLen(pruneLen)
+
+    trace "pruning", prunes = prunes.len
+    for peer in prunes:
+      trace "pruning peer on rebalance", peer, score = peer.score
+      g.pruned(peer, topic)
+      g.mesh.removePeer(topic, peer)
+  elif npeers > g.parameters.dLow and not isNil(metrics):
+    inc metrics[].underDhighAboveDlowTopics
+
+  # opportunistic grafting, by spec mesh should not be empty...
+  if g.mesh.peers(topic) > 1:
+    var peers = toSeq(g.mesh[topic])
+    # grafting so high score has priority
+    peers.sort(byScore, SortOrder.Descending)
+    let medianIdx = peers.len div 2
+    let median = peers[medianIdx]
+    if median.score < g.parameters.opportunisticGraftThreshold:
+      trace "median score below opportunistic threshold", score = median.score
+      var avail = toSeq(
+        g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
+        g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
+      )
+
+      avail.keepIf do (x: PubSubPeer) -> bool:
+        # avoid negative score peers
+        x.score >= median.score and
+        # don't pick explicit peers
+        x.peerId notin g.parameters.directPeers and
+        # and avoid peers we are backing off
+        x.peerId notin g.backingOff.getOrDefault(topic)
+
+      # by spec, grab only 2
+      if avail.len > 2:
+        avail.setLen(2)
+
+      for peer in avail:
+        if g.mesh.addPeer(topic, peer):
+          g.grafted(peer, topic)
+          grafts &= peer
+          trace "opportunistic grafting", peer
+
+  if not isNil(metrics):
+    if g.knownTopics.contains(topic):
+      libp2p_gossipsub_peers_per_topic_gossipsub
+        .set(g.gossipsub.peers(topic).int64, labelValues = [topic])
+      libp2p_gossipsub_peers_per_topic_fanout
+        .set(g.fanout.peers(topic).int64, labelValues = [topic])
+      libp2p_gossipsub_peers_per_topic_mesh
+        .set(g.mesh.peers(topic).int64, labelValues = [topic])
+    else:
+      metrics[].otherPeersPerTopicGossipsub += g.gossipsub.peers(topic).int64
+      metrics[].otherPeersPerTopicFanout += g.fanout.peers(topic).int64
+      metrics[].otherPeersPerTopicMesh += g.mesh.peers(topic).int64
+
+  trace "mesh balanced"
+
+  # Send changes to peers after table updates to avoid stale state
+  if grafts.len > 0:
+    let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
+    g.broadcast(grafts, graft)
+  if prunes.len > 0:
+    let prune = RPCMsg(control: some(ControlMessage(
+      prune: @[ControlPrune(
+        topicID: topic,
+        peers: g.peerExchangeList(topic),
+        backoff: g.parameters.pruneBackoff.seconds.uint64)])))
+    g.broadcast(prunes, prune)
+
+proc dropFanoutPeers*(g: GossipSub) =
+  # drop peers that we haven't published to in
+  # GossipSubFanoutTTL seconds
+  let now = Moment.now()
+  for topic in toSeq(g.lastFanoutPubSub.keys):
+    let val = g.lastFanoutPubSub[topic]
+    if now > val:
+      g.fanout.del(topic)
+      g.lastFanoutPubSub.del(topic)
+      trace "dropping fanout topic", topic
+
+proc replenishFanout*(g: GossipSub, topic: string) =
+  ## get fanout peers for a topic
+  logScope: topic
+  trace "about to replenish fanout"
+
+  if g.fanout.peers(topic) < g.parameters.dLow:
+    trace "replenishing fanout", peers = g.fanout.peers(topic)
+    if topic in g.gossipsub:
+      for peer in g.gossipsub[topic]:
+        if g.fanout.addPeer(topic, peer):
+          if g.fanout.peers(topic) == g.parameters.d:
+            break
+
+  trace "fanout replenished with peers", peers = g.fanout.peers(topic)
+
+proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
+  ## gossip iHave messages to peers
+  ##
+
+  libp2p_gossipsub_cache_window_size.set(0)
+
+  trace "getting gossip peers (iHave)"
+  let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
+  for topic in topics:
+    if topic notin g.gossipsub:
+      trace "topic not in gossip array, skipping", topicID = topic
+      continue
+
+    let mids = g.mcache.window(topic)
+    if not(mids.len > 0):
+      continue
+
+    var midsSeq = toSeq(mids)
+
+    libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64)
+
+    # not in spec
+    # similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
+    # and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
+    if midsSeq.len > IHaveMaxLength:
+      shuffle(midsSeq)
+      midsSeq.setLen(IHaveMaxLength)
+
+    let
+      ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
+      mesh = g.mesh.getOrDefault(topic)
+      fanout = g.fanout.getOrDefault(topic)
+      gossipPeers = mesh + fanout
+    var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
+
+    allPeers.keepIf do (x: PubSubPeer) -> bool:
+      x.peerId notin g.parameters.directPeers and
+      x notin gossipPeers and
+      x.score >= g.parameters.gossipThreshold
+
+    var target = g.parameters.dLazy
+    let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
+    if factor > target:
+      target = min(factor, allPeers.len)
+
+    if target < allPeers.len:
+      shuffle(allPeers)
+      allPeers.setLen(target)
+
+    for peer in allPeers:
+      if peer notin result:
+        result[peer] = ControlMessage()
+      result[peer].ihave.add(ihave)
+
+proc heartbeat*(g: GossipSub) {.async.} =
+  while g.heartbeatRunning:
+    try:
+      trace "running heartbeat", instance = cast[int](g)
+
+      # reset IWANT budget
+      # reset IHAVE cap
+      block:
+        for peer in g.peers.values:
+          peer.iWantBudget = IWantPeerBudget
+          peer.iHaveBudget = IHavePeerBudget
+
+      g.updateScores()
+
+      var meshMetrics = MeshMetrics()
+
+      for t in toSeq(g.topics.keys):
+        # remove expired backoffs
+        block:
+          handleBackingOff(g.backingOff, t)
+
+        # prune every negative score peer
+        # do this before rebalance
+        # in order to avoid grafted -> pruned in the same cycle
+        let meshPeers = g.mesh.getOrDefault(t)
+        var prunes: seq[PubSubPeer]
+        for peer in meshPeers:
+          if peer.score < 0.0:
+            trace "pruning negative score peer", peer, score = peer.score
+            g.pruned(peer, t)
+            g.mesh.removePeer(t, peer)
+            prunes &= peer
+        if prunes.len > 0:
+          let prune = RPCMsg(control: some(ControlMessage(
+            prune: @[ControlPrune(
+              topicID: t,
+              peers: g.peerExchangeList(t),
+              backoff: g.parameters.pruneBackoff.seconds.uint64)])))
+          g.broadcast(prunes, prune)
+
+        # pass by ptr in order to both signal we want to update metrics
+        # and as well update the struct for each topic during this iteration
+        g.rebalanceMesh(t, addr meshMetrics)
+
+      commitMetrics(meshMetrics)
+
+      g.dropFanoutPeers()
+
+      # replenish known topics to the fanout
+      for t in toSeq(g.fanout.keys):
+        g.replenishFanout(t)
+
+      let peers = g.getGossipPeers()
+      for peer, control in peers:
+        # only ihave from here
+        for ihave in control.ihave:
+          if g.knownTopics.contains(ihave.topicID):
+            libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
+          else:
+            libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
+        g.send(peer, RPCMsg(control: some(control)))
+
+      g.mcache.shift() # shift the cache
+    except CancelledError as exc:
+      raise exc
+    except CatchableError as exc:
+      warn "exception occurred in gossipsub heartbeat", exc = exc.msg,
+        trace = exc.getStackTrace()
+
+    for trigger in g.heartbeatEvents:
+      trace "firing heartbeat event", instance = cast[int](g)
+      trigger.fire()
+
+    await sleepAsync(g.parameters.heartbeatInterval)
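
Note: handleBackingOff above drops per-topic backoff entries once their deadline passes, and handlePrune adds BackoffSlackTime on top of a remote peer's requested backoff. A standalone sketch of that expiry logic, with string ids standing in for PeerID (all names illustrative):

import std/[tables, sequtils]
import chronos # Moment, seconds

type Backoffs = Table[string, Table[string, Moment]]

proc expireBackoffs(t: var Backoffs, topic: string) =
  let now = Moment.now()
  var expired = toSeq(t.getOrDefault(topic).pairs())
  expired.keepIf do (pair: tuple[peer: string, expire: Moment]) -> bool:
    now >= pair.expire
  for (peer, _) in expired:
    t.mgetOrPut(topic, initTable[string, Moment]()).del(peer)

var backoffs: Backoffs
# a peer pruned with a 60s backoff, plus the 2s slack the patch adds
backoffs.mgetOrPut("mytopic", initTable[string, Moment]())["peerA"] =
  Moment.fromNow((60 + 2).seconds)
expireBackoffs(backoffs, "mytopic")
assert "peerA" in backoffs["mytopic"] # deadline not reached yet, entry kept
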
diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim
new file mode 100644
index 000000000..43638ca65
--- /dev/null
+++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim
@@ -0,0 +1,274 @@
+## Nim-LibP2P
+## Copyright (c) 2021 Status Research & Development GmbH
+## Licensed under either of
+##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+
+import std/[tables, strutils, sets, algorithm]
+import chronos, chronicles, metrics
+import "."/[types]
+import ".."/[pubsubpeer]
+import "../../.."/[peerid, multiaddress, utility, switch]
+
+declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
+declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
+declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
+
+proc initPeerStats*(g: GossipSub, peer: PubSubPeer, stats: PeerStats = PeerStats()) =
+  var initialStats = stats
+  initialStats.expire = Moment.now() + g.parameters.retainScore
+  g.peerStats[peer.peerId] = initialStats
+  peer.iWantBudget = IWantPeerBudget
+  peer.iHaveBudget = IHavePeerBudget
+
+func `/`(a, b: Duration): float64 =
+  let
+    fa = float64(a.nanoseconds)
+    fb = float64(b.nanoseconds)
+  fa / fb
+
+func byScore*(x, y: PubSubPeer): int = system.cmp(x.score, y.score)
+
+proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
+  if peer.sendConn == nil:
+    trace "colocationFactor, no connection", peer
+    0.0
+  else:
+    let address = peer.sendConn.observedAddr
+
+    # mgetOrPut inserts an empty set when the address is not yet tracked
+    g.peersInIP.mgetOrPut(address, initHashSet[PubSubPeer]()).incl(peer)
+
+    let
+      ipPeers = g.peersInIP[address]
+      len = ipPeers.len.float64
+
+    if len > g.parameters.ipColocationFactorThreshold:
+      trace "colocationFactor over threshold", peer, address, len
+      let over = len - g.parameters.ipColocationFactorThreshold
+      over * over
+    else:
+      0.0
+
+proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
+  when defined(libp2p_agents_metrics):
+    let agent =
+      block:
+        if peer.shortAgent.len > 0:
+          peer.shortAgent
+        else:
+          if peer.sendConn != nil:
+            let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
+            if KnownLibP2PAgentsSeq.contains(shortAgent):
+              peer.shortAgent = shortAgent
+            else:
+              peer.shortAgent = "unknown"
+            peer.shortAgent
+          else:
+            "unknown"
+    libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])
+  else:
+    libp2p_gossipsub_bad_score_disconnection.inc(labelValues = ["unknown"])
+
+  if peer.sendConn != nil:
+    try:
+      await g.switch.disconnect(peer.peerId)
+    except CancelledError:
+      raise
+    except CatchableError as exc:
+      trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
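The quadratic shape of the colocation penalty above is easiest to see with numbers. A self-contained rendering of just the arithmetic (the helper name is ours, not the patch's):

func colocationPenalty(peersOnIp: int, threshold: float64): float64 =
  # free up to the threshold, then the penalty grows with the square
  let count = peersOnIp.float64
  if count > threshold:
    let over = count - threshold
    over * over
  else:
    0.0

assert colocationPenalty(3, 3.0) == 0.0  # at the threshold: no penalty
assert colocationPenalty(5, 3.0) == 4.0  # 2 over: penalty 4
assert colocationPenalty(8, 3.0) == 25.0 # 5 over: penalty 25

The result is later multiplied by ipColocationFactorWeight, which is negative in sensible configurations, so many peers behind one IP drag each other's scores down.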
+proc updateScores*(g: GossipSub) = # avoid async
+  trace "updating scores", peers = g.peers.len
+
+  let now = Moment.now()
+  var evicting: seq[PeerID]
+
+  for peerId, stats in g.peerStats.mpairs:
+    let peer = g.peers.getOrDefault(peerId)
+    if isNil(peer) or not(peer.connected):
+      if now > stats.expire:
+        evicting.add(peerId)
+        trace "evicted peer from memory", peer = peerId
+      continue
+
+    trace "updating peer score", peer
+
+    var
+      n_topics = 0
+      is_grafted = 0
+
+    # Per topic
+    for topic, topicParams in g.topicParams:
+      var info = stats.topicInfos.getOrDefault(topic)
+      inc n_topics
+
+      # if weight is 0.0 avoid wasting time
+      if topicParams.topicWeight != 0.0:
+        # Scoring
+        var topicScore = 0'f64
+
+        if info.inMesh:
+          inc is_grafted
+          info.meshTime = now - info.graftTime
+          if info.meshTime > topicParams.meshMessageDeliveriesActivation:
+            info.meshMessageDeliveriesActive = true
+
+          var p1 = info.meshTime / topicParams.timeInMeshQuantum
+          if p1 > topicParams.timeInMeshCap:
+            p1 = topicParams.timeInMeshCap
+          trace "p1", peer, p1, topic, topicScore
+          topicScore += p1 * topicParams.timeInMeshWeight
+        else:
+          info.meshMessageDeliveriesActive = false
+
+        topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
+        trace "p2", peer, p2 = info.firstMessageDeliveries, topic, topicScore
+
+        if info.meshMessageDeliveriesActive:
+          if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
+            let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
+            let p3 = deficit * deficit
+            trace "p3", peer, p3, topic, topicScore
+            topicScore += p3 * topicParams.meshMessageDeliveriesWeight
+
+        topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
+        trace "p3b", peer, p3b = info.meshFailurePenalty, topic, topicScore
+
+        topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
+        trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
+
+        trace "updated peer topic's scores", peer, topic, info, topicScore
+
+        peer.score += topicScore * topicParams.topicWeight
+
+      # Score metrics
+      when defined(libp2p_agents_metrics):
+        let agent =
+          block:
+            if peer.shortAgent.len > 0:
+              peer.shortAgent
+            else:
+              if peer.sendConn != nil:
+                let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
+                if KnownLibP2PAgentsSeq.contains(shortAgent):
+                  peer.shortAgent = shortAgent
+                else:
+                  peer.shortAgent = "unknown"
+                peer.shortAgent
+              else:
+                "unknown"
+        libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
+        libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
+        libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
+        libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent])
+      else:
+        libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"])
+        libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"])
+        libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"])
+        libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"])
+
+      # Score decay
+      info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
+      if info.firstMessageDeliveries < g.parameters.decayToZero:
+        info.firstMessageDeliveries = 0
+
+      info.meshMessageDeliveries *= topicParams.meshMessageDeliveriesDecay
+      if info.meshMessageDeliveries < g.parameters.decayToZero:
+        info.meshMessageDeliveries = 0
+
+      info.meshFailurePenalty *= topicParams.meshFailurePenaltyDecay
+      if info.meshFailurePenalty < g.parameters.decayToZero:
+        info.meshFailurePenalty = 0
+
+      info.invalidMessageDeliveries *= topicParams.invalidMessageDeliveriesDecay
+      if info.invalidMessageDeliveries < g.parameters.decayToZero:
+        info.invalidMessageDeliveries = 0
+
+      # Wrap up
+      # commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
+      stats.topicInfos[topic] = info
+
+    peer.score += peer.appScore * g.parameters.appSpecificWeight
+
+    peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
+
+    let colocationFactor = g.colocationFactor(peer)
+    peer.score += colocationFactor * g.parameters.ipColocationFactorWeight
+
+    # Score metrics
+    when defined(libp2p_agents_metrics):
+      let agent =
+        block:
+          if peer.shortAgent.len > 0:
+            peer.shortAgent
+          else:
+            if peer.sendConn != nil:
+              let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
+              if KnownLibP2PAgentsSeq.contains(shortAgent):
+                peer.shortAgent = shortAgent
+              else:
+                peer.shortAgent = "unknown"
+              peer.shortAgent
+            else:
+              "unknown"
+      libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
+      libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
+      libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
+    else:
+      libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"])
+      libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"])
+      libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"])
+
+    # decay behaviourPenalty
+    peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
+    if peer.behaviourPenalty < g.parameters.decayToZero:
+      peer.behaviourPenalty = 0
+
+    # copy the updated values back into stats so they are retained until expiry
+    stats.score = peer.score
+    stats.appScore = peer.appScore
+    stats.behaviourPenalty = peer.behaviourPenalty
+    stats.expire = Moment.now() + g.parameters.retainScore # refresh expiration
+    assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
+    trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
+
+    if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold:
+      debug "disconnecting bad score peer", peer, score = peer.score
+      asyncSpawn g.disconnectPeer(peer)
+
+    when defined(libp2p_agents_metrics):
+      libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
+    else:
+      libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"])
+
+  for peer in evicting:
+    g.peerStats.del(peer)
+
+  trace "updated scores", peers = g.peers.len
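Editor's summary of the arithmetic in updateScores above (symbols are shorthand, not identifiers from the code): with tw_t the per-topic weight, the score accumulated for a peer p is

\mathrm{score}(p) =
  \sum_{t} tw_t \Big(
      w_1 \min\!\Big(\tfrac{\mathrm{meshTime}}{\mathrm{quantum}},\ \mathrm{cap}\Big)
    + w_2\,\mathrm{firstDeliveries}
    + w_3\,\mathrm{deficit}^2
    + w_{3b}\,\mathrm{meshFailurePenalty}
    + w_4\,\mathrm{invalidDeliveries}^2
  \Big)
  + w_5\,\mathrm{appScore}
  + w_6\,\mathrm{overIP}^2
  + w_7\,\mathrm{behaviourPenalty}^2

where deficit is the mesh-delivery threshold minus the observed deliveries (counted only while delivery tracking is active) and overIP is the peer count above the colocation threshold. In sensible configurations w_3, w_4, w_7 and w_6 are negative, so those terms are pure penalties.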
+proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
+  for t in topics:
+    if t notin g.topics:
+      continue
+
+    # update stats
+    g.peerStats.withValue(peer.peerId, stats):
+      stats[].topicInfos.withValue(t, tstats):
+        tstats[].invalidMessageDeliveries += 1
+      do: # if we have no stats populate!
+        stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
+    do: # if we have no stats populate!
+      g.initPeerStats(peer) do:
+        var stats = PeerStats()
+        stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
+        stats
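The nested withValue/do above is std/tables' check-then-update idiom: the first body runs with a pointer to the value when the key exists, the do branch runs when it does not. In isolation, with a toy table rather than the patch's types:

import std/tables

var counts: Table[string, int]
counts["seen"] = 1

counts.withValue("seen", v):
  v[] += 1              # key present: mutate in place through the ptr
do:
  counts["seen"] = 1    # key absent: create the entry instead

assert counts["seen"] == 2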
diff --git a/libp2p/protocols/pubsub/gossipsub/types.nim b/libp2p/protocols/pubsub/gossipsub/types.nim
new file mode 100644
index 000000000..59844ccc5
--- /dev/null
+++ b/libp2p/protocols/pubsub/gossipsub/types.nim
@@ -0,0 +1,163 @@
+import chronos
+import std/[tables, sets]
+import ".."/[floodsub, peertable, mcache, pubsubpeer]
+import "../rpc"/[messages]
+import "../../.."/[peerid, multiaddress]
+
+const
+  GossipSubCodec* = "/meshsub/1.1.0"
+  GossipSubCodec_10* = "/meshsub/1.0.0"
+
+# overlay parameters
+const
+  GossipSubD* = 6
+  GossipSubDlo* = 4
+  GossipSubDhi* = 12
+
+# gossip parameters
+const
+  GossipSubHistoryLength* = 5
+  GossipSubHistoryGossip* = 3
+
+# heartbeat interval
+  GossipSubHeartbeatInterval* = 1.seconds
+
+# fanout ttl
+const
+  GossipSubFanoutTTL* = 1.minutes
+
+# gossip parameters
+const
+  GossipBackoffPeriod* = 1.minutes
+
+const
+  BackoffSlackTime* = 2 # seconds
+  IWantPeerBudget* = 25 # 25 messages per second (reset every heartbeat)
+  IHavePeerBudget* = 10
+  # max number of IHave message ids to expose; not in the spec, mirrors the go and rust caps:
+  # rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572
+  # go: https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L155
+  IHaveMaxLength* = 5000
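IHaveMaxLength is the only cap here that does not come from the spec; the clamp it drives in getGossipPeers is a shuffle-then-truncate, so the surviving ids are an unbiased sample of the cache window. A standalone sketch with fabricated ids:

import std/[random, sequtils]

const IHaveMaxLength = 5000                      # mirrors the constant above

var mids = toSeq(0 ..< 8000).mapIt("msg-" & $it) # 8000 fake message ids
if mids.len > IHaveMaxLength:
  shuffle(mids)                                  # random subset, not the oldest or newest
  mids.setLen(IHaveMaxLength)
assert mids.len == IHaveMaxLength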
+
+type
+  TopicInfo* = object
+    # gossip 1.1 related
+    graftTime*: Moment
+    meshTime*: Duration
+    inMesh*: bool
+    meshMessageDeliveriesActive*: bool
+    firstMessageDeliveries*: float64
+    meshMessageDeliveries*: float64
+    meshFailurePenalty*: float64
+    invalidMessageDeliveries*: float64
+
+  TopicParams* = object
+    topicWeight*: float64
+
+    # p1
+    timeInMeshWeight*: float64
+    timeInMeshQuantum*: Duration
+    timeInMeshCap*: float64
+
+    # p2
+    firstMessageDeliveriesWeight*: float64
+    firstMessageDeliveriesDecay*: float64
+    firstMessageDeliveriesCap*: float64
+
+    # p3
+    meshMessageDeliveriesWeight*: float64
+    meshMessageDeliveriesDecay*: float64
+    meshMessageDeliveriesThreshold*: float64
+    meshMessageDeliveriesCap*: float64
+    meshMessageDeliveriesActivation*: Duration
+    meshMessageDeliveriesWindow*: Duration
+
+    # p3b
+    meshFailurePenaltyWeight*: float64
+    meshFailurePenaltyDecay*: float64
+
+    # p4
+    invalidMessageDeliveriesWeight*: float64
+    invalidMessageDeliveriesDecay*: float64
+
+  PeerStats* = object
+    topicInfos*: Table[string, TopicInfo]
+    expire*: Moment # updated on disconnect, to retain scores until expire
+    # the following are copies from PubSubPeer, in order to restore them on re-connection
+    score*: float64 # a copy of the score to keep in case the peer is disconnected
+    appScore*: float64 # application specific score
+    behaviourPenalty*: float64 # the eventual penalty score
+
+  GossipSubParams* = object
+    explicit*: bool
+    pruneBackoff*: Duration
+    floodPublish*: bool
+    gossipFactor*: float64
+    d*: int
+    dLow*: int
+    dHigh*: int
+    dScore*: int
+    dOut*: int
+    dLazy*: int
+
+    heartbeatInterval*: Duration
+
+    historyLength*: int
+    historyGossip*: int
+
+    fanoutTTL*: Duration
+    seenTTL*: Duration
+
+    gossipThreshold*: float64
+    publishThreshold*: float64
+    graylistThreshold*: float64
+    acceptPXThreshold*: float64
+    opportunisticGraftThreshold*: float64
+    decayInterval*: Duration
+    decayToZero*: float64
+    retainScore*: Duration
+
+    appSpecificWeight*: float64
+    ipColocationFactorWeight*: float64
+    ipColocationFactorThreshold*: float64
+    behaviourPenaltyWeight*: float64
+    behaviourPenaltyDecay*: float64
+
+    directPeers*: Table[PeerId, seq[MultiAddress]]
+
+    disconnectBadPeers*: bool
+
+  BackoffTable* = Table[string, Table[PeerID, Moment]]
+
+  GossipSub* = ref object of FloodSub
+    mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
+    fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
+    gossipsub*: PeerTable # peers that are subscribed to a topic
+    explicit*: PeerTable # direct peers that we keep alive explicitly
+    backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
+    lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
+    gossip*: Table[string, seq[ControlIHave]] # pending gossip
+    control*: Table[string, ControlMessage] # pending control messages
+    mcache*: MCache # messages cache
+    heartbeatFut*: Future[void] # cancellation future for heartbeat interval
+    heartbeatRunning*: bool
+
+    peerStats*: Table[PeerID, PeerStats]
+    parameters*: GossipSubParams
+    topicParams*: Table[string, TopicParams]
+    directPeersLoop*: Future[void]
+    peersInIP*: Table[MultiAddress, HashSet[PubSubPeer]]
+
+    heartbeatEvents*: seq[AsyncEvent]
+
+    randomBytes*: seq[byte]
+
+  MeshMetrics* = object
+    # scratch buffers for metrics
+    otherPeersPerTopicMesh*: int64
+    otherPeersPerTopicFanout*: int64
+    otherPeersPerTopicGossipsub*: int64
+    underDlowTopics*: int64
+    underDoutTopics*: int64
+    underDhighAboveDlowTopics*: int64
+    noPeersTopics*: int64
diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim
index 77e944feb..5f6bf9dee 100644
--- a/libp2p/protocols/pubsub/pubsub.nim
+++ b/libp2p/protocols/pubsub/pubsub.nim
@@ -351,8 +351,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
   ## messages
   ##
 
-  let peer = p.getOrCreatePeer(peer, p.codecs)
-  peer.outbound = true # flag as outbound
+  discard p.getOrCreatePeer(peer, p.codecs)
 
 proc updateTopicMetrics(p: PubSub, topic: string) =
   # metrics
diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim
index 0ef2013f9..5c816a373 100644
--- a/libp2p/protocols/pubsub/pubsubpeer.nim
+++ b/libp2p/protocols/pubsub/pubsubpeer.nim
@@ -55,7 +55,6 @@ type
     score*: float64
     iWantBudget*: int
     iHaveBudget*: int
-    outbound*: bool # if this is an outbound connection
     appScore*: float64 # application specific score
     behaviourPenalty*: float64 # the eventual penalty score
 
@@ -80,6 +79,10 @@ proc connected*(p: PubSubPeer): bool =
 proc hasObservers(p: PubSubPeer): bool =
   p.observers != nil and anyIt(p.observers[], it != nil)
 
+func outbound*(p: PubSubPeer): bool =
+  # derived from the live connection so it can never go stale
+  p.connected and p.sendConn.dir == Direction.Out
+
 proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
   # trigger hooks
   if not(isNil(p.observers)) and p.observers[].len > 0:
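A note on that last hunk: a stored outbound flag had to be set at exactly the right moment and could describe a connection that no longer exists, whereas the derived form recomputes it from sendConn on every read. A toy model with simplified stand-in types (not the library's) showing the invariant:

type
  Direction = enum In, Out
  Conn = ref object
    dir: Direction
    closed: bool
  Peer = ref object
    sendConn: Conn

func connected(p: Peer): bool =
  p.sendConn != nil and not p.sendConn.closed

func outbound(p: Peer): bool =
  # same shape as the patch: derived, never stored
  p.connected and p.sendConn.dir == Out

let p = Peer(sendConn: Conn(dir: Out))
assert p.outbound
p.sendConn.closed = true # the connection drops...
assert not p.outbound    # ...and the property follows automatically, no reset needed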