From 0f52a6e798ef82df461221a7813294ada1c2019b Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 5 Sep 2019 09:19:39 -0600 Subject: [PATCH] fix: switch and tests --- libp2p/switch.nim | 28 ++++++++++++++++++---------- tests/testswitch.nim | 7 ++++--- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 3f9fa36b8..fd6858c20 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -72,7 +72,10 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = # install stream handler muxer.streamHandler = proc (stream: Connection) {.async, gcsafe.} = try: - asyncDiscard s.handleConn(stream) + # TODO: figure out proper way of handling this. + # Perhaps it's ok to discard this Future and handle + # errors elsewere? + asyncCheck s.ms.handle(stream) # handle incoming connection finally: await stream.close() @@ -81,17 +84,16 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} = let stream = await muxer.newStream() await s.identify(stream) + # store it in muxed connections if we have a peer for it + # TODO: We should make sure that this are cleaned up properly + # on exit even if there is no peer for it. This shouldn't + # happen once secio is in place, but still something to keep + # in mind if conn.peerInfo.isSome: s.muxed[conn.peerInfo.get().peerId.pretty] = muxer proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = ## perform upgrade flow - - # TODO: figure out proper way of handling this. - # Perhaps it's ok to discard this Future and handle - # errors elsewere? - asyncDiscard s.ms.handle(conn) # handler incoming connection - if conn.peerInfo.isSome: let id = conn.peerInfo.get().peerId.pretty if s.connections.contains(id): @@ -103,7 +105,7 @@ proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe s.connections[id] = conn await s.secure(conn) # secure the connection - await s.mux(conn) # mux it if possible + # await s.mux(conn) # mux it if possible result = conn proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = @@ -115,7 +117,10 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = if s.connections.contains(id): await s.connections[id].close() -proc dial*(s: Switch, peer: PeerInfo, proto: string = ""): Future[Connection] {.async.} = +proc dial*(s: Switch, + peer: PeerInfo, + proto: string = ""): + Future[Connection] {.async.} = for t in s.transports: # for each transport for a in peer.addrs: # for each address if t.handles(a): # check if it can dial it @@ -142,7 +147,10 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) = proc start*(s: Switch) {.async.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = try: - asyncDiscard s.handleConn(conn) + # TODO: figure out proper way of handling this. + # Perhaps it's ok to discard this Future and handle + # errors elsewere? + asyncCheck s.ms.handle(conn) # handle incoming connection except: await s.cleanupConn(conn) diff --git a/tests/testswitch.nim b/tests/testswitch.nim index 9554646ce..cee417221 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -43,19 +43,20 @@ suite "Switch": proc testSwitch(): Future[bool] {.async, gcsafe.} = let ma1: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53370") - let ma2: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53371") + let ma2: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53381") var peerInfo1, peerInfo2: PeerInfo var switch1, switch2: Switch (switch1, peerInfo1) = createSwitch(ma1) + let testProto = new TestProto - testProto.handler = proc(conn: Connection, proto: string) - {.async, gcsafe.} = discard + testProto.init() testProto.codec = TestCodec switch1.mount(testProto) await switch1.start() (switch2, peerInfo2) = createSwitch(ma2) + await switch2.start() let conn = await switch2.dial(peerInfo1, TestCodec) await conn.writeLp("Hello!") let msg = cast[string](await conn.readLp())