Fix GC issues with sending, according to latest Nim changes.

Changed datagram.nim API to allow sending parts of strings and seqs.
This commit is contained in:
cheatfate 2018-06-15 13:54:26 +03:00
parent 82f7b9f77f
commit 0ca2cd8e5c
3 changed files with 49 additions and 82 deletions

View File

@ -292,6 +292,16 @@ proc raiseTransportOsError*(err: OSErrorCode) =
var msg = "(" & $int(err) & ") " & osErrorMsg(err)
raise newException(TransportOsError, msg)
type
SeqHeader = object
length, reserved: int
proc isLiteral*(s: string): bool {.inline.} =
(cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0
proc isLiteral*[T](s: seq[T]): bool {.inline.} =
(cast[ptr SeqHeader](s).reserved and (1 shl (sizeof(int) * 8 - 2))) != 0
when defined(windows):
import winlean

View File

@ -564,12 +564,14 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
transp.resumeWrite()
return retFuture
proc send*(transp: DatagramTransport, msg: var string): Future[void] =
proc send*(transp: DatagramTransport, msg: string, msglen = -1): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = FutureGCString[void]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
let length = if msglen <= 0: len(msg) else: msglen
let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0],
buflen: len(msg),
writer: cast[Future[void]](retFuture))
@ -578,22 +580,25 @@ proc send*(transp: DatagramTransport, msg: var string): Future[void] =
transp.resumeWrite()
return retFuture
proc send*[T](transp: DatagramTransport, msg: var seq[T]): Future[void] =
proc send*[T](transp: DatagramTransport, msg: seq[T],
msglen = -1): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = FutureGCSeq[void, T]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
let vector = GramVector(kind: WithoutAddress, buf: unsafeAddr msg[0],
buflen: (len(msg) * sizeof(T)),
buflen: length,
writer: cast[Future[void]](retFuture))
transp.queue.addLast(vector)
if WritePaused in transp.state:
transp.resumeWrite()
return retFuture
proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
remote: TransportAddress): Future[void] =
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
pbytes: pointer, nbytes: int): Future[void] =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address ``remote``.
var retFuture = newFuture[void]()
@ -605,15 +610,17 @@ proc sendTo*(transp: DatagramTransport, pbytes: pointer, nbytes: int,
transp.resumeWrite()
return retFuture
proc sendTo*(transp: DatagramTransport, msg: var string,
remote: TransportAddress): Future[void] =
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
msg: string, msglen = -1): Future[void] =
## Send string ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
var retFuture = FutureGCString[void]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
let length = if msglen <= 0: len(msg) else: msglen
let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0],
buflen: len(msg),
buflen: length,
writer: cast[Future[void]](retFuture),
address: remote)
transp.queue.addLast(vector)
@ -621,15 +628,17 @@ proc sendTo*(transp: DatagramTransport, msg: var string,
transp.resumeWrite()
return retFuture
proc sendTo*[T](transp: DatagramTransport, msg: var seq[T],
remote: TransportAddress): Future[void] =
proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
msg: seq[T], msglen = -1): Future[void] =
## Send sequence ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
var retFuture = FutureGCSeq[void, T]()
transp.checkClosed(retFuture)
shallowCopy(retFuture.gcholder, msg)
if not isLiteral(msg):
shallowCopy(retFuture.gcholder, msg)
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
let vector = GramVector(kind: WithAddress, buf: unsafeAddr msg[0],
buflen: (len(msg) * sizeof(T)),
buflen: length,
writer: cast[Future[void]](retFuture),
address: remote)
transp.queue.addLast(vector)
@ -656,55 +665,3 @@ proc getMessage*(transp: DatagramTransport): seq[byte] =
proc getUserData*[T](transp: DatagramTransport): T {.inline.} =
## Obtain user data stored in ``transp`` object.
result = cast[T](transp.udata)
# proc createDatagramServer*(host: TransportAddress,
# cbproc: DatagramCallback,
# flags: set[ServerFlags] = {},
# sock: AsyncFD = asyncInvalidSocket,
# bufferSize: int = DefaultDatagramBufferSize,
# udata: pointer = nil): DatagramServer =
# var transp: DatagramTransport
# var fflags = flags + {NoAutoRead}
# if host.address.family == IpAddressFamily.IPv4:
# transp = newDatagramTransport(cbproc, AnyAddress, host, sock,
# fflags, udata, bufferSize)
# else:
# transp = newDatagramTransport6(cbproc, AnyAddress6, host, sock,
# fflags, udata, bufferSize)
# result = DatagramServer()
# result.transport = transp
# result.status = ServerStatus.Starting
# GC_ref(result)
# proc start*(server: DatagramServer) =
# ## Starts ``server``.
# if server.status == ServerStatus.Starting:
# server.transport.resumeRead()
# server.status = ServerStatus.Running
# proc stop*(server: DatagramServer) =
# ## Stops ``server``.
# if server.status == ServerStatus.Running:
# when defined(windows):
# if {WritePending, ReadPending} * server.transport.state != {}:
# ## CancelIO will stop both reading and writing.
# discard cancelIo(Handle(server.transport.fd))
# else:
# if WritePaused notin server.transport.state:
# server.transport.fd.removeWriter()
# if ReadPaused notin server.transport.state:
# server.transport.fd.removeReader()
# server.status = ServerStatus.Stopped
# proc join*(server: DatagramServer) {.async.} =
# ## Waits until ``server`` is not stopped.
# if not server.transport.future.finished:
# await server.transport.future
# proc close*(server: DatagramServer) =
# ## Release ``server`` resources.
# if server.status == ServerStatus.Stopped:
# server.status = ServerStatus.Closed
# server.transport.close()
# GC_unref(server)

