diff --git a/datastore.nimble b/datastore.nimble index 3b5b28e..bcd3139 100644 --- a/datastore.nimble +++ b/datastore.nimble @@ -9,7 +9,7 @@ license = "Apache License 2.0 or MIT" requires "nim >= 1.6.14", "asynctest >= 0.3.1 & < 0.4.0", "chronos#0277b65be2c7a365ac13df002fba6e172be55537", - "questionable >= 0.10.3 & < 0.11.0", + "questionable", "sqlite3_abi", "stew", "unittest2", diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 772dc72..8be8a8a 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -36,50 +36,41 @@ logScope: type - ThreadBackendKinds* = enum - Sqlite - # Filesystem - - ThreadBackend* = object - ## backend case type to avoid needing to make ThreadDatastore generic - case kind*: ThreadBackendKinds - of Sqlite: - sql*: SQLiteBackend[KeyId,DataBuffer] - TaskCtxObj*[T: ThreadTypes] = object res: ThreadResult[T] signal: ThreadSignalPtr running: bool ## used to mark when a task worker is running cancelled: bool ## used to cancel a task before it's started + nextSignal: ThreadSignalPtr TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] ## Task context object. ## This is a SharedPtr to make the query iter simpler - ThreadDatastore* = ref object of Datastore + ThreadDatastore*[BT] = ref object of Datastore tp: Taskpool - backend: ThreadBackend + backend: BT semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors -var ctxLock: Lock -ctxLock.initLock() +proc newTaskCtx*[T](tp: typedesc[T], + signal: ThreadSignalPtr, + nextSignal: ThreadSignalPtr = nil): TaskCtx[T] = + newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal)) proc setCancelled[T](ctx: TaskCtx[T]) = - # withLock(ctxLock): ctx[].cancelled = true proc setRunning[T](ctx: TaskCtx[T]): bool = - # withLock(ctxLock): if ctx[].cancelled: return false ctx[].running = true return true proc setDone[T](ctx: TaskCtx[T]) = - # withLock(ctxLock): ctx[].running = false proc acquireSignal(): ?!ThreadSignalPtr = + # echo "signal:OPEN!" let signal = ThreadSignalPtr.new() if signal.isErr(): failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error()) @@ -113,33 +104,31 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) = ctx.setDone() discard ctx[].signal.fireSync() -template dispatchTaskWrap[T](self: ThreadDatastore, - signal: ThreadSignalPtr, - blk: untyped - ): auto = - case self.backend.kind: - of Sqlite: - var ds {.used, inject.} = self.backend.sql - proc runTask() = - `blk` - runTask() - await wait(ctx[].signal) +template dispatchTaskWrap[BT](self: ThreadDatastore[BT], + signal: ThreadSignalPtr, + blk: untyped + ): auto = + var ds {.used, inject.} = self.backend + proc runTask() = + `blk` + runTask() + await wait(ctx[].signal) -template dispatchTask[T](self: ThreadDatastore, +template dispatchTask[BT](self: ThreadDatastore[BT], signal: ThreadSignalPtr, blk: untyped - ): auto = + ): auto = ## handles dispatching a task from an async context ## `blk` is the actions, it has `ctx` and `ds` variables in scope. ## note that `ds` is a generic - let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) try: - dispatchTaskWrap[T](self, signal, blk) + dispatchTaskWrap[BT](self, signal, blk) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() raise exc finally: + # echo "signal:CLOSE!" discard ctx[].signal.close() self.semaphore.release() @@ -149,38 +138,39 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} = executeTask(ctx): has(ds, key) -method has*(self: ThreadDatastore, - key: Key): Future[?!bool] {.async.} = +method has*[BT](self: ThreadDatastore[BT], + key: Key): Future[?!bool] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[bool](self, signal): + let ctx = newTaskCtx(bool, signal=signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn hasTask(ctx, ds, key) return ctx[].res.toRes(v => v) - -proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; +method deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; key: KeyId) {.gcsafe.} = ## run backend command executeTask(ctx): delete(ds, key) -method delete*(self: ThreadDatastore, +method delete*[BT](self: ThreadDatastore[BT], key: Key): Future[?!void] {.async.} = ## delete key await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[void](self, signal): + let ctx = newTaskCtx(void, signal=signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn deleteTask(ctx, ds, key) return ctx[].res.toRes() -method delete*(self: ThreadDatastore, +method delete*[BT](self: ThreadDatastore[BT], keys: seq[Key]): Future[?!void] {.async.} = ## delete batch for key in keys: @@ -196,7 +186,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB; executeTask(ctx): put(ds, key, data) -method put*(self: ThreadDatastore, +method put*[BT](self: ThreadDatastore[BT], key: Key, data: seq[byte]): Future[?!void] {.async.} = ## put key with data @@ -204,15 +194,16 @@ method put*(self: ThreadDatastore, without signal =? acquireSignal(), err: return failure err - dispatchTask[void](self, signal): + let ctx = newTaskCtx(void, signal=signal) + dispatchTask(self, signal): let key = KeyId.new key.id() let data = DataBuffer.new data self.tp.spawn putTask(ctx, ds, key, data) return ctx[].res.toRes() -method put*( - self: ThreadDatastore, +method put*[DB]( + self: ThreadDatastore[DB], batch: seq[BatchEntry]): Future[?!void] {.async.} = ## put batch data for entry in batch: @@ -222,42 +213,38 @@ method put*( return success() -proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; +method getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB; key: KeyId) {.gcsafe, nimcall.} = ## run backend command executeTask(ctx): let res = get(ds, key) res -method get*(self: ThreadDatastore, - key: Key, - ): Future[?!seq[byte]] {.async.} = +method get*[BT](self: ThreadDatastore[BT], + key: Key, + ): Future[?!seq[byte]] {.async.} = await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - let key = KeyId.new key.id() - dispatchTask[DataBuffer](self, signal): + let ctx = newTaskCtx(DataBuffer, signal=signal) + dispatchTask(self, signal): + let key = KeyId.new key.id() self.tp.spawn getTask(ctx, ds, key) return ctx[].res.toRes(v => v.toSeq()) -method close*(self: ThreadDatastore): Future[?!void] {.async.} = +method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} = await self.semaphore.closeAll() - case self.backend.kind: - of Sqlite: - self.backend.sql.close() + self.backend.close() type QResult = DbQueryResponse[KeyId, DataBuffer] -import os - -proc queryTask[DB]( +method queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, query: DbQuery[KeyId], - nextSignal: ThreadSignalPtr ) {.gcsafe, nimcall.} = ## run query command executeTask(ctx): @@ -271,8 +258,8 @@ proc queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - if not nextSignal.waitSync(10.seconds).get(): - raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") + if not ctx[].nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "queryTask timed out") var handle = handleRes.get() for item in handle.iter(): @@ -287,13 +274,13 @@ proc queryTask[DB]( exc discard ctx[].signal.fireSync() - - discard nextSignal.waitSync().get() + if not ctx[].nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "queryTask timed out") # set final result (?!QResult).ok((KeyId.none, DataBuffer())) -method query*(self: ThreadDatastore, +method query*[BT](self: ThreadDatastore[BT], q: Query ): Future[?!QueryIter] {.async.} = ## performs async query @@ -304,6 +291,16 @@ method query*(self: ThreadDatastore, return failure err without nextSignal =? acquireSignal(), err: return failure err + let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal) + + proc iterDispose() {.async.} = + # echo "signal:CLOSE!" + ctx.setCancelled() + await ctx[].nextSignal.fire() + discard ctx[].signal.close() + # echo "nextSignal:CLOSE!" + discard ctx[].nextSignal.close() + self.semaphore.release() try: let query = dbQuery( @@ -311,16 +308,17 @@ method query*(self: ThreadDatastore, value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask - let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) - dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): - self.tp.spawn queryTask(ctx, ds, query, nextSignal) - await nextSignal.fire() + dispatchTaskWrap(self, signal): + self.tp.spawn queryTask(ctx, ds, query) + await ctx[].nextSignal.fire() - var - lock = newAsyncLock() # serialize querying under threads - iter = QueryIter.new() + var lock = newAsyncLock() # serialize querying under threads + var iter = QueryIter.new() + iter.dispose = proc (): Future[?!void] {.async.} = + iterDispose() + success() - proc next(): Future[?!QueryResponse] {.async.} = + iter.next = proc(): Future[?!QueryResponse] {.async.} = let ctx = ctx try: trace "About to query" @@ -330,12 +328,11 @@ method query*(self: ThreadDatastore, return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") await wait(ctx[].signal) - if not ctx[].running: iter.finished = true defer: - await nextSignal.fire() + await ctx[].nextSignal.fire() if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -347,37 +344,24 @@ method query*(self: ThreadDatastore, except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() - discard ctx[].signal.close() - discard nextSignal.close() - self.semaphore.release() + await iterDispose() # todo: is this valid? raise exc - iter.next = next return success iter except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg - discard signal.close() - discard nextSignal.close() - self.semaphore.release() + await iterDispose() raise exc proc new*[DB](self: type ThreadDatastore, db: DB, withLocks = static false, tp: Taskpool - ): ?!ThreadDatastore = + ): ?!ThreadDatastore[DB] = doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" - when DB is SQLiteBackend[KeyId,DataBuffer]: - let backend = ThreadBackend(kind: Sqlite, sql: db) - else: - {.error: "unsupported backend: " & $typeof(db).} - - success ThreadDatastore( + success ThreadDatastore[DB]( tp: tp, - backend: backend, - # TODO: are these needed anymore?? - # withLocks: withLocks, - # queryLock: newAsyncLock(), + backend: db, semaphore: AsyncSemaphore.new(tp.numThreads - 1) ) diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim index da0df6d..3508e9e 100644 --- a/datastore/threads/threadresult.nim +++ b/datastore/threads/threadresult.nim @@ -1,5 +1,6 @@ import std/atomics import std/options +import std/locks import pkg/questionable/results import pkg/results @@ -51,3 +52,24 @@ proc toRes*[T,S](res: ThreadResult[T], result.err res.error().toExc() else: result.ok m(res.get()) + +type + MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool] + +proc init*(sig: var MutexSignal) = + sig.lock.initLock() + sig.cond.initCond() + sig.open = true + +proc wait*(sig: var MutexSignal) = + withLock(sig.lock): + wait(sig.cond, sig.lock) + +proc fire*(sig: var MutexSignal) = + withLock(sig.lock): + signal(sig.cond) + +proc close*(sig: var MutexSignal) = + if sig.open: + sig.lock.deinitLock() + sig.cond.deinitCond() diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 14a41b6..d59992c 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -22,88 +22,65 @@ import pkg/datastore/threads/threadproxyds {.all.} import ./dscommontests import ./querycommontests -const NumThreads = 20 # IO threads aren't attached to CPU count +const + NumThreads = 20 # IO threads aren't attached to CPU count + ThreadTestLoops {.intdefine.} = 1 + N = ThreadTestLoops + ThreadTestInnerLoops {.intdefine.} = 1 + M = ThreadTestInnerLoops -suite "Test Basic ThreadProxyDatastore": - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a").tryGet() - data = "some bytes".toBytes +var + taskPool: Taskpool = Taskpool.new(NumThreads) - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() +for i in 1..N: + suite "Test Basic ThreadDatastore with SQLite " & $i: - teardownAll: - echo "teardown done" + var + sqlStore: SQLiteBackend[KeyId, DataBuffer] + ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] + key = Key.init("/a/b").tryGet() + bytes = "some bytes".toBytes + otherBytes = "some other bytes".toBytes - test "check put": - echo "\n\n=== put ===" - let res1 = await ds.put(key, data) - echo "res1: ", res1.repr - check res1.isOk + setupAll: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + # taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - test "check get": - echo "\n\n=== get ===" - echo "get send key: ", key.repr - let res2 = await ds.get(key) - echo "get key post: ", key.repr - echo "get res2: ", res2.repr - echo res2.get() == data - var val = "" - for c in res2.get(): - val &= char(c) - echo "get res2: ", $val + teardown: + GC_fullCollect() -suite "Test Basic ThreadDatastore with SQLite": + teardownAll: + (await ds.close()).tryGet() + # taskPool.shutdown() - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes + for i in 1..M: + basicStoreTests(ds, key, bytes, otherBytes) + GC_fullCollect() - setupAll: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - teardown: - GC_fullCollect() +for i in 1..N: + suite "Test Query ThreadDatastore with SQLite " & $i: - teardownAll: - (await ds.close()).tryGet() - taskPool.shutdown() + var + sqlStore: SQLiteBackend[KeyId, DataBuffer] + # taskPool: Taskpool + ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]] - basicStoreTests(ds, key, bytes, otherBytes) + setup: + sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() + # taskPool = Taskpool.new(NumThreads) + ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() -suite "Test Query ThreadDatastore with SQLite": + teardown: + GC_fullCollect() - var - sqlStore: SQLiteBackend[KeyId,DataBuffer] - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes + (await ds.close()).tryGet() + # taskPool.shutdown() - setup: - sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet() - taskPool = Taskpool.new(NumThreads) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - - teardown: - GC_fullCollect() - - (await ds.close()).tryGet() - taskPool.shutdown() - - queryTests(ds, true) + for i in 1..M: + queryTests(ds, true) + GC_fullCollect() # suite "Test Basic ThreadDatastore with fsds": # let