From 69a92aebe61f29bbab80935b87b240461083a117 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Sat, 4 Jul 2020 20:53:40 +0900 Subject: [PATCH] merge 1.0 --- libp2p/protocols/pubsub/gossipsub11.nim | 149 +++++++++++------------- 1 file changed, 71 insertions(+), 78 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub11.nim b/libp2p/protocols/pubsub/gossipsub11.nim index 901d5effd..cb4d410b7 100644 --- a/libp2p/protocols/pubsub/gossipsub11.nim +++ b/libp2p/protocols/pubsub/gossipsub11.nim @@ -242,30 +242,31 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: - try: - trace "running heartbeat" + withLock g.heartbeatLock: + try: + trace "running heartbeat" - for t in toSeq(g.topics.keys): - await g.rebalanceMesh(t) + for t in toSeq(g.topics.keys): + await g.rebalanceMesh(t) - await g.dropFanoutPeers() + await g.dropFanoutPeers() - # replenish known topics to the fanout - for t in toSeq(g.fanout.keys): - g.replenishFanout(t) + # replenish known topics to the fanout + for t in toSeq(g.fanout.keys): + g.replenishFanout(t) - let peers = g.getGossipPeers() - var sent: seq[Future[void]] - for peer in peers.keys: - if peer in g.peers: - sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) - checkFutures(await allFinished(sent)) + let peers = g.getGossipPeers() + var sent: seq[Future[void]] + for peer in peers.keys: + if peer in g.peers: + sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + checkFutures(await allFinished(sent)) - g.mcache.shift() # shift the cache - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + g.mcache.shift() # shift the cache + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception ocurred in gossipsub heartbeat", exc = exc.msg await sleepAsync(1.seconds) @@ -275,29 +276,30 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = await procCall FloodSub(g).handleDisconnect(peer) - for t in toSeq(g.gossipsub.keys): - if t in g.gossipsub: - g.gossipsub[t].excl(peer.id) + withLock g.heartbeatLock: + for t in toSeq(g.gossipsub.keys): + if t in g.gossipsub: + g.gossipsub[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) - # mostly for metrics - await procCall PubSub(g).subscribeTopic(t, false, peer.id) + # mostly for metrics + await procCall PubSub(g).subscribeTopic(t, false, peer.id) - for t in toSeq(g.mesh.keys): - if t in g.mesh: - g.mesh[t].excl(peer.id) + for t in toSeq(g.mesh.keys): + if t in g.mesh: + g.mesh[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh[t].len.int64, labelValues = [t]) - for t in toSeq(g.fanout.keys): - if t in g.fanout: - g.fanout[t].excl(peer.id) + for t in toSeq(g.fanout.keys): + if t in g.fanout: + g.fanout[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout[t].len.int64, labelValues = [t]) method subscribeToPeer*(p: GossipSub, conn: Connection) {.async.} = @@ -310,26 +312,27 @@ method subscribeTopic*(g: GossipSub, peerId: string) {.gcsafe, async.} = await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) - if topic notin g.gossipsub: - g.gossipsub[topic] = initHashSet[string]() + withLock g.heartbeatLock: + if topic notin g.gossipsub: + g.gossipsub[topic] = initHashSet[string]() - if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic - # subscribe remote peer to the topic - g.gossipsub[topic].incl(peerId) - if peerId in g.explicit: - g.explicit[topic].incl(peerId) - else: - trace "removing subscription for topic", peer = peerId, name = topic - # unsubscribe remote peer from the topic - g.gossipsub[topic].excl(peerId) - if peerId in g.explicit: - g.explicit[topic].excl(peerId) + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe remote peer to the topic + g.gossipsub[topic].incl(peerId) + if peerId in g.explicit: + g.explicit[topic].incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe remote peer from the topic + g.gossipsub[topic].excl(peerId) + if peerId in g.explicit: + g.explicit[topic].excl(peerId) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub[topic].len.int64, labelValues = [topic]) - trace "gossip peers", peers = g.gossipsub[topic].len, topic + trace "gossip peers", peers = g.gossipsub[topic].len, topic # also rebalance current topic if we are subbed to if topic in g.topics: @@ -555,14 +558,11 @@ method publish*(g: GossipSub, trace "publish: sending message to peer", peer = p sent.add(peer.send(@[RPCMsg(messages: @[msg])])) else: - # Notice this needs a better fix! for now it's a hack - error "publish: peer or peerInfo was nil", missing = p - if topic in g.mesh: - g.mesh[topic].excl(p) - if topic in g.fanout: - g.fanout[topic].excl(p) - if topic in g.gossipsub: - g.gossipsub[topic].excl(p) + # this absolutely should not happen + # if it happens there is a bug that needs fixing asap + # this ain't no place to manage connections + fatal "publish: peer or peerInfo was nil", missing = p + doAssert(false, "publish: peer or peerInfo was nil") sent = await allFinished(sent) checkFutures(sent) @@ -580,14 +580,10 @@ method start*(g: GossipSub) {.async.} = ## start pubsub ## start long running/repeating procedures - # interlock start to to avoid overlapping to stops - await g.heartbeatLock.acquire() - - # setup the heartbeat interval - g.heartbeatRunning = true - g.heartbeatFut = g.heartbeat() - - g.heartbeatLock.release() + withLock g.heartbeatLock: + # setup the heartbeat interval + g.heartbeatRunning = true + g.heartbeatFut = g.heartbeat() method stop*(g: GossipSub) {.async.} = trace "gossipsub stop" @@ -595,15 +591,12 @@ method stop*(g: GossipSub) {.async.} = ## stop pubsub ## stop long running tasks - await g.heartbeatLock.acquire() - - # stop heartbeat interval - g.heartbeatRunning = false - if not g.heartbeatFut.finished: - trace "awaiting last heartbeat" - await g.heartbeatFut - - g.heartbeatLock.release() + withLock g.heartbeatLock: + # stop heartbeat interval + g.heartbeatRunning = false + if not g.heartbeatFut.finished: + trace "awaiting last heartbeat" + await g.heartbeatFut method initPubSub*(g: GossipSub) = procCall FloodSub(g).initPubSub()