Merge branch 'threadpool-refactor-generics' into threadpool-refactor-fsds

This commit is contained in:
Jaremy Creechley 2023-09-27 16:32:50 -07:00
commit 8db28571d9
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
4 changed files with 145 additions and 162 deletions

View File

@ -9,7 +9,7 @@ license = "Apache License 2.0 or MIT"
requires "nim >= 1.6.14",
"asynctest >= 0.3.1 & < 0.4.0",
"chronos#0277b65be2c7a365ac13df002fba6e172be55537",
"questionable >= 0.10.3 & < 0.11.0",
"questionable",
"sqlite3_abi",
"stew",
"unittest2",

View File

@ -36,50 +36,41 @@ logScope:
type
ThreadBackendKinds* = enum
Sqlite
# Filesystem
ThreadBackend* = object
## backend case type to avoid needing to make ThreadDatastore generic
case kind*: ThreadBackendKinds
of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer]
TaskCtxObj*[T: ThreadTypes] = object
res: ThreadResult[T]
signal: ThreadSignalPtr
running: bool ## used to mark when a task worker is running
cancelled: bool ## used to cancel a task before it's started
nextSignal: ThreadSignalPtr
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
## Task context object.
## This is a SharedPtr to make the query iter simpler
ThreadDatastore* = ref object of Datastore
ThreadDatastore*[BT] = ref object of Datastore
tp: Taskpool
backend: ThreadBackend
backend: BT
semaphore: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors
var ctxLock: Lock
ctxLock.initLock()
proc newTaskCtx*[T](tp: typedesc[T],
signal: ThreadSignalPtr,
nextSignal: ThreadSignalPtr = nil): TaskCtx[T] =
newSharedPtr(TaskCtxObj[T](signal: signal, nextSignal: nextSignal))
proc setCancelled[T](ctx: TaskCtx[T]) =
# withLock(ctxLock):
ctx[].cancelled = true
proc setRunning[T](ctx: TaskCtx[T]): bool =
# withLock(ctxLock):
if ctx[].cancelled:
return false
ctx[].running = true
return true
proc setDone[T](ctx: TaskCtx[T]) =
# withLock(ctxLock):
ctx[].running = false
proc acquireSignal(): ?!ThreadSignalPtr =
# echo "signal:OPEN!"
let signal = ThreadSignalPtr.new()
if signal.isErr():
failure (ref CatchableError)(msg: "failed to aquire ThreadSignalPtr: " & signal.error())
@ -113,33 +104,31 @@ template executeTask[T](ctx: TaskCtx[T], blk: untyped) =
ctx.setDone()
discard ctx[].signal.fireSync()
template dispatchTaskWrap[T](self: ThreadDatastore,
signal: ThreadSignalPtr,
blk: untyped
): auto =
case self.backend.kind:
of Sqlite:
var ds {.used, inject.} = self.backend.sql
proc runTask() =
`blk`
runTask()
await wait(ctx[].signal)
template dispatchTaskWrap[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =
var ds {.used, inject.} = self.backend
proc runTask() =
`blk`
runTask()
await wait(ctx[].signal)
template dispatchTask[T](self: ThreadDatastore,
template dispatchTask[BT](self: ThreadDatastore[BT],
signal: ThreadSignalPtr,
blk: untyped
): auto =
): auto =
## handles dispatching a task from an async context
## `blk` is the actions, it has `ctx` and `ds` variables in scope.
## note that `ds` is a generic
let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
try:
dispatchTaskWrap[T](self, signal, blk)
dispatchTaskWrap[BT](self, signal, blk)
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
raise exc
finally:
# echo "signal:CLOSE!"
discard ctx[].signal.close()
self.semaphore.release()
@ -149,38 +138,39 @@ proc hasTask[T, DB](ctx: TaskCtx[T], ds: DB, key: KeyId) {.gcsafe.} =
executeTask(ctx):
has(ds, key)
method has*(self: ThreadDatastore,
key: Key): Future[?!bool] {.async.} =
method has*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!bool] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
let key = KeyId.new key.id()
dispatchTask[bool](self, signal):
let ctx = newTaskCtx(bool, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn hasTask(ctx, ds, key)
return ctx[].res.toRes(v => v)
proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
method deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe.} =
## run backend command
executeTask(ctx):
delete(ds, key)
method delete*(self: ThreadDatastore,
method delete*[BT](self: ThreadDatastore[BT],
key: Key): Future[?!void] {.async.} =
## delete key
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
let key = KeyId.new key.id()
dispatchTask[void](self, signal):
let ctx = newTaskCtx(void, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn deleteTask(ctx, ds, key)
return ctx[].res.toRes()
method delete*(self: ThreadDatastore,
method delete*[BT](self: ThreadDatastore[BT],
keys: seq[Key]): Future[?!void] {.async.} =
## delete batch
for key in keys:
@ -196,7 +186,7 @@ proc putTask[T, DB](ctx: TaskCtx[T], ds: DB;
executeTask(ctx):
put(ds, key, data)
method put*(self: ThreadDatastore,
method put*[BT](self: ThreadDatastore[BT],
key: Key,
data: seq[byte]): Future[?!void] {.async.} =
## put key with data
@ -204,15 +194,16 @@ method put*(self: ThreadDatastore,
without signal =? acquireSignal(), err:
return failure err
dispatchTask[void](self, signal):
let ctx = newTaskCtx(void, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
let data = DataBuffer.new data
self.tp.spawn putTask(ctx, ds, key, data)
return ctx[].res.toRes()
method put*(
self: ThreadDatastore,
method put*[DB](
self: ThreadDatastore[DB],
batch: seq[BatchEntry]): Future[?!void] {.async.} =
## put batch data
for entry in batch:
@ -222,42 +213,38 @@ method put*(
return success()
proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
method getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
key: KeyId) {.gcsafe, nimcall.} =
## run backend command
executeTask(ctx):
let res = get(ds, key)
res
method get*(self: ThreadDatastore,
key: Key,
): Future[?!seq[byte]] {.async.} =
method get*[BT](self: ThreadDatastore[BT],
key: Key,
): Future[?!seq[byte]] {.async.} =
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
let key = KeyId.new key.id()
dispatchTask[DataBuffer](self, signal):
let ctx = newTaskCtx(DataBuffer, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn getTask(ctx, ds, key)
return ctx[].res.toRes(v => v.toSeq())
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
await self.semaphore.closeAll()
case self.backend.kind:
of Sqlite:
self.backend.sql.close()
self.backend.close()
type
QResult = DbQueryResponse[KeyId, DataBuffer]
import os
proc queryTask[DB](
method queryTask[DB](
ctx: TaskCtx[QResult],
ds: DB,
query: DbQuery[KeyId],
nextSignal: ThreadSignalPtr
) {.gcsafe, nimcall.} =
## run query command
executeTask(ctx):
@ -271,8 +258,8 @@ proc queryTask[DB](
# otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), )
discard ctx[].signal.fireSync()
if not nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
if not ctx[].nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "queryTask timed out")
var handle = handleRes.get()
for item in handle.iter():
@ -287,13 +274,13 @@ proc queryTask[DB](
exc
discard ctx[].signal.fireSync()
discard nextSignal.waitSync().get()
if not ctx[].nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "queryTask timed out")
# set final result
(?!QResult).ok((KeyId.none, DataBuffer()))
method query*(self: ThreadDatastore,
method query*[BT](self: ThreadDatastore[BT],
q: Query
): Future[?!QueryIter] {.async.} =
## performs async query
@ -304,6 +291,16 @@ method query*(self: ThreadDatastore,
return failure err
without nextSignal =? acquireSignal(), err:
return failure err
let ctx = newTaskCtx(QResult, signal=signal, nextSignal=nextSignal)
proc iterDispose() {.async.} =
# echo "signal:CLOSE!"
ctx.setCancelled()
await ctx[].nextSignal.fire()
discard ctx[].signal.close()
# echo "nextSignal:CLOSE!"
discard ctx[].nextSignal.close()
self.semaphore.release()
try:
let query = dbQuery(
@ -311,16 +308,17 @@ method query*(self: ThreadDatastore,
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
# setup initial queryTask
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
await nextSignal.fire()
dispatchTaskWrap(self, signal):
self.tp.spawn queryTask(ctx, ds, query)
await ctx[].nextSignal.fire()
var
lock = newAsyncLock() # serialize querying under threads
iter = QueryIter.new()
var lock = newAsyncLock() # serialize querying under threads
var iter = QueryIter.new()
iter.dispose = proc (): Future[?!void] {.async.} =
iterDispose()
success()
proc next(): Future[?!QueryResponse] {.async.} =
iter.next = proc(): Future[?!QueryResponse] {.async.} =
let ctx = ctx
try:
trace "About to query"
@ -330,12 +328,11 @@ method query*(self: ThreadDatastore,
return failure (ref QueryEndedError)(msg: "Calling next on a finished query!")
await wait(ctx[].signal)
if not ctx[].running:
iter.finished = true
defer:
await nextSignal.fire()
await ctx[].nextSignal.fire()
if ctx[].res.isErr():
return err(ctx[].res.error())
@ -347,37 +344,24 @@ method query*(self: ThreadDatastore,
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
discard ctx[].signal.close()
discard nextSignal.close()
self.semaphore.release()
await iterDispose() # todo: is this valid?
raise exc
iter.next = next
return success iter
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
discard signal.close()
discard nextSignal.close()
self.semaphore.release()
await iterDispose()
raise exc
proc new*[DB](self: type ThreadDatastore,
db: DB,
withLocks = static false,
tp: Taskpool
): ?!ThreadDatastore =
): ?!ThreadDatastore[DB] =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
when DB is SQLiteBackend[KeyId,DataBuffer]:
let backend = ThreadBackend(kind: Sqlite, sql: db)
else:
{.error: "unsupported backend: " & $typeof(db).}
success ThreadDatastore(
success ThreadDatastore[DB](
tp: tp,
backend: backend,
# TODO: are these needed anymore??
# withLocks: withLocks,
# queryLock: newAsyncLock(),
backend: db,
semaphore: AsyncSemaphore.new(tp.numThreads - 1)
)

View File

@ -1,5 +1,6 @@
import std/atomics
import std/options
import std/locks
import pkg/questionable/results
import pkg/results
@ -51,3 +52,24 @@ proc toRes*[T,S](res: ThreadResult[T],
result.err res.error().toExc()
else:
result.ok m(res.get())
type
MutexSignal* = tuple[lock: Lock, cond: Cond, open: bool]
proc init*(sig: var MutexSignal) =
sig.lock.initLock()
sig.cond.initCond()
sig.open = true
proc wait*(sig: var MutexSignal) =
withLock(sig.lock):
wait(sig.cond, sig.lock)
proc fire*(sig: var MutexSignal) =
withLock(sig.lock):
signal(sig.cond)
proc close*(sig: var MutexSignal) =
if sig.open:
sig.lock.deinitLock()
sig.cond.deinitCond()

View File

@ -22,88 +22,65 @@ import pkg/datastore/threads/threadproxyds {.all.}
import ./dscommontests
import ./querycommontests
const NumThreads = 20 # IO threads aren't attached to CPU count
const
NumThreads = 20 # IO threads aren't attached to CPU count
ThreadTestLoops {.intdefine.} = 1
N = ThreadTestLoops
ThreadTestInnerLoops {.intdefine.} = 1
M = ThreadTestInnerLoops
suite "Test Basic ThreadProxyDatastore":
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a").tryGet()
data = "some bytes".toBytes
var
taskPool: Taskpool = Taskpool.new(NumThreads)
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
for i in 1..N:
suite "Test Basic ThreadDatastore with SQLite " & $i:
teardownAll:
echo "teardown done"
var
sqlStore: SQLiteBackend[KeyId, DataBuffer]
ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
test "check put":
echo "\n\n=== put ==="
let res1 = await ds.put(key, data)
echo "res1: ", res1.repr
check res1.isOk
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
# taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
test "check get":
echo "\n\n=== get ==="
echo "get send key: ", key.repr
let res2 = await ds.get(key)
echo "get key post: ", key.repr
echo "get res2: ", res2.repr
echo res2.get() == data
var val = ""
for c in res2.get():
val &= char(c)
echo "get res2: ", $val
teardown:
GC_fullCollect()
suite "Test Basic ThreadDatastore with SQLite":
teardownAll:
(await ds.close()).tryGet()
# taskPool.shutdown()
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
for i in 1..M:
basicStoreTests(ds, key, bytes, otherBytes)
GC_fullCollect()
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
teardown:
GC_fullCollect()
for i in 1..N:
suite "Test Query ThreadDatastore with SQLite " & $i:
teardownAll:
(await ds.close()).tryGet()
taskPool.shutdown()
var
sqlStore: SQLiteBackend[KeyId, DataBuffer]
# taskPool: Taskpool
ds: ThreadDatastore[SQLiteBackend[KeyId, DataBuffer]]
basicStoreTests(ds, key, bytes, otherBytes)
setup:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
# taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
suite "Test Query ThreadDatastore with SQLite":
teardown:
GC_fullCollect()
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
(await ds.close()).tryGet()
# taskPool.shutdown()
setup:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
teardown:
GC_fullCollect()
(await ds.close()).tryGet()
taskPool.shutdown()
queryTests(ds, true)
for i in 1..M:
queryTests(ds, true)
GC_fullCollect()
# suite "Test Basic ThreadDatastore with fsds":
# let