foreign buffer

This commit is contained in:
Dmitriy Ryajov 2023-09-11 14:48:53 -06:00
parent 146cbcb88a
commit b7454d6e3d
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
8 changed files with 120 additions and 465 deletions

View File

@ -21,9 +21,8 @@ type
store*: Table[Key, seq[byte]]
method has*(
self: MemoryDatastore,
key: Key
): Future[?!bool] {.async.} =
self: MemoryDatastore,
key: Key): Future[?!bool] {.async.} =
return success self.store.hasKey(key)
@ -113,5 +112,4 @@ method close*(self: MemoryDatastore): Future[?!void] {.async.} =
return success()
func new*(tp: type MemoryDatastore): MemoryDatastore =
var self = tp()
return self
MemoryDatastore()

View File

@ -7,7 +7,7 @@ import pkg/questionable/results
import ./key
import ./types
import ./threads/databuffer
# import ./threads/databuffer
export options, SortOrder
type

View File

@ -1,92 +0,0 @@
import threading/smartptrs
import std/hashes
import pkg/stew/results
import pkg/upraises
push: {.upraises: [].}
import ../key
export hashes
type
DataBufferHolder* = object
buf: ptr UncheckedArray[byte]
size: int
DataBuffer* = SharedPtr[DataBufferHolder] ##\
## A fixed length data buffer using a SharedPtr.
## It is thread safe even with `refc` since
## it doesn't use string or seq types internally.
##
KeyBuffer* = DataBuffer
ValueBuffer* = DataBuffer
StringBuffer* = DataBuffer
CatchableErrorBuffer* = object
msg: StringBuffer
proc `=destroy`*(x: var DataBufferHolder) =
## copy pointer implementation
if x.buf != nil:
# when isMainModule or true:
# echo "buffer: FREE: ", repr x.buf.pointer
deallocShared(x.buf)
proc len*(a: DataBuffer): int = a[].size
proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a)
proc hash*(a: DataBuffer): Hash =
a[].buf.toOpenArray(0, a[].size-1).hash()
proc `==`*(a, b: DataBuffer): bool =
if a.isNil and b.isNil: return true
elif a.isNil or b.isNil: return false
elif a[].size != b[].size: return false
elif a[].buf == b[].buf: return true
else: a.hash() == b.hash()
proc new*(tp: typedesc[DataBuffer], size: int = 0): DataBuffer =
## allocate new buffer with given size
newSharedPtr(DataBufferHolder(
buf: cast[typeof(result[].buf)](allocShared0(size)),
size: size,
))
proc new*[T: byte | char](tp: typedesc[DataBuffer], data: openArray[T]): DataBuffer =
## allocate new buffer and copies indata from openArray
##
result = DataBuffer.new(data.len)
if data.len() > 0:
copyMem(result[].buf, unsafeAddr data[0], data.len)
proc toSeq*[T: byte | char](a: DataBuffer, tp: typedesc[T]): seq[T] =
## convert buffer to a seq type using copy and either a byte or char
result = newSeq[T](a.len)
copyMem(addr result[0], unsafeAddr a[].buf[0], a.len)
proc toString*(data: DataBuffer): string =
## convert buffer to string type using copy
if data.isNil: return ""
result = newString(data.len())
if data.len() > 0:
copyMem(addr result[0], unsafeAddr data[].buf[0], data.len)
proc toCatchable*(err: CatchableErrorBuffer): ref CatchableError =
## convert back to a ref CatchableError
result = (ref CatchableError)(msg: err.msg.toString())
proc toBuffer*(err: ref Exception): CatchableErrorBuffer =
## convert exception to an object with StringBuffer
return CatchableErrorBuffer(
msg: StringBuffer.new(err.msg)
)
proc new*(tp: typedesc[KeyBuffer], key: Key): KeyBuffer =
KeyBuffer.new(key.id())
proc toKey*(kb: KeyBuffer): Result[Key, ref CatchableError] =
Key.init(kb.toString())

View File

