handle error passing and conversion better

This commit is contained in:
Dmitriy Ryajov 2023-09-19 18:43:00 -06:00
parent 3d33820729
commit 0beb3d3c90
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
4 changed files with 93 additions and 64 deletions

View File

@ -6,6 +6,8 @@ import pkg/questionable/results
import ./key
import ./types
export types
export options, SortOrder
type
@ -17,7 +19,6 @@ type
sort*: SortOrder # Sort order - not available in all backends
QueryResponse* = tuple[key: ?Key, data: seq[byte]]
QueryEndedError* = object of DatastoreError
GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe.}
IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}

View File

@ -26,14 +26,14 @@ import ../datastore
import ./asyncsemaphore
import ./databuffer
import ./threadresult
export threadresult
logScope:
topics = "datastore threadproxyds"
type
ErrorEnum {.pure.} = enum
DatastoreErr, DatastoreKeyNotFoundErr, CatchableErr
ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple | Atomic
ThreadResult[T: ThreadTypes] = Result[T, DataBuffer]
TaskCtx[T: ThreadTypes] = object
ds: Datastore
res: ptr ThreadResult[T]
@ -56,7 +56,7 @@ template withLocks(
ctx: TaskCtx,
key: ?Key = Key.none,
fut: Future[void],
body: untyped) =
body: untyped): untyped =
try:
if key.isSome and key.get in self.tasks:
if self.withLocks:
@ -65,7 +65,9 @@ template withLocks(
if self.withLocks:
await self.queryLock.acquire() # only lock if it's required (fsds)
body
block:
body
finally:
if self.withLocks:
if key.isSome and key.get in self.tasks:
@ -74,26 +76,31 @@ template withLocks(
self.queryLock.release()
# TODO: needs rework, we can't use `result` with async
template dispatchTask(
template dispatchTask[T](
self: ThreadDatastore,
ctx: TaskCtx,
ctx: TaskCtx[T],
key: ?Key = Key.none,
runTask: proc): untyped =
runTask: proc): auto =
try:
await self.semaphore.acquire()
ctx.signal = ThreadSignalPtr.new().valueOr:
result = failure(error())
return
let signal = ThreadSignalPtr.new()
if signal.isErr:
failure(signal.error)
else:
ctx.signal = signal.get()
let
fut = wait(ctx.signal)
let
fut = wait(ctx.signal)
withLocks(self, ctx, key, fut):
runTask()
await fut
if ctx.res[].isErr:
result = failure(ctx.res[].error()) # TODO: fix this, result shouldn't be accessed from a thread
withLocks(self, ctx, key, fut):
runTask()
await fut
if ctx.res[].isErr:
failure ctx.res[].error
else:
when result.T isnot void:
success result.T(ctx.res[].get)
else:
success()
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
ctx.cancelled = true
@ -156,8 +163,7 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
proc runTask() =
self.tp.spawn hasTask(addr ctx, unsafeAddr key)
self.dispatchTask(ctx, key.some, runTask)
return success(res.get())
return self.dispatchTask(ctx, key.some, runTask)
proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} =
defer:
@ -194,8 +200,7 @@ method delete*(
proc runTask() =
self.tp.spawn delTask(addr ctx, unsafeAddr key)
self.dispatchTask(ctx, key.some, runTask)
return success()
return self.dispatchTask(ctx, key.some, runTask)
method delete*(
self: ThreadDatastore,
@ -257,8 +262,7 @@ method put*(
makeUncheckedArray(baseAddr data),
data.len)
self.dispatchTask(ctx, key.some, runTask)
return success()
return self.dispatchTask(ctx, key.some, runTask)
method put*(
self: ThreadDatastore,
@ -285,6 +289,7 @@ proc asyncGetTask(
ctx[].res[].err(error)
return
trace "Got data in get", data
ctx[].res[].ok(DataBuffer.new(data))
proc getTask(
@ -311,11 +316,7 @@ method get*(
proc runTask() =
self.tp.spawn getTask(addr ctx, unsafeAddr key)
self.dispatchTask(ctx, key.some, runTask)
if err =? res.errorOption:
return failure err
return success(@(res.get()))
return self.dispatchTask(ctx, key.some, runTask)
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
for fut in self.tasks.values.toSeq:
@ -339,14 +340,15 @@ proc asyncQueryTask(
return
if res.key.isNone:
ctx[].res[].ok((false, default(DataBuffer), default(DataBuffer)))
ctx[].res[].ok((default(DataBuffer), default(DataBuffer)))
return
var
keyBuf = DataBuffer.new($(res.key.get()))
dataBuf = DataBuffer.new(res.data)
ctx[].res[].ok((true, keyBuf, dataBuf))
trace "Got query result", key = $res.key.get(), data = res.data
ctx[].res[].ok((keyBuf, dataBuf))
proc queryTask(
ctx: ptr TaskCtx,
@ -385,25 +387,15 @@ method query*(
return success (Key.none, EmptyBytes)
var
res = ThreadResult[(bool, DataBuffer, DataBuffer)]()
ctx = TaskCtx[(bool, DataBuffer, DataBuffer)](
res = ThreadResult[ThreadQueryRes]()
ctx = TaskCtx[ThreadQueryRes](
ds: self.ds,
res: addr res)
proc runTask() =
self.tp.spawn queryTask(addr ctx, addr childIter)
self.dispatchTask(ctx, Key.none, runTask)
if err =? res.errorOption:
trace "Query failed", err = $err
return failure err
let (ok, key, data) = res.get()
if not ok:
iter.finished = true
return success (Key.none, EmptyBytes)
return success (Key.init($key).expect("should not fail").some, @(data))
return self.dispatchTask(ctx, Key.none, runTask)
iter.next = next
return success iter
@ -415,17 +407,9 @@ proc new*(
tp: Taskpool): ?!ThreadDatastore =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
case withLocks:
of true:
success ThreadDatastore(
tp: tp,
ds: ds,
withLocks: true,
queryLock: newAsyncLock(),
semaphore: AsyncSemaphore.new(tp.numThreads - 1))
else:
success ThreadDatastore(
tp: tp,
ds: ds,
withLocks: false,
semaphore: AsyncSemaphore.new(tp.numThreads - 1))
success ThreadDatastore(
tp: tp,
ds: ds,
withLocks: withLocks,
queryLock: newAsyncLock(),
semaphore: AsyncSemaphore.new(tp.numThreads - 1))

View File

@ -0,0 +1,43 @@
import std/atomics
import std/options
import pkg/questionable/results
import pkg/results
import ../types
import ../query
import ../key
import ./databuffer
type
ErrorEnum* {.pure.} = enum
DatastoreErr,
DatastoreKeyNotFoundErr,
QueryEndedErr,
CatchableErr
ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic
ThreadResErr* = (ErrorEnum, DataBuffer)
ThreadQueryRes* = (DataBuffer, DataBuffer)
ThreadResult*[T: ThreadTypes] = Result[T, ThreadResErr]
converter toThreadErr*(e: ref CatchableError): ThreadResErr {.inline, raises: [].} =
if e of DatastoreKeyNotFound: (ErrorEnum.DatastoreKeyNotFoundErr, DataBuffer.new(e.msg))
elif e of QueryEndedError: (ErrorEnum.QueryEndedErr, DataBuffer.new(e.msg))
elif e of DatastoreError: (DatastoreErr, DataBuffer.new(e.msg))
elif e of CatchableError: (CatchableErr, DataBuffer.new(e.msg))
else: raise (ref Defect)(msg: e.msg)
converter toExc*(e: ThreadResErr): ref CatchableError =
case e[0]:
of ErrorEnum.DatastoreKeyNotFoundErr: (ref DatastoreKeyNotFound)(msg: $e[1])
of ErrorEnum.QueryEndedErr: (ref QueryEndedError)(msg: $e[1])
of ErrorEnum.DatastoreErr: (ref DatastoreError)(msg: $e[1])
of ErrorEnum.CatchableErr: (ref CatchableError)(msg: $e[1])
converter toQueryResponse*(r: ThreadQueryRes): QueryResponse =
if not r[0].isNil and r[0].len > 0 and key =? Key.init($r[0]):
(key.some, @(r[1]))
else:
(Key.none, EmptyBytes)

View File

@ -6,5 +6,6 @@ const
type
DatastoreError* = object of CatchableError
DatastoreKeyNotFound* = object of DatastoreError
QueryEndedError* = object of DatastoreError
Datastore* = ref object of RootObj