mirror of
https://github.com/logos-storage/nim-datastore.git
synced 2026-01-04 22:53:08 +00:00
Revert "cleanup"
This reverts commit 55b118c055833c28d6fca4eaae7aeba10a74c4b8.
This commit is contained in:
parent
55b118c055
commit
2255ea1c1c
@ -13,7 +13,7 @@ import ../datastore
|
||||
import ./backend
|
||||
import ./sql/sqliteds
|
||||
|
||||
export datastore, sqliteds
|
||||
export datastore
|
||||
|
||||
push: {.upraises: [].}
|
||||
|
||||
@ -42,8 +42,7 @@ method delete*(self: SQLiteDatastore,
|
||||
|
||||
method get*(self: SQLiteDatastore,
|
||||
key: Key): Future[?!seq[byte]] {.async.} =
|
||||
self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]:
|
||||
d.toSeq()
|
||||
self.db.get(KeyId.new key.id())
|
||||
|
||||
method put*(self: SQLiteDatastore,
|
||||
key: Key,
|
||||
@ -80,7 +79,7 @@ method queryIter*(
|
||||
yield QueryResponse.failure err
|
||||
let k = qres.key.map() do(k: KeyId) -> Key:
|
||||
Key.init($k).expect("valid key")
|
||||
let v: seq[byte] = qres.data.toSeq()
|
||||
let v: seq[byte] = qres.data
|
||||
yield success (k, v)
|
||||
|
||||
success iter
|
||||
@ -88,14 +87,20 @@ method queryIter*(
|
||||
proc new*(
|
||||
T: type SQLiteDatastore,
|
||||
path: string,
|
||||
readOnly = false): ?!SQLiteDatastore =
|
||||
readOnly = false): ?!T =
|
||||
|
||||
success SQLiteDatastore(
|
||||
db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly))
|
||||
let
|
||||
flags =
|
||||
if readOnly: SQLITE_OPEN_READONLY
|
||||
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
|
||||
|
||||
success T(
|
||||
db: ? SQLiteDsDb.open(path, flags),
|
||||
readOnly: readOnly)
|
||||
|
||||
proc new*(
|
||||
T: type SQLiteDatastore,
|
||||
db: SQLiteBackend[KeyId, DataBuffer]): ?!T =
|
||||
db: SQLiteDsDb): ?!T =
|
||||
|
||||
success T(
|
||||
db: db,
|
||||
|
||||
@ -50,7 +50,6 @@ type
|
||||
signal: ThreadSignalPtr
|
||||
running: bool
|
||||
cancelled: bool
|
||||
nextSignal: (Lock, Cond)
|
||||
|
||||
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
|
||||
|
||||
@ -252,6 +251,7 @@ proc queryTask[DB](
|
||||
ctx: TaskCtx[QResult],
|
||||
ds: DB,
|
||||
query: DbQuery[KeyId],
|
||||
nextSignal: ThreadSignalPtr
|
||||
) {.gcsafe, nimcall.} =
|
||||
## run query command
|
||||
executeTask(ctx):
|
||||
@ -265,11 +265,8 @@ proc queryTask[DB](
|
||||
# otherwise manually an set empty ok result
|
||||
ctx[].res.ok (KeyId.none, DataBuffer(), )
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
withLock(ctx[].nextSignal[0]):
|
||||
wait(ctx[].nextSignal[1], ctx[].nextSignal[0])
|
||||
# if not nextSignal.waitSync(10.seconds).get():
|
||||
# raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
|
||||
if not nextSignal.waitSync(10.seconds).get():
|
||||
raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
|
||||
|
||||
var handle = handleRes.get()
|
||||
for item in handle.iter():
|
||||
@ -285,9 +282,7 @@ proc queryTask[DB](
|
||||
|
||||
discard ctx[].signal.fireSync()
|
||||
|
||||
# discard nextSignal.waitSync().get()
|
||||
withLock(ctx[].nextSignal[0]):
|
||||
wait(ctx[].nextSignal[1], ctx[].nextSignal[0])
|
||||
discard nextSignal.waitSync().get()
|
||||
|
||||
# set final result
|
||||
(?!QResult).ok((KeyId.none, DataBuffer()))
|
||||
@ -301,11 +296,8 @@ method query*(self: ThreadDatastore,
|
||||
await self.semaphore.acquire()
|
||||
without signal =? acquireSignal(), err:
|
||||
return failure err
|
||||
# without nextSignal =? acquireSignal(), err:
|
||||
# return failure err
|
||||
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
|
||||
ctx[].nextSignal[0].initLock()
|
||||
ctx[].nextSignal[1].initCond()
|
||||
without nextSignal =? acquireSignal(), err:
|
||||
return failure err
|
||||
|
||||
try:
|
||||
let query = dbQuery(
|
||||
@ -313,10 +305,10 @@ method query*(self: ThreadDatastore,
|
||||
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
|
||||
|
||||
# setup initial queryTask
|
||||
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
|
||||
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
|
||||
self.tp.spawn queryTask(ctx, ds, query)
|
||||
withLock(ctx[].nextSignal[0]):
|
||||
signal(ctx[].nextSignal[1])
|
||||
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
|
||||
await nextSignal.fire()
|
||||
|
||||
var
|
||||
lock = newAsyncLock() # serialize querying under threads
|
||||
@ -337,8 +329,7 @@ method query*(self: ThreadDatastore,
|
||||
iter.finished = true
|
||||
|
||||
defer:
|
||||
withLock(ctx[].nextSignal[0]):
|
||||
signal(ctx[].nextSignal[1])
|
||||
await nextSignal.fire()
|
||||
|
||||
if ctx[].res.isErr():
|
||||
return err(ctx[].res.error())
|
||||
@ -351,7 +342,7 @@ method query*(self: ThreadDatastore,
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
ctx.setCancelled()
|
||||
discard ctx[].signal.close()
|
||||
# discard nextSignal.close()
|
||||
discard nextSignal.close()
|
||||
self.semaphore.release()
|
||||
raise exc
|
||||
|
||||
@ -360,7 +351,7 @@ method query*(self: ThreadDatastore,
|
||||
except CancelledError as exc:
|
||||
trace "Cancelling thread future!", exc = exc.msg
|
||||
discard signal.close()
|
||||
# discard nextSignal.close()
|
||||
discard nextSignal.close()
|
||||
self.semaphore.release()
|
||||
raise exc
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ import pkg/chronos
|
||||
import pkg/stew/results
|
||||
import pkg/stew/byteutils
|
||||
|
||||
import pkg/datastore/sql
|
||||
import pkg/datastore/sql/sqliteds
|
||||
|
||||
import ../dscommontests
|
||||
import ../querycommontests
|
||||
|
||||
@ -98,6 +98,7 @@ suite "AsyncSemaphore":
|
||||
resource.inc()
|
||||
check resource > 0 and resource <= 3
|
||||
let sleep = rand(0..10).millis
|
||||
# echo sleep
|
||||
await sleepAsync(sleep)
|
||||
finally:
|
||||
resource.dec()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user