diff --git a/datastore/query.nim b/datastore/query.nim index 097d33d..7cae897 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -6,6 +6,8 @@ import pkg/questionable/results import ./key import ./types + +export types export options, SortOrder type @@ -17,7 +19,6 @@ type sort*: SortOrder # Sort order - not available in all backends QueryResponse* = tuple[key: ?Key, data: seq[byte]] - QueryEndedError* = object of DatastoreError GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe.} IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.} diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index f0638e9..fc69a69 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -26,14 +26,14 @@ import ../datastore import ./asyncsemaphore import ./databuffer +import ./threadresult + +export threadresult + +logScope: + topics = "datastore threadproxyds" type - ErrorEnum {.pure.} = enum - DatastoreErr, DatastoreKeyNotFoundErr, CatchableErr - - ThreadTypes = void | bool | SomeInteger | DataBuffer | tuple | Atomic - ThreadResult[T: ThreadTypes] = Result[T, DataBuffer] - TaskCtx[T: ThreadTypes] = object ds: Datastore res: ptr ThreadResult[T] @@ -56,7 +56,7 @@ template withLocks( ctx: TaskCtx, key: ?Key = Key.none, fut: Future[void], - body: untyped) = + body: untyped): untyped = try: if key.isSome and key.get in self.tasks: if self.withLocks: @@ -65,7 +65,9 @@ template withLocks( if self.withLocks: await self.queryLock.acquire() # only lock if it's required (fsds) - body + + block: + body finally: if self.withLocks: if key.isSome and key.get in self.tasks: @@ -74,26 +76,31 @@ template withLocks( self.queryLock.release() # TODO: needs rework, we can't use `result` with async -template dispatchTask( +template dispatchTask[T]( self: ThreadDatastore, - ctx: TaskCtx, + ctx: TaskCtx[T], key: ?Key = Key.none, - runTask: proc): untyped = + runTask: proc): auto = try: await self.semaphore.acquire() - ctx.signal = ThreadSignalPtr.new().valueOr: - result = failure(error()) - return + let signal = ThreadSignalPtr.new() + if signal.isErr: + failure(signal.error) + else: + ctx.signal = signal.get() + let + fut = wait(ctx.signal) - let - fut = wait(ctx.signal) - - withLocks(self, ctx, key, fut): - runTask() - await fut - - if ctx.res[].isErr: - result = failure(ctx.res[].error()) # TODO: fix this, result shouldn't be accessed from a thread + withLocks(self, ctx, key, fut): + runTask() + await fut + if ctx.res[].isErr: + failure ctx.res[].error + else: + when result.T isnot void: + success result.T(ctx.res[].get) + else: + success() except CancelledError as exc: trace "Cancelling thread future!", exc = exc.msg ctx.cancelled = true @@ -156,8 +163,7 @@ method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = proc runTask() = self.tp.spawn hasTask(addr ctx, unsafeAddr key) - self.dispatchTask(ctx, key.some, runTask) - return success(res.get()) + return self.dispatchTask(ctx, key.some, runTask) proc asyncDelTask(ctx: ptr TaskCtx[void], key: ptr Key) {.async.} = defer: @@ -194,8 +200,7 @@ method delete*( proc runTask() = self.tp.spawn delTask(addr ctx, unsafeAddr key) - self.dispatchTask(ctx, key.some, runTask) - return success() + return self.dispatchTask(ctx, key.some, runTask) method delete*( self: ThreadDatastore, @@ -257,8 +262,7 @@ method put*( makeUncheckedArray(baseAddr data), data.len) - self.dispatchTask(ctx, key.some, runTask) - return success() + return self.dispatchTask(ctx, key.some, runTask) method put*( self: ThreadDatastore, @@ -285,6 +289,7 @@ proc asyncGetTask( ctx[].res[].err(error) return + trace "Got data in get", data ctx[].res[].ok(DataBuffer.new(data)) proc getTask( @@ -311,11 +316,7 @@ method get*( proc runTask() = self.tp.spawn getTask(addr ctx, unsafeAddr key) - self.dispatchTask(ctx, key.some, runTask) - if err =? res.errorOption: - return failure err - - return success(@(res.get())) + return self.dispatchTask(ctx, key.some, runTask) method close*(self: ThreadDatastore): Future[?!void] {.async.} = for fut in self.tasks.values.toSeq: @@ -339,14 +340,15 @@ proc asyncQueryTask( return if res.key.isNone: - ctx[].res[].ok((false, default(DataBuffer), default(DataBuffer))) + ctx[].res[].ok((default(DataBuffer), default(DataBuffer))) return var keyBuf = DataBuffer.new($(res.key.get())) dataBuf = DataBuffer.new(res.data) - ctx[].res[].ok((true, keyBuf, dataBuf)) + trace "Got query result", key = $res.key.get(), data = res.data + ctx[].res[].ok((keyBuf, dataBuf)) proc queryTask( ctx: ptr TaskCtx, @@ -385,25 +387,15 @@ method query*( return success (Key.none, EmptyBytes) var - res = ThreadResult[(bool, DataBuffer, DataBuffer)]() - ctx = TaskCtx[(bool, DataBuffer, DataBuffer)]( + res = ThreadResult[ThreadQueryRes]() + ctx = TaskCtx[ThreadQueryRes]( ds: self.ds, res: addr res) proc runTask() = self.tp.spawn queryTask(addr ctx, addr childIter) - self.dispatchTask(ctx, Key.none, runTask) - if err =? res.errorOption: - trace "Query failed", err = $err - return failure err - - let (ok, key, data) = res.get() - if not ok: - iter.finished = true - return success (Key.none, EmptyBytes) - - return success (Key.init($key).expect("should not fail").some, @(data)) + return self.dispatchTask(ctx, Key.none, runTask) iter.next = next return success iter @@ -415,17 +407,9 @@ proc new*( tp: Taskpool): ?!ThreadDatastore = doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads" - case withLocks: - of true: - success ThreadDatastore( - tp: tp, - ds: ds, - withLocks: true, - queryLock: newAsyncLock(), - semaphore: AsyncSemaphore.new(tp.numThreads - 1)) - else: - success ThreadDatastore( - tp: tp, - ds: ds, - withLocks: false, - semaphore: AsyncSemaphore.new(tp.numThreads - 1)) + success ThreadDatastore( + tp: tp, + ds: ds, + withLocks: withLocks, + queryLock: newAsyncLock(), + semaphore: AsyncSemaphore.new(tp.numThreads - 1)) diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim new file mode 100644 index 0000000..fcb7ffd --- /dev/null +++ b/datastore/threads/threadresult.nim @@ -0,0 +1,43 @@ +import std/atomics +import std/options + +import pkg/questionable/results +import pkg/results + +import ../types +import ../query +import ../key + +import ./databuffer + +type + ErrorEnum* {.pure.} = enum + DatastoreErr, + DatastoreKeyNotFoundErr, + QueryEndedErr, + CatchableErr + + ThreadTypes* = void | bool | SomeInteger | DataBuffer | tuple | Atomic + ThreadResErr* = (ErrorEnum, DataBuffer) + ThreadQueryRes* = (DataBuffer, DataBuffer) + ThreadResult*[T: ThreadTypes] = Result[T, ThreadResErr] + +converter toThreadErr*(e: ref CatchableError): ThreadResErr {.inline, raises: [].} = + if e of DatastoreKeyNotFound: (ErrorEnum.DatastoreKeyNotFoundErr, DataBuffer.new(e.msg)) + elif e of QueryEndedError: (ErrorEnum.QueryEndedErr, DataBuffer.new(e.msg)) + elif e of DatastoreError: (DatastoreErr, DataBuffer.new(e.msg)) + elif e of CatchableError: (CatchableErr, DataBuffer.new(e.msg)) + else: raise (ref Defect)(msg: e.msg) + +converter toExc*(e: ThreadResErr): ref CatchableError = + case e[0]: + of ErrorEnum.DatastoreKeyNotFoundErr: (ref DatastoreKeyNotFound)(msg: $e[1]) + of ErrorEnum.QueryEndedErr: (ref QueryEndedError)(msg: $e[1]) + of ErrorEnum.DatastoreErr: (ref DatastoreError)(msg: $e[1]) + of ErrorEnum.CatchableErr: (ref CatchableError)(msg: $e[1]) + +converter toQueryResponse*(r: ThreadQueryRes): QueryResponse = + if not r[0].isNil and r[0].len > 0 and key =? Key.init($r[0]): + (key.some, @(r[1])) + else: + (Key.none, EmptyBytes) diff --git a/datastore/types.nim b/datastore/types.nim index b019cdb..473db52 100644 --- a/datastore/types.nim +++ b/datastore/types.nim @@ -6,5 +6,6 @@ const type DatastoreError* = object of CatchableError DatastoreKeyNotFound* = object of DatastoreError + QueryEndedError* = object of DatastoreError Datastore* = ref object of RootObj