nim-datastore/tests/datastore/testthreadproxyds.nim

250 lines
6.5 KiB
Nim
Raw Permalink Normal View History

2023-08-24 19:51:04 -07:00
import std/options
import std/sequtils
import std/os
2023-09-13 14:43:25 -06:00
import std/cpuinfo
2023-08-28 21:45:55 -07:00
import std/algorithm
2023-09-14 17:47:37 -06:00
import std/importutils
2023-08-24 19:51:04 -07:00
2023-09-07 17:33:33 -06:00
import pkg/asynctest
2023-08-24 19:51:04 -07:00
import pkg/chronos
2023-09-15 13:08:38 -06:00
import pkg/chronos/threadsync
2023-08-24 19:51:04 -07:00
import pkg/stew/results
import pkg/stew/byteutils
2023-09-08 15:09:15 -06:00
import pkg/taskpools
2023-09-14 17:47:37 -06:00
import pkg/questionable/results
2023-09-15 13:08:38 -06:00
import pkg/chronicles
2023-09-26 16:12:55 -07:00
import pkg/threading/smartptrs
2023-09-28 14:51:27 -07:00
import pkg/threading/atomics
2023-08-24 19:51:04 -07:00
2023-09-14 17:47:37 -06:00
import pkg/datastore/fsds
import pkg/datastore/sql/sqliteds
2023-09-14 17:47:37 -06:00
import pkg/datastore/threads/threadproxyds {.all.}
2023-08-24 19:51:04 -07:00
import ./dscommontests
2023-09-13 14:43:25 -06:00
import ./querycommontests
2023-08-24 19:51:04 -07:00
2023-09-27 12:54:22 -07:00
# Compile-time tunables for the threaded datastore tests.
const
  # Size of the task pool created for this test module.
  NumThreads = 20 # IO threads aren't attached to CPU count

  # How many times each looped suite is repeated. As an `intdefine` it can
  # be overridden on the compiler command line (`-d:ThreadTestLoops=<n>`).
  ThreadTestLoops {.intdefine.} = 1
  N = ThreadTestLoops

  # How many times the shared test sets are instantiated inside one looped
  # suite. Overridable with `-d:ThreadTestInnerLoops=<n>`.
  ThreadTestInnerLoops {.intdefine.} = 1
  M = ThreadTestInnerLoops
2023-09-27 12:54:22 -07:00
2023-09-27 16:01:40 -07:00
# Task pool handed to every ThreadDatastore built in this file via `tp =`.
# NOTE(review): no shutdown of this pool is visible in this source - confirm
# whether that is intentional.
var taskPool = Taskpool.new(NumThreads)
2023-09-27 16:17:02 -07:00
# Runs the shared basic store tests against a ThreadDatastore wrapping an
# in-memory SQLite backend. The whole suite is repeated N times and the
# shared test set is instantiated M times per suite.
for i in 1..N:
  suite "Test Basic ThreadDatastore with SQLite " & $i:
    # Fixture inputs are never reassigned, so they are immutable (matches
    # the fsds variant of this suite).
    let
      key = Key.init("/a/b").tryGet()
      bytes = "some bytes".toBytes
      otherBytes = "some other bytes".toBytes

    var
      sqlStore: SQLiteBackend[KeyId, DataBuffer]
      ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]

    # One backend and one proxy datastore are shared by all tests of a suite.
    setupAll:
      sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
      ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()

    teardown:
      GC_fullCollect()

    teardownAll:
      (await ds.close()).tryGet()

    # A distinct index name avoids shadowing the outer `i` used in the
    # suite title.
    for rep in 1..M:
      basicStoreTests(ds, key, bytes, otherBytes)

  GC_fullCollect()
2023-09-27 16:01:40 -07:00
2023-09-27 12:54:22 -07:00
# Runs the shared query tests against a ThreadDatastore wrapping an
# in-memory SQLite backend. Unlike the basic suite, the backend and proxy
# are rebuilt before every test and closed after it.
for iteration in 1 .. N:
  suite "Test Query ThreadDatastore with SQLite " & $iteration:
    var
      sqlStore: SQLiteBackend[KeyId, DataBuffer]
      ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]

    setup:
      sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
      ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()

    teardown:
      # Collect first, then close the datastore used by the finished test.
      GC_fullCollect()
      (await ds.close()).tryGet()

    for rep in 1 .. M:
      queryTests(ds, true)

  GC_fullCollect()
2023-08-24 19:51:04 -07:00
# Runs the shared basic store tests against a ThreadDatastore wrapping a
# filesystem datastore rooted in `tests_data` next to this source file.
suite "Test Basic ThreadDatastore with fsds":
  let
    # Scratch directory, resolved relative to this file's own location.
    dataDir = currentSourcePath().parentDir / "tests_data"
    key = Key.init("/a/b").tryGet()
    bytes = "some bytes".toBytes
    otherBytes = "some other bytes".toBytes

  var
    fsStore: FSDatastore[KeyId, DataBuffer]
    ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]]

  setupAll:
    # Start from an empty directory; abort if the old one cannot be removed.
    removeDir(dataDir)
    require(not dataDir.dirExists)
    createDir(dataDir)

    fsStore = newFSDatastore[KeyId, DataBuffer](
      root = dataDir, depth = 3).tryGet()
    ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet()

  teardown:
    GC_fullCollect()

  teardownAll:
    (await ds.close()).tryGet()

    # Leave no scratch data behind.
    removeDir(dataDir)
    require(not dataDir.dirExists)

  basicStoreTests(ds, key, bytes, otherBytes)
