restore explicit peering

This commit is contained in:
Giovanni Petrantoni 2020-08-12 18:25:57 +09:00
parent 8932427a8b
commit 05463873a2

View File

@ -100,7 +100,7 @@ type
expire*: Moment # updated on disconnect, to retain scores until expire
GossipSubParams* = object
explicit*: bool
explicit: bool
pruneBackoff*: Duration
floodPublish*: bool
gossipFactor*: float64
@ -122,12 +122,13 @@ type
behaviourPenaltyWeight*: float64
behaviourPenaltyDecay*: float64
directPeers*: seq[PeerId]
GossipSub* = ref object of FloodSub
mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic
fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic
gossipsub*: PeerTable # peers that are subscribed to a topic
explicit*: PeerTable # directpeers that we keep alive explicitly
explicitPeers*: HashSet[PeerID] # explicit (always connected/forward) peers
backingOff*: HashSet[PeerID] # explicit (always connected/forward) peers
lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
gossip*: Table[string, seq[ControlIHave]] # pending gossip
@ -140,6 +141,7 @@ type
peerStats: Table[PubSubPeer, PeerStats]
parameters*: GossipSubParams
topicParams*: Table[string, TopicParams]
directPeersLoop: Future[void]
heartbeatEvents*: seq[AsyncEvent]
@ -369,7 +371,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# avoid negative score peers
x.score >= 0.0 and
# don't pick explicit peers
x.peerId notin g.explicitPeers and
x.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
x.peerId notin g.backingOff
@ -427,7 +429,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} =
# avoid negative score peers
x.score >= median.score and
# don't pick explicit peers
x.peerId notin g.explicitPeers and
x.peerId notin g.parameters.directPeers and
# and avoid peers we are backing off
x.peerId notin g.backingOff
@ -658,6 +660,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
for t in toSeq(g.gossipsub.keys):
g.gossipsub.removePeer(t, pubSubPeer)
# also try to remove from explicit table here
g.explicit.removePeer(t, pubSubPeer)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_peers_per_topic_gossipsub
@ -679,12 +683,6 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout.peers(t).int64, labelValues = [t])
# TODO
# if peer.peerInfo.maintain:
# for t in toSeq(g.explicit.keys):
# g.explicit.removePeer(t, peer)
# g.explicitPeers.excl(peer.id)
# don't retain bad score peers
if pubSubPeer.score < 0.0:
g.peerStats.del(pubSubPeer)
@ -717,7 +715,7 @@ method subscribeTopic*(g: GossipSub,
trace "peer subscribed to topic"
# subscribe remote peer to the topic
discard g.gossipsub.addPeer(topic, peer)
if peerId in g.explicitPeers:
if peerId in g.parameters.directPeers:
discard g.explicit.addPeer(topic, peer)
else:
trace "peer unsubscribed from topic"
@ -725,7 +723,7 @@ method subscribeTopic*(g: GossipSub,
g.gossipsub.removePeer(topic, peer)
g.mesh.removePeer(topic, peer)
g.fanout.removePeer(topic, peer)
if peerId in g.explicitPeers:
if peerId in g.parameters.directPeers:
g.explicit.removePeer(topic, peer)
when defined(libp2p_expensive_metrics):
@ -1044,6 +1042,18 @@ method publish*(g: GossipSub,
msg = msg.shortLog()
return published
proc maintainDirectPeers(g: GossipSub) {.async.} =
while g.heartbeatRunning:
for id in g.parameters.directPeers:
let peer = g.peers.getOrDefault(id)
if peer == nil:
# this creates a new peer and assigns the current switch to it
# as a result the next time we try to Send we will as well try to open a connection
# see pubsubpeer.nim send and such
discard g.getOrCreatePeer(id, g.codec)
await sleepAsync(1.minutes)
method start*(g: GossipSub) {.async.} =
trace "gossipsub start"
@ -1054,6 +1064,7 @@ method start*(g: GossipSub) {.async.} =
# setup the heartbeat interval
g.heartbeatRunning = true
g.heartbeatFut = g.heartbeat()
g.directPeersLoop = g.maintainDirectPeers()
method stop*(g: GossipSub) {.async.} =
trace "gossipsub stop"
@ -1067,6 +1078,8 @@ method stop*(g: GossipSub) {.async.} =
if not g.heartbeatFut.finished:
trace "awaiting last heartbeat"
await g.heartbeatFut
await g.directPeersLoop.cancelAndWait()
method initPubSub*(g: GossipSub) =
procCall FloodSub(g).initPubSub()