mirror of https://github.com/vacp2p/nim-libp2p.git
add gcsafe pragma
This commit is contained in:
parent
11060ae809
commit
e2b04fc30d
|
@ -25,36 +25,36 @@ proc newChronosStream*(server: StreamServer,
|
|||
result.writer = newAsyncStreamWriter(client)
|
||||
result.closed = false
|
||||
|
||||
method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async.} =
|
||||
method read*(s: ChronosStream, n = -1): Future[seq[byte]] {.async, gcsafe.} =
|
||||
result = await s.reader.read(n)
|
||||
|
||||
method readExactly*(s: ChronosStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[void] {.async.} =
|
||||
nbytes: int): Future[void] {.async, gcsafe.} =
|
||||
await s.reader.readExactly(pbytes, nbytes)
|
||||
|
||||
method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async.} =
|
||||
method readLine*(s: ChronosStream, limit = 0, sep = "\r\n"): Future[string] {.async, gcsafe.} =
|
||||
result = await s.reader.readLine(limit, sep)
|
||||
|
||||
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async.} =
|
||||
method readOnce*(s: ChronosStream, pbytes: pointer, nbytes: int): Future[int] {.async, gcsafe.} =
|
||||
result = await s.reader.readOnce(pbytes, nbytes)
|
||||
|
||||
method readUntil*(s: ChronosStream,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]): Future[int] {.async.} =
|
||||
sep: seq[byte]): Future[int] {.async, gcsafe.} =
|
||||
result = await s.reader.readUntil(pbytes, nbytes, sep)
|
||||
|
||||
method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async.} =
|
||||
method write*(s: ChronosStream, pbytes: pointer, nbytes: int) {.async, gcsafe.} =
|
||||
await s.writer.write(pbytes, nbytes)
|
||||
|
||||
method write*(s: ChronosStream, msg: string, msglen = -1) {.async.} =
|
||||
method write*(s: ChronosStream, msg: string, msglen = -1) {.async, gcsafe.} =
|
||||
await s.writer.write(msg, msglen)
|
||||
|
||||
method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async.} =
|
||||
method write*(s: ChronosStream, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
|
||||
await s.writer.write(msg, msglen)
|
||||
|
||||
method close*(s: ChronosStream) {.async.} =
|
||||
method close*(s: ChronosStream) {.async, gcsafe.} =
|
||||
await s.reader.closeWait()
|
||||
|
||||
await s.writer.finish()
|
||||
|
|
|
@ -22,44 +22,44 @@ proc newConnection*(stream: LPStream): Connection =
|
|||
new result
|
||||
result.stream = stream
|
||||
|
||||
method read*(s: Connection, n = -1): Future[seq[byte]] {.async.} =
|
||||
method read*(s: Connection, n = -1): Future[seq[byte]] {.async, gcsafe.} =
|
||||
result = await s.stream.read(n)
|
||||
|
||||
method readExactly*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[void] {.async.} =
|
||||
nbytes: int): Future[void] {.async, gcsafe.} =
|
||||
await s.stream.readExactly(pbytes, nbytes)
|
||||
|
||||
method readLine*(s: Connection,
|
||||
limit = 0,
|
||||
sep = "\r\n"): Future[string] {.async.} =
|
||||
sep = "\r\n"): Future[string] {.async, gcsafe.} =
|
||||
result = await s.stream.readLine(limit, sep)
|
||||
|
||||
method readOnce*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int): Future[int] {.async.} =
|
||||
nbytes: int): Future[int] {.async, gcsafe.} =
|
||||
result = await s.stream.readOnce(pbytes, nbytes)
|
||||
|
||||
method readUntil*(s: Connection,
|
||||
pbytes: pointer,
|
||||
nbytes: int,
|
||||
sep: seq[byte]): Future[int] {.async.} =
|
||||
sep: seq[byte]): Future[int] {.async, gcsafe.} =
|
||||
result = await s.stream.readUntil(pbytes, nbytes, sep)
|
||||
|
||||
method write*(s: Connection, pbytes: pointer, nbytes: int) {.async.} =
|
||||
method write*(s: Connection, pbytes: pointer, nbytes: int) {.async, gcsafe.} =
|
||||
await s.stream.write(pbytes, nbytes)
|
||||
|
||||
method write*(s: Connection, msg: string, msglen = -1) {.async.} =
|
||||
method write*(s: Connection, msg: string, msglen = -1) {.async, gcsafe.} =
|
||||
await s.stream.write(msg, msglen)
|
||||
|
||||
method write*(s: Connection, msg: seq[byte], msglen = -1) {.async.} =
|
||||
method write*(s: Connection, msg: seq[byte], msglen = -1) {.async, gcsafe.} =
|
||||
await s.stream.write(msg, msglen)
|
||||
|
||||
method close*(s: Connection) {.async.} =
|
||||
method close*(s: Connection) {.async, gcsafe.} =
|
||||
await s.stream.close()
|
||||
s.closed = true
|
||||
|
||||
proc readLp*(s: Connection): Future[seq[byte]] {.async.} =
|
||||
proc readLp*(s: Connection): Future[seq[byte]] {.async, gcsafe.} =
|
||||
## read lenght prefixed msg
|
||||
var
|
||||
size: uint
|
||||
|
@ -81,18 +81,18 @@ proc readLp*(s: Connection): Future[seq[byte]] {.async.} =
|
|||
|
||||
result = buffer
|
||||
|
||||
proc writeLp*(s: Connection, msg: string | seq[byte]) {.async.} =
|
||||
proc writeLp*(s: Connection, msg: string | seq[byte]) {.async, gcsafe.} =
|
||||
## write lenght prefixed
|
||||
var buf = initVBuffer()
|
||||
buf.writeSeq(msg)
|
||||
buf.finish()
|
||||
result = s.write(buf.buffer)
|
||||
|
||||
method getPeerInfo*(c: Connection): Future[PeerInfo] {.base, async.} =
|
||||
method getPeerInfo*(c: Connection): Future[PeerInfo] {.base, async, gcsafe.} =
|
||||
## get up to date peer info
|
||||
## TODO: implement PeerInfo refresh over identify
|
||||
discard
|
||||
|
||||
method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async.} =
|
||||
method getObservedAddrs*(c: Connection): Future[MultiAddress] {.base, async, gcsafe.} =
|
||||
## get resolved multiaddresses for the connection
|
||||
discard
|
||||
|
|
|
@ -17,6 +17,8 @@ const IdentifyPushCodec* = "/ipfs/id/push/1.0.0"
|
|||
const ProtoVersion* = "ipfs/0.1.0"
|
||||
const AgentVersion* = "nim-libp2p/0.0.1"
|
||||
|
||||
#TODO: implment push identify, leaving out for now as it is not essential
|
||||
|
||||
type
|
||||
# TODO: we're doing protobuf manualy, this is only temporary
|
||||
ProtoField[T] = object
|
||||
|
|
|
@ -13,39 +13,39 @@ type LPStream* = ref object of RootObj
|
|||
closed*: bool
|
||||
|
||||
method read*(s: LPStream, n = -1): Future[seq[byte]]
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method readExactly*(s: LPStream, pbytes: pointer, nbytes: int): Future[void]
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string]
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method readOnce*(s: LPStream, pbytes: pointer, nbytes: int): Future[int]
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method readUntil*(s: LPStream,
|
||||
pbytes: pointer, nbytes: int,
|
||||
sep: seq[byte]): Future[int]
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method write*(s: LPStream, pbytes: pointer, nbytes: int)
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method write*(s: LPStream, msg: string, msglen = -1)
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method write*(s: LPStream, msg: seq[byte], msglen = -1)
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
||||
method close*(s: LPStream)
|
||||
{.base, async.} =
|
||||
{.base, async, gcsafe.} =
|
||||
discard
|
||||
|
|
|
@ -18,7 +18,7 @@ type TcpTransport* = ref object of Transport
|
|||
proc connHandler*(t: Transport,
|
||||
server: StreamServer,
|
||||
client: StreamTransport): Future[Connection]
|
||||
{.gcsafe, async.} =
|
||||
{.async, gcsafe.} =
|
||||
let conn: Connection = newConnection(newChronosStream(server, client))
|
||||
let handlerFut = if t.handler == nil: nil else: t.handler(conn)
|
||||
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
||||
|
@ -27,14 +27,14 @@ proc connHandler*(t: Transport,
|
|||
result = conn
|
||||
|
||||
proc connCb(server: StreamServer,
|
||||
client: StreamTransport) {.gcsafe, async.} =
|
||||
client: StreamTransport) {.async, gcsafe.} =
|
||||
let t: Transport = cast[Transport](server.udata)
|
||||
discard t.connHandler(server, client)
|
||||
|
||||
method init*(t: TcpTransport) =
|
||||
t.multicodec = multiCodec("tcp")
|
||||
|
||||
method close*(t: TcpTransport): Future[void] {.async.} =
|
||||
method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
|
||||
## start the transport
|
||||
await procCall Transport(t).close() # call base
|
||||
|
||||
|
@ -43,7 +43,7 @@ method close*(t: TcpTransport): Future[void] {.async.} =
|
|||
|
||||
method listen*(t: TcpTransport,
|
||||
ma: MultiAddress,
|
||||
handler: ConnHandler): Future[void] {.async.} =
|
||||
handler: ConnHandler): Future[void] {.async, gcsafe.} =
|
||||
await procCall Transport(t).listen(ma, handler) # call base
|
||||
|
||||
## listen on the transport
|
||||
|
@ -56,9 +56,9 @@ method listen*(t: TcpTransport,
|
|||
listenFuture.complete()
|
||||
|
||||
method dial*(t: TcpTransport,
|
||||
address: MultiAddress): Future[Connection] {.async.} =
|
||||
address: MultiAddress): Future[Connection] {.async, gcsafe.} =
|
||||
## dial a peer
|
||||
let client: StreamTransport = await connect(address)
|
||||
result = await t.connHandler(t.server, client)
|
||||
|
||||
method handles*(t: Transport, address: MultiAddress): bool = true
|
||||
method handles*(t: Transport, address: MultiAddress): bool {.gcsafe.} = true
|
||||
|
|
|
@ -23,15 +23,15 @@ type
|
|||
handler*: ConnHandler
|
||||
multicodec*: MultiCodec
|
||||
|
||||
method init*(t: Transport) {.base.} =
|
||||
method init*(t: Transport) {.base, gcsafe.} =
|
||||
## perform protocol initialization
|
||||
discard
|
||||
|
||||
proc newTransport*(t: typedesc[Transport]): t =
|
||||
proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} =
|
||||
new result
|
||||
result.init()
|
||||
|
||||
method close*(t: Transport) {.base, async.} =
|
||||
method close*(t: Transport) {.base, async, gcsafe.} =
|
||||
## stop and cleanup the transport
|
||||
## including all outstanding connections
|
||||
for c in t.connections:
|
||||
|
@ -39,17 +39,18 @@ method close*(t: Transport) {.base, async.} =
|
|||
|
||||
method listen*(t: Transport,
|
||||
ma: MultiAddress,
|
||||
handler: ConnHandler) {.base, async.} =
|
||||
handler: ConnHandler) {.base, async, gcsafe.} =
|
||||
## listen for incoming connections
|
||||
t.ma = ma
|
||||
t.handler = handler
|
||||
|
||||
method dial*(t: Transport,
|
||||
address: MultiAddress): Future[Connection] {.base, async.} =
|
||||
address: MultiAddress):
|
||||
Future[Connection] {.base, async, gcsafe.} =
|
||||
## dial a peer
|
||||
discard
|
||||
|
||||
method handles*(t: Transport, address: MultiAddress): bool {.base.} =
|
||||
method handles*(t: Transport, address: MultiAddress): bool {.base, gcsafe.} =
|
||||
## check if transport supportes the multiaddress
|
||||
# TODO: this should implement generic logic that would use the multicodec
|
||||
# declared in the multicodec field and set by each individual transport
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
import unittest
|
||||
import chronos, strutils, sequtils
|
||||
import ../libp2p/identify, ../libp2p/multiaddress,
|
||||
../libp2p/peerinfo, ../libp2p/peer, ../libp2p/connection,
|
||||
../libp2p/identify, ../libp2p/multistream, ../libp2p/transport,
|
||||
../libp2p/tcptransport, ../libp2p/protocol, ../libp2p/crypto/crypto
|
||||
../libp2p/peerinfo, ../libp2p/peer,
|
||||
../libp2p/connection, ../libp2p/identify,
|
||||
../libp2p/multistream, ../libp2p/transport,
|
||||
../libp2p/tcptransport, ../libp2p/protocol,
|
||||
../libp2p/crypto/crypto
|
||||
|
||||
suite "Identify":
|
||||
test "handle identify message6":
|
||||
|
|
|
@ -5,7 +5,7 @@ import ../libp2p/connection, ../libp2p/transport, ../libp2p/tcptransport,
|
|||
|
||||
suite "TCP transport suite":
|
||||
test "test listener: handle write":
|
||||
proc testListener(): Future[bool] {.async.} =
|
||||
proc testListener(): Future[bool] {.async, gcsafe.} =
|
||||
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53335")
|
||||
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
|
||||
result = conn.write(cstring("Hello!"), 6)
|
||||
|
@ -41,7 +41,7 @@ suite "TCP transport suite":
|
|||
test "test dialer: handle write":
|
||||
proc testDialer(address: TransportAddress): Future[bool] {.async.} =
|
||||
proc serveClient(server: StreamServer,
|
||||
transp: StreamTransport) {.async.} =
|
||||
transp: StreamTransport) {.async, gcsafe.} =
|
||||
var wstream = newAsyncStreamWriter(transp)
|
||||
await wstream.write("Hello!")
|
||||
await wstream.finish()
|
||||
|
@ -65,9 +65,9 @@ suite "TCP transport suite":
|
|||
check waitFor(testDialer(initTAddress("127.0.0.1:53337"))) == true
|
||||
|
||||
test "test dialer: handle write":
|
||||
proc testDialer(address: TransportAddress): Future[bool] {.async.} =
|
||||
proc testDialer(address: TransportAddress): Future[bool] {.async, gcsafe.} =
|
||||
proc serveClient(server: StreamServer,
|
||||
transp: StreamTransport) {.async.} =
|
||||
transp: StreamTransport) {.async, gcsafe.} =
|
||||
var rstream = newAsyncStreamReader(transp)
|
||||
let msg = await rstream.read(6)
|
||||
check cast[string](msg) == "Hello!"
|
||||
|
|
Loading…
Reference in New Issue