nim-datastore/datastore/threads/threadproxyds.nim

456 lines
10 KiB
Nim
Raw Permalink Normal View History

2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
# This module spawns taskpool workers and uses cross-thread signals, so it
# refuses to compile unless thread support is enabled.
when not compileOption("threads"):
  {.error: "This module requires --threads:on compilation flag".}
2023-09-08 15:09:15 -06:00
import pkg/upraises
push: {.upraises: [].}
import std/atomics
2023-09-12 13:51:01 -06:00
import std/strutils
2023-09-15 13:08:38 -06:00
import std/tables
2023-09-15 16:40:46 -06:00
import std/sequtils
2023-09-08 15:09:15 -06:00
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/stew/ptrops
import pkg/taskpools
2023-09-12 13:51:01 -06:00
import pkg/stew/byteutils
2023-09-15 13:08:38 -06:00
import pkg/chronicles
2023-09-08 15:09:15 -06:00
import ../key
import ../query
import ../datastore
2023-09-15 13:08:38 -06:00
import ./asyncsemaphore
2023-09-12 13:51:01 -06:00
import ./databuffer
import ./threadresult
2023-09-08 15:09:15 -06:00
export threadresult
2023-09-15 13:08:38 -06:00
# chronicles log scope: tags this module's trace output with a topic.
logScope:
  topics = "datastore threadproxyds"
2023-09-08 15:09:15 -06:00
type
  TaskCtx[T: ThreadTypes] = object
    ## Per-call state handed to a taskpool worker by raw pointer. It is a
    ## local of the calling async proc and is touched both by the caller
    ## and by the worker thread.
    ds: Datastore             # wrapped datastore the worker calls into
    res: ptr ThreadResult[T]  # caller-owned slot the worker writes its outcome to
    cancelled: bool           # set by the caller when its future is cancelled
                              # NOTE(review): plain bool read on the worker
                              # thread (signalMonitor); consider Atomic[bool]
    semaphore: AsyncSemaphore # NOTE(review): never assigned in this file
    signal: ThreadSignalPtr   # fired by the worker when done, and by the
                              # caller on cancellation

  ThreadDatastore* = ref object of Datastore
    ## Datastore proxy that runs the operations of the wrapped datastore `ds`
    ## on taskpool workers.
    tp: Taskpool              # pool the worker tasks are spawned on
    ds: Datastore             # wrapped datastore
    semaphore: AsyncSemaphore # semaphore is used for backpressure \
                              # to avoid exhausting file descriptors
    withLocks: bool           # when true, serialize per key and via queryLock
    tasks: Table[Key, Future[void]] # in-flight task futures by key; close() cancels them
    queryLock: AsyncLock      # global query lock, this is only really \
                              # needed for the fsds, but it is expensive!
2023-09-15 16:40:46 -06:00
template withLocks(
    self: ThreadDatastore,
    ctx: TaskCtx,
    key: ?Key = Key.none,
    fut: Future[void],
    body: untyped): untyped =
  ## Run `body` under the per-key and global locks of `self`.
  ##
  ## - When `key` is set, `fut` is always registered in `self.tasks` under
  ##   that key (so `close` can cancel it). If `self.withLocks` is enabled and
  ##   another task was registered for the same key, its future is awaited
  ##   first.
  ## - When `self.withLocks` is enabled, `self.queryLock` is held while `body`
  ##   runs.
  ##
  ## Cleanup undoes only what this expansion did: the `tasks` entry is
  ## removed only while it still points at `fut`, and `queryLock` is released
  ## only if this expansion acquired it. `ctx` is currently unused.
  ## Must be expanded inside an async proc (it uses `await`).
  let optKey = key
  var lockAcquired = false
  try:
    if optKey.isSome:
      # Register before waiting so a later caller for the same key queues
      # behind this task instead of behind the same predecessor.
      let previous = self.tasks.getOrDefault(optKey.get)
      self.tasks[optKey.get] = fut # we always store the future, but only await if we're using locks
      if self.withLocks and not previous.isNil:
        await previous

    if self.withLocks:
      await self.queryLock.acquire() # only lock if it's required (fsds)
      lockAcquired = true

    block:
      body
  finally:
    # A newer task may have replaced our entry; never drop someone else's.
    if optKey.isSome and self.tasks.getOrDefault(optKey.get) == fut:
      self.tasks.del(optKey.get)
    # Releasing a lock we never acquired would unlock another task.
    if lockAcquired:
      self.queryLock.release()
