This commit is contained in:
Jaremy Creechley 2023-09-25 21:27:35 -07:00
parent 46a29128de
commit 1a6065b89d
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 17 additions and 31 deletions

View File

@ -9,7 +9,6 @@ import ./types
export databuffer, types, SortOrder
type
DbQueryResponse*[K, V] = tuple[key: Option[K], data: V]
DbQuery*[K] = object
@ -41,13 +40,7 @@ proc dbQuery*[K](
offset = 0,
limit = -1
): DbQuery[K] =
DbQuery[K](
key: key,
value: value,
sort: sort,
offset: offset,
limit: limit)
DbQuery[K](key: key, value: value, sort: sort, offset: offset, limit: limit)
proc `$`*(id: KeyId): string = $(id.data)

View File

@ -24,6 +24,7 @@ import ../key
import ../query
import ../datastore
import ../backend
import ../sql/sqliteds
import ./asyncsemaphore
import ./databuffer
@ -35,8 +36,18 @@ logScope:
topics = "datastore threadproxyds"
type
TaskCtx[T: ThreadTypes] = object
ds: Datastore
ThreadBackendKinds* = enum
Sqlite
# Filesystem
ThreadBackend* = object
case kind*: ThreadBackendKinds
of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer]
TaskCtx[D; T: ThreadTypes] = object
ds: D
res: ptr ThreadResult[T]
cancelled: bool
semaphore: AsyncSemaphore
@ -77,9 +88,9 @@ template withLocks(
self.queryLock.release()
# TODO: needs rework, we can't use `result` with async
template dispatchTask[T](
template dispatchTask[D, T](
self: ThreadDatastore,
ctx: TaskCtx[T],
ctx: TaskCtx[D, T],
key: ?Key = Key.none,
runTask: proc): auto =
try:
@ -135,31 +146,13 @@ proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
ctx[].res[].err(exc)
discard ctx[].signal.fireSync()
proc asyncHasTask(
ctx: ptr TaskCtx[bool],
key: ptr Key) {.async.} =
if ctx.isNil:
trace "ctx is nil"
return
let
key = key[]
fut = ctx[].ds.has(key)
asyncSpawn signalMonitor(ctx, fut)
without ret =? (await fut).catch and res =? ret, error:
ctx[].res[].err(error)
return
ctx[].res[].ok(res)
proc hasTask(ctx: ptr TaskCtx, key: ptr Key) =
defer:
if not ctx.isNil:
discard ctx[].signal.fireSync()
try:
waitFor asyncHasTask(ctx, key)
has(ctx.ds, key)
except CatchableError as exc:
trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg
raiseAssert exc.msg