This commit is contained in:
Giovanni Petrantoni 2020-07-17 18:01:30 +09:00
parent 0d60a2f1c6
commit 6b23cae3d4
3 changed files with 59 additions and 25 deletions

View File

@ -54,6 +54,17 @@ const
GossipSubFanoutTTL* = 1.minutes
type
# Per-topic bookkeeping for one peer; instances are stored in
# `PeerStats.topicInfos`, keyed by topic name.
TopicInfo* = object
# gossip 1.1 related
graftTime*: Moment # stamped with `Moment.now()` by `grafted`
meshTime*: Duration # zeroed by `grafted`; `updateScores` sets it to `now - graftTime` while `inMesh`
inMesh*: bool # read by `updateScores` to decide whether `meshTime` is refreshed
meshMessageDeliveriesActive*: bool # set by `updateScores` once `meshTime` exceeds the topic's `meshMessageDeliveriesActivation`
# Per-peer state that GossipSub keeps in its `peerStats` table.
PeerStats* = object
topicInfos*: Table[string, TopicInfo] # one `TopicInfo` per topic name
expire*: Moment # updated on disconnect, to retain scores until expire
GossipSubParams* = object
pruneBackoff*: Duration
floodPublish*: bool
@ -90,6 +101,7 @@ type
heartbeatFut: Future[void] # cancellation future for heartbeat interval
heartbeatRunning: bool
heartbeatLock: AsyncLock # heartbeat lock to prevent two consecutive concurrent heartbeats
peerStats: Table[PubSubPeer, PeerStats]
declareGauge(libp2p_gossipsub_peers_per_topic_mesh,
"gossipsub peers per topic in mesh",
@ -169,6 +181,25 @@ method init*(g: GossipSub) =
g.codecs &= GossipSubCodec
g.codecs &= GossipSubCodec_10
method handleConnect*(g: GossipSub, peer: PubSubPeer) =
  ## Connection hook: makes sure `peer` has an entry in `g.peerStats`.
  ##
  ## A peer seen for the first time gets a default `PeerStats()`; a peer
  ## that is already tracked keeps its existing entry untouched.
  if peer in g.peerStats:
    # we knew this peer, nothing to do
    return

  # new peer
  g.peerStats[peer] = PeerStats()
proc grafted(g: GossipSub, p: PubSubPeer, topic: string) =
  ## Records that peer `p` was just grafted into the mesh of `topic`:
  ## stamps the graft time and resets the accumulated mesh time in the
  ## peer's `TopicInfo` for that topic (created with defaults if absent).
  ##
  ## Raises `KeyError` when `p` has no entry in `g.peerStats`.

  # `PeerStats` and `TopicInfo` are plain (value) objects, so
  # `var x = table[key]` / `var x = table.mgetOrPut(...)` produce copies and
  # any field written through them is lost. Edit a local value and store it
  # back through the table so the update is actually persisted.
  var info = g.peerStats[p].topicInfos.getOrDefault(topic)
  info.graftTime = Moment.now()
  info.meshTime = 0.seconds
  # NOTE(review): `inMesh` is not touched here although `updateScores`
  # reads it - confirm where it is supposed to be set.
  g.peerStats[p].topicInfos[topic] = info
proc pruned(g: GossipSub, p: PubSubPeer, topic: string) =
  ## Bookkeeping hook for peer `p` leaving the mesh of `topic`.
  ##
  ## Currently a placeholder: it only looks the entry up and changes
  ## nothing. The lookups raise `KeyError` when `p` is missing from
  ## `g.peerStats` or when `topic` is missing from that peer's `topicInfos`.
  discard g.peerStats[p].topicInfos[topic]
proc replenishFanout(g: GossipSub, topic: string) =
## get fanout peers for a topic
trace "about to replenish fanout"
@ -227,7 +258,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
for peer in grafts:
if g.mesh.addPeer(topic, peer):
peer.grafted(topic)
g.grafted(peer, topic)
g.fanout.removePeer(topic, peer)
if g.mesh.peers(topic) > GossipSubDhi:
@ -238,7 +269,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
trace "about to prune mesh", prunes = prunes.len
for peer in prunes:
peer.pruned(topic)
g.pruned(peer, topic)
g.mesh.removePeer(topic, peer)
libp2p_gossipsub_peers_per_topic_gossipsub
@ -316,20 +347,27 @@ proc updateScores(g: GossipSub) = # avoid async
let now = Moment.now()
for id, peer in g.peers:
for peer, stats in g.peerStats:
debug "updating peer score", peer = peer, gossipTopics = peer.topics.len
# TODO
if not peer.connected:
if now > stats.expire:
discard # really cleanup peer
# Per topic
for topic in peer.topics:
debug "updating peer topic's scores", peer = peer, topic
# Defect on purpose, no magic here please, this should not fail!
let topicParams = g.topics[topic].parameters
var info = peer.topicInfos[topic]
var info = stats.topicInfos[topic]
if info.inMesh:
info.meshTime = now - info.graftTime
if info.meshTime > topicParams.meshMessageDeliveriesActivation:
info.meshMessageDeliveriesActive = true
# debug assert to check nim compiler is doing what we are asking...
assert(peer.topicInfos[topic].meshTime == info.meshTime)
assert(stats.topicInfos[topic].meshTime == info.meshTime)
proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning:
@ -389,6 +427,14 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) =
g.explicit.removePeer(t, peer)
g.explicitPeers.excl(peer.id)
# don't retain bad score peers
if peer.score() > 0:
g.peerStats.del(peer)
return
var stats = g.peerStats[peer]
stats.expire = Moment.now() + g.parameters.retainScore
method subscribePeer*(p: GossipSub,
conn: Connection) =
@ -465,7 +511,7 @@ proc handleGraft(g: GossipSub,
# peer will be removed from the mesh on next rebalance, so we don't want
# this peer to push someone else out
if g.mesh.addPeer(topic, peer):
peer.grafted(topic)
g.grafted(peer, topic)
g.fanout.removePeer(topic, peer)
else:
trace "peer already in mesh"
@ -484,7 +530,7 @@ proc handlePrune(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
for prune in prunes:
trace "peer pruned topic", peer = peer.id, topic = prune.topicID
peer.pruned(prune.topicID)
g.pruned(peer, prune.topicID)
g.mesh.removePeer(prune.topicID, peer)
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh.peers(prune.topicID).int64, labelValues = [prune.topicID])
@ -609,7 +655,7 @@ method unsubscribe*(g: GossipSub,
g.mesh.del(topic)
for peer in peers:
peer.pruned(topic)
g.pruned(peer, topic)
await peer.sendPrune(@[topic])
method publish*(g: GossipSub,

View File

@ -137,6 +137,9 @@ proc validateParameters*(parameters: TopicParams): Result[void, cstring] =
else:
ok()
method handleConnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer connects
##
## no-op in the base implementation; invoked from `getOrCreatePeer`
## after the peer has been stored in `p.peers`
discard
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.base.} =
## handle peer disconnects
##
@ -208,6 +211,8 @@ proc getOrCreatePeer(p: PubSub,
p.peers[peer.id] = peer
peer.observers = p.observers
handleConnect(p, peer)
# metrics
libp2p_pubsub_peers.set(p.peers.len.int64)

View File

@ -31,13 +31,6 @@ type
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
# Per-topic bookkeeping for a peer; stored in a `Table[string, TopicInfo]`
# keyed by topic name.
TopicInfo* = object
# gossip 1.1 related
graftTime*: Moment # stamped with `Moment.now()` by `grafted`
meshTime*: Duration # zeroed by `grafted`
inMesh*: bool
meshMessageDeliveriesActive*: bool
PubSubPeer* = ref object of RootObj
proto*: string # the protocol that this peer joined from
sendConn: Connection
@ -49,7 +42,6 @@ type
onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr
refs: int # how many active connections this peer has
topicInfos*: Table[string, TopicInfo]
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -223,15 +215,6 @@ proc sendPrune*(p: PubSubPeer, topics: seq[string]): Future[void] =
proc `$`*(p: PubSubPeer): string =
  ## String form of a peer: its `id`.
  result = p.id
proc grafted*(p: PubSubPeer, topic: string) =
  ## Records that this peer was just grafted into the mesh of `topic`:
  ## stamps the graft time and resets the accumulated mesh time in the
  ## peer's `TopicInfo` for that topic (created with defaults if absent).

  # `TopicInfo` is a plain (value) object, so
  # `var info = p.topicInfos.mgetOrPut(...)` copies the entry and the field
  # writes never reach the table. Edit a local value and store it back.
  var info = p.topicInfos.getOrDefault(topic)
  info.graftTime = Moment.now()
  info.meshTime = 0.seconds
  p.topicInfos[topic] = info
proc pruned*(p: PubSubPeer, topic: string) =
  ## Hook invoked when this peer is pruned from `topic`.
  ##
  ## For now it only guarantees that `p.topicInfos` holds an entry for
  ## `topic`, inserting a default `TopicInfo()` when there is none.
  if topic notin p.topicInfos:
    p.topicInfos[topic] = TopicInfo()
  # TODO
proc newPubSubPeer*(peerInfo: PeerInfo,
proto: string): PubSubPeer =
new result