2023-09-08 15:09:15 -06:00
2023-09-18 13:40:44 -06:00
# TODO: needs rework, we can't use `result` with async
template dispatchTask[T](
    self: ThreadDatastore,
    ctx: TaskCtx[T],
    key: ?Key = Key.none,
    runTask: proc): auto =
  ## Run `runTask` (which must spawn a worker operating on `ctx`), wait for
  ## the worker's signal, and turn `ctx.res` into the enclosing proc's `?!U`
  ## result (converted with `result.T`).
  ##
  ## A `self.semaphore` permit bounds the number of in-flight workers. The
  ## permit is released only once it has actually been acquired. The signal
  ## is fired/closed only once it has actually been created.
  ##
  ## NOTE(review): on cancellation the worker may still hold pointers into
  ## `ctx`/`ctx.res` after this returns (see the TODO above).
  # Acquire outside the `try`: if this await is cancelled we hold no permit
  # and must not release one in `finally`.
  await self.semaphore.acquire()
  try:
    let signal = ThreadSignalPtr.new()
    if signal.isErr:
      failure(signal.error)
    else:
      ctx.signal = signal.get()
      let
        fut = wait(ctx.signal)

      withLocks(self, ctx, key, fut):
        runTask()
        await fut
        if ctx.res[].isErr:
          failure ctx.res[].error
        else:
          when result.T isnot void:
            success result.T(ctx.res[].get)
          else:
            success()
  except CancelledError as exc:
    trace "Cancelling thread future!", exc = exc.msg
    ctx.cancelled = true
    if not ctx.signal.isNil:
      # Wake the worker-side monitor so it cancels the pending operation.
      await ctx.signal.fire()
    raise exc
  finally:
    if not ctx.signal.isNil:
      discard ctx.signal.close()
    self.semaphore.release()
2023-09-12 13:51:01 -06:00
2023-09-15 13:08:38 -06:00
proc signalMonitor[T](ctx: ptr TaskCtx, fut: Future[T]) {.async.} =
  ## Spawned on the worker's event loop next to the datastore operation
  ## `fut`. Waits for `ctx.signal`. If the caller has set `ctx.cancelled`,
  ## it cancels `fut` (when still pending) and fires the signal again. Any
  ## `CatchableError` is stored in `ctx.res` and the signal is fired.
  ##
  ## NOTE(review): nothing cancels this monitor once `fut` completes
  ## normally, so it can remain pending on the worker's dispatcher.
  if ctx.isNil:
    trace "ctx is nil"
    return

  try:
    await ctx[].signal.wait()
    trace "Received signal"

    # `cancelled` is the only flag acted on for now.
    if not ctx[].cancelled:
      return

    trace "Cancelling future"
    if not fut.finished:
      await fut.cancelAndWait() # cancel the pending datastore operation
    discard ctx[].signal.fireSync()
  except CatchableError as e:
    trace "Exception in thread signal monitor", exc = e.msg
    ctx[].res[].err(e)
    discard ctx[].signal.fireSync()
2023-09-12 13:51:01 -06:00
2023-09-15 13:08:38 -06:00
proc asyncHasTask(
    ctx: ptr TaskCtx[bool],
    key: ptr Key) {.async.} =
  ## Worker-side body of `has`: look `key[]` up in `ctx.ds` and store the
  ## boolean (or the error) in `ctx.res`. A `signalMonitor` is spawned so
  ## the lookup can be cancelled from the caller's side.
  if ctx.isNil:
    trace "ctx is nil"
    return

  let
    wanted = key[] # local copy of the caller's key
    lookup = ctx[].ds.has(wanted)
  asyncSpawn signalMonitor(ctx, lookup)

  without outcome =? (await lookup).catch and exists =? outcome, error:
    ctx[].res[].err(error)
    return

  ctx[].res[].ok(exists)
2023-09-08 15:09:15 -06:00
2023-09-15 13:08:38 -06:00
proc hasTask(ctx: ptr TaskCtx, key: ptr Key) =
  ## Taskpool entry point for `has`. It drives `asyncHasTask` to completion
  ## on this worker's event loop. It then fires `ctx.signal`, even on
  ## failure, so the waiting caller wakes up.
  try:
    try:
      waitFor asyncHasTask(ctx, key)
    except CatchableError as e:
      trace "Unexpected exception thrown in asyncHasTask", exc = e.msg
      raiseAssert e.msg
  finally:
    if not ctx.isNil:
      discard ctx[].signal.fireSync()
