diff --git a/datastore/datastore.nim b/datastore/datastore.nim index d7fa45f..3e90814 100644 --- a/datastore/datastore.nim +++ b/datastore/datastore.nim @@ -34,11 +34,11 @@ method put*(self: Datastore, batch: seq[BatchEntry]): Future[?!void] {.base, loc method close*(self: Datastore): Future[?!void] {.base, async, locks: "unknown".} = raiseAssert("Not implemented!") -method query*( - self: Datastore, - query: Query): Future[?!QueryIter] {.base, gcsafe.} = +# method query*( +# self: Datastore, +# query: Query): Future[?!QueryIter] {.base, gcsafe.} = - raiseAssert("Not implemented!") +# raiseAssert("Not implemented!") proc contains*(self: Datastore, key: Key): Future[bool] {.async.} = return (await self.has(key)) |? false diff --git a/datastore/memoryds.nim b/datastore/memoryds.nim index dd3192f..fd02fe9 100644 --- a/datastore/memoryds.nim +++ b/datastore/memoryds.nim @@ -11,33 +11,27 @@ import pkg/upraises import ./key import ./query import ./datastore -import ./databuffer export key, query push: {.upraises: [].} type - MemoryDatastore* = ref object of Datastore - store*: Table[KeyBuffer, ValueBuffer] + store*: Table[Key, seq[byte]] method has*( self: MemoryDatastore, key: Key ): Future[?!bool] {.async.} = - let dk = KeyBuffer.new(key) - return success self.store.hasKey(dk) + return success self.store.hasKey(key) method delete*( - self: MemoryDatastore, - key: Key -): Future[?!void] {.async.} = + self: MemoryDatastore, + key: Key): Future[?!void] {.async.} = - let dk = KeyBuffer.new(key) - var val: ValueBuffer - discard self.store.pop(dk, val) + self.store.del(key) return success() method delete*( @@ -51,26 +45,20 @@ method delete*( return success() method get*( - self: MemoryDatastore, - key: Key -): Future[?!seq[byte]] {.async.} = + self: MemoryDatastore, + key: Key): Future[?!seq[byte]] {.async.} = - let dk = KeyBuffer.new(key) - if self.store.hasKey(dk): - let res = self.store[dk].toSeq(byte) - return success res - else: + self.store.withValue(key, value): + return success value[] + do: return failure (ref DatastoreError)(msg: "no such key") method put*( - self: MemoryDatastore, - key: Key, - data: seq[byte] -): Future[?!void] {.async.} = + self: MemoryDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = - let dk = KeyBuffer.new(key) - let dv = ValueBuffer.new(data) - self.store[dk] = dv + self.store[key] = data return success() method put*( @@ -83,47 +71,47 @@ method put*( return success() -proc keyIterator(self: MemoryDatastore, queryKey: string): iterator: KeyBuffer {.gcsafe.} = - return iterator(): KeyBuffer {.closure.} = - var keys = self.store.keys().toSeq() - keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString)) - for key in keys: - if key.toString().startsWith(queryKey): - yield key +# proc keyIterator(self: MemoryDatastore, queryKey: string): iterator: KeyBuffer {.gcsafe.} = +# return iterator(): KeyBuffer {.closure.} = +# var keys = self.store.keys().toSeq() +# keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString)) +# for key in keys: +# if key.toString().startsWith(queryKey): +# yield key -method query*( - self: MemoryDatastore, - query: Query, -): Future[?!QueryIter] {.async.} = +# method query*( +# self: MemoryDatastore, +# query: Query, +# ): Future[?!QueryIter] {.async.} = - let - queryKey = query.key.id() - walker = keyIterator(self, queryKey) - var - iter = QueryIter.new() +# let +# queryKey = query.key.id() +# walker = keyIterator(self, queryKey) +# var +# iter = QueryIter.new() - proc next(): Future[?!QueryResponse] {.async.} = - let kb = walker() +# proc next(): Future[?!QueryResponse] {.async.} = +# let kb = walker() - if finished(walker): - iter.finished = true - return success (Key.none, EmptyBytes) +# if finished(walker): +# iter.finished = true +# return success (Key.none, EmptyBytes) - let key = kb.toKey().expect("should not fail") - var ds: ValueBuffer - if query.value: - ds = self.store[kb] - let data = if ds.isNil: EmptyBytes else: ds.toSeq(byte) +# let key = kb.toKey().expect("should not fail") +# var ds: ValueBuffer +# if query.value: +# ds = self.store[kb] +# let data = if ds.isNil: EmptyBytes else: ds.toSeq(byte) - return success (key.some, data) +# return success (key.some, data) - iter.next = next - return success iter +# iter.next = next +# return success iter method close*(self: MemoryDatastore): Future[?!void] {.async.} = self.store.clear() return success() -func new*(tp: typedesc[MemoryDatastore]): MemoryDatastore = +func new*(tp: type MemoryDatastore): MemoryDatastore = var self = tp() return self diff --git a/datastore/query.nim b/datastore/query.nim index 92aafe9..5f9c913 100644 --- a/datastore/query.nim +++ b/datastore/query.nim @@ -7,7 +7,7 @@ import pkg/questionable/results import ./key import ./types -import ./databuffer +import ./threads/databuffer export options, SortOrder type @@ -43,7 +43,7 @@ proc waitForAllQueryResults*(qi: ?!QueryIter): Future[?!seq[QueryResponse]] {.as res.add qr else: return failure val.error() - + let rd = await iter.dispose() if rd.isErr(): return failure(rd.error()) @@ -75,18 +75,18 @@ proc init*( offset: offset, limit: limit) -type +# type - QueryBuffer* = object - key*: KeyBuffer # Key to be queried - value*: bool # Flag to indicate if data should be returned - limit*: int # Max items to return - not available in all backends - offset*: int # Offset from which to start querying - not available in all backends - sort*: SortOrder # Sort order - not available in all backends +# QueryBuffer* = object +# key*: KeyBuffer # Key to be queried +# value*: bool # Flag to indicate if data should be returned +# limit*: int # Max items to return - not available in all backends +# offset*: int # Offset from which to start querying - not available in all backends +# sort*: SortOrder # Sort order - not available in all backends - QueryResponseBuffer* = object - key*: KeyBuffer - data*: ValueBuffer +# QueryResponseBuffer* = object +# key*: KeyBuffer +# data*: ValueBuffer # GetNext* = proc(): Future[?!QueryResponse] {.upraises: [], gcsafe, closure.} # IterDispose* = proc(): Future[?!void] {.upraises: [], gcsafe.} @@ -95,46 +95,46 @@ type # next*: GetNext # dispose*: IterDispose -proc toBuffer*(q: Query): QueryBuffer = - ## convert Query to thread-safe QueryBuffer - return QueryBuffer( - key: KeyBuffer.new(q.key), - value: q.value, - offset: q.offset, - sort: q.sort - ) +# proc toBuffer*(q: Query): QueryBuffer = +# ## convert Query to thread-safe QueryBuffer +# return QueryBuffer( +# key: KeyBuffer.new(q.key), +# value: q.value, +# offset: q.offset, +# sort: q.sort +# ) -proc toQuery*(qb: QueryBuffer): Query = - ## convert QueryBuffer to regular Query - Query( - key: qb.key.toKey().expect("key expected"), - value: qb.value, - limit: qb.limit, - offset: qb.offset, - sort: qb.sort - ) +# proc toQuery*(qb: QueryBuffer): Query = +# ## convert QueryBuffer to regular Query +# Query( +# key: qb.key.toKey().expect("key expected"), +# value: qb.value, +# limit: qb.limit, +# offset: qb.offset, +# sort: qb.sort +# ) -proc toBuffer*(q: QueryResponse): QueryResponseBuffer = - ## convert QueryReponses to thread safe type - var kb: KeyBuffer - if q.key.isSome(): - kb = KeyBuffer.new(q.key.get()) - var kv: KeyBuffer - if q.data.len() > 0: - kv = ValueBuffer.new(q.data) +# proc toBuffer*(q: QueryResponse): QueryResponseBuffer = +# ## convert QueryReponses to thread safe type +# var kb: KeyBuffer +# if q.key.isSome(): +# kb = KeyBuffer.new(q.key.get()) +# var kv: KeyBuffer +# if q.data.len() > 0: +# kv = ValueBuffer.new(q.data) - QueryResponseBuffer(key: kb, data: kv) +# QueryResponseBuffer(key: kb, data: kv) -proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse = - ## convert QueryReponseBuffer to regular QueryResponse - let key = - if qb.key.isNil: none(Key) - else: some qb.key.toKey().expect("key response should work") - let data = - if qb.data.isNil: EmptyBytes - else: qb.data.toSeq(byte) +# proc toQueryResponse*(qb: QueryResponseBuffer): QueryResponse = +# ## convert QueryReponseBuffer to regular QueryResponse +# let key = +# if qb.key.isNil: none(Key) +# else: some qb.key.toKey().expect("key response should work") +# let data = +# if qb.data.isNil: EmptyBytes +# else: qb.data.toSeq(byte) - (key: key, data: data) +# (key: key, data: data) # proc convert*(ret: TResult[QueryResponseBuffer], # tp: typedesc[QueryResponse] diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim deleted file mode 100644 index 087dc48..0000000 --- a/datastore/threadbackend.nim +++ /dev/null @@ -1,217 +0,0 @@ -import pkg/chronos -import pkg/chronos/threadsync -import pkg/questionable -import pkg/questionable/results -import stew/results -import pkg/upraises -import pkg/taskpools - -import ./key -import ./query -import ./datastore -import ./databuffer -import threading/smartptrs - -export key, query, smartptrs, databuffer - -push: {.upraises: [].} - -type - ThreadSafeTypes* = DataBuffer | void | bool | ThreadDatastorePtr | QueryResponseBuffer - ThreadResult*[T: ThreadSafeTypes] = object - signal*: ThreadSignalPtr - results*: Result[T, CatchableErrorBuffer] - - TResult*[T] = SharedPtr[ThreadResult[T]] - - ThreadDatastore* = object - tp*: Taskpool - ds*: Datastore - - ThreadDatastorePtr* = SharedPtr[ThreadDatastore] - - QueryIterStore* = object - it*: QueryIter - QueryIterPtr* = SharedPtr[QueryIterStore] - -proc newThreadResult*[T]( - tp: typedesc[T] -): Result[TResult[T], ref CatchableError] = - let res = newSharedPtr(ThreadResult[T]) - let signal = ThreadSignalPtr.new() - if signal.isErr: - return err((ref CatchableError)(msg: signal.error())) - else: - res[].signal = signal.get() - ok res - -proc success*[T](ret: TResult[T], value: T) = - ret[].results.ok(value) - -proc success*[T: void](ret: TResult[T]) = - ret[].results.ok() - -proc failure*[T](ret: TResult[T], exc: ref Exception) = - ret[].results.err(exc.toBuffer()) - -proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] = - if ret[].results.isOk(): - when S is seq[byte]: - result.ok(ret[].results.get().toSeq(byte)) - elif S is string: - result.ok(ret[].results.get().toString()) - elif S is void: - result.ok() - elif S is QueryResponse: - result.ok(ret[].results.get().toQueryResponse()) - else: - result.ok(ret[].results.get()) - else: - let exc: ref CatchableError = ret[].results.error().toCatchable() - result.err(exc) - -proc hasTask*( - ret: TResult[bool], - tds: ThreadDatastorePtr, - kb: KeyBuffer, -) = - without key =? kb.toKey(), err: - ret.failure(err) - - try: - let res = waitFor tds[].ds.has(key) - if res.isErr: - ret.failure(res.error()) - else: - ret.success(res.get()) - discard ret[].signal.fireSync() - except CatchableError as err: - ret.failure(err) - -proc has*( - ret: TResult[bool], - tds: ThreadDatastorePtr, - key: Key, -) = - let bkey = StringBuffer.new(key.id()) - tds[].tp.spawn hasTask(ret, tds, bkey) - -proc getTask*( - ret: TResult[DataBuffer], - tds: ThreadDatastorePtr, - kb: KeyBuffer, -) = - without key =? kb.toKey(), err: - ret.failure(err) - try: - let res = waitFor tds[].ds.get(key) - if res.isErr: - ret.failure(res.error()) - else: - let db = DataBuffer.new res.get() - ret.success(db) - - discard ret[].signal.fireSync() - except CatchableError as err: - ret.failure(err) - -proc get*( - ret: TResult[DataBuffer], - tds: ThreadDatastorePtr, - key: Key, -) = - let bkey = StringBuffer.new(key.id()) - tds[].tp.spawn getTask(ret, tds, bkey) - - -proc putTask*( - ret: TResult[void], - tds: ThreadDatastorePtr, - kb: KeyBuffer, - db: DataBuffer, -) = - - without key =? kb.toKey(), err: - ret.failure(err) - - let data = db.toSeq(byte) - let res = (waitFor tds[].ds.put(key, data)).catch - # print "thrbackend: putTask: fire", ret[].signal.fireSync().get() - if res.isErr: - ret.failure(res.error()) - else: - ret.success() - - discard ret[].signal.fireSync() - -proc put*( - ret: TResult[void], - tds: ThreadDatastorePtr, - key: Key, - data: seq[byte] -) = - let bkey = StringBuffer.new(key.id()) - let bval = DataBuffer.new(data) - - tds[].tp.spawn putTask(ret, tds, bkey, bval) - - -proc deleteTask*( - ret: TResult[void], - tds: ThreadDatastorePtr, - kb: KeyBuffer, -) = - - without key =? kb.toKey(), err: - ret.failure(err) - - let res = (waitFor tds[].ds.delete(key)).catch - # print "thrbackend: putTask: fire", ret[].signal.fireSync().get() - if res.isErr: - ret.failure(res.error()) - else: - ret.success() - - discard ret[].signal.fireSync() - -import pretty - -proc delete*( - ret: TResult[void], - tds: ThreadDatastorePtr, - key: Key, -) = - let bkey = StringBuffer.new(key.id()) - tds[].tp.spawn deleteTask(ret, tds, bkey) - -import os - -proc queryTask*( - ret: TResult[QueryResponseBuffer], - tds: ThreadDatastorePtr, - qiter: QueryIterPtr, -) = - - try: - os.sleep(100) - without res =? waitFor(qiter[].it.next()), err: - ret.failure(err) - - let qrb = res.toBuffer() - # print "queryTask: ", " res: ", res - - ret.success(qrb) - print "queryTask: ", " qrb:key: ", ret[].results.get().key.toString() - print "queryTask: ", " qrb:data: ", ret[].results.get().data.toString() - - except Exception as exc: - ret.failure(exc) - - discard ret[].signal.fireSync() - -proc query*( - ret: TResult[QueryResponseBuffer], - tds: ThreadDatastorePtr, - qiter: QueryIterPtr, -) = - tds[].tp.spawn queryTask(ret, tds, qiter) diff --git a/datastore/databuffer.nim b/datastore/threads/databuffer.nim similarity index 96% rename from datastore/databuffer.nim rename to datastore/threads/databuffer.nim index 431e5e8..67193e0 100644 --- a/datastore/databuffer.nim +++ b/datastore/threads/databuffer.nim @@ -1,5 +1,12 @@ + import threading/smartptrs import std/hashes +import pkg/stew/results +import pkg/upraises + +push: {.upraises: [].} + +import ../key export hashes @@ -7,20 +14,20 @@ type DataBufferHolder* = object buf: ptr UncheckedArray[byte] size: int - + DataBuffer* = SharedPtr[DataBufferHolder] ##\ ## A fixed length data buffer using a SharedPtr. ## It is thread safe even with `refc` since ## it doesn't use string or seq types internally. - ## + ## KeyBuffer* = DataBuffer ValueBuffer* = DataBuffer StringBuffer* = DataBuffer + CatchableErrorBuffer* = object msg: StringBuffer - proc `=destroy`*(x: var DataBufferHolder) = ## copy pointer implementation if x.buf != nil: @@ -51,7 +58,7 @@ proc new*(tp: typedesc[DataBuffer], size: int = 0): DataBuffer = proc new*[T: byte | char](tp: typedesc[DataBuffer], data: openArray[T]): DataBuffer = ## allocate new buffer and copies indata from openArray - ## + ## result = DataBuffer.new(data.len) if data.len() > 0: copyMem(result[].buf, unsafeAddr data[0], data.len) @@ -78,10 +85,8 @@ proc toBuffer*(err: ref Exception): CatchableErrorBuffer = msg: StringBuffer.new(err.msg) ) -import ./key -import stew/results - proc new*(tp: typedesc[KeyBuffer], key: Key): KeyBuffer = KeyBuffer.new(key.id()) + proc toKey*(kb: KeyBuffer): Result[Key, ref CatchableError] = Key.init(kb.toString()) diff --git a/datastore/threads/foreignbuffer.nim b/datastore/threads/foreignbuffer.nim new file mode 100644 index 0000000..b2002ea --- /dev/null +++ b/datastore/threads/foreignbuffer.nim @@ -0,0 +1,68 @@ + +type + ## Copy foreign buffers between threads. + ## + ## This is meant to be used as a temporary holder a + ## pointer to a foreign buffer that is being passed + ## between threads. + ## + ## The receiving thread should copy the contained buffer + ## to it's local GC as soon as possible. Should only be + ## used with refgc. + ## + ForeignBuff*[T] = object + buf: ptr UncheckedArray[T] + len: int + cell: ForeignCell + +proc `=sink`[T](a: var ForeignBuff[T], b: ForeignBuff[T]) = + `=destroy`(a) + wasMoved(a) + a.len = b.len + a.buf = b.buf + a.cell = b.cell + +proc `=copy`[T](a: var ForeignBuff[T], b: ForeignBuff[T]) + {.error: "You can't copy the buffer, only it's contents!".} + +proc `=destroy`[T](self: var ForeignBuff[T]) = + if self.cell.data != nil: + echo "DESTROYING CELL" + dispose self.cell + +proc len*[T](self: ForeignBuff[T]): int = + return self.len + +template `[]`*[T](self: ForeignBuff[T], idx: int): T = + assert idx >= 0 and idx < self.len + return self.buf[idx] + +template `[]=`*[T](self: ForeignBuff[T], idx: int, val: T) = + assert idx >= 0 and idx < self.len + return self.buf[idx] + +proc get*[T](self: ForeignBuff[T]): ptr UncheckedArray[T] = + self.buf + +iterator items*[T](self: ForeignBuff[T]): T = + for i in 0 ..< self.len: + yield self.buf[i] + +iterator miterms*[T](self: ForeignBuff[T]): var T = + for i in 0 ..< self.len: + yield self.buf[i] + +proc attach*[T]( + self: var ForeignBuff[T], + buf: ptr UncheckedArray[T], + len: int, + cell: ForeignCell) = + ## Attach a foreign pointer to this buffer + ## + + self.buf = buf + self.len = len + self.cell = cell + +func init*[T](_: type ForeignBuff[T]): ForeignBuff[T] = + return ForeignBuff[T]() diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim new file mode 100644 index 0000000..71a1bbe --- /dev/null +++ b/datastore/threads/threadproxyds.nim @@ -0,0 +1,237 @@ + +import pkg/upraises + +push: {.upraises: [].} + +import std/atomics + +import pkg/chronos +import pkg/chronos/threadsync +import pkg/questionable +import pkg/questionable/results +import pkg/stew/ptrops +import pkg/taskpools + +import ../key +import ../query +import ../datastore + +import ./foreignbuffer + +type + TaskRes = object + ok: Atomic[bool] + msg: ptr cstring + + TaskCtx = object + ds: ptr Datastore + res: TaskRes + signal: ThreadSignalPtr + + ThreadDatastore* = ref object of Datastore + tp*: Taskpool + ds*: Datastore + +proc hasTask( + ctx: ptr TaskCtx, + key: ptr Key, + doesHave: ptr bool) = + + let + res = (waitFor ctx[].ds[].has(key[])).catch + + if res.isErr: + var + err = cstring(res.error().msg) + ctx[].res.msg = addr err + else: + ctx[].res.msg = nil + doesHave[] = res.get().get() + + ctx[].res.ok.store(res.isOk) + discard ctx[].signal.fireSync() + +proc has*( + self: ThreadDatastore, + key: Key): Future[?!bool] {.async.} = + + var + signal = ThreadSignalPtr.new().valueOr: + return failure("Failed to create signal") + + key = key + ctx = TaskCtx( + ds: addr self.ds, + res: TaskRes(msg: nil), + signal: signal) + doesHave = false + + proc runTask() = + self.tp.spawn hasTask(addr ctx, addr key, addr doesHave) + + try: + runTask() + await wait(ctx.signal) + + var data: bool + if ctx.res.ok.load() == false: + return failure("error") + + return success(doesHave) + finally: + ctx.signal.close() + +proc delTask(ctx: ptr TaskCtx, key: ptr Key) = + + let + res = (waitFor ctx[].ds[].delete(key[])).catch + + if res.isErr: + var + err = cstring(res.error().msg) + ctx[].res.msg = addr err + else: + ctx[].res.msg = nil + + ctx[].res.ok.store(res.isOk) + discard ctx[].signal.fireSync() + +proc delete*( + self: ThreadDatastore, + key: Key): Future[?!void] {.async.} = + + var + signal = ThreadSignalPtr.new().valueOr: + return failure("Failed to create signal") + + key = key + ctx = TaskCtx( + ds: addr self.ds, + res: TaskRes(msg: nil), + signal: signal) + + proc runTask() = + self.tp.spawn delTask(addr ctx, addr key) + + try: + runTask() + await wait(ctx.signal) + + if ctx.res.ok.load() == false: + return failure("error") + + return success() + finally: + ctx.signal.close() + +proc putTask( + ctx: ptr TaskCtx, + key: ptr Key, + data: ptr UncheckedArray[byte], + len: int) = + ## run put in a thread task + ## + + let + res = (waitFor ctx[].ds[].put( + key[], + @(toOpenArray(data, 0, len - 1)))).catch + + if res.isErr: + var err = cstring(res.error().msg) + ctx[].res.msg = addr err + else: + ctx[].res.msg = nil + + ctx[].res.ok.store(res.isOk) + discard ctx[].signal.fireSync() + +proc put*( + self: ThreadDatastore, + key: Key, + data: seq[byte]): Future[?!void] {.async.} = + + var + signal = ThreadSignalPtr.new().valueOr: + return failure("Failed to create signal") + key = key + data = data + ctx = TaskCtx( + ds: addr self.ds, + res: TaskRes(msg: nil), + signal: signal) + + proc runTask() = + self.tp.spawn putTask( + addr ctx, + addr key, makeUncheckedArray(baseAddr data), + data.len) + + try: + runTask() + await wait(ctx.signal) + finally: + ctx.signal.close() + + if ctx.res.ok.load() == false: + return failure("error") + + return success() + +proc getTask( + ctx: ptr TaskCtx, + key: ptr Key, + buf: ptr ForeignBuff[byte]) = + ## Run get in a thread task + ## + + without res =? (waitFor ctx[].ds[].get(key[])).catch, error: + var err = cstring(error.msg) + ctx[].res.msg = addr err + return + + var + data = res.get() + cell = protect(addr data) + ctx[].res.msg = nil + buf[].attach( + makeUncheckedArray(baseAddr data), data.len, cell) + + ctx[].res.ok.store(res.isOk) + discard ctx[].signal.fireSync() + +proc get*( + self: ThreadDatastore, + key: Key): Future[?!seq[byte]] {.async.} = + + var + signal = ThreadSignalPtr.new().valueOr: + return failure("Failed to create signal") + + key = key + buf = ForeignBuff[byte].init() + ctx = TaskCtx( + ds: addr self.ds, + res: TaskRes(msg: nil), + signal: signal) + + proc runTask() = + self.tp.spawn getTask(addr ctx, addr key, addr buf) + + try: + runTask() + await wait(ctx.signal) + + if ctx.res.ok.load() == false: + return failure("error") + + var data = @(toOpenArray(buf.get(), 0, buf.len - 1)) + return success(data) + finally: + ctx.signal.close() + +proc new*( + self: type ThreadDatastore, + ds: Datastore, + tp: Taskpool): ?!ThreadDatastore = + success ThreadDatastore(tp: tp, ds: ds) diff --git a/datastore/threadproxyds.nim b/datastore/threads/threadproxyds_.nim similarity index 100% rename from datastore/threadproxyds.nim rename to datastore/threads/threadproxyds_.nim diff --git a/datastore/threads/threadresult.nim b/datastore/threads/threadresult.nim new file mode 100644 index 0000000..2692093 --- /dev/null +++ b/datastore/threads/threadresult.nim @@ -0,0 +1,60 @@ + +import threading/smartptrs + +import pkg/upraises +push: {.upraises: [].} + +import pkg/chronos/threadsync + +import ./foreignbuffer + +type + CatchableErrorBuffer = ForeignBuffer[ref CatchableError] + + ThreadResult*[T] = object + signal*: ThreadSignalPtr + results*: Result[T, CatchableErrorBuffer] + + TResult*[T] = SharedPtr[ThreadResult[T]] + +proc success*[T](ret: TResult[T], value: T) = + ret[].results.ok(value) + +proc success*[T: void](ret: TResult[T]) = + ret[].results.ok() + +proc failure*[T](ret: TResult[T], exc: ref Exception) = + ret[].results.err(exc.toBuffer()) + +proc convert*[T, S](ret: TResult[T], tp: typedesc[S]): Result[S, ref CatchableError] = + if ret[].results.isOk(): + when S is seq[byte]: + result.ok(ret[].results.get().toSeq(byte)) + elif S is string: + result.ok(ret[].results.get().toString()) + elif S is void: + result.ok() + # elif S is QueryResponse: + # result.ok(ret[].results.get().toQueryResponse()) + else: + result.ok(ret[].results.get()) + else: + let exc: ref CatchableError = ret[].results.error().toCatchable() + result.err(exc) + +proc new*[T]( + self: type ThreadResult, + tp: typedesc[T]): Result[TResult[T], ref CatchableError] = + ## Create a new ThreadResult for type T + ## + + let + res = newSharedPtr(ThreadResult[T]) + signal = ThreadSignalPtr.new() + + if signal.isErr: + return err((ref CatchableError)(msg: signal.error())) + else: + res[].signal = signal.get() + + ok res diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 0641570..0f79536 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -7,75 +7,84 @@ import pkg/asynctest import pkg/chronos import pkg/stew/results import pkg/stew/byteutils +import pkg/taskpools import pkg/datastore/memoryds -import pkg/datastore/threadproxyds +import pkg/datastore/threads/threadproxyds import ./dscommontests -import ./querycommontests +# import ./querycommontests import pretty -suite "Test Basic ThreadProxyDatastore": - var - sds: ThreadProxyDatastore - mem: MemoryDatastore - key1: Key - data: seq[byte] +# suite "Test Basic ThreadProxyDatastore": +# var +# sds: ThreadDatastore +# mem: MemoryDatastore +# key1: Key +# data: seq[byte] +# taskPool: Taskpool - setupAll: - mem = MemoryDatastore.new() - sds = newThreadProxyDatastore(mem).expect("should work") - key1 = Key.init("/a").tryGet - data = "value for 1".toBytes() +# setupAll: +# mem = MemoryDatastore.new() +# taskPool = TaskPool.new(3) +# sds = ThreadDatastore.new(mem, taskPool).expect("should work") +# key1 = Key.init("/a").tryGet +# data = "value for 1".toBytes() - test "check put": - echo "\n\n=== put ===" - let res1 = await sds.put(key1, data) - print "res1: ", res1 +# test "check put": +# echo "\n\n=== put ===" +# let res1 = await sds.put(key1, data) +# print "res1: ", res1 - test "check get": - echo "\n\n=== get ===" - let res2 = await sds.get(key1) - check res2.get() == data - var val = "" - for c in res2.get(): - val &= char(c) - print "get res2: ", $val +# test "check get": +# echo "\n\n=== get ===" +# let res2 = await sds.get(key1) +# check res2.get() == data +# var val = "" +# for c in res2.get(): +# val &= char(c) - # echo "\n\n=== put cancel ===" - # # let res1 = await sds.put(key1, "value for 1".toBytes()) - # let res3 = sds.put(key1, "value for 1".toBytes()) - # res3.cancel() - # # print "res3: ", res3 +# print "get res2: ", $val + +# # echo "\n\n=== put cancel ===" +# # # let res1 = await sds.put(key1, "value for 1".toBytes()) +# # let res3 = sds.put(key1, "value for 1".toBytes()) +# # res3.cancel() +# # # print "res3: ", res3 suite "Test Basic ThreadProxyDatastore": var memStore: MemoryDatastore - ds: ThreadProxyDatastore + ds: ThreadDatastore key = Key.init("/a/b").tryGet() bytes = "some bytes".toBytes otherBytes = "some other bytes".toBytes + taskPool: TaskPool setupAll: memStore = MemoryDatastore.new() - ds = newThreadProxyDatastore(memStore).expect("should work") + taskPool = TaskPool.new(3) + ds = ThreadDatastore.new(memStore, taskPool).expect("should work") teardownAll: (await memStore.close()).get() + # test "check put": + # (await ds.put(key, bytes)).tryGet() + basicStoreTests(ds, key, bytes, otherBytes) -suite "Test Query": - var - mem: MemoryDatastore - sds: ThreadProxyDatastore +# suite "Test Query": +# var +# mem: MemoryDatastore +# sds: ThreadProxyDatastore - setup: - mem = MemoryDatastore.new() - sds = newThreadProxyDatastore(mem).expect("should work") +# setup: +# mem = MemoryDatastore.new() +# sds = newThreadProxyDatastore(mem).expect("should work") - queryTests(sds, false) +# queryTests(sds, false)