diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index c9ca57e96..382db1fd0 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -157,13 +157,13 @@ method unsubscribe*(f: FloodSub, await procCall PubSub(f).unsubscribe(topics) for p in f.peers.values: - await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) + discard await f.sendSubs(p, topics.mapIt(it.topic).deduplicate(), false) method unsubscribeAll*(f: FloodSub, topic: string) {.async.} = await procCall PubSub(f).unsubscribeAll(topic) for p in f.peers.values: - await f.sendSubs(p, @[topic], false) + discard await f.sendSubs(p, @[topic], false) method initPubSub*(f: FloodSub) = procCall PubSub(f).initPubSub() diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 468959f37..fbda35dcc 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -229,7 +229,7 @@ proc heartbeat(g: GossipSub) {.async.} = g.replenishFanout(t) let peers = g.getGossipPeers() - var sent: seq[Future[void]] + var sent: seq[Future[bool]] for peer, control in peers: g.peers.withValue(peer.peerId, pubsubPeer) do: sent &= g.send( @@ -450,10 +450,14 @@ method rpcHandler*(g: GossipSub, respControl.ihave.len > 0: try: info "sending control message", msg = respControl - await g.send( + let sent = await g.send( peer, RPCMsg(control: some(respControl), messages: messages), DefaultSendTimeout) + + if not sent: + g.unsubscribePeer(peer.peerId) + except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 6bad2d85b..8c4a6dff2 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -8,7 +8,7 @@ ## those terms. import std/[tables, sequtils, sets] -import chronos, chronicles, chronicles/chronos_tools, metrics +import chronos, chronicles, metrics import pubsubpeer, rpc/[message, messages], ../../switch, @@ -75,20 +75,20 @@ proc send*( p: PubSub, peer: PubSubPeer, msg: RPCMsg, - timeout: Duration) {.async.} = + timeout: Duration): Future[bool] {.async.} = ## send to remote peer ## trace "sending pubsub message to peer", peer = $peer, msg = shortLog(msg) try: await peer.send(msg, timeout) + return true except CancelledError as exc: raise exc except CatchableError as exc: trace "exception sending pubsub message to peer", peer = $peer, msg = shortLog(msg) p.unsubscribePeer(peer.peerId) - raise exc proc broadcast*( p: PubSub, @@ -102,12 +102,12 @@ proc broadcast*( peers = sendPeers.len, message = shortLog(msg) let sent = await allFinished( sendPeers.mapIt( p.send(it, msg, timeout) )) - return sent.filterIt( it.finished and it.error.isNil ).len + return sent.filterIt( it.finished and it.read ).len proc sendSubs*(p: PubSub, peer: PubSubPeer, topics: seq[string], - subscribe: bool): Future[void] = + subscribe: bool): Future[bool] = ## send subscriptions to remote peer p.send( peer, @@ -175,11 +175,11 @@ method handleConn*(p: PubSub, # call pubsub rpc handler await p.rpcHandler(peer, msgs) - let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto) - if p.topics.len > 0: - await p.sendSubs(peer, toSeq(p.topics.keys), true) - try: + let peer = p.getOrCreatePeer(conn.peerInfo.peerId, proto) + if p.topics.len > 0: + discard await p.sendSubs(peer, toSeq(p.topics.keys), true) + peer.handler = handler await peer.handle(conn) # spawn peer read loop trace "pubsub peer handler ended", peer = peer.id @@ -201,7 +201,7 @@ method subscribePeer*(p: PubSub, peer: PeerID) {.base.} = # to escape to the poll loop. # With a bit of luck, it may be harmless to ignore exceptions here - # some cleanup is eventually done in PubSubPeer.send - traceAsyncErrors p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true) + asyncCheck p.sendSubs(pubsubPeer, toSeq(p.topics.keys), true) pubsubPeer.subscribed = true @@ -249,7 +249,7 @@ method subscribe*(p: PubSub, p.topics[topic].handler.add(handler) - var sent: seq[Future[void]] + var sent: seq[Future[bool]] for peer in toSeq(p.peers.values): sent.add(p.sendSubs(peer, @[topic], true)) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index a0fb92141..5cd33c1e6 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -129,7 +129,8 @@ proc initBufferStream*(s: BufferStream, if not(isNil(handler)): s.writeHandler = proc (data: seq[byte]) {.async, gcsafe.} = defer: - s.writeLock.release() + if s.writeLock.locked: + s.writeLock.release() # Using a lock here to guarantee # proper write ordering. This is @@ -137,6 +138,7 @@ proc initBufferStream*(s: BufferStream, # implementing half-closed in mplex # or other functionality that requires # strict message ordering + await s.writeLock.acquire() await handler(data)