This commit is contained in:
Jaremy Creechley 2023-09-26 19:56:17 -07:00
parent 2d843857cd
commit 55b118c055
No known key found for this signature in database
GPG Key ID: 4E66FB67B21D3300
4 changed files with 30 additions and 27 deletions

View File

@ -13,7 +13,7 @@ import ../datastore
import ./backend
import ./sql/sqliteds
export datastore
export datastore, sqliteds
push: {.upraises: [].}
@ -42,7 +42,8 @@ method delete*(self: SQLiteDatastore,
method get*(self: SQLiteDatastore,
key: Key): Future[?!seq[byte]] {.async.} =
self.db.get(KeyId.new key.id())
self.db.get(KeyId.new key.id()).map() do(d: DataBuffer) -> seq[byte]:
d.toSeq()
method put*(self: SQLiteDatastore,
key: Key,
@ -79,7 +80,7 @@ method queryIter*(
yield QueryResponse.failure err
let k = qres.key.map() do(k: KeyId) -> Key:
Key.init($k).expect("valid key")
let v: seq[byte] = qres.data
let v: seq[byte] = qres.data.toSeq()
yield success (k, v)
success iter
@ -87,20 +88,14 @@ method queryIter*(
proc new*(
T: type SQLiteDatastore,
path: string,
readOnly = false): ?!T =
readOnly = false): ?!SQLiteDatastore =
let
flags =
if readOnly: SQLITE_OPEN_READONLY
else: SQLITE_OPEN_READWRITE or SQLITE_OPEN_CREATE
success T(
db: ? SQLiteDsDb.open(path, flags),
readOnly: readOnly)
success SQLiteDatastore(
db: ? newSQLiteBackend[KeyId, DataBuffer](path, readOnly))
proc new*(
T: type SQLiteDatastore,
db: SQLiteDsDb): ?!T =
db: SQLiteBackend[KeyId, DataBuffer]): ?!T =
success T(
db: db,

View File

@ -50,6 +50,7 @@ type
signal: ThreadSignalPtr
running: bool
cancelled: bool
nextSignal: (Lock, Cond)
TaskCtx*[T] = SharedPtr[TaskCtxObj[T]]
@ -251,7 +252,6 @@ proc queryTask[DB](
ctx: TaskCtx[QResult],
ds: DB,
query: DbQuery[KeyId],
nextSignal: ThreadSignalPtr
) {.gcsafe, nimcall.} =
## run query command
executeTask(ctx):
@ -265,8 +265,11 @@ proc queryTask[DB](
# otherwise manually an set empty ok result
ctx[].res.ok (KeyId.none, DataBuffer(), )
discard ctx[].signal.fireSync()
if not nextSignal.waitSync(10.seconds).get():
raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
withLock(ctx[].nextSignal[0]):
wait(ctx[].nextSignal[1], ctx[].nextSignal[0])
# if not nextSignal.waitSync(10.seconds).get():
# raise newException(DeadThreadDefect, "query task timeout; possible deadlock!")
var handle = handleRes.get()
for item in handle.iter():
@ -282,7 +285,9 @@ proc queryTask[DB](
discard ctx[].signal.fireSync()
discard nextSignal.waitSync().get()
# discard nextSignal.waitSync().get()
withLock(ctx[].nextSignal[0]):
wait(ctx[].nextSignal[1], ctx[].nextSignal[0])
# set final result
(?!QResult).ok((KeyId.none, DataBuffer()))
@ -296,8 +301,11 @@ method query*(self: ThreadDatastore,
await self.semaphore.acquire()
without signal =? acquireSignal(), err:
return failure err
without nextSignal =? acquireSignal(), err:
return failure err
# without nextSignal =? acquireSignal(), err:
# return failure err
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
ctx[].nextSignal[0].initLock()
ctx[].nextSignal[1].initCond()
try:
let query = dbQuery(
@ -305,10 +313,10 @@ method query*(self: ThreadDatastore,
value=q.value, limit=q.limit, offset=q.offset, sort=q.sort)
# setup initial queryTask
let ctx {.inject.} = newSharedPtr(TaskCtxObj[QResult](signal: signal))
dispatchTaskWrap[DbQueryResponse[KeyId, DataBuffer]](self, signal):
self.tp.spawn queryTask(ctx, ds, query, nextSignal)
await nextSignal.fire()
self.tp.spawn queryTask(ctx, ds, query)
withLock(ctx[].nextSignal[0]):
signal(ctx[].nextSignal[1])
var
lock = newAsyncLock() # serialize querying under threads
@ -329,7 +337,8 @@ method query*(self: ThreadDatastore,
iter.finished = true
defer:
await nextSignal.fire()
withLock(ctx[].nextSignal[0]):
signal(ctx[].nextSignal[1])
if ctx[].res.isErr():
return err(ctx[].res.error())
@ -342,7 +351,7 @@ method query*(self: ThreadDatastore,
trace "Cancelling thread future!", exc = exc.msg
ctx.setCancelled()
discard ctx[].signal.close()
discard nextSignal.close()
# discard nextSignal.close()
self.semaphore.release()
raise exc
@ -351,7 +360,7 @@ method query*(self: ThreadDatastore,
except CancelledError as exc:
trace "Cancelling thread future!", exc = exc.msg
discard signal.close()
discard nextSignal.close()
# discard nextSignal.close()
self.semaphore.release()
raise exc

View File

@ -8,7 +8,7 @@ import pkg/chronos
import pkg/stew/results
import pkg/stew/byteutils
import pkg/datastore/sql/sqliteds
import pkg/datastore/sql
import ../dscommontests
import ../querycommontests

View File

@ -98,7 +98,6 @@ suite "AsyncSemaphore":
resource.inc()
check resource > 0 and resource <= 3
let sleep = rand(0..10).millis
# echo sleep
await sleepAsync(sleep)
finally:
resource.dec()