@ -1,8 +1,10 @@
import std/locks
type
## Copy foreign buffers between threads.
## Pass foreign buffers between threads.
##
## This is meant to be used as a temporary holder a
## This is meant to be used as temporary holder
## pointer to a foreign buffer that is being passed
## between threads.
##
@ -11,58 +13,64 @@ type
## used with refgc.
##
ForeignBuff*[T] = object
lock: Lock
buf: ptr UncheckedArray[T]
len: int
cell: ForeignCell
proc `=sink`[T](a: var ForeignBuff[T], b: ForeignBuff[T]) =
`=destroy`(a)
wasMoved(a)
a.len = b.len
a.buf = b.buf
a.cell = b.cell
proc `=sink`[T](self: var ForeignBuff[T], b: ForeignBuff[T]) =
withLock(self.lock):
`=destroy`(self)
wasMoved(self)
self.len = b.len
self.buf = b.buf
self.cell = b.cell
proc `=copy`[T](a: var ForeignBuff[T], b: ForeignBuff[T])
{.error: "You can't copy the buffer, only it's contents!".}
proc `=copy`[T](self: var ForeignBuff[T], b: ForeignBuff[T]) {.error.}
proc `=destroy`[T](self: var ForeignBuff[T]) =
if self.cell.data != nil:
echo "DESTROYING CELL"
dispose self.cell
withLock(self.lock):
if self.cell.data != nil:
echo "DESTROYING CELL"
dispose self.cell
proc len*[T](self: ForeignBuff[T]): int =
return self.len
template `[]`*[T](self: ForeignBuff[T], idx: int): T =
assert idx >= 0 and idx < self.len
return self.buf[idx]
template `[]=`*[T](self: ForeignBuff[T], idx: int, val: T) =
assert idx >= 0 and idx < self.len
return self.buf[idx]
proc get*[T](self: ForeignBuff[T]): ptr UncheckedArray[T] =
self.buf
iterator items*[T](self: ForeignBuff[T]): T =
for i in 0 ..< self.len:
yield self.buf[i]
iterator miterms*[T](self: ForeignBuff[T]): var T =
for i in 0 ..< self.len:
yield self.buf[i]
proc attach*[T](
self: var ForeignBuff[T],
buf: ptr UncheckedArray[T],
len: int,
cell: ForeignCell) =
## Attach a foreign pointer to this buffer
buf: openArray[T]) =
## Attach self foreign pointer to this buffer
##
withLock(self.lock):
self.buf = makeUncheckedArray[T](baseAddr buf)
self.len = buf.len
self.cell = protect(self.buf)
self.buf = buf
self.len = len
self.cell = cell
func attached*[T]() =
## Check if self foreign pointer is attached to this buffer
##
withLock(self.lock):
return self.but != nil and self.cell.data != nil
## NOTE: Converters might return copies of the buffer,
## this should be overall safe since we want to copy
## the buffer local GC anyway.
converter toSeq*[T](self: ForeignBuff[T]): seq[T] | lent seq[T] =
@(self.buf.toOpenArray(0, self.len - 1))
converter toString*[T](self: ForeignBuff[T]): string | lent string =
$(self.buf.toOpenArray(0, self.len - 1))
converter getVal*[T](self: ForeignBuff[T]): ptr UncheckedArray[T] =
self.buf
func init*[T](_: type ForeignBuff[T]): ForeignBuff[T] =
return ForeignBuff[T]()
var
lock = Lock()
lock.initLock()
ForeignBuff[T](lock: lock)

View File

@ -1,4 +1,7 @@
when not compileOption("threads"):
{.error: "This module requires --threads:on compilation flag".}
import pkg/upraises
push: {.upraises: [].}
@ -19,50 +22,49 @@ import ../datastore
import ./foreignbuffer
type
TaskRes = object
ThreadResults = object
ok: Atomic[bool]
msg: ptr cstring
msg: ForeignBuff[char]
TaskCtx = object
ds: ptr Datastore
res: TaskRes
res: ptr ThreadResults
signal: ThreadSignalPtr
ThreadDatastore* = ref object of Datastore
tp*: Taskpool
ds*: Datastore
proc success(self: var ThreadResults) {.inline.} =
self.ok.store(true)
proc failure(self: var ThreadResults, msg: var string) {.inline.} =
self.ok.store(false)
self.msg.attach(msg.toOpenArray(0, msg.high))
proc hasTask(
ctx: ptr TaskCtx,
key: ptr Key,
doesHave: ptr bool) =
let
res = (waitFor ctx[].ds[].has(key[])).catch
without res =? (waitFor ctx[].ds[].has(key[])).catch, error:
ctx[].res[].failure(error.msg)
return
if res.isErr:
var
err = cstring(res.error().msg)
ctx[].res.msg = addr err
else:
ctx[].res.msg = nil
doesHave[] = res.get().get()
ctx[].res.ok.store(res.isOk)
doesHave[] = res.get()
ctx[].res[].success()
discard ctx[].signal.fireSync()
proc has*(
self: ThreadDatastore,
key: Key): Future[?!bool] {.async.} =
method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} =
var
signal = ThreadSignalPtr.new().valueOr:
return failure("Failed to create signal")
key = key
res = ThreadResults()
ctx = TaskCtx(
ds: addr self.ds,
res: TaskRes(msg: nil),
res: addr res,
signal: signal)
doesHave = false
@ -73,30 +75,22 @@ proc has*(
runTask()
await wait(ctx.signal)
var data: bool
if ctx.res.ok.load() == false:
return failure("error")
return failure($(ctx.res.msg))
return success(doesHave)
finally:
ctx.signal.close()
proc delTask(ctx: ptr TaskCtx, key: ptr Key) =
without res =? (waitFor ctx[].ds[].delete(key[])).catch, error:
ctx[].res[].failure(error.msg)
return
let
res = (waitFor ctx[].ds[].delete(key[])).catch
if res.isErr:
var
err = cstring(res.error().msg)
ctx[].res.msg = addr err
else:
ctx[].res.msg = nil
ctx[].res.ok.store(res.isOk)
ctx[].res[].ok.store(true)
discard ctx[].signal.fireSync()
proc delete*(
method delete*(
self: ThreadDatastore,
key: Key): Future[?!void] {.async.} =
@ -105,9 +99,10 @@ proc delete*(
return failure("Failed to create signal")
key = key
res = ThreadResults()
ctx = TaskCtx(
ds: addr self.ds,
res: TaskRes(msg: nil),
res: addr res,
signal: signal)
proc runTask() =
@ -124,6 +119,13 @@ proc delete*(
finally:
ctx.signal.close()
method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure err
return success()
proc putTask(
ctx: ptr TaskCtx,
key: ptr Key,
@ -132,21 +134,16 @@ proc putTask(
## run put in a thread task
##
let
res = (waitFor ctx[].ds[].put(
without res =? (waitFor ctx[].ds[].put(
key[],
@(toOpenArray(data, 0, len - 1)))).catch
@(toOpenArray(data, 0, len - 1)))).catch, error:
ctx[].res[].failure(error.msg)
return
if res.isErr:
var err = cstring(res.error().msg)
ctx[].res.msg = addr err
else:
ctx[].res.msg = nil
ctx[].res.ok.store(res.isOk)
ctx[].res[].ok.store(true)
discard ctx[].signal.fireSync()
proc put*(
method put*(
self: ThreadDatastore,
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
@ -156,15 +153,17 @@ proc put*(
return failure("Failed to create signal")
key = key
data = data
res = ThreadResults()
ctx = TaskCtx(
ds: addr self.ds,
res: TaskRes(msg: nil),
res: addr res,
signal: signal)
proc runTask() =
self.tp.spawn putTask(
addr ctx,
addr key, makeUncheckedArray(baseAddr data),
addr key,
makeUncheckedArray(baseAddr data),
data.len)
try:
@ -173,8 +172,18 @@ proc put*(
finally:
ctx.signal.close()
if ctx.res.ok.load() == false:
return failure("error")
if ctx.res[].ok.load() == false:
return failure($(ctx.res[].msg))
return success()
method put*(
self: ThreadDatastore,
batch: seq[BatchEntry]): Future[?!void] {.async.} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
return success()
@ -186,21 +195,18 @@ proc getTask(
##
without res =? (waitFor ctx[].ds[].get(key[])).catch, error:
var err = cstring(error.msg)
ctx[].res.msg = addr err
var err = error.msg
ctx[].res[].failure(error.msg)
return
var
data = res.get()
cell = protect(addr data)
ctx[].res.msg = nil
buf[].attach(
makeUncheckedArray(baseAddr data), data.len, cell)
ctx[].res.ok.store(res.isOk)
buf[].attach(data)
ctx[].res[].ok.store(res.isOk)
discard ctx[].signal.fireSync()
proc get*(
method get*(
self: ThreadDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
@ -210,9 +216,10 @@ proc get*(
key = key
buf = ForeignBuff[byte].init()
res = ThreadResults()
ctx = TaskCtx(
ds: addr self.ds,
res: TaskRes(msg: nil),
res: addr res,
signal: signal)
proc runTask() =
@ -223,14 +230,13 @@ proc get*(
await wait(ctx.signal)
if ctx.res.ok.load() == false:
return failure("error")
return failure($(ctx.res[].msg))
var data = @(toOpenArray(buf.get(), 0, buf.len - 1))
return success(data)
return success(buf.toSeq())
finally:
ctx.signal.close()
proc new*(
func new*(
self: type ThreadDatastore,
ds: Datastore,
tp: Taskpool): ?!ThreadDatastore =

View File

@ -1,202 +0,0 @@
import std/tables
import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import pkg/taskpools
import pkg/stew/results
import pkg/threading/smartptrs
import ./key
import ./query
import ./datastore
import ./threadbackend
import ./fsds
export key, query
push: {.upraises: [].}
type
ThreadProxyDatastore* = ref object of Datastore
tds: ThreadDatastorePtr
method has*(
self: ThreadProxyDatastore,
key: Key
): Future[?!bool] {.async.} =
without ret =? newThreadResult(bool), err:
return failure(err)
try:
has(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
return ret.convert(bool)
method delete*(
self: ThreadProxyDatastore,
key: Key
): Future[?!void] {.async.} =
without ret =? newThreadResult(void), err:
return failure(err)
try:
delete(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
return ret.convert(void)
method delete*(
self: ThreadProxyDatastore,
keys: seq[Key]
): Future[?!void] {.async.} =
for key in keys:
if err =? (await self.delete(key)).errorOption:
return failure err
return success()
method get*(
self: ThreadProxyDatastore,
key: Key
): Future[?!seq[byte]] {.async.} =
## implements batch get
##
## note: this implementation is rather naive and should
## probably be switched to use a single ThreadSignal
## for the entire batch
without ret =? newThreadResult(ValueBuffer), err:
return failure(err)
try:
get(ret, self.tds, key)
await wait(ret[].signal)
finally:
ret[].signal.close()
return ret.convert(seq[byte])
method put*(
self: ThreadProxyDatastore,
key: Key,
data: seq[byte]
): Future[?!void] {.async.} =
without ret =? newThreadResult(void), err:
return failure(err)
try:
put(ret, self.tds, key, data)
await wait(ret[].signal)
finally:
ret[].signal.close()
return ret.convert(void)
method put*(
self: ThreadProxyDatastore,
batch: seq[BatchEntry]
): Future[?!void] {.async.} =
## implements batch put
##
## note: this implementation is rather naive and should
## probably be switched to use a single ThreadSignal
## for the entire batch
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
return success()
import pretty
method query*(
self: ThreadProxyDatastore,
query: Query
): Future[?!QueryIter] {.async.} =
without ret =? newThreadResult(QueryResponseBuffer), err:
return failure(err)
echo "\n\n=== Query Start === "
## we need to setup the query iter on the main thread
## to keep it's lifetime associated with this async Future
without it =? await self.tds[].ds.query(query), err:
ret.failure(err)
var iter = newSharedPtr(QueryIterStore)
## note that bypasses SharedPtr isolation - may need `protect` here?
iter[].it = it
var iterWrapper = QueryIter.new()
proc next(): Future[?!QueryResponse] {.async.} =
print "query:next:start: "
iterWrapper.finished = iter[].it.finished
if not iter[].it.finished:
query(ret, self.tds, iter)
await wait(ret[].signal)
echo ""
print "query:post: ", ret[].results
print "query:post:finished: ", iter[].it.finished
print "query:post: ", " qrb:key: ", ret[].results.get().key.toString()
print "query:post: ", " qrb:data: ", ret[].results.get().data.toString()
result = ret.convert(QueryResponse)
else:
result = success (Key.none, EmptyBytes)
proc dispose(): Future[?!void] {.async.} =
iter[].it = nil # ensure our sharedptr doesn't try and dealloc
ret[].signal.close()
return success()
iterWrapper.next = next
iterWrapper.dispose = dispose
return success iterWrapper
method close*(
self: ThreadProxyDatastore
): Future[?!void] {.async.} =
# TODO: how to handle failed close?
result = success()
without res =? self.tds[].ds.close(), err:
result = failure(err)
# GC_unref(self.tds[].ds) ## TODO: is this needed?
if self.tds[].tp != nil:
## this can block... how to handle? maybe just leak?
self.tds[].tp.shutdown()
self[].tds[].ds = nil # ensure our sharedptr doesn't try and dealloc
proc newThreadProxyDatastore*(
ds: Datastore,
): ?!ThreadProxyDatastore =
## create a new
var self = ThreadProxyDatastore()
let value = newSharedPtr(ThreadDatastore)
# GC_ref(ds) ## TODO: is this needed?
try:
value[].ds = ds
value[].tp = Taskpool.new(num_threads = 2)
except Exception as exc:
return err((ref DatastoreError)(msg: exc.msg))
self.tds = value
success self

View File

@ -1,60 +0,0 @@
import threading/smartptrs
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos/threadsync
import ./foreignbuffer
type
CatchableErrorBuffer = ForeignBuffer[ref CatchableError]
ThreadResult*[T] = object
signal*: ThreadSignalPtr
results*: Result[T, CatchableErrorBuffer]
TResult*[T] = SharedPtr[ThreadResult[T]]
proc success*[T](ret: TResult[T], value: T) =
ret[].results.ok(value)
proc success*[T: void](ret: TResult[T]) =
ret[].results.ok()
proc failure*[T](ret: TResult[T], exc: ref Exception) =
ret[].results.err(exc.toBuffer())
proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] =
if ret[].results.isOk():
when S is seq[byte]:
result.ok(ret[].results.get().toSeq(byte))
elif S is string:
result.ok(ret[].results.get().toString())
elif S is void:
result.ok()
# elif S is QueryResponse:
# result.ok(ret[].results.get().toQueryResponse())
else:
result.ok(ret[].results.get())
else:
let exc: ref CatchableError = ret[].results.error().toCatchable()
result.err(exc)
proc new*[T](
self: type ThreadResult,
tp: typedesc[T]): Result[TResult[T], ref CatchableError] =
## Create a new ThreadResult for type T
##
let
res = newSharedPtr(ThreadResult[T])
signal = ThreadSignalPtr.new()
if signal.isErr:
return err((ref CatchableError)(msg: signal.error()))
else:
res[].signal = signal.get()
ok res

View File

@ -62,19 +62,16 @@ suite "Test Basic ThreadProxyDatastore":
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
taskPool: TaskPool
taskPool: Taskpool
setupAll:
memStore = MemoryDatastore.new()
taskPool = TaskPool.new(3)
ds = ThreadDatastore.new(memStore, taskPool).expect("should work")
taskPool = Taskpool.new(2)
ds = ThreadDatastore.new(memStore, taskPool).tryGet()
teardownAll:
(await memStore.close()).get()
# test "check put":
# (await ds.put(key, bytes)).tryGet()
basicStoreTests(ds, key, bytes, otherBytes)
# suite "Test Query":