diff --git a/datastore/query.nim b/datastore/query.nim index 92aafe9..a0958e9 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -8,6 +8,7 @@ import pkg/questionable/results import ./key import ./types import ./databuffer +import ./threadresults export options, SortOrder type @@ -66,9 +67,9 @@ proc init*( value = true, sort = Ascending, offset = 0, - limit = -1): T = - - T( + limit = -1 +): T = + Query( key: key, value: value, sort: sort, @@ -88,12 +89,8 @@ type key*: KeyBuffer data*: ValueBuffer - # GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.} - # IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.} - # QueryIter* = ref object - # finished*: bool - # next*: GetNext - # dispose*: IterDispose +proc threadSafeType*(tp: typedesc[QueryResponseBuffer]) = + discard proc toBuffer*(q: Query): QueryBuffer = ## convert Query to thread-safe QueryBuffer @@ -136,11 +133,12 @@ proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse = (key: key, data: data) -# proc convert*(ret: TResult[QueryResponseBuffer], -# tp: typedesc[QueryResponse] -# ): Result[QueryResponse, ref CatchableError] = -# if ret[].results.isOk(): -# result.ok(ret[].results.get().toString()) -# else: -# let exc: ref CatchableError = ret[].results.error().toCatchable() -# result.err(exc) +proc convert*[T: QueryResponseBuffer, S: QueryResponse]( + ret: TResult[T], + tp: typedesc[S] +): Result[S, ref CatchableError] = + if ret[].results.isOk(): + result.ok(ret[].results.get().toQueryResponse()) + else: + let exc: ref CatchableError = ret[].results.error().toCatchable() + result.err(exc) diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index 29ed620..040b0fb 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -5,43 +5,20 @@ import pkg/questionable/results import stew/results import pkg/upraises import pkg/taskpools +import pkg/threading/smartptrs import ./key import ./query import ./datastore import ./databuffer -import threading/smartptrs +import ./threadresults export key, query, smartptrs, databuffer +export threadresults push: {.upraises: [].} type - ThreadSafeTypes* = DataBuffer | void | bool | ThreadDatastorePtr | QueryResponseBuffer ##\ - ## This is a whitelisting of types that can be used with ThreadResult below - ## These types need to be thread safe with refc. That means no - ## GC types. - - ThreadResult*[T: ThreadSafeTypes] = object - ## Encapsulates both the results from a thread but also the cross - ## thread signaling mechanism. This makes it easier to keep them - ## together. - signal*: ThreadSignalPtr - results*: Result[T, CatchableErrorBuffer] - - TResult*[T] = SharedPtr[ThreadResult[T]] ##\ - ## SharedPtr that allocates a shared buffer and keeps the - ## memory allocated until all references to it are gone. - ## - ## Since ThreadResult is a plain object, and if its lifetime is - ## tied to that of an async proc or the thread-backend request - ## it could be freed before the other thread is finished. - ## - ## For example, `myFuture.cancel()` can end an async proc early. - ## If the ThreadResult was stored in the async's memory then it'd - ## be free'ed along with the rest of the async env. This would - ## result in likely memory corruption (use-after-free). - ThreadDatastore* = object tp*: Taskpool ds*: Datastore @@ -52,57 +29,6 @@ type it*: QueryIter QueryIterPtr* = SharedPtr[QueryIterStore] -proc newThreadResult*[T]( - tp: typedesc[T] -): Result[TResult[T], ref CatchableError] = - ## Creates a new TResult including allocating - ## a new ThreadSignalPtr. - ## - ## Since allocating the TSP can fail, this returns - ## a Result. - let res = newSharedPtr(ThreadResult[T]) - let signal = ThreadSignalPtr.new() - if signal.isErr: - return err((ref CatchableError)(msg: signal.error())) - else: - res[].signal = signal.get() - ok res - -proc success*[T](ret: TResult[T], value: T) = - ## convenience wrapper for `TResult` to make - ## "returning" results easier - ret[].results.ok(value) - -proc success*[T: void](ret: TResult[T]) = - ## convenience wrapper for `TResult` to make - ## "returning" results easier - ret[].results.ok() - -proc failure*[T](ret: TResult[T], exc: ref Exception) = - ## convenience wrapper for `TResult` to make - ## "returning" results easier - ret[].results.err(exc.toBuffer()) - -proc convert*[T, S](ret: TResult[T], - tp: typedesc[S] - ): Result[S, ref CatchableError] = - ## convenience wrapper for `TResult` to make - ## fetching results from `TResult` easier. - if ret[].results.isOk(): - when S is seq[byte]: - result.ok(ret[].results.get().toSeq(byte)) - elif S is string: - result.ok(ret[].results.get().toString()) - elif S is void: - result.ok() - elif S is QueryResponse: - result.ok(ret[].results.get().toQueryResponse()) - else: - result.ok(ret[].results.get()) - else: - let exc: ref CatchableError = ret[].results.error().toCatchable() - result.err(exc) - proc hasTask*( ret: TResult[bool], tds: ThreadDatastorePtr, diff --git a/datastore/threadresults.nim b/datastore/threadresults.nim new file mode 100644 index 0000000..c6f1d42 --- /dev/null +++ b/datastore/threadresults.nim @@ -0,0 +1,93 @@ + +import pkg/chronos/threadsync +import pkg/threading/smartptrs + +import ./databuffer + +export databuffer +export smartptrs +export threadsync + +type + ThreadSafeTypes* = DataBuffer | void | bool | SharedPtr ##\ + ## This is a whitelisting of types that can be used with ThreadResult below + ## These types need to be thread safe with refc. That means no + ## GC types. + + ThreadResult*[T] = object + ## Encapsulates both the results from a thread but also the cross + ## thread signaling mechanism. This makes it easier to keep them + ## together. + signal*: ThreadSignalPtr + results*: Result[T, CatchableErrorBuffer] + + TResult*[T] = SharedPtr[ThreadResult[T]] ##\ + ## SharedPtr that allocates a shared buffer and keeps the + ## memory allocated until all references to it are gone. + ## + ## Since ThreadResult is a plain object, and if its lifetime is + ## tied to that of an async proc or the thread-backend request + ## it could be freed before the other thread is finished. + ## + ## For example, `myFuture.cancel()` can end an async proc early. + ## If the ThreadResult was stored in the async's memory then it'd + ## be free'ed along with the rest of the async env. This would + ## result in likely memory corruption (use-after-free). + +proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) = + discard + +proc newThreadResult*[T]( + tp: typedesc[T] +): Result[TResult[T], ref CatchableError] = + ## Creates a new TResult including allocating + ## a new ThreadSignalPtr. + ## + ## Since allocating the TSP can fail, this returns + ## a Result. + mixin threadSafeType + when not compiles(threadSafeType): + {.error: "only thread safe types can be used".} + + let res = newSharedPtr(ThreadResult[T]) + let signal = ThreadSignalPtr.new() + if signal.isErr: + return err((ref CatchableError)(msg: signal.error())) + else: + res[].signal = signal.get() + ok res + +proc success*[T](ret: TResult[T], value: T) = + ## convenience wrapper for `TResult` to make + ## "returning" results easier + ret[].results.ok(value) + +proc success*[T: void](ret: TResult[T]) = + ## convenience wrapper for `TResult` to make + ## "returning" results easier + ret[].results.ok() + +proc failure*[T](ret: TResult[T], exc: ref Exception) = + ## convenience wrapper for `TResult` to make + ## "returning" results easier + ret[].results.err(exc.toBuffer()) + +proc convert*[T, S](ret: TResult[T], + tp: typedesc[S] + ): Result[S, ref CatchableError] = + ## convenience wrapper for `TResult` to make + ## fetching results from `TResult` easier. + if ret[].results.isOk(): + when S is seq[byte]: + result.ok(ret[].results.get().toSeq(byte)) + elif S is string: + result.ok(ret[].results.get().toString()) + elif S is void: + result.ok() + # elif S is QueryResponse: + # result.ok(ret[].results.get().toQueryResponse()) + else: + result.ok(ret[].results.get()) + else: + let exc: ref CatchableError = ret[].results.error().toCatchable() + result.err(exc) \ No newline at end of file