diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 3ec69adde..c9ca57e96 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -31,9 +31,9 @@ type method subscribeTopic*(f: FloodSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = - await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId) - let peer = f.peers.getOrDefault(peerId) + peer: PubsubPeer) {.gcsafe.} = + procCall PubSub(f).subscribeTopic(topic, subscribe, peer) + if topic notin f.floodsub: f.floodsub[topic] = initHashSet[PubSubPeer]() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 9195207c0..468959f37 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -281,18 +281,13 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) = method subscribeTopic*(g: GossipSub, topic: string, subscribe: bool, - peerId: PeerID) {.gcsafe, async.} = - await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId) + peer: PubSubPeer) {.gcsafe.} = + procCall FloodSub(g).subscribeTopic(topic, subscribe, peer) logScope: - peer = $peerId + peer = $peer.id topic - let peer = g.peers.getOrDefault(peerId) - if peer == nil: - # floodsub method logs a debug line already - return - if subscribe: trace "peer subscribed to topic" # subscribe remote peer to the topic @@ -316,10 +311,6 @@ method subscribeTopic*(g: GossipSub, trace "gossip peers", peers = g.gossipsub.peers(topic), topic - # also rebalance current topic if we are subbed to - if topic in g.topics: - await g.rebalanceMesh(topic) - proc handleGraft(g: GossipSub, peer: PubSubPeer, grafts: seq[ControlGraft]): seq[ControlPrune] = diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 17e0203e8..72d5f527e 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -119,7 +119,7 @@ proc sendSubs*(p: PubSub, method subscribeTopic*(p: PubSub, topic: string, subscribe: bool, - peerId: PeerID) {.base, async.} = + peer: PubSubPeer) {.base.} = # called when remote peer subscribes to a topic discard @@ -134,7 +134,7 @@ method rpcHandler*(p: PubSub, if m.subscriptions.len > 0: # if there are any subscriptions for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic trace "about to subscribe to topic", topicId = s.topic - await p.subscribeTopic(s.topic, s.subscribe, peer.peerId) + p.subscribeTopic(s.topic, s.subscribe, peer) proc getOrCreatePeer*( p: PubSub,