when not compileOption("threads"):
  {.error: "This module requires --threads:on compilation flag".}

import pkg/upraises

push: {.upraises: [].}

import std/atomics
import std/strutils
import std/tables
import std/sequtils

import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/stew/ptrops
import pkg/taskpools
import pkg/stew/byteutils
import pkg/chronicles

import ../key
import ../query
import ../datastore

import ./asyncsemaphore
import ./databuffer

type
  ErrorEnum {.pure.} = enum
    DatastoreErr, DatastoreKeyNotFoundErr, CatchableErr

  ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic
  ThreadResult*[T: ThreadTypes] = Result[T, DataBuffer]

  TaskCtx*[T: ThreadTypes] = object
    head: int
    ds*: ptr Datastore
    res*: ThreadResult[T]
    cancelled: Atomic[bool]
    isActive: Atomic[bool]
    semaphore: AsyncSemaphore
    signal*: ThreadSignalPtr

  ThreadDatastore* = ref object of Datastore
    tp: Taskpool
    ds: Datastore
    semaphore: AsyncSemaphore # semaphore is used for backpressure
                              # to avoid exhausting file descriptors
    withLocks: bool
    tasks: Table[Key, Future[void]]
    queryLock: AsyncLock      # global query lock, this is only really
                              # needed for the fsds, but it is expensive!
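
# Illustrative sketch (not used anywhere in this module) of the hand-off
# convention the types above encode: the dispatching thread owns a `ref TaskCtx`
# and parks on `signal`, while the spawned worker receives the raw pointer,
# writes its outcome into `res`, and fires the signal. The proc name below is
# hypothetical and exists only to show the convention.
proc exampleWorkerConvention(ctx: ptr TaskCtx[bool]) =
  ctx[].res.ok(true)              # publish the outcome for the dispatcher
  discard ctx[].signal.fireSync() # wake the dispatching thread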

proc addrOf*[T](ctx: ref TaskCtx[T]): ptr TaskCtx[T] =
  result = cast[ptr TaskCtx[T]](ctx)
  echo "ADDR_OF: ", result.pointer.repr

proc new*[T](
  ctx: typedesc[TaskCtx[T]],
  ds: Datastore,
): ref TaskCtx[T] =
  result = (ref TaskCtx[T])()
  result.ds = unsafeAddr(ds) ##\
    ## Doing this appears to break things; previously it was using `addr(ds)`,
    ## and reverting to that lets the tests get further.
    ##
    ## However, `addr(ds)` seems to mean we're taking the address of the
    ## `var ds: Datastore` location, and not the actual Datastore:ObjectType.
    ## As in `ds: ptr(ref Datastore:ObjectType)`.
    ##
    ## So using `unsafeAddr` means we're taking the ptr location of the `ds`
    ## argument, which is a temporary stack location.
    ##
    ## Not sure how to fix this while using GC types.
    ## Just taking the address of the var location sorta works, but it still
    ## crashes, just later. Likely because the datastore now lives on two GC
    ## heaps?

  echo ""
  echo "TaskCtx:new: ", "addrOf: ", addrOf(result).pointer.repr
  echo "TaskCtx:new: ", "head:ptr: ", unsafeAddr(result.head).pointer.repr
  echo "TaskCtx:new: ", " result:ds:ptr: ", result.ds.pointer.repr
  echo "TaskCtx:new: ds orig:\n\t", ds.repr
  echo "TaskCtx:new:\n\t", " result:repr:\n", result.repr
  echo ""
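
# A possible direction for the lifetime problem described above (an assumption,
# not something this module does today): keep the GC'd `Datastore` reference on
# the heap-allocated context itself, so the raw pointer handed to the worker can
# never outlive the backend it points at. The type below is purely a sketch and
# is gated off; its name and the `threadDatastoreSketches` define are hypothetical.
when defined(threadDatastoreSketches):
  type SafeTaskCtxSketch[T: ThreadTypes] = object
    dsRef: Datastore        # GC reference pins the backend while the task runs
    res: ThreadResult[T]
    signal: ThreadSignalPtr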

template withLocks(
  self: ThreadDatastore,
  ctx: ref TaskCtx,
  key: ?Key = Key.none,
  fut: Future[void],
  body: untyped) =
  try:
    if key.isSome and key.get in self.tasks:
      if self.withLocks:
        await self.tasks[key.get]

    if key.isSome:
      self.tasks[key.get] = fut # we always want to store the future, but only await if we're using locks

    if self.withLocks:
      await self.queryLock.acquire() # only lock if it's required (fsds)

    body
  finally:
    if self.withLocks:
      if key.isSome and key.get in self.tasks:
        self.tasks.del(key.get)
      if self.queryLock.locked:
        self.queryLock.release()

# TODO: needs rework, we can't use `result` with async
template dispatchTask(
  self: ThreadDatastore,
  ctx: ref TaskCtx,
  key: ?Key = Key.none,
  runTask: proc): untyped =
  try:
    GC_ref(ctx)
    await self.semaphore.acquire()
    ctx[].signal = ThreadSignalPtr.new().valueOr:
      result = failure(error())
      return

    let
      fut = wait(ctx.signal)

    withLocks(self, ctx, key, fut):
      runTask()
      await fut

    if ctx.res.isErr:
      result = failure(ctx.res.error()) # TODO: fix this, result shouldn't be accessed
  except CancelledError as exc:
    trace "Cancelling thread future!", exc = exc.msg

    if ctx.isActive.load(moAcquireRelease):
      # could do a spinlock here until the other side cancels,
      # but for now it'd at least be better to leak than possibly
      # corrupt memory since it's easier to detect and fix leaks
      warn "request was cancelled while thread task is running", exc = exc.msg
      GC_ref(ctx)

    ctx.cancelled.store(true, moAcquireRelease)
    await ctx.signal.fire()
    raise exc
  finally:
    GC_unref(ctx)
    discard ctx.signal.close()
    self.semaphore.release()
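
# The TODO above notes that assigning to `result` from inside this template is
# fragile in an `{.async.}` proc. One hedged direction for the rework (purely a
# sketch, gated off; the helper name and the `threadDatastoreSketches` define
# are hypothetical) is to read the task outcome out of the context once the
# signal has fired and return it, instead of poking the caller's `result`:
when defined(threadDatastoreSketches):
  proc taskOutcome[T](ctx: ref TaskCtx[T]): ?!void =
    if ctx.res.isErr:
      failure(ctx.res.error())
    else:
      success()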

proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
  ## Monitor the signal and cancel the future if the cancellation flag is set
  ##

  try:
    ctx[].isActive.store(true, moAcquireRelease)
    await ctx[].signal.wait()
    trace "Received signal"

    if ctx[].cancelled.load(moAcquireRelease): # there could eventually be other flags
      trace "Cancelling future"
      if not fut.finished:
        await fut.cancelAndWait() # cancel the pending task future

      discard ctx[].signal.fireSync()
  except CatchableError as exc:
    trace "Exception in thread signal monitor", exc = exc.msg
    ctx.res.err(exc)
    discard ctx[].signal.fireSync()
  finally:
    ctx[].isActive.store(false, moAcquireRelease)
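
# How the pieces above cooperate for every operation (descriptive summary only):
# `dispatchTask` runs on the calling thread; it acquires the semaphore, creates
# a fresh `ThreadSignalPtr`, keeps the future returned by `wait(ctx.signal)`,
# and spawns the worker entry point on the Taskpool. The worker runs its
# `async*Task` body under `waitFor`, spawning `signalMonitor` on its own event
# loop so a cancellation flag set by the dispatcher can cancel the in-flight
# backend call; the worker publishes its outcome in `ctx.res` and fires the
# signal, which completes the dispatcher's future so it can read the result,
# close the signal, and release the semaphore.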

proc asyncHasTask(
  ctx: ptr TaskCtx[bool],
  key: ptr Key) {.async.} =
  defer:
    discard ctx[].signal.fireSync()

  let
    fut = ctx[].ds[].has(key[])

  asyncSpawn signalMonitor(ctx, fut)
  without ret =? (await fut).catch and res =? ret, error:
    ctx.res.err(error)
    return

  ctx.res.ok(res)

proc hasTask(ctx: ptr TaskCtx, key: ptr Key) =
  try:
    waitFor asyncHasTask(ctx, key)
  except CatchableError as exc:
    trace "Unexpected exception thrown in asyncHasTask", exc = exc.msg
    raiseAssert exc.msg

method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
  var
    ctx = TaskCtx[bool].new(ds = self.ds)

  proc runTask() =
    self.tp.spawn hasTask(addrOf(ctx), unsafeAddr key)

  self.dispatchTask(ctx, key.some, runTask)
  return success(ctx.res.get())

proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
  defer:
    discard ctx[].signal.fireSync()

  let
    fut = ctx[].ds[].delete(key[])

  asyncSpawn signalMonitor(ctx, fut)
  without res =? (await fut).catch, error:
    trace "Error in asyncDelTask", error = error.msg
    ctx.res.err(error)
    return

  ctx.res.ok()
  return

proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
  try:
    waitFor asyncDelTask(ctx, key)
  except CatchableError as exc:
    trace "Unexpected exception thrown in asyncDelTask", exc = exc.msg
    raiseAssert exc.msg

method delete*(
  self: ThreadDatastore,
  key: Key): Future[?!void] {.async.} =
  var
    ctx = TaskCtx[void].new(ds = self.ds)

  proc runTask() =
    self.tp.spawn delTask(addrOf(ctx), unsafeAddr key)

  self.dispatchTask(ctx, key.some, runTask)
  return success()

method delete*(
  self: ThreadDatastore,
  keys: seq[Key]): Future[?!void] {.async.} =

  for key in keys:
    if err =? (await self.delete(key)).errorOption:
      return failure err

  return success()

proc asyncPutTask(
  ctx: ptr TaskCtx[void],
  key: ptr Key,
  data: ptr UncheckedArray[byte],
  len: int) {.async.} =

  echo ""
  echo "ASYNC PUT TASK:ptr: ", ctx.pointer.repr
  # echo "PUT TASK:\n", ctx[].repr
  echo "ASYNC PUT TASK:ds: ", ctx[].ds.pointer.repr
  echo "ASYNC PUT TASK:res: ", ctx[].res.repr
  echo "ASYNC PUT TASK:signal: ", ctx[].signal.pointer.repr
  echo "ASYNC PUT TASK:cancelled: ", ctx[].cancelled.repr
  echo "ASYNC PUT TASK:semaphore: ", ctx[].semaphore.repr
  echo "ASYNC PUT TASK:ds: ", ctx[].ds[].repr

  defer:
    discard ctx.signal.fireSync()

  let
    fut = ctx.ds[].put(key[], @(data.toOpenArray(0, len - 1)))

  asyncSpawn signalMonitor(ctx, fut)
  without res =? (await fut).catch, error:
    trace "Error in asyncPutTask", error = error.msg
    ctx.res.err(error)
    return

  ctx.res.ok()

proc putTask(
  ctx: ptr TaskCtx,
  key: ptr Key,
  data: ptr UncheckedArray[byte],
  len: int) =
  ## Run put in a thread task
  ##

  echo ""
  echo "PUT TASK:ptr: ", ctx.pointer.repr
  # echo "PUT TASK:\n", ctx[].repr
  echo "PUT TASK:ds: ", ctx[].ds.pointer.repr
  echo "PUT TASK:res: ", ctx[].res.repr
  echo "PUT TASK:signal: ", ctx[].signal.pointer.repr
  echo "PUT TASK:cancelled: ", ctx[].cancelled.repr
  echo "PUT TASK:semaphore: ", ctx[].semaphore.repr

  try:
    waitFor asyncPutTask(ctx, key, data, len)
  except CatchableError as exc:
    trace "Unexpected exception thrown in asyncPutTask", exc = exc.msg
    raiseAssert exc.msg

method put*(
  self: ThreadDatastore,
  key: Key,
  data: seq[byte]): Future[?!void] {.async.} =

  var
    ctx = TaskCtx[void].new(ds = self.ds)

  proc runTask() =
    self.tp.spawn putTask(
      addrOf(ctx),
      unsafeAddr key,
      makeUncheckedArray(baseAddr data),
      data.len)

  self.dispatchTask(ctx, key.some, runTask)
  return success()

method put*(
  self: ThreadDatastore,
  batch: seq[BatchEntry]): Future[?!void] {.async.} =

  for entry in batch:
    if err =? (await self.put(entry.key, entry.data)).errorOption:
      return failure err

  return success()

proc asyncGetTask(
  ctx: ptr TaskCtx[DataBuffer],
  key: ptr Key) {.async.} =
  defer:
    discard ctx[].signal.fireSync()

  let
    fut = ctx[].ds[].get(key[])

  asyncSpawn signalMonitor(ctx, fut)
  without res =? (await fut).catch and data =? res, error:
    trace "Error in asyncGetTask", error = error.msg
    ctx.res.err(error)
    return

  ctx.res.ok(DataBuffer.new(data))

proc getTask(
  ctx: ptr TaskCtx,
  key: ptr Key) =
  ## Run get in a thread task
  ##

  try:
    waitFor asyncGetTask(ctx, key)
  except CatchableError as exc:
    trace "Unexpected exception thrown in asyncGetTask", exc = exc.msg
    raiseAssert exc.msg

method get*(
  self: ThreadDatastore,
  key: Key): Future[?!seq[byte]] {.async.} =
  var
    ctx = TaskCtx[DataBuffer].new(ds = self.ds)

  proc runTask() =
    self.tp.spawn getTask(addrOf(ctx), unsafeAddr key)

  self.dispatchTask(ctx, key.some, runTask)
  if err =? ctx.res.errorOption:
    return failure err

  return success(@(ctx.res.get()))

method close*(self: ThreadDatastore): Future[?!void] {.async.} =
  for fut in self.tasks.values.toSeq:
    await fut.cancelAndWait() # probably want to store the signal, instead of the future (or both?)

  await self.ds.close()

proc asyncQueryTask(
  ctx: ptr TaskCtx,
  iter: ptr QueryIter) {.async.} =
  defer:
    discard ctx[].signal.fireSync()

  let
    fut = iter[].next()

  asyncSpawn signalMonitor(ctx, fut)
  without ret =? (await fut).catch and res =? ret, error:
    trace "Error in asyncQueryTask", error = error.msg
    ctx.res.err(error)
    return

  if res.key.isNone:
    ctx.res.ok((false, default(DataBuffer), default(DataBuffer)))
    return

  var
    keyBuf = DataBuffer.new($(res.key.get()))
    dataBuf = DataBuffer.new(res.data)

  ctx.res.ok((true, keyBuf, dataBuf))

proc queryTask(
  ctx: ptr TaskCtx,
  iter: ptr QueryIter) =

  try:
    waitFor asyncQueryTask(ctx, iter)
  except CatchableError as exc:
    trace "Unexpected exception thrown in asyncQueryTask", exc = exc.msg
    raiseAssert exc.msg

method query*(
  self: ThreadDatastore,
  query: Query): Future[?!QueryIter] {.async.} =
  without var childIter =? await self.ds.query(query), error:
    return failure error

  var
    iter = QueryIter.new()
    locked = false

  proc next(): Future[?!QueryResponse] {.async.} =
    defer:
      locked = false

    trace "About to query"
    if locked:
      return failure (ref DatastoreError)(msg: "Should always await query futures")

    locked = true

    if iter.finished == true:
      return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")

    if childIter.finished == true:
      return success (Key.none, EmptyBytes)

    var
      ctx = TaskCtx[(bool, DataBuffer, DataBuffer)].new(ds = self.ds)

    proc runTask() =
      self.tp.spawn queryTask(addrOf(ctx), addr childIter)

    self.dispatchTask(ctx, Key.none, runTask)
    if err =? ctx.res.errorOption:
      trace "Query failed", err = err
      return failure err

    let (ok, key, data) = ctx.res.get()
    if not ok:
      iter.finished = true
      return success (Key.none, EmptyBytes)

    return success (Key.init($key).expect("should not fail").some, @(data))

  iter.next = next
  return success iter
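
# Illustrative only: how a caller might drain the iterator returned by `query`.
# This is a sketch gated behind a hypothetical `threadDatastoreSketches` define;
# `exampleDrainQuery`, `tds` and `q` are not part of this module's API.
when defined(threadDatastoreSketches):
  proc exampleDrainQuery(tds: ThreadDatastore, q: Query) {.async.} =
    without iter =? await tds.query(q), err:
      raiseAssert err.msg
    while not iter.finished:
      without res =? await iter.next(), err:
        raiseAssert err.msg
      if key =? res.key:
        echo $key, ": ", res.data.len, " bytes"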

proc new*(
  self: type ThreadDatastore,
  ds: Datastore,
  withLocks = static false,
  tp: Taskpool): ?!ThreadDatastore =
  doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"

  case withLocks:
  of true:
    success ThreadDatastore(
      tp: tp,
      ds: ds,
      withLocks: true,
      queryLock: newAsyncLock(),
      semaphore: AsyncSemaphore.new(tp.numThreads - 1))
  else:
    success ThreadDatastore(
      tp: tp,
      ds: ds,
      withLocks: false,
      semaphore: AsyncSemaphore.new(tp.numThreads - 1))
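
# A minimal usage sketch. It assumes the caller constructs some concrete
# `Datastore` backend elsewhere (for example one of the SQLite- or
# filesystem-backed stores in this repository) and a shared `Taskpool`;
# `exampleUsage` and `backend` are illustrative names, not part of the API.
when isMainModule:
  import std/cpuinfo

  proc exampleUsage(backend: Datastore) {.async.} =
    let tp = Taskpool.new(numThreads = max(2, countProcessors()))
    without tds =? ThreadDatastore.new(backend, tp = tp), err:
      raiseAssert err.msg

    let key = Key.init("/example/key").expect("valid key")
    (await tds.put(key, @[1'u8, 2, 3])).expect("put should succeed")
    assert (await tds.has(key)).expect("has should succeed")
    (await tds.close()).expect("close should succeed")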