add more heartbeat locking to prevent races

This commit is contained in:
Giovanni Petrantoni 2020-07-04 14:04:44 +09:00
parent 0d0309a601
commit ad7db1ca26
2 changed files with 38 additions and 34 deletions

View File

@ -220,31 +220,33 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} =
## handle peer disconnects
trace "peer disconnected", peer=peer.id
await procCall FloodSub(g).handleDisconnect(peer)
# must avoid running this while manipulating mesh/gossip tables
await g.heartbeatLock.acquire()
try:
await procCall FloodSub(g).handleDisconnect(peer)
for t in toSeq(g.gossipsub.keys):
if t in g.gossipsub:
for t in toSeq(g.gossipsub.keys):
g.gossipsub[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t])
# mostly for metrics
await procCall PubSub(g).subscribeTopic(t, false, peer.id)
# mostly for metrics
await procCall PubSub(g).subscribeTopic(t, false, peer.id)
for t in toSeq(g.mesh.keys):
if t in g.mesh:
for t in toSeq(g.mesh.keys):
g.mesh[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh[t].len.int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh[t].len.int64, labelValues = [t])
for t in toSeq(g.fanout.keys):
if t in g.fanout:
for t in toSeq(g.fanout.keys):
g.fanout[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout[t].len.int64, labelValues = [t])
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout[t].len.int64, labelValues = [t])
finally:
g.heartbeatLock.release()
method subscribeToPeer*(p: GossipSub,
conn: Connection) {.async.} =
@ -255,28 +257,30 @@ method subscribeTopic*(g: GossipSub,
topic: string,
subscribe: bool,
peerId: string) {.gcsafe, async.} =
await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId)
# must avoid running this while manipulating mesh/gossip tables
await g.heartbeatLock.acquire()
try:
await procCall PubSub(g).subscribeTopic(topic, subscribe, peerId)
if topic notin g.gossipsub:
g.gossipsub[topic] = initHashSet[string]()
if topic notin g.gossipsub:
g.gossipsub[topic] = initHashSet[string]()
if subscribe:
trace "adding subscription for topic", peer = peerId, name = topic
# subscribe remote peer to the topic
g.gossipsub[topic].incl(peerId)
else:
trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe remote peer from the topic
g.gossipsub[topic].excl(peerId)
if subscribe:
trace "adding subscription for topic", peer = peerId, name = topic
# subscribe remote peer to the topic
g.gossipsub[topic].incl(peerId)
else:
trace "removing subscription for topic", peer = peerId, name = topic
# unsubscribe remote peer from the topic
g.gossipsub[topic].excl(peerId)
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub[topic].len.int64, labelValues = [topic])
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub[topic].len.int64, labelValues = [topic])
trace "gossip peers", peers = g.gossipsub[topic].len, topic
# also rebalance current topic if we are subbed to
if topic in g.topics:
await g.rebalanceMesh(topic)
trace "gossip peers", peers = g.gossipsub[topic].len, topic
finally:
g.heartbeatLock.release()
proc handleGraft(g: GossipSub,
peer: PubSubPeer,

View File

@ -100,7 +100,7 @@ method rpcHandler*(p: PubSub,
method handleDisconnect*(p: PubSub, peer: PubSubPeer) {.async, base.} =
## handle peer disconnects
if peer.id in p.peers:
trace "deleting peer", id = peer.id
trace "deleting peer", id = peer.id, trace = getStackTrace()
p.peers.del(peer.id)
# metrics