mirror of
https://github.com/codex-storage/nim-libp2p.git
synced 2025-01-12 20:14:09 +00:00
Refactor gossipsub into multiple modules (#515)
* Refactor gossipsub into multiple modules * splitup further gossipsub * move more mesh related stuff to behavior * fix internal tests * fix PubSubPeer.outbound flag, make it more reliable * use discard rather then _
This commit is contained in:
parent
3213e5d377
commit
fd73cf9f9d
File diff suppressed because it is too large
Load Diff
613
libp2p/protocols/pubsub/gossipsub/behavior.nim
Normal file
613
libp2p/protocols/pubsub/gossipsub/behavior.nim
Normal file
@ -0,0 +1,613 @@
|
|||||||
|
## Nim-LibP2P
|
||||||
|
## Copyright (c) 2021 Status Research & Development GmbH
|
||||||
|
## Licensed under either of
|
||||||
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
## at your option.
|
||||||
|
## This file may not be copied, modified, or distributed except according to
|
||||||
|
## those terms.
|
||||||
|
|
||||||
|
import std/[tables, strutils, sequtils, sets, algorithm]
|
||||||
|
import random # for shuffle
|
||||||
|
import chronos, chronicles, metrics
|
||||||
|
import "."/[types, scoring]
|
||||||
|
import ".."/[pubsubpeer, peertable, timedcache, mcache, pubsub]
|
||||||
|
import "../rpc"/[messages]
|
||||||
|
import "../../.."/[peerid, multiaddress, utility, switch]
|
||||||
|
|
||||||
|
declareGauge(libp2p_gossipsub_cache_window_size, "the number of messages in the cache")
|
||||||
|
declareGauge(libp2p_gossipsub_peers_per_topic_mesh, "gossipsub peers per topic in mesh", labels = ["topic"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_per_topic_fanout, "gossipsub peers per topic in fanout", labels = ["topic"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_per_topic_gossipsub, "gossipsub peers per topic in gossipsub", labels = ["topic"])
|
||||||
|
declareGauge(libp2p_gossipsub_under_dlow_topics, "number of topics below dlow")
|
||||||
|
declareGauge(libp2p_gossipsub_under_dout_topics, "number of topics below dout")
|
||||||
|
declareGauge(libp2p_gossipsub_under_dhigh_above_dlow_topics, "number of topics below dhigh but above dlow")
|
||||||
|
declareGauge(libp2p_gossipsub_no_peers_topics, "number of topics without peers available")
|
||||||
|
declareCounter(libp2p_gossipsub_above_dhigh_condition, "number of above dhigh pruning branches ran", labels = ["topic"])
|
||||||
|
|
||||||
|
proc grafted*(g: GossipSub, p: PubSubPeer, topic: string) =
|
||||||
|
g.peerStats.withValue(p.peerId, stats):
|
||||||
|
var info = stats.topicInfos.getOrDefault(topic)
|
||||||
|
info.graftTime = Moment.now()
|
||||||
|
info.meshTime = 0.seconds
|
||||||
|
info.inMesh = true
|
||||||
|
info.meshMessageDeliveriesActive = false
|
||||||
|
|
||||||
|
# mgetOrPut does not work, so we gotta do this without referencing
|
||||||
|
stats.topicInfos[topic] = info
|
||||||
|
assert(g.peerStats[p.peerId].topicInfos[topic].inMesh == true)
|
||||||
|
|
||||||
|
trace "grafted", peer=p, topic
|
||||||
|
do:
|
||||||
|
g.initPeerStats(p)
|
||||||
|
g.grafted(p, topic)
|
||||||
|
|
||||||
|
proc pruned*(g: GossipSub, p: PubSubPeer, topic: string) =
|
||||||
|
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
|
||||||
|
g.backingOff
|
||||||
|
.mgetOrPut(topic, initTable[PeerID, Moment]())
|
||||||
|
.mgetOrPut(p.peerId, backoff) = backoff
|
||||||
|
|
||||||
|
g.peerStats.withValue(p.peerId, stats):
|
||||||
|
if topic in stats.topicInfos:
|
||||||
|
var info = stats.topicInfos[topic]
|
||||||
|
if topic in g.topicParams:
|
||||||
|
let topicParams = g.topicParams[topic]
|
||||||
|
# penalize a peer that delivered no message
|
||||||
|
let threshold = topicParams.meshMessageDeliveriesThreshold
|
||||||
|
if info.inMesh and info.meshMessageDeliveriesActive and info.meshMessageDeliveries < threshold:
|
||||||
|
let deficit = threshold - info.meshMessageDeliveries
|
||||||
|
info.meshFailurePenalty += deficit * deficit
|
||||||
|
|
||||||
|
info.inMesh = false
|
||||||
|
|
||||||
|
# mgetOrPut does not work, so we gotta do this without referencing
|
||||||
|
stats.topicInfos[topic] = info
|
||||||
|
|
||||||
|
trace "pruned", peer=p, topic
|
||||||
|
|
||||||
|
proc handleBackingOff*(t: var BackoffTable, topic: string) =
|
||||||
|
let now = Moment.now()
|
||||||
|
var expired = toSeq(t.getOrDefault(topic).pairs())
|
||||||
|
expired.keepIf do (pair: tuple[peer: PeerID, expire: Moment]) -> bool:
|
||||||
|
now >= pair.expire
|
||||||
|
for (peer, _) in expired:
|
||||||
|
t.mgetOrPut(topic, initTable[PeerID, Moment]()).del(peer)
|
||||||
|
|
||||||
|
proc peerExchangeList*(g: GossipSub, topic: string): seq[PeerInfoMsg] =
|
||||||
|
var peers = g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()).toSeq()
|
||||||
|
peers.keepIf do (x: PubSubPeer) -> bool:
|
||||||
|
x.score >= 0.0
|
||||||
|
# by spec, larger then Dhi, but let's put some hard caps
|
||||||
|
peers.setLen(min(peers.len, g.parameters.dHigh * 2))
|
||||||
|
peers.map do (x: PubSubPeer) -> PeerInfoMsg:
|
||||||
|
PeerInfoMsg(peerID: x.peerId.getBytes())
|
||||||
|
|
||||||
|
proc handleGraft*(g: GossipSub,
|
||||||
|
peer: PubSubPeer,
|
||||||
|
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
||||||
|
for graft in grafts:
|
||||||
|
let topic = graft.topicID
|
||||||
|
logScope:
|
||||||
|
peer
|
||||||
|
topic
|
||||||
|
|
||||||
|
trace "peer grafted topic"
|
||||||
|
|
||||||
|
# It is an error to GRAFT on a explicit peer
|
||||||
|
if peer.peerId in g.parameters.directPeers:
|
||||||
|
# receiving a graft from a direct peer should yield a more prominent warning (protocol violation)
|
||||||
|
warn "attempt to graft an explicit peer", peer=peer.peerId,
|
||||||
|
topic
|
||||||
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
|
result.add(ControlPrune(
|
||||||
|
topicID: topic,
|
||||||
|
peers: @[], # omitting heavy computation here as the remote did something illegal
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
|
||||||
|
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
|
||||||
|
g.backingOff
|
||||||
|
.mgetOrPut(topic, initTable[PeerID, Moment]())
|
||||||
|
.mgetOrPut(peer.peerId, backoff) = backoff
|
||||||
|
|
||||||
|
peer.behaviourPenalty += 0.1
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
if g.backingOff
|
||||||
|
.getOrDefault(topic)
|
||||||
|
.getOrDefault(peer.peerId) > Moment.now():
|
||||||
|
warn "attempt to graft a backingOff peer", peer=peer.peerId,
|
||||||
|
topic
|
||||||
|
# and such an attempt should be logged and rejected with a PRUNE
|
||||||
|
result.add(ControlPrune(
|
||||||
|
topicID: topic,
|
||||||
|
peers: @[], # omitting heavy computation here as the remote did something illegal
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
|
||||||
|
let backoff = Moment.fromNow(g.parameters.pruneBackoff)
|
||||||
|
g.backingOff
|
||||||
|
.mgetOrPut(topic, initTable[PeerID, Moment]())
|
||||||
|
.mgetOrPut(peer.peerId, backoff) = backoff
|
||||||
|
|
||||||
|
peer.behaviourPenalty += 0.1
|
||||||
|
|
||||||
|
continue
|
||||||
|
|
||||||
|
if peer.peerId notin g.peerStats:
|
||||||
|
g.initPeerStats(peer)
|
||||||
|
|
||||||
|
# not in the spec exactly, but let's avoid way too low score peers
|
||||||
|
# other clients do it too also was an audit recommendation
|
||||||
|
if peer.score < g.parameters.publishThreshold:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# If they send us a graft before they send us a subscribe, what should
|
||||||
|
# we do? For now, we add them to mesh but don't add them to gossipsub.
|
||||||
|
if topic in g.topics:
|
||||||
|
if g.mesh.peers(topic) < g.parameters.dHigh or peer.outbound:
|
||||||
|
# In the spec, there's no mention of DHi here, but implicitly, a
|
||||||
|
# peer will be removed from the mesh on next rebalance, so we don't want
|
||||||
|
# this peer to push someone else out
|
||||||
|
if g.mesh.addPeer(topic, peer):
|
||||||
|
g.grafted(peer, topic)
|
||||||
|
g.fanout.removePeer(topic, peer)
|
||||||
|
else:
|
||||||
|
trace "peer already in mesh"
|
||||||
|
else:
|
||||||
|
trace "pruning grafting peer, mesh full", peer, score = peer.score, mesh = g.mesh.peers(topic)
|
||||||
|
result.add(ControlPrune(
|
||||||
|
topicID: topic,
|
||||||
|
peers: g.peerExchangeList(topic),
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64))
|
||||||
|
else:
|
||||||
|
trace "peer grafting topic we're not interested in", topic
|
||||||
|
# gossip 1.1, we do not send a control message prune anymore
|
||||||
|
|
||||||
|
proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
|
||||||
|
for prune in prunes:
|
||||||
|
let topic = prune.topicID
|
||||||
|
|
||||||
|
trace "peer pruned topic", peer, topic
|
||||||
|
|
||||||
|
# add peer backoff
|
||||||
|
if prune.backoff > 0:
|
||||||
|
let
|
||||||
|
backoff = Moment.fromNow((prune.backoff + BackoffSlackTime).int64.seconds)
|
||||||
|
current = g.backingOff.getOrDefault(topic).getOrDefault(peer.peerId)
|
||||||
|
if backoff > current:
|
||||||
|
g.backingOff
|
||||||
|
.mgetOrPut(topic, initTable[PeerID, Moment]())
|
||||||
|
.mgetOrPut(peer.peerId, backoff) = backoff
|
||||||
|
|
||||||
|
trace "pruning rpc received peer", peer, score = peer.score
|
||||||
|
g.pruned(peer, topic)
|
||||||
|
g.mesh.removePeer(topic, peer)
|
||||||
|
|
||||||
|
# TODO peer exchange, we miss ambient peer discovery in libp2p, so we are blocked by that
|
||||||
|
# another option could be to implement signed peer records
|
||||||
|
## if peer.score > g.parameters.gossipThreshold and prunes.peers.len > 0:
|
||||||
|
|
||||||
|
proc handleIHave*(g: GossipSub,
|
||||||
|
peer: PubSubPeer,
|
||||||
|
ihaves: seq[ControlIHave]): ControlIWant =
|
||||||
|
if peer.score < g.parameters.gossipThreshold:
|
||||||
|
trace "ihave: ignoring low score peer", peer, score = peer.score
|
||||||
|
elif peer.iHaveBudget <= 0:
|
||||||
|
trace "ihave: ignoring out of budget peer", peer, score = peer.score
|
||||||
|
else:
|
||||||
|
var deIhaves = ihaves.deduplicate()
|
||||||
|
for ihave in deIhaves.mitems:
|
||||||
|
trace "peer sent ihave",
|
||||||
|
peer, topic = ihave.topicID, msgs = ihave.messageIDs
|
||||||
|
if ihave.topicID in g.mesh:
|
||||||
|
for m in ihave.messageIDs:
|
||||||
|
let msgId = m & g.randomBytes
|
||||||
|
if msgId notin g.seen:
|
||||||
|
if peer.iHaveBudget > 0:
|
||||||
|
result.messageIDs.add(m)
|
||||||
|
dec peer.iHaveBudget
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
# shuffling result.messageIDs before sending it out to increase the likelihood
|
||||||
|
# of getting an answer if the peer truncates the list due to internal size restrictions.
|
||||||
|
shuffle(result.messageIDs)
|
||||||
|
|
||||||
|
proc handleIWant*(g: GossipSub,
|
||||||
|
peer: PubSubPeer,
|
||||||
|
iwants: seq[ControlIWant]): seq[Message] =
|
||||||
|
if peer.score < g.parameters.gossipThreshold:
|
||||||
|
trace "iwant: ignoring low score peer", peer, score = peer.score
|
||||||
|
elif peer.iWantBudget <= 0:
|
||||||
|
trace "iwant: ignoring out of budget peer", peer, score = peer.score
|
||||||
|
else:
|
||||||
|
var deIwants = iwants.deduplicate()
|
||||||
|
for iwant in deIwants:
|
||||||
|
for mid in iwant.messageIDs:
|
||||||
|
trace "peer sent iwant", peer, messageID = mid
|
||||||
|
let msg = g.mcache.get(mid)
|
||||||
|
if msg.isSome:
|
||||||
|
# avoid spam
|
||||||
|
if peer.iWantBudget > 0:
|
||||||
|
result.add(msg.get())
|
||||||
|
dec peer.iWantBudget
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
proc commitMetrics(metrics: var MeshMetrics) =
|
||||||
|
libp2p_gossipsub_under_dlow_topics.set(metrics.underDlowTopics)
|
||||||
|
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
|
||||||
|
libp2p_gossipsub_under_dout_topics.set(metrics.underDoutTopics)
|
||||||
|
libp2p_gossipsub_under_dhigh_above_dlow_topics.set(metrics.underDhighAboveDlowTopics)
|
||||||
|
libp2p_gossipsub_peers_per_topic_gossipsub.set(metrics.otherPeersPerTopicGossipsub, labelValues = ["other"])
|
||||||
|
libp2p_gossipsub_peers_per_topic_fanout.set(metrics.otherPeersPerTopicFanout, labelValues = ["other"])
|
||||||
|
libp2p_gossipsub_peers_per_topic_mesh.set(metrics.otherPeersPerTopicMesh, labelValues = ["other"])
|
||||||
|
|
||||||
|
proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil) =
|
||||||
|
logScope:
|
||||||
|
topic
|
||||||
|
mesh = g.mesh.peers(topic)
|
||||||
|
gossipsub = g.gossipsub.peers(topic)
|
||||||
|
|
||||||
|
trace "rebalancing mesh"
|
||||||
|
|
||||||
|
# create a mesh topic that we're subscribing to
|
||||||
|
|
||||||
|
var
|
||||||
|
prunes, grafts: seq[PubSubPeer]
|
||||||
|
npeers = g.mesh.peers(topic)
|
||||||
|
|
||||||
|
if npeers < g.parameters.dLow:
|
||||||
|
if not isNil(metrics):
|
||||||
|
inc metrics[].underDlowTopics
|
||||||
|
|
||||||
|
trace "replenishing mesh", peers = npeers
|
||||||
|
# replenish the mesh if we're below Dlo
|
||||||
|
var candidates = toSeq(
|
||||||
|
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||||
|
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||||
|
).filterIt(
|
||||||
|
it.connected and
|
||||||
|
# avoid negative score peers
|
||||||
|
it.score >= 0.0 and
|
||||||
|
# don't pick explicit peers
|
||||||
|
it.peerId notin g.parameters.directPeers and
|
||||||
|
# and avoid peers we are backing off
|
||||||
|
it.peerId notin g.backingOff.getOrDefault(topic)
|
||||||
|
)
|
||||||
|
|
||||||
|
# shuffle anyway, score might be not used
|
||||||
|
shuffle(candidates)
|
||||||
|
|
||||||
|
# sort peers by score, high score first since we graft
|
||||||
|
candidates.sort(byScore, SortOrder.Descending)
|
||||||
|
|
||||||
|
# Graft peers so we reach a count of D
|
||||||
|
candidates.setLen(min(candidates.len, g.parameters.d - npeers))
|
||||||
|
|
||||||
|
trace "grafting", grafting = candidates.len
|
||||||
|
|
||||||
|
if candidates.len == 0:
|
||||||
|
if not isNil(metrics):
|
||||||
|
inc metrics[].noPeersTopics
|
||||||
|
else:
|
||||||
|
for peer in candidates:
|
||||||
|
if g.mesh.addPeer(topic, peer):
|
||||||
|
g.grafted(peer, topic)
|
||||||
|
g.fanout.removePeer(topic, peer)
|
||||||
|
grafts &= peer
|
||||||
|
|
||||||
|
else:
|
||||||
|
var meshPeers = toSeq(g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]()))
|
||||||
|
meshPeers.keepIf do (x: PubSubPeer) -> bool: x.outbound
|
||||||
|
if meshPeers.len < g.parameters.dOut:
|
||||||
|
if not isNil(metrics):
|
||||||
|
inc metrics[].underDoutTopics
|
||||||
|
|
||||||
|
trace "replenishing mesh outbound quota", peers = g.mesh.peers(topic)
|
||||||
|
|
||||||
|
var candidates = toSeq(
|
||||||
|
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||||
|
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||||
|
).filterIt(
|
||||||
|
it.connected and
|
||||||
|
# get only outbound ones
|
||||||
|
it.outbound and
|
||||||
|
# avoid negative score peers
|
||||||
|
it.score >= 0.0 and
|
||||||
|
# don't pick explicit peers
|
||||||
|
it.peerId notin g.parameters.directPeers and
|
||||||
|
# and avoid peers we are backing off
|
||||||
|
it.peerId notin g.backingOff.getOrDefault(topic)
|
||||||
|
)
|
||||||
|
|
||||||
|
# shuffle anyway, score might be not used
|
||||||
|
shuffle(candidates)
|
||||||
|
|
||||||
|
# sort peers by score, high score first, we are grafting
|
||||||
|
candidates.sort(byScore, SortOrder.Descending)
|
||||||
|
|
||||||
|
# Graft peers so we reach a count of D
|
||||||
|
candidates.setLen(min(candidates.len, g.parameters.dOut))
|
||||||
|
|
||||||
|
trace "grafting outbound peers", topic, peers = candidates.len
|
||||||
|
|
||||||
|
for peer in candidates:
|
||||||
|
if g.mesh.addPeer(topic, peer):
|
||||||
|
g.grafted(peer, topic)
|
||||||
|
g.fanout.removePeer(topic, peer)
|
||||||
|
grafts &= peer
|
||||||
|
|
||||||
|
|
||||||
|
# get again npeers after possible grafts
|
||||||
|
npeers = g.mesh.peers(topic)
|
||||||
|
if npeers > g.parameters.dHigh:
|
||||||
|
if not isNil(metrics):
|
||||||
|
if g.knownTopics.contains(topic):
|
||||||
|
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = [topic])
|
||||||
|
else:
|
||||||
|
libp2p_gossipsub_above_dhigh_condition.inc(labelValues = ["other"])
|
||||||
|
|
||||||
|
# prune peers if we've gone over Dhi
|
||||||
|
prunes = toSeq(g.mesh[topic])
|
||||||
|
# avoid pruning peers we are currently grafting in this heartbeat
|
||||||
|
prunes.keepIf do (x: PubSubPeer) -> bool: x notin grafts
|
||||||
|
|
||||||
|
# shuffle anyway, score might be not used
|
||||||
|
shuffle(prunes)
|
||||||
|
|
||||||
|
# sort peers by score (inverted), pruning, so low score peers are on top
|
||||||
|
prunes.sort(byScore, SortOrder.Ascending)
|
||||||
|
|
||||||
|
# keep high score peers
|
||||||
|
if prunes.len > g.parameters.dScore:
|
||||||
|
prunes.setLen(prunes.len - g.parameters.dScore)
|
||||||
|
|
||||||
|
# collect inbound/outbound info
|
||||||
|
var outbound: seq[PubSubPeer]
|
||||||
|
var inbound: seq[PubSubPeer]
|
||||||
|
for peer in prunes:
|
||||||
|
if peer.outbound:
|
||||||
|
outbound &= peer
|
||||||
|
else:
|
||||||
|
inbound &= peer
|
||||||
|
|
||||||
|
let
|
||||||
|
meshOutbound = prunes.countIt(it.outbound)
|
||||||
|
maxOutboundPrunes = meshOutbound - g.parameters.dOut
|
||||||
|
|
||||||
|
# ensure that there are at least D_out peers first and rebalance to g.d after that
|
||||||
|
outbound.setLen(min(outbound.len, max(0, maxOutboundPrunes)))
|
||||||
|
|
||||||
|
# concat remaining outbound peers
|
||||||
|
prunes = inbound & outbound
|
||||||
|
|
||||||
|
let pruneLen = prunes.len - g.parameters.d
|
||||||
|
if pruneLen > 0:
|
||||||
|
# Ok we got some peers to prune,
|
||||||
|
# for this heartbeat let's prune those
|
||||||
|
shuffle(prunes)
|
||||||
|
prunes.setLen(pruneLen)
|
||||||
|
|
||||||
|
trace "pruning", prunes = prunes.len
|
||||||
|
for peer in prunes:
|
||||||
|
trace "pruning peer on rebalance", peer, score = peer.score
|
||||||
|
g.pruned(peer, topic)
|
||||||
|
g.mesh.removePeer(topic, peer)
|
||||||
|
elif npeers > g.parameters.dLow and not isNil(metrics):
|
||||||
|
inc metrics[].underDhighAboveDlowTopics
|
||||||
|
|
||||||
|
# opportunistic grafting, by spec mesh should not be empty...
|
||||||
|
if g.mesh.peers(topic) > 1:
|
||||||
|
var peers = toSeq(g.mesh[topic])
|
||||||
|
# grafting so high score has priority
|
||||||
|
peers.sort(byScore, SortOrder.Descending)
|
||||||
|
let medianIdx = peers.len div 2
|
||||||
|
let median = peers[medianIdx]
|
||||||
|
if median.score < g.parameters.opportunisticGraftThreshold:
|
||||||
|
trace "median score below opportunistic threshold", score = median.score
|
||||||
|
var avail = toSeq(
|
||||||
|
g.gossipsub.getOrDefault(topic, initHashSet[PubSubPeer]()) -
|
||||||
|
g.mesh.getOrDefault(topic, initHashSet[PubSubPeer]())
|
||||||
|
)
|
||||||
|
|
||||||
|
avail.keepIf do (x: PubSubPeer) -> bool:
|
||||||
|
# avoid negative score peers
|
||||||
|
x.score >= median.score and
|
||||||
|
# don't pick explicit peers
|
||||||
|
x.peerId notin g.parameters.directPeers and
|
||||||
|
# and avoid peers we are backing off
|
||||||
|
x.peerId notin g.backingOff.getOrDefault(topic)
|
||||||
|
|
||||||
|
# by spec, grab only 2
|
||||||
|
if avail.len > 2:
|
||||||
|
avail.setLen(2)
|
||||||
|
|
||||||
|
for peer in avail:
|
||||||
|
if g.mesh.addPeer(topic, peer):
|
||||||
|
g.grafted(peer, topic)
|
||||||
|
grafts &= peer
|
||||||
|
trace "opportunistic grafting", peer
|
||||||
|
|
||||||
|
if not isNil(metrics):
|
||||||
|
if g.knownTopics.contains(topic):
|
||||||
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
.set(g.gossipsub.peers(topic).int64, labelValues = [topic])
|
||||||
|
libp2p_gossipsub_peers_per_topic_fanout
|
||||||
|
.set(g.fanout.peers(topic).int64, labelValues = [topic])
|
||||||
|
libp2p_gossipsub_peers_per_topic_mesh
|
||||||
|
.set(g.mesh.peers(topic).int64, labelValues = [topic])
|
||||||
|
else:
|
||||||
|
metrics[].otherPeersPerTopicGossipsub += g.gossipsub.peers(topic).int64
|
||||||
|
metrics[].otherPeersPerTopicFanout += g.fanout.peers(topic).int64
|
||||||
|
metrics[].otherPeersPerTopicMesh += g.mesh.peers(topic).int64
|
||||||
|
|
||||||
|
trace "mesh balanced"
|
||||||
|
|
||||||
|
# Send changes to peers after table updates to avoid stale state
|
||||||
|
if grafts.len > 0:
|
||||||
|
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
|
||||||
|
g.broadcast(grafts, graft)
|
||||||
|
if prunes.len > 0:
|
||||||
|
let prune = RPCMsg(control: some(ControlMessage(
|
||||||
|
prune: @[ControlPrune(
|
||||||
|
topicID: topic,
|
||||||
|
peers: g.peerExchangeList(topic),
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
|
||||||
|
g.broadcast(prunes, prune)
|
||||||
|
|
||||||
|
proc dropFanoutPeers*(g: GossipSub) =
|
||||||
|
# drop peers that we haven't published to in
|
||||||
|
# GossipSubFanoutTTL seconds
|
||||||
|
let now = Moment.now()
|
||||||
|
for topic in toSeq(g.lastFanoutPubSub.keys):
|
||||||
|
let val = g.lastFanoutPubSub[topic]
|
||||||
|
if now > val:
|
||||||
|
g.fanout.del(topic)
|
||||||
|
g.lastFanoutPubSub.del(topic)
|
||||||
|
trace "dropping fanout topic", topic
|
||||||
|
|
||||||
|
proc replenishFanout*(g: GossipSub, topic: string) =
|
||||||
|
## get fanout peers for a topic
|
||||||
|
logScope: topic
|
||||||
|
trace "about to replenish fanout"
|
||||||
|
|
||||||
|
if g.fanout.peers(topic) < g.parameters.dLow:
|
||||||
|
trace "replenishing fanout", peers = g.fanout.peers(topic)
|
||||||
|
if topic in g.gossipsub:
|
||||||
|
for peer in g.gossipsub[topic]:
|
||||||
|
if g.fanout.addPeer(topic, peer):
|
||||||
|
if g.fanout.peers(topic) == g.parameters.d:
|
||||||
|
break
|
||||||
|
|
||||||
|
trace "fanout replenished with peers", peers = g.fanout.peers(topic)
|
||||||
|
|
||||||
|
proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] {.gcsafe.} =
|
||||||
|
## gossip iHave messages to peers
|
||||||
|
##
|
||||||
|
|
||||||
|
libp2p_gossipsub_cache_window_size.set(0)
|
||||||
|
|
||||||
|
trace "getting gossip peers (iHave)"
|
||||||
|
let topics = toHashSet(toSeq(g.mesh.keys)) + toHashSet(toSeq(g.fanout.keys))
|
||||||
|
for topic in topics:
|
||||||
|
if topic notin g.gossipsub:
|
||||||
|
trace "topic not in gossip array, skipping", topicID = topic
|
||||||
|
continue
|
||||||
|
|
||||||
|
let mids = g.mcache.window(topic)
|
||||||
|
if not(mids.len > 0):
|
||||||
|
continue
|
||||||
|
|
||||||
|
var midsSeq = toSeq(mids)
|
||||||
|
|
||||||
|
libp2p_gossipsub_cache_window_size.inc(midsSeq.len.int64)
|
||||||
|
|
||||||
|
# not in spec
|
||||||
|
# similar to rust: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/behaviour.rs#L2101
|
||||||
|
# and go https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L582
|
||||||
|
if midsSeq.len > IHaveMaxLength:
|
||||||
|
shuffle(midsSeq)
|
||||||
|
midsSeq.setLen(IHaveMaxLength)
|
||||||
|
|
||||||
|
let
|
||||||
|
ihave = ControlIHave(topicID: topic, messageIDs: midsSeq)
|
||||||
|
mesh = g.mesh.getOrDefault(topic)
|
||||||
|
fanout = g.fanout.getOrDefault(topic)
|
||||||
|
gossipPeers = mesh + fanout
|
||||||
|
var allPeers = toSeq(g.gossipsub.getOrDefault(topic))
|
||||||
|
|
||||||
|
allPeers.keepIf do (x: PubSubPeer) -> bool:
|
||||||
|
x.peerId notin g.parameters.directPeers and
|
||||||
|
x notin gossipPeers and
|
||||||
|
x.score >= g.parameters.gossipThreshold
|
||||||
|
|
||||||
|
var target = g.parameters.dLazy
|
||||||
|
let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
|
||||||
|
if factor > target:
|
||||||
|
target = min(factor, allPeers.len)
|
||||||
|
|
||||||
|
if target < allPeers.len:
|
||||||
|
shuffle(allPeers)
|
||||||
|
allPeers.setLen(target)
|
||||||
|
|
||||||
|
for peer in allPeers:
|
||||||
|
if peer notin result:
|
||||||
|
result[peer] = ControlMessage()
|
||||||
|
result[peer].ihave.add(ihave)
|
||||||
|
|
||||||
|
proc heartbeat*(g: GossipSub) {.async.} =
|
||||||
|
while g.heartbeatRunning:
|
||||||
|
try:
|
||||||
|
trace "running heartbeat", instance = cast[int](g)
|
||||||
|
|
||||||
|
# reset IWANT budget
|
||||||
|
# reset IHAVE cap
|
||||||
|
block:
|
||||||
|
for peer in g.peers.values:
|
||||||
|
peer.iWantBudget = IWantPeerBudget
|
||||||
|
peer.iHaveBudget = IHavePeerBudget
|
||||||
|
|
||||||
|
g.updateScores()
|
||||||
|
|
||||||
|
var meshMetrics = MeshMetrics()
|
||||||
|
|
||||||
|
for t in toSeq(g.topics.keys):
|
||||||
|
# remove expired backoffs
|
||||||
|
block:
|
||||||
|
handleBackingOff(g.backingOff, t)
|
||||||
|
|
||||||
|
# prune every negative score peer
|
||||||
|
# do this before relance
|
||||||
|
# in order to avoid grafted -> pruned in the same cycle
|
||||||
|
let meshPeers = g.mesh.getOrDefault(t)
|
||||||
|
var prunes: seq[PubSubPeer]
|
||||||
|
for peer in meshPeers:
|
||||||
|
if peer.score < 0.0:
|
||||||
|
trace "pruning negative score peer", peer, score = peer.score
|
||||||
|
g.pruned(peer, t)
|
||||||
|
g.mesh.removePeer(t, peer)
|
||||||
|
prunes &= peer
|
||||||
|
if prunes.len > 0:
|
||||||
|
let prune = RPCMsg(control: some(ControlMessage(
|
||||||
|
prune: @[ControlPrune(
|
||||||
|
topicID: t,
|
||||||
|
peers: g.peerExchangeList(t),
|
||||||
|
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
|
||||||
|
g.broadcast(prunes, prune)
|
||||||
|
|
||||||
|
# pass by ptr in order to both signal we want to update metrics
|
||||||
|
# and as well update the struct for each topic during this iteration
|
||||||
|
g.rebalanceMesh(t, addr meshMetrics)
|
||||||
|
|
||||||
|
commitMetrics(meshMetrics)
|
||||||
|
|
||||||
|
g.dropFanoutPeers()
|
||||||
|
|
||||||
|
# replenish known topics to the fanout
|
||||||
|
for t in toSeq(g.fanout.keys):
|
||||||
|
g.replenishFanout(t)
|
||||||
|
|
||||||
|
let peers = g.getGossipPeers()
|
||||||
|
for peer, control in peers:
|
||||||
|
# only ihave from here
|
||||||
|
for ihave in control.ihave:
|
||||||
|
if g.knownTopics.contains(ihave.topicID):
|
||||||
|
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicID])
|
||||||
|
else:
|
||||||
|
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
|
||||||
|
g.send(peer, RPCMsg(control: some(control)))
|
||||||
|
|
||||||
|
g.mcache.shift() # shift the cache
|
||||||
|
except CancelledError as exc:
|
||||||
|
raise exc
|
||||||
|
except CatchableError as exc:
|
||||||
|
warn "exception ocurred in gossipsub heartbeat", exc = exc.msg,
|
||||||
|
trace = exc.getStackTrace()
|
||||||
|
|
||||||
|
for trigger in g.heartbeatEvents:
|
||||||
|
trace "firing heartbeat event", instance = cast[int](g)
|
||||||
|
trigger.fire()
|
||||||
|
|
||||||
|
await sleepAsync(g.parameters.heartbeatInterval)
|
277
libp2p/protocols/pubsub/gossipsub/scoring.nim
Normal file
277
libp2p/protocols/pubsub/gossipsub/scoring.nim
Normal file
@ -0,0 +1,277 @@
|
|||||||
|
## Nim-LibP2P
|
||||||
|
## Copyright (c) 2021 Status Research & Development GmbH
|
||||||
|
## Licensed under either of
|
||||||
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||||
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||||
|
## at your option.
|
||||||
|
## This file may not be copied, modified, or distributed except according to
|
||||||
|
## those terms.
|
||||||
|
|
||||||
|
import std/[tables, strutils, sets, algorithm]
|
||||||
|
import chronos, chronicles, metrics
|
||||||
|
import "."/[types]
|
||||||
|
import ".."/[pubsubpeer]
|
||||||
|
import "../../.."/[peerid, multiaddress, utility, switch]
|
||||||
|
|
||||||
|
declareGauge(libp2p_gossipsub_peers_scores, "the scores of the peers in gossipsub", labels = ["agent"])
|
||||||
|
declareCounter(libp2p_gossipsub_bad_score_disconnection, "the number of peers disconnected by gossipsub", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_firstMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_meshMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_meshFailurePenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_invalidMessageDeliveries, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_appScore, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_behaviourPenalty, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
declareGauge(libp2p_gossipsub_peers_score_colocationFactor, "Detailed gossipsub scoring metric", labels = ["agent"])
|
||||||
|
|
||||||
|
proc initPeerStats*(g: GossipSub, peer: PubSubPeer, stats: PeerStats = PeerStats()) =
|
||||||
|
var initialStats = stats
|
||||||
|
initialStats.expire = Moment.now() + g.parameters.retainScore
|
||||||
|
g.peerStats[peer.peerId] = initialStats
|
||||||
|
peer.iWantBudget = IWantPeerBudget
|
||||||
|
peer.iHaveBudget = IHavePeerBudget
|
||||||
|
|
||||||
|
func `/`(a, b: Duration): float64 =
|
||||||
|
let
|
||||||
|
fa = float64(a.nanoseconds)
|
||||||
|
fb = float64(b.nanoseconds)
|
||||||
|
fa / fb
|
||||||
|
|
||||||
|
func byScore*(x,y: PubSubPeer): int = system.cmp(x.score, y.score)
|
||||||
|
|
||||||
|
proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
|
||||||
|
if peer.sendConn == nil:
|
||||||
|
trace "colocationFactor, no connection", peer
|
||||||
|
0.0
|
||||||
|
else:
|
||||||
|
let
|
||||||
|
address = peer.sendConn.observedAddr
|
||||||
|
|
||||||
|
g.peersInIP.mgetOrPut(address, initHashSet[PubSubPeer]()).incl(peer)
|
||||||
|
if address notin g.peersInIP:
|
||||||
|
g.peersInIP[address] = initHashSet[PubSubPeer]()
|
||||||
|
g.peersInIP[address].incl(peer)
|
||||||
|
|
||||||
|
let
|
||||||
|
ipPeers = g.peersInIP[address]
|
||||||
|
len = ipPeers.len.float64
|
||||||
|
|
||||||
|
if len > g.parameters.ipColocationFactorThreshold:
|
||||||
|
trace "colocationFactor over threshold", peer, address, len
|
||||||
|
let over = len - g.parameters.ipColocationFactorThreshold
|
||||||
|
over * over
|
||||||
|
else:
|
||||||
|
0.0
|
||||||
|
|
||||||
|
proc disconnectPeer(g: GossipSub, peer: PubSubPeer) {.async.} =
|
||||||
|
when defined(libp2p_agents_metrics):
|
||||||
|
let agent =
|
||||||
|
block:
|
||||||
|
if peer.shortAgent.len > 0:
|
||||||
|
peer.shortAgent
|
||||||
|
else:
|
||||||
|
if peer.sendConn != nil:
|
||||||
|
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
|
||||||
|
if KnownLibP2PAgentsSeq.contains(shortAgent):
|
||||||
|
peer.shortAgent = shortAgent
|
||||||
|
else:
|
||||||
|
peer.shortAgent = "unknown"
|
||||||
|
peer.shortAgent
|
||||||
|
else:
|
||||||
|
"unknown"
|
||||||
|
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = [agent])
|
||||||
|
else:
|
||||||
|
libp2p_gossipsub_bad_score_disconnection.inc(labelValues = ["unknown"])
|
||||||
|
|
||||||
|
if peer.sendConn != nil:
|
||||||
|
try:
|
||||||
|
await g.switch.disconnect(peer.peerId)
|
||||||
|
except CancelledError:
|
||||||
|
raise
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg
|
||||||
|
|
||||||
|
proc updateScores*(g: GossipSub) = # avoid async
|
||||||
|
trace "updating scores", peers = g.peers.len
|
||||||
|
|
||||||
|
let now = Moment.now()
|
||||||
|
var evicting: seq[PeerID]
|
||||||
|
|
||||||
|
for peerId, stats in g.peerStats.mpairs:
|
||||||
|
let peer = g.peers.getOrDefault(peerId)
|
||||||
|
if isNil(peer) or not(peer.connected):
|
||||||
|
if now > stats.expire:
|
||||||
|
evicting.add(peerId)
|
||||||
|
trace "evicted peer from memory", peer = peerId
|
||||||
|
continue
|
||||||
|
|
||||||
|
trace "updating peer score", peer
|
||||||
|
|
||||||
|
var
|
||||||
|
n_topics = 0
|
||||||
|
is_grafted = 0
|
||||||
|
|
||||||
|
# Per topic
|
||||||
|
for topic, topicParams in g.topicParams:
|
||||||
|
var info = stats.topicInfos.getOrDefault(topic)
|
||||||
|
inc n_topics
|
||||||
|
|
||||||
|
# if weight is 0.0 avoid wasting time
|
||||||
|
if topicParams.topicWeight != 0.0:
|
||||||
|
# Scoring
|
||||||
|
var topicScore = 0'f64
|
||||||
|
|
||||||
|
if info.inMesh:
|
||||||
|
inc is_grafted
|
||||||
|
info.meshTime = now - info.graftTime
|
||||||
|
if info.meshTime > topicParams.meshMessageDeliveriesActivation:
|
||||||
|
info.meshMessageDeliveriesActive = true
|
||||||
|
|
||||||
|
var p1 = info.meshTime / topicParams.timeInMeshQuantum
|
||||||
|
if p1 > topicParams.timeInMeshCap:
|
||||||
|
p1 = topicParams.timeInMeshCap
|
||||||
|
trace "p1", peer, p1, topic, topicScore
|
||||||
|
topicScore += p1 * topicParams.timeInMeshWeight
|
||||||
|
else:
|
||||||
|
info.meshMessageDeliveriesActive = false
|
||||||
|
|
||||||
|
topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
|
||||||
|
trace "p2", peer, p2 = info.firstMessageDeliveries, topic, topicScore
|
||||||
|
|
||||||
|
if info.meshMessageDeliveriesActive:
|
||||||
|
if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
|
||||||
|
let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
|
||||||
|
let p3 = deficit * deficit
|
||||||
|
trace "p3", peer, p3, topic, topicScore
|
||||||
|
topicScore += p3 * topicParams.meshMessageDeliveriesWeight
|
||||||
|
|
||||||
|
topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
|
||||||
|
trace "p3b", peer, p3b = info.meshFailurePenalty, topic, topicScore
|
||||||
|
|
||||||
|
topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
|
||||||
|
trace "p4", p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
|
||||||
|
|
||||||
|
trace "updated peer topic's scores", peer, topic, info, topicScore
|
||||||
|
|
||||||
|
peer.score += topicScore * topicParams.topicWeight
|
||||||
|
|
||||||
|
# Score metrics
|
||||||
|
when defined(libp2p_agents_metrics):
|
||||||
|
let agent =
|
||||||
|
block:
|
||||||
|
if peer.shortAgent.len > 0:
|
||||||
|
peer.shortAgent
|
||||||
|
else:
|
||||||
|
if peer.sendConn != nil:
|
||||||
|
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
|
||||||
|
if KnownLibP2PAgentsSeq.contains(shortAgent):
|
||||||
|
peer.shortAgent = shortAgent
|
||||||
|
else:
|
||||||
|
peer.shortAgent = "unknown"
|
||||||
|
peer.shortAgent
|
||||||
|
else:
|
||||||
|
"unknown"
|
||||||
|
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = [agent])
|
||||||
|
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = [agent])
|
||||||
|
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = [agent])
|
||||||
|
libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = [agent])
|
||||||
|
else:
|
||||||
|
libp2p_gossipsub_peers_score_firstMessageDeliveries.inc(info.firstMessageDeliveries, labelValues = ["unknown"])
|
||||||
|
libp2p_gossipsub_peers_score_meshMessageDeliveries.inc(info.meshMessageDeliveries, labelValues = ["unknown"])
|
||||||
|
libp2p_gossipsub_peers_score_meshFailurePenalty.inc(info.meshFailurePenalty, labelValues = ["unknown"])
|
||||||
|
libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc(info.invalidMessageDeliveries, labelValues = ["unknown"])
|
||||||
|
|
||||||
|
# Score decay
|
||||||
|
info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
|
||||||
|
if info.firstMessageDeliveries < g.parameters.decayToZero:
|
||||||
|
info.firstMessageDeliveries = 0
|
||||||
|
|
||||||
|
info.meshMessageDeliveries *= topicParams.meshMessageDeliveriesDecay
|
||||||
|
if info.meshMessageDeliveries < g.parameters.decayToZero:
|
||||||
|
info.meshMessageDeliveries = 0
|
||||||
|
|
||||||
|
info.meshFailurePenalty *= topicParams.meshFailurePenaltyDecay
|
||||||
|
if info.meshFailurePenalty < g.parameters.decayToZero:
|
||||||
|
info.meshFailurePenalty = 0
|
||||||
|
|
||||||
|
info.invalidMessageDeliveries *= topicParams.invalidMessageDeliveriesDecay
|
||||||
|
if info.invalidMessageDeliveries < g.parameters.decayToZero:
|
||||||
|
info.invalidMessageDeliveries = 0
|
||||||
|
|
||||||
|
# Wrap up
|
||||||
|
# commit our changes, mgetOrPut does NOT work as wanted with value types (lent?)
|
||||||
|
stats.topicInfos[topic] = info
|
||||||
|
|
||||||
|
peer.score += peer.appScore * g.parameters.appSpecificWeight
|
||||||
|
|
||||||
|
peer.score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
|
||||||
|
|
||||||
|
let colocationFactor = g.colocationFactor(peer)
|
||||||
|
peer.score += colocationFactor * g.parameters.ipColocationFactorWeight
|
||||||
|
|
||||||
|
# Score metrics
|
||||||
|
when defined(libp2p_agents_metrics):
|
||||||
|
let agent =
|
||||||
|
block:
|
||||||
|
if peer.shortAgent.len > 0:
|
||||||
|
peer.shortAgent
|
||||||
|
else:
|
||||||
|
if peer.sendConn != nil:
|
||||||
|
let shortAgent = peer.sendConn.peerInfo.agentVersion.split("/")[0].toLowerAscii()
|
||||||
|
if KnownLibP2PAgentsSeq.contains(shortAgent):
|
||||||
|
peer.shortAgent = shortAgent
|
||||||
|
else:
|
||||||
|
peer.shortAgent = "unknown"
|
||||||
|
peer.shortAgent
|
||||||
|
else:
|
||||||
|
"unknown"
|
||||||
|
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = [agent])
|
||||||
|
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = [agent])
|
||||||
|
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = [agent])
|
||||||
|
else:
|
||||||
|
libp2p_gossipsub_peers_score_appScore.inc(peer.appScore, labelValues = ["unknown"])
|
||||||
|
libp2p_gossipsub_peers_score_behaviourPenalty.inc(peer.behaviourPenalty, labelValues = ["unknown"])
|
||||||
|
libp2p_gossipsub_peers_score_colocationFactor.inc(colocationFactor, labelValues = ["unknown"])
|
||||||
|
|
||||||
|
# decay behaviourPenalty
|
||||||
|
peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
|
||||||
|
if peer.behaviourPenalty < g.parameters.decayToZero:
|
||||||
|
peer.behaviourPenalty = 0
|
||||||
|
|
||||||
|
# copy into stats the score to keep until expired
|
||||||
|
stats.score = peer.score
|
||||||
|
stats.appScore = peer.appScore
|
||||||
|
stats.behaviourPenalty = peer.behaviourPenalty
|
||||||
|
stats.expire = Moment.now() + g.parameters.retainScore # refresh expiration
|
||||||
|
assert(g.peerStats[peer.peerId].score == peer.score) # nim sanity check
|
||||||
|
trace "updated peer's score", peer, score = peer.score, n_topics, is_grafted
|
||||||
|
|
||||||
|
if g.parameters.disconnectBadPeers and stats.score < g.parameters.graylistThreshold:
|
||||||
|
debug "disconnecting bad score peer", peer, score = peer.score
|
||||||
|
asyncSpawn g.disconnectPeer(peer)
|
||||||
|
|
||||||
|
when defined(libp2p_agents_metrics):
|
||||||
|
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = [agent])
|
||||||
|
else:
|
||||||
|
libp2p_gossipsub_peers_scores.inc(peer.score, labelValues = ["unknown"])
|
||||||
|
|
||||||
|
for peer in evicting:
|
||||||
|
g.peerStats.del(peer)
|
||||||
|
|
||||||
|
trace "updated scores", peers = g.peers.len
|
||||||
|
|
||||||
|
proc punishInvalidMessage*(g: GossipSub, peer: PubSubPeer, topics: seq[string]) =
|
||||||
|
for t in topics:
|
||||||
|
if t notin g.topics:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# update stats
|
||||||
|
g.peerStats.withValue(peer.peerId, stats):
|
||||||
|
stats[].topicInfos.withValue(t, tstats):
|
||||||
|
tstats[].invalidMessageDeliveries += 1
|
||||||
|
do: # if we have no stats populate!
|
||||||
|
stats[].topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
|
||||||
|
do: # if we have no stats populate!
|
||||||
|
g.initPeerStats(peer) do:
|
||||||
|
var stats = PeerStats()
|
||||||
|
stats.topicInfos[t] = TopicInfo(invalidMessageDeliveries: 1)
|
||||||
|
stats
|
163
libp2p/protocols/pubsub/gossipsub/types.nim
Normal file
163
libp2p/protocols/pubsub/gossipsub/types.nim
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
import chronos
|
||||||
|
import std/[tables, sets]
|
||||||
|
import ".."/[floodsub, peertable, mcache, pubsubpeer]
|
||||||
|
import "../rpc"/[messages]
|
||||||
|
import "../../.."/[peerid, multiaddress]
|
||||||
|
|
||||||
|
const
|
||||||
|
GossipSubCodec* = "/meshsub/1.1.0"
|
||||||
|
GossipSubCodec_10* = "/meshsub/1.0.0"
|
||||||
|
|
||||||
|
# overlay parameters
|
||||||
|
const
|
||||||
|
GossipSubD* = 6
|
||||||
|
GossipSubDlo* = 4
|
||||||
|
GossipSubDhi* = 12
|
||||||
|
|
||||||
|
# gossip parameters
|
||||||
|
const
|
||||||
|
GossipSubHistoryLength* = 5
|
||||||
|
GossipSubHistoryGossip* = 3
|
||||||
|
|
||||||
|
# heartbeat interval
|
||||||
|
GossipSubHeartbeatInterval* = 1.seconds
|
||||||
|
|
||||||
|
# fanout ttl
|
||||||
|
const
|
||||||
|
GossipSubFanoutTTL* = 1.minutes
|
||||||
|
|
||||||
|
# gossip parameters
|
||||||
|
const
|
||||||
|
GossipBackoffPeriod* = 1.minutes
|
||||||
|
|
||||||
|
const
|
||||||
|
BackoffSlackTime* = 2 # seconds
|
||||||
|
IWantPeerBudget* = 25 # 25 messages per second ( reset every heartbeat )
|
||||||
|
IHavePeerBudget* = 10
|
||||||
|
# the max amount of IHave to expose, not by spec, but go as example
|
||||||
|
# rust sigp: https://github.com/sigp/rust-libp2p/blob/f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c/protocols/gossipsub/src/config.rs#L572
|
||||||
|
# go: https://github.com/libp2p/go-libp2p-pubsub/blob/08c17398fb11b2ab06ca141dddc8ec97272eb772/gossipsub.go#L155
|
||||||
|
IHaveMaxLength* = 5000
|
||||||
|
|
||||||
|
type
|
||||||
|
TopicInfo* = object
|
||||||
|
# gossip 1.1 related
|
||||||
|
graftTime*: Moment
|
||||||
|
meshTime*: Duration
|
||||||
|
inMesh*: bool
|
||||||
|
meshMessageDeliveriesActive*: bool
|
||||||
|
firstMessageDeliveries*: float64
|
||||||
|
meshMessageDeliveries*: float64
|
||||||
|
meshFailurePenalty*: float64
|
||||||
|
invalidMessageDeliveries*: float64
|
||||||
|
|
||||||
|
TopicParams* = object
|
||||||
|
topicWeight*: float64
|
||||||
|
|
||||||
|
# p1
|
||||||
|
timeInMeshWeight*: float64
|
||||||
|
timeInMeshQuantum*: Duration
|
||||||
|
timeInMeshCap*: float64
|
||||||
|
|
||||||
|
# p2
|
||||||
|
firstMessageDeliveriesWeight*: float64
|
||||||
|
firstMessageDeliveriesDecay*: float64
|
||||||
|
firstMessageDeliveriesCap*: float64
|
||||||
|
|
||||||
|
# p3
|
||||||
|
meshMessageDeliveriesWeight*: float64
|
||||||
|
meshMessageDeliveriesDecay*: float64
|
||||||
|
meshMessageDeliveriesThreshold*: float64
|
||||||
|
meshMessageDeliveriesCap*: float64
|
||||||
|
meshMessageDeliveriesActivation*: Duration
|
||||||
|
meshMessageDeliveriesWindow*: Duration
|
||||||
|
|
||||||
|
# p3b
|
||||||
|
meshFailurePenaltyWeight*: float64
|
||||||
|
meshFailurePenaltyDecay*: float64
|
||||||
|
|
||||||
|
# p4
|
||||||
|
invalidMessageDeliveriesWeight*: float64
|
||||||
|
invalidMessageDeliveriesDecay*: float64
|
||||||
|
|
||||||
|
PeerStats* = object
|
||||||
|
topicInfos*: Table[string, TopicInfo]
|
||||||
|
expire*: Moment # updated on disconnect, to retain scores until expire
|
||||||
|
# the following are copies from PubSubPeer, in order to restore them on re-connection
|
||||||
|
score*: float64 # a copy of the score to keep in case the peer is disconnected
|
||||||
|
appScore*: float64 # application specific score
|
||||||
|
behaviourPenalty*: float64 # the eventual penalty score
|
||||||
|
|
||||||
|
GossipSubParams* = object
|
||||||
|
explicit*: bool
|
||||||
|
pruneBackoff*: Duration
|
||||||
|
floodPublish*: bool
|
||||||
|
gossipFactor*: float64
|
||||||
|
d*: int
|
||||||
|
dLow*: int
|
||||||
|
dHigh*: int
|
||||||
|
dScore*: int
|
||||||
|
dOut*: int
|
||||||
|
dLazy*: int
|
||||||
|
|
||||||
|
heartbeatInterval*: Duration
|
||||||
|
|
||||||
|
historyLength*: int
|
||||||
|
historyGossip*: int
|
||||||
|
|
||||||
|
fanoutTTL*: Duration
|
||||||
|
seenTTL*: Duration
|
||||||
|
|
||||||
|
gossipThreshold*: float64
|
||||||
|
publishThreshold*: float64
|
||||||
|
graylistThreshold*: float64
|
||||||
|
acceptPXThreshold*: float64
|
||||||
|
opportunisticGraftThreshold*: float64
|
||||||
|
decayInterval*: Duration
|
||||||
|
decayToZero*: float64
|
||||||
|
retainScore*: Duration
|
||||||
|
|
||||||
|
appSpecificWeight*: float64
|
||||||
|
ipColocationFactorWeight*: float64
|
||||||
|
ipColocationFactorThreshold*: float64
|
||||||
|
behaviourPenaltyWeight*: float64
|
||||||
|
behaviourPenaltyDecay*: float64
|
||||||
|
|
||||||
|
directPeers*: Table[PeerId, seq[MultiAddress]]
|
||||||
|
|
||||||
|
disconnectBadPeers*: bool
|
||||||
|
|
||||||
|
BackoffTable* = Table[string, Table[PeerID, Moment]]
|
||||||
|
|
||||||
|
GossipSub* = ref object of FloodSub
|
||||||
|
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
|
||||||
|
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
|
||||||
|
gossipsub*: PeerTable # peers that are subscribed to a topic
|
||||||
|
explicit*: PeerTable # directpeers that we keep alive explicitly
|
||||||
|
backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
|
||||||
|
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
|
||||||
|
gossip*: Table[string, seq[ControlIHave]] # pending gossip
|
||||||
|
control*: Table[string, ControlMessage] # pending control messages
|
||||||
|
mcache*: MCache # messages cache
|
||||||
|
heartbeatFut*: Future[void] # cancellation future for heartbeat interval
|
||||||
|
heartbeatRunning*: bool
|
||||||
|
|
||||||
|
peerStats*: Table[PeerID, PeerStats]
|
||||||
|
parameters*: GossipSubParams
|
||||||
|
topicParams*: Table[string, TopicParams]
|
||||||
|
directPeersLoop*: Future[void]
|
||||||
|
peersInIP*: Table[MultiAddress, HashSet[PubSubPeer]]
|
||||||
|
|
||||||
|
heartbeatEvents*: seq[AsyncEvent]
|
||||||
|
|
||||||
|
randomBytes*: seq[byte]
|
||||||
|
|
||||||
|
MeshMetrics* = object
|
||||||
|
# scratch buffers for metrics
|
||||||
|
otherPeersPerTopicMesh*: int64
|
||||||
|
otherPeersPerTopicFanout*: int64
|
||||||
|
otherPeersPerTopicGossipsub*: int64
|
||||||
|
underDlowTopics*: int64
|
||||||
|
underDoutTopics*: int64
|
||||||
|
underDhighAboveDlowTopics*: int64
|
||||||
|
noPeersTopics*: int64
|
@ -351,8 +351,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} =
|
|||||||
## messages
|
## messages
|
||||||
##
|
##
|
||||||
|
|
||||||
let peer = p.getOrCreatePeer(peer, p.codecs)
|
discard p.getOrCreatePeer(peer, p.codecs)
|
||||||
peer.outbound = true # flag as outbound
|
|
||||||
|
|
||||||
proc updateTopicMetrics(p: PubSub, topic: string) =
|
proc updateTopicMetrics(p: PubSub, topic: string) =
|
||||||
# metrics
|
# metrics
|
||||||
|
@ -55,7 +55,6 @@ type
|
|||||||
score*: float64
|
score*: float64
|
||||||
iWantBudget*: int
|
iWantBudget*: int
|
||||||
iHaveBudget*: int
|
iHaveBudget*: int
|
||||||
outbound*: bool # if this is an outbound connection
|
|
||||||
appScore*: float64 # application specific score
|
appScore*: float64 # application specific score
|
||||||
behaviourPenalty*: float64 # the eventual penalty score
|
behaviourPenalty*: float64 # the eventual penalty score
|
||||||
|
|
||||||
@ -80,6 +79,12 @@ proc connected*(p: PubSubPeer): bool =
|
|||||||
proc hasObservers(p: PubSubPeer): bool =
|
proc hasObservers(p: PubSubPeer): bool =
|
||||||
p.observers != nil and anyIt(p.observers[], it != nil)
|
p.observers != nil and anyIt(p.observers[], it != nil)
|
||||||
|
|
||||||
|
func outbound*(p: PubSubPeer): bool =
|
||||||
|
if p.connected and p.sendConn.dir == Direction.Out:
|
||||||
|
true
|
||||||
|
else:
|
||||||
|
false
|
||||||
|
|
||||||
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
|
proc recvObservers(p: PubSubPeer, msg: var RPCMsg) =
|
||||||
# trigger hooks
|
# trigger hooks
|
||||||
if not(isNil(p.observers)) and p.observers[].len > 0:
|
if not(isNil(p.observers)) and p.observers[].len > 0:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user