From 05463873a21004b27a275a78e43d84c5cb69add1 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Wed, 12 Aug 2020 18:25:57 +0900 Subject: [PATCH] restore explicit peering --- libp2p/protocols/pubsub/gossipsub.nim | 39 ++++++++++++++++++--------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 06d3f812c..2e0d26c62 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -100,7 +100,7 @@ type expire*: Moment # updated on disconnect, to retain scores until expire GossipSubParams* = object - explicit*: bool + explicit: bool pruneBackoff*: Duration floodPublish*: bool gossipFactor*: float64 @@ -122,13 +122,14 @@ type behaviourPenaltyWeight*: float64 behaviourPenaltyDecay*: float64 + directPeers*: seq[PeerId] + GossipSub* = ref object of FloodSub mesh*: PeerTable # peers that we send messages to when we are subscribed to the topic fanout*: PeerTable # peers that we send messages to when we're not subscribed to the topic gossipsub*: PeerTable # peers that are subscribed to a topic explicit*: PeerTable # directpeers that we keep alive explicitly - explicitPeers*: HashSet[PeerID] # explicit (always connected/forward) peers - backingOff*: HashSet[PeerID] # explicit (always connected/forward) peers + backingOff*: HashSet[PeerID] # explicit (always connected/forward) peers lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics gossip*: Table[string, seq[ControlIHave]] # pending gossip control*: Table[string, ControlMessage] # pending control messages @@ -140,6 +141,7 @@ type peerStats: Table[PubSubPeer, PeerStats] parameters*: GossipSubParams topicParams*: Table[string, TopicParams] + directPeersLoop: Future[void] heartbeatEvents*: seq[AsyncEvent] @@ -369,7 +371,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # avoid negative score peers x.score >= 0.0 and # don't pick explicit peers - x.peerId notin g.explicitPeers and + x.peerId notin g.parameters.directPeers and # and avoid peers we are backing off x.peerId notin g.backingOff @@ -427,7 +429,7 @@ proc rebalanceMesh(g: GossipSub, topic: string) {.async.} = # avoid negative score peers x.score >= median.score and # don't pick explicit peers - x.peerId notin g.explicitPeers and + x.peerId notin g.parameters.directPeers and # and avoid peers we are backing off x.peerId notin g.backingOff @@ -658,6 +660,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = for t in toSeq(g.gossipsub.keys): g.gossipsub.removePeer(t, pubSubPeer) + # also try to remove from explicit table here + g.explicit.removePeer(t, pubSubPeer) when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_gossipsub @@ -678,12 +682,6 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = when defined(libp2p_expensive_metrics): libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout.peers(t).int64, labelValues = [t]) - - # TODO - # if peer.peerInfo.maintain: - # for t in toSeq(g.explicit.keys): - # g.explicit.removePeer(t, peer) - # g.explicitPeers.excl(peer.id) # don't retain bad score peers if pubSubPeer.score < 0.0: @@ -717,7 +715,7 @@ method subscribeTopic*(g: GossipSub, trace "peer subscribed to topic" # subscribe remote peer to the topic discard g.gossipsub.addPeer(topic, peer) - if peerId in g.explicitPeers: + if peerId in g.parameters.directPeers: discard g.explicit.addPeer(topic, peer) else: trace "peer unsubscribed from topic" @@ -725,7 +723,7 @@ method subscribeTopic*(g: GossipSub, g.gossipsub.removePeer(topic, peer) g.mesh.removePeer(topic, peer) g.fanout.removePeer(topic, peer) - if peerId in g.explicitPeers: + if peerId in g.parameters.directPeers: g.explicit.removePeer(topic, peer) when defined(libp2p_expensive_metrics): @@ -1044,6 +1042,18 @@ method publish*(g: GossipSub, msg = msg.shortLog() return published +proc maintainDirectPeers(g: GossipSub) {.async.} = + while g.heartbeatRunning: + for id in g.parameters.directPeers: + let peer = g.peers.getOrDefault(id) + if peer == nil: + # this creates a new peer and assigns the current switch to it + # as a result the next time we try to Send we will as well try to open a connection + # see pubsubpeer.nim send and such + discard g.getOrCreatePeer(id, g.codec) + + await sleepAsync(1.minutes) + method start*(g: GossipSub) {.async.} = trace "gossipsub start" @@ -1054,6 +1064,7 @@ method start*(g: GossipSub) {.async.} = # setup the heartbeat interval g.heartbeatRunning = true g.heartbeatFut = g.heartbeat() + g.directPeersLoop = g.maintainDirectPeers() method stop*(g: GossipSub) {.async.} = trace "gossipsub stop" @@ -1067,6 +1078,8 @@ method stop*(g: GossipSub) {.async.} = if not g.heartbeatFut.finished: trace "awaiting last heartbeat" await g.heartbeatFut + await g.directPeersLoop.cancelAndWait() + method initPubSub*(g: GossipSub) = procCall FloodSub(g).initPubSub()