mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 14:13:09 +00:00
reworking results and tasks
This commit is contained in:
parent
3bde00d111
commit
a0d60cc9be
@ -5,10 +5,12 @@ import pkg/chronos/threadsync
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/upraises
|
||||
import pkg/taskpools
|
||||
|
||||
import ./key
|
||||
import ./query
|
||||
import ./datastore
|
||||
import ./threadbackend
|
||||
|
||||
export key, query
|
||||
|
||||
@ -18,6 +20,11 @@ type
|
||||
|
||||
SharedDatastore* = ref object of Datastore
|
||||
# stores*: Table[Key, SharedDatastore]
|
||||
tds: ThreadDatastorePtr
|
||||
|
||||
template newSignal(): auto =
|
||||
ThreadSignalPtr.new().valueOr:
|
||||
return failure newException(DatastoreError, "error creating signal")
|
||||
|
||||
method has*(
|
||||
self: SharedDatastore,
|
||||
@ -70,9 +77,13 @@ method close*(
|
||||
|
||||
func new*[S: ref Datastore](
|
||||
T: typedesc[SharedDatastore],
|
||||
storeTp: typedesc[S]
|
||||
backend: ThreadBackend,
|
||||
): ?!SharedDatastore =
|
||||
|
||||
var self = SharedDatastore()
|
||||
var
|
||||
self = SharedDatastore()
|
||||
signal = newSignal()
|
||||
res = TResult[ThreadDatastore].new()
|
||||
self.tds = ThreadDatastore.new(signal, backend, res)
|
||||
|
||||
success self
|
||||
|
||||
@ -12,13 +12,17 @@ import threading/smartptrs
|
||||
|
||||
import fsds
|
||||
|
||||
export key, query
|
||||
export key, query, smartptrs, databuffer
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
|
||||
ThreadResult*[T: DataBuffer | void] = Result[T, CatchableErrorBuffer]
|
||||
ThreadResult*[T: DataBuffer | void] = object
|
||||
ready*: bool
|
||||
success*: bool
|
||||
val*: T
|
||||
err*: CatchableErrorBuffer
|
||||
|
||||
TResult*[T] = UniquePtr[ThreadResult[T]]
|
||||
|
||||
@ -36,27 +40,43 @@ type
|
||||
of SQliteBackend:
|
||||
discard
|
||||
|
||||
ThreadDatastore* = ptr object
|
||||
tp: Taskpool
|
||||
ThreadDatastore* = object
|
||||
taskpool: Taskpool
|
||||
backendDatastore: Datastore
|
||||
|
||||
var backendDatastore {.threadvar.}: Datastore
|
||||
ThreadDatastorePtr* = SharedPtr[ThreadDatastore]
|
||||
|
||||
proc new*[T](tp: typedesc[TResult[T]]): TResult[T] =
|
||||
newUniquePtr(ThreadResult[T])
|
||||
|
||||
proc startupDatastore(backend: ThreadBackend): bool =
|
||||
proc startupDatastore(
|
||||
signal: ThreadSignalPtr,
|
||||
backend: ThreadBackend,
|
||||
res: TResult[ThreadDatastorePtr],
|
||||
): bool =
|
||||
## starts up a FS instance on a give thread
|
||||
case backend.kind:
|
||||
of FSBackend:
|
||||
let res = FSDatastore.new(
|
||||
let ds = FSDatastore.new(
|
||||
root = backend.root.toString(),
|
||||
depth = backend.depth,
|
||||
caseSensitive = backend.caseSensitive,
|
||||
ignoreProtected = backend.ignoreProtected)
|
||||
if res.isOk:
|
||||
backendDatastore = res.get()
|
||||
ignoreProtected = backend.ignoreProtected
|
||||
)
|
||||
if ds.isOk:
|
||||
let tds = newSharedPtr(ThreadDatastore)
|
||||
tds[].backendDatastore = ds.get()
|
||||
|
||||
res[].ready = true
|
||||
res[].success = true
|
||||
res[].val = tds
|
||||
else:
|
||||
res[].ready = true
|
||||
res[].success = false
|
||||
else:
|
||||
discard
|
||||
|
||||
signal.fireSync().get()
|
||||
|
||||
proc get*(
|
||||
self: ThreadDatastore,
|
||||
@ -66,37 +86,37 @@ proc get*(
|
||||
|
||||
return ok(DataBuffer.new())
|
||||
|
||||
proc put*(
|
||||
proc putWorker*(
|
||||
self: ThreadDatastore,
|
||||
signal: ThreadSignalPtr,
|
||||
key: KeyBuffer,
|
||||
data: DataBuffer,
|
||||
): TResult[void] =
|
||||
res: TResult[void]
|
||||
) =
|
||||
|
||||
return TResult[void].new()
|
||||
discard
|
||||
|
||||
proc close*(
|
||||
self: ThreadDatastore,
|
||||
signal: ThreadSignalPtr,
|
||||
): TResult[void] =
|
||||
try:
|
||||
self[].tp.shutdown()
|
||||
return ok()
|
||||
except Exception as exc:
|
||||
return TResult[void].new()
|
||||
# proc close*(
|
||||
# self: ThreadDatastore,
|
||||
# signal: ThreadSignalPtr,
|
||||
# ): TResult[void] =
|
||||
# try:
|
||||
# self[].tp.shutdown()
|
||||
# return ok()
|
||||
# except Exception as exc:
|
||||
# return TResult[void].new()
|
||||
|
||||
func new*[S: ref Datastore](
|
||||
T: typedesc[ThreadDatastore],
|
||||
signal: ThreadSignalPtr,
|
||||
backend: ThreadBackend,
|
||||
): TResult[ThreadDatastore] =
|
||||
res: TResult[ThreadDatastore]
|
||||
) =
|
||||
|
||||
var self = T()
|
||||
self.tp = Taskpool.new(num_threads = 1) ##\
|
||||
## Default to one thread, multiple threads \
|
||||
## will require more work
|
||||
|
||||
let pending = self.tp.spawn startupDatastore(backend)
|
||||
sync pending
|
||||
let pending = self.tp.spawn startupDatastore(signal, backend)
|
||||
|
||||
ok self
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user