160 lines
4.3 KiB
Nim
Raw Normal View History

2023-08-29 15:15:00 -07:00
import std/options
2023-08-29 16:40:11 -07:00
import std/algorithm
2022-09-19 17:13:11 -06:00
import pkg/upraises
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
2022-09-19 17:13:11 -06:00
2022-07-15 15:28:42 -05:00
import ./key
import ./types
2023-09-05 17:08:40 -07:00
import ./threads/databuffer
import ./threads/threadresults
2023-08-29 16:40:11 -07:00
export options, SortOrder
2022-07-15 15:28:42 -05:00
type
  Query* = object
    ## Parameters describing a single datastore query.
    key*: Key         ## Key to be queried
    value*: bool      ## Flag to indicate if data should be returned
    limit*: int       ## Max items to return - not available in all backends
    offset*: int      ## Offset from which to start querying -
                      ## not available in all backends
    sort*: SortOrder  ## Sort order - not available in all backends

  QueryResponse* = tuple
    key: ?Key
    data: seq[byte]

  QueryEndedError* = object of DatastoreError

  GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.}
  IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}

  QueryIter* = ref object
    ## Handle over a running query.
    finished*: bool      ## iteration stops once this is true
    readyForNext*: bool  ## `items` raises `FutureDefect` when this is false
    next*: GetNext       ## produces the future of the next response
    dispose*: IterDispose  ## callback invoked to dispose the iterator
2022-09-19 17:13:11 -06:00
iterator items*(q: QueryIter): Future[?!QueryResponse] =
  ## Yields the future returned by `q.next()` for as long as `q.finished`
  ## is false.
  ##
  ## Raises `FutureDefect` when `q.readyForNext` is false at the moment the
  ## next item is requested.
  while true:
    if q.finished:
      break
    if not q.readyForNext:
      raise newException(FutureDefect, "query iterator not ready for next Future")
    yield q.next()
proc waitForAllQueryResults*(qi: ?!QueryIter): Future[?!seq[QueryResponse]] {.async.} =
  ## Drains the iterator held by `qi` and collects every response that
  ## carries a key.
  ##
  ## Returns a failure when `qi` itself is a failure, when any call to
  ## `next()` fails, or when the final `dispose()` fails. The iterator is
  ## disposed on the success path and on the `next()` failure path.
  ##
  ## for large blocks this would be *expensive*
  without iter =? qi, err:
    return failure err

  var collected: seq[QueryResponse]
  while not iter.finished:
    let step = await iter.next()
    if step.isErr():
      # Release the iterator before bailing out. The error from `next()` is
      # the one reported, so the outcome of this dispose is dropped.
      discard await iter.dispose()
      return failure step.error()

    let response = step.tryGet()
    # responses without a key are skipped
    if response.key.isSome:
      collected.add response

  let disposed = await iter.dispose()
  if disposed.isErr():
    return failure(disposed.error())
  return success collected
2022-09-12 12:30:52 -06:00
2023-08-30 18:47:40 -07:00
proc waitForAllQueryResults*(iter: Future[?!QueryIter]
                            ): Future[?!seq[QueryResponse]] {.async.} =
  ## Awaits the pending iterator result and then collects all of its
  ## responses through the `?!QueryIter` overload.
  let resolved = await iter
  return await waitForAllQueryResults(resolved)
proc defaultDispose(): Future[?!void] {.upraises: [], gcsafe, async.} =
  ## Disposer that does nothing and reports success; it is the default
  ## `dispose` argument of `QueryIter.new`.
  return success()
proc new*(T: type QueryIter, dispose = defaultDispose): T =
  ## Creates a `QueryIter` whose `dispose` callback is set to `dispose`;
  ## all other fields keep their default values.
  result = QueryIter(dispose: dispose)
2022-07-15 15:28:42 -05:00
proc init*(
    T: type Query,
    key: Key,
    value = true,
    sort = Ascending,
    offset = 0,
    limit = -1
): T =
  ## Builds a `Query` for `key`.
  ##
  ## Defaults: `value = true`, `sort = Ascending`, `offset = 0`,
  ## `limit = -1`.
  result = Query(
    key: key,
    value: value,
    sort: sort,
    offset: offset,
    limit: limit
  )
