Start removing allFutures (#125)

* Start removing allFutures

* More allfutures removal

* Complete allFutures removal except legacy and tests

* Introduce table values copies to prevent error

* Switch to allFinished

* Resolve TODOs in flood/gossip

* muxer handler, log and re-raise

* Add a common and flexible way to check multiple futures
This commit is contained in:
Giovanni Petrantoni 2020-04-11 13:08:25 +09:00 committed by GitHub
parent d1a7c08a0a
commit 303ec297da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 121 additions and 30 deletions

39
libp2p/errors.nim Normal file
View File

@ -0,0 +1,39 @@
# this module will be further extended in PR
# https://github.com/status-im/nim-libp2p/pull/107/
import chronos
import chronicles
import macros
# could not figure how to make it with a simple template
# sadly nim needs more love for hygenic templates
# so here goes the macro, its based on the proc/template version
# and uses quote do so it's quite readable
macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
let nexclude = exclude.len
case nexclude
of 0:
quote do:
let pos = instantiationInfo()
for res in `futs`:
if res.failed:
let exc = res.readError()
# We still don't abort but warn
warn "Something went wrong in a future",
error=exc.name, file=pos.filename, line=pos.line
else:
quote do:
let pos = instantiationInfo()
for res in `futs`:
block check:
if res.failed:
let exc = res.readError()
for i in 0..<`nexclude`:
if exc of `exclude`[i]:
trace "Ignoring an error (no warning)",
error=exc.name, file=pos.filename, line=pos.line
break check
# We still don't abort but warn
warn "Something went wrong in a future",
error=exc.name, file=pos.filename, line=pos.line

View File

@ -14,7 +14,8 @@ import types,
../../stream/bufferstream, ../../stream/bufferstream,
../../stream/lpstream, ../../stream/lpstream,
../../connection, ../../connection,
../../utility ../../utility,
../../errors
logScope: logScope:
topic = "MplexChannel" topic = "MplexChannel"
@ -93,12 +94,27 @@ proc resetMessage(s: LPChannel) {.async.} =
await s.conn.writeMsg(s.id, s.resetCode) await s.conn.writeMsg(s.id, s.resetCode)
proc resetByRemote*(s: LPChannel) {.async.} = proc resetByRemote*(s: LPChannel) {.async.} =
await allFutures(s.close(), s.closedByRemote()) # Immediately block futher calls
s.isReset = true s.isReset = true
await s.cleanUp()
# start and await async teardown
let
futs = await allFinished(
s.close(),
s.closedByRemote(),
s.cleanUp()
)
checkFutures(futs, [LPStreamEOFError])
proc reset*(s: LPChannel) {.async.} = proc reset*(s: LPChannel) {.async.} =
await allFutures(s.resetMessage(), s.resetByRemote()) let
futs = await allFinished(
s.resetMessage(),
s.resetByRemote()
)
checkFutures(futs, [LPStreamEOFError])
method closed*(s: LPChannel): bool = method closed*(s: LPChannel): bool =
trace "closing lpchannel", id = s.id, initiator = s.initiator trace "closing lpchannel", id = s.id, initiator = s.initiator

View File

@ -17,6 +17,7 @@ import ../muxer,
../../connection, ../../connection,
../../stream/lpstream, ../../stream/lpstream,
../../utility, ../../utility,
../../errors,
coder, coder,
types, types,
lpchannel lpchannel
@ -154,10 +155,16 @@ method newStream*(m: Mplex,
method close*(m: Mplex) {.async, gcsafe.} = method close*(m: Mplex) {.async, gcsafe.} =
trace "closing mplex muxer" trace "closing mplex muxer"
if not m.connection.closed(): if not m.connection.closed():
await m.connection.close() await m.connection.close()
await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())), let
allFutures(toSeq(m.local.values).mapIt(it.reset()))]) futs = await allFinished(
toSeq(m.remote.values).mapIt(it.reset()) &
toSeq(m.local.values).mapIt(it.reset()))
checkFutures(futs)
m.remote.clear() m.remote.clear()
m.local.clear() m.local.clear()

View File

@ -9,7 +9,8 @@
import chronos, chronicles import chronos, chronicles
import ../protocols/protocol, import ../protocols/protocol,
../connection ../connection,
../errors
logScope: logScope:
topic = "Muxer" topic = "Muxer"
@ -45,15 +46,22 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider
method init(c: MuxerProvider) = method init(c: MuxerProvider) =
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
let muxer = c.newMuxer(conn) let
var handlerFut = if not isNil(c.muxerHandler): muxer = c.newMuxer(conn)
c.muxerHandler(muxer)
else:
var dummyFut = newFuture[void]()
dummyFut.complete(); dummyFut
if not isNil(c.streamHandler): if not isNil(c.streamHandler):
muxer.streamHandler = c.streamHandler muxer.streamHandler = c.streamHandler
await allFutures(muxer.handle(), handlerFut) var futs = newSeq[Future[void]]()
futs &= muxer.handle()
# finally await both the futures
if not isNil(c.muxerHandler):
futs &= c.muxerHandler(muxer)
# log and re-raise on errors
futs = await allFinished(futs)
checkFutures(futs)
c.handler = handler c.handler = handler

View File

