plumbing values

This commit is contained in:
Jaremy Creechley 2023-08-24 17:33:32 -07:00 committed by Dmitriy Ryajov
parent d09ef9b21a
commit 055edb4a41
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
3 changed files with 40 additions and 79 deletions

View File

@ -17,6 +17,8 @@ type
KeyBuffer* = DataBuffer KeyBuffer* = DataBuffer
ValueBuffer* = DataBuffer ValueBuffer* = DataBuffer
StringBuffer* = DataBuffer StringBuffer* = DataBuffer
CatchableErrorBuffer* = object
msg: StringBuffer
proc `$`*(data: DataBuffer): string = proc `$`*(data: DataBuffer): string =
if data.buf.isNil: if data.buf.isNil:
@ -83,7 +85,15 @@ proc toSeq*[T: byte | char](a: DataBuffer, tp: typedesc[T]): seq[T] =
result = newSeq[T](a.len) result = newSeq[T](a.len)
copyMem(addr result[0], unsafeAddr a.buf[0], a.len) copyMem(addr result[0], unsafeAddr a.buf[0], a.len)
proc toString*(data: StringBuffer): string = proc toString*(data: DataBuffer): string =
result = newString(data.len()) result = newString(data.len())
if data.len() > 0: if data.len() > 0:
copyMem(addr result[0], unsafeAddr data.buf[0], data.len) copyMem(addr result[0], unsafeAddr data.buf[0], data.len)
proc toCatchable*(data: CatchableErrorBuffer): ref CatchableError =
result = (ref CatchableError)(msg: data.msg.toString())
proc toBuffer*(err: ref Exception): CatchableErrorBuffer =
return CatchableErrorBuffer(
msg: StringBuffer.new(err.msg)
)

View File

@ -1,6 +1,7 @@
import std/tables import std/tables
import pkg/chronos import pkg/chronos
import pkg/chronos/threadsync
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/upraises import pkg/upraises
@ -66,22 +67,18 @@ method put*(
data: seq[byte] data: seq[byte]
): Future[?!void] {.async.} = ): Future[?!void] {.async.} =
# without mounted =? self.dispatch(key), error: let signal = ThreadSignalPtr.new()
# return failure(error) if signal.isErr:
return failure("error creating signal")
# return (await mounted.store.store.put(mounted.relative, data)) else:
await wait(signal.get())
return success() return success()
method put*( method put*(
self: SharedDatastore, self: SharedDatastore,
batch: seq[BatchEntry] batch: seq[BatchEntry]
): Future[?!void] {.async.} = ): Future[?!void] {.async.} =
raiseAssert("Not implemented!")
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
return success()
method close*( method close*(
self: SharedDatastore self: SharedDatastore
@ -98,8 +95,6 @@ func new*[S: ref Datastore](
storeTp: typedesc[S] storeTp: typedesc[S]
): ?!SharedDatastore = ): ?!SharedDatastore =
var self = T() var self = SharedDatastore()
# for (k, v) in stores.pairs:
# self.stores[?k.path] = MountedStore(store: v, key: k)
success self success self

View File

@ -1,8 +1,6 @@
import std/tables
import pkg/chronos import pkg/chronos/threadsync
import pkg/questionable import stew/results
import pkg/questionable/results
import pkg/upraises import pkg/upraises
import pkg/taskpools import pkg/taskpools
@ -32,7 +30,7 @@ type
of SQliteBackend: of SQliteBackend:
discard discard
ThreadDatastore* = ref object of Datastore ThreadDatastore* = ptr object
tp: Taskpool tp: Taskpool
var backendDatastore {.threadvar.}: Datastore var backendDatastore {.threadvar.}: Datastore
@ -51,81 +49,39 @@ proc startupDatastore(backend: ThreadBackend): bool =
else: else:
discard discard
proc has*(
self: ThreadDatastore,
key: KeyBuffer
): Future[?!bool] {.async.} =
# without mounted =? self.dispatch(key):
# return failure "No mounted datastore found"
# return (await mounted.store.store.has(mounted.relative))
return success(true)
proc delete*(
self: ThreadDatastore,
key: KeyBuffer
): Future[?!void] {.async.} =
# without mounted =? self.dispatch(key), error:
# return failure(error)
# return (await mounted.store.store.delete(mounted.relative))
return success()
proc delete*(
self: ThreadDatastore,
keys: seq[KeyBuffer]
): Future[?!void] {.async.} =
# for key in keys:
# if err =? (await self.delete(key)).errorOption:
# return failure err
return success()
proc get*( proc get*(
self: ThreadDatastore, self: ThreadDatastore,
signal: ThreadSignalPtr,
key: KeyBuffer key: KeyBuffer
): Future[?!DataBuffer] {.async.} = ): Result[DataBuffer, CatchableErrorBuffer] =
# without mounted =? self.dispatch(key), error: return ok(DataBuffer.new())
# return failure(error)
# return await mounted.store.store.get(mounted.relative)
return success(DataBuffer.new())
proc put*( proc put*(
self: ThreadDatastore, self: ThreadDatastore,
signal: ThreadSignalPtr,
key: KeyBuffer, key: KeyBuffer,
data: DataBuffer data: DataBuffer
): Future[?!void] {.async.} = ): Result[void, CatchableErrorBuffer] =
# without mounted =? self.dispatch(key), error: return ok()
# return failure(error)
# return (await mounted.store.store.put(mounted.relative, data))
return success()
proc put*(
self: ThreadDatastore,
batch: seq[BatchEntry]
): Future[?!void] {.async.} =
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
return failure err
return success()
proc close*( proc close*(
self: ThreadDatastore self: ThreadDatastore,
): Future[?!void] {.async.} = signal: ThreadSignalPtr,
self.tp.shutdown() ): Result[void, CatchableErrorBuffer] =
return success() try:
self[].tp.shutdown()
return ok()
except Exception as exc:
return err(exc.toBuffer())
func new*[S: ref Datastore]( func new*[S: ref Datastore](
T: typedesc[ThreadDatastore], T: typedesc[ThreadDatastore],
signal: ThreadSignalPtr,
backend: ThreadBackend, backend: ThreadBackend,
): ?!ThreadDatastore = ): Result[ThreadDatastore, CatchableErrorBuffer] =
var self = T() var self = T()
self.tp = Taskpool.new(num_threads = 1) ##\ self.tp = Taskpool.new(num_threads = 1) ##\
@ -135,4 +91,4 @@ func new*[S: ref Datastore](
let pending = self.tp.spawn startupDatastore(backend) let pending = self.tp.spawn startupDatastore(backend)
sync pending sync pending
success self ok self