fixing some key not found exceptions (#231)

This commit is contained in:
Dmitriy Ryajov 2020-06-19 15:19:07 -06:00 committed by GitHub
parent 5b28e8c488
commit 7a1c1c2ea6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 18 deletions

View File

@ -52,8 +52,9 @@ method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} =
await procCall PubSub(f).handleDisconnect(peer)
## handle peer disconnects
for t in f.floodsub.keys:
f.floodsub[t].excl(peer.id)
for t in toSeq(f.floodsub.keys):
if t in f.floodsub:
f.floodsub[t].excl(peer.id)
method rpcHandler*(f: FloodSub,
peer: PubSubPeer,
@ -131,9 +132,10 @@ method publish*(f: FloodSub,
let msg = newMessage(f.peerInfo, data, topic, f.sign)
var sent: seq[Future[void]]
# start the future but do not wait yet
for p in f.floodsub[topic]:
trace "publishing message", name = topic, peer = p, data = data.shortLog
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
for p in f.floodsub.getOrDefault(topic):
if p in f.peers:
trace "publishing message", name = topic, peer = p, data = data.shortLog
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
# wait for all the futures now
sent = await allFinished(sent)

View File

@ -200,11 +200,12 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} =
break
let id = toSeq(g.gossipsub.getOrDefault(topic)).sample()
g.gossipsub[topic].excl(id)
if id notin gossipPeers:
if id notin result:
result[id] = ControlMessage()
result[id].ihave.add(ihave)
if id in g.gossipsub.getOrDefault(topic):
g.gossipsub[topic].excl(id)
if id notin gossipPeers:
if id notin result:
result[id] = ControlMessage()
result[id].ihave.add(ihave)
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic])
@ -222,12 +223,14 @@ proc heartbeat(g: GossipSub) {.async.} =
let peers = g.getGossipPeers()
var sent: seq[Future[void]]
for peer in peers.keys:
sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))])
if peer in g.peers:
sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))])
checkFutures(await allFinished(sent))
g.mcache.shift() # shift the cache
except CatchableError as exc:
trace "exception ocurred in gossipsub heartbeat", exc = exc.msg
continue
finally:
g.heartbeatLock.release()
@ -239,21 +242,27 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} =
await procCall FloodSub(g).handleDisconnect(peer)
for t in g.gossipsub.keys:
g.gossipsub[t].excl(peer.id)
for t in toSeq(g.gossipsub.keys):
if t in g.gossipsub:
g.gossipsub[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_gossipsub
.set(g.gossipsub[t].len.int64, labelValues = [t])
.set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t])
# mostly for metrics
await procCall PubSub(g).subscribeTopic(t, false, peer.id)
for t in g.mesh.keys:
g.mesh[t].excl(peer.id)
for t in toSeq(g.mesh.keys):
if t in g.mesh:
g.mesh[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_mesh
.set(g.mesh[t].len.int64, labelValues = [t])
for t in g.fanout.keys:
g.fanout[t].excl(peer.id)
for t in toSeq(g.fanout.keys):
if t in g.fanout:
g.fanout[t].excl(peer.id)
libp2p_gossipsub_peers_per_topic_fanout
.set(g.fanout[t].len.int64, labelValues = [t])