mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-03 14:13:09 +00:00
139 lines
2.9 KiB
Nim
139 lines
2.9 KiB
Nim
import std/tables
|
|
|
|
import pkg/chronos
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
import pkg/upraises
|
|
import pkg/taskpools
|
|
|
|
import ./key
|
|
import ./query
|
|
import ./datastore
|
|
import ./databuffer
|
|
|
|
import fsds
|
|
|
|
export key, query
|
|
|
|
# Pushes `upraises: []` (from pkg/upraises) for the declarations that follow;
# presumably this declares that they raise no checked exceptions - TODO confirm.
push: {.upraises: [].}
|
|
|
|
type
  ThreadBackendKind* {.pure.} = enum
    ## Kinds of backing store that a `ThreadBackend` can describe.
    FSBackend
    SQliteBackend

  ThreadBackend* = object
    ## Variant record carrying the settings for one backend kind.
    ## Only the `FSBackend` branch has fields; they are the arguments
    ## handed to `FSDatastore.new` when the backend is started.
    case kind*: ThreadBackendKind
    of FSBackend:
      root: StringBuffer
      depth: int
      caseSensitive, ignoreProtected: bool
    of SQliteBackend:
      discard

  ThreadDatastore* = ref object of Datastore
    ## Datastore that owns the task pool `tp`.
    tp: Taskpool
|
|
|
|
# Per-thread handle to the backing datastore; assigned by `startupDatastore`
# on whichever thread runs it (`threadvar` gives every thread its own copy).
var backendDatastore {.threadvar.}: ref Datastore
|
|
|
|
proc startupDatastore(backend: ThreadBackend): bool =
  ## Starts up the datastore described by `backend` on the calling thread
  ## and stores it in the thread-local `backendDatastore`.
  ##
  ## Returns `true` when a datastore was created and installed. Returns
  ## `false` when construction failed or when the backend kind is not
  ## started by this proc. (Previously `result` was never assigned, so the
  ## proc reported `false` even on success.)
  case backend.kind
  of FSBackend:
    let res = FSDatastore.new(
      root = backend.root.toString(),
      depth = backend.depth,
      caseSensitive = backend.caseSensitive,
      ignoreProtected = backend.ignoreProtected)
    if res.isOk:
      backendDatastore = res.get()
      result = true
  of SQliteBackend:
    # Nothing is started for this kind here; `result` stays `false`.
    # Listing the kind explicitly (instead of `else`) makes a newly added
    # backend kind a compile-time error until it is handled.
    discard
|
|
|
|
proc has*(
    self: ThreadDatastore,
    key: KeyBuffer
): Future[?!bool] {.async.} =
  ## Placeholder implementation: `key` is not looked up anywhere and the
  ## call always completes with a successful `true`.
  ##
  ## Earlier dispatch-based sketch, kept for reference:
  ##   without mounted =? self.dispatch(key):
  ##     return failure "No mounted datastore found"
  ##   return (await mounted.store.store.has(mounted.relative))
  return true.success
|
|
|
|
proc delete*(
    self: ThreadDatastore,
    key: KeyBuffer
): Future[?!void] {.async.} =
  ## Placeholder implementation: nothing is removed and the call always
  ## completes successfully.
  ##
  ## Earlier dispatch-based sketch, kept for reference:
  ##   without mounted =? self.dispatch(key), error:
  ##     return failure(error)
  ##   return (await mounted.store.store.delete(mounted.relative))
  return success()
|
|
|
|
proc delete*(
    self: ThreadDatastore,
    keys: seq[KeyBuffer]
): Future[?!void] {.async.} =
  ## Placeholder implementation of the multi-key delete: `keys` is not
  ## iterated and the call always completes successfully.
  ##
  ## Per-key loop sketch, kept for reference:
  ##   for key in keys:
  ##     if err =? (await self.delete(key)).errorOption:
  ##       return failure err
  return success()
|
|
|
|
proc get*(
    self: ThreadDatastore,
    key: KeyBuffer
): Future[?!DataBuffer] {.async.} =
  ## Placeholder implementation: `key` is ignored and the call always
  ## succeeds with a freshly constructed `DataBuffer`.
  ##
  ## Earlier dispatch-based sketch, kept for reference:
  ##   without mounted =? self.dispatch(key), error:
  ##     return failure(error)
  ##   return await mounted.store.store.get(mounted.relative)
  let fresh = DataBuffer.new()
  return success(fresh)
|
|
|
|
proc put*(
    self: ThreadDatastore,
    key: KeyBuffer,
    data: DataBuffer
): Future[?!void] {.async.} =
  ## Placeholder implementation: `data` is not stored anywhere and the
  ## call always completes successfully.
  ##
  ## Earlier dispatch-based sketch, kept for reference:
  ##   without mounted =? self.dispatch(key), error:
  ##     return failure(error)
  ##   return (await mounted.store.store.put(mounted.relative, data))
  return success()
|
|
|
|
proc put*(
    self: ThreadDatastore,
    batch: seq[BatchEntry]
): Future[?!void] {.async.} =
  ## Stores every entry of `batch` in order through the single-entry `put`.
  ## Stops at the first entry whose `put` fails and returns that error;
  ## returns success once all entries have been processed.
  for item in batch:
    let outcome = await self.put(item.key, item.data)
    if cause =? outcome.errorOption:
      return failure cause

  return success()
|
|
|
|
proc close*(
    self: ThreadDatastore
): Future[?!void] {.async.} =
  ## Shuts down the task pool owned by this datastore, then reports
  ## success.
  shutdown(self.tp)
  return success()
|
|
|
|
proc new*[S: ref Datastore](
    T: typedesc[ThreadDatastore],
    backend: ThreadBackend,
): ?!ThreadDatastore =
  ## Creates a `ThreadDatastore` with its own single-threaded task pool
  ## and runs `startupDatastore(backend)` on that pool, waiting for it to
  ## finish before returning.
  ##
  ## Changes from the previous version:
  ## * `backend` is typed `ThreadBackend`, the type `startupDatastore`
  ##   accepts (the former `DatastoreBackend` is not declared in this
  ##   module).
  ## * The value produced by `sync` is discarded explicitly; an unused
  ##   non-void expression is otherwise rejected by the compiler.
  ## * Declared as `proc`: creating a task pool and spawning a task are
  ##   side effects, which `func` forbids.
  ##
  ## NOTE(review): the generic parameter `S` is unused and cannot be
  ## inferred from the arguments; it is kept only to leave the generic
  ## signature untouched - consider removing it.
  let self = T()

  # Default to one thread; multiple threads will require more work.
  self.tp = Taskpool.new(num_threads = 1)

  # Run the backend start-up on the worker thread and block until done.
  # NOTE(review): the start-up outcome is not propagated to the caller yet.
  let pending = self.tp.spawn startupDatastore(backend)
  discard sync pending

  success self
|