Mirror of https://github.com/logos-storage/nim-datastore.git (synced 2026-01-06 23:53:09 +00:00)

Commit eba40334b8 (parent e6afc68cab): "whats up with windows + questionable + generics"
@@ -178,6 +178,12 @@ proc close*[K,V](self: FSDatastore[K,V]): ?!void =
   return success()
 
 type
+  FsQueryHandle*[K, V] = object
+    query*: DbQuery[K]
+    cancel*: bool
+    closed*: bool
+    env*: FsQueryEnv[K,V]
+
   FsQueryEnv*[K,V] = object
     self: FSDatastore[K,V]
     basePath: DataBuffer
@@ -185,7 +191,7 @@ type
 proc query*[K,V](
   self: FSDatastore[K,V],
   query: DbQuery[K],
-): Result[DbQueryHandle[K, V, FsQueryEnv[K,V]], ref CatchableError] =
+): Result[FsQueryHandle[K, V], ref CatchableError] =
 
   let key = query.key
   without path =? self.findPath(key), error:
@@ -202,14 +208,15 @@ proc query*[K,V](
       path.changeFileExt("")
 
   let env = FsQueryEnv[K,V](self: self, basePath: DataBuffer.new(basePath))
-  success DbQueryHandle[KeyId, V, FsQueryEnv[K,V]](query: query, env: env)
+  success FsQueryHandle[K, V](query: query, env: env)
 
 proc close*[K,V](handle: var DbQueryHandle[K,V,FsQueryEnv[K,V]]) =
   if not handle.closed:
     handle.closed = true
 
-iterator iter*[K, V](handle: var DbQueryHandle[K, V, FsQueryEnv[K,V]]
-): ?!DbQueryResponse[K, V] =
+iterator queryIter*[K, V](
+  handle: var FsQueryHandle[K, V]
+): ?!DbQueryResponse[K, V] =
   let root = $(handle.env.self.root)
   let basePath = $(handle.env.basePath)
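For reference, a minimal usage sketch of the concrete handle API introduced above. The store `fs`, the query value `q`, and the `KeyId`/`DataBuffer` parameters are assumed to be constructed elsewhere; those names are illustrative and not part of the commit:

  # Sketch only: `fs` stands in for an existing FSDatastore[KeyId, DataBuffer]
  # and `q` for a DbQuery[KeyId]; neither is defined in this diff.
  var handle = fs.query(q).tryGet()   # now a concrete FsQueryHandle[KeyId, DataBuffer]
  for res in handle.queryIter():      # yields ?!DbQueryResponse[KeyId, DataBuffer]
    without item =? res, err:
      echo "query item failed: ", err.msg
      break
    discard item                      # process the response here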
@@ -103,10 +103,17 @@ proc close*[K,V](self: SQLiteBackend[K,V]): ?!void =
 
   return success()
 
+type
+  SqQueryHandle*[K, V] = object
+    query*: DbQuery[K]
+    cancel*: bool
+    closed*: bool
+    env*: RawStmtPtr
+
 proc query*[K,V](
   self: SQLiteBackend[K,V],
   query: DbQuery[K]
-): Result[DbQueryHandle[K,V,RawStmtPtr], ref CatchableError] =
+): Result[SqQueryHandle[K,V], ref CatchableError] =
 
   var
     queryStr = if query.value:
@@ -151,16 +158,18 @@ proc query*[K,V](
   if not (v == SQLITE_OK):
     return failure newException(DatastoreError, $sqlite3_errstr(v))
 
-  success DbQueryHandle[K,V,RawStmtPtr](query: query, env: s)
+  success SqQueryHandle[K,V](query: query, env: s)
 
-proc close*[K,V](handle: var DbQueryHandle[K,V,RawStmtPtr]) =
+proc close*[K,V](handle: var SqQueryHandle[K,V]) =
   if not handle.closed:
     handle.closed = true
     discard sqlite3_reset(handle.env)
     discard sqlite3_clear_bindings(handle.env)
     handle.env.dispose()
 
-iterator iter*[K, V](handle: var DbQueryHandle[K, V, RawStmtPtr]): ?!DbQueryResponse[K, V] =
+iterator queyIter*[K, V](
+  handle: var SqQueryHandle[K, V]
+): ?!DbQueryResponse[K, V] =
   while not handle.cancel:
 
     let v = sqlite3_step(handle.env)
@@ -23,6 +23,7 @@ import ../key
 import ../query
 import ../datastore
 import ../backend
+import ../fsds
 import ../sql/sqliteds
 
 import ./asyncsemaphore
@@ -251,10 +252,13 @@ method queryTask[DB](
   query: DbQuery[KeyId],
 ) {.gcsafe, nimcall.} =
   ## run query command
+  mixin queryIter
   executeTask(ctx):
     # we execute this all inside `executeTask`
     # so we need to return a final result
-    let handleRes = ds.query(query)
+    let handleRes = query(ds, query)
+    static:
+      echo "HANDLE_RES: ", typeof(handleRes)
     if handleRes.isErr():
       # set error and exit executeTask, which will fire final signal
       (?!QResult).err(handleRes.error())
@@ -266,7 +270,9 @@ method queryTask[DB](
       raise newException(DeadThreadDefect, "queryTask timed out")
 
     var handle = handleRes.get()
-    for item in handle.iter():
+    static:
+      echo "HANDLE: ", typeof(handle)
+    for item in handle.queyIter():
       # wait for next request from async thread
 
       if ctx[].cancelled:
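The `mixin queryIter` added above makes the iterator an open symbol inside the generic `queryTask[DB]`, so the overload defined next to whichever backend `DB` is instantiated with gets resolved at compile time. A standalone sketch of that binding rule, with all types and iterators invented purely for illustration:

  # Toy stand-ins for the fsds and sqlite backends; not datastore code.
  type
    FsLike = object
    SqLike = object

  iterator queryIter(db: FsLike): int =
    yield 1

  iterator queryIter(db: SqLike): int =
    yield 2

  proc drive[DB](db: DB): int =
    mixin queryIter            # defer symbol binding to the instantiation site
    for v in db.queryIter():
      result += v

  echo drive(FsLike())   # prints 1
  echo drive(SqLike())   # prints 2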
@@ -15,8 +15,8 @@ import pkg/questionable/results
 import pkg/chronicles
 import pkg/threading/smartptrs
 
-import pkg/datastore/sql/sqliteds
 import pkg/datastore/fsds
+import pkg/datastore/sql/sqliteds
 import pkg/datastore/threads/threadproxyds {.all.}
 
 import ./dscommontests
@@ -82,40 +82,39 @@ for i in 1..N:
   queryTests(ds, true)
   GC_fullCollect()
 
-# suite "Test Basic ThreadDatastore with fsds":
-#   let
-#     path = currentSourcePath() # get this file's name
-#     basePath = "tests_data"
-#     basePathAbs = path.parentDir / basePath
-#     key = Key.init("/a/b").tryGet()
-#     bytes = "some bytes".toBytes
-#     otherBytes = "some other bytes".toBytes
+suite "Test Basic ThreadDatastore with fsds":
+  let
+    path = currentSourcePath() # get this file's name
+    basePath = "tests_data"
+    basePathAbs = path.parentDir / basePath
+    key = Key.init("/a/b").tryGet()
+    bytes = "some bytes".toBytes
+    otherBytes = "some other bytes".toBytes
 
-#   var
-#     fsStore: FSDatastore
-#     ds: ThreadDatastore
-#     taskPool: Taskpool
+  var
+    fsStore: FSDatastore[KeyId, DataBuffer]
+    ds: ThreadDatastore[FSDatastore[KeyId, DataBuffer]]
+    taskPool: Taskpool
 
-#   setupAll:
-#     removeDir(basePathAbs)
-#     require(not dirExists(basePathAbs))
-#     createDir(basePathAbs)
+  setupAll:
+    removeDir(basePathAbs)
+    require(not dirExists(basePathAbs))
+    createDir(basePathAbs)
 
-#     fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
-#     taskPool = Taskpool.new(NumThreads)
-#     ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
+    fsStore = newFSDatastore[KeyId, DataBuffer](root = basePathAbs, depth = 3).tryGet()
+    ds = ThreadDatastore.new(fsStore, tp = taskPool).tryGet()
 
-#   teardown:
-#     GC_fullCollect()
+  teardown:
+    GC_fullCollect()
 
-#   teardownAll:
-#     (await ds.close()).tryGet()
-#     taskPool.shutdown()
+  teardownAll:
+    (await ds.close()).tryGet()
 
-#     removeDir(basePathAbs)
-#     require(not dirExists(basePathAbs))
+    removeDir(basePathAbs)
+    require(not dirExists(basePathAbs))
 
+  basicStoreTests(ds, key, bytes, otherBytes)
 
-#   basicStoreTests(fsStore, key, bytes, otherBytes)
 
 # suite "Test Query ThreadDatastore with fsds":
 #   let