diff --git a/datastore/memoryds.nim b/datastore/memoryds.nim index 90e7c6f..fa7d10f 100644 --- a/datastore/memoryds.nim +++ b/datastore/memoryds.nim @@ -1,8 +1,12 @@ import std/tables import std/sharedtables +import std/sharedlist import std/sequtils import std/strutils import std/algorithm +import std/locks + +import std/atomics import pkg/chronos import pkg/questionable @@ -20,21 +24,36 @@ push: {.upraises: [].} type MemoryDatastore* = ref object of Datastore store*: SharedTable[Key, seq[byte]] + keys*: SharedList[Key] + lock: Lock # yes, we need the lock since we need to update both the table and the list :facepalm: + +template withLock(lock: Lock, body: untyped) = + try: + lock.acquire() + body + finally: + lock.release() method has*( self: MemoryDatastore, key: Key): Future[?!bool] {.async.} = + let + keys = toSeq(self.keys) - self.store.withValue(key, value): - return success true - do: - return success(false) + for k in keys: + if k == key: + return success true + + return success false method delete*( self: MemoryDatastore, key: Key): Future[?!void] {.async.} = - self.store.del(key) + withLock(self.lock): + self.keys.iterAndMutate(proc(k: Key): bool = k == key) + self.store.del(key) + return success() method delete*( @@ -51,18 +70,25 @@ method get*( self: MemoryDatastore, key: Key): Future[?!seq[byte]] {.async.} = - self.store.withValue(key, value): - return success value[] - do: - return failure (ref DatastoreError)(msg: "no such key") + withLock(self.lock): + let + has = (await self.has(key)) + + if has.isOk and has.get: + return self.store.mget(key).catch() + + return failure (ref DatastoreError)(msg: "not found") method put*( self: MemoryDatastore, key: Key, data: seq[byte]): Future[?!void] {.async.} = - self.store[key] = data - return success() + withLock(self.lock): + if not self.store.hasKeyOrPut(key, data): + self.keys.add(key) + else: + self.store[key] = data method put*( self: MemoryDatastore, @@ -74,50 +100,50 @@ method put*( return success() -# proc keyIterator(self: MemoryDatastore, queryKey: string): iterator: KeyBuffer {.gcsafe.} = -# return iterator(): KeyBuffer {.closure.} = -# var keys = self.store.keys().toSeq() -# keys.sort(proc (x, y: KeyBuffer): int = cmp(x.toString, y.toString)) -# for key in keys: -# if key.toString().startsWith(queryKey): -# yield key +method query*( + self: MemoryDatastore, + query: Query, +): Future[?!QueryIter] {.async.} = -# method query*( -# self: MemoryDatastore, -# query: Query, -# ): Future[?!QueryIter] {.async.} = + let + queryKey = query.key.id() + keys = toSeq(self.keys) -# let -# queryKey = query.key.id() -# walker = keyIterator(self, queryKey) -# var -# iter = QueryIter.new() + var + iter = QueryIter.new() + pos = 0 -# proc next(): Future[?!QueryResponse] {.async.} = -# let kb = walker() + proc next(): Future[?!QueryResponse] {.async.} = + defer: + pos.inc -# if finished(walker): -# iter.finished = true -# return success (Key.none, EmptyBytes) + if iter.finished: + return failure (ref QueryEndedError)(msg: "Calling next on a finished query!") -# let key = kb.toKey().expect("should not fail") -# var ds: ValueBuffer -# if query.value: -# ds = self.store[kb] -# let data = if ds.isNil: EmptyBytes else: ds.toSeq(byte) + if pos > keys.len - 1: + iter.finished = true + return success (Key.none, EmptyBytes) -# return success (key.some, data) + return success ( + Key.init(keys[pos]).expect("Should not fail!").some, + if query.value: self.store.mget(keys[pos]) else: EmptyBytes) -# iter.next = next -# return success iter + iter.next = next + return success iter method close*(self: MemoryDatastore): Future[?!void] {.async.} = self.store.deinitSharedTable() + self.keys.deinitSharedList() + self.lock.deinitLock() return success() proc new*(tp: type MemoryDatastore): MemoryDatastore = var table: SharedTable[Key, seq[byte]] + keys: SharedList[Key] + lock: Lock table.init() - MemoryDatastore(store: table) + keys.init() + lock.initLock() + MemoryDatastore(store: table, keys: keys, lock: lock)