mirror of
https://github.com/logos-storage/apatheia.git
synced 2026-01-05 22:43:10 +00:00
Merge eff04d8efa57f37a9ba156e0ffe3b6baf5cbe552 into 61ff2594d3e204b2ef597b17207094ada2fd7c5c
This commit is contained in:
commit
c7f7f6ec05
@ -50,6 +50,11 @@ type
|
|||||||
template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
|
template toOpenArray*[T](arr: OpenArrayHolder[T]): auto =
|
||||||
system.toOpenArray(arr.data, 0, arr.size)
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
|
proc toArrayHolder*[T](data: seq[T]): OpenArrayHolder[T] =
|
||||||
|
OpenArrayHolder[T](
|
||||||
|
data: cast[ptr UncheckedArray[T]](unsafeAddr(data[0])), size: data.len()
|
||||||
|
)
|
||||||
|
|
||||||
func jobId*[T](fut: Future[T]): JobId =
|
func jobId*[T](fut: Future[T]): JobId =
|
||||||
JobId fut.id()
|
JobId fut.id()
|
||||||
|
|
||||||
@ -93,9 +98,7 @@ template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] =
|
|||||||
when T is SupportedSeqTypes:
|
when T is SupportedSeqTypes:
|
||||||
let rval = SeqRetainer[T](data: exp)
|
let rval = SeqRetainer[T](data: exp)
|
||||||
retainMemory(fut.jobId(), rval)
|
retainMemory(fut.jobId(), rval)
|
||||||
let expPtr = OpenArrayHolder[T](
|
let expPtr = toArrayHolder(rval.data)
|
||||||
data: cast[ptr UncheckedArray[T]](unsafeAddr(rval.data[0])), size: rval.data.len()
|
|
||||||
)
|
|
||||||
expPtr
|
expPtr
|
||||||
else:
|
else:
|
||||||
{.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).}
|
{.error: "unsupported sequence type for job argument: " & $typeof(seq[T]).}
|
||||||
@ -103,9 +106,7 @@ template checkJobArgs*[T](exp: seq[T], fut: untyped): OpenArrayHolder[T] =
|
|||||||
template checkJobArgs*(exp: string, fut: untyped): OpenArrayHolder[char] =
|
template checkJobArgs*(exp: string, fut: untyped): OpenArrayHolder[char] =
|
||||||
let rval = StrRetainer(data: exp)
|
let rval = StrRetainer(data: exp)
|
||||||
retainMemory(fut.jobId(), rval)
|
retainMemory(fut.jobId(), rval)
|
||||||
let expPtr = OpenArrayHolder[char](
|
let expPtr = toArrayHolder(rval.data)
|
||||||
data: cast[ptr UncheckedArray[char]](unsafeAddr(rval.data[0])), size: rval.data.len()
|
|
||||||
)
|
|
||||||
expPtr
|
expPtr
|
||||||
|
|
||||||
template checkJobArgs*(exp: typed, fut: untyped): auto =
|
template checkJobArgs*(exp: typed, fut: untyped): auto =
|
||||||
|
|||||||
1
tests/exCursor.nim
Normal file
1
tests/exCursor.nim
Normal file
@ -0,0 +1 @@
|
|||||||
|
|
||||||
2
tests/exampleGcFailures/config.nims
Normal file
2
tests/exampleGcFailures/config.nims
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
--threads:on
|
||||||
|
--mm:refc
|
||||||
69
tests/exampleGcFailures/exFailure.nim
Normal file
69
tests/exampleGcFailures/exFailure.nim
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
import std/os
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## This example mocks up a sequence and uses
|
||||||
|
## a finalizer and GC_fullCollect to more
|
||||||
|
## deterministically create a memory error.
|
||||||
|
##
|
||||||
|
## see `exFailureSeq.nim` for a probablisitc based
|
||||||
|
## example using a real seq object.
|
||||||
|
##
|
||||||
|
|
||||||
|
type
|
||||||
|
Seq*[T] = object
|
||||||
|
data*: ptr UncheckedArray[T]
|
||||||
|
size*: int
|
||||||
|
|
||||||
|
DataObj = ref object
|
||||||
|
mockSeq: Seq[char]
|
||||||
|
|
||||||
|
template toOpenArray*[T](arr: Seq[T]): auto =
|
||||||
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
|
proc worker(data: ptr Seq[char], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(1_000)
|
||||||
|
echo "running worker: "
|
||||||
|
assert data[].data != nil
|
||||||
|
echo "worker: ", data[].toOpenArray()
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc finalizer(obj: DataObj) =
|
||||||
|
echo "finalize DataObj and freeing mockSeq"
|
||||||
|
obj.mockSeq.data.dealloc()
|
||||||
|
obj.mockSeq.data = nil
|
||||||
|
|
||||||
|
proc initMockSeq(msg: string): Seq[char] =
|
||||||
|
result.data = cast[ptr UncheckedArray[char]](alloc0(13))
|
||||||
|
for i, c in msg:
|
||||||
|
result.data[i] = c
|
||||||
|
result.size = 12
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj: DataObj
|
||||||
|
obj.new(finalizer)
|
||||||
|
obj.mockSeq = initMockSeq("hello world!")
|
||||||
|
|
||||||
|
echo "spawn worker"
|
||||||
|
tp.spawn worker(addr obj.mockSeq, sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
await wait(sig)
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
try:
|
||||||
|
await runTest(tp, sig).wait(100.milliseconds)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
echo "Run GC"
|
||||||
|
GC_fullCollect()
|
||||||
|
os.sleep(2_000)
|
||||||
|
echo "Done"
|
||||||
81
tests/exampleGcFailures/exFailure2.nim
Normal file
81
tests/exampleGcFailures/exFailure2.nim
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
import std/os
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## This example mocks up a sequence and uses
|
||||||
|
## a finalizer and GC_fullCollect to more
|
||||||
|
## deterministically create a memory error.
|
||||||
|
##
|
||||||
|
## see `exFailureSeq.nim` for a probablisitc based
|
||||||
|
## example using a real seq object.
|
||||||
|
##
|
||||||
|
|
||||||
|
type
|
||||||
|
Seq*[T] = object
|
||||||
|
data*: ptr UncheckedArray[T]
|
||||||
|
size*: int
|
||||||
|
|
||||||
|
DataObj = ref object
|
||||||
|
mockSeq: Seq[char]
|
||||||
|
|
||||||
|
template toOpenArray*[T](arr: Seq[T]): auto =
|
||||||
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
|
proc worker(data: ptr Seq[char], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(5_000)
|
||||||
|
echo "running worker: "
|
||||||
|
assert data[].data != nil
|
||||||
|
echo "worker: ", data[].toOpenArray()
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc finalizer(obj: DataObj) =
|
||||||
|
echo "finalize DataObj and freeing mockSeq"
|
||||||
|
obj.mockSeq.data.dealloc()
|
||||||
|
obj.mockSeq.data = nil
|
||||||
|
|
||||||
|
proc initMockSeq(msg: string): Seq[char] =
|
||||||
|
result.data = cast[ptr UncheckedArray[char]](alloc0(13))
|
||||||
|
for i, c in msg:
|
||||||
|
result.data[i] = c
|
||||||
|
result.size = 12
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj: DataObj
|
||||||
|
obj.new(finalizer)
|
||||||
|
obj.mockSeq = initMockSeq("hello world!")
|
||||||
|
|
||||||
|
echo "spawn worker"
|
||||||
|
tp.spawn worker(addr obj.mockSeq, sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
try:
|
||||||
|
await wait(sig)
|
||||||
|
echo "runTest got sig"
|
||||||
|
finally:
|
||||||
|
echo "runTest done"
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc doFail() {.async.} =
|
||||||
|
await sleepAsync(10.milliseconds)
|
||||||
|
raise newException(CatchableError, "error")
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
try:
|
||||||
|
await allFutures(
|
||||||
|
runTest(tp, sig),
|
||||||
|
doFail(),
|
||||||
|
)
|
||||||
|
except Defect:
|
||||||
|
echo "Errored out"
|
||||||
|
finally:
|
||||||
|
echo "Run GC"
|
||||||
|
GC_fullCollect()
|
||||||
|
os.sleep(2_000)
|
||||||
63
tests/exampleGcFailures/exFailureNoGcCollect.nim
Normal file
63
tests/exampleGcFailures/exFailureNoGcCollect.nim
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
import std/os
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
type
|
||||||
|
Seq*[T] = object
|
||||||
|
data*: ptr UncheckedArray[T]
|
||||||
|
size*: int
|
||||||
|
|
||||||
|
DataObj = ref object
|
||||||
|
mockSeq: Seq[char]
|
||||||
|
|
||||||
|
template toOpenArray*[T](arr: Seq[T]): auto =
|
||||||
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
|
proc worker(data: ptr Seq[char], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(4_000)
|
||||||
|
echo "running worker: "
|
||||||
|
assert data[].data != nil
|
||||||
|
echo "worker: ", data[].toOpenArray()
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc finalizer(obj: DataObj) =
|
||||||
|
echo "finalize DataObj and freeing mockSeq"
|
||||||
|
obj.mockSeq.data.dealloc()
|
||||||
|
obj.mockSeq.data = nil
|
||||||
|
|
||||||
|
proc initMockSeq(msg: string): Seq[char] =
|
||||||
|
result.data = cast[ptr UncheckedArray[char]](alloc0(13))
|
||||||
|
for i, c in msg:
|
||||||
|
result.data[i] = c
|
||||||
|
result.size = 12
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj: DataObj
|
||||||
|
obj.new(finalizer)
|
||||||
|
obj.mockSeq = initMockSeq("hello world!")
|
||||||
|
|
||||||
|
echo "spawn worker"
|
||||||
|
tp.spawn worker(addr obj.mockSeq, sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
await wait(sig).wait(100.milliseconds)
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
for i in 1..2_000:
|
||||||
|
try:
|
||||||
|
await runTest(tp, sig)
|
||||||
|
os.sleep(200)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
echo "looping..."
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
68
tests/exampleGcFailures/exFailureSeq.nim
Normal file
68
tests/exampleGcFailures/exFailureSeq.nim
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
import std/os
|
||||||
|
import std/sequtils
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## create a probablistically likely failure of
|
||||||
|
## using sequence memory from another thread
|
||||||
|
## with refc.
|
||||||
|
##
|
||||||
|
## However, unlike `exFailure.nim`, this can take
|
||||||
|
## a while to run.
|
||||||
|
##
|
||||||
|
## It may not always produce an error either, but
|
||||||
|
## generally does so in a few seconds of running.
|
||||||
|
##
|
||||||
|
|
||||||
|
type
|
||||||
|
SeqDataPtr*[T] = object
|
||||||
|
data*: ptr UncheckedArray[T]
|
||||||
|
size*: int
|
||||||
|
|
||||||
|
template toOpenArray*[T](arr: SeqDataPtr[T]): auto =
|
||||||
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
|
proc toSeqDataPtr*[T](data: seq[T]): SeqDataPtr[T] =
|
||||||
|
SeqDataPtr[T](
|
||||||
|
data: cast[ptr UncheckedArray[T]](unsafeAddr(data[0])), size: data.len()
|
||||||
|
)
|
||||||
|
|
||||||
|
proc worker(data: SeqDataPtr[char], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(10)
|
||||||
|
echo "running worker: "
|
||||||
|
echo "worker: ", data.toOpenArray()
|
||||||
|
for i, c in data.toOpenArray():
|
||||||
|
data.data[i] = char(c.uint8 + 10)
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj = "hello world!".toSeq()
|
||||||
|
|
||||||
|
# echo "spawn worker"
|
||||||
|
tp.spawn worker(obj.toSeqDataPtr(), sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
await wait(sig).wait(1.milliseconds)
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
for i in 1..30_000:
|
||||||
|
try:
|
||||||
|
await runTest(tp, sig)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
# os.sleep(1)
|
||||||
|
# echo "looping..."
|
||||||
|
# GC_fullCollect()
|
||||||
|
discard
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
|
os.sleep(10_000)
|
||||||
66
tests/exampleGcFailures/exFailureSeqSeqCursor.nim
Normal file
66
tests/exampleGcFailures/exFailureSeqSeqCursor.nim
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
import std/os
|
||||||
|
import std/sequtils
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## create a probablistically likely failure of
|
||||||
|
## using sequence memory from another thread
|
||||||
|
## with refc.
|
||||||
|
##
|
||||||
|
## However, unlike `exFailure.nim`, this can take
|
||||||
|
## a while to run.
|
||||||
|
##
|
||||||
|
## It may not always produce an error either, but
|
||||||
|
## generally does so in a few seconds of running.
|
||||||
|
##
|
||||||
|
|
||||||
|
type
|
||||||
|
SeqCursor* = object
|
||||||
|
data* {.cursor.}: seq[seq[char]]
|
||||||
|
|
||||||
|
proc worker(data: SeqCursor, sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(10)
|
||||||
|
echo "running worker: "
|
||||||
|
echo "worker: ", data.data
|
||||||
|
echo "worker:addr: ", cast[pointer](data.data).repr
|
||||||
|
# for i, d in data.data:
|
||||||
|
# for j, c in d:
|
||||||
|
# data.data[i][j] = char(c.uint8 + 10)
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj1 = "hello world!".toSeq()
|
||||||
|
var obj2 = "goodbye denver!".toSeq()
|
||||||
|
var data = @[obj1, obj2]
|
||||||
|
var cur = SeqCursor(data: data)
|
||||||
|
|
||||||
|
# echo "spawn worker"
|
||||||
|
tp.spawn worker(cur, sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
await wait(sig)
|
||||||
|
echo "data:addr: ", data.addr.pointer.repr
|
||||||
|
echo "data:cursor:addr: ", cast[pointer](cur.data).repr
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
for i in 1..1:
|
||||||
|
try:
|
||||||
|
await runTest(tp, sig)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
# os.sleep(1)
|
||||||
|
# echo "looping..."
|
||||||
|
# GC_fullCollect()
|
||||||
|
discard
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
|
# os.sleep(10_000)
|
||||||
84
tests/exampleGcFailures/exFailureWrapSuccess.nim
Normal file
84
tests/exampleGcFailures/exFailureWrapSuccess.nim
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
import std/os
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## This example mocks up a sequence and uses
|
||||||
|
## a finalizer and GC_fullCollect to more
|
||||||
|
## deterministically create a memory error.
|
||||||
|
##
|
||||||
|
## This variant tries using `allFutures` to prevent
|
||||||
|
## cancelation of the thread-future.
|
||||||
|
##
|
||||||
|
## It keeps the future from finishing before
|
||||||
|
## the task runs. However, it doesn't appear to be
|
||||||
|
## triggering the finalizer for the MockSeq.
|
||||||
|
##
|
||||||
|
|
||||||
|
type
|
||||||
|
Seq*[T] = object
|
||||||
|
data*: ptr UncheckedArray[T]
|
||||||
|
size*: int
|
||||||
|
|
||||||
|
DataObj = ref object
|
||||||
|
mockSeq: Seq[char]
|
||||||
|
|
||||||
|
template toOpenArray*[T](arr: Seq[T]): auto =
|
||||||
|
system.toOpenArray(arr.data, 0, arr.size)
|
||||||
|
|
||||||
|
proc worker(data: ptr Seq[char], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(1_000)
|
||||||
|
echo "running worker: "
|
||||||
|
assert data[].data != nil
|
||||||
|
echo "worker: ", data[].toOpenArray()
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc finalizer(obj: DataObj) =
|
||||||
|
echo "finalize DataObj and freeing mockSeq"
|
||||||
|
obj.mockSeq.data.dealloc()
|
||||||
|
obj.mockSeq.data = nil
|
||||||
|
|
||||||
|
proc initMockSeq(msg: string): Seq[char] =
|
||||||
|
result.data = cast[ptr UncheckedArray[char]](alloc0(13))
|
||||||
|
for i, c in msg:
|
||||||
|
result.data[i] = c
|
||||||
|
result.size = 12
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj: DataObj
|
||||||
|
obj.new(finalizer)
|
||||||
|
obj.mockSeq = initMockSeq("hello world!")
|
||||||
|
|
||||||
|
echo "spawn worker"
|
||||||
|
tp.spawn worker(addr obj.mockSeq, sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
try:
|
||||||
|
await wait(sig)
|
||||||
|
finally:
|
||||||
|
echo "finishing runTest"
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc runTestWrap(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
await runTest(tp, sig).allFutures().wait(100.milliseconds)
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
try:
|
||||||
|
await runTestWrap(tp, sig)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
echo "Run GC"
|
||||||
|
GC_fullCollect()
|
||||||
|
os.sleep(2_000)
|
||||||
|
echo "Done"
|
||||||
|
os.sleep(10_000)
|
||||||
|
GC_fullCollect()
|
||||||
|
|
||||||
|
teardown:
|
||||||
|
GC_fullCollect()
|
||||||
39
tests/exampleGcFailures/exNoFailureNoGcCollectSeq.nim
Normal file
39
tests/exampleGcFailures/exNoFailureNoGcCollectSeq.nim
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import std/os
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
proc worker(data: string, sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(4_000)
|
||||||
|
echo "running worker: "
|
||||||
|
echo "worker: ", data
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj = "hello world!"
|
||||||
|
# obj.shallow()
|
||||||
|
|
||||||
|
echo "spawn worker"
|
||||||
|
tp.spawn worker(obj, sig)
|
||||||
|
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
await wait(sig).wait(100.milliseconds)
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
for i in 1..2_000:
|
||||||
|
try:
|
||||||
|
await runTest(tp, sig)
|
||||||
|
os.sleep(200)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
echo "looping..."
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
48
tests/exampleGcFailures/exNoFailureNoGcCollectSeqDestroy.nim
Normal file
48
tests/exampleGcFailures/exNoFailureNoGcCollectSeqDestroy.nim
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
import std/os
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
type
|
||||||
|
Test* = object
|
||||||
|
count*: int
|
||||||
|
|
||||||
|
proc `=destroy`*(obj: var Test) =
|
||||||
|
echo "destroy count: ", obj.count, " thread: ", getThreadId()
|
||||||
|
|
||||||
|
proc worker(data: seq[Test], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(40)
|
||||||
|
echo "running worker: ", getThreadId()
|
||||||
|
echo "worker: ", data
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
## init
|
||||||
|
var obj = @[Test(count: 1), Test(count: 2)]
|
||||||
|
|
||||||
|
echo "spawn worker"
|
||||||
|
tp.spawn worker(obj, sig)
|
||||||
|
|
||||||
|
obj[0].count = 10
|
||||||
|
obj[1].count = 20
|
||||||
|
## adding fut.wait(100.milliseconds) creates memory issue
|
||||||
|
await wait(sig)
|
||||||
|
## just doing the wait is fine:
|
||||||
|
# await wait(sig)
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
for i in 1..1:
|
||||||
|
try:
|
||||||
|
echo "\n\nrunning main: ", getThreadId()
|
||||||
|
await runTest(tp, sig)
|
||||||
|
os.sleep(200)
|
||||||
|
except AsyncTimeoutError:
|
||||||
|
echo "looping..."
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 2) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
60
tests/exampleGcFailures/exNoFailurePtrSeqSeq.nim
Normal file
60
tests/exampleGcFailures/exNoFailurePtrSeqSeq.nim
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
import std/os
|
||||||
|
import std/sequtils
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## create a probablistically likely failure of
|
||||||
|
## using sequence memory from another thread
|
||||||
|
## with refc.
|
||||||
|
##
|
||||||
|
## However, unlike `exFailure.nim`, this can take
|
||||||
|
## a while to run.
|
||||||
|
##
|
||||||
|
## It may not always produce an error either, but
|
||||||
|
## generally does so in a few seconds of running.
|
||||||
|
##
|
||||||
|
|
||||||
|
proc worker(data: ptr seq[seq[char]], sig: ThreadSignalPtr) =
|
||||||
|
# os.sleep(100)
|
||||||
|
echo "running worker: "
|
||||||
|
echo "worker: ", data.pointer.repr
|
||||||
|
echo "worker: ", data[]
|
||||||
|
# for i, d in data:
|
||||||
|
# for j, c in d:
|
||||||
|
# data[i][j] = char(c.uint8 + 10)
|
||||||
|
GC_fullCollect()
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr, i: int) {.async.} =
|
||||||
|
## init
|
||||||
|
# await sleepAsync(10.milliseconds)
|
||||||
|
var obj1 = ("hello world! " & $i).toSeq()
|
||||||
|
var obj2 = "goodbye denver!".toSeq()
|
||||||
|
var data = @[obj1, obj2]
|
||||||
|
|
||||||
|
# echo "spawn worker"
|
||||||
|
tp.spawn worker(addr data, sig)
|
||||||
|
|
||||||
|
await wait(sig)
|
||||||
|
echo "data: ", data.addr.pointer.repr
|
||||||
|
echo "data: ", data
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
var futs = newSeq[Future[void]]()
|
||||||
|
for i in 1..1:
|
||||||
|
let f = runTest(tp, sig, i)
|
||||||
|
# futs.add f
|
||||||
|
await f
|
||||||
|
GC_fullCollect()
|
||||||
|
await allFutures(futs)
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 4) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
|
# os.sleep(10_000)
|
||||||
60
tests/exampleGcFailures/exNoFailureSeqSeq.nim
Normal file
60
tests/exampleGcFailures/exNoFailureSeqSeq.nim
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
import std/os
|
||||||
|
import std/sequtils
|
||||||
|
import chronos
|
||||||
|
import chronos/threadsync
|
||||||
|
import chronos/unittest2/asynctests
|
||||||
|
import taskpools
|
||||||
|
|
||||||
|
## create a probablistically likely failure of
|
||||||
|
## using sequence memory from another thread
|
||||||
|
## with refc.
|
||||||
|
##
|
||||||
|
## However, unlike `exFailure.nim`, this can take
|
||||||
|
## a while to run.
|
||||||
|
##
|
||||||
|
## It may not always produce an error either, but
|
||||||
|
## generally does so in a few seconds of running.
|
||||||
|
##
|
||||||
|
|
||||||
|
proc worker(data: seq[seq[char]], sig: ThreadSignalPtr) =
|
||||||
|
os.sleep(1_000)
|
||||||
|
echo "running worker: "
|
||||||
|
echo "worker: ", data.unsafeAddr.pointer.repr
|
||||||
|
echo "worker: ", data
|
||||||
|
# for i, d in data:
|
||||||
|
# for j, c in d:
|
||||||
|
# data[i][j] = char(c.uint8 + 10)
|
||||||
|
GC_fullCollect()
|
||||||
|
discard sig.fireSync()
|
||||||
|
|
||||||
|
proc runTest(tp: TaskPool, sig: ThreadSignalPtr, i: int) {.async.} =
|
||||||
|
## init
|
||||||
|
# await sleepAsync(10.milliseconds)
|
||||||
|
var obj1 = ("hello world! " & $i).toSeq()
|
||||||
|
var obj2 = "goodbye denver!".toSeq()
|
||||||
|
var data = @[obj1, obj2]
|
||||||
|
|
||||||
|
# echo "spawn worker"
|
||||||
|
tp.spawn worker(data, sig)
|
||||||
|
|
||||||
|
await wait(sig)
|
||||||
|
echo "data: ", data.addr.pointer.repr
|
||||||
|
echo "data: ", data
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
proc runTests(tp: TaskPool, sig: ThreadSignalPtr) {.async.} =
|
||||||
|
var futs = newSeq[Future[void]]()
|
||||||
|
for i in 1..10_000:
|
||||||
|
let f = runTest(tp, sig, i).wait(100.milliseconds)
|
||||||
|
# futs.add f
|
||||||
|
await f
|
||||||
|
GC_fullCollect()
|
||||||
|
await allFutures(futs)
|
||||||
|
|
||||||
|
suite "async tests":
|
||||||
|
var tp = Taskpool.new(num_threads = 4) # Default to the number of hardware threads.
|
||||||
|
let sig = ThreadSignalPtr.new().get()
|
||||||
|
|
||||||
|
asyncTest "test":
|
||||||
|
await runTests(tp, sig)
|
||||||
|
# os.sleep(10_000)
|
||||||
214
tests/exampleGcFailures/example.nim
Normal file
214
tests/exampleGcFailures/example.nim
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
while true:
|
||||||
|
block :stateLoop:
|
||||||
|
try:
|
||||||
|
var
|
||||||
|
:tmpD
|
||||||
|
:tmpD_1
|
||||||
|
:tmpD_2
|
||||||
|
:tmpD_3
|
||||||
|
closureIterSetupExc(:envP.`:curExc1`)
|
||||||
|
goto :envP.`:state`6
|
||||||
|
state 0:
|
||||||
|
template result(): auto {.used.} =
|
||||||
|
{.fatal: "You should not reference the `result` variable inside" &
|
||||||
|
" a void async proc".}
|
||||||
|
|
||||||
|
var :envP.closureSucceeded25 = true
|
||||||
|
:envP.`:state` = 1
|
||||||
|
break :stateLoop
|
||||||
|
state 1:
|
||||||
|
var :envP.obj117 =
|
||||||
|
type
|
||||||
|
OutType`gensym13 = typeof(items("hello world! " & $i))
|
||||||
|
block :tmp:
|
||||||
|
:tmpD =
|
||||||
|
let :envP.`:tmp5` = `&`("hello world! ", $:envP.`:up`.i1)
|
||||||
|
template s2_838861096(): untyped =
|
||||||
|
:tmp_1
|
||||||
|
|
||||||
|
var :envP.i`gensym1310 = 0
|
||||||
|
var :envP.result`gensym139 = newSeq(
|
||||||
|
chckRange(len(:envP.`:tmp5`), 0, 9223372036854775807))
|
||||||
|
block :tmp_2:
|
||||||
|
var :envP.it`gensym138
|
||||||
|
var :envP.i6 = 0
|
||||||
|
let :envP.L7 = len(:envP.`:tmp5`)
|
||||||
|
block :tmp_3:
|
||||||
|
while :envP.i6 < :envP.L7:
|
||||||
|
:envP.it`gensym138 = :envP.`:tmp5`[:envP.i6]
|
||||||
|
:envP.result`gensym139[:envP.i`gensym1310] = :envP.it`gensym138
|
||||||
|
:envP.i`gensym1310 += 1
|
||||||
|
inc(:envP.i6, 1)
|
||||||
|
const
|
||||||
|
loc`gensym22 = (filename: "/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim",
|
||||||
|
line: 258, column: 10)
|
||||||
|
ploc`gensym22 = "/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim(258, 11)"
|
||||||
|
bind instantiationInfo
|
||||||
|
mixin failedAssertImpl
|
||||||
|
{.line: (filename: "/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim",
|
||||||
|
line: 258, column: 10).}:
|
||||||
|
if not (len(:envP.`:tmp5`) == :envP.L7):
|
||||||
|
failedAssertImpl("/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim(258, 11) `len(a) == L` the length of the string changed while iterating over it")
|
||||||
|
:envP.result`gensym139
|
||||||
|
:tmpD
|
||||||
|
var :envP.obj218 =
|
||||||
|
type
|
||||||
|
OutType`gensym20 = typeof(items("goodbye denver!"))
|
||||||
|
block :tmp_4:
|
||||||
|
:tmpD_1 =
|
||||||
|
let :envP.`:tmp11` = "goodbye denver!"
|
||||||
|
template s2_838861153(): untyped =
|
||||||
|
:tmp_5
|
||||||
|
|
||||||
|
var :envP.i`gensym2016 = 0
|
||||||
|
var :envP.result`gensym2015 = newSeq(
|
||||||
|
chckRange(len(:envP.`:tmp11`), 0, 9223372036854775807))
|
||||||
|
block :tmp_6:
|
||||||
|
var :envP.it`gensym2014
|
||||||
|
var :envP.i12 = 0
|
||||||
|
let :envP.L13 = len(:envP.`:tmp11`)
|
||||||
|
block :tmp_7:
|
||||||
|
while :envP.i12 < :envP.L13:
|
||||||
|
:envP.it`gensym2014 = :envP.`:tmp11`[:envP.i12]
|
||||||
|
:envP.result`gensym2015[:envP.i`gensym2016] = :envP.it`gensym2014
|
||||||
|
:envP.i`gensym2016 += 1
|
||||||
|
inc(:envP.i12, 1)
|
||||||
|
const
|
||||||
|
loc`gensym22 = (filename: "/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim",
|
||||||
|
line: 258, column: 10)
|
||||||
|
ploc`gensym22 = "/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim(258, 11)"
|
||||||
|
bind instantiationInfo
|
||||||
|
mixin failedAssertImpl
|
||||||
|
{.line: (filename: "/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim",
|
||||||
|
line: 258, column: 10).}:
|
||||||
|
if not (len(:envP.`:tmp11`) == :envP.L13):
|
||||||
|
failedAssertImpl("/Users/elcritch/.asdf/installs/nim/1.6.18/lib/system/iterators.nim(258, 11) `len(a) == L` the length of the string changed while iterating over it")
|
||||||
|
:envP.result`gensym2015
|
||||||
|
:tmpD_1
|
||||||
|
var :envP.data20 = @[
|
||||||
|
:tmpD_2 = :envP.obj117
|
||||||
|
:tmpD_2,
|
||||||
|
:tmpD_3 = :envP.obj218
|
||||||
|
:tmpD_3]
|
||||||
|
block :tmp_8:
|
||||||
|
var
|
||||||
|
:tmpD_4
|
||||||
|
:tmpD_5
|
||||||
|
:tmpD_6
|
||||||
|
:tmpD_7
|
||||||
|
let :envP.taskNode23 = new(TaskNode, workerContext.currentTask) do:
|
||||||
|
type
|
||||||
|
ScratchObj = object
|
||||||
|
data: seq[seq[char]]
|
||||||
|
sig: ThreadSignalPtr
|
||||||
|
|
||||||
|
let :envP.scratch19 = cast[ptr ScratchObj](c_calloc(1'u, 16'u))
|
||||||
|
if isNil(:envP.scratch19):
|
||||||
|
raise
|
||||||
|
(ref OutOfMemDefect)(msg: "Could not allocate memory", parent: nil)
|
||||||
|
block :tmp_9:
|
||||||
|
var
|
||||||
|
:tmpD_8
|
||||||
|
:tmpD_9
|
||||||
|
`=sink`(:envP.isoTemp21, isolate do:
|
||||||
|
:tmpD_8 = :envP.data20
|
||||||
|
:tmpD_8)
|
||||||
|
:envP.scratch19.data = extract(:envP.isoTemp21)
|
||||||
|
`=sink_1`(:envP.isoTemp22, isolate do:
|
||||||
|
:tmpD_9 = :envP.`:up`.sig2
|
||||||
|
:tmpD_9)
|
||||||
|
:envP.scratch19.sig = extract_1(:envP.isoTemp22)
|
||||||
|
proc worker_838861197(args`gensym27: pointer) {.gcsafe, nimcall,
|
||||||
|
raises: [].} =
|
||||||
|
let objTemp = cast[ptr ScratchObj](args`gensym27)
|
||||||
|
let data_1 = objTemp.data
|
||||||
|
let sig_1 = objTemp.sig
|
||||||
|
worker(data_1, sig_1)
|
||||||
|
|
||||||
|
proc destroyScratch_838861198(args`gensym27_1: pointer) {.gcsafe,
|
||||||
|
nimcall, raises: [].} =
|
||||||
|
let obj = cast[ptr ScratchObj](args`gensym27_1)
|
||||||
|
`=destroy`(obj[])
|
||||||
|
|
||||||
|
Task(callback:
|
||||||
|
:tmpD_4 = worker_1
|
||||||
|
:tmpD_4, args:
|
||||||
|
:tmpD_5 = :envP.scratch19
|
||||||
|
:tmpD_5, destroy:
|
||||||
|
:tmpD_6 = destroyScratch
|
||||||
|
:tmpD_6)
|
||||||
|
schedule(workerContext):
|
||||||
|
:tmpD_7 = :envP.taskNode23
|
||||||
|
:tmpD_7
|
||||||
|
chronosInternalRetFuture.internalChild = FutureBase(wait(:envP.`:up`.sig2))
|
||||||
|
:envP.`:state` = 4
|
||||||
|
return chronosInternalRetFuture.internalChild
|
||||||
|
state 2:
|
||||||
|
:envP.`:curExc1` = nil
|
||||||
|
if of(getCurrentException(), CancelledError):
|
||||||
|
:envP.closureSucceeded25 = false
|
||||||
|
cancelAndSchedule(FutureBase(cast[Future[void]](chronosInternalRetFuture)),
|
||||||
|
srcLocImpl("", "exNoFailureSeqSeq.nim", 43))
|
||||||
|
elif of(getCurrentException(), CatchableError):
|
||||||
|
let :envP.exc26 = getCurrentException()
|
||||||
|
:envP.closureSucceeded25 = false
|
||||||
|
failImpl(FutureBase(cast[Future[void]](chronosInternalRetFuture)),
|
||||||
|
:envP.exc26, srcLocImpl("", "exNoFailureSeqSeq.nim", 43))
|
||||||
|
elif of(getCurrentException(), Defect):
|
||||||
|
var :tmpD_10
|
||||||
|
let :envP.exc27 = getCurrentException()
|
||||||
|
:envP.closureSucceeded25 = false
|
||||||
|
raise
|
||||||
|
:tmpD_10 = :envP.exc27
|
||||||
|
:tmpD_10
|
||||||
|
else:
|
||||||
|
:envP.`:unrollFinally3` = true
|
||||||
|
:envP.`:curExc1` = getCurrentException()
|
||||||
|
:envP.`:state` = 3
|
||||||
|
break :stateLoop
|
||||||
|
:envP.`:state` = 3
|
||||||
|
break :stateLoop
|
||||||
|
state 3:
|
||||||
|
if :envP.closureSucceeded25:
|
||||||
|
complete(cast[Future[void]](chronosInternalRetFuture),
|
||||||
|
srcLocImpl("", "exNoFailureSeqSeq.nim", 43))
|
||||||
|
if :envP.`:unrollFinally3`:
|
||||||
|
if `==`(:envP.`:curExc1`, nil):
|
||||||
|
:envP.`:state` = -1
|
||||||
|
return result = :envP.`:tmpResult2`
|
||||||
|
else:
|
||||||
|
var :tmpD_11
|
||||||
|
closureIterSetupExc(nil)
|
||||||
|
raise
|
||||||
|
:tmpD_11 = :envP.`:curExc1`
|
||||||
|
:tmpD_11
|
||||||
|
:envP.`:state` = 6
|
||||||
|
break :stateLoop
|
||||||
|
state 4:
|
||||||
|
{.cast(raises: [AsyncError, CancelledError]).}:
|
||||||
|
if isNil(cast[type(wait(sig_2))](chronosInternalRetFuture.internalChild).internalError):
|
||||||
|
discard
|
||||||
|
else:
|
||||||
|
var :tmpD_12
|
||||||
|
raise
|
||||||
|
:tmpD_12 = cast[type(wait(sig_2))](chronosInternalRetFuture.internalChild).internalError
|
||||||
|
:tmpD_12
|
||||||
|
:envP.`:state` = 5
|
||||||
|
break :stateLoop
|
||||||
|
state 5:
|
||||||
|
echo ["data: ", repr(pointer(addr(:envP.data20)))]
|
||||||
|
echo ["data: ", `$_1`(:envP.data20)]
|
||||||
|
echo [""]
|
||||||
|
:envP.`:state` = 3
|
||||||
|
break :stateLoop
|
||||||
|
state 6:
|
||||||
|
:envP.`:state` = -1
|
||||||
|
break :stateLoop
|
||||||
|
except:
|
||||||
|
:envP.`:state` = [0, -2, 3, 0, -2, -2, 0][:envP.`:state`]
|
||||||
|
if `==`(:envP.`:state`, 0):
|
||||||
|
raise
|
||||||
|
:envP.`:unrollFinally3` = `<`(0, :envP.`:state`)
|
||||||
|
if `<`(:envP.`:state`, 0):
|
||||||
|
:envP.`:state` = `-`(:envP.`:state`)
|
||||||
|
:envP.`:curExc1` = getCurrentException()
|
||||||
25
tests/exampleGcFailures/test.nim
Normal file
25
tests/exampleGcFailures/test.nim
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
|
||||||
|
when false:
|
||||||
|
type AnObject* = object of RootObj
|
||||||
|
value*: int
|
||||||
|
|
||||||
|
proc mutate(a: sink AnObject) =
|
||||||
|
a.value = 1
|
||||||
|
|
||||||
|
var obj = AnObject(value: 42)
|
||||||
|
mutate(obj)
|
||||||
|
doAssert obj.value == 42
|
||||||
|
|
||||||
|
else:
|
||||||
|
type AnObject = object of RootObj
|
||||||
|
value*: int
|
||||||
|
|
||||||
|
proc `=destroy`(x: var AnObject) =
|
||||||
|
echo "DEST"
|
||||||
|
|
||||||
|
proc mutate(a: sink AnObject) =
|
||||||
|
a.value = 1
|
||||||
|
|
||||||
|
var obj = AnObject(value: 42)
|
||||||
|
mutate(obj)
|
||||||
|
doAssert obj.value == 42
|
||||||
Loading…
x
Reference in New Issue
Block a user