avoid crash when subscribe is received (#333)
...by making subscribeTopic synchronous, avoiding a peer table lookup completely. rebalanceMesh will be called a second later - it's fine
This commit is contained in:
parent
ab864fc747
commit
b12145dff7
|
@ -31,9 +31,9 @@ type
|
||||||
method subscribeTopic*(f: FloodSub,
|
method subscribeTopic*(f: FloodSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
subscribe: bool,
|
subscribe: bool,
|
||||||
peerId: PeerID) {.gcsafe, async.} =
|
peer: PubsubPeer) {.gcsafe.} =
|
||||||
await procCall PubSub(f).subscribeTopic(topic, subscribe, peerId)
|
procCall PubSub(f).subscribeTopic(topic, subscribe, peer)
|
||||||
let peer = f.peers.getOrDefault(peerId)
|
|
||||||
if topic notin f.floodsub:
|
if topic notin f.floodsub:
|
||||||
f.floodsub[topic] = initHashSet[PubSubPeer]()
|
f.floodsub[topic] = initHashSet[PubSubPeer]()
|
||||||
|
|
||||||
|
|
|
@ -281,18 +281,13 @@ method unsubscribePeer*(g: GossipSub, peer: PeerID) =
|
||||||
method subscribeTopic*(g: GossipSub,
|
method subscribeTopic*(g: GossipSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
subscribe: bool,
|
subscribe: bool,
|
||||||
peerId: PeerID) {.gcsafe, async.} =
|
peer: PubSubPeer) {.gcsafe.} =
|
||||||
await procCall FloodSub(g).subscribeTopic(topic, subscribe, peerId)
|
procCall FloodSub(g).subscribeTopic(topic, subscribe, peer)
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
peer = $peerId
|
peer = $peer.id
|
||||||
topic
|
topic
|
||||||
|
|
||||||
let peer = g.peers.getOrDefault(peerId)
|
|
||||||
if peer == nil:
|
|
||||||
# floodsub method logs a debug line already
|
|
||||||
return
|
|
||||||
|
|
||||||
if subscribe:
|
if subscribe:
|
||||||
trace "peer subscribed to topic"
|
trace "peer subscribed to topic"
|
||||||
# subscribe remote peer to the topic
|
# subscribe remote peer to the topic
|
||||||
|
@ -316,10 +311,6 @@ method subscribeTopic*(g: GossipSub,
|
||||||
|
|
||||||
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
trace "gossip peers", peers = g.gossipsub.peers(topic), topic
|
||||||
|
|
||||||
# also rebalance current topic if we are subbed to
|
|
||||||
if topic in g.topics:
|
|
||||||
await g.rebalanceMesh(topic)
|
|
||||||
|
|
||||||
proc handleGraft(g: GossipSub,
|
proc handleGraft(g: GossipSub,
|
||||||
peer: PubSubPeer,
|
peer: PubSubPeer,
|
||||||
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
grafts: seq[ControlGraft]): seq[ControlPrune] =
|
||||||
|
|
|
@ -119,7 +119,7 @@ proc sendSubs*(p: PubSub,
|
||||||
method subscribeTopic*(p: PubSub,
|
method subscribeTopic*(p: PubSub,
|
||||||
topic: string,
|
topic: string,
|
||||||
subscribe: bool,
|
subscribe: bool,
|
||||||
peerId: PeerID) {.base, async.} =
|
peer: PubSubPeer) {.base.} =
|
||||||
# called when remote peer subscribes to a topic
|
# called when remote peer subscribes to a topic
|
||||||
discard
|
discard
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ method rpcHandler*(p: PubSub,
|
||||||
if m.subscriptions.len > 0: # if there are any subscriptions
|
if m.subscriptions.len > 0: # if there are any subscriptions
|
||||||
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
for s in m.subscriptions: # subscribe/unsubscribe the peer for each topic
|
||||||
trace "about to subscribe to topic", topicId = s.topic
|
trace "about to subscribe to topic", topicId = s.topic
|
||||||
await p.subscribeTopic(s.topic, s.subscribe, peer.peerId)
|
p.subscribeTopic(s.topic, s.subscribe, peer)
|
||||||
|
|
||||||
proc getOrCreatePeer*(
|
proc getOrCreatePeer*(
|
||||||
p: PubSub,
|
p: PubSub,
|
||||||
|
|
Loading…
Reference in New Issue