From 303ec297da849513026a377138ea7bd6e78216eb Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Sat, 11 Apr 2020 13:08:25 +0900 Subject: [PATCH] Start removing allFutures (#125) * Start removing allFutures * More allfutures removal * Complete allFutures removal except legacy and tests * Introduce table values copies to prevent error * Switch to allFinished * Resolve TODOs in flood/gossip * muxer handler, log and re-raise * Add a common and flexible way to check multiple futures --- libp2p/errors.nim | 39 +++++++++++++++++++++++++++ libp2p/muxers/mplex/lpchannel.nim | 24 ++++++++++++++--- libp2p/muxers/mplex/mplex.nim | 11 ++++++-- libp2p/muxers/muxer.nim | 24 +++++++++++------ libp2p/protocols/pubsub/floodsub.nim | 13 ++++++--- libp2p/protocols/pubsub/gossipsub.nim | 9 ++++--- libp2p/protocols/pubsub/pubsub.nim | 5 ++-- libp2p/switch.nim | 17 +++++++++--- libp2p/transports/tcptransport.nim | 3 ++- libp2p/transports/transport.nim | 6 +++-- 10 files changed, 121 insertions(+), 30 deletions(-) create mode 100644 libp2p/errors.nim diff --git a/libp2p/errors.nim b/libp2p/errors.nim new file mode 100644 index 000000000..35ea14bff --- /dev/null +++ b/libp2p/errors.nim @@ -0,0 +1,39 @@ +# this module will be further extended in PR +# https://github.com/status-im/nim-libp2p/pull/107/ + +import chronos +import chronicles +import macros + +# could not figure how to make it with a simple template +# sadly nim needs more love for hygenic templates +# so here goes the macro, its based on the proc/template version +# and uses quote do so it's quite readable + +macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = + let nexclude = exclude.len + case nexclude + of 0: + quote do: + let pos = instantiationInfo() + for res in `futs`: + if res.failed: + let exc = res.readError() + # We still don't abort but warn + warn "Something went wrong in a future", + error=exc.name, file=pos.filename, line=pos.line + else: + quote do: + let pos = instantiationInfo() + for res in `futs`: + block check: + if res.failed: + let exc = res.readError() + for i in 0..<`nexclude`: + if exc of `exclude`[i]: + trace "Ignoring an error (no warning)", + error=exc.name, file=pos.filename, line=pos.line + break check + # We still don't abort but warn + warn "Something went wrong in a future", + error=exc.name, file=pos.filename, line=pos.line diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 6e1b2a19f..73895cbe0 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -14,7 +14,8 @@ import types, ../../stream/bufferstream, ../../stream/lpstream, ../../connection, - ../../utility + ../../utility, + ../../errors logScope: topic = "MplexChannel" @@ -93,12 +94,27 @@ proc resetMessage(s: LPChannel) {.async.} = await s.conn.writeMsg(s.id, s.resetCode) proc resetByRemote*(s: LPChannel) {.async.} = - await allFutures(s.close(), s.closedByRemote()) + # Immediately block futher calls s.isReset = true - await s.cleanUp() + + # start and await async teardown + let + futs = await allFinished( + s.close(), + s.closedByRemote(), + s.cleanUp() + ) + + checkFutures(futs, [LPStreamEOFError]) proc reset*(s: LPChannel) {.async.} = - await allFutures(s.resetMessage(), s.resetByRemote()) + let + futs = await allFinished( + s.resetMessage(), + s.resetByRemote() + ) + + checkFutures(futs, [LPStreamEOFError]) method closed*(s: LPChannel): bool = trace "closing lpchannel", id = s.id, initiator = s.initiator diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 6b4745827..85290e0ad 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -17,6 +17,7 @@ import ../muxer, ../../connection, ../../stream/lpstream, ../../utility, + ../../errors, coder, types, lpchannel @@ -154,10 +155,16 @@ method newStream*(m: Mplex, method close*(m: Mplex) {.async, gcsafe.} = trace "closing mplex muxer" + if not m.connection.closed(): await m.connection.close() - await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())), - allFutures(toSeq(m.local.values).mapIt(it.reset()))]) + let + futs = await allFinished( + toSeq(m.remote.values).mapIt(it.reset()) & + toSeq(m.local.values).mapIt(it.reset())) + + checkFutures(futs) + m.remote.clear() m.local.clear() diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 4b2a71d1a..4cba14058 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -9,7 +9,8 @@ import chronos, chronicles import ../protocols/protocol, - ../connection + ../connection, + ../errors logScope: topic = "Muxer" @@ -45,15 +46,22 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let muxer = c.newMuxer(conn) - var handlerFut = if not isNil(c.muxerHandler): - c.muxerHandler(muxer) - else: - var dummyFut = newFuture[void]() - dummyFut.complete(); dummyFut + let + muxer = c.newMuxer(conn) if not isNil(c.streamHandler): muxer.streamHandler = c.streamHandler - await allFutures(muxer.handle(), handlerFut) + var futs = newSeq[Future[void]]() + + futs &= muxer.handle() + + # finally await both the futures + if not isNil(c.muxerHandler): + futs &= c.muxerHandler(muxer) + + # log and re-raise on errors + futs = await allFinished(futs) + checkFutures(futs) + c.handler = handler diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index 9bc52e451..5a95a6314 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -17,7 +17,8 @@ import pubsub, ../../connection, ../../peerinfo, ../../peer, - ../../utility + ../../utility, + ../../errors logScope: topic = "FloodSub" @@ -85,10 +86,13 @@ method rpcHandler*(f: FloodSub, # forward the message to all peers interested in it var sent: seq[Future[void]] + # start the future but do not wait yet for p in toSendPeers: if p in f.peers and f.peers[p].id != peer.id: sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)])) - await allFutures(sent) + # wait for all the futures now + sent = await allFinished(sent) + checkFutures(sent) method init(f: FloodSub) = proc handler(conn: Connection, proto: string) {.async.} = @@ -118,10 +122,13 @@ method publish*(f: FloodSub, trace "publishing on topic", name = topic let msg = newMessage(f.peerInfo, data, topic) var sent: seq[Future[void]] + # start the future but do not wait yet for p in f.floodsub[topic]: trace "publishing message", name = topic, peer = p, data = data.shortLog sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) - await allFutures(sent) + # wait for all the futures now + sent = await allFinished(sent) + checkFutures(sent) method unsubscribe*(f: FloodSub, topics: seq[TopicPair]) {.async.} = diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index cb421b78f..8afee014a 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -19,7 +19,8 @@ import pubsub, ../protocol, ../../peerinfo, ../../connection, - ../../peer + ../../peer, + ../../errors logScope: topic = "GossipSub" @@ -220,7 +221,8 @@ method rpcHandler(g: GossipSub, if msgs.len > 0: trace "forwarding message to", peerId = id sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) - await allFutures(sent) + sent = await allFinished(sent) + checkFutures(sent) var respControl: ControlMessage if m.control.isSome: @@ -408,7 +410,8 @@ method publish*(g: GossipSub, trace "publishing on topic", name = topic g.mcache.put(msg) sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) - await allFutures(sent) + sent = await allFinished(sent) + checkFutures(sent) method start*(g: GossipSub) {.async.} = ## start pubsub diff --git a/libp2p/protocols/pubsub/pubsub.nim b/libp2p/protocols/pubsub/pubsub.nim index 636322497..a6b9e9931 100644 --- a/libp2p/protocols/pubsub/pubsub.nim +++ b/libp2p/protocols/pubsub/pubsub.nim @@ -238,9 +238,8 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} = # TODO: add timeout to validator pending.add(p.validators[topic].mapIt(it(topic, message))) - await allFutures(pending) - if pending.allIt(it.read()): # only if all passed - result = true + let futs = await allFinished(pending) + result = futs.allIt(not it.failed and it.read()) proc newPubSub*(p: typedesc[PubSub], peerInfo: PeerInfo, diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 4c33f4f84..dfca8a45c 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -19,6 +19,7 @@ import connection, protocols/identify, protocols/pubsub/pubsub, muxers/muxer, + errors, peer logScope: @@ -309,11 +310,19 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc stop*(s: Switch) {.async.} = trace "stopping switch" - if s.pubSub.isSome: - await s.pubSub.get().stop() + # we want to report erros but we do not want to fail + # or crash here, cos we need to clean possibly MANY items + # and any following conn/transport won't be cleaned up + var futs = newSeq[Future[void]]() - await allFutures(toSeq(s.connections.values).mapIt(s.cleanupConn(it))) - await allFutures(s.transports.mapIt(it.close())) + if s.pubSub.isSome: + futs &= s.pubSub.get().stop() + + futs &= toSeq(s.connections.values).mapIt(s.cleanupConn(it)) + futs &= s.transports.mapIt(it.close()) + + futs = await allFinished(futs) + checkFutures(futs) proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index cbd2985e0..6e260045c 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -61,7 +61,8 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = t.server.stop() t.server.close() await t.server.join() - trace "transport stopped" + + trace "transport stopped" method listen*(t: TcpTransport, ma: MultiAddress, diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 564afd702..72b01ebae 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -11,7 +11,8 @@ import sequtils import chronos, chronicles import ../connection, ../multiaddress, - ../multicodec + ../multicodec, + ../errors type ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.} @@ -33,7 +34,8 @@ proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} = method close*(t: Transport) {.base, async, gcsafe.} = ## stop and cleanup the transport ## including all outstanding connections - await allFutures(t.connections.mapIt(it.close())) + let futs = await allFinished(t.connections.mapIt(it.close())) + checkFutures(futs) method listen*(t: Transport, ma: MultiAddress,