From b7454d6e3d2c89f4ec1dae72a33d27c5e7c7140f Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 11 Sep 2023 14:48:53 -0600 Subject: [PATCH] foreign buffer --- datastore/memoryds.nim | 8 +- datastore/query.nim | 2 +- datastore/threads/databuffer.nim | 92 ------------ datastore/threads/foreignbuffer.nim | 82 ++++++----- datastore/threads/threadproxyds.nim | 130 +++++++++-------- datastore/threads/threadproxyds_.nim | 202 -------------------------- datastore/threads/threadresult.nim | 60 -------- tests/datastore/testthreadproxyds.nim | 9 +- 8 files changed, 120 insertions(+), 465 deletions(-) delete mode 100644 datastore/threads/databuffer.nim delete mode 100644 datastore/threads/threadproxyds_.nim delete mode 100644 datastore/threads/threadresult.nim diff --git a/datastore/memoryds.nim b/datastore/memoryds.nim index fd02fe9..5745bf3 100644 --- a/datastore/memoryds.nim +++ b/datastore/memoryds.nim @@ -21,9 +21,8 @@ type store*: Table[Key, seq[byte]] method has*( - self: MemoryDatastore, - key: Key -): Future[?!bool] {.async.} = + self: MemoryDatastore, + key: Key): Future[?!bool] {.async.} = return success self.store.hasKey(key) @@ -113,5 +112,4 @@ method close*(self: MemoryDatastore): Future[?!void] {.async.} = return success() func new*(tp: type MemoryDatastore): MemoryDatastore = - var self = tp() - return self + MemoryDatastore() diff --git a/datastore/query.nim b/datastore/query.nim index 5f9c913..e2ca118 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -7,7 +7,7 @@ import pkg/questionable/results import ./key import ./types -import ./threads/databuffer +# import ./threads/databuffer export options, SortOrder type diff --git a/datastore/threads/databuffer.nim b/datastore/threads/databuffer.nim deleted file mode 100644 index 67193e0..0000000 --- a/datastore/threads/databuffer.nim +++ /dev/null @@ -1,92 +0,0 @@ - -import threading/smartptrs -import std/hashes -import pkg/stew/results -import pkg/upraises - -push: {.upraises: [].} - -import ../key - -export hashes - -type - DataBufferHolder* = object - buf: ptr UncheckedArray[byte] - size: int - - DataBuffer* = SharedPtr[DataBufferHolder] ##\ - ## A fixed length data buffer using a SharedPtr. - ## It is thread safe even with `refc` since - ## it doesn't use string or seq types internally. - ## - - KeyBuffer* = DataBuffer - ValueBuffer* = DataBuffer - StringBuffer* = DataBuffer - - CatchableErrorBuffer* = object - msg: StringBuffer - -proc `=destroy`*(x: var DataBufferHolder) = - ## copy pointer implementation - if x.buf != nil: - # when isMainModule or true: - # echo "buffer: FREE: ", repr x.buf.pointer - deallocShared(x.buf) - -proc len*(a: DataBuffer): int = a[].size - -proc isNil*(a: DataBuffer): bool = smartptrs.isNil(a) - -proc hash*(a: DataBuffer): Hash = - a[].buf.toOpenArray(0, a[].size-1).hash() - -proc `==`*(a, b: DataBuffer): bool = - if a.isNil and b.isNil: return true - elif a.isNil or b.isNil: return false - elif a[].size != b[].size: return false - elif a[].buf == b[].buf: return true - else: a.hash() == b.hash() - -proc new*(tp: typedesc[DataBuffer], size: int = 0): DataBuffer = - ## allocate new buffer with given size - newSharedPtr(DataBufferHolder( - buf: cast[typeof(result[].buf)](allocShared0(size)), - size: size, - )) - -proc new*[T: byte | char](tp: typedesc[DataBuffer], data: openArray[T]): DataBuffer = - ## allocate new buffer and copies indata from openArray - ## - result = DataBuffer.new(data.len) - if data.len() > 0: - copyMem(result[].buf, unsafeAddr data[0], data.len) - -proc toSeq*[T: byte | char](a: DataBuffer, tp: typedesc[T]): seq[T] = - ## convert buffer to a seq type using copy and either a byte or char - result = newSeq[T](a.len) - copyMem(addr result[0], unsafeAddr a[].buf[0], a.len) - -proc toString*(data: DataBuffer): string = - ## convert buffer to string type using copy - if data.isNil: return "" - result = newString(data.len()) - if data.len() > 0: - copyMem(addr result[0], unsafeAddr data[].buf[0], data.len) - -proc toCatchable*(err: CatchableErrorBuffer): ref CatchableError = - ## convert back to a ref CatchableError - result = (ref CatchableError)(msg: err.msg.toString()) - -proc toBuffer*(err: ref Exception): CatchableErrorBuffer = - ## convert exception to an object with StringBuffer - return CatchableErrorBuffer( - msg: StringBuffer.new(err.msg) - ) - -proc new*(tp: typedesc[KeyBuffer], key: Key): KeyBuffer = - KeyBuffer.new(key.id()) - -proc toKey*(kb: KeyBuffer): Result[Key, ref CatchableError] = - Key.init(kb.toString()) diff --git a/datastore/threads/foreignbuffer.nim b/datastore/threads/foreignbuffer.nim index b2002ea..54a8720 100644 --- a/datastore/threads/foreignbuffer.nim +++ b/datastore/threads/foreignbuffer.nim @@ -1,8 +1,10 @@ +import std/locks + type - ## Copy foreign buffers between threads. + ## Pass foreign buffers between threads. ## - ## This is meant to be used as a temporary holder a + ## This is meant to be used as temporary holder ## pointer to a foreign buffer that is being passed ## between threads. ## @@ -11,58 +13,64 @@ type ## used with refgc. ## ForeignBuff*[T] = object + lock: Lock buf: ptr UncheckedArray[T] len: int cell: ForeignCell -proc `=sink`[T](a: var ForeignBuff[T], b: ForeignBuff[T]) = - `=destroy`(a) - wasMoved(a) - a.len = b.len - a.buf = b.buf - a.cell = b.cell +proc `=sink`[T](self: var ForeignBuff[T], b: ForeignBuff[T]) = + withLock(self.lock): + `=destroy`(self) + wasMoved(self) + self.len = b.len + self.buf = b.buf + self.cell = b.cell -proc `=copy`[T](a: var ForeignBuff[T], b: ForeignBuff[T]) - {.error: "You can't copy the buffer, only it's contents!".} +proc `=copy`[T](self: var ForeignBuff[T], b: ForeignBuff[T]) {.error.} proc `=destroy`[T](self: var ForeignBuff[T]) = - if self.cell.data != nil: - echo "DESTROYING CELL" - dispose self.cell + withLock(self.lock): + if self.cell.data != nil: + echo "DESTROYING CELL" + dispose self.cell proc len*[T](self: ForeignBuff[T]): int = return self.len -template `[]`*[T](self: ForeignBuff[T], idx: int): T = - assert idx >= 0 and idx < self.len - return self.buf[idx] - -template `[]=`*[T](self: ForeignBuff[T], idx: int, val: T) = - assert idx >= 0 and idx < self.len - return self.buf[idx] - proc get*[T](self: ForeignBuff[T]): ptr UncheckedArray[T] = self.buf -iterator items*[T](self: ForeignBuff[T]): T = - for i in 0 ..< self.len: - yield self.buf[i] - -iterator miterms*[T](self: ForeignBuff[T]): var T = - for i in 0 ..< self.len: - yield self.buf[i] - proc attach*[T]( self: var ForeignBuff[T], - buf: ptr UncheckedArray[T], - len: int, - cell: ForeignCell) = - ## Attach a foreign pointer to this buffer + buf: openArray[T]) = + ## Attach self foreign pointer to this buffer ## + withLock(self.lock): + self.buf = makeUncheckedArray[T](baseAddr buf) + self.len = buf.len + self.cell = protect(self.buf) - self.buf = buf - self.len = len - self.cell = cell +func attached*[T]() = + ## Check if self foreign pointer is attached to this buffer + ## + withLock(self.lock): + return self.but != nil and self.cell.data != nil + +## NOTE: Converters might return copies of the buffer, +## this should be overall safe since we want to copy +## the buffer local GC anyway. +converter toSeq*[T](self: ForeignBuff[T]): seq[T] | lent seq[T] = + @(self.buf.toOpenArray(0, self.len - 1)) + +converter toString*[T](self: ForeignBuff[T]): string | lent string = + $(self.buf.toOpenArray(0, self.len - 1)) + +converter getVal*[T](self: ForeignBuff[T]): ptr UncheckedArray[T] = + self.buf func init*[T](_: type ForeignBuff[T]): ForeignBuff[T] = - return ForeignBuff[T]() + var + lock = Lock() + + lock.initLock() + ForeignBuff[T](lock: lock) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 71a1bbe..b0b44e9 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -1,4 +1,7 @@ +when not compileOption("threads"): + {.error: "This module requires --threads:on compilation flag".} + import pkg/upraises push: {.upraises: [].} @@ -19,50 +22,49 @@ import ../datastore import ./foreignbuffer type - TaskRes = object + ThreadResults = object ok: Atomic[bool] - msg: ptr cstring + msg: ForeignBuff[char] TaskCtx = object ds: ptr Datastore - res: TaskRes + res: ptr ThreadResults signal: ThreadSignalPtr ThreadDatastore* = ref object of Datastore tp*: Taskpool ds*: Datastore +proc success(self: var ThreadResults) {.inline.} = + self.ok.store(true) + +proc failure(self: var ThreadResults, msg: var string) {.inline.} = + self.ok.store(false) + self.msg.attach(msg.toOpenArray(0, msg.high)) + proc hasTask( ctx: ptr TaskCtx, key: ptr Key, doesHave: ptr bool) = - let - res = (waitFor ctx[].ds[].has(key[])).catch + without res =? (waitFor ctx[].ds[].has(key[])).catch, error: + ctx[].res[].failure(error.msg) + return - if res.isErr: - var - err = cstring(res.error().msg) - ctx[].res.msg = addr err - else: - ctx[].res.msg = nil - doesHave[] = res.get().get() - - ctx[].res.ok.store(res.isOk) + doesHave[] = res.get() + ctx[].res[].success() discard ctx[].signal.fireSync() -proc has*( - self: ThreadDatastore, - key: Key): Future[?!bool] {.async.} = - +method has*(self: ThreadDatastore, key: Key): Future[?!bool] {.async.} = var signal = ThreadSignalPtr.new().valueOr: return failure("Failed to create signal") key = key + res = ThreadResults() ctx = TaskCtx( ds: addr self.ds, - res: TaskRes(msg: nil), + res: addr res, signal: signal) doesHave = false @@ -73,30 +75,22 @@ proc has*( runTask() await wait(ctx.signal) - var data: bool if ctx.res.ok.load() == false: - return failure("error") + return failure($(ctx.res.msg)) return success(doesHave) finally: ctx.signal.close() proc delTask(ctx: ptr TaskCtx, key: ptr Key) = + without res =? (waitFor ctx[].ds[].delete(key[])).catch, error: + ctx[].res[].failure(error.msg) + return - let - res = (waitFor ctx[].ds[].delete(key[])).catch - - if res.isErr: - var - err = cstring(res.error().msg) - ctx[].res.msg = addr err - else: - ctx[].res.msg = nil - - ctx[].res.ok.store(res.isOk) + ctx[].res[].ok.store(true) discard ctx[].signal.fireSync() -proc delete*( +method delete*( self: ThreadDatastore, key: Key): Future[?!void] {.async.} = @@ -105,9 +99,10 @@ proc delete*( return failure("Failed to create signal") key = key + res = ThreadResults() ctx = TaskCtx( ds: addr self.ds, - res: TaskRes(msg: nil), + res: addr res, signal: signal) proc runTask() = @@ -124,6 +119,13 @@ proc delete*( finally: ctx.signal.close() +method delete*(self: ThreadDatastore, keys: seq[Key]): Future[?!void] {.async.} = + for key in keys: + if err =? (await self.delete(key)).errorOption: + return failure err + + return success() + proc putTask( ctx: ptr TaskCtx, key: ptr Key, @@ -132,21 +134,16 @@ proc putTask( ## run put in a thread task ## - let - res = (waitFor ctx[].ds[].put( + without res =? (waitFor ctx[].ds[].put( key[], - @(toOpenArray(data, 0, len - 1)))).catch + @(toOpenArray(data, 0, len - 1)))).catch, error: + ctx[].res[].failure(error.msg) + return - if res.isErr: - var err = cstring(res.error().msg) - ctx[].res.msg = addr err - else: - ctx[].res.msg = nil - - ctx[].res.ok.store(res.isOk) + ctx[].res[].ok.store(true) discard ctx[].signal.fireSync() -proc put*( +method put*( self: ThreadDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = @@ -156,15 +153,17 @@ proc put*( return failure("Failed to create signal") key = key data = data + res = ThreadResults() ctx = TaskCtx( ds: addr self.ds, - res: TaskRes(msg: nil), + res: addr res, signal: signal) proc runTask() = self.tp.spawn putTask( addr ctx, - addr key, makeUncheckedArray(baseAddr data), + addr key, + makeUncheckedArray(baseAddr data), data.len) try: @@ -173,8 +172,18 @@ proc put*( finally: ctx.signal.close() - if ctx.res.ok.load() == false: - return failure("error") + if ctx.res[].ok.load() == false: + return failure($(ctx.res[].msg)) + + return success() + +method put*( + self: ThreadDatastore, + batch: seq[BatchEntry]): Future[?!void] {.async.} = + + for entry in batch: + if err =? (await self.put(entry.key, entry.data)).errorOption: + return failure err return success() @@ -186,21 +195,18 @@ proc getTask( ## without res =? (waitFor ctx[].ds[].get(key[])).catch, error: - var err = cstring(error.msg) - ctx[].res.msg = addr err + var err = error.msg + ctx[].res[].failure(error.msg) return var data = res.get() - cell = protect(addr data) - ctx[].res.msg = nil - buf[].attach( - makeUncheckedArray(baseAddr data), data.len, cell) - ctx[].res.ok.store(res.isOk) + buf[].attach(data) + ctx[].res[].ok.store(res.isOk) discard ctx[].signal.fireSync() -proc get*( +method get*( self: ThreadDatastore, key: Key): Future[?!seq[byte]] {.async.} = @@ -210,9 +216,10 @@ proc get*( key = key buf = ForeignBuff[byte].init() + res = ThreadResults() ctx = TaskCtx( ds: addr self.ds, - res: TaskRes(msg: nil), + res: addr res, signal: signal) proc runTask() = @@ -223,14 +230,13 @@ proc get*( await wait(ctx.signal) if ctx.res.ok.load() == false: - return failure("error") + return failure($(ctx.res[].msg)) - var data = @(toOpenArray(buf.get(), 0, buf.len - 1)) - return success(data) + return success(buf.toSeq()) finally: ctx.signal.close() -proc new*( +func new*( self: type ThreadDatastore, ds: Datastore, tp: Taskpool): ?!ThreadDatastore = diff --git a/datastore/threads/threadproxyds_.nim b/datastore/threads/threadproxyds_.nim deleted file mode 100644 index 4410f10..0000000 --- a/datastore/threads/threadproxyds_.nim +++ /dev/null @@ -1,202 +0,0 @@ -import std/tables - -import pkg/chronos -import pkg/chronos/threadsync -import pkg/questionable -import pkg/questionable/results -import pkg/upraises -import pkg/taskpools -import pkg/stew/results -import pkg/threading/smartptrs - -import ./key -import ./query -import ./datastore -import ./threadbackend -import ./fsds - -export key, query - -push: {.upraises: [].} - -type - ThreadProxyDatastore* = ref object of Datastore - tds: ThreadDatastorePtr - -method has*( - self: ThreadProxyDatastore, - key: Key -): Future[?!bool] {.async.} = - - without ret =? newThreadResult(bool), err: - return failure(err) - - try: - has(ret, self.tds, key) - await wait(ret[].signal) - finally: - ret[].signal.close() - - return ret.convert(bool) - -method delete*( - self: ThreadProxyDatastore, - key: Key -): Future[?!void] {.async.} = - - without ret =? newThreadResult(void), err: - return failure(err) - - try: - delete(ret, self.tds, key) - await wait(ret[].signal) - finally: - ret[].signal.close() - - return ret.convert(void) - -method delete*( - self: ThreadProxyDatastore, - keys: seq[Key] -): Future[?!void] {.async.} = - - for key in keys: - if err =? (await self.delete(key)).errorOption: - return failure err - - return success() - -method get*( - self: ThreadProxyDatastore, - key: Key -): Future[?!seq[byte]] {.async.} = - ## implements batch get - ## - ## note: this implementation is rather naive and should - ## probably be switched to use a single ThreadSignal - ## for the entire batch - - without ret =? newThreadResult(ValueBuffer), err: - return failure(err) - - try: - get(ret, self.tds, key) - await wait(ret[].signal) - finally: - ret[].signal.close() - - return ret.convert(seq[byte]) - -method put*( - self: ThreadProxyDatastore, - key: Key, - data: seq[byte] -): Future[?!void] {.async.} = - - without ret =? newThreadResult(void), err: - return failure(err) - - try: - put(ret, self.tds, key, data) - await wait(ret[].signal) - finally: - ret[].signal.close() - - return ret.convert(void) - -method put*( - self: ThreadProxyDatastore, - batch: seq[BatchEntry] -): Future[?!void] {.async.} = - ## implements batch put - ## - ## note: this implementation is rather naive and should - ## probably be switched to use a single ThreadSignal - ## for the entire batch - - for entry in batch: - if err =? (await self.put(entry.key, entry.data)).errorOption: - return failure err - - return success() - -import pretty - -method query*( - self: ThreadProxyDatastore, - query: Query -): Future[?!QueryIter] {.async.} = - - without ret =? newThreadResult(QueryResponseBuffer), err: - return failure(err) - - echo "\n\n=== Query Start === " - - ## we need to setup the query iter on the main thread - ## to keep it's lifetime associated with this async Future - without it =? await self.tds[].ds.query(query), err: - ret.failure(err) - - var iter = newSharedPtr(QueryIterStore) - ## note that bypasses SharedPtr isolation - may need `protect` here? - iter[].it = it - - var iterWrapper = QueryIter.new() - - proc next(): Future[?!QueryResponse] {.async.} = - print "query:next:start: " - iterWrapper.finished = iter[].it.finished - if not iter[].it.finished: - query(ret, self.tds, iter) - await wait(ret[].signal) - echo "" - print "query:post: ", ret[].results - print "query:post:finished: ", iter[].it.finished - print "query:post: ", " qrb:key: ", ret[].results.get().key.toString() - print "query:post: ", " qrb:data: ", ret[].results.get().data.toString() - result = ret.convert(QueryResponse) - else: - result = success (Key.none, EmptyBytes) - - proc dispose(): Future[?!void] {.async.} = - iter[].it = nil # ensure our sharedptr doesn't try and dealloc - ret[].signal.close() - return success() - - iterWrapper.next = next - iterWrapper.dispose = dispose - return success iterWrapper - -method close*( - self: ThreadProxyDatastore -): Future[?!void] {.async.} = - # TODO: how to handle failed close? - result = success() - - without res =? self.tds[].ds.close(), err: - result = failure(err) - # GC_unref(self.tds[].ds) ## TODO: is this needed? - - if self.tds[].tp != nil: - ## this can block... how to handle? maybe just leak? - self.tds[].tp.shutdown() - - self[].tds[].ds = nil # ensure our sharedptr doesn't try and dealloc - -proc newThreadProxyDatastore*( - ds: Datastore, -): ?!ThreadProxyDatastore = - ## create a new - - var self = ThreadProxyDatastore() - let value = newSharedPtr(ThreadDatastore) - # GC_ref(ds) ## TODO: is this needed? - try: - value[].ds = ds - value[].tp = Taskpool.new(num_threads = 2) - except Exception as exc: - return err((ref DatastoreError)(msg: exc.msg)) - - self.tds = value - - success self diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim deleted file mode 100644 index 2692093..0000000 --- a/datastore/threads/threadresult.nim +++ /dev/null @@ -1,60 +0,0 @@ - -import threading/smartptrs - -import pkg/upraises -push: {.upraises: [].} - -import pkg/chronos/threadsync - -import ./foreignbuffer - -type - CatchableErrorBuffer = ForeignBuffer[ref CatchableError] - - ThreadResult*[T] = object - signal*: ThreadSignalPtr - results*: Result[T, CatchableErrorBuffer] - - TResult*[T] = SharedPtr[ThreadResult[T]] - -proc success*[T](ret: TResult[T], value: T) = - ret[].results.ok(value) - -proc success*[T: void](ret: TResult[T]) = - ret[].results.ok() - -proc failure*[T](ret: TResult[T], exc: ref Exception) = - ret[].results.err(exc.toBuffer()) - -proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] = - if ret[].results.isOk(): - when S is seq[byte]: - result.ok(ret[].results.get().toSeq(byte)) - elif S is string: - result.ok(ret[].results.get().toString()) - elif S is void: - result.ok() - # elif S is QueryResponse: - # result.ok(ret[].results.get().toQueryResponse()) - else: - result.ok(ret[].results.get()) - else: - let exc: ref CatchableError = ret[].results.error().toCatchable() - result.err(exc) - -proc new*[T]( - self: type ThreadResult, - tp: typedesc[T]): Result[TResult[T], ref CatchableError] = - ## Create a new ThreadResult for type T - ## - - let - res = newSharedPtr(ThreadResult[T]) - signal = ThreadSignalPtr.new() - - if signal.isErr: - return err((ref CatchableError)(msg: signal.error())) - else: - res[].signal = signal.get() - - ok res diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 0f79536..8ca8481 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -62,19 +62,16 @@ suite "Test Basic ThreadProxyDatastore": key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes - taskPool: TaskPool + taskPool: Taskpool setupAll: memStore = MemoryDatastore.new() - taskPool = TaskPool.new(3) - ds = ThreadDatastore.new(memStore, taskPool).expect("should work") + taskPool = Taskpool.new(2) + ds = ThreadDatastore.new(memStore, taskPool).tryGet() teardownAll: (await memStore.close()).get() - # test "check put": - # (await ds.put(key, bytes)).tryGet() - basicStoreTests(ds, key, bytes, otherBytes) # suite "Test Query":