fix: switch and tests

This commit is contained in:
Dmitriy Ryajov 2019-09-05 09:19:39 -06:00
parent 6f8de062bb
commit 0f52a6e798
2 changed files with 22 additions and 13 deletions

View File

@ -72,7 +72,10 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
# install stream handler # install stream handler
muxer.streamHandler = proc (stream: Connection) {.async, gcsafe.} = muxer.streamHandler = proc (stream: Connection) {.async, gcsafe.} =
try: try:
asyncDiscard s.handleConn(stream) # TODO: figure out proper way of handling this.
# Perhaps it's ok to discard this Future and handle
# errors elsewere?
asyncCheck s.ms.handle(stream) # handle incoming connection
finally: finally:
await stream.close() await stream.close()
@ -81,17 +84,16 @@ proc mux(s: Switch, conn: Connection): Future[void] {.async, gcsafe.} =
let stream = await muxer.newStream() let stream = await muxer.newStream()
await s.identify(stream) await s.identify(stream)
# store it in muxed connections if we have a peer for it
# TODO: We should make sure that this are cleaned up properly
# on exit even if there is no peer for it. This shouldn't
# happen once secio is in place, but still something to keep
# in mind
if conn.peerInfo.isSome: if conn.peerInfo.isSome:
s.muxed[conn.peerInfo.get().peerId.pretty] = muxer s.muxed[conn.peerInfo.get().peerId.pretty] = muxer
proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} = proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe.} =
## perform upgrade flow ## perform upgrade flow
# TODO: figure out proper way of handling this.
# Perhaps it's ok to discard this Future and handle
# errors elsewere?
asyncDiscard s.ms.handle(conn) # handler incoming connection
if conn.peerInfo.isSome: if conn.peerInfo.isSome:
let id = conn.peerInfo.get().peerId.pretty let id = conn.peerInfo.get().peerId.pretty
if s.connections.contains(id): if s.connections.contains(id):
@ -103,7 +105,7 @@ proc handleConn(s: Switch, conn: Connection): Future[Connection] {.async, gcsafe
s.connections[id] = conn s.connections[id] = conn
await s.secure(conn) # secure the connection await s.secure(conn) # secure the connection
await s.mux(conn) # mux it if possible # await s.mux(conn) # mux it if possible
result = conn result = conn
proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} = proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
@ -115,7 +117,10 @@ proc cleanupConn(s: Switch, conn: Connection) {.async, gcsafe.} =
if s.connections.contains(id): if s.connections.contains(id):
await s.connections[id].close() await s.connections[id].close()
proc dial*(s: Switch, peer: PeerInfo, proto: string = ""): Future[Connection] {.async.} = proc dial*(s: Switch,
peer: PeerInfo,
proto: string = ""):
Future[Connection] {.async.} =
for t in s.transports: # for each transport for t in s.transports: # for each transport
for a in peer.addrs: # for each address for a in peer.addrs: # for each address
if t.handles(a): # check if it can dial it if t.handles(a): # check if it can dial it
@ -142,7 +147,10 @@ proc mount*[T: LPProtocol](s: Switch, proto: T) =
proc start*(s: Switch) {.async.} = proc start*(s: Switch) {.async.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try: try:
asyncDiscard s.handleConn(conn) # TODO: figure out proper way of handling this.
# Perhaps it's ok to discard this Future and handle
# errors elsewere?
asyncCheck s.ms.handle(conn) # handle incoming connection
except: except:
await s.cleanupConn(conn) await s.cleanupConn(conn)

View File

@ -43,19 +43,20 @@ suite "Switch":
proc testSwitch(): Future[bool] {.async, gcsafe.} = proc testSwitch(): Future[bool] {.async, gcsafe.} =
let ma1: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53370") let ma1: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53370")
let ma2: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53371") let ma2: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53381")
var peerInfo1, peerInfo2: PeerInfo var peerInfo1, peerInfo2: PeerInfo
var switch1, switch2: Switch var switch1, switch2: Switch
(switch1, peerInfo1) = createSwitch(ma1) (switch1, peerInfo1) = createSwitch(ma1)
let testProto = new TestProto let testProto = new TestProto
testProto.handler = proc(conn: Connection, proto: string) testProto.init()
{.async, gcsafe.} = discard
testProto.codec = TestCodec testProto.codec = TestCodec
switch1.mount(testProto) switch1.mount(testProto)
await switch1.start() await switch1.start()
(switch2, peerInfo2) = createSwitch(ma2) (switch2, peerInfo2) = createSwitch(ma2)
await switch2.start()
let conn = await switch2.dial(peerInfo1, TestCodec) let conn = await switch2.dial(peerInfo1, TestCodec)
await conn.writeLp("Hello!") await conn.writeLp("Hello!")
let msg = cast[string](await conn.readLp()) let msg = cast[string](await conn.readLp())