rework tuple types

This commit is contained in:
Jaremy Creechley 2023-09-27 11:51:55 -07:00
parent d1f503fa96
commit 5e424262c3
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -36,16 +36,6 @@ logScope:
type type
ThreadBackendKinds* = enum
Sqlite
# Filesystem
ThreadBackend* = object
## backend case type to avoid needing to make ThreadDatastore generic
case kind*: ThreadBackendKinds
of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer]
TaskCtxObj*[T: ThreadTypes] = object TaskCtxObj*[T: ThreadTypes] = object
res: ThreadResult[T] res: ThreadResult[T]
signal: ThreadSignalPtr signal: ThreadSignalPtr
@ -56,15 +46,18 @@ type
## Task context object. ## Task context object.
## This is a SharedPtr to make the query iter simpler ## This is a SharedPtr to make the query iter simpler
ThreadDatastore* = ref object of Datastore ThreadDatastore*[BT] = ref object of Datastore
tp: Taskpool tp: Taskpool
backend: ThreadBackend backend: BT
semaphore: AsyncSemaphore # semaphore is used for backpressure \ semaphore: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors # to avoid exhausting file descriptors
var ctxLock: Lock var ctxLock: Lock
ctxLock.initLock() ctxLock.initLock()
proc newTaskCtx*[T](signal: ThreadSignalPtr): TaskCtx[T] =
newSharedPtr(TaskCtxObj[T](signal: signal))
proc setCancelled[T](ctx: TaskCtx[T]) = proc setCancelled[T](ctx: TaskCtx[T]) =
# withLock(ctxLock): # withLock(ctxLock):
ctx[].cancelled = true ctx[].cancelled = true
@ -113,28 +106,26 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
ctx.setDone() ctx.setDone()
discard ctx[].signal.fireSync() discard ctx[].signal.fireSync()
template dispatchTaskWrap[T](self: ThreadDatastore, template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr, signal: ThreadSignalPtr,
blk: untyped blk: untyped
): auto = ): auto =
case self.backend.kind: var ds {.used, inject.} = self.backend
of Sqlite: proc runTask() =
var ds {.used, inject.} = self.backend.sql `blk`
proc runTask() = runTask()
`blk` await wait(ctx[].signal)
runTask()
await wait(ctx[].signal)
template dispatchTask[T](self: ThreadDatastore, template dispatchTask[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr, signal: ThreadSignalPtr,
blk: untyped blk: untyped
): auto = ): auto =
## handles dispatching a task from an async context ## handles dispatching a task from an async context
## `blk` is the actions, it has `ctx` and `ds` variables in scope. ## `blk` is the actions, it has `ctx` and `ds` variables in scope.
## note that `ds` is a generic ## note that `ds` is a generic
let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal)) # let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
try: try:
dispatchTaskWrap[T](self, signal, blk) dispatchTaskWrap[BT](self, signal, blk)
except CancelledError as exc: except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled() ctx.setCancelled()
@ -149,38 +140,39 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
executeTask(ctx): executeTask(ctx):
has(ds, key) has(ds, key)
method has*(self: ThreadDatastore, method has*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!bool] {.async.} = key: Key): Future[?!bool] {.async.} =
await self.semaphore.acquire() await self.semaphore.acquire()
without signal =? acquireSignal(), err: without signal =? acquireSignal(), err:
return failure err return failure err
let key = KeyId.new key.id() let ctx = newTaskCtx(bool, signal: signal)
dispatchTask[bool](self, signal): dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn hasTask(ctx, ds, key) self.tp.spawn hasTask(ctx, ds, key)
return ctx[].res.toRes(v => v) return ctx[].res.toRes(v => v)
proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB; proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe.} = key: KeyId) {.gcsafe.} =
## run backend command ## run backend command
executeTask(ctx): executeTask(ctx):
delete(ds, key) delete(ds, key)
method delete*(self: ThreadDatastore, method delete*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!void] {.async.} = key: Key): Future[?!void] {.async.} =
## delete key ## delete key
await self.semaphore.acquire() await self.semaphore.acquire()
without signal =? acquireSignal(), err: without signal =? acquireSignal(), err:
return failure err return failure err
let key = KeyId.new key.id() let ctx = newTaskCtx[void](signal: signal)
dispatchTask[void](self, signal): dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn deleteTask(ctx, ds, key) self.tp.spawn deleteTask(ctx, ds, key)
return ctx[].res.toRes() return ctx[].res.toRes()
method delete*(self: ThreadDatastore, method delete*[BT](self: ThreadDatastore[BT],
keys: seq[Key]): Future[?!void] {.async.} = keys: seq[Key]): Future[?!void] {.async.} =
## delete batch ## delete batch
for key in keys: for key in keys:
@ -196,7 +188,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx): executeTask(ctx):
put(ds, key, data) put(ds, key, data)
method put*(self: ThreadDatastore, method put*[BT](self: ThreadDatastore[BT],
key: Key, key: Key,
data: seq[byte]): Future[?!void] {.async.} = data: seq[byte]): Future[?!void] {.async.} =
## put key with data ## put key with data
@ -204,7 +196,8 @@ method put*(self: ThreadDatastore,
without signal =? acquireSignal(), err: without signal =? acquireSignal(), err:
return failure err return failure err
dispatchTask[void](self, signal): let ctx = newTaskCtx[void](signal: signal)
dispatchTask(self, signal):
let key = KeyId.new key.id() let key = KeyId.new key.id()
let data = DataBuffer.new data let data = DataBuffer.new data
self.tp.spawn putTask(ctx, ds, key, data) self.tp.spawn putTask(ctx, ds, key, data)
@ -229,24 +222,22 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
let res = get(ds, key) let res = get(ds, key)
res res
method get*(self: ThreadDatastore, method get*[BT](self: ThreadDatastore[BT],
key: Key, key: Key,
): Future[?!seq[byte]] {.async.} = ): Future[?!seq[byte]] {.async.} =
await self.semaphore.acquire() await self.semaphore.acquire()
without signal =? acquireSignal(), err: without signal =? acquireSignal(), err:
return failure err return failure err
let key = KeyId.new key.id() let ctx = newTaskCtx[void](signal: signal)
dispatchTask[DataBuffer](self, signal): dispatchTask[DataBuffer](self, signal):
self.tp.spawn getTask(ctx, ds, key) self.tp.spawn getTask(ctx, ds, key)
return ctx[].res.toRes(v => v.toSeq()) return ctx[].res.toRes(v => v.toSeq())
method close*(self: ThreadDatastore): Future[?!void] {.async.} = method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
await self.semaphore.closeAll() await self.semaphore.closeAll()
case self.backend.kind: self.backend.close()
of Sqlite:
self.backend.sql.close()
type type
QResult = DbQueryResponse[KeyId, DataBuffer] QResult = DbQueryResponse[KeyId, DataBuffer]
@ -293,7 +284,7 @@ proc queryTask[DB](
# set final result # set final result
(?!QResult).ok((KeyId.none, DataBuffer())) (?!QResult).ok((KeyId.none, DataBuffer()))
method query*(self: ThreadDatastore, method query*[BT](self: ThreadDatastore[BT],
q: Query q: Query
): Future[?!QueryIter] {.async.} = ): Future[?!QueryIter] {.async.} =
## performs async query ## performs async query
@ -311,8 +302,8 @@ method query*(self: ThreadDatastore,
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort) value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
# setup initial queryTask # setup initial queryTask
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal)) let ctx = newTaskCtx[QResult](signal: signal)
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal): dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer], BT](self, signal):
self.tp.spawn queryTask(ctx, ds, query, nextSignal) self.tp.spawn queryTask(ctx, ds, query, nextSignal)
await nextSignal.fire() await nextSignal.fire()
@ -368,14 +359,9 @@ proc new*[DB](self: type ThreadDatastore,
): ?!ThreadDatastore = ): ?!ThreadDatastore =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
when DB is SQLiteBackend[KeyId,DataBuffer]:
let backend = ThreadBackend(kind: Sqlite, sql: db)
else:
{.error: "unsupported backend: " & $typeof(db).}
success ThreadDatastore( success ThreadDatastore(
tp: tp, tp: tp,
backend: backend, backend: db,
# TODO: are these needed anymore?? # TODO: are these needed anymore??
# withLocks: withLocks, # withLocks: withLocks,
# queryLock: newAsyncLock(), # queryLock: newAsyncLock(),