diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 69ce8f559..a979f00d7 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -15,12 +15,18 @@ type Connection* = ref object of RootObj reader: AsyncStreamReader writter: AsyncStreamWriter + server: StreamServer + client: StreamTransport -proc newConnection*(reader: AsyncStreamReader, writter: AsyncStreamWriter): Connection = +proc newConnection*(server: StreamServer, + client: StreamTransport): Connection = ## create a new Connection for the specified async stream reader/writter new result - result.reader = reader - result.writter = writter + result.server = server + result.client = client + + result.reader = newAsyncStreamReader(client) + result.writter = newAsyncStreamWriter(client) method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.base, async, gcsafe.} = ## read DefaultReadSize (1024) bytes or `size` bytes if specified @@ -28,7 +34,7 @@ method read* (c: Connection, size: int = DefaultReadSize): Future[seq[byte]] {.b method write* (c: Connection, data: pointer, size: int): Future[void] {.base, async.} = ## write bytes pointed to by `data` up to `size` size - result = c.writter.write(data, size) + discard c.writter.write(data, size) method close* (c: Connection): Future[void] {.base, async.} = ## close connection diff --git a/libp2p/tcptransport.nim b/libp2p/tcptransport.nim index 93fbbf65a..a514a824d 100644 --- a/libp2p/tcptransport.nim +++ b/libp2p/tcptransport.nim @@ -11,21 +11,19 @@ import chronos import transport, wire, connection, multiaddress, connection, multicodec type TcpTransport* = ref object of Transport - fd*: AsyncFD server*: StreamServer proc connHandler(server: StreamServer, client: StreamTransport): Future[Connection] {.gcsafe, async.} = let t: TcpTransport = cast[TcpTransport](server.udata) - let conn: Connection = newConnection(newAsyncStreamReader(client), - newAsyncStreamWriter(client)) + let conn: Connection = newConnection(server, client) let connHolder: ConnHolder = ConnHolder(connection: conn, connFuture: t.handler(conn)) t.connections.add(connHolder) result = conn proc connCb(server: StreamServer, - client: StreamTransport) {.gcsafe, async.} = + client: StreamTransport) {.gcsafe, async.} = discard connHandler(server, client) method init*(t: TcpTransport) = @@ -39,21 +37,9 @@ method listen*(t: TcpTransport): Future[void] {.async.} = let listenFuture: Future[void] = newFuture[void]() result = listenFuture - proc initTransport(server: StreamServer, fd: AsyncFD): StreamTransport {.gcsafe.} = - t.server = server - t.fd = fd - listenFuture.complete() - ## listen on the transport - let server = createStreamServer(t.ma, - connCb, - {}, - t, - asyncInvalidSocket, - 100, - DefaultStreamBufferSize, - nil, - initTransport) + let server = createStreamServer(t.ma, connCb, {}, t) + t.server = server server.start() method dial*(t: TcpTransport, diff --git a/tests/testtransport b/tests/testtransport index f5ef3782c..6823eb8eb 100755 Binary files a/tests/testtransport and b/tests/testtransport differ diff --git a/tests/testtransport.nim b/tests/testtransport.nim index 0b4ad0bed..07f2a3024 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -7,15 +7,13 @@ suite "TCP transport suite": test "test listener": proc testListener(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335") - proc connHandler(conn: Connection): Future[void] {.gcsafe.} = - let msg = "Hello" - conn.write(msg.cstring, 6) + proc connHandler(conn: Connection): Future[void] {.async ,gcsafe.} = + result = conn.write(cstring("Hello!"), 6) let transport: TcpTransport = newTransport(TcpTransport, ma, connHandler) await transport.listen() let streamTransport: StreamTransport = await connect(ma) - let msg = await streamTransport.read() - echo "HERE!!!!" + let msg = await streamTransport.read(6) result = cast[string](msg) == "Hello!" check: