fixup databuffer

This commit is contained in:
Jaremy Creechley 2023-09-26 16:12:55 -07:00
parent 892ec385ee
commit 555f1fe77a
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
2 changed files with 117 additions and 114 deletions

View File

@ -43,13 +43,13 @@ type
of Sqlite:
sql*: SQLiteBackend[KeyId,DataBuffer]
TaskCtxObj[T: ThreadTypes] = object
TaskCtxObj*[T: ThreadTypes] = object
res: ThreadResult[T]
signal: ThreadSignalPtr
running: bool
cancelled: bool
TaskCtx[T] = SharedPtr[TaskCtxObj[T]]
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
ThreadDatastore* = ref object of Datastore
tp: Taskpool
@ -313,17 +313,22 @@ method query*(
iter.next = next
return success iter
proc new*(self: type ThreadDatastore,
backend: ThreadBackendKinds,
proc new*[DB](self: type ThreadDatastore,
db: DB,
withLocks = static false,
tp: Taskpool
): ?!ThreadDatastore =
doAssert tp.numThreads > 1, "ThreadDatastore requires at least 2 threads"
when DB is SQLiteBackend[KeyId,DataBuffer]:
let backend = ThreadBackend(kind: Sqlite, sql: db)
else:
{.error: "unsupported backend: " & $typeof(db).}
success ThreadDatastore(
tp: tp,
ds: ThreadBackend(),
withLocks: withLocks,
queryLock: newAsyncLock(),
backend: backend,
# withLocks: withLocks,
# queryLock: newAsyncLock(),
semaphore: AsyncSemaphore.new(tp.numThreads - 1)
)

View File

@ -13,8 +13,9 @@ import pkg/stew/byteutils
import pkg/taskpools
import pkg/questionable/results
import pkg/chronicles
import pkg/threading/smartptrs
import pkg/datastore/sql
import pkg/datastore/sql/sqliteds
import pkg/datastore/fsds
import pkg/datastore/threads/threadproxyds {.all.}
@ -26,7 +27,7 @@ const NumThreads = 200 # IO threads aren't attached to CPU count
suite "Test Basic ThreadDatastore with SQLite":
var
sqlStore: Datastore
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
@ -34,7 +35,7 @@ suite "Test Basic ThreadDatastore with SQLite":
otherBytes = "some other bytes".toBytes
setupAll:
sqlStore = SQLiteDatastore.new(Memory).tryGet()
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
@ -50,7 +51,7 @@ suite "Test Basic ThreadDatastore with SQLite":
suite "Test Query ThreadDatastore with SQLite":
var
sqlStore: Datastore
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
@ -58,7 +59,7 @@ suite "Test Query ThreadDatastore with SQLite":
otherBytes = "some other bytes".toBytes
setup:
sqlStore = SQLiteDatastore.new(Memory).tryGet()
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
@ -70,74 +71,74 @@ suite "Test Query ThreadDatastore with SQLite":
queryTests(ds, true)
suite "Test Basic ThreadDatastore with fsds":
let
path = currentSourcePath() # get this file's name
basePath = "tests_data"
basePathAbs = path.parentDir / basePath
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
# suite "Test Basic ThreadDatastore with fsds":
# let
# path = currentSourcePath() # get this file's name
# basePath = "tests_data"
# basePathAbs = path.parentDir / basePath
# key = Key.init("/a/b").tryGet()
# bytes = "some bytes".toBytes
# otherBytes = "some other bytes".toBytes
var
fsStore: FSDatastore
ds: ThreadDatastore
taskPool: Taskpool
# var
# fsStore: FSDatastore
# ds: ThreadDatastore
# taskPool: Taskpool
setupAll:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
createDir(basePathAbs)
# setupAll:
# removeDir(basePathAbs)
# require(not dirExists(basePathAbs))
# createDir(basePathAbs)
fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
# fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
# taskPool = Taskpool.new(NumThreads)
# ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
teardown:
GC_fullCollect()
# teardown:
# GC_fullCollect()
teardownAll:
(await ds.close()).tryGet()
taskPool.shutdown()
# teardownAll:
# (await ds.close()).tryGet()
# taskPool.shutdown()
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
# removeDir(basePathAbs)
# require(not dirExists(basePathAbs))
basicStoreTests(fsStore, key, bytes, otherBytes)
# basicStoreTests(fsStore, key, bytes, otherBytes)
suite "Test Query ThreadDatastore with fsds":
let
path = currentSourcePath() # get this file's name
basePath = "tests_data"
basePathAbs = path.parentDir / basePath
# suite "Test Query ThreadDatastore with fsds":
# let
# path = currentSourcePath() # get this file's name
# basePath = "tests_data"
# basePathAbs = path.parentDir / basePath
var
fsStore: FSDatastore
ds: ThreadDatastore
taskPool: Taskpool
# var
# fsStore: FSDatastore
# ds: ThreadDatastore
# taskPool: Taskpool
setup:
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
createDir(basePathAbs)
# setup:
# removeDir(basePathAbs)
# require(not dirExists(basePathAbs))
# createDir(basePathAbs)
fsStore = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
# fsStore = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
# taskPool = Taskpool.new(NumThreads)
# ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
teardown:
GC_fullCollect()
(await ds.close()).tryGet()
taskPool.shutdown()
# teardown:
# GC_fullCollect()
# (await ds.close()).tryGet()
# taskPool.shutdown()
removeDir(basePathAbs)
require(not dirExists(basePathAbs))
# removeDir(basePathAbs)
# require(not dirExists(basePathAbs))
queryTests(ds, false)
# queryTests(ds, false)
suite "Test ThreadDatastore cancelations":
var
sqlStore: Datastore
sqlStore: SQLiteBackend[KeyId,DataBuffer]
ds: ThreadDatastore
taskPool: Taskpool
@ -145,7 +146,7 @@ suite "Test ThreadDatastore cancelations":
privateAccess(TaskCtx) # expose private fields
setupAll:
sqlStore = SQLiteDatastore.new(Memory).tryGet()
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
@ -156,66 +157,63 @@ suite "Test ThreadDatastore cancelations":
(await ds.close()).tryGet()
taskPool.shutdown()
test "Should monitor signal and cancel":
var
signal = ThreadSignalPtr.new().tryGet()
res = ThreadResult[void]()
ctx = TaskCtx[void](
ds: sqlStore,
res: addr res,
signal: signal)
fut = newFuture[void]("signalMonitor")
threadArgs = (addr ctx, addr fut)
thread: Thread[type threadArgs]
# test "Should monitor signal and cancel":
# var
# signal = ThreadSignalPtr.new().tryGet()
# res = ThreadResult[void]()
# ctx = newSharedPtr(TaskCtxObj[void](signal: signal))
# fut = newFuture[void]("signalMonitor")
# threadArgs = (addr ctx, addr fut)
# thread: Thread[type threadArgs]
proc threadTask(args: type threadArgs) =
var (ctx, fut) = args
proc asyncTask() {.async.} =
let
monitor = signalMonitor(ctx, fut[])
# proc threadTask(args: type threadArgs) =
# var (ctx, fut) = args
# proc asyncTask() {.async.} =
# let
# monitor = signalMonitor(ctx, fut[])
await monitor
# await monitor
waitFor asyncTask()
# waitFor asyncTask()
createThread(thread, threadTask, threadArgs)
ctx.cancelled = true
check: ctx.signal.fireSync.tryGet
# createThread(thread, threadTask, threadArgs)
# ctx.cancelled = true
# check: ctx.signal.fireSync.tryGet
joinThreads(thread)
# joinThreads(thread)
check: fut.cancelled
check: ctx.signal.close().isOk
fut = nil
# check: fut.cancelled
# check: ctx.signal.close().isOk
# fut = nil
test "Should monitor and not cancel":
var
signal = ThreadSignalPtr.new().tryGet()
res = ThreadResult[void]()
ctx = TaskCtx[void](
ds: sqlStore,
res: addr res,
signal: signal)
fut = newFuture[void]("signalMonitor")
threadArgs = (addr ctx, addr fut)
thread: Thread[type threadArgs]
# test "Should monitor and not cancel":
# var
# signal = ThreadSignalPtr.new().tryGet()
# res = ThreadResult[void]()
# ctx = TaskCtx[void](
# ds: sqlStore,
# res: addr res,
# signal: signal)
# fut = newFuture[void]("signalMonitor")
# threadArgs = (addr ctx, addr fut)
# thread: Thread[type threadArgs]
proc threadTask(args: type threadArgs) =
var (ctx, fut) = args
proc asyncTask() {.async.} =
let
monitor = signalMonitor(ctx, fut[])
# proc threadTask(args: type threadArgs) =
# var (ctx, fut) = args
# proc asyncTask() {.async.} =
# let
# monitor = signalMonitor(ctx, fut[])
await monitor
# await monitor
waitFor asyncTask()
# waitFor asyncTask()
createThread(thread, threadTask, threadArgs)
ctx.cancelled = false
check: ctx.signal.fireSync.tryGet
# createThread(thread, threadTask, threadArgs)
# ctx.cancelled = false
# check: ctx.signal.fireSync.tryGet
joinThreads(thread)
# joinThreads(thread)
check: not fut.cancelled
check: ctx.signal.close().isOk
fut = nil
# check: not fut.cancelled
# check: ctx.signal.close().isOk
# fut = nil