prune by score (score is stub still)
This commit is contained in:
parent
cc3a0c29c0
commit
08ed2a7974
|
@ -7,7 +7,7 @@
|
||||||
## This file may not be copied, modified, or distributed except according to
|
## This file may not be copied, modified, or distributed except according to
|
||||||
## those terms.
|
## those terms.
|
||||||
|
|
||||||
import tables, sets, options, sequtils, random
|
import tables, sets, options, sequtils, random, algorithm
|
||||||
import chronos, chronicles, metrics
|
import chronos, chronicles, metrics
|
||||||
import pubsub,
|
import pubsub,
|
||||||
floodsub,
|
floodsub,
|
||||||
|
@ -31,17 +31,21 @@ const
|
||||||
GossipSubCodec_11* = "/meshsub/1.1.0"
|
GossipSubCodec_11* = "/meshsub/1.1.0"
|
||||||
|
|
||||||
# overlay parameters
|
# overlay parameters
|
||||||
const GossipSubD* = 6
|
const
|
||||||
const GossipSubDlo* = 4
|
GossipSubD* = 6
|
||||||
const GossipSubDhi* = 12
|
GossipSubDlo* = 4
|
||||||
|
GossipSubDhi* = 12
|
||||||
|
|
||||||
# gossip parameters
|
# gossip parameters
|
||||||
const GossipSubHistoryLength* = 5
|
const
|
||||||
const GossipSubHistoryGossip* = 3
|
GossipSubHistoryLength* = 5
|
||||||
|
GossipSubHistoryGossip* = 3
|
||||||
|
GossipBackoffPeriod* = 1.minutes
|
||||||
|
|
||||||
# heartbeat interval
|
# heartbeat interval
|
||||||
const GossipSubHeartbeatInitialDelay* = 100.millis
|
const
|
||||||
const GossipSubHeartbeatInterval* = 1.seconds
|
GossipSubHeartbeatInitialDelay* = 100.millis
|
||||||
|
GossipSubHeartbeatInterval* = 1.seconds
|
||||||
|
|
||||||
# fanout ttl
|
# fanout ttl
|
||||||
const GossipSubFanoutTTL* = 1.minutes
|
const GossipSubFanoutTTL* = 1.minutes
|
||||||
|
@ -148,13 +152,33 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
|
||||||
# prune peers if we've gone over
|
# prune peers if we've gone over
|
||||||
if g.mesh.getOrDefault(topic).len > GossipSubDhi:
|
if g.mesh.getOrDefault(topic).len > GossipSubDhi:
|
||||||
trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len
|
trace "about to prune mesh", mesh = g.mesh.getOrDefault(topic).len
|
||||||
|
|
||||||
|
# ATTN possible perf bottleneck here... score is a "red" function
|
||||||
|
# and we call a lot of Table[] etc etc
|
||||||
|
|
||||||
|
# gather peers
|
||||||
|
var peers = toSeq(g.mesh[topic])
|
||||||
|
# sort peers by score
|
||||||
|
peers.sort(proc (x, y: string): int =
|
||||||
|
let
|
||||||
|
peerx = g.peers[x].score()
|
||||||
|
peery = g.peers[y].score()
|
||||||
|
if peerx < peery: -1
|
||||||
|
elif peerx == peery: 0
|
||||||
|
else: 1)
|
||||||
|
|
||||||
while g.mesh.getOrDefault(topic).len > GossipSubD:
|
while g.mesh.getOrDefault(topic).len > GossipSubD:
|
||||||
trace "pruning peers", peers = g.mesh[topic].len
|
trace "pruning peers", peers = g.mesh[topic].len
|
||||||
let id = toSeq(g.mesh[topic])[rand(0..<g.mesh[topic].len)]
|
|
||||||
|
# pop a low score peer
|
||||||
|
let
|
||||||
|
id = peers.pop()
|
||||||
g.mesh[topic].excl(id)
|
g.mesh[topic].excl(id)
|
||||||
|
|
||||||
let p = g.peers[id]
|
# send a prune message to the peer
|
||||||
# send a graft message to the peer
|
let
|
||||||
|
p = g.peers[id]
|
||||||
|
# TODO send a set of other peers where the pruned peer can connect to reform its mesh
|
||||||
await p.sendPrune(@[topic])
|
await p.sendPrune(@[topic])
|
||||||
|
|
||||||
libp2p_gossipsub_peers_per_topic_gossipsub
|
libp2p_gossipsub_peers_per_topic_gossipsub
|
||||||
|
|
|
@ -45,6 +45,10 @@ type
|
||||||
|
|
||||||
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
RPCHandler* = proc(peer: PubSubPeer, msg: seq[RPCMsg]): Future[void] {.gcsafe.}
|
||||||
|
|
||||||
|
func score*(p: PubSubPeer): float64 =
|
||||||
|
# TODO
|
||||||
|
0.0
|
||||||
|
|
||||||
proc id*(p: PubSubPeer): string = p.peerInfo.id
|
proc id*(p: PubSubPeer): string = p.peerInfo.id
|
||||||
|
|
||||||
proc isConnected*(p: PubSubPeer): bool =
|
proc isConnected*(p: PubSubPeer): bool =
|
||||||
|
@ -159,10 +163,10 @@ proc sendGraft*(p: PubSubPeer, topics: seq[string]) {.async.} =
|
||||||
trace "sending graft msg to peer", peer = p.id, topicID = topic
|
trace "sending graft msg to peer", peer = p.id, topicID = topic
|
||||||
await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))])
|
await p.send(@[RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))])
|
||||||
|
|
||||||
proc sendPrune*(p: PubSubPeer, topics: seq[string]) {.async.} =
|
proc sendPrune*(p: PubSubPeer, topics: seq[string], peers: seq[PeerInfoMsg] = @[], backoff: uint64 = 0) {.async.} =
|
||||||
for topic in topics:
|
for topic in topics:
|
||||||
trace "sending prune msg to peer", peer = p.id, topicID = topic
|
trace "sending prune msg to peer", peer = p.id, topicID = topic
|
||||||
await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic)])))])
|
await p.send(@[RPCMsg(control: some(ControlMessage(prune: @[ControlPrune(topicID: topic, peers: peers, backoff: backoff)])))])
|
||||||
|
|
||||||
proc newPubSubPeer*(peerInfo: PeerInfo,
|
proc newPubSubPeer*(peerInfo: PeerInfo,
|
||||||
proto: string): PubSubPeer =
|
proto: string): PubSubPeer =
|
||||||
|
|
Loading…
Reference in New Issue