diff --git a/.gitignore b/.gitignore index 3806842..de00036 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ datastore.nims nimcache TODO nim.cfg +nimble.develop +nimble.paths diff --git a/config.nims b/config.nims index 052a576..ed3a531 100644 --- a/config.nims +++ b/config.nims @@ -10,3 +10,7 @@ when (NimMajor, NimMinor) == (1, 2): when (NimMajor, NimMinor) > (1, 2): switch("hint", "XCannotRaiseY:off") +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index 5559126..c9b0408 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -46,13 +46,10 @@ type ds: Datastore semaphore: AsyncSemaphore # semaphore is used for backpressure \ # to avoid exhausting file descriptors - case withLocks: bool - of true: - tasks: Table[Key, Future[void]] - queryLock: AsyncLock # global query lock, this is only really \ - # needed for the fsds, but it is expensive! - else: - futs: seq[Future[void]] # keep a list of the futures to the signals around + withLocks: bool + tasks: Table[Key, Future[void]] + queryLock: AsyncLock # global query lock, this is only really \ + # needed for the fsds, but it is expensive! template withLocks( self: ThreadDatastore, @@ -61,27 +58,21 @@ template withLocks( fut: Future[void], body: untyped) = try: - case self.withLocks: - of true: - if key.isSome and - key.get in self.tasks: + if key.isSome and key.get in self.tasks: + if self.withLocks: await self.tasks[key.get] - await self.queryLock.acquire() # lock query or wait to finish + self.tasks[key.get] = fut # we alway want to store the future, but only await if we're using locks - self.tasks[key.get] = fut - else: - self.futs.add(fut) + if self.withLocks: + await self.queryLock.acquire() # only lock if it's required (fsds) body finally: - case self.withLocks: - of true: - if key.isSome: + if self.withLocks: + if key.isSome and key.get in self.tasks: self.tasks.del(key.get) - if self.queryLock.locked: - self.queryLock.release() - else: - self.futs.keepItIf(it != fut) + if self.queryLock.locked: + self.queryLock.release() template dispatchTask( self: ThreadDatastore, @@ -359,13 +350,8 @@ method get*( return success(@(res.get())) method close*(self: ThreadDatastore): Future[?!void] {.async.} = - var futs = if self.withLocks: - self.tasks.values.toSeq # toSeq(...) doesn't work here??? - else: - self.futs - - for fut in futs: - await fut.cancelAndWait() + for fut in self.tasks.values.toSeq: + await fut.cancelAndWait() # probably want to store the signal, instead of the future (or both?) await self.ds.close() @@ -418,18 +404,9 @@ method query*( defer: locked = false self.semaphore.release() - case self.withLocks: - of true: - if self.queryLock.locked: - self.queryLock.release() - else: - discard trace "About to query" await self.semaphore.acquire() - if self.withLocks: - await self.queryLock.acquire() - if locked: return failure (ref DatastoreError)(msg: "Should always await query features") diff --git a/tests/datastore/testthreadproxyds.nim b/tests/datastore/testthreadproxyds.nim index 92b832c..7019e52 100644 --- a/tests/datastore/testthreadproxyds.nim +++ b/tests/datastore/testthreadproxyds.nim @@ -21,47 +21,49 @@ import pkg/datastore/threads/threadproxyds {.all.} import ./dscommontests import ./querycommontests -suite "Test Basic ThreadDatastore with SQLite": +const NumThreads = 200 # IO threads aren't attached to CPU count - var - sqlStore: Datastore - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes +# suite "Test Basic ThreadDatastore with SQLite": - setupAll: - sqlStore = SQLiteDatastore.new(Memory).tryGet() - taskPool = Taskpool.new(countProcessors() * 2) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() +# var +# sqlStore: Datastore +# ds: ThreadDatastore +# taskPool: Taskpool +# key = Key.init("/a/b").tryGet() +# bytes = "some bytes".toBytes +# otherBytes = "some other bytes".toBytes - teardownAll: - (await ds.close()).tryGet() - taskPool.shutdown() +# setupAll: +# sqlStore = SQLiteDatastore.new(Memory).tryGet() +# taskPool = Taskpool.new(NumThreads) +# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - basicStoreTests(ds, key, bytes, otherBytes) +# teardownAll: +# (await ds.close()).tryGet() +# taskPool.shutdown() -suite "Test Query ThreadDatastore with SQLite": +# basicStoreTests(ds, key, bytes, otherBytes) - var - sqlStore: Datastore - ds: ThreadDatastore - taskPool: Taskpool - key = Key.init("/a/b").tryGet() - bytes = "some bytes".toBytes - otherBytes = "some other bytes".toBytes +# suite "Test Query ThreadDatastore with SQLite": - setup: - sqlStore = SQLiteDatastore.new(Memory).tryGet() - taskPool = Taskpool.new(countProcessors() * 2) - ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() +# var +# sqlStore: Datastore +# ds: ThreadDatastore +# taskPool: Taskpool +# key = Key.init("/a/b").tryGet() +# bytes = "some bytes".toBytes +# otherBytes = "some other bytes".toBytes - teardown: - (await ds.close()).tryGet() - taskPool.shutdown() +# setup: +# sqlStore = SQLiteDatastore.new(Memory).tryGet() +# taskPool = Taskpool.new(NumThreads) +# ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() - queryTests(ds, true) +# teardown: +# (await ds.close()).tryGet() +# taskPool.shutdown() + +# queryTests(ds, true) suite "Test Basic ThreadDatastore with fsds": let @@ -83,7 +85,7 @@ suite "Test Basic ThreadDatastore with fsds": createDir(basePathAbs) fsStore = FSDatastore.new(root = basePathAbs, depth = 3).tryGet() - taskPool = Taskpool.new(countProcessors() * 2) + taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet() teardownAll: @@ -112,7 +114,7 @@ suite "Test Query ThreadDatastore with fsds": createDir(basePathAbs) fsStore = FSDatastore.new(root = basePathAbs, depth = 5).tryGet() - taskPool = Taskpool.new(countProcessors() * 2) + taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(fsStore, withLocks = true, tp = taskPool).tryGet() teardown: @@ -138,7 +140,7 @@ suite "Test ThreadDatastore cancelations": setupAll: sqlStore = SQLiteDatastore.new(Memory).tryGet() - taskPool = Taskpool.new(countProcessors() * 2) + taskPool = Taskpool.new(NumThreads) ds = ThreadDatastore.new(sqlStore, tp = taskPool).tryGet() test "Should monitor signal and cancel":