mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-05 15:13:14 +00:00
fix merge
This commit is contained in:
commit
a4f0c48392
@ -203,7 +203,7 @@ method put*[BT](self: ThreadDatastore[BT],
|
||||
|
||||
return ctx[].res.toRes()
|
||||
|
||||
method put*(
|
||||
method put*[DB](
|
||||
self: ThreadDatastore,
|
||||
batch: seq[BatchEntry]): Future[?!void] {.async.} =
|
||||
## put batch data
|
||||
@ -214,7 +214,7 @@ method put*(
|
||||
return success()
|
||||
|
||||
|
||||
proc getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
|
||||
method getTask[DB](ctx: TaskCtx[DataBuffer], ds: DB;
|
||||
key: KeyId) {.gcsafe, nimcall.} =
|
||||
## run backend command
|
||||
executeTask(ctx):
|
||||
@ -242,9 +242,7 @@ method close*[BT](self: ThreadDatastore[BT]): Future[?!void] {.async.} =
|
||||
type
|
||||
QResult = DbQueryResponse[KeyId, DataBuffer]
|
||||
|
||||
import os
|
||||
|
||||
proc queryTask[DB](
|
||||
method queryTask[DB](
|
||||
ctx: TaskCtx[QResult],
|
||||
ds: DB,
|
||||
query: DbQuery[KeyId],
|
||||
@ -339,6 +337,7 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
ctx.setCancelled()
|
||||
discard ctx[].signal.close()
|
||||
echo "nextSignal:CLOSE!"
|
||||
discard nextSignal.close()
|
||||
self.semaphore.release()
|
||||
raise exc
|
||||
@ -348,6 +347,7 @@ method query*[BT](self: ThreadDatastore[BT],
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
discard signal.close()
|
||||
echo "nextSignal:CLOSE!"
|
||||
discard nextSignal.close()
|
||||
self.semaphore.release()
|
||||
raise exc
|
||||
@ -362,8 +362,5 @@ proc new*[DB](self: type ThreadDatastore,
|
||||
success ThreadDatastore[DB](
|
||||
tp: tp,
|
||||
backend: db,
|
||||
# TODO: are these needed anymore??
|
||||
# withLocks: withLocks,
|
||||
# queryLock: newAsyncLock(),
|
||||
semaphore: AsyncSemaphore.new(tp.numThreads - 1)
|
||||
)
|
||||
|
||||
@ -22,74 +22,62 @@ import pkg/datastore/threads/threadproxyds {.all.}
|
||||
import ./dscommontests
|
||||
import ./querycommontests
|
||||
|
||||
const NumThreads = 20 # IO threads aren't attached to CPU count
|
||||
const
|
||||
NumThreads = 20 # IO threads aren't attached to CPU count
|
||||
ThreadTestLoops {.intdefine.} = 10
|
||||
N = ThreadTestLoops
|
||||
|
||||
suite "Test Basic ThreadProxyDatastore":
|
||||
for i in 1..N:
|
||||
suite "Test Basic ThreadDatastore with SQLite":
|
||||
|
||||
var
|
||||
key = Key.init("/a").tryGet()
|
||||
data = "some bytes".toBytes
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
var
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
key = Key.init("/a/b").tryGet()
|
||||
bytes = "some bytes".toBytes
|
||||
otherBytes = "some other bytes".toBytes
|
||||
|
||||
teardownAll:
|
||||
echo "teardown done"
|
||||
setupAll:
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
|
||||
test "check put":
|
||||
echo "\n\n=== put ==="
|
||||
let res1 = await ds.put(key, data)
|
||||
echo "res1: ", res1.repr
|
||||
check res1.isOk
|
||||
teardown:
|
||||
GC_fullCollect()
|
||||
|
||||
test "check get":
|
||||
echo "\n\n=== get ==="
|
||||
echo "get send key: ", key.repr
|
||||
let res2 = await ds.get(key)
|
||||
echo "get key post: ", key.repr
|
||||
echo "get res2: ", res2.repr
|
||||
echo res2.get() == data
|
||||
var val = ""
|
||||
for c in res2.get():
|
||||
val &= char(c)
|
||||
echo "get res2: ", $val
|
||||
teardownAll:
|
||||
(await ds.close()).tryGet()
|
||||
taskPool.shutdown()
|
||||
|
||||
suite "Test Basic ThreadDatastore with SQLite":
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
GC_fullCollect()
|
||||
|
||||
var
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
key = Key.init("/a/b").tryGet()
|
||||
bytes = "some bytes".toBytes
|
||||
otherBytes = "some other bytes".toBytes
|
||||
for i in 1..N:
|
||||
suite "Test Query ThreadDatastore with SQLite":
|
||||
|
||||
teardown:
|
||||
GC_fullCollect()
|
||||
var
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
key = Key.init("/a/b").tryGet()
|
||||
bytes = "some bytes".toBytes
|
||||
otherBytes = "some other bytes".toBytes
|
||||
|
||||
|
||||
teardownAll:
|
||||
(await ds.close()).tryGet()
|
||||
taskPool.shutdown()
|
||||
setup:
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
|
||||
basicStoreTests(ds, key, bytes, otherBytes)
|
||||
teardown:
|
||||
GC_fullCollect()
|
||||
|
||||
suite "Test Query ThreadDatastore with SQLite":
|
||||
(await ds.close()).tryGet()
|
||||
taskPool.shutdown()
|
||||
|
||||
var
|
||||
sqlStore = newSQLiteBackend[KeyId, DataBuffer](Memory).tryGet()
|
||||
taskPool = Taskpool.new(NumThreads)
|
||||
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
|
||||
key = Key.init("/a/b").tryGet()
|
||||
bytes = "some bytes".toBytes
|
||||
otherBytes = "some other bytes".toBytes
|
||||
|
||||
teardown:
|
||||
GC_fullCollect()
|
||||
|
||||
(await ds.close()).tryGet()
|
||||
taskPool.shutdown()
|
||||
|
||||
queryTests(ds, true)
|
||||
queryTests(ds, true)
|
||||
GC_fullCollect()
|
||||
|
||||
# suite "Test Basic ThreadDatastore with fsds":
|
||||
# let
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user