View File

@ -27,10 +27,10 @@ proc client1(transp: DatagramTransport,
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(addr ans[0], len(ans), raddr)
await transp.sendTo(raddr, addr ans[0], len(ans))
else:
var err = "ERROR"
await transp.sendTo(addr err[0], len(err), raddr)
await transp.sendTo(raddr, addr err[0], len(err))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
@ -53,7 +53,7 @@ proc client2(transp: DatagramTransport,
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(addr req[0], len(req), ta)
await transp.sendTo(ta, addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
@ -135,7 +135,7 @@ proc client5(transp: DatagramTransport,
else:
var ta = initTAddress("127.0.0.1:33337")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(addr req[0], len(req), ta)
await transp.sendTo(ta, addr req[0], len(req))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
@ -159,10 +159,10 @@ proc client6(transp: DatagramTransport,
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(ans, raddr)
await transp.sendTo(raddr, ans)
else:
var err = "ERROR"
await transp.sendTo(err, raddr)
await transp.sendTo(raddr, err)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
@ -186,7 +186,7 @@ proc client7(transp: DatagramTransport,
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(req, ta)
await transp.sendTo(ta, req)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
@ -239,12 +239,12 @@ proc client9(transp: DatagramTransport,
var ans = "ANSWER" & $num
var ansseq = newSeq[byte](len(ans))
copyMem(addr ansseq[0], addr ans[0], len(ans))
await transp.sendTo(ansseq, raddr)
await transp.sendTo(raddr, ansseq)
else:
var err = "ERROR"
var errseq = newSeq[byte](len(err))
copyMem(addr errseq[0], addr err[0], len(err))
await transp.sendTo(errseq, raddr)
await transp.sendTo(raddr, errseq)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
@ -270,7 +270,7 @@ proc client10(transp: DatagramTransport,
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(reqseq, ta)
await transp.sendTo(ta, reqseq)
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
@ -317,7 +317,7 @@ proc testPointerSendTo(): Future[int] {.async.} =
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client2, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(addr data[0], len(data), ta)
await dgram2.sendTo(ta, addr data[0], len(data))
await dgram2.join()
dgram1.close()
dgram2.close()
@ -343,7 +343,7 @@ proc testStringSendTo(): Future[int] {.async.} =
var dgram1 = newDatagramTransport(client6, udata = addr counter, local = ta)
var dgram2 = newDatagramTransport(client7, udata = addr counter)
var data = "REQUEST0"
await dgram2.sendTo(data, ta)
await dgram2.sendTo(ta, data)
await dgram2.join()
dgram1.close()
dgram2.close()
@ -371,7 +371,7 @@ proc testSeqSendTo(): Future[int] {.async.} =
var data = "REQUEST0"
var dataseq = newSeq[byte](len(data))
copyMem(addr dataseq[0], addr data[0], len(data))
await dgram2.sendTo(dataseq, ta)
await dgram2.sendTo(ta, dataseq)
await dgram2.join()
dgram1.close()
dgram2.close()
@ -420,7 +420,7 @@ proc test3(bounded: bool): Future[int] {.async.} =
await grams[i].send(addr data[0], len(data))
else:
grams[i] = newDatagramTransport(client5, udata = addr counters[i])
await grams[i].sendTo(addr data[0], len(data), ta)
await grams[i].sendTo(ta, addr data[0], len(data))
clients[i] = grams[i].join()
await waitAll(clients)
@ -442,7 +442,7 @@ proc testConnReset(): Future[bool] {.async.} =
dgram1.close()
var dgram2 = newDatagramTransport(client20, udata = addr counter)
var data = "MESSAGE"
discard dgram2.sendTo(data, ta)
discard dgram2.sendTo(ta, data)
await sleepAsync(1000)
result = (counter == 0)