2023-09-11 14:48:53 -06:00
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
  ## Check whether `key` exists in the wrapped datastore. The lookup runs
  ## on a taskpool worker via `hasTask`.
  # The worker only receives raw pointers to these locals, so they must stay
  # alive while `dispatchTask` is waiting on it.
  var
    ownedKey = key
    outcome = ThreadResult[bool]()
    taskCtx = TaskCtx[bool](ds: self.ds, res: addr outcome)

  proc spawnWorker() =
    self.tp.spawn hasTask(addr taskCtx, addr ownedKey)

  return self.dispatchTask(taskCtx, ownedKey.some, spawnWorker)
2023-09-08 15:09:15 -06:00
2023-09-15 13:08:38 -06:00
proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
  ## Worker-side body of `delete`: remove `key[]` from `ctx.ds` and record
  ## success or the error in `ctx.res`.
  ##
  ## Both failure channels are checked: an exception raised by `delete`
  ## (caught by `.catch`) and a failure returned in its `?!void` result.
  if ctx.isNil:
    trace "ctx is nil"
    return

  let
    key = key[]
    fut = ctx[].ds.delete(key)

  asyncSpawn signalMonitor(ctx, fut)
  without status =? (await fut).catch, error:
    trace "Error in asyncDelTask", error = error.msg
    ctx[].res[].err(error)
    return

  # `.catch` only unwraps raised exceptions; the returned `?!void` can still
  # be a failure and must not be reported as success.
  if cause =? status.errorOption:
    trace "Error in asyncDelTask", error = cause.msg
    ctx[].res[].err(cause)
    return

  ctx[].res[].ok()
proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
  ## Taskpool entry point for `delete`. It runs `asyncDelTask` to completion
  ## on this worker's event loop. It then always fires `ctx.signal` so the
  ## caller is released.
  try:
    try:
      waitFor asyncDelTask(ctx, key)
    except CatchableError as e:
      trace "Unexpected exception thrown in asyncDelTask", exc = e.msg
      raiseAssert e.msg
  finally:
    if not ctx.isNil:
      discard ctx[].signal.fireSync()
2023-09-08 15:09:15 -06:00
2023-09-11 14:48:53 -06:00
method delete*(
    self: ThreadDatastore,
    key: Key): Future[?!void] {.async.} =
  ## Remove `key` from the wrapped datastore. The deletion runs on a
  ## taskpool worker via `delTask`.
  # The worker receives raw pointers to these locals, so they must stay
  # alive while `dispatchTask` is waiting on it.
  var
    ownedKey = key
    outcome = ThreadResult[void]()
    taskCtx = TaskCtx[void](ds: self.ds, res: addr outcome)

  proc spawnWorker() =
    self.tp.spawn delTask(addr taskCtx, addr ownedKey)

  return self.dispatchTask(taskCtx, ownedKey.some, spawnWorker)
2023-09-08 15:09:15 -06:00
2023-09-14 17:47:37 -06:00
method delete*(
    self: ThreadDatastore,
    keys: seq[Key]): Future[?!void] {.async.} =
  ## Delete each key in `keys` sequentially via the single-key `delete`.
  ## Stops at, and returns, the first failure.
  for k in keys:
    let outcome = await self.delete(k)
    if cause =? outcome.errorOption:
      return failure cause
  return success()
2023-09-15 13:08:38 -06:00
proc asyncPutTask(
    ctx: ptr TaskCtx[void],
    key: ptr Key,
    data: ptr UncheckedArray[byte],
    len: int) {.async.} =
  ## Worker-side body of `put`: copy `len` bytes from `data` and store them
  ## under `key[]` in `ctx.ds`, recording success or the error in `ctx.res`.
  ##
  ## `data` is not read when `len` is 0, so it may be nil in that case.
  ## Both failure channels are checked: an exception raised by `put`
  ## (caught by `.catch`) and a failure returned in its `?!void` result.
  if ctx.isNil:
    trace "ctx is nil"
    return

  let
    key = key[]
    # Copy into a seq owned by this thread; an empty value needs no source.
    data =
      if len > 0: @(data.toOpenArray(0, len - 1))
      else: newSeq[byte]()
    fut = ctx[].ds.put(key, data)

  asyncSpawn signalMonitor(ctx, fut)
  without status =? (await fut).catch, error:
    trace "Error in asyncPutTask", error = error.msg
    ctx[].res[].err(error)
    return

  # `.catch` only unwraps raised exceptions; the returned `?!void` can still
  # be a failure and must not be reported as success.
  if cause =? status.errorOption:
    trace "Error in asyncPutTask", error = cause.msg
    ctx[].res[].err(cause)
    return

  ctx[].res[].ok()
