feat: allow ussage of 0 addrs

This commit is contained in:
Dmitriy Ryajov 2019-09-25 11:36:39 -06:00
parent 9862064234
commit f3afe0a6ce
2 changed files with 33 additions and 15 deletions

View File

@ -21,6 +21,22 @@ logScope:
type TcpTransport* = ref object of Transport type TcpTransport* = ref object of Transport
server*: StreamServer server*: StreamServer
proc toMultiAddr*(address: TransportAddress): MultiAddress =
## Returns string representation of ``address``.
case address.family
of AddressFamily.IPv4:
var a = IpAddress(
family: IpAddressFamily.IPv4,
address_v4: address.address_v4
)
result = MultiAddress.init(a, Protocol.IPPROTO_TCP, address.port)
of AddressFamily.IPv6:
var a = IpAddress(family: IpAddressFamily.IPv6,
address_v6: address.address_v6)
result = MultiAddress.init(a, Protocol.IPPROTO_TCP, address.port)
else:
raise newException(TransportAddressError, "Invalid address for transport!")
proc connHandler*(t: Transport, proc connHandler*(t: Transport,
server: StreamServer, server: StreamServer,
client: StreamTransport, client: StreamTransport,
@ -28,6 +44,7 @@ proc connHandler*(t: Transport,
Future[Connection] {.async, gcsafe.} = Future[Connection] {.async, gcsafe.} =
trace "handling connection for", address = $client.remoteAddress trace "handling connection for", address = $client.remoteAddress
let conn: Connection = newConnection(newChronosStream(server, client)) let conn: Connection = newConnection(newChronosStream(server, client))
conn.observedAddrs = client.remoteAddress.toMultiAddr()
if not initiator: if not initiator:
let handlerFut = if t.handler == nil: nil else: t.handler(conn) let handlerFut = if t.handler == nil: nil else: t.handler(conn)
let connHolder: ConnHolder = ConnHolder(connection: conn, let connHolder: ConnHolder = ConnHolder(connection: conn,
@ -56,14 +73,15 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
method listen*(t: TcpTransport, method listen*(t: TcpTransport,
ma: MultiAddress, ma: MultiAddress,
handler: ConnHandler): handler: ConnHandler):
# TODO: need to check how this futures
# are being returned, it doesn't seem to be right
Future[Future[void]] {.async, gcsafe.} = Future[Future[void]] {.async, gcsafe.} =
discard await procCall Transport(t).listen(ma, handler) # call base discard await procCall Transport(t).listen(ma, handler) # call base
## listen on the transport ## listen on the transport
t.server = createStreamServer(t.ma, connCb, {}, t) t.server = createStreamServer(t.ma, connCb, {}, t)
t.server.start() t.server.start()
# always get the resolved address in case we're bound to 0.0.0.0:0
t.ma = t.server.sock.getLocalAddress().toMultiAddr()
result = t.server.join() result = t.server.join()
method dial*(t: TcpTransport, method dial*(t: TcpTransport,

View File

@ -9,13 +9,13 @@ import ../libp2p/connection,
suite "TCP transport": suite "TCP transport":
test "test listener: handle write": test "test listener: handle write":
proc testListener(): Future[bool] {.async, gcsafe.} = proc testListener(): Future[bool] {.async, gcsafe.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
result = conn.write(cstring("Hello!"), 6) result = conn.write(cstring("Hello!"), 6)
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
asyncCheck await transport.listen(ma, connHandler) asyncCheck await transport.listen(ma, connHandler)
let streamTransport: StreamTransport = await connect(ma) let streamTransport: StreamTransport = await connect(transport.ma)
let msg = await streamTransport.read(6) let msg = await streamTransport.read(6)
await transport.close() await transport.close()
await streamTransport.closeWait() await streamTransport.closeWait()
@ -27,14 +27,14 @@ suite "TCP transport":
test "test listener: handle read": test "test listener: handle read":
proc testListener(): Future[bool] {.async.} = proc testListener(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53336") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
let msg = await conn.read(6) let msg = await conn.read(6)
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
asyncCheck await transport.listen(ma, connHandler) asyncCheck await transport.listen(ma, connHandler)
let streamTransport: StreamTransport = await connect(ma) let streamTransport: StreamTransport = await connect(transport.ma)
let sent = await streamTransport.write("Hello!", 6) let sent = await streamTransport.write("Hello!", 6)
result = sent == 6 result = sent == 6
@ -56,7 +56,7 @@ suite "TCP transport":
var server = createStreamServer(address, serveClient, {ReuseAddr}) var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start() server.start()
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337") let ma: MultiAddress = server.sock.getLocalAddress().toMultiAddr()
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
let conn = await transport.dial(ma) let conn = await transport.dial(ma)
let msg = await conn.read(6) let msg = await conn.read(6)
@ -65,7 +65,7 @@ suite "TCP transport":
server.stop() server.stop()
server.close() server.close()
await server.join() await server.join()
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
test "test dialer: handle write": test "test dialer: handle write":
proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} = proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} =
@ -83,7 +83,7 @@ suite "TCP transport":
var server = createStreamServer(address, serveClient, {ReuseAddr}) var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start() server.start()
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337") let ma: MultiAddress = server.sock.getLocalAddress().toMultiAddr()
let transport: TcpTransport = newTransport(TcpTransport) let transport: TcpTransport = newTransport(TcpTransport)
let conn = await transport.dial(ma) let conn = await transport.dial(ma)
await conn.write(cstring("Hello!"), 6) await conn.write(cstring("Hello!"), 6)
@ -92,11 +92,11 @@ suite "TCP transport":
server.stop() server.stop()
server.close() server.close()
await server.join() await server.join()
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true check waitFor(testDialer(initTAddress("0.0.0.0:0"))) == true
test "e2e: handle write": test "e2e: handle write":
proc testListenerDialer(): Future[bool] {.async.} = proc testListenerDialer(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53339") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
result = conn.write(cstring("Hello!"), 6) result = conn.write(cstring("Hello!"), 6)
@ -104,7 +104,7 @@ suite "TCP transport":
asyncCheck await transport1.listen(ma, connHandler) asyncCheck await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma) let conn = await transport2.dial(transport1.ma)
let msg = await conn.read(6) let msg = await conn.read(6)
await transport1.close() await transport1.close()
@ -115,7 +115,7 @@ suite "TCP transport":
test "e2e: handle read": test "e2e: handle read":
proc testListenerDialer(): Future[bool] {.async.} = proc testListenerDialer(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53340") let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
let msg = await conn.read(6) let msg = await conn.read(6)
check cast[string](msg) == "Hello!" check cast[string](msg) == "Hello!"
@ -124,7 +124,7 @@ suite "TCP transport":
asyncCheck await transport1.listen(ma, connHandler) asyncCheck await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport) let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma) let conn = await transport2.dial(transport1.ma)
await conn.write(cstring("Hello!"), 6) await conn.write(cstring("Hello!"), 6)
await transport1.close() await transport1.close()
result = true result = true