2023-08-29 14:58:33 -07:00
2023-08-29 15:15:00 -07:00
type
  ## These types are equivalent thread-safe types
  ## for copying / sharing query data between threads.
  ##
  QueryBuffer* = object
    key*: KeyBuffer   ## Key to be queried
    value*: bool      ## Flag to indicate if data should be returned
    limit*: int       ## Max items to return - not available in all backends
    offset*: int      ## Offset from which to start querying -
                      ## not available in all backends
    sort*: SortOrder  ## Sort order - not available in all backends

  QueryResponseBuffer* = object
    key*: KeyBuffer     ## buffer form of the response key
    data*: ValueBuffer  ## buffer form of the response data
2023-09-05 13:39:13 -07:00
proc threadSafeType*(tp: typedesc[QueryResponseBuffer]) =
  ## QueryResponseBuffer is a thread-safe type
  ##
  ## The body is intentionally empty; only the existence of this overload
  ## matters.
  ## NOTE(review): presumably checked at compile time by the threading
  ## helpers - confirm against `threadresults`.
  discard
2023-08-29 15:15:00 -07:00
2023-08-29 14:58:33 -07:00
proc toBuffer*(q: Query): QueryBuffer =
  ## convert Query to thread-safe QueryBuffer
  ##
  ## Every field is carried over, including `limit`, so that converting
  ## back with `toQuery` yields the same limit the query started with.
  QueryBuffer(
    key: KeyBuffer.new(q.key),
    value: q.value,
    limit: q.limit,
    offset: q.offset,
    sort: q.sort
  )
2023-08-29 15:15:00 -07:00
2023-08-29 15:20:57 -07:00
proc toQuery*(qb: QueryBuffer): Query =
  ## convert QueryBuffer to regular Query
  ##
  ## The key buffer is converted with `toKey()` and unwrapped with
  ## `expect("key expected")`; the remaining fields are copied as-is.
  let key = qb.key.toKey().expect("key expected")
  result = Query(
    key: key,
    value: qb.value,
    limit: qb.limit,
    offset: qb.offset,
    sort: qb.sort
  )
2023-08-29 15:15:00 -07:00
proc toBuffer*(q: QueryResponse): QueryResponseBuffer =
  ## convert QueryResponse to thread-safe type
  ##
  ## A response without a key, or with empty data, leaves the matching
  ## buffer at its default value.
  var keyBuf: KeyBuffer
  if q.key.isSome():
    keyBuf = KeyBuffer.new(q.key.get())

  # Declared as `ValueBuffer` to match the `data` field it is stored in.
  var valueBuf: ValueBuffer
  if q.data.len() > 0:
    valueBuf = ValueBuffer.new(q.data)

  QueryResponseBuffer(key: keyBuf, data: valueBuf)
proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse =
  ## convert QueryResponseBuffer to regular QueryResponse
  ##
  ## A nil key buffer becomes `none(Key)` and a nil data buffer becomes
  ## `EmptyBytes`.
  if qb.key.isNil:
    result.key = none(Key)
  else:
    result.key = some(qb.key.toKey().expect("key response should work"))

  if qb.data.isNil:
    result.data = EmptyBytes
  else:
    result.data = qb.data.toSeq(byte)
2023-09-05 13:41:38 -07:00
proc convert*(
    ret: TResult[QueryResponseBuffer],
    tp: typedesc[QueryResponse]
): Result[QueryResponse, ref CatchableError] =
  ## helper to convert `TResult[QueryResponseBuffer]` to
  ## a regular `Result[QueryResponse]` type
  ##
  ## On success the buffered response is converted with `toQueryResponse`;
  ## on failure the stored error is converted with `toCatchable`.
  if ret[].results.isErr():
    let exc: ref CatchableError = ret[].results.error().toCatchable()
    result.err(exc)
  else:
    result.ok(ret[].results.get().toQueryResponse())