diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index c8e98ad..b573bf1 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -203,7 +203,7 @@ method put*[BT](self: ThreadDatastore[BT], return ctx[].res.toRes() -method put*( +method put*[DB]( self: ThreadDatastore, batch: seq[BatchEntry]): Future[?!void] {.async.} = ## put batch data @@ -214,7 +214,7 @@ method put*( return success() -proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; +method getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; key: KeyId) {.gcsafe, nimcall.} = ## run backend command executeTask(ctx): @@ -242,9 +242,7 @@ method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = type QResult = DbQueryResponse[KeyId, DataBuffer] -import os - -proc queryTask[DB]( +method queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, query: DbQuery[KeyId], @@ -339,6 +337,7 @@ method query*[BT](self: ThreadDatastore[BT], trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() discard ctx[].signal.close() + echo "nextSignal:CLOSE!" discard nextSignal.close() self.semaphore.release() raise exc @@ -348,6 +347,7 @@ method query*[BT](self: ThreadDatastore[BT], except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg discard signal.close() + echo "nextSignal:CLOSE!" discard nextSignal.close() self.semaphore.release() raise exc @@ -362,8 +362,5 @@ proc new*[DB](self: type ThreadDatastore, success ThreadDatastore[DB]( tp: tp, backend: db, - # TODO: are these needed anymore?? - # withLocks: withLocks, - # queryLock: newAsyncLock(), semaphore: AsyncSemaphore.new(tp.numThreads - 1) ) diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 72758b0..f415ba6 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -22,74 +22,62 @@ import pkg/datastore/threads/threadproxyds {.all.} import ./dscommontests import ./querycommontests -const NumThreads = 20 # IO threads aren't attached to CPU count +const + NumThreads = 20 # IO threads aren't attached to CPU count + ThreadTestLoops {.intdefine.} = 10 + N = ThreadTestLoops -suite "Test Basic ThreadProxyDatastore": +for i in 1..N: + suite "Test Basic ThreadDatastore with SQLite": - var - key = Key.init("/a").tryGet() - data = "some bytes".toBytes - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + var + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes - teardownAll: - echo "teardown done" + setupAll: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - test "check put": - echo "\n\n=== put ===" - let res1 = await ds.put(key, data) - echo "res1: ", res1.repr - check res1.isOk + teardown: + GC_fullCollect() - test "check get": - echo "\n\n=== get ===" - echo "get send key: ", key.repr - let res2 = await ds.get(key) - echo "get key post: ", key.repr - echo "get res2: ", res2.repr - echo res2.get() == data - var val = "" - for c in res2.get(): - val &= char(c) - echo "get res2: ", $val + teardownAll: + (await ds.close()).tryGet() + taskPool.shutdown() -suite "Test Basic ThreadDatastore with SQLite": + basicStoreTests(ds, key, bytes, otherBytes) + GC_fullCollect() - var - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes +for i in 1..N: + suite "Test Query ThreadDatastore with SQLite": - teardown: - GC_fullCollect() + var + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes + - teardownAll: - (await ds.close()).tryGet() - taskPool.shutdown() + setup: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - basicStoreTests(ds, key, bytes, otherBytes) + teardown: + GC_fullCollect() -suite "Test Query ThreadDatastore with SQLite": + (await ds.close()).tryGet() + taskPool.shutdown() - var - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes - - teardown: - GC_fullCollect() - - (await ds.close()).tryGet() - taskPool.shutdown() - - queryTests(ds, true) + queryTests(ds, true) + GC_fullCollect() # suite "Test Basic ThreadDatastore with fsds": # let