integrate setups

This commit is contained in:
Jaremy Creechley 2023-09-25 23:48:17 -07:00
parent 9c7c47e66a
commit 0bd6bd5801
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300

View File

@ -43,7 +43,7 @@ type
of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer]
TaskCtx[D; T: ThreadTypes] = object
TaskCtx[T: ThreadTypes] = object
res: ThreadResult[T]
signal: ThreadSignalPtr
running: bool
@ -78,7 +78,7 @@ template dispatchTask[T](self: ThreadDatastore,
blk: untyped
): auto =
var
ctx {.inject.} = TaskCtx[SqliteDB, T](signal: signal)
ctx {.inject.} = TaskCtx[T](signal: signal)
try:
case self.backend.kind:
of Sqlite:
@ -98,7 +98,7 @@ template dispatchTask[T](self: ThreadDatastore,
discard ctx.signal.close()
self.semaphore.release()
template executeTask(ctx: ptr TaskCtx, blk: untyped) =
template executeTask[T](ctx: ptr TaskCtx[T], blk: untyped) =
try:
withLock(ctxLock):
if ctx.cancelled:
@ -107,18 +107,23 @@ template executeTask(ctx: ptr TaskCtx, blk: untyped) =
## run backend command
let res = `blk`
if res.isOk():
when T is void:
ctx.res.ok()
else:
ctx.res.ok(res.get())
else:
ctx.res.err res.error().toThreadErr()
withLock(ctxLock):
ctx.running = false
ctx.res = res.mapErr() do(e: ref CatchableError) -> ThreadResErr:
e.toThreadErr()
except CatchableError as exc:
trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg
raiseAssert exc.msg
finally:
discard ctx[].signal.fireSync()
proc hasTask[DB](ctx: ptr TaskCtx, ds: DB, key: KeyId) {.gcsafe.} =
proc hasTask[T, DB](ctx: ptr TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
## run backend command
executeTask(ctx):
has(ds, key)
@ -133,8 +138,8 @@ method has*(self: ThreadDatastore,
dispatchTask[bool](self, signal):
self.tp.spawn hasTask(addr ctx, ds, key)
proc deleteTask[DB](ctx: ptr TaskCtx, ds: DB;
key: KeyId) {.gcsafe.} =
proc deleteTask[T, DB](ctx: ptr TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe.} =
## run backend command
executeTask(ctx):
delete(ds, key)
@ -158,128 +163,51 @@ method delete*(self: ThreadDatastore,
return success()
# proc asyncPutTask(
# ctx: ptr TaskCtx[void],
# key: ptr Key,
# data: ptr UncheckedArray[byte],
# len: int) {.async.} =
proc putTask[DB](ctx: ptr TaskCtx, ds: DB;
key: KeyId,
data: DataBuffer) {.gcsafe, nimcall.} =
## run backend command
executeTask(ctx):
put(ds, key, data)
# if ctx.isNil:
# trace "ctx is nil"
# return
method put*(self: ThreadDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
# let
# key = key[]
# data = @(data.toOpenArray(0, len - 1))
# fut = ctx[].ds.put(key, data)
let key = KeyId.new key.id()
let data = DataBuffer.new data
dispatchTask[void](self, signal):
self.tp.spawn putTask(addr ctx, ds, key, data)
method put*(
self: ThreadDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
# asyncSpawn signalMonitor(ctx, fut)
# without res =? (await fut).catch, error:
# trace "Error in asyncPutTask", error = error.msg
# ctx[].res[].err(error)
# return
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
# ctx[].res[].ok()
return success()
# proc putTask(
# ctx: ptr TaskCtx,
# key: ptr Key,
# data: ptr UncheckedArray[byte],
# len: int) =
# ## run put in a thread task
# ##
proc getTask[DB](ctx: ptr TaskCtx, ds: DB;
key: KeyId) {.gcsafe, nimcall.} =
## run backend command
executeTask(ctx):
get(ds, key)
# defer:
# if not ctx.isNil:
# discard ctx[].signal.fireSync()
method get*(self: ThreadDatastore,
key: Key,
): Future[?!seq[byte]] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
# try:
# waitFor asyncPutTask(ctx, key, data, len)
# except CatchableError as exc:
# trace "Unexpected exception thrown in asyncPutTask", exc = exc.msg
# raiseAssert exc.msg
# method put*(
# self: ThreadDatastore,
# key: Key,
# data: seq[byte]): Future[?!void] {.async.} =
# var
# key = key
# data = data
# res = ThreadResult[void]()
# ctx = TaskCtx[void](
# ds: self.ds,
# res: addr res)
# proc runTask() =
# self.tp.spawn putTask(
# addr ctx,
# addr key,
# makeUncheckedArray(addr data[0]),
# data.len)
# return self.dispatchTask(ctx, key.some, runTask)
# method put*(
# self: ThreadDatastore,
# batch: seq[BatchEntry]): Future[?!void] {.async.} =
# for entry in batch:
# if err =? (await self.put(entry.key, entry.data)).errorOption:
# return failure err
# return success()
# proc asyncGetTask(
# ctx: ptr TaskCtx[DataBuffer],
# key: ptr Key) {.async.} =
# if ctx.isNil:
# trace "ctx is nil"
# return
# let
# key = key[]
# fut = ctx[].ds.get(key)
# asyncSpawn signalMonitor(ctx, fut)
# without res =? (await fut).catch and data =? res, error:
# trace "Error in asyncGetTask", error = error.msg
# ctx[].res[].err(error)
# return
# trace "Got data in get"
# ctx[].res[].ok(DataBuffer.new(data))
# proc getTask(
# ctx: ptr TaskCtx,
# key: ptr Key) =
# ## Run get in a thread task
# ##
# defer:
# if not ctx.isNil:
# discard ctx[].signal.fireSync()
# try:
# waitFor asyncGetTask(ctx, key)
# except CatchableError as exc:
# trace "Unexpected exception thrown in asyncGetTask", exc = exc.msg
# raiseAssert exc.msg
# method get*(
# self: ThreadDatastore,
# key: Key): Future[?!seq[byte]] {.async.} =
# var
# key = key
# res = ThreadResult[DataBuffer]()
# ctx = TaskCtx[DataBuffer](
# ds: self.ds,
# res: addr res)
# proc runTask() =
# self.tp.spawn getTask(addr ctx, addr key)
# return self.dispatchTask(ctx, key.some, runTask)
let key = KeyId.new key.id()
dispatchTask[void](self, signal):
self.tp.spawn getTask(addr ctx, ds, key)
# method close*(self: ThreadDatastore): Future[?!void] {.async.} =
# for fut in self.tasks.values.toSeq: