mirror of
https://github.com/status-im/nim-chronos.git
synced 2025-02-14 12:16:24 +00:00
Fix nasty GC misuse bug
This commit is contained in:
parent
302f573683
commit
5c6c723cb9
@ -87,8 +87,6 @@ when defined(windows):
|
|||||||
|
|
||||||
proc writeDatagramLoop(udata: pointer) =
|
proc writeDatagramLoop(udata: pointer) =
|
||||||
var bytesCount: int32
|
var bytesCount: int32
|
||||||
if isNil(udata):
|
|
||||||
return
|
|
||||||
var ovl = cast[PCustomOverlapped](udata)
|
var ovl = cast[PCustomOverlapped](udata)
|
||||||
var transp = cast[WindowsDatagramTransport](ovl.data.udata)
|
var transp = cast[WindowsDatagramTransport](ovl.data.udata)
|
||||||
while len(transp.queue) > 0:
|
while len(transp.queue) > 0:
|
||||||
@ -97,8 +95,7 @@ when defined(windows):
|
|||||||
transp.state.excl(WritePending)
|
transp.state.excl(WritePending)
|
||||||
let err = transp.wovl.data.errCode
|
let err = transp.wovl.data.errCode
|
||||||
if err == OSErrorCode(-1):
|
if err == OSErrorCode(-1):
|
||||||
var vector = transp.queue.popFirst()
|
transp.finishWriter()
|
||||||
vector.writer.complete()
|
|
||||||
else:
|
else:
|
||||||
transp.setWriteError(err)
|
transp.setWriteError(err)
|
||||||
transp.finishWriter()
|
transp.finishWriter()
|
||||||
@ -126,7 +123,7 @@ when defined(windows):
|
|||||||
else:
|
else:
|
||||||
transp.state.excl(WritePending)
|
transp.state.excl(WritePending)
|
||||||
transp.setWriteError(err)
|
transp.setWriteError(err)
|
||||||
transp.finishWriter()
|
vector.writer.complete()
|
||||||
else:
|
else:
|
||||||
transp.queue.addFirst(vector)
|
transp.queue.addFirst(vector)
|
||||||
break
|
break
|
||||||
@ -138,8 +135,6 @@ when defined(windows):
|
|||||||
var
|
var
|
||||||
bytesCount: int32
|
bytesCount: int32
|
||||||
raddr: TransportAddress
|
raddr: TransportAddress
|
||||||
if isNil(udata):
|
|
||||||
return
|
|
||||||
var ovl = cast[PCustomOverlapped](udata)
|
var ovl = cast[PCustomOverlapped](udata)
|
||||||
var transp = cast[WindowsDatagramTransport](ovl.data.udata)
|
var transp = cast[WindowsDatagramTransport](ovl.data.udata)
|
||||||
while true:
|
while true:
|
||||||
@ -156,7 +151,7 @@ when defined(windows):
|
|||||||
transp.state.incl(ReadPaused)
|
transp.state.incl(ReadPaused)
|
||||||
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
|
fromSockAddr(transp.raddr, transp.ralen, raddr.address, raddr.port)
|
||||||
discard transp.function(transp, addr transp.buffer[0], bytesCount,
|
discard transp.function(transp, addr transp.buffer[0], bytesCount,
|
||||||
raddr, transp.udata)
|
raddr, transp.udata)
|
||||||
else:
|
else:
|
||||||
transp.setReadError(err)
|
transp.setReadError(err)
|
||||||
transp.state.incl(ReadPaused)
|
transp.state.incl(ReadPaused)
|
||||||
@ -280,6 +275,7 @@ when defined(windows):
|
|||||||
udata: cast[pointer](wresult))
|
udata: cast[pointer](wresult))
|
||||||
wresult.wsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]),
|
wresult.wsabuf = TWSABuf(buf: cast[cstring](addr wresult.buffer[0]),
|
||||||
len: int32(len(wresult.buffer)))
|
len: int32(len(wresult.buffer)))
|
||||||
|
GC_ref(wresult)
|
||||||
result = cast[DatagramTransport](wresult)
|
result = cast[DatagramTransport](wresult)
|
||||||
result.resumeRead()
|
result.resumeRead()
|
||||||
|
|
||||||
@ -291,6 +287,8 @@ when defined(windows):
|
|||||||
transp.state.incl(WriteClosed)
|
transp.state.incl(WriteClosed)
|
||||||
transp.state.incl(ReadClosed)
|
transp.state.incl(ReadClosed)
|
||||||
transp.future.complete()
|
transp.future.complete()
|
||||||
|
var wresult = cast[WindowsDatagramTransport](transp)
|
||||||
|
GC_unref(wresult)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|
||||||
@ -430,6 +428,7 @@ else:
|
|||||||
result.udata = udata
|
result.udata = udata
|
||||||
result.state = {WritePaused}
|
result.state = {WritePaused}
|
||||||
result.future = newFuture[void]("datagram.transport")
|
result.future = newFuture[void]("datagram.transport")
|
||||||
|
GC_ref(result)
|
||||||
result.resumeRead()
|
result.resumeRead()
|
||||||
|
|
||||||
proc close*(transp: DatagramTransport) =
|
proc close*(transp: DatagramTransport) =
|
||||||
@ -439,6 +438,7 @@ else:
|
|||||||
transp.state.incl(WriteClosed)
|
transp.state.incl(WriteClosed)
|
||||||
transp.state.incl(ReadClosed)
|
transp.state.incl(ReadClosed)
|
||||||
transp.future.complete()
|
transp.future.complete()
|
||||||
|
GC_unref(transp)
|
||||||
|
|
||||||
proc newDatagramTransport*(cbproc: DatagramCallback,
|
proc newDatagramTransport*(cbproc: DatagramCallback,
|
||||||
remote: TransportAddress = AnyAddress,
|
remote: TransportAddress = AnyAddress,
|
||||||
@ -463,7 +463,8 @@ proc newDatagramTransport6*(cbproc: DatagramCallback,
|
|||||||
flags, udata, bufSize)
|
flags, udata, bufSize)
|
||||||
|
|
||||||
proc join*(transp: DatagramTransport) {.async.} =
|
proc join*(transp: DatagramTransport) {.async.} =
|
||||||
await transp.future
|
if not transp.future.finished:
|
||||||
|
await transp.future
|
||||||
|
|
||||||
proc send*(transp: DatagramTransport, pbytes: pointer,
|
proc send*(transp: DatagramTransport, pbytes: pointer,
|
||||||
nbytes: int) {.async.} =
|
nbytes: int) {.async.} =
|
||||||
|
@ -1,10 +1,18 @@
|
|||||||
|
# Asyncdispatch2
|
||||||
|
# (c) Copyright 2018
|
||||||
|
# Status Research & Development GmbH
|
||||||
|
#
|
||||||
|
# Licensed under either of
|
||||||
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
import strutils, net, unittest
|
import strutils, net, unittest
|
||||||
import ../asyncdispatch2
|
import ../asyncdispatch2
|
||||||
|
|
||||||
const
|
const
|
||||||
TestsCount = 5000
|
TestsCount = 10000
|
||||||
ClientsCount = 2
|
ClientsCount = 100
|
||||||
MessagesCount = 1000
|
MessagesCount = 100
|
||||||
|
|
||||||
proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int,
|
proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int,
|
||||||
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
|
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
|
||||||
@ -22,7 +30,6 @@ proc client1(transp: DatagramTransport, pbytes: pointer, nbytes: int,
|
|||||||
await transp.sendTo(addr err[0], len(err), raddr)
|
await transp.sendTo(addr err[0], len(err), raddr)
|
||||||
else:
|
else:
|
||||||
## Read operation failed with error
|
## Read operation failed with error
|
||||||
echo "SERVER ERROR HAPPENS QUITING"
|
|
||||||
var counterPtr = cast[ptr int](udata)
|
var counterPtr = cast[ptr int](udata)
|
||||||
counterPtr[] = -1
|
counterPtr[] = -1
|
||||||
transp.close()
|
transp.close()
|
||||||
@ -91,19 +98,43 @@ proc client4(transp: DatagramTransport, pbytes: pointer, nbytes: int,
|
|||||||
transp.close()
|
transp.close()
|
||||||
else:
|
else:
|
||||||
var req = "REQUEST" & $counterPtr[]
|
var req = "REQUEST" & $counterPtr[]
|
||||||
echo $counterPtr[] & "-SEND"
|
|
||||||
await transp.send(addr req[0], len(req))
|
await transp.send(addr req[0], len(req))
|
||||||
else:
|
else:
|
||||||
echo "ERROR1 [" & $data & "]"
|
|
||||||
var counterPtr = cast[ptr int](udata)
|
var counterPtr = cast[ptr int](udata)
|
||||||
counterPtr[] = -1
|
counterPtr[] = -1
|
||||||
transp.close()
|
transp.close()
|
||||||
else:
|
else:
|
||||||
## Read operation failed with error
|
## Read operation failed with error
|
||||||
echo "ERROR2"
|
|
||||||
var counterPtr = cast[ptr int](udata)
|
var counterPtr = cast[ptr int](udata)
|
||||||
counterPtr[] = -1
|
counterPtr[] = -1
|
||||||
transp.close()
|
transp.close()
|
||||||
|
|
||||||
|
proc client5(transp: DatagramTransport, pbytes: pointer, nbytes: int,
|
||||||
|
raddr: TransportAddress, udata: pointer): Future[void] {.async.} =
|
||||||
|
if not isNil(pbytes):
|
||||||
|
var data = newString(nbytes + 1)
|
||||||
|
copyMem(addr data[0], pbytes, nbytes)
|
||||||
|
data.setLen(nbytes)
|
||||||
|
if data.startsWith("ANSWER"):
|
||||||
|
var counterPtr = cast[ptr int](udata)
|
||||||
|
counterPtr[] = counterPtr[] + 1
|
||||||
|
if counterPtr[] == MessagesCount:
|
||||||
|
transp.close()
|
||||||
|
else:
|
||||||
|
var ta: TransportAddress
|
||||||
|
ta.address = parseIpAddress("127.0.0.1")
|
||||||
|
ta.port = Port(33337)
|
||||||
|
var req = "REQUEST" & $counterPtr[]
|
||||||
|
await transp.sendTo(addr req[0], len(req), ta)
|
||||||
|
else:
|
||||||
|
var counterPtr = cast[ptr int](udata)
|
||||||
|
counterPtr[] = -1
|
||||||
|
transp.close()
|
||||||
|
else:
|
||||||
|
## Read operation failed with error
|
||||||
|
var counterPtr = cast[ptr int](udata)
|
||||||
|
counterPtr[] = -1
|
||||||
|
transp.close()
|
||||||
|
|
||||||
proc test1(): Future[int] {.async.} =
|
proc test1(): Future[int] {.async.} =
|
||||||
var ta: TransportAddress
|
var ta: TransportAddress
|
||||||
@ -142,7 +173,7 @@ proc waitAll(futs: seq[Future[void]]): Future[void] =
|
|||||||
fut.addCallback(cb)
|
fut.addCallback(cb)
|
||||||
return retFuture
|
return retFuture
|
||||||
|
|
||||||
proc test3(): Future[int] {.async.} =
|
proc test3(bounded: bool): Future[int] {.async.} =
|
||||||
var ta: TransportAddress
|
var ta: TransportAddress
|
||||||
ta.address = parseIpAddress("127.0.0.1")
|
ta.address = parseIpAddress("127.0.0.1")
|
||||||
ta.port = Port(33337)
|
ta.port = Port(33337)
|
||||||
@ -150,14 +181,18 @@ proc test3(): Future[int] {.async.} =
|
|||||||
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
|
var dgram1 = newDatagramTransport(client1, udata = addr counter, local = ta)
|
||||||
var clients = newSeq[Future[void]](ClientsCount)
|
var clients = newSeq[Future[void]](ClientsCount)
|
||||||
var counters = newSeq[int](ClientsCount)
|
var counters = newSeq[int](ClientsCount)
|
||||||
|
var dgram: DatagramTransport
|
||||||
for i in 0..<ClientsCount:
|
for i in 0..<ClientsCount:
|
||||||
var dgram = newDatagramTransport(client4, udata = addr counters[i],
|
|
||||||
remote = ta)
|
|
||||||
echo "FIRST SEND at " & toHex(cast[uint](dgram))
|
|
||||||
var data = "REQUEST0"
|
var data = "REQUEST0"
|
||||||
await dgram.sendTo(addr data[0], len(data), ta)
|
if bounded:
|
||||||
|
dgram = newDatagramTransport(client4, udata = addr counters[i],
|
||||||
|
remote = ta)
|
||||||
|
await dgram.send(addr data[0], len(data))
|
||||||
|
else:
|
||||||
|
dgram = newDatagramTransport(client5, udata = addr counters[i])
|
||||||
|
await dgram.sendTo(addr data[0], len(data), ta)
|
||||||
clients[i] = dgram.join()
|
clients[i] = dgram.join()
|
||||||
# await dgram1.join()
|
|
||||||
await waitAll(clients)
|
await waitAll(clients)
|
||||||
dgram1.close()
|
dgram1.close()
|
||||||
result = 0
|
result = 0
|
||||||
@ -165,10 +200,19 @@ proc test3(): Future[int] {.async.} =
|
|||||||
result += counters[i]
|
result += counters[i]
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
|
const
|
||||||
|
m1 = "Unbounded test (" & $TestsCount & " messages)"
|
||||||
|
m2 = "Bounded test (" & $TestsCount & " messages)"
|
||||||
|
m3 = "Unbounded multiple clients with messages (" & $ClientsCount &
|
||||||
|
" clients x " & $MessagesCount & " messages)"
|
||||||
|
m4 = "Bounded multiple clients with messages (" & $ClientsCount &
|
||||||
|
" clients x " & $MessagesCount & " messages)"
|
||||||
suite "Datagram Transport test suite":
|
suite "Datagram Transport test suite":
|
||||||
# test "Unbounded test (5000 times)":
|
test m1:
|
||||||
# check waitFor(test1()) == TestsCount
|
check waitFor(test1()) == TestsCount
|
||||||
# test "Bound test (5000 times)":
|
test m2:
|
||||||
# check waitFor(test2()) == TestsCount
|
check waitFor(test2()) == TestsCount
|
||||||
test "Multiple clients with messages":
|
test m3:
|
||||||
echo waitFor(test3())
|
check waitFor(test3(false)) == ClientsCount * MessagesCount
|
||||||
|
test m4:
|
||||||
|
check waitFor(test3(true)) == ClientsCount * MessagesCount
|
||||||
|
Loading…
x
Reference in New Issue
Block a user