@ -17,7 +17,8 @@ import pubsub,
../../connection, ../../connection,
../../peerinfo, ../../peerinfo,
../../peer, ../../peer,
../../utility ../../utility,
../../errors
logScope: logScope:
topic = "FloodSub" topic = "FloodSub"
@ -85,10 +86,13 @@ method rpcHandler*(f: FloodSub,
# forward the message to all peers interested in it # forward the message to all peers interested in it
var sent: seq[Future[void]] var sent: seq[Future[void]]
# start the future but do not wait yet
for p in toSendPeers: for p in toSendPeers:
if p in f.peers and f.peers[p].id != peer.id: if p in f.peers and f.peers[p].id != peer.id:
sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)])) sent.add(f.peers[p].send(@[RPCMsg(messages: m.messages)]))
await allFutures(sent) # wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)
method init(f: FloodSub) = method init(f: FloodSub) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
@ -118,10 +122,13 @@ method publish*(f: FloodSub,
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
let msg = newMessage(f.peerInfo, data, topic) let msg = newMessage(f.peerInfo, data, topic)
var sent: seq[Future[void]] var sent: seq[Future[void]]
# start the future but do not wait yet
for p in f.floodsub[topic]: for p in f.floodsub[topic]:
trace "publishing message", name = topic, peer = p, data = data.shortLog trace "publishing message", name = topic, peer = p, data = data.shortLog
sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])])) sent.add(f.peers[p].send(@[RPCMsg(messages: @[msg])]))
await allFutures(sent) # wait for all the futures now
sent = await allFinished(sent)
checkFutures(sent)
method unsubscribe*(f: FloodSub, method unsubscribe*(f: FloodSub,
topics: seq[TopicPair]) {.async.} = topics: seq[TopicPair]) {.async.} =

View File

@ -19,7 +19,8 @@ import pubsub,
../protocol, ../protocol,
../../peerinfo, ../../peerinfo,
../../connection, ../../connection,
../../peer ../../peer,
../../errors
logScope: logScope:
topic = "GossipSub" topic = "GossipSub"
@ -220,7 +221,8 @@ method rpcHandler(g: GossipSub,
if msgs.len > 0: if msgs.len > 0:
trace "forwarding message to", peerId = id trace "forwarding message to", peerId = id
sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)])) sent.add(g.peers[p].send(@[RPCMsg(messages: msgs)]))
await allFutures(sent) sent = await allFinished(sent)
checkFutures(sent)
var respControl: ControlMessage var respControl: ControlMessage
if m.control.isSome: if m.control.isSome:
@ -408,7 +410,8 @@ method publish*(g: GossipSub,
trace "publishing on topic", name = topic trace "publishing on topic", name = topic
g.mcache.put(msg) g.mcache.put(msg)
sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])])) sent.add(g.peers[p].send(@[RPCMsg(messages: @[msg])]))
await allFutures(sent) sent = await allFinished(sent)
checkFutures(sent)
method start*(g: GossipSub) {.async.} = method start*(g: GossipSub) {.async.} =
## start pubsub ## start pubsub

View File

@ -238,9 +238,8 @@ method validate*(p: PubSub, message: Message): Future[bool] {.async, base.} =
# TODO: add timeout to validator # TODO: add timeout to validator
pending.add(p.validators[topic].mapIt(it(topic, message))) pending.add(p.validators[topic].mapIt(it(topic, message)))
await allFutures(pending) let futs = await allFinished(pending)
if pending.allIt(it.read()): # only if all passed result = futs.allIt(not it.failed and it.read())
result = true
proc newPubSub*(p: typedesc[PubSub], proc newPubSub*(p: typedesc[PubSub],
peerInfo: PeerInfo, peerInfo: PeerInfo,

View File

@ -19,6 +19,7 @@ import connection,
protocols/identify, protocols/identify,
protocols/pubsub/pubsub, protocols/pubsub/pubsub,
muxers/muxer, muxers/muxer,
errors,
peer peer
logScope: logScope:
@ -309,11 +310,19 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc stop*(s: Switch) {.async.} = proc stop*(s: Switch) {.async.} =
trace "stopping switch" trace "stopping switch"
if s.pubSub.isSome: # we want to report erros but we do not want to fail
await s.pubSub.get().stop() # or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up
var futs = newSeq[Future[void]]()
await allFutures(toSeq(s.connections.values).mapIt(s.cleanupConn(it))) if s.pubSub.isSome:
await allFutures(s.transports.mapIt(it.close())) futs &= s.pubSub.get().stop()
futs &= toSeq(s.connections.values).mapIt(s.cleanupConn(it))
futs &= s.transports.mapIt(it.close())
futs = await allFinished(futs)
checkFutures(futs)
proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer ## Subscribe to pub sub peer

View File

@ -61,7 +61,8 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
t.server.stop() t.server.stop()
t.server.close() t.server.close()
await t.server.join() await t.server.join()
trace "transport stopped"
trace "transport stopped"
method listen*(t: TcpTransport, method listen*(t: TcpTransport,
ma: MultiAddress, ma: MultiAddress,

View File

@ -11,7 +11,8 @@ import sequtils
import chronos, chronicles import chronos, chronicles
import ../connection, import ../connection,
../multiaddress, ../multiaddress,
../multicodec ../multicodec,
../errors
type type
ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.} ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.}
@ -33,7 +34,8 @@ proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} =
method close*(t: Transport) {.base, async, gcsafe.} = method close*(t: Transport) {.base, async, gcsafe.} =
## stop and cleanup the transport ## stop and cleanup the transport
## including all outstanding connections ## including all outstanding connections
await allFutures(t.connections.mapIt(it.close())) let futs = await allFinished(t.connections.mapIt(it.close()))
checkFutures(futs)
method listen*(t: Transport, method listen*(t: Transport,
ma: MultiAddress, ma: MultiAddress,