mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-02 13:43:11 +00:00
simplifying
This commit is contained in:
parent
575d973118
commit
77e53d2bf6
@ -26,7 +26,7 @@ proc readOnly*[K,V](self: SQLiteBackend[K,V]): bool = self.db.readOnly
|
||||
proc timestamp*(t = epochTime()): int64 =
|
||||
(t * 1_000_000).int64
|
||||
|
||||
proc has*[K,V](self: SQLiteBackend[K,V], key: DbKey): ?!bool =
|
||||
proc has*[K,V](self: SQLiteBackend[K,V], key: K): ?!bool =
|
||||
var
|
||||
exists = false
|
||||
key = key
|
||||
@ -42,7 +42,7 @@ proc has*[K,V](self: SQLiteBackend[K,V], key: DbKey): ?!bool =
|
||||
proc delete*[K,V](self: SQLiteBackend[K,V], key: K): ?!void =
|
||||
return self.db.deleteStmt.exec((key))
|
||||
|
||||
proc delete*[K,V](self: SQLiteBackend[K,V], keys: openArray[DbKey]): ?!void =
|
||||
proc delete*[K,V](self: SQLiteBackend[K,V], keys: openArray[K]): ?!void =
|
||||
if err =? self.db.beginStmt.exec().errorOption:
|
||||
return failure(err)
|
||||
|
||||
@ -74,7 +74,7 @@ proc get*[K,V](self: SQLiteBackend[K,V], key: K): ?!seq[byte] =
|
||||
|
||||
if bytes.len <= 0:
|
||||
return failure(
|
||||
newException(DatastoreKeyNotFound, "DbKey doesn't exist"))
|
||||
newException(DatastoreKeyNotFound, "key doesn't exist"))
|
||||
|
||||
return success bytes
|
||||
|
||||
@ -212,7 +212,7 @@ iterator iter*[K, V](handle: var DbQueryHandle[K, V, RawStmtPtr]): ?!DbQueryResp
|
||||
handle.close()
|
||||
|
||||
|
||||
proc contains*[K,V](self: SQLiteBackend[K,V], key: DbKey): bool =
|
||||
proc contains*[K,V](self: SQLiteBackend[K,V], key: K): bool =
|
||||
return self.has(key).get()
|
||||
|
||||
|
||||
|
||||
@ -50,7 +50,7 @@ type
|
||||
|
||||
|
||||
TaskCtx[D; T: ThreadTypes] = object
|
||||
ds: D
|
||||
key: KeyId
|
||||
res: ThreadResult[T]
|
||||
signal: ThreadSignalPtr
|
||||
running: bool
|
||||
@ -74,14 +74,20 @@ proc setCancelled(ctx: var TaskCtx): bool =
|
||||
ctx.cancelled = true
|
||||
return true
|
||||
|
||||
proc dispatchTask[D, T](self: ThreadDatastore,
|
||||
ctx: var TaskCtx[D, T],
|
||||
key: ?KeyId = KeyId.none,
|
||||
runTask: proc(): void
|
||||
): Future[?!T] {.async.} =
|
||||
template dispatchTask(self: ThreadDatastore,
|
||||
signal: ThreadSignalPtr,
|
||||
fn: untyped): auto =
|
||||
var
|
||||
ctx = TaskCtx[SqliteDB, bool](signal: signal)
|
||||
try:
|
||||
runTask()
|
||||
await wait(ctx.signal)
|
||||
case self.backend.kind:
|
||||
of Sqlite:
|
||||
var ds = self.backend.sql
|
||||
proc runTask() =
|
||||
`fn`(addr ctx, ds)
|
||||
runTask()
|
||||
|
||||
await wait(ctx.signal)
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
while not ctx.setCancelled():
|
||||
@ -98,13 +104,13 @@ proc acquireSignal(): ?!ThreadSignalPtr =
|
||||
else:
|
||||
success signal.get()
|
||||
|
||||
proc hasTask(ctx: ptr TaskCtx, key: KeyId) =
|
||||
proc hasTask[D](ctx: ptr TaskCtx, ds: D) =
|
||||
defer:
|
||||
if not ctx.isNil:
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
try:
|
||||
let res = has(ctx.ds, key)
|
||||
let res = has(ds, key)
|
||||
ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr:
|
||||
e.toThreadErr()
|
||||
except CatchableError as exc:
|
||||
@ -118,16 +124,7 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
without signal =? acquireSignal(), err:
|
||||
return bool.failure err
|
||||
|
||||
case self.backend.kind:
|
||||
of Sqlite:
|
||||
var
|
||||
ds = self.backend.sql
|
||||
ctx = TaskCtx[SqliteDB, bool](ds: self.backend.sql, signal: signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn hasTask(addr ctx, key)
|
||||
|
||||
return await self.dispatchTask(ctx, key.some, runTask)
|
||||
dispatchTask(self, signal, hasTask)
|
||||
|
||||
# proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
|
||||
# if ctx.isNil:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user