feat: adding dial tests
This commit is contained in:
parent
c8546583e6
commit
c15a9bdd67
|
@ -14,19 +14,21 @@ const DefaultReadSize = 1024
|
||||||
type
|
type
|
||||||
Connection* = ref object of RootObj
|
Connection* = ref object of RootObj
|
||||||
reader: AsyncStreamReader
|
reader: AsyncStreamReader
|
||||||
writter: AsyncStreamWriter
|
writer: AsyncStreamWriter
|
||||||
server: StreamServer
|
server: StreamServer
|
||||||
client: StreamTransport
|
client: StreamTransport
|
||||||
|
isOpen*: bool
|
||||||
|
|
||||||
proc newConnection*(server: StreamServer,
|
proc newConnection*(server: StreamServer,
|
||||||
client: StreamTransport): Connection =
|
client: StreamTransport): Connection =
|
||||||
## create a new Connection for the specified async stream reader/writter
|
## create a new Connection for the specified async stream reader/writer
|
||||||
new result
|
new result
|
||||||
|
result.isOpen = false
|
||||||
result.server = server
|
result.server = server
|
||||||
result.client = client
|
result.client = client
|
||||||
|
|
||||||
result.reader = newAsyncStreamReader(client)
|
result.reader = newAsyncStreamReader(client)
|
||||||
result.writter = newAsyncStreamWriter(client)
|
result.writer = newAsyncStreamWriter(client)
|
||||||
|
|
||||||
method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async, gcsafe.} =
|
method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async, gcsafe.} =
|
||||||
## read DefaultReadSize (1024) bytes or `size` bytes if specified
|
## read DefaultReadSize (1024) bytes or `size` bytes if specified
|
||||||
|
@ -34,12 +36,18 @@ method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.b
|
||||||
|
|
||||||
method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} =
|
method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} =
|
||||||
## write bytes pointed to by `data` up to `size` size
|
## write bytes pointed to by `data` up to `size` size
|
||||||
discard c.writter.write(data, size)
|
discard c.writer.write(data, size)
|
||||||
|
|
||||||
method close* (c: Connection): Future[void] {.base, async.} =
|
method close* (c: Connection): Future[void] {.base, async.} =
|
||||||
## close connection
|
## close connection
|
||||||
## TODO: figure out how to correctly close the streams and underlying resource
|
await c.reader.closeWait()
|
||||||
discard
|
|
||||||
|
await c.writer.finish()
|
||||||
|
await c.writer.closeWait()
|
||||||
|
|
||||||
|
await c.client.closeWait()
|
||||||
|
c.server.stop()
|
||||||
|
c.server.close()
|
||||||
|
|
||||||
method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} =
|
method getPeerInfo* (c: Connection): Future[PeerInfo] {.base, async.} =
|
||||||
## get up to date peer info
|
## get up to date peer info
|
||||||
|
|
|
@ -13,25 +13,20 @@ import transport, wire, connection, multiaddress, connection, multicodec
|
||||||
type TcpTransport* = ref object of Transport
|
type TcpTransport* = ref object of Transport
|
||||||
server*: StreamServer
|
server*: StreamServer
|
||||||
|
|
||||||
proc connHandler(server: StreamServer,
|
|
||||||
client: StreamTransport): Future[Connection] {.gcsafe, async.} =
|
|
||||||
let t: TcpTransport = cast[TcpTransport](server.udata)
|
|
||||||
let conn: Connection = newConnection(server, client)
|
|
||||||
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
|
||||||
connFuture: t.handler(conn))
|
|
||||||
t.connections.add(connHolder)
|
|
||||||
result = conn
|
|
||||||
|
|
||||||
proc connCb(server: StreamServer,
|
proc connCb(server: StreamServer,
|
||||||
client: StreamTransport) {.gcsafe, async.} =
|
client: StreamTransport) {.gcsafe, async.} =
|
||||||
discard connHandler(server, client)
|
let t: Transport = cast[Transport](server.udata)
|
||||||
|
discard t.connHandler(server, client)
|
||||||
|
|
||||||
method init*(t: TcpTransport) =
|
method init*(t: TcpTransport) =
|
||||||
t.multicodec = multiCodec("tcp")
|
t.multicodec = multiCodec("tcp")
|
||||||
|
|
||||||
method close*(t: TcpTransport): Future[void] {.async.} =
|
method close*(t: TcpTransport): Future[void] {.async.} =
|
||||||
## start the transport
|
## start the transport
|
||||||
result = t.server.closeWait()
|
await procCall Transport(t).close() # call base close
|
||||||
|
|
||||||
|
t.server.stop()
|
||||||
|
await t.server.closeWait()
|
||||||
|
|
||||||
method listen*(t: TcpTransport): Future[void] {.async.} =
|
method listen*(t: TcpTransport): Future[void] {.async.} =
|
||||||
let listenFuture: Future[void] = newFuture[void]()
|
let listenFuture: Future[void] = newFuture[void]()
|
||||||
|
@ -42,8 +37,7 @@ method listen*(t: TcpTransport): Future[void] {.async.} =
|
||||||
t.server = server
|
t.server = server
|
||||||
server.start()
|
server.start()
|
||||||
|
|
||||||
method dial*(t: TcpTransport,
|
method dial*(t: TcpTransport, address: MultiAddress): Future[Connection] {.async.} =
|
||||||
address: MultiAddress): Future[Connection] {.async.} =
|
|
||||||
## dial a peer
|
## dial a peer
|
||||||
let client: StreamTransport = await connect(address)
|
let client: StreamTransport = await connect(address)
|
||||||
result = await connHandler(t.server, client)
|
result = await t.connHandler(t.server, client)
|
||||||
|
|
|
@ -23,24 +23,37 @@ type
|
||||||
handler*: ConnHandler
|
handler*: ConnHandler
|
||||||
multicodec*: MultiCodec
|
multicodec*: MultiCodec
|
||||||
|
|
||||||
|
method connHandler*(t: Transport,
|
||||||
|
server: StreamServer,
|
||||||
|
client: StreamTransport): Future[Connection] {.base, gcsafe, async.} =
|
||||||
|
let conn: Connection = newConnection(server, client)
|
||||||
|
let handlerFut = if t.handler == nil: nil else: t.handler(conn)
|
||||||
|
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
||||||
|
connFuture: handlerFut)
|
||||||
|
t.connections.add(connHolder)
|
||||||
|
result = conn
|
||||||
|
|
||||||
method init*(t: Transport) {.base.} =
|
method init*(t: Transport) {.base.} =
|
||||||
## perform protocol initialization
|
## perform protocol initialization
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc newTransport*(t: typedesc[Transport],
|
proc newTransport*(t: typedesc[Transport],
|
||||||
ma: MultiAddress,
|
ma: MultiAddress,
|
||||||
handler: ConnHandler): t =
|
handler: ConnHandler = nil): t =
|
||||||
new result
|
new result
|
||||||
result.ma = ma
|
result.ma = ma
|
||||||
result.handler = handler
|
result.handler = handler
|
||||||
result.init()
|
result.init()
|
||||||
|
|
||||||
method close*(t: Transport) {.base, async.} =
|
method close*(t: Transport) {.base, async.} =
|
||||||
## start the transport
|
## stop and cleanup the transport
|
||||||
discard
|
## including all outstanding connections
|
||||||
|
for c in t.connections:
|
||||||
|
if c.connection.isOpen:
|
||||||
|
await c.connection.close()
|
||||||
|
|
||||||
method listen*(t: Transport) {.base, async.} =
|
method listen*(t: Transport) {.base, async.} =
|
||||||
## stop the transport
|
## listen for incoming connections
|
||||||
discard
|
discard
|
||||||
|
|
||||||
method dial*(t: Transport, address: MultiAddress): Future[Connection] {.base, async.} =
|
method dial*(t: Transport, address: MultiAddress): Future[Connection] {.base, async.} =
|
||||||
|
|
Binary file not shown.
|
@ -4,17 +4,90 @@ import ../libp2p/connection, ../libp2p/transport, ../libp2p/tcptransport,
|
||||||
../libp2p/multiaddress, ../libp2p/wire
|
../libp2p/multiaddress, ../libp2p/wire
|
||||||
|
|
||||||
suite "TCP transport suite":
|
suite "TCP transport suite":
|
||||||
test "test listener":
|
test "test listener: handle write":
|
||||||
proc testListener(): Future[bool] {.async.} =
|
proc testListener(): Future[bool] {.async.} =
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335")
|
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335")
|
||||||
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} =
|
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} =
|
||||||
result = conn.write(cstring("Hello!"), 6)
|
result = conn.write(cstring("Hello!"), 6)
|
||||||
|
await conn.close()
|
||||||
|
|
||||||
let transport: TcpTransport = newTransport(TcpTransport, ma, connHandler)
|
let transport: TcpTransport = newTransport(TcpTransport, ma, connHandler)
|
||||||
await transport.listen()
|
await transport.listen()
|
||||||
let streamTransport: StreamTransport = await connect(ma)
|
let streamTransport: StreamTransport = await connect(ma)
|
||||||
let msg = await streamTransport.read(6)
|
let msg = await streamTransport.read(6)
|
||||||
|
await transport.close()
|
||||||
|
await streamTransport.closeWait()
|
||||||
|
|
||||||
result = cast[string](msg) == "Hello!"
|
result = cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
check:
|
check:
|
||||||
waitFor(testListener()) == true
|
waitFor(testListener()) == true
|
||||||
|
|
||||||
|
test "test listener: handle read":
|
||||||
|
proc testListener(): Future[bool] {.async.} =
|
||||||
|
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53336")
|
||||||
|
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} =
|
||||||
|
let msg = await conn.read(6)
|
||||||
|
check cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
|
let transport: TcpTransport = newTransport(TcpTransport, ma, connHandler)
|
||||||
|
await transport.listen()
|
||||||
|
let streamTransport: StreamTransport = await connect(ma)
|
||||||
|
let sent = await streamTransport.write("Hello!", 6)
|
||||||
|
result = sent == 6
|
||||||
|
|
||||||
|
check:
|
||||||
|
waitFor(testListener()) == true
|
||||||
|
|
||||||
|
test "test dialer: handle write":
|
||||||
|
proc testDialer(address: TransportAddress): Future[bool] {.async.} =
|
||||||
|
proc serveClient(server: StreamServer,
|
||||||
|
transp: StreamTransport) {.async.} =
|
||||||
|
var wstream = newAsyncStreamWriter(transp)
|
||||||
|
await wstream.write("Hello!")
|
||||||
|
await wstream.finish()
|
||||||
|
await wstream.closeWait()
|
||||||
|
await transp.closeWait()
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337")
|
||||||
|
let transport: TcpTransport = newTransport(TcpTransport, ma)
|
||||||
|
let conn = await transport.dial(ma)
|
||||||
|
let msg = await conn.read(6)
|
||||||
|
result = cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
await server.join()
|
||||||
|
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true
|
||||||
|
|
||||||
|
test "test dialer: handle write":
|
||||||
|
proc testDialer(address: TransportAddress): Future[bool] {.async.} =
|
||||||
|
proc serveClient(server: StreamServer,
|
||||||
|
transp: StreamTransport) {.async.} =
|
||||||
|
var rstream = newAsyncStreamReader(transp)
|
||||||
|
let msg = await rstream.read(6)
|
||||||
|
check cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
|
await rstream.closeWait()
|
||||||
|
await transp.closeWait()
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
|
||||||
|
var server = createStreamServer(address, serveClient, {ReuseAddr})
|
||||||
|
server.start()
|
||||||
|
|
||||||
|
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53337")
|
||||||
|
let transport: TcpTransport = newTransport(TcpTransport, ma)
|
||||||
|
let conn = await transport.dial(ma)
|
||||||
|
await conn.write(cstring("Hello!"), 6)
|
||||||
|
result = true
|
||||||
|
|
||||||
|
server.stop()
|
||||||
|
server.close()
|
||||||
|
await server.join()
|
||||||
|
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true
|
||||||
|
|
Loading…
Reference in New Issue