diff --git a/datastore/memoryds.nim b/datastore/memoryds.nim deleted file mode 100644 index fa7d10f..0000000 --- a/datastore/memoryds.nim +++ /dev/null @@ -1,149 +0,0 @@ -import std/tables -import std/sharedtables -import std/sharedlist -import std/sequtils -import std/strutils -import std/algorithm -import std/locks - -import std/atomics - -import pkg/chronos -import pkg/questionable -import pkg/questionable/results -import pkg/upraises - -import ./key -import ./query -import ./datastore - -export key, query - -push: {.upraises: [].} - -type - MemoryDatastore* = ref object of Datastore - store*: SharedTable[Key, seq[byte]] - keys*: SharedList[Key] - lock: Lock # yes, we need the lock since we need to update both the table and the list :facepalm: - -template withLock(lock: Lock, body: untyped) = - try: - lock.acquire() - body - finally: - lock.release() - -method has*( - self: MemoryDatastore, - key: Key): Future[?!bool] {.async.} = - let - keys = toSeq(self.keys) - - for k in keys: - if k == key: - return success true - - return success false - -method delete*( - self: MemoryDatastore, - key: Key): Future[?!void] {.async.} = - - withLock(self.lock): - self.keys.iterAndMutate(proc(k: Key): bool = k == key) - self.store.del(key) - - return success() - -method delete*( - self: MemoryDatastore, - keys: seq[Key]): Future[?!void] {.async.} = - - for key in keys: - if err =? (await self.delete(key)).errorOption: - return failure err - - return success() - -method get*( - self: MemoryDatastore, - key: Key): Future[?!seq[byte]] {.async.} = - - withLock(self.lock): - let - has = (await self.has(key)) - - if has.isOk and has.get: - return self.store.mget(key).catch() - - return failure (ref DatastoreError)(msg: "not found") - -method put*( - self: MemoryDatastore, - key: Key, - data: seq[byte]): Future[?!void] {.async.} = - - withLock(self.lock): - if not self.store.hasKeyOrPut(key, data): - self.keys.add(key) - else: - self.store[key] = data - -method put*( - self: MemoryDatastore, - batch: seq[BatchEntry]): Future[?!void] {.async.} = - - for entry in batch: - if err =? (await self.put(entry.key, entry.data)).errorOption: - return failure err - - return success() - -method query*( - self: MemoryDatastore, - query: Query, -): Future[?!QueryIter] {.async.} = - - let - queryKey = query.key.id() - keys = toSeq(self.keys) - - var - iter = QueryIter.new() - pos = 0 - - proc next(): Future[?!QueryResponse] {.async.} = - defer: - pos.inc - - if iter.finished: - return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") - - if pos > keys.len - 1: - iter.finished = true - return success (Key.none, EmptyBytes) - - return success ( - Key.init(keys[pos]).expect("Should not fail!").some, - if query.value: self.store.mget(keys[pos]) else: EmptyBytes) - - iter.next = next - return success iter - -method close*(self: MemoryDatastore): Future[?!void] {.async.} = - self.store.deinitSharedTable() - self.keys.deinitSharedList() - self.lock.deinitLock() - return success() - -proc new*(tp: type MemoryDatastore): MemoryDatastore = - var - table: SharedTable[Key, seq[byte]] - keys: SharedList[Key] - lock: Lock - - table.init() - keys.init() - lock.initLock() - MemoryDatastore(store: table, keys: keys, lock: lock) diff --git a/datastore/threads/threadproxyds.nim b/datastore/threads/threadproxyds.nim index edf027c..f497515 100644 --- a/datastore/threads/threadproxyds.nim +++ b/datastore/threads/threadproxyds.nim @@ -37,7 +37,8 @@ type ThreadDatastore* = ref object of Datastore tp: Taskpool ds: Datastore - semaphore: Semaphore # semaphore is used for backpressure to avoid exhausting file descriptors + semaphore: Semaphore # semaphore is used for backpressure \ + # to avoid exhausting file descriptors tasks: seq[Future[void]] template dispatchTask( @@ -278,7 +279,7 @@ method query*( return failure error var - iter = QueryIter.init() + iter = QueryIter.new() let lock = newAsyncLock() proc next(): Future[?!QueryResponse] {.async.} =