2023-09-15 16:41:03 -06:00
2023-09-28 13:24:55 -07:00
# Runs the shared query tests against a ThreadDatastore wrapping a
# filesystem datastore. The scratch directory and the datastore are
# recreated before every test and removed after it.
suite "Test Query ThreadDatastore with fsds":
  let
    # Scratch directory, resolved relative to this file's own location.
    dataDir = currentSourcePath().parentDir / "tests_data"

  var
    fsStore: FSDatastore[KeyId, DataBuffer]
    ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]]

  setup:
    # Start from an empty directory; abort if the old one cannot be removed.
    removeDir(dataDir)
    require(not dataDir.dirExists)
    createDir(dataDir)

    fsStore = newFSDatastore[KeyId, DataBuffer](
      root = dataDir, depth = 5).tryGet()
    ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet()

  teardown:
    GC_fullCollect()
    (await ds.close()).tryGet()

    # Leave no scratch data behind.
    removeDir(dataDir)
    require(not dataDir.dirExists)

  queryTests(ds, false)
2023-08-29 17:00:11 -07:00
2023-09-28 15:05:07 -07:00
# Exercises the cancellation paths of ThreadDatastore tasks. The suite is
# repeated N times; the iteration number is part of the suite title so that
# repeated runs can be told apart, like the other looped suites in this file.
for i in 1..N:
  suite "Test ThreadDatastore cancelations " & $i:
    var
      sqlStore: SQLiteBackend[KeyId,DataBuffer]
      sds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]

    privateAccess(ThreadDatastore) # expose private fields
    privateAccess(TaskCtx) # expose private fields

    setupAll:
      sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
      sds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()

    teardown:
      GC_fullCollect() # run full collect after each test

    # A context flagged as cancelled before dispatch must finish with an
    # error result, stay cancelled and not be left running.
    test "Should monitor signal and cancel":
      var
        signal = ThreadSignalPtr.new().tryGet()

      proc cancelTestTask(ctx: TaskCtx[bool]) {.gcsafe.} =
        executeTask(ctx):
          (?!bool).ok(true)

      let ctx = newTaskCtx(bool, signal=signal)
      ctx[].cancelled = true
      dispatchTask(sds, signal):
        sds.tp.spawn cancelTestTask(ctx)

      check:
        ctx[].res.isErr == true
        ctx[].cancelled == true
        ctx[].running == false

    # An async proc that fails while its task is still blocked must release
    # its own locals (FutTestObj) first; the task's value (TestValue) is
    # only expected to be destroyed after the task has been let go.
    test "Should cancel future":

      var
        signal = ThreadSignalPtr.new().tryGet()
        ms {.global.}: MutexSignal
        flag {.global.}: Atomic[bool]
        futFreed {.global.}: Atomic[bool]
        ready {.global.}: Atomic[bool]

      ms.init()

      # These `{.global.}` flags outlive one pass of the enclosing loop.
      # Clear them so a repeated run (N > 1) cannot succeed on `true`
      # values left behind by the previous pass.
      # NOTE(review): `ms.init()` is likewise re-run on every pass; confirm
      # that re-initialising a MutexSignal is supported.
      flag.store(false)
      futFreed.store(false)
      ready.store(false)

      type
        FutTestObj = object
          val: int
        TestValue = object
        ThreadTestInt = (TestValue, )

      # Destructor hooks record that the respective object was destroyed.
      proc `=destroy`(obj: var TestValue) =
        # echo "destroy TestObj!"
        flag.store(true)

      proc `=destroy`(obj: var FutTestObj) =
        # echo "destroy FutTestObj!"
        futFreed.store(true)

      proc wait(flag: var Atomic[bool], name = "task") =
        ## Polls `flag` up to 100 times, sleeping 10 ms between polls, and
        ## returns as soon as it is set. Raises `Defect` if it never is.
        # echo "wait for " & name & " to be ready..."
        # defer: echo ""
        for i in 1..100:
          # stdout.write(".")
          if flag.load() == true:
            return
          os.sleep(10)
        raise newException(Defect, "timeout")

      proc errorTestTask(ctx: TaskCtx[ThreadTestInt]) {.gcsafe, nimcall.} =
        ## Task body: fires the context signal, reports `ready`, then blocks
        ## on `ms` until the test fires it.
        executeTask(ctx):
          # echo "task:exec"
          discard ctx[].signal.fireSync()
          ready.store(true)
          ms.wait()
          echo "task context memory: ", ctx[]
          (?!ThreadTestInt).ok(default(ThreadTestInt))

      proc runTestTask() {.async.} =
        ## Dispatches `errorTestTask`, waits until it reports `ready`, then
        ## fails with a `ValueError` while the task is still blocked.
        let obj = FutTestObj(val: 42)
        await sleepAsync(1.milliseconds)
        try:
          let ctx = newTaskCtx(ThreadTestInt, signal=signal)
          dispatchTask(sds, signal):
            sds.tp.spawn errorTestTask(ctx)
          ready.wait()
          # echo "raise error"
          raise newException(ValueError, "fake error")
        finally:
          # echo "fut FutTestObj: ", obj
          assert obj.val == 42 # need to force future to keep ref here

      try:
        block:
          await runTestTask()
      except CatchableError as exc:
        # echo "caught: ", $exc
        discard
      finally:
        # echo "finish"
        check ready.load() == true
        GC_fullCollect()
        futFreed.wait("futFreed")
        echo "future freed it's mem!"
        check futFreed.load() == true

        # Unblock the task, then wait for its TestValue to be destroyed.
        ms.fire()
        flag.wait("flag")
        check flag.load() == true