From 4d9444afe9a78585178a82b24f307a67c77797e1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 11 Sep 2019 18:15:04 -0600 Subject: [PATCH] proper server startup sequence --- libp2p/switch.nim | 9 ++++++--- libp2p/transports/tcptransport.nim | 23 +++++++++++++---------- libp2p/transports/transport.nim | 3 ++- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 38d3609..5b78498 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -184,18 +184,21 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) {.gcsafe.} = s.ms.addHandler(proto.codec, proto) -proc start*(s: Switch) {.async.} = +proc start*(s: Switch): Future[seq[Future[void]]] {.async.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = try: - if (await s.ms.select(conn)): + if (await s.ms.select(conn)): # just handshake await s.ms.handle(conn) # handle incoming connection except: await s.cleanupConn(conn) + var startFuts: seq[Future[void]] for t in s.transports: # for each transport for a in s.peerInfo.addrs: if t.handles(a): # check if it handles the multiaddr - await t.listen(a, handle) # listen for incoming connections + var server = await t.listen(a, handle) + startFuts.add(server) + result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = await allFutures(s.transports.mapIt(it.close())) diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index e4e088b..1b6b9c7 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -7,11 +7,14 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import chronos +import chronos, chronicles import transport, ../wire, ../connection, ../multiaddress, ../connection, ../multicodec, ../stream/chronosstream +logScope: + topic = "TcpTransport" + type TcpTransport* = ref object of Transport server*: StreamServer @@ -20,6 +23,7 @@ proc connHandler*(t: Transport, client: StreamTransport, initiator: bool = false): Future[Connection] {.async, gcsafe.} = + debug "handling connection for", address = $client.remoteAddress let conn: Connection = newConnection(newChronosStream(server, client)) if not initiator: let handlerFut = if t.handler == nil: nil else: t.handler(conn) @@ -30,6 +34,7 @@ proc connHandler*(t: Transport, proc connCb(server: StreamServer, client: StreamTransport) {.async, gcsafe.} = + debug "incomming connection for", address = $client.remoteAddress let t: Transport = cast[Transport](server.udata) discard t.connHandler(server, client) @@ -38,6 +43,7 @@ method init*(t: TcpTransport) = method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = ## start the transport + debug "stopping transport" await procCall Transport(t).close() # call base t.server.stop() @@ -46,21 +52,18 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = method listen*(t: TcpTransport, ma: MultiAddress, handler: ConnHandler): - Future[void] {.async, gcsafe.} = - await procCall Transport(t).listen(ma, handler) # call base + Future[Future[void]] {.async, gcsafe.} = + discard await procCall Transport(t).listen(ma, handler) # call base ## listen on the transport - let listenFuture: Future[void] = newFuture[void]() - result = listenFuture - - let server = createStreamServer(t.ma, connCb, {}, t) - t.server = server - server.start() - listenFuture.complete() + t.server = createStreamServer(t.ma, connCb, {}, t) + t.server.start() + result = t.server.join() method dial*(t: TcpTransport, address: MultiAddress): Future[Connection] {.async, gcsafe.} = + debug "dialing remote peer", address = $address ## dial a peer let client: StreamTransport = await connect(address) result = await t.connHandler(t.server, client, true) diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 9088fe9..55d67a3 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -40,7 +40,8 @@ method close*(t: Transport) {.base, async, gcsafe.} = method listen*(t: Transport, ma: MultiAddress, - handler: ConnHandler) {.base, async, gcsafe.} = + handler: ConnHandler): + Future[Future[void]] {.base, async, gcsafe.} = ## listen for incoming connections t.ma = ma t.handler = handler