make Threadproxyds generic

This commit is contained in:
Jaremy Creechley 2023-09-27 12:02:04 -07:00
parent 5e424262c3
commit 49d846f468
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 19 additions and 33 deletions

View File

@ -55,7 +55,7 @@ type
var ctxLock: Lock
ctxLock.initLock()
proc newTaskCtx*[T](signal: ThreadSignalPtr): TaskCtx[T] =
proc newTaskCtx*[T](tp: typedesc[T], signal: ThreadSignalPtr): TaskCtx[T] =
newSharedPtr(TaskCtxObj[T](signal: signal))
proc setCancelled[T](ctx: TaskCtx[T]) =
@ -123,7 +123,6 @@ template dispatchTask[BT](self: ThreadDatastore[BT],
## handles dispatching a task from an async context
## `blk` is the actions, it has `ctx` and `ds` variables in scope.
## note that `ds` is a generic
# let ctx {.inject.} = newSharedPtr(TaskCtxObj[T](signal: signal))
try:
dispatchTaskWrap[BT](self, signal, blk)
except CancelledError as exc:
@ -146,13 +145,13 @@ method has*[BT](self: ThreadDatastore[BT],
without signal =? acquireSignal(), err:
return failure err
let ctx = newTaskCtx(bool, signal: signal)
let ctx = newTaskCtx(bool, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn hasTask(ctx, ds, key)
return ctx[].res.toRes(v => v)
proc deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
method deleteTask[T, DB](ctx: TaskCtx[T], ds: DB;
key: KeyId) {.gcsafe.} =
## run backend command
executeTask(ctx):
@ -165,7 +164,7 @@ method delete*[BT](self: ThreadDatastore[BT],
without signal =? acquireSignal(), err:
return failure err
let ctx = newTaskCtx[void](signal: signal)
let ctx = newTaskCtx(void, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn deleteTask(ctx, ds, key)
@ -196,7 +195,7 @@ method put*[BT](self: ThreadDatastore[BT],
without signal =? acquireSignal(), err:
return failure err
let ctx = newTaskCtx[void](signal: signal)
let ctx = newTaskCtx(void, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
let data = DataBuffer.new data
@ -229,8 +228,9 @@ method get*[BT](self: ThreadDatastore[BT],
without signal =? acquireSignal(), err:
return failure err
let ctx = newTaskCtx[void](signal: signal)
dispatchTask[DataBuffer](self, signal):
let ctx = newTaskCtx(DataBuffer, signal=signal)
dispatchTask(self, signal):
let key = KeyId.new key.id()
self.tp.spawn getTask(ctx, ds, key)
return ctx[].res.toRes(v => v.toSeq())
@ -302,8 +302,8 @@ method query*[BT](self: ThreadDatastore[BT],
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
# setup initial queryTask
let ctx = newTaskCtx[QResult](signal: signal)
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer], BT](self, signal):
let ctx = newTaskCtx(QResult, signal=signal)
dispatchTaskWrap(self, signal):
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
await nextSignal.fire()
@ -356,10 +356,10 @@ proc new*[DB](self: type ThreadDatastore,
db: DB,
withLocks = static false,
tp: Taskpool
): ?!ThreadDatastore =
): ?!ThreadDatastore[DB] =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
success ThreadDatastore(
success ThreadDatastore[DB](
tp: tp,
backend: db,
# TODO: are these needed anymore??

View File

@ -25,14 +25,10 @@ import ./querycommontests
const NumThreads = 20 # IO threads aren't attached to CPU count
suite "Test Basic ThreadProxyDatastore":
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a").tryGet()
data = "some bytes".toBytes
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
@ -61,17 +57,12 @@ suite "Test Basic ThreadProxyDatastore":
suite "Test Basic ThreadDatastore with SQLite":
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
setupAll:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
teardown:
GC_fullCollect()
@ -85,17 +76,12 @@ suite "Test Basic ThreadDatastore with SQLite":
suite "Test Query ThreadDatastore with SQLite":
var
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
setup:
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
teardown:
GC_fullCollect()