From b12145dff778bbda0e8115e0643662bc11f8cbfa Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 17 Aug 2020 12:10:22 +0200 Subject: [PATCH] avoid crash when subscribe is received (#333) ...by making subscribeTopic synchronous, avoiding a peer table lookup completely. rebalanceMesh will be called a second later - it's fine --- libp2p/protocols/pubsub/floodsub.nim | 6 +++--- libp2p/protocols/pubsub/gossipsub.nim | 15 +++------------ libp2p/protocols/pubsub/pubsub.nim | 4 ++-- 3 files changed, 8 insertions(+), 17 deletions(-) diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3ec69ad..c9ca57e 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -31,9 +31,9 @@ type method subscribeTopic*(f: FloodSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = - await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) - let peer = f.peers.getOrDefault(peerId) + peer: PubsubPeer) {.gcsafe.} = + procCall PubSub(f).subscribeTopic(topic, subscribe, peer) + if topic notin f.floodsub: f.floodsub[topic] = initHashSet[PubSubPeer]() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9195207..468959f 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -281,18 +281,13 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = - await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) + peer: PubSubPeer) {.gcsafe.} = + procCall FloodSub(g).subscribeTopic(topic, subscribe, peer) logScope: - peer = $peerId + peer = $peer.id topic - let peer = g.peers.getOrDefault(peerId) - if peer == nil: - # floodsub method logs a debug line already - return - if subscribe: trace "peer subscribed to topic" # subscribe remote peer to the topic @@ -316,10 +311,6 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic - # also rebalance current topic if we are subbed to - if topic in g.topics: - await g.rebalanceMesh(topic) - proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): seq[ControlPrune] = diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 17e0203..72d5f52 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -119,7 +119,7 @@ proc sendSubs*(p: PubSub, method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, - peerId: PeerID) {.base, async.} = + peer: PubSubPeer) {.base.} = # called when remote peer subscribes to a topic discard @@ -134,7 +134,7 @@ method rpcHandler*(p: PubSub, if m.subscriptions.len > 0: # if there are any subscriptions for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic trace "about to subscribe to topic", topicId = s.topic - await p.subscribeTopic(s.topic, s.subscribe, peer.peerId) + p.subscribeTopic(s.topic, s.subscribe, peer) proc getOrCreatePeer*( p: PubSub,