2023-09-08 15:09:15 -06:00
2023-09-15 13:08:38 -06:00
proc putTask(
    ctx: ptr TaskCtx,
    key: ptr Key,
    data: ptr UncheckedArray[byte],
    len: int) =
  ## Taskpool entry point for `put`. It drives `asyncPutTask` to completion
  ## on this worker's event loop. It then fires `ctx.signal`, even on
  ## failure, so the caller wakes up.
  try:
    try:
      waitFor asyncPutTask(ctx, key, data, len)
    except CatchableError as e:
      trace "Unexpected exception thrown in asyncPutTask", exc = e.msg
      raiseAssert e.msg
  finally:
    if not ctx.isNil:
      discard ctx[].signal.fireSync()
2023-09-11 14:48:53 -06:00
method put*(
    self: ThreadDatastore,
    key: Key,
    data: seq[byte]): Future[?!void] {.async.} =
  ## Store `data` under `key` in the wrapped datastore. The write runs on a
  ## taskpool worker via `putTask`.
  ##
  ## The worker reads `data` through a raw pointer, so the local copies below
  ## must stay alive while `dispatchTask` waits. Empty values are supported.
  var
    key = key
    data = data
    res = ThreadResult[void]()
    ctx = TaskCtx[void](
      ds: self.ds,
      res: addr res)

  proc runTask() =
    # `addr data[0]` raises IndexDefect for an empty value, so pass nil
    # instead; `len` is then 0 and no bytes are read.
    var buf: ptr UncheckedArray[byte] = nil
    if data.len > 0:
      buf = makeUncheckedArray(addr data[0])
    self.tp.spawn putTask(
      addr ctx,
      addr key,
      buf,
      data.len)

  return self.dispatchTask(ctx, key.some, runTask)
2023-09-11 14:48:53 -06:00
method put*(
    self: ThreadDatastore,
    batch: seq[BatchEntry]): Future[?!void] {.async.} =
  ## Write each entry of `batch` in order via the single-entry `put`.
  ## Stops at, and returns, the first failure.
  for item in batch:
    let outcome = await self.put(item.key, item.data)
    if cause =? outcome.errorOption:
      return failure cause
  return success()
2023-09-15 13:08:38 -06:00
proc asyncGetTask(
    ctx: ptr TaskCtx[DataBuffer],
    key: ptr Key) {.async.} =
  ## Worker-side body of `get`: fetch `key[]` from `ctx.ds` and store the
  ## bytes, wrapped in a `DataBuffer`, in `ctx.res` (or store the error).
  if ctx.isNil:
    trace "ctx is nil"
    return

  let
    wanted = key[] # local copy of the caller's key
    fetch = ctx[].ds.get(wanted)
  asyncSpawn signalMonitor(ctx, fetch)

  without outcome =? (await fetch).catch and bytes =? outcome, error:
    trace "Error in asyncGetTask", error = error.msg
    ctx[].res[].err(error)
    return

  trace "Got data in get"
  ctx[].res[].ok(DataBuffer.new(bytes))
2023-09-08 15:09:15 -06:00
2023-09-15 13:08:38 -06:00
proc getTask(
    ctx: ptr TaskCtx,
    key: ptr Key) =
  ## Taskpool entry point for `get`. It runs `asyncGetTask` to completion
  ## on this worker's event loop. It then always fires `ctx.signal` so the
  ## caller wakes up.
  try:
    try:
      waitFor asyncGetTask(ctx, key)
    except CatchableError as e:
      trace "Unexpected exception thrown in asyncGetTask", exc = e.msg
      raiseAssert e.msg
  finally:
    if not ctx.isNil:
      discard ctx[].signal.fireSync()
2023-09-11 14:48:53 -06:00
method get*(
    self: ThreadDatastore,
    key: Key): Future[?!seq[byte]] {.async.} =
  ## Fetch the value stored under `key` in the wrapped datastore. The read
  ## runs on a taskpool worker via `getTask`.
  # The worker receives raw pointers to these locals, so they must stay
  # alive while `dispatchTask` is waiting on it.
  var
    ownedKey = key
    outcome = ThreadResult[DataBuffer]()
    taskCtx = TaskCtx[DataBuffer](ds: self.ds, res: addr outcome)

  proc spawnWorker() =
    self.tp.spawn getTask(addr taskCtx, addr ownedKey)

  return self.dispatchTask(taskCtx, ownedKey.some, spawnWorker)
