mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-10 09:33:08 +00:00
initial take at making fsds synchronous
This commit is contained in:
parent
51d2b47205
commit
24ac1a8708
@ -2,7 +2,6 @@ import std/os
|
||||
import std/options
|
||||
import std/strutils
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
from pkg/stew/results as stewResults import get, isErr
|
||||
@ -16,7 +15,7 @@ export datastore
|
||||
push: {.upraises: [].}
|
||||
|
||||
type
|
||||
FSDatastore* = ref object of Datastore
|
||||
FSDatastore* = object
|
||||
root*: DataBuffer
|
||||
ignoreProtected: bool
|
||||
depth: int
|
||||
@ -66,10 +65,13 @@ proc path*(self: FSDatastore, key: Key): ?!string =
|
||||
|
||||
return success fullname
|
||||
|
||||
proc has*(self: FSDatastore, key: Key): Future[?!bool] {.async.} =
|
||||
proc has*(self: FSDatastore, key: KeyId): ?!bool =
|
||||
let key = key.toKey()
|
||||
return self.path(key).?fileExists()
|
||||
|
||||
proc delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
|
||||
proc delete*(self: FSDatastore, key: KeyId): ?!void =
|
||||
let key = key.toKey()
|
||||
|
||||
without path =? self.path(key), error:
|
||||
return failure error
|
||||
|
||||
@ -83,9 +85,9 @@ proc delete*(self: FSDatastore, key: Key): Future[?!void] {.async.} =
|
||||
|
||||
return success()
|
||||
|
||||
proc delete*(self: FSDatastore, keys: seq[Key]): Future[?!void] {.async.} =
|
||||
proc delete*(self: FSDatastore, keys: openArray[KeyId]): ?!void =
|
||||
for key in keys:
|
||||
if err =? (await self.delete(key)).errorOption:
|
||||
if err =? self.delete(key).errorOption:
|
||||
return failure err
|
||||
|
||||
return success()
|
||||
@ -120,7 +122,7 @@ proc readFile*(self: FSDatastore, path: string): ?!seq[byte] =
|
||||
except CatchableError as e:
|
||||
return failure e
|
||||
|
||||
proc get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
||||
proc get*(self: FSDatastore, key: Key): ?!seq[byte] =
|
||||
without path =? self.path(key), error:
|
||||
return failure error
|
||||
|
||||
@ -133,7 +135,7 @@ proc get*(self: FSDatastore, key: Key): Future[?!seq[byte]] {.async.} =
|
||||
proc put*(
|
||||
self: FSDatastore,
|
||||
key: Key,
|
||||
data: seq[byte]): Future[?!void] {.async.} =
|
||||
data: seq[byte]): ?!void =
|
||||
|
||||
without path =? self.path(key), error:
|
||||
return failure error
|
||||
@ -148,10 +150,10 @@ proc put*(
|
||||
|
||||
proc put*(
|
||||
self: FSDatastore,
|
||||
batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
||||
batch: seq[BatchEntry]): ?!void =
|
||||
|
||||
for entry in batch:
|
||||
if err =? (await self.put(entry.key, entry.data)).errorOption:
|
||||
if err =? self.put(entry.key, entry.data).errorOption:
|
||||
return failure err
|
||||
|
||||
return success()
|
||||
@ -167,12 +169,12 @@ proc dirWalker(path: string): iterator: string {.gcsafe.} =
|
||||
except CatchableError as exc:
|
||||
raise newException(Defect, exc.msg)
|
||||
|
||||
proc close*(self: FSDatastore): Future[?!void] {.async.} =
|
||||
proc close*(self: FSDatastore): ?!void =
|
||||
return success()
|
||||
|
||||
proc query*(
|
||||
self: FSDatastore,
|
||||
query: Query): Future[?!QueryIter] {.async.} =
|
||||
query: Query): ?!QueryIter =
|
||||
|
||||
without path =? self.path(query.key), error:
|
||||
return failure error
|
||||
@ -193,14 +195,14 @@ proc query*(
|
||||
var
|
||||
iter = QueryIter.new()
|
||||
|
||||
var lock = newAsyncLock() # serialize querying under threads
|
||||
proc next(): Future[?!QueryResponse] {.async.} =
|
||||
defer:
|
||||
if lock.locked:
|
||||
lock.release()
|
||||
# var lock = newAsyncLock() # serialize querying under threads
|
||||
proc next(): ?!QueryResponse =
|
||||
# defer:
|
||||
# if lock.locked:
|
||||
# lock.release()
|
||||
|
||||
if lock.locked:
|
||||
return failure (ref DatastoreError)(msg: "Should always await query features")
|
||||
# if lock.locked:
|
||||
# return failure (ref DatastoreError)(msg: "Should always await query features")
|
||||
|
||||
let
|
||||
root = $self.root
|
||||
@ -209,7 +211,7 @@ proc query*(
|
||||
if iter.finished:
|
||||
return failure "iterator is finished"
|
||||
|
||||
await lock.acquire()
|
||||
# await lock.acquire()
|
||||
|
||||
if finished(walker):
|
||||
iter.finished = true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user