diff --git a/datastore/sql/sqliteds.nim b/datastore/sql/sqliteds.nim index 1ce8d55..8b07c39 100644 --- a/datastore/sql/sqliteds.nim +++ b/datastore/sql/sqliteds.nim @@ -26,7 +26,7 @@ proc readOnly*[K,V](self: SQLiteBackend[K,V]): bool = self.db.readOnly proc timestamp*(t = epochTime()): int64 = (t * 1_000_000).int64 -proc has*[K,V](self: SQLiteBackend[K,V], key: DbKey): ?!bool = +proc has*[K,V](self: SQLiteBackend[K,V], key: K): ?!bool = var exists = false key = key @@ -42,7 +42,7 @@ proc has*[K,V](self: SQLiteBackend[K,V], key: DbKey): ?!bool = proc delete*[K,V](self: SQLiteBackend[K,V], key: K): ?!void = return self.db.deleteStmt.exec((key)) -proc delete*[K,V](self: SQLiteBackend[K,V], keys: openArray[DbKey]): ?!void = +proc delete*[K,V](self: SQLiteBackend[K,V], keys: openArray[K]): ?!void = if err =? self.db.beginStmt.exec().errorOption: return failure(err) @@ -74,7 +74,7 @@ proc get*[K,V](self: SQLiteBackend[K,V], key: K): ?!seq[byte] = if bytes.len <= 0: return failure( - newException(DatastoreKeyNotFound, "DbKey doesn't exist")) + newException(DatastoreKeyNotFound, "key doesn't exist")) return success bytes @@ -212,7 +212,7 @@ iterator iter*[K, V](handle: var DbQueryHandle[K, V, RawStmtPtr]): ?!DbQueryResp handle.close() -proc contains*[K,V](self: SQLiteBackend[K,V], key: DbKey): bool = +proc contains*[K,V](self: SQLiteBackend[K,V], key: K): bool = return self.has(key).get() diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 715a051..eb635de 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -50,7 +50,7 @@ type TaskCtx[D; T: ThreadTypes] = object - ds: D + key: KeyId res: ThreadResult[T] signal: ThreadSignalPtr running: bool @@ -74,14 +74,20 @@ proc setCancelled(ctx: var TaskCtx): bool = ctx.cancelled = true return true -proc dispatchTask[D, T](self: ThreadDatastore, - ctx: var TaskCtx[D, T], - key: ?KeyId = KeyId.none, - runTask: proc(): void - ): Future[?!T] {.async.} = +template dispatchTask(self: ThreadDatastore, + signal: ThreadSignalPtr, + fn: untyped): auto = + var + ctx = TaskCtx[SqliteDB, bool](signal: signal) try: - runTask() - await wait(ctx.signal) + case self.backend.kind: + of Sqlite: + var ds = self.backend.sql + proc runTask() = + `fn`(addr ctx, ds) + runTask() + + await wait(ctx.signal) except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg while not ctx.setCancelled(): @@ -98,13 +104,13 @@ proc acquireSignal(): ?!ThreadSignalPtr = else: success signal.get() -proc hasTask(ctx: ptr TaskCtx, key: KeyId) = +proc hasTask[D](ctx: ptr TaskCtx, ds: D) = defer: if not ctx.isNil: discard ctx[].signal.fireSync() try: - let res = has(ctx.ds, key) + let res = has(ds, key) ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr: e.toThreadErr() except CatchableError as exc: @@ -118,16 +124,7 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = without signal =? acquireSignal(), err: return bool.failure err - case self.backend.kind: - of Sqlite: - var - ds = self.backend.sql - ctx = TaskCtx[SqliteDB, bool](ds: self.backend.sql, signal: signal) - - proc runTask() = - self.tp.spawn hasTask(addr ctx, key) - - return await self.dispatchTask(ctx, key.some, runTask) + dispatchTask(self, signal, hasTask) # proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = # if ctx.isNil: