some more cleanup / refactoring

This commit is contained in:
Jaremy Creechley 2023-09-05 13:39:13 -07:00
parent 28227040d3
commit ebf61ca899
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
3 changed files with 111 additions and 94 deletions

View File

@ -8,6 +8,7 @@ import pkg/questionable/results
import ./key
import ./types
import ./databuffer
import ./threadresults
export options, SortOrder
type
@ -66,9 +67,9 @@ proc init*(
value = true,
sort = Ascending,
offset = 0,
limit = -1): T =
T(
limit = -1
): T =
Query(
key: key,
value: value,
sort: sort,
@ -88,12 +89,8 @@ type
key*: KeyBuffer
data*: ValueBuffer
# GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.}
# IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}
# QueryIter* = ref object
# finished*: bool
# next*: GetNext
# dispose*: IterDispose
proc threadSafeType*(tp: typedesc[QueryResponseBuffer]) =
discard
proc toBuffer*(q: Query): QueryBuffer =
## convert Query to thread-safe QueryBuffer
@ -136,11 +133,12 @@ proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse =
(key: key, data: data)
# proc convert*(ret: TResult[QueryResponseBuffer],
# tp: typedesc[QueryResponse]
# ): Result[QueryResponse, ref CatchableError] =
# if ret[].results.isOk():
# result.ok(ret[].results.get().toString())
# else:
# let exc: ref CatchableError = ret[].results.error().toCatchable()
# result.err(exc)
proc convert*[T: QueryResponseBuffer, S: QueryResponse](
ret: TResult[T],
tp: typedesc[S]
): Result[S, ref CatchableError] =
if ret[].results.isOk():
result.ok(ret[].results.get().toQueryResponse())
else:
let exc: ref CatchableError = ret[].results.error().toCatchable()
result.err(exc)

View File

@ -5,43 +5,20 @@ import pkg/questionable/results
import stew/results
import pkg/upraises
import pkg/taskpools
import pkg/threading/smartptrs
import ./key
import ./query
import ./datastore
import ./databuffer
import threading/smartptrs
import ./threadresults
export key, query, smartptrs, databuffer
export threadresults
push: {.upraises: [].}
type
ThreadSafeTypes* = DataBuffer | void | bool | ThreadDatastorePtr | QueryResponseBuffer ##\
## This is a whitelisting of types that can be used with ThreadResult below
## These types need to be thread safe with refc. That means no
## GC types.
ThreadResult*[T: ThreadSafeTypes] = object
## Encapsulates both the results from a thread but also the cross
## thread signaling mechanism. This makes it easier to keep them
## together.
signal*: ThreadSignalPtr
results*: Result[T, CatchableErrorBuffer]
TResult*[T] = SharedPtr[ThreadResult[T]] ##\
## SharedPtr that allocates a shared buffer and keeps the
## memory allocated until all references to it are gone.
##
## Since ThreadResult is a plain object, and if its lifetime is
## tied to that of an async proc or the thread-backend request
## it could be freed before the other thread is finished.
##
## For example, `myFuture.cancel()` can end an async proc early.
## If the ThreadResult was stored in the async's memory then it'd
## be free'ed along with the rest of the async env. This would
## result in likely memory corruption (use-after-free).
ThreadDatastore* = object
tp*: Taskpool
ds*: Datastore
@ -52,57 +29,6 @@ type
it*: QueryIter
QueryIterPtr* = SharedPtr[QueryIterStore]
proc newThreadResult*[T](
tp: typedesc[T]
): Result[TResult[T], ref CatchableError] =
## Creates a new TResult including allocating
## a new ThreadSignalPtr.
##
## Since allocating the TSP can fail, this returns
## a Result.
let res = newSharedPtr(ThreadResult[T])
let signal = ThreadSignalPtr.new()
if signal.isErr:
return err((ref CatchableError)(msg: signal.error()))
else:
res[].signal = signal.get()
ok res
proc success*[T](ret: TResult[T], value: T) =
## convenience wrapper for `TResult` to make
## "returning" results easier
ret[].results.ok(value)
proc success*[T: void](ret: TResult[T]) =
## convenience wrapper for `TResult` to make
## "returning" results easier
ret[].results.ok()
proc failure*[T](ret: TResult[T], exc: ref Exception) =
## convenience wrapper for `TResult` to make
## "returning" results easier
ret[].results.err(exc.toBuffer())
proc convert*[T, S](ret: TResult[T],
tp: typedesc[S]
): Result[S, ref CatchableError] =
## convenience wrapper for `TResult` to make
## fetching results from `TResult` easier.
if ret[].results.isOk():
when S is seq[byte]:
result.ok(ret[].results.get().toSeq(byte))
elif S is string:
result.ok(ret[].results.get().toString())
elif S is void:
result.ok()
elif S is QueryResponse:
result.ok(ret[].results.get().toQueryResponse())
else:
result.ok(ret[].results.get())
else:
let exc: ref CatchableError = ret[].results.error().toCatchable()
result.err(exc)
proc hasTask*(
ret: TResult[bool],
tds: ThreadDatastorePtr,

View File

@ -0,0 +1,93 @@
import pkg/chronos/threadsync
import pkg/threading/smartptrs
import ./databuffer
export databuffer
export smartptrs
export threadsync
type
ThreadSafeTypes* = DataBuffer | void | bool | SharedPtr ##\
## This is a whitelisting of types that can be used with ThreadResult below
## These types need to be thread safe with refc. That means no
## GC types.
ThreadResult*[T] = object
## Encapsulates both the results from a thread but also the cross
## thread signaling mechanism. This makes it easier to keep them
## together.
signal*: ThreadSignalPtr
results*: Result[T, CatchableErrorBuffer]
TResult*[T] = SharedPtr[ThreadResult[T]] ##\
## SharedPtr that allocates a shared buffer and keeps the
## memory allocated until all references to it are gone.
##
## Since ThreadResult is a plain object, and if its lifetime is
## tied to that of an async proc or the thread-backend request
## it could be freed before the other thread is finished.
##
## For example, `myFuture.cancel()` can end an async proc early.
## If the ThreadResult was stored in the async's memory then it'd
## be free'ed along with the rest of the async env. This would
## result in likely memory corruption (use-after-free).
proc threadSafeType*[T: ThreadSafeTypes](tp: typedesc[T]) =
discard
proc newThreadResult*[T](
tp: typedesc[T]
): Result[TResult[T], ref CatchableError] =
## Creates a new TResult including allocating
## a new ThreadSignalPtr.
##
## Since allocating the TSP can fail, this returns
## a Result.
mixin threadSafeType
when not compiles(threadSafeType):
{.error: "only thread safe types can be used".}
let res = newSharedPtr(ThreadResult[T])
let signal = ThreadSignalPtr.new()
if signal.isErr:
return err((ref CatchableError)(msg: signal.error()))
else:
res[].signal = signal.get()
ok res
proc success*[T](ret: TResult[T], value: T) =
## convenience wrapper for `TResult` to make
## "returning" results easier
ret[].results.ok(value)
proc success*[T: void](ret: TResult[T]) =
## convenience wrapper for `TResult` to make
## "returning" results easier
ret[].results.ok()
proc failure*[T](ret: TResult[T], exc: ref Exception) =
## convenience wrapper for `TResult` to make
## "returning" results easier
ret[].results.err(exc.toBuffer())
proc convert*[T, S](ret: TResult[T],
tp: typedesc[S]
): Result[S, ref CatchableError] =
## convenience wrapper for `TResult` to make
## fetching results from `TResult` easier.
if ret[].results.isOk():
when S is seq[byte]:
result.ok(ret[].results.get().toSeq(byte))
elif S is string:
result.ok(ret[].results.get().toString())
elif S is void:
result.ok()
# elif S is QueryResponse:
# result.ok(ret[].results.get().toQueryResponse())
else:
result.ok(ret[].results.get())
else:
let exc: ref CatchableError = ret[].results.error().toCatchable()
result.err(exc)