nim-datastore/datastore/threads/threadproxyds.nim

306 lines
6.4 KiB
Nim
Raw Permalink Normal View History

2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
when not compileOption("threads"):
{.error: "This module requires --threads:on compilation flag".}
2023-09-08 15:09:15 -06:00
import pkg/upraises
push: {.upraises: [].}
import std/atomics
2023-09-12 13:51:01 -06:00
import std/strutils
2023-09-08 15:09:15 -06:00
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/stew/ptrops
import pkg/taskpools
2023-09-12 13:51:01 -06:00
import pkg/stew/byteutils
2023-09-08 15:09:15 -06:00
import ../key
import ../query
import ../datastore
2023-09-12 13:51:01 -06:00
import ./databuffer
2023-09-08 15:09:15 -06:00
type
2023-09-13 14:41:16 -06:00
ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple
2023-09-12 13:51:01 -06:00
ThreadResult[T: ThreadTypes] = Result[T, DataBuffer]
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
TaskCtx[T: ThreadTypes] = object
2023-09-08 15:09:15 -06:00
ds: ptr Datastore
2023-09-12 13:51:01 -06:00
res: ptr ThreadResult[T]
2023-09-08 15:09:15 -06:00
signal: ThreadSignalPtr
ThreadDatastore* = ref object of Datastore
2023-09-12 13:51:01 -06:00
tp: Taskpool
ds: Datastore
tasks: seq[Future[void]]
2023-09-08 15:09:15 -06:00
2023-09-14 17:56:02 -06:00
template dispatchTask(
self: ThreadDatastore,
ctx: TaskCtx,
runTask: proc): untyped =
2023-09-12 13:51:01 -06:00
let
fut = wait(ctx.signal)
2023-09-11 14:48:53 -06:00
2023-09-12 13:51:01 -06:00
try:
runTask()
self.tasks.add(fut)
await fut
if ctx.res[].isErr:
result = failure(ctx.res[].error())
finally:
discard ctx.signal.close()
if (
let idx = self.tasks.find(fut);
idx != -1):
self.tasks.del(idx)
2023-09-08 15:09:15 -06:00
proc hasTask(
ctx: ptr TaskCtx,
2023-09-12 13:51:01 -06:00
key: ptr Key) =
defer:
discard ctx[].signal.fireSync()
2023-09-08 15:09:15 -06:00
2023-09-13 14:41:16 -06:00
without ret =?
(waitFor ctx[].ds[].has(key[])).catch and res =? ret, error:
2023-09-12 13:51:01 -06:00
ctx[].res[].err(error)
2023-09-11 14:48:53 -06:00
return
2023-09-08 15:09:15 -06:00
2023-09-13 14:41:16 -06:00
ctx[].res[].ok(res)
2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
2023-09-08 15:09:15 -06:00
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
2023-09-12 13:51:01 -06:00
res = ThreadResult[bool]()
ctx = TaskCtx[bool](
2023-09-08 15:09:15 -06:00
ds: addr self.ds,
2023-09-11 14:48:53 -06:00
res: addr res,
2023-09-08 15:09:15 -06:00
signal: signal)
proc runTask() =
2023-09-12 13:51:01 -06:00
self.tp.spawn hasTask(addr ctx, unsafeAddr key)
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
self.dispatchTask(ctx, runTask)
return success(res.get())
2023-09-08 15:09:15 -06:00
proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
2023-09-12 13:51:01 -06:00
defer:
discard ctx[].signal.fireSync()
2023-09-11 14:48:53 -06:00
without res =? (waitFor ctx[].ds[].delete(key[])).catch, error:
2023-09-12 13:51:01 -06:00
ctx[].res[].err(error)
2023-09-11 14:48:53 -06:00
return
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
ctx[].res[].ok()
2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
method delete*(
2023-09-08 15:09:15 -06:00
self: ThreadDatastore,
key: Key): Future[?!void] {.async.} =
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
2023-09-12 13:51:01 -06:00
res = ThreadResult[void]()
ctx = TaskCtx[void](
2023-09-08 15:09:15 -06:00
ds: addr self.ds,
2023-09-11 14:48:53 -06:00
res: addr res,
2023-09-08 15:09:15 -06:00
signal: signal)
proc runTask() =
2023-09-12 13:51:01 -06:00
self.tp.spawn delTask(addr ctx, unsafeAddr key)
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
self.dispatchTask(ctx, runTask)
return success()
2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure err
return success()
2023-09-08 15:09:15 -06:00
proc putTask(
ctx: ptr TaskCtx,
key: ptr Key,
2023-09-12 13:51:01 -06:00
data: DataBuffer,
2023-09-08 15:09:15 -06:00
len: int) =
## run put in a thread task
##
2023-09-12 13:51:01 -06:00
defer:
discard ctx[].signal.fireSync()
without res =? (waitFor ctx[].ds[].put(key[], @data)).catch, error:
ctx[].res[].err(error)
2023-09-11 14:48:53 -06:00
return
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
ctx[].res[].ok()
2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
method put*(
2023-09-08 15:09:15 -06:00
self: ThreadDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
2023-09-12 13:51:01 -06:00
res = ThreadResult[void]()
ctx = TaskCtx[void](
2023-09-08 15:09:15 -06:00
ds: addr self.ds,
2023-09-11 14:48:53 -06:00
res: addr res,
2023-09-08 15:09:15 -06:00
signal: signal)
proc runTask() =
self.tp.spawn putTask(
addr ctx,
2023-09-12 13:51:01 -06:00
unsafeAddr key,
DataBuffer.new(data),
2023-09-08 15:09:15 -06:00
data.len)
2023-09-12 13:51:01 -06:00
self.dispatchTask(ctx, runTask)
2023-09-11 14:48:53 -06:00
return success()
method put*(
self: ThreadDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
2023-09-08 15:09:15 -06:00
return success()
proc getTask(
ctx: ptr TaskCtx,
2023-09-12 13:51:01 -06:00
key: ptr Key) =
2023-09-08 15:09:15 -06:00
## Run get in a thread task
##
2023-09-12 13:51:01 -06:00
defer:
discard ctx[].signal.fireSync()
2023-09-13 14:41:16 -06:00
without res =?
(waitFor ctx[].ds[].get(key[])).catch and data =? res, error:
2023-09-12 13:51:01 -06:00
ctx[].res[].err(error)
2023-09-08 15:09:15 -06:00
return
2023-09-12 13:51:01 -06:00
ctx[].res[].ok(DataBuffer.new(data))
2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
method get*(
2023-09-08 15:09:15 -06:00
self: ThreadDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
2023-09-12 13:51:01 -06:00
var
res = ThreadResult[DataBuffer]()
ctx = TaskCtx[DataBuffer](
2023-09-08 15:09:15 -06:00
ds: addr self.ds,
2023-09-11 14:48:53 -06:00
res: addr res,
2023-09-08 15:09:15 -06:00
signal: signal)
proc runTask() =
2023-09-12 13:51:01 -06:00
self.tp.spawn getTask(addr ctx, unsafeAddr key)
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
self.dispatchTask(ctx, runTask)
return success(@(res.get()))
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
for task in self.tasks:
await task.cancelAndWait()
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
await self.ds.close()
2023-09-08 15:09:15 -06:00
2023-09-14 17:56:02 -06:00
proc queryTask(
2023-09-13 14:41:16 -06:00
ctx: ptr TaskCtx,
iter: ptr QueryIter) =
defer:
discard ctx[].signal.fireSync()
without ret =? (waitFor iter[].next()).catch and res =? ret, error:
ctx[].res[].err(error)
return
if res.key.isNone:
ctx[].res[].ok((false, DataBuffer.new(), DataBuffer.new()))
return
var
keyBuf = DataBuffer.new($(res.key.get()))
dataBuf = DataBuffer.new(res.data)
ctx[].res[].ok((true, keyBuf, dataBuf))
method query*(
self: ThreadDatastore,
query: Query): Future[?!QueryIter] {.async.} =
without var childIter =? await self.ds.query(query), error:
return failure error
var
iter = QueryIter.init()
let lock = newAsyncLock()
proc next(): Future[?!QueryResponse] {.async.} =
defer:
if lock.locked:
lock.release()
if iter.finished == true:
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
await lock.acquire()
if iter.finished == true:
return success (Key.none, EmptyBytes)
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
res = ThreadResult[(bool, DataBuffer, DataBuffer)]()
ctx = TaskCtx[(bool, DataBuffer, DataBuffer)](
ds: addr self.ds,
res: addr res,
signal: signal)
proc runTask() =
self.tp.spawn queryTask(addr ctx, addr childIter)
self.dispatchTask(ctx, runTask)
if err =? res.errorOption:
return failure err
let (ok, key, data) = res.get()
if not ok:
iter.finished = true
return success (Key.none, EmptyBytes)
return success (Key.init($key).expect("should not fail").some, @(data))
iter.next = next
return success iter
2023-09-11 14:48:53 -06:00
func new*(
2023-09-08 15:09:15 -06:00
self: type ThreadDatastore,
ds: Datastore,
tp: Taskpool): ?!ThreadDatastore =
2023-09-12 13:51:01 -06:00
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
success ThreadDatastore(
tp: tp,
2023-09-14 17:56:02 -06:00
ds: ds)