From ad7db1ca26782d8f9b81b1156bbde81d75aa4189 Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Sat, 4 Jul 2020 14:04:44 +0900 Subject: [PATCH] add more heartbeat locking to prevent races --- libp2p/protocols/pubsub/gossipsub.nim | 70 ++++++++++++++------------- libp2p/protocols/pubsub/pubsub.nim | 2 +- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 82d10a7a1..2cbf783e0 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -220,31 +220,33 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = ## handle peer disconnects trace "peer disconnected", peer=peer.id - await procCall FloodSub(g).handleDisconnect(peer) + # must avoid running this while manipulating mesh/gossip tables + await g.heartbeatLock.acquire() + try: + await procCall FloodSub(g).handleDisconnect(peer) - for t in toSeq(g.gossipsub.keys): - if t in g.gossipsub: + for t in toSeq(g.gossipsub.keys): g.gossipsub[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) - # mostly for metrics - await procCall PubSub(g).subscribeTopic(t, false, peer.id) + # mostly for metrics + await procCall PubSub(g).subscribeTopic(t, false, peer.id) - for t in toSeq(g.mesh.keys): - if t in g.mesh: + for t in toSeq(g.mesh.keys): g.mesh[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_mesh - .set(g.mesh[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_mesh + .set(g.mesh[t].len.int64, labelValues = [t]) - for t in toSeq(g.fanout.keys): - if t in g.fanout: + for t in toSeq(g.fanout.keys): g.fanout[t].excl(peer.id) - libp2p_gossipsub_peers_per_topic_fanout - .set(g.fanout[t].len.int64, labelValues = [t]) + libp2p_gossipsub_peers_per_topic_fanout + .set(g.fanout[t].len.int64, labelValues = [t]) + finally: + g.heartbeatLock.release() method subscribeToPeer*(p: GossipSub, conn: Connection) {.async.} = @@ -255,28 +257,30 @@ method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, peerId: string) {.gcsafe, async.} = - await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) + + # must avoid running this while manipulating mesh/gossip tables + await g.heartbeatLock.acquire() + try: + await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId) - if topic notin g.gossipsub: - g.gossipsub[topic] = initHashSet[string]() + if topic notin g.gossipsub: + g.gossipsub[topic] = initHashSet[string]() - if subscribe: - trace "adding subscription for topic", peer = peerId, name = topic - # subscribe remote peer to the topic - g.gossipsub[topic].incl(peerId) - else: - trace "removing subscription for topic", peer = peerId, name = topic - # unsubscribe remote peer from the topic - g.gossipsub[topic].excl(peerId) + if subscribe: + trace "adding subscription for topic", peer = peerId, name = topic + # subscribe remote peer to the topic + g.gossipsub[topic].incl(peerId) + else: + trace "removing subscription for topic", peer = peerId, name = topic + # unsubscribe remote peer from the topic + g.gossipsub[topic].excl(peerId) - libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[topic].len.int64, labelValues = [topic]) + libp2p_gossipsub_peers_per_topic_gossipsub + .set(g.gossipsub[topic].len.int64, labelValues = [topic]) - trace "gossip peers", peers = g.gossipsub[topic].len, topic - - # also rebalance current topic if we are subbed to - if topic in g.topics: - await g.rebalanceMesh(topic) + trace "gossip peers", peers = g.gossipsub[topic].len, topic + finally: + g.heartbeatLock.release() proc handleGraft(g: GossipSub, peer: PubSubPeer, diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 119b136c4..b17739834 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -100,7 +100,7 @@ method rpcHandler*(p: PubSub, method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} = ## handle peer disconnects if peer.id in p.peers: - trace "deleting peer", id = peer.id + trace "deleting peer", id = peer.id, trace = getStackTrace() p.peers.del(peer.id) # metrics