feat: tcp transport listen
This commit is contained in:
parent
47106a6a7d
commit
c8546583e6
|
@ -15,12 +15,18 @@ type
|
||||||
Connection* = ref object of RootObj
|
Connection* = ref object of RootObj
|
||||||
reader: AsyncStreamReader
|
reader: AsyncStreamReader
|
||||||
writter: AsyncStreamWriter
|
writter: AsyncStreamWriter
|
||||||
|
server: StreamServer
|
||||||
|
client: StreamTransport
|
||||||
|
|
||||||
proc newConnection*(reader: AsyncStreamReader, writter: AsyncStreamWriter): Connection =
|
proc newConnection*(server: StreamServer,
|
||||||
|
client: StreamTransport): Connection =
|
||||||
## create a new Connection for the specified async stream reader/writter
|
## create a new Connection for the specified async stream reader/writter
|
||||||
new result
|
new result
|
||||||
result.reader = reader
|
result.server = server
|
||||||
result.writter = writter
|
result.client = client
|
||||||
|
|
||||||
|
result.reader = newAsyncStreamReader(client)
|
||||||
|
result.writter = newAsyncStreamWriter(client)
|
||||||
|
|
||||||
method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async, gcsafe.} =
|
method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async, gcsafe.} =
|
||||||
## read DefaultReadSize (1024) bytes or `size` bytes if specified
|
## read DefaultReadSize (1024) bytes or `size` bytes if specified
|
||||||
|
@ -28,7 +34,7 @@ method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.b
|
||||||
|
|
||||||
method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} =
|
method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} =
|
||||||
## write bytes pointed to by `data` up to `size` size
|
## write bytes pointed to by `data` up to `size` size
|
||||||
result = c.writter.write(data, size)
|
discard c.writter.write(data, size)
|
||||||
|
|
||||||
method close* (c: Connection): Future[void] {.base, async.} =
|
method close* (c: Connection): Future[void] {.base, async.} =
|
||||||
## close connection
|
## close connection
|
||||||
|
|
|
@ -11,21 +11,19 @@ import chronos
|
||||||
import transport, wire, connection, multiaddress, connection, multicodec
|
import transport, wire, connection, multiaddress, connection, multicodec
|
||||||
|
|
||||||
type TcpTransport* = ref object of Transport
|
type TcpTransport* = ref object of Transport
|
||||||
fd*: AsyncFD
|
|
||||||
server*: StreamServer
|
server*: StreamServer
|
||||||
|
|
||||||
proc connHandler(server: StreamServer,
|
proc connHandler(server: StreamServer,
|
||||||
client: StreamTransport): Future[Connection] {.gcsafe, async.} =
|
client: StreamTransport): Future[Connection] {.gcsafe, async.} =
|
||||||
let t: TcpTransport = cast[TcpTransport](server.udata)
|
let t: TcpTransport = cast[TcpTransport](server.udata)
|
||||||
let conn: Connection = newConnection(newAsyncStreamReader(client),
|
let conn: Connection = newConnection(server, client)
|
||||||
newAsyncStreamWriter(client))
|
|
||||||
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
||||||
connFuture: t.handler(conn))
|
connFuture: t.handler(conn))
|
||||||
t.connections.add(connHolder)
|
t.connections.add(connHolder)
|
||||||
result = conn
|
result = conn
|
||||||
|
|
||||||
proc connCb(server: StreamServer,
|
proc connCb(server: StreamServer,
|
||||||
client: StreamTransport) {.gcsafe, async.} =
|
client: StreamTransport) {.gcsafe, async.} =
|
||||||
discard connHandler(server, client)
|
discard connHandler(server, client)
|
||||||
|
|
||||||
method init*(t: TcpTransport) =
|
method init*(t: TcpTransport) =
|
||||||
|
@ -39,21 +37,9 @@ method listen*(t: TcpTransport): Future[void] {.async.} =
|
||||||
let listenFuture: Future[void] = newFuture[void]()
|
let listenFuture: Future[void] = newFuture[void]()
|
||||||
result = listenFuture
|
result = listenFuture
|
||||||
|
|
||||||
proc initTransport(server: StreamServer, fd: AsyncFD): StreamTransport {.gcsafe.} =
|
|
||||||
t.server = server
|
|
||||||
t.fd = fd
|
|
||||||
listenFuture.complete()
|
|
||||||
|
|
||||||
## listen on the transport
|
## listen on the transport
|
||||||
let server = createStreamServer(t.ma,
|
let server = createStreamServer(t.ma, connCb, {}, t)
|
||||||
connCb,
|
t.server = server
|
||||||
{},
|
|
||||||
t,
|
|
||||||
asyncInvalidSocket,
|
|
||||||
100,
|
|
||||||
DefaultStreamBufferSize,
|
|
||||||
nil,
|
|
||||||
initTransport)
|
|
||||||
server.start()
|
server.start()
|
||||||
|
|
||||||
method dial*(t: TcpTransport,
|
method dial*(t: TcpTransport,
|
||||||
|
|
Binary file not shown.
|
@ -7,15 +7,13 @@ suite "TCP transport suite":
|
||||||
test "test listener":
|
test "test listener":
|
||||||
proc testListener(): Future[bool] {.async.} =
|
proc testListener(): Future[bool] {.async.} =
|
||||||
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335")
|
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335")
|
||||||
proc connHandler(conn: Connection): Future[void] {.gcsafe.} =
|
proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} =
|
||||||
let msg = "Hello"
|
result = conn.write(cstring("Hello!"), 6)
|
||||||
conn.write(msg.cstring, 6)
|
|
||||||
|
|
||||||
let transport: TcpTransport = newTransport(TcpTransport, ma, connHandler)
|
let transport: TcpTransport = newTransport(TcpTransport, ma, connHandler)
|
||||||
await transport.listen()
|
await transport.listen()
|
||||||
let streamTransport: StreamTransport = await connect(ma)
|
let streamTransport: StreamTransport = await connect(ma)
|
||||||
let msg = await streamTransport.read()
|
let msg = await streamTransport.read(6)
|
||||||
echo "HERE!!!!"
|
|
||||||
result = cast[string](msg) == "Hello!"
|
result = cast[string](msg) == "Hello!"
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
|
Loading…
Reference in New Issue