From 479c83c1dffdeafc047267b322aa5c9848519eb9 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Sat, 4 Jul 2020 19:36:15 +0900 Subject: [PATCH] with sugar --- libp2p/protocols/pubsub/gossipsub.nim | 93 +++++++++++---------------- libp2p/utility.nim | 7 ++ 2 files changed, 44 insertions(+), 56 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index b345b9903..9e3ec488b 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -189,34 +189,31 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = proc heartbeat(g: GossipSub) {.async.} = while g.heartbeatRunning: - await g.heartbeatLock.acquire() - try: + withLock g.heartbeatLock: + try: + trace "running heartbeat" - trace "running heartbeat" + for t in toSeq(g.topics.keys): + await g.rebalanceMesh(t) - for t in toSeq(g.topics.keys): - await g.rebalanceMesh(t) + await g.dropFanoutPeers() - await g.dropFanoutPeers() + # replenish known topics to the fanout + for t in toSeq(g.fanout.keys): + g.replenishFanout(t) - # replenish known topics to the fanout - for t in toSeq(g.fanout.keys): - g.replenishFanout(t) + let peers = g.getGossipPeers() + var sent: seq[Future[void]] + for peer in peers.keys: + if peer in g.peers: + sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + checkFutures(await allFinished(sent)) - let peers = g.getGossipPeers() - var sent: seq[Future[void]] - for peer in peers.keys: - if peer in g.peers: - sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) - checkFutures(await allFinished(sent)) - - g.mcache.shift() # shift the cache - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception ocurred in gossipsub heartbeat", exc = exc.msg - finally: - g.heartbeatLock.release() + g.mcache.shift() # shift the cache + except CancelledError as exc: + raise exc + except CatchableError as exc: + trace "exception ocurred in gossipsub heartbeat", exc = exc.msg await sleepAsync(1.seconds) @@ -227,8 +224,7 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = await procCall FloodSub(g).handleDisconnect(peer) # must avoid running this while manipulating mesh/gossip tables - await g.heartbeatLock.acquire() - try: + withLock g.heartbeatLock: for t in toSeq(g.gossipsub.keys): g.gossipsub[t].excl(peer.id) @@ -249,8 +245,6 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout[t].len.int64, labelValues = [t]) - finally: - g.heartbeatLock.release() method subscribeToPeer*(p: GossipSub, conn: Connection) {.async.} = @@ -264,8 +258,7 @@ method subscribeTopic*(g: GossipSub, await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) # must avoid running this while manipulating mesh/gossip tables - await g.heartbeatLock.acquire() - try: + withLock g.heartbeatLock: if topic notin g.gossipsub: g.gossipsub[topic] = initHashSet[string]() @@ -282,8 +275,6 @@ method subscribeTopic*(g: GossipSub, .set(g.gossipsub[topic].len.int64, labelValues = [topic]) trace "gossip peers", peers = g.gossipsub[topic].len, topic - finally: - g.heartbeatLock.release() proc handleGraft(g: GossipSub, peer: PubSubPeer, @@ -486,14 +477,11 @@ method publish*(g: GossipSub, trace "publish: sending message to peer", peer = p sent.add(peer.send(@[RPCMsg(messages: @[msg])])) else: - # Notice this needs a better fix! for now it's a hack - error "publish: peer or peerInfo was nil", missing = p - if topic in g.mesh: - g.mesh[topic].excl(p) - if topic in g.fanout: - g.fanout[topic].excl(p) - if topic in g.gossipsub: - g.gossipsub[topic].excl(p) + # this absolutely should not happen + # if it happens there is a bug that needs fixing asap + # this ain't no place to manage connections + fatal "publish: peer or peerInfo was nil", missing = p + doAssert(false, "publish: peer or peerInfo was nil") sent = await allFinished(sent) checkFutures(sent) @@ -511,14 +499,10 @@ method start*(g: GossipSub) {.async.} = ## start pubsub ## start long running/repeating procedures - # interlock start to to avoid overlapping to stops - await g.heartbeatLock.acquire() - - # setup the heartbeat interval - g.heartbeatRunning = true - g.heartbeatFut = g.heartbeat() - - g.heartbeatLock.release() + withLock g.heartbeatLock: + # setup the heartbeat interval + g.heartbeatRunning = true + g.heartbeatFut = g.heartbeat() method stop*(g: GossipSub) {.async.} = trace "gossipsub stop" @@ -526,15 +510,12 @@ method stop*(g: GossipSub) {.async.} = ## stop pubsub ## stop long running tasks - await g.heartbeatLock.acquire() - - # stop heartbeat interval - g.heartbeatRunning = false - if not g.heartbeatFut.finished: - trace "awaiting last heartbeat" - await g.heartbeatFut - - g.heartbeatLock.release() + withLock g.heartbeatLock: + # stop heartbeat interval + g.heartbeatRunning = false + if not g.heartbeatFut.finished: + trace "awaiting last heartbeat" + await g.heartbeatFut method initPubSub*(g: GossipSub) = procCall FloodSub(g).initPubSub() diff --git a/libp2p/utility.nim b/libp2p/utility.nim index 94307d139..70b034dac 100644 --- a/libp2p/utility.nim +++ b/libp2p/utility.nim @@ -35,3 +35,10 @@ func shortLog*(item: string): string = result &= item[0..