From 7a1c1c2ea66b89ee4f9eadc824ebddc743682dd7 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 19 Jun 2020 15:19:07 -0600 Subject: [PATCH] fixing some key not found exceptions (#231) --- libp2p/protocols/pubsub/floodsub.nim | 12 +++++---- libp2p/protocols/pubsub/gossipsub.nim | 35 +++++++++++++++++---------- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 762162d..7f68e6d 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -52,8 +52,9 @@ method handleDisconnect*(f: FloodSub, peer: PubSubPeer) {.async.} = await procCall PubSub(f).handleDisconnect(peer) ## handle peer disconnects - for t in f.floodsub.keys: - f.floodsub[t].excl(peer.id) + for t in toSeq(f.floodsub.keys): + if t in f.floodsub: + f.floodsub[t].excl(peer.id) method rpcHandler*(f: FloodSub, peer: PubSubPeer, @@ -131,9 +132,10 @@ method publish*(f: FloodSub, let msg = newMessage(f.peerInfo, data, topic, f.sign) var sent: seq[Future[void]] # start the future but do not wait yet - for p in f.floodsub[topic]: - trace "publishing message", name = topic, peer = p, data = data.shortLog - sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) + for p in f.floodsub.getOrDefault(topic): + if p in f.peers: + trace "publishing message", name = topic, peer = p, data = data.shortLog + sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) # wait for all the futures now sent = await allFinished(sent) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 10f4244..4b328c4 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -200,11 +200,12 @@ proc getGossipPeers(g: GossipSub): Table[string, ControlMessage] {.gcsafe.} = break let id = toSeq(g.gossipsub.getOrDefault(topic)).sample() - g.gossipsub[topic].excl(id) - if id notin gossipPeers: - if id notin result: - result[id] = ControlMessage() - result[id].ihave.add(ihave) + if id in g.gossipsub.getOrDefault(topic): + g.gossipsub[topic].excl(id) + if id notin gossipPeers: + if id notin result: + result[id] = ControlMessage() + result[id].ihave.add(ihave) libp2p_gossipsub_peers_per_topic_gossipsub .set(g.gossipsub.getOrDefault(topic).len.int64, labelValues = [topic]) @@ -222,12 +223,14 @@ proc heartbeat(g: GossipSub) {.async.} = let peers = g.getGossipPeers() var sent: seq[Future[void]] for peer in peers.keys: - sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) + if peer in g.peers: + sent &= g.peers[peer].send(@[RPCMsg(control: some(peers[peer]))]) checkFutures(await allFinished(sent)) g.mcache.shift() # shift the cache except CatchableError as exc: trace "exception ocurred in gossipsub heartbeat", exc = exc.msg + continue finally: g.heartbeatLock.release() @@ -239,21 +242,27 @@ method handleDisconnect*(g: GossipSub, peer: PubSubPeer) {.async.} = await procCall FloodSub(g).handleDisconnect(peer) - for t in g.gossipsub.keys: - g.gossipsub[t].excl(peer.id) + for t in toSeq(g.gossipsub.keys): + if t in g.gossipsub: + g.gossipsub[t].excl(peer.id) + libp2p_gossipsub_peers_per_topic_gossipsub - .set(g.gossipsub[t].len.int64, labelValues = [t]) + .set(g.gossipsub.getOrDefault(t).len.int64, labelValues = [t]) # mostly for metrics await procCall PubSub(g).subscribeTopic(t, false, peer.id) - for t in g.mesh.keys: - g.mesh[t].excl(peer.id) + for t in toSeq(g.mesh.keys): + if t in g.mesh: + g.mesh[t].excl(peer.id) + libp2p_gossipsub_peers_per_topic_mesh .set(g.mesh[t].len.int64, labelValues = [t]) - for t in g.fanout.keys: - g.fanout[t].excl(peer.id) + for t in toSeq(g.fanout.keys): + if t in g.fanout: + g.fanout[t].excl(peer.id) + libp2p_gossipsub_peers_per_topic_fanout .set(g.fanout[t].len.int64, labelValues = [t])