import std/options import std/sequtils import std/os import std/cpuinfo import std/algorithm import std/importutils import pkg/asynctest import pkg/chronos import pkg/chronos/threadsync import pkg/stew/results import pkg/stew/byteutils import pkg/taskpools import pkg/questionable/results import pkg/chronicles import pkg/threading/smartptrs import pkg/datastore/sql/sqliteds import pkg/datastore/fsds import pkg/datastore/threads/threadproxyds {.all.} import ./dscommontests import ./querycommontests const NumThreads = 200 # IO threads aren't attached to CPU count suite "Test Basic ThreadProxyDatastore": var sqlStore: SQLiteBackend[KeyId,DataBuffer] ds: ThreadDatastore taskPool: Taskpool key = Key.init("/a").tryGet() data = "some bytes".toBytes setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() teardownAll: echo "teardown done" test "check put": echo "\n\n=== put ===" let res1 = await ds.put(key, data) echo "res1: ", res1.repr check res1.isOk test "check get": echo "\n\n=== get ===" echo "get send key: ", key.repr let res2 = await ds.get(key) echo "get key post: ", key.repr echo "get res2: ", res2.repr echo res2.get() == data var val = "" for c in res2.get(): val &= char(c) echo "get res2: ", $val suite "Test Basic ThreadDatastore with SQLite": var sqlStore: SQLiteBackend[KeyId,DataBuffer] ds: ThreadDatastore taskPool: Taskpool key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes setupAll: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() teardown: GC_fullCollect() teardownAll: (await ds.close()).tryGet() taskPool.shutdown() basicStoreTests(ds, key, bytes, otherBytes) suite "Test Query ThreadDatastore with SQLite": var sqlStore: SQLiteBackend[KeyId,DataBuffer] ds: ThreadDatastore taskPool: Taskpool key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes setup: sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() teardown: GC_fullCollect() (await ds.close()).tryGet() taskPool.shutdown() queryTests(ds, true) # suite "Test Basic ThreadDatastore with fsds": # let # path = currentSourcePath() # get this file's name # basePath = "tests_data" # basePathAbs = path.parentDir / basePath # key = Key.init("/a/b").tryGet() # bytes = "some bytes".toBytes # otherBytes = "some other bytes".toBytes # var # fsStore: FSDatastore # ds: ThreadDatastore # taskPool: Taskpool # setupAll: # removeDir(basePathAbs) # require(not dirExists(basePathAbs)) # createDir(basePathAbs) # fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet() # taskPool = Taskpool.new(NumThreads) # ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet() # teardown: # GC_fullCollect() # teardownAll: # (await ds.close()).tryGet() # taskPool.shutdown() # removeDir(basePathAbs) # require(not dirExists(basePathAbs)) # basicStoreTests(fsStore, key, bytes, otherBytes) # suite "Test Query ThreadDatastore with fsds": # let # path = currentSourcePath() # get this file's name # basePath = "tests_data" # basePathAbs = path.parentDir / basePath # var # fsStore: FSDatastore # ds: ThreadDatastore # taskPool: Taskpool # setup: # removeDir(basePathAbs) # require(not dirExists(basePathAbs)) # createDir(basePathAbs) # fsStore = FSDatastore.new(root = basePathAbs, depth = 5).tryGet() # taskPool = Taskpool.new(NumThreads) # ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet() # teardown: # GC_fullCollect() # (await ds.close()).tryGet() # taskPool.shutdown() # removeDir(basePathAbs) # require(not dirExists(basePathAbs)) # queryTests(ds, false) # suite "Test ThreadDatastore cancelations": # var # sqlStore: SQLiteBackend[KeyId,DataBuffer] # ds: ThreadDatastore # taskPool: Taskpool # privateAccess(ThreadDatastore) # expose private fields # privateAccess(TaskCtx) # expose private fields # setupAll: # sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() # taskPool = Taskpool.new(NumThreads) # ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() # teardown: # GC_fullCollect() # run full collect after each test # teardownAll: # (await ds.close()).tryGet() # taskPool.shutdown() # test "Should monitor signal and cancel": # var # signal = ThreadSignalPtr.new().tryGet() # res = ThreadResult[void]() # ctx = newSharedPtr(TaskCtxObj[void](signal: signal)) # fut = newFuture[void]("signalMonitor") # threadArgs = (addr ctx, addr fut) # thread: Thread[type threadArgs] # proc threadTask(args: type threadArgs) = # var (ctx, fut) = args # proc asyncTask() {.async.} = # let # monitor = signalMonitor(ctx, fut[]) # await monitor # waitFor asyncTask() # createThread(thread, threadTask, threadArgs) # ctx.cancelled = true # check: ctx.signal.fireSync.tryGet # joinThreads(thread) # check: fut.cancelled # check: ctx.signal.close().isOk # fut = nil # test "Should monitor and not cancel": # var # signal = ThreadSignalPtr.new().tryGet() # res = ThreadResult[void]() # ctx = TaskCtx[void]( # ds: sqlStore, # res: addr res, # signal: signal) # fut = newFuture[void]("signalMonitor") # threadArgs = (addr ctx, addr fut) # thread: Thread[type threadArgs] # proc threadTask(args: type threadArgs) = # var (ctx, fut) = args # proc asyncTask() {.async.} = # let # monitor = signalMonitor(ctx, fut[]) # await monitor # waitFor asyncTask() # createThread(thread, threadTask, threadArgs) # ctx.cancelled = false # check: ctx.signal.fireSync.tryGet # joinThreads(thread) # check: not fut.cancelled # check: ctx.signal.close().isOk # fut = nil