From 055edb4a41a47b52068540d5c9781d8d7a1f0abf Mon Sep 17 00:00:00 2001 From: Jaremy Creechley Date: Thu, 24 Aug 2023 17:33:32 -0700 Subject: [PATCH] plumbing values --- datastore/databuffer.nim | 12 +++++- datastore/sharedds.nim | 23 ++++------ datastore/threadbackend.nim | 84 +++++++++---------------------------- 3 files changed, 40 insertions(+), 79 deletions(-) diff --git a/datastore/databuffer.nim b/datastore/databuffer.nim index 9be717a..dd745fa 100644 --- a/datastore/databuffer.nim +++ b/datastore/databuffer.nim @@ -17,6 +17,8 @@ type KeyBuffer* = DataBuffer ValueBuffer* = DataBuffer StringBuffer* = DataBuffer + CatchableErrorBuffer* = object + msg: StringBuffer proc `$`*(data: DataBuffer): string = if data.buf.isNil: @@ -83,7 +85,15 @@ proc toSeq*[T: byte | char](a: DataBuffer, tp: typedesc[T]): seq[T] = result = newSeq[T](a.len) copyMem(addr result[0], unsafeAddr a.buf[0], a.len) -proc toString*(data: StringBuffer): string = +proc toString*(data: DataBuffer): string = result = newString(data.len()) if data.len() > 0: copyMem(addr result[0], unsafeAddr data.buf[0], data.len) + +proc toCatchable*(data: CatchableErrorBuffer): ref CatchableError = + result = (ref CatchableError)(msg: data.msg.toString()) + +proc toBuffer*(err: ref Exception): CatchableErrorBuffer = + return CatchableErrorBuffer( + msg: StringBuffer.new(err.msg) + ) diff --git a/datastore/sharedds.nim b/datastore/sharedds.nim index 62a06a9..a374a11 100644 --- a/datastore/sharedds.nim +++ b/datastore/sharedds.nim @@ -1,6 +1,7 @@ import std/tables import pkg/chronos +import pkg/chronos/threadsync import pkg/questionable import pkg/questionable/results import pkg/upraises @@ -66,22 +67,18 @@ method put*( data: seq[byte] ): Future[?!void] {.async.} = - # without mounted =? self.dispatch(key), error: - # return failure(error) - - # return (await mounted.store.store.put(mounted.relative, data)) - return success() + let signal = ThreadSignalPtr.new() + if signal.isErr: + return failure("error creating signal") + else: + await wait(signal.get()) + return success() method put*( self: SharedDatastore, batch: seq[BatchEntry] ): Future[?!void] {.async.} = - - for entry in batch: - if err =? (await self.put(entry.key, entry.data)).errorOption: - return failure err - - return success() + raiseAssert("Not implemented!") method close*( self: SharedDatastore @@ -98,8 +95,6 @@ func new*[S: ref Datastore]( storeTp: typedesc[S] ): ?!SharedDatastore = - var self = T() - # for (k, v) in stores.pairs: - # self.stores[?k.path] = MountedStore(store: v, key: k) + var self = SharedDatastore() success self diff --git a/datastore/threadbackend.nim b/datastore/threadbackend.nim index a4c1bfb..6970443 100644 --- a/datastore/threadbackend.nim +++ b/datastore/threadbackend.nim @@ -1,8 +1,6 @@ -import std/tables -import pkg/chronos -import pkg/questionable -import pkg/questionable/results +import pkg/chronos/threadsync +import stew/results import pkg/upraises import pkg/taskpools @@ -32,7 +30,7 @@ type of SQliteBackend: discard - ThreadDatastore* = ref object of Datastore + ThreadDatastore* = ptr object tp: Taskpool var backendDatastore {.threadvar.}: Datastore @@ -51,81 +49,39 @@ proc startupDatastore(backend: ThreadBackend): bool = else: discard -proc has*( - self: ThreadDatastore, - key: KeyBuffer -): Future[?!bool] {.async.} = - - # without mounted =? self.dispatch(key): - # return failure "No mounted datastore found" - # return (await mounted.store.store.has(mounted.relative)) - return success(true) - -proc delete*( - self: ThreadDatastore, - key: KeyBuffer -): Future[?!void] {.async.} = - - # without mounted =? self.dispatch(key), error: - # return failure(error) - # return (await mounted.store.store.delete(mounted.relative)) - return success() - -proc delete*( - self: ThreadDatastore, - keys: seq[KeyBuffer] -): Future[?!void] {.async.} = - - # for key in keys: - # if err =? (await self.delete(key)).errorOption: - # return failure err - - return success() - proc get*( self: ThreadDatastore, + signal: ThreadSignalPtr, key: KeyBuffer -): Future[?!DataBuffer] {.async.} = +): Result[DataBuffer, CatchableErrorBuffer] = - # without mounted =? self.dispatch(key), error: - # return failure(error) - - # return await mounted.store.store.get(mounted.relative) - return success(DataBuffer.new()) + return ok(DataBuffer.new()) proc put*( self: ThreadDatastore, + signal: ThreadSignalPtr, key: KeyBuffer, data: DataBuffer -): Future[?!void] {.async.} = +): Result[void, CatchableErrorBuffer] = - # without mounted =? self.dispatch(key), error: - # return failure(error) + return ok() - # return (await mounted.store.store.put(mounted.relative, data)) - return success() - -proc put*( - self: ThreadDatastore, - batch: seq[BatchEntry] -): Future[?!void] {.async.} = - - for entry in batch: - if err =? (await self.put(entry.key, entry.data)).errorOption: - return failure err - - return success() proc close*( - self: ThreadDatastore -): Future[?!void] {.async.} = - self.tp.shutdown() - return success() + self: ThreadDatastore, + signal: ThreadSignalPtr, +): Result[void, CatchableErrorBuffer] = + try: + self[].tp.shutdown() + return ok() + except Exception as exc: + return err(exc.toBuffer()) func new*[S: ref Datastore]( T: typedesc[ThreadDatastore], + signal: ThreadSignalPtr, backend: ThreadBackend, -): ?!ThreadDatastore = +): Result[ThreadDatastore, CatchableErrorBuffer] = var self = T() self.tp = Taskpool.new(num_threads = 1) ##\ @@ -135,4 +91,4 @@ func new*[S: ref Datastore]( let pending = self.tp.spawn startupDatastore(backend) sync pending - success self + ok self