mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 06:03:06 +00:00
wip
This commit is contained in:
parent
6c3b0d60fb
commit
146cbcb88a
@ -34,11 +34,11 @@ method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, loc
|
||||
method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} =
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method query*(
|
||||
self: Datastore,
|
||||
query: Query): Future[?!QueryIter] {.base, gcsafe.} =
|
||||
# method query*(
|
||||
# self: Datastore,
|
||||
# query: Query): Future[?!QueryIter] {.base, gcsafe.} =
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
# raiseAssert("Not implemented!")
|
||||
|
||||
proc contains*(self: Datastore, key: Key): Future[bool] {.async.} =
|
||||
return (await self.has(key)) |? false
|
||||
|
||||
@ -11,33 +11,27 @@ import pkg/upraises
|
||||
import ./key
|
||||
import ./query
|
||||
import ./datastore
|
||||
import ./databuffer
|
||||
|
||||
export key, query
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
|
||||
MemoryDatastore* = ref object of Datastore
|
||||
store*: Table[KeyBuffer, ValueBuffer]
|
||||
store*: Table[Key, seq[byte]]
|
||||
|
||||
method has*(
|
||||
self: MemoryDatastore,
|
||||
key: Key
|
||||
): Future[?!bool] {.async.} =
|
||||
|
||||
let dk = KeyBuffer.new(key)
|
||||
return success self.store.hasKey(dk)
|
||||
return success self.store.hasKey(key)
|
||||
|
||||
method delete*(
|
||||
self: MemoryDatastore,
|
||||
key: Key
|
||||
): Future[?!void] {.async.} =
|
||||
self: MemoryDatastore,
|
||||
key: Key): Future[?!void] {.async.} =
|
||||
|
||||
let dk = KeyBuffer.new(key)
|
||||
var val: ValueBuffer
|
||||
discard self.store.pop(dk, val)
|
||||
self.store.del(key)
|
||||
return success()
|
||||
|
||||
method delete*(
|
||||
@ -51,26 +45,20 @@ method delete*(
|
||||
return success()
|
||||
|
||||
method get*(
|
||||
self: MemoryDatastore,
|
||||
key: Key
|
||||
): Future[?!seq[byte]] {.async.} =
|
||||
self: MemoryDatastore,
|
||||
key: Key): Future[?!seq[byte]] {.async.} =
|
||||
|
||||
let dk = KeyBuffer.new(key)
|
||||
if self.store.hasKey(dk):
|
||||
let res = self.store[dk].toSeq(byte)
|
||||
return success res
|
||||
else:
|
||||
self.store.withValue(key, value):
|
||||
return success value[]
|
||||
do:
|
||||
return failure (ref DatastoreError)(msg: "no such key")
|
||||
|
||||
method put*(
|
||||
self: MemoryDatastore,
|
||||
key: Key,
|
||||
data: seq[byte]
|
||||
): Future[?!void] {.async.} =
|
||||
self: MemoryDatastore,
|
||||
key: Key,
|
||||
data: seq[byte]): Future[?!void] {.async.} =
|
||||
|
||||
let dk = KeyBuffer.new(key)
|
||||
let dv = ValueBuffer.new(data)
|
||||
self.store[dk] = dv
|
||||
self.store[key] = data
|
||||
return success()
|
||||
|
||||
method put*(
|
||||
@ -83,47 +71,47 @@ method put*(
|
||||
|
||||
return success()
|
||||
|
||||
proc keyIterator(self: MemoryDatastore, queryKey: string): iterator: KeyBuffer {.gcsafe.} =
|
||||
return iterator(): KeyBuffer {.closure.} =
|
||||
var keys = self.store.keys().toSeq()
|
||||
keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString))
|
||||
for key in keys:
|
||||
if key.toString().startsWith(queryKey):
|
||||
yield key
|
||||
# proc keyIterator(self: MemoryDatastore, queryKey: string): iterator: KeyBuffer {.gcsafe.} =
|
||||
# return iterator(): KeyBuffer {.closure.} =
|
||||
# var keys = self.store.keys().toSeq()
|
||||
# keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString))
|
||||
# for key in keys:
|
||||
# if key.toString().startsWith(queryKey):
|
||||
# yield key
|
||||
|
||||
method query*(
|
||||
self: MemoryDatastore,
|
||||
query: Query,
|
||||
): Future[?!QueryIter] {.async.} =
|
||||
# method query*(
|
||||
# self: MemoryDatastore,
|
||||
# query: Query,
|
||||
# ): Future[?!QueryIter] {.async.} =
|
||||
|
||||
let
|
||||
queryKey = query.key.id()
|
||||
walker = keyIterator(self, queryKey)
|
||||
var
|
||||
iter = QueryIter.new()
|
||||
# let
|
||||
# queryKey = query.key.id()
|
||||
# walker = keyIterator(self, queryKey)
|
||||
# var
|
||||
# iter = QueryIter.new()
|
||||
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
let kb = walker()
|
||||
# proc next(): Future[?!QueryResponse] {.async.} =
|
||||
# let kb = walker()
|
||||
|
||||
if finished(walker):
|
||||
iter.finished = true
|
||||
return success (Key.none, EmptyBytes)
|
||||
# if finished(walker):
|
||||
# iter.finished = true
|
||||
# return success (Key.none, EmptyBytes)
|
||||
|
||||
let key = kb.toKey().expect("should not fail")
|
||||
var ds: ValueBuffer
|
||||
if query.value:
|
||||
ds = self.store[kb]
|
||||
let data = if ds.isNil: EmptyBytes else: ds.toSeq(byte)
|
||||
# let key = kb.toKey().expect("should not fail")
|
||||
# var ds: ValueBuffer
|
||||
# if query.value:
|
||||
# ds = self.store[kb]
|
||||
# let data = if ds.isNil: EmptyBytes else: ds.toSeq(byte)
|
||||
|
||||
return success (key.some, data)
|
||||
# return success (key.some, data)
|
||||
|
||||
iter.next = next
|
||||
return success iter
|
||||
# iter.next = next
|
||||
# return success iter
|
||||
|
||||
method close*(self: MemoryDatastore): Future[?!void] {.async.} =
|
||||
self.store.clear()
|
||||
return success()
|
||||
|
||||
func new*(tp: typedesc[MemoryDatastore]): MemoryDatastore =
|
||||
func new*(tp: type MemoryDatastore): MemoryDatastore =
|
||||
var self = tp()
|
||||
return self
|
||||
|
||||
@ -7,7 +7,7 @@ import pkg/questionable/results
|
||||
|
||||
import ./key
|
||||
import ./types
|
||||
import ./databuffer
|
||||
import ./threads/databuffer
|
||||
export options, SortOrder
|
||||
|
||||
type
|
||||
@ -43,7 +43,7 @@ proc waitForAllQueryResults*(qi: ?!QueryIter): Future[?!seq[QueryResponse]] {.as
|
||||
res.add qr
|
||||
else:
|
||||
return failure val.error()
|
||||
|
||||
|
||||
let rd = await iter.dispose()
|
||||
if rd.isErr():
|
||||
return failure(rd.error())
|
||||
@ -75,18 +75,18 @@ proc init*(
|
||||
offset: offset,
|
||||
limit: limit)
|
||||
|
||||
type
|
||||
# type
|
||||
|
||||
QueryBuffer* = object
|
||||
key*: KeyBuffer # Key to be queried
|
||||
value*: bool # Flag to indicate if data should be returned
|
||||
limit*: int # Max items to return - not available in all backends
|
||||
offset*: int # Offset from which to start querying - not available in all backends
|
||||
sort*: SortOrder # Sort order - not available in all backends
|
||||
# QueryBuffer* = object
|
||||
# key*: KeyBuffer # Key to be queried
|
||||
# value*: bool # Flag to indicate if data should be returned
|
||||
# limit*: int # Max items to return - not available in all backends
|
||||
# offset*: int # Offset from which to start querying - not available in all backends
|
||||
# sort*: SortOrder # Sort order - not available in all backends
|
||||
|
||||
QueryResponseBuffer* = object
|
||||
key*: KeyBuffer
|
||||
data*: ValueBuffer
|
||||
# QueryResponseBuffer* = object
|
||||
# key*: KeyBuffer
|
||||
# data*: ValueBuffer
|
||||
|
||||
# GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.}
|
||||
# IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.}
|
||||
@ -95,46 +95,46 @@ type
|
||||
# next*: GetNext
|
||||
# dispose*: IterDispose
|
||||
|
||||
proc toBuffer*(q: Query): QueryBuffer =
|
||||
## convert Query to thread-safe QueryBuffer
|
||||
return QueryBuffer(
|
||||
key: KeyBuffer.new(q.key),
|
||||
value: q.value,
|
||||
offset: q.offset,
|
||||
sort: q.sort
|
||||
)
|
||||
# proc toBuffer*(q: Query): QueryBuffer =
|
||||
# ## convert Query to thread-safe QueryBuffer
|
||||
# return QueryBuffer(
|
||||
# key: KeyBuffer.new(q.key),
|
||||
# value: q.value,
|
||||
# offset: q.offset,
|
||||
# sort: q.sort
|
||||
# )
|
||||
|
||||
proc toQuery*(qb: QueryBuffer): Query =
|
||||
## convert QueryBuffer to regular Query
|
||||
Query(
|
||||
key: qb.key.toKey().expect("key expected"),
|
||||
value: qb.value,
|
||||
limit: qb.limit,
|
||||
offset: qb.offset,
|
||||
sort: qb.sort
|
||||
)
|
||||
# proc toQuery*(qb: QueryBuffer): Query =
|
||||
# ## convert QueryBuffer to regular Query
|
||||
# Query(
|
||||
# key: qb.key.toKey().expect("key expected"),
|
||||
# value: qb.value,
|
||||
# limit: qb.limit,
|
||||
# offset: qb.offset,
|
||||
# sort: qb.sort
|
||||
# )
|
||||
|
||||
proc toBuffer*(q: QueryResponse): QueryResponseBuffer =
|
||||
## convert QueryReponses to thread safe type
|
||||
var kb: KeyBuffer
|
||||
if q.key.isSome():
|
||||
kb = KeyBuffer.new(q.key.get())
|
||||
var kv: KeyBuffer
|
||||
if q.data.len() > 0:
|
||||
kv = ValueBuffer.new(q.data)
|
||||
# proc toBuffer*(q: QueryResponse): QueryResponseBuffer =
|
||||
# ## convert QueryReponses to thread safe type
|
||||
# var kb: KeyBuffer
|
||||
# if q.key.isSome():
|
||||
# kb = KeyBuffer.new(q.key.get())
|
||||
# var kv: KeyBuffer
|
||||
# if q.data.len() > 0:
|
||||
# kv = ValueBuffer.new(q.data)
|
||||
|
||||
QueryResponseBuffer(key: kb, data: kv)
|
||||
# QueryResponseBuffer(key: kb, data: kv)
|
||||
|
||||
proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse =
|
||||
## convert QueryReponseBuffer to regular QueryResponse
|
||||
let key =
|
||||
if qb.key.isNil: none(Key)
|
||||
else: some qb.key.toKey().expect("key response should work")
|
||||
let data =
|
||||
if qb.data.isNil: EmptyBytes
|
||||
else: qb.data.toSeq(byte)
|
||||
# proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse =
|
||||
# ## convert QueryReponseBuffer to regular QueryResponse
|
||||
# let key =
|
||||
# if qb.key.isNil: none(Key)
|
||||
# else: some qb.key.toKey().expect("key response should work")
|
||||
# let data =
|
||||
# if qb.data.isNil: EmptyBytes
|
||||
# else: qb.data.toSeq(byte)
|
||||
|
||||
(key: key, data: data)
|
||||
# (key: key, data: data)
|
||||
|
||||
# proc convert*(ret: TResult[QueryResponseBuffer],
|
||||
# tp: typedesc[QueryResponse]
|
||||
|
||||
@ -1,217 +0,0 @@
|
||||
import pkg/chronos
|
||||
import pkg/chronos/threadsync
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import stew/results
|
||||
import pkg/upraises
|
||||
import pkg/taskpools
|
||||
|
||||
import ./key
|
||||
import ./query
|
||||
import ./datastore
|
||||
import ./databuffer
|
||||
import threading/smartptrs
|
||||
|
||||
export key, query, smartptrs, databuffer
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
ThreadSafeTypes* = DataBuffer | void | bool | ThreadDatastorePtr | QueryResponseBuffer
|
||||
ThreadResult*[T: ThreadSafeTypes] = object
|
||||
signal*: ThreadSignalPtr
|
||||
results*: Result[T, CatchableErrorBuffer]
|
||||
|
||||
TResult*[T] = SharedPtr[ThreadResult[T]]
|
||||
|
||||
ThreadDatastore* = object
|
||||
tp*: Taskpool
|
||||
ds*: Datastore
|
||||
|
||||
ThreadDatastorePtr* = SharedPtr[ThreadDatastore]
|
||||
|
||||
QueryIterStore* = object
|
||||
it*: QueryIter
|
||||
QueryIterPtr* = SharedPtr[QueryIterStore]
|
||||
|
||||
proc newThreadResult*[T](
|
||||
tp: typedesc[T]
|
||||
): Result[TResult[T], ref CatchableError] =
|
||||
let res = newSharedPtr(ThreadResult[T])
|
||||
let signal = ThreadSignalPtr.new()
|
||||
if signal.isErr:
|
||||
return err((ref CatchableError)(msg: signal.error()))
|
||||
else:
|
||||
res[].signal = signal.get()
|
||||
ok res
|
||||
|
||||
proc success*[T](ret: TResult[T], value: T) =
|
||||
ret[].results.ok(value)
|
||||
|
||||
proc success*[T: void](ret: TResult[T]) =
|
||||
ret[].results.ok()
|
||||
|
||||
proc failure*[T](ret: TResult[T], exc: ref Exception) =
|
||||
ret[].results.err(exc.toBuffer())
|
||||
|
||||
proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] =
|
||||
if ret[].results.isOk():
|
||||
when S is seq[byte]:
|
||||
result.ok(ret[].results.get().toSeq(byte))
|
||||
elif S is string:
|
||||
result.ok(ret[].results.get().toString())
|
||||
elif S is void:
|
||||
result.ok()
|
||||
elif S is QueryResponse:
|
||||
result.ok(ret[].results.get().toQueryResponse())
|
||||
else:
|
||||
result.ok(ret[].results.get())
|
||||
else:
|
||||
let exc: ref CatchableError = ret[].results.error().toCatchable()
|
||||
result.err(exc)
|
||||
|
||||
proc hasTask*(
|
||||
ret: TResult[bool],
|
||||
tds: ThreadDatastorePtr,
|
||||
kb: KeyBuffer,
|
||||
) =
|
||||
without key =? kb.toKey(), err:
|
||||
ret.failure(err)
|
||||
|
||||
try:
|
||||
let res = waitFor tds[].ds.has(key)
|
||||
if res.isErr:
|
||||
ret.failure(res.error())
|
||||
else:
|
||||
ret.success(res.get())
|
||||
discard ret[].signal.fireSync()
|
||||
except CatchableError as err:
|
||||
ret.failure(err)
|
||||
|
||||
proc has*(
|
||||
ret: TResult[bool],
|
||||
tds: ThreadDatastorePtr,
|
||||
key: Key,
|
||||
) =
|
||||
let bkey = StringBuffer.new(key.id())
|
||||
tds[].tp.spawn hasTask(ret, tds, bkey)
|
||||
|
||||
proc getTask*(
|
||||
ret: TResult[DataBuffer],
|
||||
tds: ThreadDatastorePtr,
|
||||
kb: KeyBuffer,
|
||||
) =
|
||||
without key =? kb.toKey(), err:
|
||||
ret.failure(err)
|
||||
try:
|
||||
let res = waitFor tds[].ds.get(key)
|
||||
if res.isErr:
|
||||
ret.failure(res.error())
|
||||
else:
|
||||
let db = DataBuffer.new res.get()
|
||||
ret.success(db)
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
except CatchableError as err:
|
||||
ret.failure(err)
|
||||
|
||||
proc get*(
|
||||
ret: TResult[DataBuffer],
|
||||
tds: ThreadDatastorePtr,
|
||||
key: Key,
|
||||
) =
|
||||
let bkey = StringBuffer.new(key.id())
|
||||
tds[].tp.spawn getTask(ret, tds, bkey)
|
||||
|
||||
|
||||
proc putTask*(
|
||||
ret: TResult[void],
|
||||
tds: ThreadDatastorePtr,
|
||||
kb: KeyBuffer,
|
||||
db: DataBuffer,
|
||||
) =
|
||||
|
||||
without key =? kb.toKey(), err:
|
||||
ret.failure(err)
|
||||
|
||||
let data = db.toSeq(byte)
|
||||
let res = (waitFor tds[].ds.put(key, data)).catch
|
||||
# print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
|
||||
if res.isErr:
|
||||
ret.failure(res.error())
|
||||
else:
|
||||
ret.success()
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
|
||||
proc put*(
|
||||
ret: TResult[void],
|
||||
tds: ThreadDatastorePtr,
|
||||
key: Key,
|
||||
data: seq[byte]
|
||||
) =
|
||||
let bkey = StringBuffer.new(key.id())
|
||||
let bval = DataBuffer.new(data)
|
||||
|
||||
tds[].tp.spawn putTask(ret, tds, bkey, bval)
|
||||
|
||||
|
||||
proc deleteTask*(
|
||||
ret: TResult[void],
|
||||
tds: ThreadDatastorePtr,
|
||||
kb: KeyBuffer,
|
||||
) =
|
||||
|
||||
without key =? kb.toKey(), err:
|
||||
ret.failure(err)
|
||||
|
||||
let res = (waitFor tds[].ds.delete(key)).catch
|
||||
# print "thrbackend: putTask: fire", ret[].signal.fireSync().get()
|
||||
if res.isErr:
|
||||
ret.failure(res.error())
|
||||
else:
|
||||
ret.success()
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
|
||||
import pretty
|
||||
|
||||
proc delete*(
|
||||
ret: TResult[void],
|
||||
tds: ThreadDatastorePtr,
|
||||
key: Key,
|
||||
) =
|
||||
let bkey = StringBuffer.new(key.id())
|
||||
tds[].tp.spawn deleteTask(ret, tds, bkey)
|
||||
|
||||
import os
|
||||
|
||||
proc queryTask*(
|
||||
ret: TResult[QueryResponseBuffer],
|
||||
tds: ThreadDatastorePtr,
|
||||
qiter: QueryIterPtr,
|
||||
) =
|
||||
|
||||
try:
|
||||
os.sleep(100)
|
||||
without res =? waitFor(qiter[].it.next()), err:
|
||||
ret.failure(err)
|
||||
|
||||
let qrb = res.toBuffer()
|
||||
# print "queryTask: ", " res: ", res
|
||||
|
||||
ret.success(qrb)
|
||||
print "queryTask: ", " qrb:key: ", ret[].results.get().key.toString()
|
||||
print "queryTask: ", " qrb:data: ", ret[].results.get().data.toString()
|
||||
|
||||
except Exception as exc:
|
||||
ret.failure(exc)
|
||||
|
||||
discard ret[].signal.fireSync()
|
||||
|
||||
proc query*(
|
||||
ret: TResult[QueryResponseBuffer],
|
||||
tds: ThreadDatastorePtr,
|
||||
qiter: QueryIterPtr,
|
||||
) =
|
||||
tds[].tp.spawn queryTask(ret, tds, qiter)
|
||||
@ -1,5 +1,12 @@
|
||||
|
||||
import threading/smartptrs
|
||||
import std/hashes
|
||||
import pkg/stew/results
|
||||
import pkg/upraises
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import ../key
|
||||
|
||||
export hashes
|
||||
|
||||
@ -7,20 +14,20 @@ type
|
||||
DataBufferHolder* = object
|
||||
buf: ptr UncheckedArray[byte]
|
||||
size: int
|
||||
|
||||
|
||||
DataBuffer* = SharedPtr[DataBufferHolder] ##\
|
||||
## A fixed length data buffer using a SharedPtr.
|
||||
## It is thread safe even with `refc` since
|
||||
## it doesn't use string or seq types internally.
|
||||
##
|
||||
##
|
||||
|
||||
KeyBuffer* = DataBuffer
|
||||
ValueBuffer* = DataBuffer
|
||||
StringBuffer* = DataBuffer
|
||||
|
||||
CatchableErrorBuffer* = object
|
||||
msg: StringBuffer
|
||||
|
||||
|
||||
proc `=destroy`*(x: var DataBufferHolder) =
|
||||
## copy pointer implementation
|
||||
if x.buf != nil:
|
||||
@ -51,7 +58,7 @@ proc new*(tp: typedesc[DataBuffer], size: int = 0): DataBuffer =
|
||||
|
||||
proc new*[T: byte | char](tp: typedesc[DataBuffer], data: openArray[T]): DataBuffer =
|
||||
## allocate new buffer and copies indata from openArray
|
||||
##
|
||||
##
|
||||
result = DataBuffer.new(data.len)
|
||||
if data.len() > 0:
|
||||
copyMem(result[].buf, unsafeAddr data[0], data.len)
|
||||
@ -78,10 +85,8 @@ proc toBuffer*(err: ref Exception): CatchableErrorBuffer =
|
||||
msg: StringBuffer.new(err.msg)
|
||||
)
|
||||
|
||||
import ./key
|
||||
import stew/results
|
||||
|
||||
proc new*(tp: typedesc[KeyBuffer], key: Key): KeyBuffer =
|
||||
KeyBuffer.new(key.id())
|
||||
|
||||
proc toKey*(kb: KeyBuffer): Result[Key, ref CatchableError] =
|
||||
Key.init(kb.toString())
|
||||
68
datastore/threads/foreignbuffer.nim
Normal file
68
datastore/threads/foreignbuffer.nim
Normal file
@ -0,0 +1,68 @@
|
||||
|
||||
type
|
||||
## Copy foreign buffers between threads.
|
||||
##
|
||||
## This is meant to be used as a temporary holder a
|
||||
## pointer to a foreign buffer that is being passed
|
||||
## between threads.
|
||||
##
|
||||
## The receiving thread should copy the contained buffer
|
||||
## to it's local GC as soon as possible. Should only be
|
||||
## used with refgc.
|
||||
##
|
||||
ForeignBuff*[T] = object
|
||||
buf: ptr UncheckedArray[T]
|
||||
len: int
|
||||
cell: ForeignCell
|
||||
|
||||
proc `=sink`[T](a: var ForeignBuff[T], b: ForeignBuff[T]) =
|
||||
`=destroy`(a)
|
||||
wasMoved(a)
|
||||
a.len = b.len
|
||||
a.buf = b.buf
|
||||
a.cell = b.cell
|
||||
|
||||
proc `=copy`[T](a: var ForeignBuff[T], b: ForeignBuff[T])
|
||||
{.error: "You can't copy the buffer, only it's contents!".}
|
||||
|
||||
proc `=destroy`[T](self: var ForeignBuff[T]) =
|
||||
if self.cell.data != nil:
|
||||
echo "DESTROYING CELL"
|
||||
dispose self.cell
|
||||
|
||||
proc len*[T](self: ForeignBuff[T]): int =
|
||||
return self.len
|
||||
|
||||
template `[]`*[T](self: ForeignBuff[T], idx: int): T =
|
||||
assert idx >= 0 and idx < self.len
|
||||
return self.buf[idx]
|
||||
|
||||
template `[]=`*[T](self: ForeignBuff[T], idx: int, val: T) =
|
||||
assert idx >= 0 and idx < self.len
|
||||
return self.buf[idx]
|
||||
|
||||
proc get*[T](self: ForeignBuff[T]): ptr UncheckedArray[T] =
|
||||
self.buf
|
||||
|
||||
iterator items*[T](self: ForeignBuff[T]): T =
|
||||
for i in 0 ..< self.len:
|
||||
yield self.buf[i]
|
||||
|
||||
iterator miterms*[T](self: ForeignBuff[T]): var T =
|
||||
for i in 0 ..< self.len:
|
||||
yield self.buf[i]
|
||||
|
||||
proc attach*[T](
|
||||
self: var ForeignBuff[T],
|
||||
buf: ptr UncheckedArray[T],
|
||||
len: int,
|
||||
cell: ForeignCell) =
|
||||
## Attach a foreign pointer to this buffer
|
||||
##
|
||||
|
||||
self.buf = buf
|
||||
self.len = len
|
||||
self.cell = cell
|
||||
|
||||
func init*[T](_: type ForeignBuff[T]): ForeignBuff[T] =
|
||||
return ForeignBuff[T]()
|
||||
237
datastore/threads/threadproxyds.nim
Normal file
237
datastore/threads/threadproxyds.nim
Normal file
@ -0,0 +1,237 @@
|
||||
|
||||
import pkg/upraises
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import std/atomics
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronos/threadsync
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/ptrops
|
||||
import pkg/taskpools
|
||||
|
||||
import ../key
|
||||
import ../query
|
||||
import ../datastore
|
||||
|
||||
import ./foreignbuffer
|
||||
|
||||
type
|
||||
TaskRes = object
|
||||
ok: Atomic[bool]
|
||||
msg: ptr cstring
|
||||
|
||||
TaskCtx = object
|
||||
ds: ptr Datastore
|
||||
res: TaskRes
|
||||
signal: ThreadSignalPtr
|
||||
|
||||
ThreadDatastore* = ref object of Datastore
|
||||
tp*: Taskpool
|
||||
ds*: Datastore
|
||||
|
||||
proc hasTask(
|
||||
ctx: ptr TaskCtx,
|
||||
key: ptr Key,
|
||||
doesHave: ptr bool) =
|
||||
|
||||
let
|
||||
res = (waitFor ctx[].ds[].has(key[])).catch
|
||||
|
||||
if res.isErr:
|
||||
var
|
||||
err = cstring(res.error().msg)
|
||||
ctx[].res.msg = addr err
|
||||
else:
|
||||
ctx[].res.msg = nil
|
||||
doesHave[] = res.get().get()
|
||||
|
||||
ctx[].res.ok.store(res.isOk)
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
proc has*(
|
||||
self: ThreadDatastore,
|
||||
key: Key): Future[?!bool] {.async.} =
|
||||
|
||||
var
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure("Failed to create signal")
|
||||
|
||||
key = key
|
||||
ctx = TaskCtx(
|
||||
ds: addr self.ds,
|
||||
res: TaskRes(msg: nil),
|
||||
signal: signal)
|
||||
doesHave = false
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn hasTask(addr ctx, addr key, addr doesHave)
|
||||
|
||||
try:
|
||||
runTask()
|
||||
await wait(ctx.signal)
|
||||
|
||||
var data: bool
|
||||
if ctx.res.ok.load() == false:
|
||||
return failure("error")
|
||||
|
||||
return success(doesHave)
|
||||
finally:
|
||||
ctx.signal.close()
|
||||
|
||||
proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
|
||||
|
||||
let
|
||||
res = (waitFor ctx[].ds[].delete(key[])).catch
|
||||
|
||||
if res.isErr:
|
||||
var
|
||||
err = cstring(res.error().msg)
|
||||
ctx[].res.msg = addr err
|
||||
else:
|
||||
ctx[].res.msg = nil
|
||||
|
||||
ctx[].res.ok.store(res.isOk)
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
proc delete*(
|
||||
self: ThreadDatastore,
|
||||
key: Key): Future[?!void] {.async.} =
|
||||
|
||||
var
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure("Failed to create signal")
|
||||
|
||||
key = key
|
||||
ctx = TaskCtx(
|
||||
ds: addr self.ds,
|
||||
res: TaskRes(msg: nil),
|
||||
signal: signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn delTask(addr ctx, addr key)
|
||||
|
||||
try:
|
||||
runTask()
|
||||
await wait(ctx.signal)
|
||||
|
||||
if ctx.res.ok.load() == false:
|
||||
return failure("error")
|
||||
|
||||
return success()
|
||||
finally:
|
||||
ctx.signal.close()
|
||||
|
||||
proc putTask(
|
||||
ctx: ptr TaskCtx,
|
||||
key: ptr Key,
|
||||
data: ptr UncheckedArray[byte],
|
||||
len: int) =
|
||||
## run put in a thread task
|
||||
##
|
||||
|
||||
let
|
||||
res = (waitFor ctx[].ds[].put(
|
||||
key[],
|
||||
@(toOpenArray(data, 0, len - 1)))).catch
|
||||
|
||||
if res.isErr:
|
||||
var err = cstring(res.error().msg)
|
||||
ctx[].res.msg = addr err
|
||||
else:
|
||||
ctx[].res.msg = nil
|
||||
|
||||
ctx[].res.ok.store(res.isOk)
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
proc put*(
|
||||
self: ThreadDatastore,
|
||||
key: Key,
|
||||
data: seq[byte]): Future[?!void] {.async.} =
|
||||
|
||||
var
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure("Failed to create signal")
|
||||
key = key
|
||||
data = data
|
||||
ctx = TaskCtx(
|
||||
ds: addr self.ds,
|
||||
res: TaskRes(msg: nil),
|
||||
signal: signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn putTask(
|
||||
addr ctx,
|
||||
addr key, makeUncheckedArray(baseAddr data),
|
||||
data.len)
|
||||
|
||||
try:
|
||||
runTask()
|
||||
await wait(ctx.signal)
|
||||
finally:
|
||||
ctx.signal.close()
|
||||
|
||||
if ctx.res.ok.load() == false:
|
||||
return failure("error")
|
||||
|
||||
return success()
|
||||
|
||||
proc getTask(
|
||||
ctx: ptr TaskCtx,
|
||||
key: ptr Key,
|
||||
buf: ptr ForeignBuff[byte]) =
|
||||
## Run get in a thread task
|
||||
##
|
||||
|
||||
without res =? (waitFor ctx[].ds[].get(key[])).catch, error:
|
||||
var err = cstring(error.msg)
|
||||
ctx[].res.msg = addr err
|
||||
return
|
||||
|
||||
var
|
||||
data = res.get()
|
||||
cell = protect(addr data)
|
||||
ctx[].res.msg = nil
|
||||
buf[].attach(
|
||||
makeUncheckedArray(baseAddr data), data.len, cell)
|
||||
|
||||
ctx[].res.ok.store(res.isOk)
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
proc get*(
|
||||
self: ThreadDatastore,
|
||||
key: Key): Future[?!seq[byte]] {.async.} =
|
||||
|
||||
var
|
||||
signal = ThreadSignalPtr.new().valueOr:
|
||||
return failure("Failed to create signal")
|
||||
|
||||
key = key
|
||||
buf = ForeignBuff[byte].init()
|
||||
ctx = TaskCtx(
|
||||
ds: addr self.ds,
|
||||
res: TaskRes(msg: nil),
|
||||
signal: signal)
|
||||
|
||||
proc runTask() =
|
||||
self.tp.spawn getTask(addr ctx, addr key, addr buf)
|
||||
|
||||
try:
|
||||
runTask()
|
||||
await wait(ctx.signal)
|
||||
|
||||
if ctx.res.ok.load() == false:
|
||||
return failure("error")
|
||||
|
||||
var data = @(toOpenArray(buf.get(), 0, buf.len - 1))
|
||||
return success(data)
|
||||
finally:
|
||||
ctx.signal.close()
|
||||
|
||||
proc new*(
|
||||
self: type ThreadDatastore,
|
||||
ds: Datastore,
|
||||
tp: Taskpool): ?!ThreadDatastore =
|
||||
success ThreadDatastore(tp: tp, ds: ds)
|
||||
60
datastore/threads/threadresult.nim
Normal file
60
datastore/threads/threadresult.nim
Normal file
@ -0,0 +1,60 @@
|
||||
|
||||
import threading/smartptrs
|
||||
|
||||
import pkg/upraises
|
||||
push: {.upraises: [].}
|
||||
|
||||
import pkg/chronos/threadsync
|
||||
|
||||
import ./foreignbuffer
|
||||
|
||||
type
|
||||
CatchableErrorBuffer = ForeignBuffer[ref CatchableError]
|
||||
|
||||
ThreadResult*[T] = object
|
||||
signal*: ThreadSignalPtr
|
||||
results*: Result[T, CatchableErrorBuffer]
|
||||
|
||||
TResult*[T] = SharedPtr[ThreadResult[T]]
|
||||
|
||||
proc success*[T](ret: TResult[T], value: T) =
|
||||
ret[].results.ok(value)
|
||||
|
||||
proc success*[T: void](ret: TResult[T]) =
|
||||
ret[].results.ok()
|
||||
|
||||
proc failure*[T](ret: TResult[T], exc: ref Exception) =
|
||||
ret[].results.err(exc.toBuffer())
|
||||
|
||||
proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] =
|
||||
if ret[].results.isOk():
|
||||
when S is seq[byte]:
|
||||
result.ok(ret[].results.get().toSeq(byte))
|
||||
elif S is string:
|
||||
result.ok(ret[].results.get().toString())
|
||||
elif S is void:
|
||||
result.ok()
|
||||
# elif S is QueryResponse:
|
||||
# result.ok(ret[].results.get().toQueryResponse())
|
||||
else:
|
||||
result.ok(ret[].results.get())
|
||||
else:
|
||||
let exc: ref CatchableError = ret[].results.error().toCatchable()
|
||||
result.err(exc)
|
||||
|
||||
proc new*[T](
|
||||
self: type ThreadResult,
|
||||
tp: typedesc[T]): Result[TResult[T], ref CatchableError] =
|
||||
## Create a new ThreadResult for type T
|
||||
##
|
||||
|
||||
let
|
||||
res = newSharedPtr(ThreadResult[T])
|
||||
signal = ThreadSignalPtr.new()
|
||||
|
||||
if signal.isErr:
|
||||
return err((ref CatchableError)(msg: signal.error()))
|
||||
else:
|
||||
res[].signal = signal.get()
|
||||
|
||||
ok res
|
||||
@ -7,75 +7,84 @@ import pkg/asynctest
|
||||
import pkg/chronos
|
||||
import pkg/stew/results
|
||||
import pkg/stew/byteutils
|
||||
import pkg/taskpools
|
||||
|
||||
import pkg/datastore/memoryds
|
||||
import pkg/datastore/threadproxyds
|
||||
import pkg/datastore/threads/threadproxyds
|
||||
|
||||
import ./dscommontests
|
||||
import ./querycommontests
|
||||
# import ./querycommontests
|
||||
|
||||
import pretty
|
||||
|
||||
|
||||
suite "Test Basic ThreadProxyDatastore":
|
||||
var
|
||||
sds: ThreadProxyDatastore
|
||||
mem: MemoryDatastore
|
||||
key1: Key
|
||||
data: seq[byte]
|
||||
# suite "Test Basic ThreadProxyDatastore":
|
||||
# var
|
||||
# sds: ThreadDatastore
|
||||
# mem: MemoryDatastore
|
||||
# key1: Key
|
||||
# data: seq[byte]
|
||||
# taskPool: Taskpool
|
||||
|
||||
setupAll:
|
||||
mem = MemoryDatastore.new()
|
||||
sds = newThreadProxyDatastore(mem).expect("should work")
|
||||
key1 = Key.init("/a").tryGet
|
||||
data = "value for 1".toBytes()
|
||||
# setupAll:
|
||||
# mem = MemoryDatastore.new()
|
||||
# taskPool = TaskPool.new(3)
|
||||
# sds = ThreadDatastore.new(mem, taskPool).expect("should work")
|
||||
# key1 = Key.init("/a").tryGet
|
||||
# data = "value for 1".toBytes()
|
||||
|
||||
test "check put":
|
||||
echo "\n\n=== put ==="
|
||||
let res1 = await sds.put(key1, data)
|
||||
print "res1: ", res1
|
||||
# test "check put":
|
||||
# echo "\n\n=== put ==="
|
||||
# let res1 = await sds.put(key1, data)
|
||||
# print "res1: ", res1
|
||||
|
||||
test "check get":
|
||||
echo "\n\n=== get ==="
|
||||
let res2 = await sds.get(key1)
|
||||
check res2.get() == data
|
||||
var val = ""
|
||||
for c in res2.get():
|
||||
val &= char(c)
|
||||
print "get res2: ", $val
|
||||
# test "check get":
|
||||
# echo "\n\n=== get ==="
|
||||
# let res2 = await sds.get(key1)
|
||||
# check res2.get() == data
|
||||
# var val = ""
|
||||
# for c in res2.get():
|
||||
# val &= char(c)
|
||||
|
||||
# echo "\n\n=== put cancel ==="
|
||||
# # let res1 = await sds.put(key1, "value for 1".toBytes())
|
||||
# let res3 = sds.put(key1, "value for 1".toBytes())
|
||||
# res3.cancel()
|
||||
# # print "res3: ", res3
|
||||
# print "get res2: ", $val
|
||||
|
||||
# # echo "\n\n=== put cancel ==="
|
||||
# # # let res1 = await sds.put(key1, "value for 1".toBytes())
|
||||
# # let res3 = sds.put(key1, "value for 1".toBytes())
|
||||
# # res3.cancel()
|
||||
# # # print "res3: ", res3
|
||||
|
||||
suite "Test Basic ThreadProxyDatastore":
|
||||
|
||||
var
|
||||
memStore: MemoryDatastore
|
||||
ds: ThreadProxyDatastore
|
||||
ds: ThreadDatastore
|
||||
key = Key.init("/a/b").tryGet()
|
||||
bytes = "some bytes".toBytes
|
||||
otherBytes = "some other bytes".toBytes
|
||||
taskPool: TaskPool
|
||||
|
||||
setupAll:
|
||||
memStore = MemoryDatastore.new()
|
||||
ds = newThreadProxyDatastore(memStore).expect("should work")
|
||||
taskPool = TaskPool.new(3)
|
||||
ds = ThreadDatastore.new(memStore, taskPool).expect("should work")
|
||||
|
||||
teardownAll:
|
||||
(await memStore.close()).get()
|
||||
|
||||
# test "check put":
|
||||
# (await ds.put(key, bytes)).tryGet()
|
||||
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
|
||||
suite "Test Query":
|
||||
var
|
||||
mem: MemoryDatastore
|
||||
sds: ThreadProxyDatastore
|
||||
# suite "Test Query":
|
||||
# var
|
||||
# mem: MemoryDatastore
|
||||
# sds: ThreadProxyDatastore
|
||||
|
||||
setup:
|
||||
mem = MemoryDatastore.new()
|
||||
sds = newThreadProxyDatastore(mem).expect("should work")
|
||||
# setup:
|
||||
# mem = MemoryDatastore.new()
|
||||
# sds = newThreadProxyDatastore(mem).expect("should work")
|
||||
|
||||
queryTests(sds, false)
|
||||
# queryTests(sds, false)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user