import pkg/chronos import pkg/chronos/threadsync import pkg/questionable import pkg/questionable/results import stew/results import pkg/upraises import pkg/taskpools import ./key import ./query import ./datastore import ./databuffer import threading/smartptrs import pretty export key, query, smartptrs, databuffer push: {.upraises: [].} type ThreadResultKind* {.pure.} = enum NotReady Success Error ThreadResult*[T: DataBuffer | void | ThreadDatastorePtr] = object state*: ThreadResultKind signal*: ThreadSignalPtr value*: T error*: CatchableErrorBuffer TResult*[T] = SharedPtr[ThreadResult[T]] ThreadDatastore* = object tp*: Taskpool ds*: Datastore ThreadDatastorePtr* = SharedPtr[ThreadDatastore] proc newThreadResult*[T]( tp: typedesc[T] ): Result[TResult[T], ref CatchableError] = let res = newSharedPtr(ThreadResult[T]) let signal = ThreadSignalPtr.new() if signal.isErr: return err((ref CatchableError)(msg: signal.error())) else: res[].signal = signal.get() ok res proc getTask*( ret: TResult[DataBuffer], tds: ThreadDatastorePtr, kb: KeyBuffer, ) = without key =? kb.toKey(), err: ret[].state = Error try: let res = waitFor tds[].ds.get(key) if res.isErr: ret[].state = Error ret[].error = res.error().toBuffer() else: let db = DataBuffer.new res.get() ret[].state = Success ret[].value = db discard ret[].signal.fireSync() except CatchableError as err: ret[].state = Error ret[].error = err.toBuffer() proc get*( ret: TResult[DataBuffer], tds: ThreadDatastorePtr, key: Key, ) = let bkey = StringBuffer.new(key.id()) tds[].tp.spawn getTask(ret, tds, bkey) import os proc putTask*( ret: TResult[void], tds: ThreadDatastorePtr, kb: KeyBuffer, db: DataBuffer, ) = without key =? kb.toKey(), err: ret[].state = Error let data = db.toSeq(byte) let res = (waitFor tds[].ds.put(key, data)).catch # print "thrbackend: putTask: fire", ret[].signal.fireSync().get() if res.isErr: ret[].state = Error ret[].error = res.error().toBuffer() else: ret[].state = Success discard ret[].signal.fireSync() proc put*( ret: TResult[void], tds: ThreadDatastorePtr, key: Key, data: seq[byte] ) = echo "thrfrontend:put: " let bkey = StringBuffer.new(key.id()) let bval = DataBuffer.new(data) print "bkey: ", bkey print "bval: ", bval tds[].tp.spawn putTask(ret, tds, bkey, bval)