diff --git a/datastore/sharedds.nim b/datastore/sharedds.nim index 79cafb8..a4ad564 100644 --- a/datastore/sharedds.nim +++ b/datastore/sharedds.nim @@ -5,10 +5,12 @@ import pkg/chronos/threadsync import pkg/questionable import pkg/questionable/results import pkg/upraises +import pkg/taskpools import ./key import ./query import ./datastore +import ./threadbackend export key, query @@ -18,6 +20,11 @@ type SharedDatastore* = ref object of Datastore # stores*: Table[Key, SharedDatastore] + tds: ThreadDatastorePtr + +template newSignal(): auto = + ThreadSignalPtr.new().valueOr: + return failure newException(DatastoreError, "error creating signal") method has*( self: SharedDatastore, @@ -70,9 +77,13 @@ method close*( func new*[S: ref Datastore]( T: typedesc[SharedDatastore], - storeTp: typedesc[S] + backend: ThreadBackend, ): ?!SharedDatastore = - var self = SharedDatastore() + var + self = SharedDatastore() + signal = newSignal() + res = TResult[ThreadDatastore].new() + self.tds = ThreadDatastore.new(signal, backend, res) success self diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index 019d8b6..f64716d 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -12,13 +12,17 @@ import threading/smartptrs import fsds -export key, query +export key, query, smartptrs, databuffer push: {.upraises: [].} type - ThreadResult*[T: DataBuffer | void] = Result[T, CatchableErrorBuffer] + ThreadResult*[T: DataBuffer | void] = object + ready*: bool + success*: bool + val*: T + err*: CatchableErrorBuffer TResult*[T] = UniquePtr[ThreadResult[T]] @@ -36,27 +40,43 @@ type of SQliteBackend: discard - ThreadDatastore* = ptr object - tp: Taskpool + ThreadDatastore* = object + taskpool: Taskpool + backendDatastore: Datastore -var backendDatastore {.threadvar.}: Datastore + ThreadDatastorePtr* = SharedPtr[ThreadDatastore] proc new*[T](tp: typedesc[TResult[T]]): TResult[T] = newUniquePtr(ThreadResult[T]) -proc startupDatastore(backend: ThreadBackend): bool = +proc startupDatastore( + signal: ThreadSignalPtr, + backend: ThreadBackend, + res: TResult[ThreadDatastorePtr], +): bool = ## starts up a FS instance on a give thread case backend.kind: of FSBackend: - let res = FSDatastore.new( + let ds = FSDatastore.new( root = backend.root.toString(), depth = backend.depth, caseSensitive = backend.caseSensitive, - ignoreProtected = backend.ignoreProtected) - if res.isOk: - backendDatastore = res.get() + ignoreProtected = backend.ignoreProtected + ) + if ds.isOk: + let tds = newSharedPtr(ThreadDatastore) + tds[].backendDatastore = ds.get() + + res[].ready = true + res[].success = true + res[].val = tds + else: + res[].ready = true + res[].success = false else: discard + + signal.fireSync().get() proc get*( self: ThreadDatastore, @@ -66,37 +86,37 @@ proc get*( return ok(DataBuffer.new()) -proc put*( +proc putWorker*( self: ThreadDatastore, signal: ThreadSignalPtr, key: KeyBuffer, data: DataBuffer, -): TResult[void] = + res: TResult[void] +) = - return TResult[void].new() + discard -proc close*( - self: ThreadDatastore, - signal: ThreadSignalPtr, -): TResult[void] = - try: - self[].tp.shutdown() - return ok() - except Exception as exc: - return TResult[void].new() +# proc close*( +# self: ThreadDatastore, +# signal: ThreadSignalPtr, +# ): TResult[void] = +# try: +# self[].tp.shutdown() +# return ok() +# except Exception as exc: +# return TResult[void].new() func new*[S: ref Datastore]( T: typedesc[ThreadDatastore], signal: ThreadSignalPtr, backend: ThreadBackend, -): TResult[ThreadDatastore] = + res: TResult[ThreadDatastore] +) = var self = T() self.tp = Taskpool.new(num_threads = 1) ##\ ## Default to one thread, multiple threads \ ## will require more work - let pending = self.tp.spawn startupDatastore(backend) - sync pending + let pending = self.tp.spawn startupDatastore(signal, backend) - ok self