wip updateScores

This commit is contained in:
Giovanni Petrantoni 2020-07-17 14:03:25 +09:00
parent 6f638259ee
commit 9e3f70896a
2 changed files with 26 additions and 3 deletions

View File

@ -314,11 +314,33 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
result[peer.id].ihave.add(ihave) result[peer.id].ihave.add(ihave)
proc updateScores(g: GossipSub) = # avoid async
debug "updating scores", peers = g.peers.len
let now = Moment.now()
for id, peer in g.peers:
# TODO
# Per topic
for topic in peer.topics:
# Defect on purpose, no magic here please, this should not fail!
let topicParams = g.topics[topic].parameters
var info = peer.topicInfos[topic]
if info.inMesh:
info.meshTime = now - info.graftTime
if info.meshTime > topicParams.meshMessageDeliveriesActivation:
info.meshMessageDeliveriesActive = true
# debug assert to check nim compiler is doing what we are asking...
assert(peer.topicInfos[topic].meshTime == info.meshTime)
proc heartbeat(g: GossipSub) {.async.} = proc heartbeat(g: GossipSub) {.async.} =
while g.heartbeatRunning: while g.heartbeatRunning:
try: try:
trace "running heartbeat" trace "running heartbeat"
g.updateScores()
for t in toSeq(g.topics.keys): for t in toSeq(g.topics.keys):
await g.rebalanceMesh(t) await g.rebalanceMesh(t)

View File

@ -31,10 +31,12 @@ type
onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onRecv*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].} onSend*: proc(peer: PubSubPeer; msgs: var RPCMsg) {.gcsafe, raises: [Defect].}
TopicInfo = object TopicInfo* = object
# gossip 1.1 related # gossip 1.1 related
graftTime*: Moment graftTime*: Moment
meshTime*: Duration meshTime*: Duration
inMesh*: bool
meshMessageDeliveriesActive*: bool
PubSubPeer* = ref object of RootObj PubSubPeer* = ref object of RootObj
proto*: string # the protocol that this peer joined from proto*: string # the protocol that this peer joined from
@ -47,7 +49,7 @@ type
onConnect*: AsyncEvent onConnect*: AsyncEvent
observers*: ref seq[PubSubObserver] # ref as in smart_ptr observers*: ref seq[PubSubObserver] # ref as in smart_ptr
refs: int # how many active connections this peer has refs: int # how many active connections this peer has
topicInfos: Table[string, TopicInfo] topicInfos*: Table[string, TopicInfo]
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.} RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
@ -228,7 +230,6 @@ proc pruned*(p: PubSubPeer, topic: string) =
var _ = p.topicInfos.mgetOrPut(topic, TopicInfo()) var _ = p.topicInfos.mgetOrPut(topic, TopicInfo())
# TODO # TODO
proc newPubSubPeer*(peerInfo: PeerInfo, proc newPubSubPeer*(peerInfo: PeerInfo,
proto: string): PubSubPeer = proto: string): PubSubPeer =
new result new result