diff --git a/libp2p/chronosstream.nim b/libp2p/chronosstream.nim index d251a7377..8cf992106 100644 --- a/libp2p/chronosstream.nim +++ b/libp2p/chronosstream.nim @@ -25,36 +25,36 @@ proc newChronosStream*(server: StreamServer, result.writer = newAsyncStreamWriter(client) result.closed = false -method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} = +method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async, gcsafe.} = result = await s.reader.read(n) method readExactly*(s: ChronosStream, pbytes: pointer, - nbytes: int): Future[void] {.async.} = + nbytes: int): Future[void] {.async, gcsafe.} = await s.reader.readExactly(pbytes, nbytes) -method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} = +method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async, gcsafe.} = result = await s.reader.readLine(limit, sep) -method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = +method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async, gcsafe.} = result = await s.reader.readOnce(pbytes, nbytes) method readUntil*(s: ChronosStream, pbytes: pointer, nbytes: int, - sep: seq[byte]): Future[int] {.async.} = + sep: seq[byte]): Future[int] {.async, gcsafe.} = result = await s.reader.readUntil(pbytes, nbytes, sep) -method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} = +method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async, gcsafe.} = await s.writer.write(pbytes, nbytes) -method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} = +method write*(s: ChronosStream, msg: string, msglen = -1) {.async, gcsafe.} = await s.writer.write(msg, msglen) -method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} = +method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} = await s.writer.write(msg, msglen) -method close*(s: ChronosStream) {.async.} = +method close*(s: ChronosStream) {.async, gcsafe.} = await s.reader.closeWait() await s.writer.finish() diff --git a/libp2p/connection.nim b/libp2p/connection.nim index 7365f9fa3..cbe183f59 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -22,44 +22,44 @@ proc newConnection*(stream: LPStream): Connection = new result result.stream = stream -method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} = +method read*(s: Connection, n = -1): Future[seq[byte]] {.async, gcsafe.} = result = await s.stream.read(n) method readExactly*(s: Connection, pbytes: pointer, - nbytes: int): Future[void] {.async.} = + nbytes: int): Future[void] {.async, gcsafe.} = await s.stream.readExactly(pbytes, nbytes) method readLine*(s: Connection, limit = 0, - sep = "\r\n"): Future[string] {.async.} = + sep = "\r\n"): Future[string] {.async, gcsafe.} = result = await s.stream.readLine(limit, sep) method readOnce*(s: Connection, pbytes: pointer, - nbytes: int): Future[int] {.async.} = + nbytes: int): Future[int] {.async, gcsafe.} = result = await s.stream.readOnce(pbytes, nbytes) method readUntil*(s: Connection, pbytes: pointer, nbytes: int, - sep: seq[byte]): Future[int] {.async.} = + sep: seq[byte]): Future[int] {.async, gcsafe.} = result = await s.stream.readUntil(pbytes, nbytes, sep) -method write*(s: Connection, pbytes: pointer, nbytes: int) {.async.} = +method write*(s: Connection, pbytes: pointer, nbytes: int) {.async, gcsafe.} = await s.stream.write(pbytes, nbytes) -method write*(s: Connection, msg: string, msglen = -1) {.async.} = +method write*(s: Connection, msg: string, msglen = -1) {.async, gcsafe.} = await s.stream.write(msg, msglen) -method write*(s: Connection, msg: seq[byte], msglen = -1) {.async.} = +method write*(s: Connection, msg: seq[byte], msglen = -1) {.async, gcsafe.} = await s.stream.write(msg, msglen) -method close*(s: Connection) {.async.} = +method close*(s: Connection) {.async, gcsafe.} = await s.stream.close() s.closed = true -proc readLp*(s: Connection): Future[seq[byte]] {.async.} = +proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} = ## read lenght prefixed msg var size: uint @@ -81,18 +81,18 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async.} = result = buffer -proc writeLp*(s: Connection, msg: string | seq[byte]) {.async.} = +proc writeLp*(s: Connection, msg: string | seq[byte]) {.async, gcsafe.} = ## write lenght prefixed var buf = initVBuffer() buf.writeSeq(msg) buf.finish() result = s.write(buf.buffer) -method getPeerInfo*(c: Connection): Future[PeerInfo] {.base, async.} = +method getPeerInfo*(c: Connection): Future[PeerInfo] {.base, async, gcsafe.} = ## get up to date peer info ## TODO: implement PeerInfo refresh over identify discard -method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async.} = +method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} = ## get resolved multiaddresses for the connection discard diff --git a/libp2p/identify.nim b/libp2p/identify.nim index 2de518566..db8ef7416 100644 --- a/libp2p/identify.nim +++ b/libp2p/identify.nim @@ -17,6 +17,8 @@ const IdentifyPushCodec* = "/ipfs/id/push/1.0.0" const ProtoVersion* = "ipfs/0.1.0" const AgentVersion* = "nim-libp2p/0.0.1" +#TODO: implment push identify, leaving out for now as it is not essential + type # TODO: we're doing protobuf manualy, this is only temporary ProtoField[T] = object diff --git a/libp2p/stream.nim b/libp2p/stream.nim index e5f65443f..64e84a63c 100644 --- a/libp2p/stream.nim +++ b/libp2p/stream.nim @@ -13,39 +13,39 @@ type LPStream* = ref object of RootObj closed*: bool method read*(s: LPStream, n = -1): Future[seq[byte]] - {.base, async.} = + {.base, async, gcsafe.} = discard method readExactly*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] - {.base, async.} = + {.base, async, gcsafe.} = discard method readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] - {.base, async.} = + {.base, async, gcsafe.} = discard method readOnce*(s: LPStream, pbytes: pointer, nbytes: int): Future[int] - {.base, async.} = + {.base, async, gcsafe.} = discard method readUntil*(s: LPStream, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] - {.base, async.} = + {.base, async, gcsafe.} = discard method write*(s: LPStream, pbytes: pointer, nbytes: int) - {.base, async.} = + {.base, async, gcsafe.} = discard method write*(s: LPStream, msg: string, msglen = -1) - {.base, async.} = + {.base, async, gcsafe.} = discard method write*(s: LPStream, msg: seq[byte], msglen = -1) - {.base, async.} = + {.base, async, gcsafe.} = discard method close*(s: LPStream) - {.base, async.} = + {.base, async, gcsafe.} = discard diff --git a/libp2p/tcptransport.nim b/libp2p/tcptransport.nim index 46a6c0418..63d687fdb 100644 --- a/libp2p/tcptransport.nim +++ b/libp2p/tcptransport.nim @@ -18,7 +18,7 @@ type TcpTransport* = ref object of Transport proc connHandler*(t: Transport, server: StreamServer, client: StreamTransport): Future[Connection] - {.gcsafe, async.} = + {.async, gcsafe.} = let conn: Connection = newConnection(newChronosStream(server, client)) let handlerFut = if t.handler == nil: nil else: t.handler(conn) let connHolder: ConnHolder = ConnHolder(connection: conn, @@ -27,14 +27,14 @@ proc connHandler*(t: Transport, result = conn proc connCb(server: StreamServer, - client: StreamTransport) {.gcsafe, async.} = + client: StreamTransport) {.async, gcsafe.} = let t: Transport = cast[Transport](server.udata) discard t.connHandler(server, client) method init*(t: TcpTransport) = t.multicodec = multiCodec("tcp") -method close*(t: TcpTransport): Future[void] {.async.} = +method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = ## start the transport await procCall Transport(t).close() # call base @@ -43,7 +43,7 @@ method close*(t: TcpTransport): Future[void] {.async.} = method listen*(t: TcpTransport, ma: MultiAddress, - handler: ConnHandler): Future[void] {.async.} = + handler: ConnHandler): Future[void] {.async, gcsafe.} = await procCall Transport(t).listen(ma, handler) # call base ## listen on the transport @@ -56,9 +56,9 @@ method listen*(t: TcpTransport, listenFuture.complete() method dial*(t: TcpTransport, - address: MultiAddress): Future[Connection] {.async.} = + address: MultiAddress): Future[Connection] {.async, gcsafe.} = ## dial a peer let client: StreamTransport = await connect(address) result = await t.connHandler(t.server, client) -method handles*(t: Transport, address: MultiAddress): bool = true +method handles*(t: Transport, address: MultiAddress): bool {.gcsafe.} = true diff --git a/libp2p/transport.nim b/libp2p/transport.nim index 1b595e31c..abc523429 100644 --- a/libp2p/transport.nim +++ b/libp2p/transport.nim @@ -23,15 +23,15 @@ type handler*: ConnHandler multicodec*: MultiCodec -method init*(t: Transport) {.base.} = +method init*(t: Transport) {.base, gcsafe.} = ## perform protocol initialization discard -proc newTransport*(t: typedesc[Transport]): t = +proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} = new result result.init() -method close*(t: Transport) {.base, async.} = +method close*(t: Transport) {.base, async, gcsafe.} = ## stop and cleanup the transport ## including all outstanding connections for c in t.connections: @@ -39,17 +39,18 @@ method close*(t: Transport) {.base, async.} = method listen*(t: Transport, ma: MultiAddress, - handler: ConnHandler) {.base, async.} = + handler: ConnHandler) {.base, async, gcsafe.} = ## listen for incoming connections t.ma = ma t.handler = handler method dial*(t: Transport, - address: MultiAddress): Future[Connection] {.base, async.} = + address: MultiAddress): + Future[Connection] {.base, async, gcsafe.} = ## dial a peer discard -method handles*(t: Transport, address: MultiAddress): bool {.base.} = +method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} = ## check if transport supportes the multiaddress # TODO: this should implement generic logic that would use the multicodec # declared in the multicodec field and set by each individual transport diff --git a/tests/testidentify.nim b/tests/testidentify.nim index eb3345be0..78208e6f7 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -1,9 +1,11 @@ import unittest import chronos, strutils, sequtils import ../libp2p/identify, ../libp2p/multiaddress, - ../libp2p/peerinfo, ../libp2p/peer, ../libp2p/connection, - ../libp2p/identify, ../libp2p/multistream, ../libp2p/transport, - ../libp2p/tcptransport, ../libp2p/protocol, ../libp2p/crypto/crypto + ../libp2p/peerinfo, ../libp2p/peer, + ../libp2p/connection, ../libp2p/identify, + ../libp2p/multistream, ../libp2p/transport, + ../libp2p/tcptransport, ../libp2p/protocol, + ../libp2p/crypto/crypto suite "Identify": test "handle identify message6": diff --git a/tests/testtransport.nim b/tests/testtransport.nim index c54133c77..1084975ba 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -5,7 +5,7 @@ import ../libp2p/connection, ../libp2p/transport, ../libp2p/tcptransport, suite "TCP transport suite": test "test listener: handle write": - proc testListener(): Future[bool] {.async.} = + proc testListener(): Future[bool] {.async, gcsafe.} = let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335") proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} = result = conn.write(cstring("Hello!"), 6) @@ -41,7 +41,7 @@ suite "TCP transport suite": test "test dialer: handle write": proc testDialer(address: TransportAddress): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = + transp: StreamTransport) {.async, gcsafe.} = var wstream = newAsyncStreamWriter(transp) await wstream.write("Hello!") await wstream.finish() @@ -65,9 +65,9 @@ suite "TCP transport suite": check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true test "test dialer: handle write": - proc testDialer(address: TransportAddress): Future[bool] {.async.} = + proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = + transp: StreamTransport) {.async, gcsafe.} = var rstream = newAsyncStreamReader(transp) let msg = await rstream.read(6) check cast[string](msg) == "Hello!"