diff --git a/datastore/sql.nim b/datastore/sql.nim index 28b1f3d..5b68b53 100644 --- a/datastore/sql.nim +++ b/datastore/sql.nim @@ -13,7 +13,7 @@ import ../datastore import ./backend import ./sql/sqliteds -export datastore, sqliteds +export datastore push: {.upraises: [].} @@ -42,8 +42,7 @@ method delete*(self: SQLiteDatastore, method get*(self: SQLiteDatastore, key: Key): Future[?!seq[byte]] {.async.} = - self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]: - d.toSeq() + self.db.get(KeyId.new key.id()) method put*(self: SQLiteDatastore, key: Key, @@ -80,7 +79,7 @@ method queryIter*( yield QueryResponse.failure err let k = qres.key.map() do(k: KeyId) -> Key: Key.init($k).expect("valid key") - let v: seq[byte] = qres.data.toSeq() + let v: seq[byte] = qres.data yield success (k, v) success iter @@ -88,14 +87,20 @@ method queryIter*( proc new*( T: type SQLiteDatastore, path: string, - readOnly = false): ?!SQLiteDatastore = + readOnly = false): ?!T = - success SQLiteDatastore( - db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)) + let + flags = + if readOnly: SQLITE_OPEN_READONLY + else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE + + success T( + db: ? SQLiteDsDb.open(path, flags), + readOnly: readOnly) proc new*( T: type SQLiteDatastore, - db: SQLiteBackend[KeyId, DataBuffer]): ?!T = + db: SQLiteDsDb): ?!T = success T( db: db, diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 795274e..ae14782 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -50,7 +50,6 @@ type signal: ThreadSignalPtr running: bool cancelled: bool - nextSignal: (Lock, Cond) TaskCtx*[T] = SharedPtr[TaskCtxObj[T]] @@ -252,6 +251,7 @@ proc queryTask[DB]( ctx: TaskCtx[QResult], ds: DB, query: DbQuery[KeyId], + nextSignal: ThreadSignalPtr ) {.gcsafe, nimcall.} = ## run query command executeTask(ctx): @@ -265,11 +265,8 @@ proc queryTask[DB]( # otherwise manually an set empty ok result ctx[].res.ok (KeyId.none, DataBuffer(), ) discard ctx[].signal.fireSync() - - withLock(ctx[].nextSignal[0]): - wait(ctx[].nextSignal[1], ctx[].nextSignal[0]) - # if not nextSignal.waitSync(10.seconds).get(): - # raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") + if not nextSignal.waitSync(10.seconds).get(): + raise newException(DeadThreadDefect, "query task timeout; possible deadlock!") var handle = handleRes.get() for item in handle.iter(): @@ -285,9 +282,7 @@ proc queryTask[DB]( discard ctx[].signal.fireSync() - # discard nextSignal.waitSync().get() - withLock(ctx[].nextSignal[0]): - wait(ctx[].nextSignal[1], ctx[].nextSignal[0]) + discard nextSignal.waitSync().get() # set final result (?!QResult).ok((KeyId.none, DataBuffer())) @@ -301,11 +296,8 @@ method query*(self: ThreadDatastore, await self.semaphore.acquire() without signal =? acquireSignal(), err: return failure err - # without nextSignal =? acquireSignal(), err: - # return failure err - let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) - ctx[].nextSignal[0].initLock() - ctx[].nextSignal[1].initCond() + without nextSignal =? acquireSignal(), err: + return failure err try: let query = dbQuery( @@ -313,10 +305,10 @@ method query*(self: ThreadDatastore, value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) # setup initial queryTask + let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): - self.tp.spawn queryTask(ctx, ds, query) - withLock(ctx[].nextSignal[0]): - signal(ctx[].nextSignal[1]) + self.tp.spawn queryTask(ctx, ds, query, nextSignal) + await nextSignal.fire() var lock = newAsyncLock() # serialize querying under threads @@ -337,8 +329,7 @@ method query*(self: ThreadDatastore, iter.finished = true defer: - withLock(ctx[].nextSignal[0]): - signal(ctx[].nextSignal[1]) + await nextSignal.fire() if ctx[].res.isErr(): return err(ctx[].res.error()) @@ -351,7 +342,7 @@ method query*(self: ThreadDatastore, trace "Cancelling thread future!", exc = exc.msg ctx.setCancelled() discard ctx[].signal.close() - # discard nextSignal.close() + discard nextSignal.close() self.semaphore.release() raise exc @@ -360,7 +351,7 @@ method query*(self: ThreadDatastore, except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg discard signal.close() - # discard nextSignal.close() + discard nextSignal.close() self.semaphore.release() raise exc diff --git a/tests/datastore/sql/testsqlite.nim b/tests/datastore/sql/testsqlite.nim index 3e15c43..c629eb0 100644 --- a/tests/datastore/sql/testsqlite.nim +++ b/tests/datastore/sql/testsqlite.nim @@ -8,7 +8,7 @@ import pkg/chronos import pkg/stew/results import pkg/stew/byteutils -import pkg/datastore/sql +import pkg/datastore/sql/sqliteds import ../dscommontests import ../querycommontests diff --git a/tests/datastore/testasyncsemaphore.nim b/tests/datastore/testasyncsemaphore.nim index aefff19..3c8bb84 100644 --- a/tests/datastore/testasyncsemaphore.nim +++ b/tests/datastore/testasyncsemaphore.nim @@ -98,6 +98,7 @@ suite "AsyncSemaphore": resource.inc() check resource > 0 and resource <= 3 let sleep = rand(0..10).millis + # echo sleep await sleepAsync(sleep) finally: resource.dec()