This commit is contained in:
Jaremy Creechley 2023-09-28 18:19:05 -07:00
parent d24d3e5f2d
commit e945e30626
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
4 changed files with 24 additions and 25 deletions

View File

@ -19,7 +19,7 @@ push: {.upraises: [].}
type
FSDatastore* = ref object of Datastore
db: ThreadDatastore[FSBackend[KeyId, DataBuffer]]
db: ThreadProxy[FSBackend[KeyId, DataBuffer]]
method has*(self: FSDatastore,
key: Key): Future[?!bool] {.async.} =
@ -65,5 +65,5 @@ proc new*(
let
backend = ? newFSBackend[KeyId, DataBuffer](
root=root, depth=depth, caseSensitive=caseSensitive, ignoreProtected=ignoreProtected)
db = ? ThreadDatastore.new(backend, tp = tp)
db = ? ThreadProxy.new(backend, tp = tp)
success FSDatastore(db: db)

View File

@ -20,7 +20,7 @@ push: {.upraises: [].}
type
SQLiteDatastore* = ref object of Datastore
db: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
db: ThreadProxy[SQLiteBackend[KeyId, DataBuffer]]
proc path*(self: SQLiteDatastore): string =
self.db.backend.path()
@ -68,6 +68,6 @@ proc new*(
let
backend = ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly)
db = ? ThreadDatastore.new(backend, tp = tp)
db = ? ThreadProxy.new(backend, tp = tp)
success SQLiteDatastore(db: db)

View File

@ -48,10 +48,10 @@ type
## Task context object.
## This is a SharedPtr to make the query iter simpler
ThreadDatastore*[BT] = object
tp*: Taskpool
ThreadProxy*[BT] = object
tp: Taskpool
backend*: BT
semaphore*: AsyncSemaphore # semaphore is used for backpressure \
semaphore: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors
proc newTaskCtx*[T](tp: typedesc[T],
@ -105,7 +105,7 @@ template executeTask*[T](ctx: TaskCtx[T], blk: untyped) =
ctx.setDone()
discard ctx[].signal.fireSync()
template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
template dispatchTaskWrap[BT](self: ThreadProxy[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =
@ -115,7 +115,7 @@ template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
runTask()
await wait(ctx[].signal)
template dispatchTask*[BT](self: ThreadDatastore[BT],
template dispatchTask*[BT](self: ThreadProxy[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =
@ -141,7 +141,7 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
executeTask(ctx):
has(ds, key)
proc has*[BT](self: ThreadDatastore[BT],
proc has*[BT](self: ThreadProxy[BT],
key: Key): Future[?!bool] {.async.} =
await self.semaphore.acquire()
let signal = acquireSignal().get()
@ -161,7 +161,7 @@ proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx):
delete(ds, key)
proc delete*[BT](self: ThreadDatastore[BT],
proc delete*[BT](self: ThreadProxy[BT],
key: Key): Future[?!void] {.async.} =
## delete key
await self.semaphore.acquire()
@ -176,7 +176,7 @@ proc delete*[BT](self: ThreadDatastore[BT],
return ctx[].res.toRes()
proc delete*[BT](self: ThreadDatastore[BT],
proc delete*[BT](self: ThreadProxy[BT],
keys: seq[Key]): Future[?!void] {.async.} =
## delete batch
for key in keys:
@ -193,7 +193,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx):
put(ds, key, data)
proc put*[BT](self: ThreadDatastore[BT],
proc put*[BT](self: ThreadProxy[BT],
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
## put key with data
@ -210,9 +210,8 @@ proc put*[BT](self: ThreadDatastore[BT],
return ctx[].res.toRes()
proc put*[E, DB](
self: ThreadDatastore[DB],
batch: seq[E]): Future[?!void] {.async.} =
proc put*[E, DB](self: ThreadProxy[DB],
batch: seq[E]): Future[?!void] {.async.} =
## put batch data
for entry in batch:
if err =? (await self.put(entry.key, entry.data)).errorOption:
@ -229,7 +228,7 @@ proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
let res = get(ds, key)
res
proc get*[BT](self: ThreadDatastore[BT],
proc get*[BT](self: ThreadProxy[BT],
key: Key,
): Future[?!seq[byte]] {.async.} =
await self.semaphore.acquire()
@ -244,7 +243,7 @@ proc get*[BT](self: ThreadDatastore[BT],
return ctx[].res.toRes(v => v.toSeq())
proc close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
proc close*[BT](self: ThreadProxy[BT]): Future[?!void] {.async.} =
await self.semaphore.closeAll()
self.backend.close()
@ -295,7 +294,7 @@ proc queryTask[DB](
# set final result
(?!QResult).ok((KeyId.none, DataBuffer()))
proc query*[BT](self: ThreadDatastore[BT],
proc query*[BT](self: ThreadProxy[BT],
q: Query
): Future[?!QueryIter] {.async.} =
## performs async query
@ -369,14 +368,14 @@ proc query*[BT](self: ThreadDatastore[BT],
await iterDispose()
raise exc
proc new*[DB](self: type ThreadDatastore,
proc new*[DB](self: type ThreadProxy,
db: DB,
withLocks = static false,
tp: Taskpool
): ?!ThreadDatastore[DB] =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
): ?!ThreadProxy[DB] =
doAssert tp.numThreads > 1, "ThreadProxy requires at least 2 threads"
success ThreadDatastore[DB](
success ThreadProxy[DB](
tp: tp,
backend: db,
semaphore: AsyncSemaphore.new(tp.numThreads - 1)

View File

@ -131,8 +131,8 @@ for i in 1..N:
suite "Test ThreadDatastore cancelations":
privateAccess(SQLiteDatastore) # expose private fields
privateAccess(ThreadDatastore) # expose private fields
# privateAccess(TaskCtx) # expose private fields
privateAccess(ThreadProxy) # expose private fields
privateAccess(TaskCtx) # expose private fields
var sds: SQLiteDatastore