simplify locking

This commit is contained in:
Dmitriy Ryajov 2023-09-18 13:18:51 -06:00
parent 52d6a857ac
commit 7306a0bfd4
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
4 changed files with 58 additions and 73 deletions

2
.gitignore vendored
View File

@ -10,3 +10,5 @@ datastore.nims
nimcache
TODO
nim.cfg
nimble.develop
nimble.paths

View File

@ -10,3 +10,7 @@ when (NimMajor, NimMinor) == (1, 2):
when (NimMajor, NimMinor) > (1, 2):
switch("hint", "XCannotRaiseY:off")
# begin Nimble config (version 2)
when withDir(thisDir(), system.fileExists("nimble.paths")):
include "nimble.paths"
# end Nimble config

View File

@ -46,13 +46,10 @@ type
ds: Datastore
semaphore: AsyncSemaphore # semaphore is used for backpressure \
# to avoid exhausting file descriptors
case withLocks: bool
of true:
tasks: Table[Key, Future[void]]
queryLock: AsyncLock # global query lock, this is only really \
# needed for the fsds, but it is expensive!
else:
futs: seq[Future[void]] # keep a list of the futures to the signals around
withLocks: bool
tasks: Table[Key, Future[void]]
queryLock: AsyncLock # global query lock, this is only really \
# needed for the fsds, but it is expensive!
template withLocks(
self: ThreadDatastore,
@ -61,27 +58,21 @@ template withLocks(
fut: Future[void],
body: untyped) =
try:
case self.withLocks:
of true:
if key.isSome and
key.get in self.tasks:
if key.isSome and key.get in self.tasks:
if self.withLocks:
await self.tasks[key.get]
await self.queryLock.acquire() # lock query or wait to finish
self.tasks[key.get] = fut # we alway want to store the future, but only await if we're using locks
self.tasks[key.get] = fut
else:
self.futs.add(fut)
if self.withLocks:
await self.queryLock.acquire() # only lock if it's required (fsds)
body
finally:
case self.withLocks:
of true:
if key.isSome:
if self.withLocks:
if key.isSome and key.get in self.tasks:
self.tasks.del(key.get)
if self.queryLock.locked:
self.queryLock.release()
else:
self.futs.keepItIf(it != fut)
if self.queryLock.locked:
self.queryLock.release()
template dispatchTask(
self: ThreadDatastore,
@ -359,13 +350,8 @@ method get*(
return success(@(res.get()))
method close*(self: ThreadDatastore): Future[?!void] {.async.} =
var futs = if self.withLocks:
self.tasks.values.toSeq # toSeq(...) doesn't work here???
else:
self.futs
for fut in futs:
await fut.cancelAndWait()
for fut in self.tasks.values.toSeq:
await fut.cancelAndWait() # probably want to store the signal, instead of the future (or both?)
await self.ds.close()
@ -418,18 +404,9 @@ method query*(
defer:
locked = false
self.semaphore.release()
case self.withLocks:
of true:
if self.queryLock.locked:
self.queryLock.release()
else:
discard
trace "About to query"
await self.semaphore.acquire()
if self.withLocks:
await self.queryLock.acquire()
if locked:
return failure (ref DatastoreError)(msg: "Should always await query features")

View File

@ -21,47 +21,49 @@ import pkg/datastore/threads/threadproxyds {.all.}
import ./dscommontests
import ./querycommontests
suite "Test Basic ThreadDatastore with SQLite":
const NumThreads = 200 # IO threads aren't attached to CPU count
var
sqlStore: Datastore
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
# suite "Test Basic ThreadDatastore with SQLite":
setupAll:
sqlStore = SQLiteDatastore.new(Memory).tryGet()
taskPool = Taskpool.new(countProcessors() * 2)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
# var
# sqlStore: Datastore
# ds: ThreadDatastore
# taskPool: Taskpool
# key = Key.init("/a/b").tryGet()
# bytes = "some bytes".toBytes
# otherBytes = "some other bytes".toBytes
teardownAll:
(await ds.close()).tryGet()
taskPool.shutdown()
# setupAll:
# sqlStore = SQLiteDatastore.new(Memory).tryGet()
# taskPool = Taskpool.new(NumThreads)
# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
basicStoreTests(ds, key, bytes, otherBytes)
# teardownAll:
# (await ds.close()).tryGet()
# taskPool.shutdown()
suite "Test Query ThreadDatastore with SQLite":
# basicStoreTests(ds, key, bytes, otherBytes)
var
sqlStore: Datastore
ds: ThreadDatastore
taskPool: Taskpool
key = Key.init("/a/b").tryGet()
bytes = "some bytes".toBytes
otherBytes = "some other bytes".toBytes
# suite "Test Query ThreadDatastore with SQLite":
setup:
sqlStore = SQLiteDatastore.new(Memory).tryGet()
taskPool = Taskpool.new(countProcessors() * 2)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
# var
# sqlStore: Datastore
# ds: ThreadDatastore
# taskPool: Taskpool
# key = Key.init("/a/b").tryGet()
# bytes = "some bytes".toBytes
# otherBytes = "some other bytes".toBytes
teardown:
(await ds.close()).tryGet()
taskPool.shutdown()
# setup:
# sqlStore = SQLiteDatastore.new(Memory).tryGet()
# taskPool = Taskpool.new(NumThreads)
# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
queryTests(ds, true)
# teardown:
# (await ds.close()).tryGet()
# taskPool.shutdown()
# queryTests(ds, true)
suite "Test Basic ThreadDatastore with fsds":
let
@ -83,7 +85,7 @@ suite "Test Basic ThreadDatastore with fsds":
createDir(basePathAbs)
fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet()
taskPool = Taskpool.new(countProcessors() * 2)
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
teardownAll:
@ -112,7 +114,7 @@ suite "Test Query ThreadDatastore with fsds":
createDir(basePathAbs)
fsStore = FSDatastore.new(root = basePathAbs, depth = 5).tryGet()
taskPool = Taskpool.new(countProcessors() * 2)
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet()
teardown:
@ -138,7 +140,7 @@ suite "Test ThreadDatastore cancelations":
setupAll:
sqlStore = SQLiteDatastore.new(Memory).tryGet()
taskPool = Taskpool.new(countProcessors() * 2)
taskPool = Taskpool.new(NumThreads)
ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet()
test "Should monitor signal and cancel":