2023-09-08 15:09:15 -06:00
2023-09-12 13:51:01 -06:00
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
  ## Cancel every task future registered in `self.tasks`, then close the
  ## wrapped datastore and return its result.
  # Snapshot first: the table can be modified while we await.
  # NOTE: storing the signal (instead of, or alongside, the future) may be
  # a better handle for cancellation.
  let pending = toSeq(self.tasks.values)
  for task in pending:
    await task.cancelAndWait()

  return await self.ds.close()
2023-09-08 15:09:15 -06:00
2023-09-15 13:08:38 -06:00
proc asyncQueryTask(
    ctx: ptr TaskCtx,
    iter: ptr QueryIter) {.async.} =
  ## Worker-side body of one query step: advance `iter[]` once and store
  ## the key/value pair as `DataBuffer`s in `ctx.res`. When the child
  ## iterator yields no key, a pair of default buffers is stored instead.
  if ctx.isNil or iter.isNil:
    trace "ctx is nil"
    return

  let step = iter[].next()
  asyncSpawn signalMonitor(ctx, step)

  without outcome =? (await step).catch and entry =? outcome, error:
    trace "Error in asyncQueryTask", error = error.msg
    ctx[].res[].err(error)
    return

  if entry.key.isNone:
    ctx[].res[].ok((default(DataBuffer), default(DataBuffer)))
    return

  let keyText = $entry.key.get()
  trace "Got query result", key = keyText, data = entry.data
  ctx[].res[].ok((DataBuffer.new(keyText), DataBuffer.new(entry.data)))
2023-09-13 14:41:16 -06:00
2023-09-15 13:08:38 -06:00
proc queryTask(
    ctx: ptr TaskCtx,
    iter: ptr QueryIter) =
  ## Taskpool entry point for a query step. It drives `asyncQueryTask` to
  ## completion on this worker's event loop. It then always fires
  ## `ctx.signal` so the caller wakes up.
  try:
    try:
      waitFor asyncQueryTask(ctx, iter)
    except CatchableError as e:
      trace "Unexpected exception thrown in asyncQueryTask", exc = e.msg
      raiseAssert e.msg
  finally:
    if not ctx.isNil:
      discard ctx[].signal.fireSync()
2023-09-13 14:41:16 -06:00
method query*(
    self: ThreadDatastore,
    query: Query): Future[?!QueryIter] {.async.} =
  ## Run `query` against the wrapped datastore. Returns an iterator whose
  ## `next` advances the child iterator on a taskpool worker.
  ##
  ## Calls to `next` must not overlap. A call made while another is still
  ## pending fails with a `DatastoreError`. `finished` mirrors the child
  ## iterator's state after each completed step.
  without var childIter =? await self.ds.query(query), error:
    return failure error

  var
    iter = QueryIter.new()
    lock = newAsyncLock() # serialize querying under threads

  proc next(): Future[?!QueryResponse] {.async.} =
    trace "About to query"
    # Reject overlapping calls before taking the lock. The rejected call must
    # never release the lock that the in-flight call holds.
    if lock.locked:
      return failure (ref DatastoreError)(msg: "Should always await query features")

    await lock.acquire()
    try:
      if iter.finished == true:
        return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")

      var
        res = ThreadResult[ThreadQueryRes]()
        ctx = TaskCtx[ThreadQueryRes](
          ds: self.ds,
          res: addr res)

      proc runTask() =
        self.tp.spawn queryTask(addr ctx, addr childIter)

      let response = self.dispatchTask(ctx, Key.none, runTask)
      # Copy the child's state *after* it advanced, so `finished` reflects
      # the step just taken instead of lagging one call behind.
      iter.finished = childIter.finished
      return response
    finally:
      lock.release()

  iter.next = next
  return success iter
2023-09-15 16:40:46 -06:00
proc new*(
    self: type ThreadDatastore,
    ds: Datastore,
    withLocks = static false,
    tp: Taskpool): ?!ThreadDatastore =
  ## Wrap `ds` so that its operations run on the worker threads of `tp`.
  ##
  ## The backpressure semaphore is sized to `tp.numThreads - 1`. `withLocks`
  ## enables per-key and global (`queryLock`) serialization. Asserts that
  ## `tp` has at least two threads.
  doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"

  let
    maxInFlight = tp.numThreads - 1
    proxy = ThreadDatastore(
      tp: tp,
      ds: ds,
      withLocks: withLocks,
      queryLock: newAsyncLock(),
      semaphore: AsyncSemaphore.new(maxInFlight))

  success proxy