diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index aef9a4bc..277f451b 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -13,6 +13,7 @@ import pkg/chronos import pkg/chronicles import pkg/libp2p import pkg/metrics +import pkg/questionable import pkg/questionable/results import ../protobuf/presence @@ -37,7 +38,7 @@ const DefaultDiscoveryTimeout = 1.minutes DefaultMinPeersPerBlock = 3 DefaultDiscoveryLoopSleep = 3.seconds - DefaultAdvertiseLoopSleep = 3.seconds + DefaultAdvertiseLoopSleep = 30.minutes type DiscoveryEngine* = ref object of RootObj @@ -60,6 +61,7 @@ type advertiseLoopSleep: Duration # Advertise loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests + advertiseType*: BlockType # Advertice blocks, manifests or both proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: @@ -77,19 +79,12 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = await sleepAsync(b.discoveryLoopSleep) proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = - proc onBlock(cid: Cid) {.async.} = - try: - trace "Listed block", cid - await b.advertiseQueue.put(cid) - await sleepAsync(50.millis) # TODO: temp workaround because we're announcing all CIDs - except CancelledError as exc: - trace "Cancelling block listing" - raise exc - except CatchableError as exc: - trace "Exception listing blocks", exc = exc.msg - while b.discEngineRunning: - discard await b.localStore.listBlocks(onBlock) + if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): + for c in cids: + if cid =? await c: + await b.advertiseQueue.put(cid) + await sleepAsync(50.millis) trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep await sleepAsync(b.advertiseLoopSleep) @@ -257,7 +252,8 @@ proc new*( concurrentDiscReqs = DefaultConcurrentDiscRequests, discoveryLoopSleep = DefaultDiscoveryLoopSleep, advertiseLoopSleep = DefaultAdvertiseLoopSleep, - minPeersPerBlock = DefaultMinPeersPerBlock,): DiscoveryEngine = + minPeersPerBlock = DefaultMinPeersPerBlock, + advertiseType = BlockType.Both): DiscoveryEngine = T( localStore: localStore, peers: peers, @@ -272,4 +268,5 @@ proc new*( inFlightAdvReqs: initTable[Cid, Future[void]](), discoveryLoopSleep: discoveryLoopSleep, advertiseLoopSleep: advertiseLoopSleep, - minPeersPerBlock: minPeersPerBlock) + minPeersPerBlock: minPeersPerBlock, + advertiseType: advertiseType) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index dd24b1d5..20038217 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -13,7 +13,6 @@ import pkg/upraises push: {.upraises: [].} -import pkg/questionable import pkg/chronicles import pkg/chronos import pkg/libp2p diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 0911c22a..eb82bffb 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -35,8 +35,6 @@ type cid*: Cid data*: seq[byte] - BlockNotFoundError* = object of CodexError - template EmptyCid*: untyped = var emptyCid {.global, threadvar.}: diff --git a/codex/codex.nim b/codex/codex.nim index a6273785..c6831e85 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -10,6 +10,7 @@ import std/sequtils import std/os import std/sugar +import std/tables import pkg/chronicles import pkg/chronos @@ -20,6 +21,7 @@ import pkg/confutils/defs import pkg/nitro import pkg/stew/io2 import pkg/stew/shims/net as stewnet +import pkg/datastore import ./node import ./conf @@ -31,8 +33,8 @@ import ./utils/fileutils import ./erasure import ./discovery import ./contracts -import ./utils/keyutils import ./utils/addrutils +import ./namespaces logScope: topics = "codex node" @@ -43,15 +45,19 @@ type config: CodexConf restServer: RestServerRef codexNode: CodexNodeRef + repoStore: RepoStore CodexPrivateKey* = libp2p.PrivateKey # alias proc start*(s: CodexServer) {.async.} = + notice "Starting codex node" + + await s.repoStore.start() s.restServer.start() await s.codexNode.start() let - # TODO: Can't define this as constants, pity + # TODO: Can't define these as constants, pity natIpPart = MultiAddress.init("/ip4/" & $s.config.nat & "/") .expect("Should create multiaddress") anyAddrIp = MultiAddress.init("/ip4/0.0.0.0/") @@ -79,8 +85,12 @@ proc start*(s: CodexServer) {.async.} = await s.runHandle proc stop*(s: CodexServer) {.async.} = + notice "Stopping codex node" + await allFuturesThrowing( - s.restServer.stop(), s.codexNode.stop()) + s.restServer.stop(), + s.codexNode.stop(), + s.repoStore.start()) s.runHandle.complete() @@ -122,9 +132,17 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): cache = CacheStore.new(cacheSize = config.cacheSize * MiB) let - discoveryStore = Datastore(SQLiteDatastore.new( - config.dataDir / "dht") - .expect("Should not fail!")) + discoveryDir = config.dataDir / CodexDhtNamespace + + if io2.createPath(discoveryDir).isErr: + trace "Unable to create discovery directory for block store", discoveryDir = discoveryDir + raise (ref Defect)( + msg: "Unable to create discovery directory for block store: " & discoveryDir) + + let + discoveryStore = Datastore( + SQLiteDatastore.new(config.dataDir / CodexDhtProvidersNamespace) + .expect("Should create discovery datastore!")) discovery = Discovery.new( switch.peerInfo.privateKey, @@ -136,20 +154,20 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) - repoDir = config.dataDir / "repo" - if io2.createPath(repoDir).isErr: - trace "Unable to create data directory for block store", dataDir = repoDir - raise (ref Defect)( - msg: "Unable to create data directory for block store: " & repoDir) + repoStore = RepoStore.new( + repoDs = Datastore(FSDatastore.new($config.dataDir, depth = 5) + .expect("Should create repo data store!")), + metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) + .expect("Should create meta data store!"), + quotaMaxBytes = config.storageQuota.uint, + blockTtl = config.blockTtl.seconds) - let - localStore = FSStore.new(repoDir, cache = cache) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() - blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks) - engine = BlockExcEngine.new(localStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) - store = NetworkStore.new(engine, localStore) + blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) + engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) + store = NetworkStore.new(engine, repoStore) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) contracts = ContractInteractions.new(config) codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery, contracts) @@ -164,4 +182,5 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): T( config: config, codexNode: codexNode, - restServer: restServer) + restServer: restServer, + repoStore: repoStore) diff --git a/codex/conf.nim b/codex/conf.nim index 0d16d237..7a7b8eab 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -29,9 +29,9 @@ import pkg/libp2p import pkg/ethers import ./discovery -import ./stores/cachestore +import ./stores -export DefaultCacheSizeMiB, net +export DefaultCacheSizeMiB, net, DefaultQuotaBytes, DefaultBlockTtl type StartUpCommand* {.pure.} = enum @@ -95,8 +95,8 @@ type abbr: "i" name: "listen-addrs" }: seq[MultiAddress] + # TODO: change this once we integrate nat support nat* {. - # TODO: change this once we integrate nat support desc: "IP Addresses to announce behind a NAT" defaultValue: ValidIpAddress.init("127.0.0.1") defaultValueDesc: "127.0.0.1" @@ -144,6 +144,20 @@ type name: "api-port" abbr: "p" }: int + storageQuota* {. + desc: "The size of the total storage quota dedicated to the node" + defaultValue: DefaultQuotaBytes + defaultValueDesc: $DefaultQuotaBytes + name: "storage-quota" + abbr: "q" }: Natural + + blockTtl* {. + desc: "Default block timeout in seconds - 0 disables the ttl" + defaultValue: DefaultBlockTtl.secs + defaultValueDesc: $DefaultBlockTtl + name: "block-ttl" + abbr: "t" }: Natural + cacheSize* {. desc: "The size in MiB of the block cache, 0 disables the cache - might help on slow hardrives" defaultValue: 0 diff --git a/codex/erasure/backend.nim b/codex/erasure/backend.nim index 9feac36b..d3c95211 100644 --- a/codex/erasure/backend.nim +++ b/codex/erasure/backend.nim @@ -11,7 +11,6 @@ import pkg/upraises push: {.upraises: [].} -import ../manifest import ../stores type diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index ccc793d5..c514ee14 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -18,7 +18,6 @@ import pkg/chronicles import ../manifest import ../stores -import ../errors import ../blocktype as bt import ./backend diff --git a/codex/manifest/coders.nim b/codex/manifest/coders.nim index f8e0a74b..fbf2eba6 100644 --- a/codex/manifest/coders.nim +++ b/codex/manifest/coders.nim @@ -187,7 +187,12 @@ func decode*( decoder.decode(data) func decode*(_: type Manifest, blk: Block): ?!Manifest = - without contentType =? blk.cid.contentType() and - containerType =? ManifestContainers.?[$contentType]: - return failure "CID has invalid content type for manifest" - Manifest.decode(blk.data, containerType) + ## Decode a manifest using `decoder` + ## + + if not ? blk.cid.isManifest: + return failure "Cid not a manifest codec" + + Manifest.decode( + blk.data, + ? ManifestContainers[$(?blk.cid.contentType().mapFailure)].catch) diff --git a/codex/manifest/manifest.nim b/codex/manifest/manifest.nim index 8be8d4bb..87da19f0 100644 --- a/codex/manifest/manifest.nim +++ b/codex/manifest/manifest.nim @@ -23,7 +23,6 @@ import ../errors import ../utils import ../blocktype import ./types -import ./coders ############################################################ # Operations on block list @@ -46,6 +45,12 @@ func `[]=`*(self: Manifest, i: BackwardsIndex, item: Cid) = self.rootHash = Cid.none self.blocks[self.len - i.int] = item +func isManifest*(cid: Cid): ?!bool = + ($(?cid.contentType().mapFailure) in ManifestContainers).success + +func isManifest*(mc: MultiCodec): ?!bool = + ($mc in ManifestContainers).success + proc add*(self: Manifest, cid: Cid) = assert not self.protected # we expect that protected manifests are created with properly-sized self.blocks self.rootHash = Cid.none diff --git a/codex/manifest/types.nim b/codex/manifest/types.nim index 11eba72c..e747c9cf 100644 --- a/codex/manifest/types.nim +++ b/codex/manifest/types.nim @@ -14,6 +14,7 @@ import pkg/libp2p import pkg/questionable const + BlockCodec* = multiCodec("raw") DagPBCodec* = multiCodec("dag-pb") type diff --git a/codex/namespaces.nim b/codex/namespaces.nim index 6d675a1b..93d7902c 100644 --- a/codex/namespaces.nim +++ b/codex/namespaces.nim @@ -7,13 +7,15 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/os - const - CodexRepoNamespace* = "/repo" # repository namespace, blocks and manifests are subkeys - CodexBlocksNamespace* = CodexRepoNamespace / "blocks" # blocks namespace - CodexManifestNamespace* = CodexRepoNamespace / "manifests" # manifest namespace - CodexBlocksPersistNamespace* = # Cid's of persisted blocks goes here - CodexMetaNamespace / "blocks" / "persist" + # Namespaces + CodexMetaNamespace* = "meta" # meta info stored here + CodexRepoNamespace* = "repo" # repository namespace, blocks and manifests are subkeys + CodexBlocksNamespace* = CodexRepoNamespace & "/blocks" # blocks namespace + CodexManifestNamespace* = CodexRepoNamespace & "/manifests" # manifest namespace CodexBlocksTtlNamespace* = # Cid TTL - CodexMetaNamespace / "blocks" / "ttl" + CodexMetaNamespace & "/ttl" + CodexDhtNamespace* = "dht" # Dht namespace + CodexDhtProvidersNamespace* = # Dht providers namespace + CodexDhtNamespace & "/providers" + CodexQuotaNamespace* = CodexMetaNamespace & "/quota" # quota's namespace diff --git a/codex/node.nim b/codex/node.nim index 177489a9..b9c49090 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -69,18 +69,20 @@ proc fetchManifest*( ## Fetch and decode a manifest block ## - without contentType =? cid.contentType() and - containerType =? ManifestContainers.?[$contentType]: - return failure "CID has invalid content type for manifest" + if err =? cid.isManifest.errorOption: + return failure "CID has invalid content type for manifest {$cid}" - trace "Received retrieval request", cid + trace "Received manifest retrieval request", cid - without blk =? await node.blockStore.getBlock(cid), error: - return failure error + without blk =? await node.blockStore.getBlock(cid), err: + trace "Error retriving manifest block", cid, err = err.msg + return failure err - without manifest =? Manifest.decode(blk): - return failure( - newException(CodexError, "Unable to decode as manifest")) + without manifest =? Manifest.decode(blk), err: + trace "Unable to decode as manifest", err = err.msg + return failure("Unable to decode as manifest") + + trace "Decoded manifest", cid return manifest.success @@ -120,6 +122,7 @@ proc retrieve*( ## if manifest =? (await node.fetchManifest(cid)): + trace "Retrieving blocks from manifest", cid if manifest.protected: # Retrieve, decode and save to the local store all EС groups proc erasureJob(): Future[void] {.async.} = @@ -142,6 +145,7 @@ proc retrieve*( # asyncSpawn prefetchBlocks() - temporarily commented out # # Retrieve all blocks of the dataset sequentially from the local store or network + trace "Creating store stream for manifest", cid return LPStream(StoreStream.new(node.blockStore, manifest, pad = false)).success let @@ -189,8 +193,8 @@ proc store*( return failure("Unable to init block from chunk!") blockManifest.add(blk.cid) - if isErr (await self.blockStore.putBlock(blk)): - # trace "Unable to store block", cid = blk.cid + if err =? (await self.blockStore.putBlock(blk)).errorOption: + trace "Unable to store block", cid = blk.cid, err = err.msg return failure(&"Unable to store block {blk.cid}") except CancelledError as exc: diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 0fd87b43..485d797f 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -13,7 +13,6 @@ push: {.upraises: [].} import std/sequtils -import std/sugar import pkg/questionable import pkg/questionable/results diff --git a/codex/storageproofs/stpnetwork.nim b/codex/storageproofs/stpnetwork.nim index 99abd990..e1a117a5 100644 --- a/codex/storageproofs/stpnetwork.nim +++ b/codex/storageproofs/stpnetwork.nim @@ -12,7 +12,6 @@ import std/sequtils import pkg/chronos import pkg/libp2p import pkg/chronicles -import pkg/questionable import pkg/questionable/results import pkg/contractabi/address as ca diff --git a/codex/storageproofs/stpproto/messages.nim b/codex/storageproofs/stpproto/messages.nim index d5294bbb..d79b1cc6 100644 --- a/codex/storageproofs/stpproto/messages.nim +++ b/codex/storageproofs/stpproto/messages.nim @@ -10,8 +10,6 @@ import pkg/questionable/results import pkg/libp2p/protobuf/minprotobuf -import ../../errors - type Tag* = object idx*: int64 diff --git a/codex/stores.nim b/codex/stores.nim index 5b18f1f2..1386a990 100644 --- a/codex/stores.nim +++ b/codex/stores.nim @@ -1,6 +1,6 @@ import ./stores/cachestore import ./stores/blockstore import ./stores/networkstore -import ./stores/fsstore +import ./stores/repostore -export cachestore, blockstore, networkstore, fsstore +export cachestore, blockstore, networkstore, repostore diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 54f796f1..3e06ef9f 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -13,23 +13,44 @@ push: {.upraises: [].} import pkg/chronos import pkg/libp2p +import pkg/questionable import pkg/questionable/results import ../blocktype export blocktype, libp2p +const + DefaultBlockTtl = 24.hours + type - OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.} + BlockNotFoundError* = object of CodexError + + BlockType* {.pure.} = enum + Manifest, Block, Both + + GetNext* = proc(): Future[?Cid] {.upraises: [], gcsafe, closure.} + + BlocksIter* = ref object + finished*: bool + next*: GetNext + BlockStore* = ref object of RootObj +iterator items*(self: BlocksIter): Future[?Cid] = + while not self.finished: + yield self.next() + method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = ## Get a block from the blockstore ## raiseAssert("Not implemented!") -method putBlock*(self: BlockStore, blk: Block): Future[?!void] {.base.} = +method putBlock*( + self: BlockStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.base.} = ## Put a block to the blockstore ## @@ -47,7 +68,9 @@ method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base.} = raiseAssert("Not implemented!") -method listBlocks*(self: BlockStore, onBlock: OnBlock): Future[?!void] {.base.} = +method listBlocks*( + self: BlockStore, + blockType = BlockType.Manifest): Future[?!BlocksIter] {.base.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index aa8eeeb7..36e34ec4 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -24,6 +24,7 @@ import pkg/questionable/results import ./blockstore import ../chunker import ../errors +import ../manifest export blockstore @@ -73,14 +74,62 @@ method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = return (cid in self.cache).success -method listBlocks*(s: CacheStore, onBlock: OnBlock): Future[?!void] {.async.} = +func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) = + return iterator(): Cid = + for cid in self.cache.keys: + yield cid + +method listBlocks*( + self: CacheStore, + blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## - for cid in toSeq(s.cache.keys): - await onBlock(cid) + var + iter = BlocksIter() - return success() + let + cids = self.cids() + + proc next(): Future[?Cid] {.async.} = + await idleAsync() + + var cid: Cid + while true: + if iter.finished: + return Cid.none + + cid = cids() + + if finished(cids): + iter.finished = true + return Cid.none + + without isManifest =? cid.isManifest, err: + trace "Error checking if cid is a manifest", err = err.msg + return Cid.none + + case blockType: + of BlockType.Manifest: + if not isManifest: + trace "Cid is not manifest, skipping", cid + continue + + break + of BlockType.Block: + if isManifest: + trace "Cid is a manifest, skipping", cid + continue + + break + of BlockType.Both: + break + + return cid.some + + iter.next = next + + return success iter func putBlockSync(self: CacheStore, blk: Block): bool = @@ -104,7 +153,10 @@ func putBlockSync(self: CacheStore, blk: Block): bool = self.currentSize += blkSize return true -method putBlock*(self: CacheStore, blk: Block): Future[?!void] {.async.} = +method putBlock*( + self: CacheStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.async.} = ## Put a block to the blockstore ## diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim deleted file mode 100644 index 10af63bd..00000000 --- a/codex/stores/fsstore.nim +++ /dev/null @@ -1,207 +0,0 @@ -## Nim-Codex -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import pkg/upraises - -push: {.upraises: [].} - -import std/os - -import pkg/chronos -import pkg/chronicles -import pkg/libp2p -import pkg/questionable -import pkg/questionable/results -import pkg/stew/io2 - -import ./cachestore -import ./blockstore - -export blockstore - -logScope: - topics = "codex fsstore" - -type - FSStore* = ref object of BlockStore - cache: BlockStore - repoDir: string - postfixLen*: int - -template blockPath*(self: FSStore, cid: Cid): string = - self.repoDir / ($cid)[^self.postfixLen..^1] / $cid - -method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} = - ## Get a block from the cache or filestore. - ## Save a copy to the cache if present in the filestore but not in the cache - ## - - if not self.cache.isNil: - trace "Getting block from cache or filestore", cid - else: - trace "Getting block from filestore", cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return success cid.emptyBlock - - if not self.cache.isNil: - let - cachedBlockRes = await self.cache.getBlock(cid) - - if not cachedBlockRes.isErr: - return success cachedBlockRes.get - else: - trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg - - # Read file contents - var - data: seq[byte] - - let - path = self.blockPath(cid) - res = io2.readFile(path, data) - - if res.isErr: - if not isFile(path): # May be, check instead that "res.error == ERROR_FILE_NOT_FOUND" ? - return failure (ref BlockNotFoundError)(msg: "Block not in filestore") - else: - let - error = io2.ioErrorMsg(res.error) - - trace "Error requesting block from filestore", path, error - return failure "Error requesting block from filestore: " & error - - without blk =? Block.new(cid, data), error: - trace "Unable to construct block from data", cid, error = error.msg - return failure error - - if not self.cache.isNil: - let - putCachedRes = await self.cache.putBlock(blk) - - if putCachedRes.isErr: - trace "Unable to store block in cache", cid, error = putCachedRes.error.msg - - return success blk - -method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} = - ## Write a block's contents to a file with name based on blk.cid. - ## Save a copy to the cache - ## - - if not self.cache.isNil: - trace "Putting block into filestore and cache", cid = blk.cid - else: - trace "Putting block into filestore", cid = blk.cid - - if blk.isEmpty: - trace "Empty block, ignoring" - return success() - - let path = self.blockPath(blk.cid) - if isFile(path): - return success() - - # If directory exists createPath wont fail - let dir = path.parentDir - if io2.createPath(dir).isErr: - trace "Unable to create block prefix dir", dir - return failure("Unable to create block prefix dir") - - let res = io2.writeFile(path, blk.data) - if res.isErr: - let error = io2.ioErrorMsg(res.error) - trace "Unable to store block", path, cid = blk.cid, error - return failure("Unable to store block") - - if not self.cache.isNil: - let - putCachedRes = await self.cache.putBlock(blk) - - if putCachedRes.isErr: - trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg - - return success() - -method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the cache and filestore - ## - - if not self.cache.isNil: - trace "Deleting block from cache and filestore", cid - else: - trace "Deleting block from filestore", cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return success() - - if not self.cache.isNil: - let - delCachedRes = await self.cache.delBlock(cid) - - if delCachedRes.isErr: - trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg - - let - path = self.blockPath(cid) - res = io2.removeFile(path) - - if res.isErr: - let error = io2.ioErrorMsg(res.error) - trace "Unable to delete block", path, cid, error - return error.failure - - return success() - -method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = - ## Check if a block exists in the filestore - ## - - trace "Checking filestore for block existence", cid - if cid.isEmpty: - trace "Empty block, ignoring" - return true.success - - return self.blockPath(cid).isFile().success - -method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} = - ## Process list of all blocks in the filestore via callback. - ## This is an intensive operation - ## - - trace "Listing all blocks in filestore" - for (pkind, folderPath) in self.repoDir.walkDir(): - if pkind != pcDir: continue - if len(folderPath.basename) != self.postfixLen: continue - - for (fkind, filename) in folderPath.walkDir(relative = true): - if fkind != pcFile: continue - let cid = Cid.init(filename) - if cid.isOk: - await onBlock(cid.get()) - - return success() - -method close*(self: FSStore): Future[void] {.async.} = - ## Close the underlying cache - ## - - if not self.cache.isNil: await self.cache.close - -proc new*( - T: type FSStore, - repoDir: string, - postfixLen = 2, - cache: BlockStore = nil): T = - T( - postfixLen: postfixLen, - repoDir: repoDir, - cache: cache) diff --git a/codex/stores/localstore.nim b/codex/stores/localstore.nim deleted file mode 100644 index 900825cd..00000000 --- a/codex/stores/localstore.nim +++ /dev/null @@ -1,118 +0,0 @@ -## Nim-Codex -## Copyright (c) 2022 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import std/os - -import pkg/upraises - -push: {.upraises: [].} - -import pkg/chronos -import pkg/libp2p -import pkg/questionable -import pkg/questionable/results -import pkg/datastore - -import ./blockstore -import ../blocktype -import ../namespaces -import ../manifest - -export blocktype, libp2p - -const - CacheBytesKey* = CodexMetaNamespace / "bytes" / "cache" - CachePersistentKey* = CodexMetaNamespace / "bytes" / "persistent" - -type - LocalStore* = ref object of BlockStore - ds*: Datastore - blocksRepo*: BlockStore # TODO: Should be a Datastore - manifestRepo*: BlockStore # TODO: Should be a Datastore - cacheBytes*: uint - persistBytes*: uint - -method getBlock*(self: LocalStore, cid: Cid): Future[?!Block] = - ## Get a block from the blockstore - ## - - if cid.isManifest: - self.manifestRepo.getBlock(cid) - else: - self.blocksRepo.getBlock(cid) - -method putBlock*(self: LocalStore, blk: Block): Future[?!void] = - ## Put a block to the blockstore - ## - - if blk.cid.isManifest: - self.manifestRepo.putBlock(blk) - else: - self.blocksRepo.putBlock(blk) - -method delBlock*(self: LocalStore, cid: Cid): Future[?!void] = - ## Delete a block from the blockstore - ## - - if cid.isManifest: - self.manifestRepo.delBlock(cid) - else: - self.blocksRepo.delBlock(cid) - -method hasBlock*(self: LocalStore, cid: Cid): Future[?!bool] = - ## Check if the block exists in the blockstore - ## - - if cid.isManifest: - self.manifestRepo.hasBlock(cid) - else: - self.blocksRepo.hasBlock(cid) - -method listBlocks*( - self: LocalStore, - blkType: MultiCodec, - batch = 100, - onBlock: OnBlock): Future[?!void] = - ## Get the list of blocks in the LocalStore. - ## This is an intensive operation - ## - - if $blkType in ManifestContainers: - self.manifestRepo.listBlocks(blkType, batch, onBlock) - else: - self.blocksRepo.listBlocks(onBlock) - -method close*(self: LocalStore) {.async.} = - ## Close the blockstore, cleaning up resources managed by it. - ## For some implementations this may be a no-op - ## - - await self.manifestRepo.close() - await self.blocksRepo.close() - -proc contains*(self: LocalStore, blk: Cid): Future[bool] {.async.} = - ## Check if the block exists in the blockstore. - ## Return false if error encountered - ## - - return (await self.hasBlock(blk)) |? false - -func new*( - T: type LocalStore, - datastore: Datastore, - blocksRepo: BlockStore, - manifestRepo: BlockStore, - cacheBytes: uint, - persistBytes: uint): T = - T( - datastore: datastore, - blocksRepo: blocksRepo, - manifestRepo: manifestRepo, - cacheBytes: cacheBytes, - persistBytes: persistBytes) diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 5c96a5f6..a6e9a948 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -46,13 +46,16 @@ method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} = return success blk -method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} = +method putBlock*( + self: NetworkStore, + blk: bt.Block, + ttl = Duration.none): Future[?!void] {.async.} = ## Store block locally and notify the network ## trace "Puting block into network store", cid = blk.cid - let res = await self.localStore.putBlock(blk) + let res = await self.localStore.putBlock(blk, ttl) if res.isErr: return res @@ -79,15 +82,17 @@ method close*(self: NetworkStore): Future[void] {.async.} = ## Close the underlying local blockstore ## - if not self.localStore.isNil: await self.localStore.close + if not self.localStore.isNil: + await self.localStore.close proc new*( T: type NetworkStore, engine: BlockExcEngine, localStore: BlockStore): T = - let b = NetworkStore( - localStore: localStore, - engine: engine) + let + self = NetworkStore( + localStore: localStore, + engine: engine) - return b + return self diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim new file mode 100644 index 00000000..e7d76fb5 --- /dev/null +++ b/codex/stores/repostore.nim @@ -0,0 +1,357 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/upraises + +push: {.upraises: [].} + +import pkg/chronos +import pkg/chronicles +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results +import pkg/datastore +import pkg/stew/endians2 + +import ./blockstore +import ../blocktype +import ../namespaces +import ../manifest + +export blocktype, libp2p + +logScope: + topics = "codex repostore" + +const + CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet + CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet + CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet + CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet + + QuotaKey* = Key.init(CodexQuotaNamespace).tryGet + QuotaUsedKey* = (QuotaKey / "used").tryGet + QuotaReservedKey* = (QuotaKey / "reserved").tryGet + + BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet + + DefaultBlockTtl* = 24.hours + DefaultQuotaBytes* = 1'u shl 33'u # ~8GB + + ZeroMoment = Moment.init(0, Nanosecond) # used for converting between Duration and Moment + +type + QuotaUsedError* = object of CodexError + QuotaNotEnoughError* = object of CodexError + + RepoStore* = ref object of BlockStore + postFixLen*: int + repoDs*: Datastore + metaDs*: Datastore + quotaMaxBytes*: uint + quotaUsedBytes*: uint + quotaReservedBytes*: uint + blockTtl*: Duration + started*: bool + +func makePrefixKey*(self: RepoStore, cid: Cid): ?!Key = + let + cidKey = ? Key.init(($cid)[^self.postFixLen..^1] & "/" & $cid) + + if ? cid.isManifest: + success CodexManifestKey / cidKey + else: + success CodexBlocksKey / cidKey + +func makeExpiresKey(expires: Duration, cid: Cid): ?!Key = + BlocksTtlKey / $cid / $expires.seconds + +func totalUsed*(self: RepoStore): uint = + (self.quotaUsedBytes + self.quotaReservedBytes) + +method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = + ## Get a block from the blockstore + ## + + without key =? self.makePrefixKey(cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + without data =? await self.repoDs.get(key), err: + if not (err of DatastoreKeyNotFound): + trace "Error getting block from datastore", err = err.msg, key + return failure(err) + + return failure(newException(BlockNotFoundError, err.msg)) + + trace "Got block for cid", cid + return Block.new(cid, data) + +method putBlock*( + self: RepoStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.async.} = + ## Put a block to the blockstore + ## + + without key =? self.makePrefixKey(blk.cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + if await key in self.repoDs: + trace "Block already in store", cid = blk.cid + return success() + + if (self.totalUsed + blk.data.len.uint) > self.quotaMaxBytes: + error "Cannot store block, quota used!", used = self.totalUsed + return failure( + newException(QuotaUsedError, "Cannot store block, quota used!")) + + trace "Storing block with key", key + + without var expires =? ttl: + expires = Moment.fromNow(self.blockTtl) - ZeroMoment + + var + batch: seq[BatchEntry] + + let + used = self.quotaUsedBytes + blk.data.len.uint + + if err =? (await self.repoDs.put(key, blk.data)).errorOption: + trace "Error storing block", err = err.msg + return failure(err) + + trace "Updating quota", used + batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) + + without expiresKey =? makeExpiresKey(expires, blk.cid), err: + trace "Unable make block ttl key", + err = err.msg, cid = blk.cid, expires, expiresKey + + return failure(err) + + trace "Adding expires key", expiresKey, expires + batch.add((expiresKey, @[])) + + if err =? (await self.metaDs.put(batch)).errorOption: + trace "Error updating quota bytes", err = err.msg + + if err =? (await self.repoDs.delete(key)).errorOption: + trace "Error deleting block after failed quota update", err = err.msg + return failure(err) + + return failure(err) + + self.quotaUsedBytes = used + return success() + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore + ## + + trace "Deleting block", cid + + if blk =? (await self.getBlock(cid)): + if key =? self.makePrefixKey(cid) and + err =? (await self.repoDs.delete(key)).errorOption: + trace "Error deleting block!", err = err.msg + return failure(err) + + let + used = self.quotaUsedBytes - blk.data.len.uint + + if err =? (await self.metaDs.put( + QuotaUsedKey, + @(used.uint64.toBytesBE))).errorOption: + trace "Error updating quota key!", err = err.msg + return failure(err) + + self.quotaUsedBytes = used + + trace "Deleted block", cid, totalUsed = self.totalUsed + + return success() + +method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore + ## + + without key =? self.makePrefixKey(cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err) + + return await self.repoDs.has(key) + +method listBlocks*( + self: RepoStore, + blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} = + ## Get the list of blocks in the RepoStore. + ## This is an intensive operation + ## + + var + iter = BlocksIter() + + let key = + case blockType: + of BlockType.Manifest: CodexManifestKey + of BlockType.Block: CodexBlocksKey + of BlockType.Both: CodexRepoKey + + without queryIter =? (await self.repoDs.query(Query.init(key))), err: + trace "Error querying cids in repo", blockType, err = err.msg + return failure(err) + + proc next(): Future[?Cid] {.async.} = + await idleAsync() + iter.finished = queryIter.finished + if not queryIter.finished: + if pair =? (await queryIter.next()) and cid =? pair.key: + trace "Retrieved record from repo", cid + return Cid.init(cid.value).option + + return Cid.none + + iter.next = next + return success iter + +method close*(self: RepoStore): Future[void] {.async.} = + ## Close the blockstore, cleaning up resources managed by it. + ## For some implementations this may be a no-op + ## + + (await self.repoDs.close()).expect("Should close datastore") + +proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore. + ## Return false if error encountered + ## + + without key =? self.makePrefixKey(cid), err: + trace "Error getting key from provider", err = err.msg + return failure(err.msg) + + return await self.repoDs.has(key) + +proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = + ## Reserve bytes + ## + + trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes + + if (self.totalUsed + bytes) > self.quotaMaxBytes: + trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes + return failure( + newException(QuotaNotEnoughError, "Not enough storage quota to reserver")) + + self.quotaReservedBytes += bytes + if err =? (await self.metaDs.put( + QuotaReservedKey, + @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption: + + trace "Error reserving bytes", err = err.msg + + self.quotaReservedBytes += bytes + return failure(err) + + return success() + +proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} = + ## Release bytes + ## + + trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes + + if (self.quotaReservedBytes.int - bytes.int) < 0: + trace "Cannot release this many bytes", + quotaReservedBytes = self.quotaReservedBytes, bytes + + return failure("Cannot release this many bytes") + + self.quotaReservedBytes -= bytes + if err =? (await self.metaDs.put( + QuotaReservedKey, + @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption: + + trace "Error releasing bytes", err = err.msg + + self.quotaReservedBytes -= bytes + + return failure(err) + + trace "Released bytes", bytes + return success() + +proc start*(self: RepoStore): Future[void] {.async.} = + ## Start repo + ## + + if self.started: + trace "Repo already started" + return + + trace "Starting repo" + + ## load current persist and cache bytes from meta ds + without quotaUsedBytes =? await self.metaDs.get(QuotaUsedKey), err: + if not (err of DatastoreKeyNotFound): + error "Error getting cache bytes from datastore", + err = err.msg, key = $QuotaUsedKey + + raise newException(Defect, err.msg) + + if quotaUsedBytes.len > 0: + self.quotaUsedBytes = uint64.fromBytesBE(quotaUsedBytes).uint + + notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes + + without quotaReservedBytes =? await self.metaDs.get(QuotaReservedKey), err: + if not (err of DatastoreKeyNotFound): + error "Error getting persist bytes from datastore", + err = err.msg, key = $QuotaReservedKey + + raise newException(Defect, err.msg) + + if quotaReservedBytes.len > 0: + self.quotaReservedBytes = uint64.fromBytesBE(quotaReservedBytes).uint + + if self.quotaUsedBytes > self.quotaMaxBytes: + raiseAssert "All storage quota used, increase storage quota!" + + notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes + + self.started = true + +proc stop*(self: RepoStore): Future[void] {.async.} = + ## Stop repo + ## + + if self.started: + trace "Repo is not started" + return + + trace "Stopping repo" + (await self.repoDs.close()).expect("Should close repo store!") + (await self.metaDs.close()).expect("Should close meta store!") + +func new*( + T: type RepoStore, + repoDs: Datastore, + metaDs: Datastore, + postFixLen = 2, + quotaMaxBytes = DefaultQuotaBytes, + blockTtl = DefaultBlockTtl): T = + + T( + repoDs: repoDs, + metaDs: metaDs, + postFixLen: postFixLen, + quotaMaxBytes: quotaMaxBytes, + blockTtl: blockTtl) diff --git a/codex/utils/keyutils.nim b/codex/utils/keyutils.nim index 2280da75..fa24ac3d 100644 --- a/codex/utils/keyutils.nim +++ b/codex/utils/keyutils.nim @@ -1,4 +1,3 @@ - ## Nim-Codex ## Copyright (c) 2022 Status Research & Development GmbH ## Licensed under either of @@ -11,14 +10,11 @@ import pkg/upraises push: {.upraises: [].} -import std/os - import pkg/chronicles import pkg/questionable/results import pkg/libp2p import ./fileutils -import ../conf import ../errors import ../rng diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index d866de70..93668f19 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -12,6 +12,7 @@ import pkg/codex/rng import pkg/codex/stores import pkg/codex/blockexchange import pkg/codex/chunker +import pkg/codex/manifest import pkg/codex/blocktype as bt import ../../helpers/mockdiscovery @@ -24,6 +25,8 @@ suite "Block Advertising and Discovery": var blocks: seq[bt.Block] + manifest: Manifest + manifestBlock: bt.Block switch: Switch peerStore: PeerCtxStore blockDiscovery: MockDiscovery @@ -50,6 +53,12 @@ suite "Block Advertising and Discovery": peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() + manifest = Manifest.new( blocks.mapIt( it.cid ) ).tryGet() + manifestBlock = bt.Block.new( + manifest.encode().tryGet(), codec = DagPBCodec).tryGet() + + (await localStore.putBlock(manifestBlock)).tryGet() + discovery = DiscoveryEngine.new( localStore, peerStore, @@ -89,19 +98,50 @@ suite "Block Advertising and Discovery": await engine.stop() - test "Should advertise have blocks": + test "Should advertise both manifests and blocks": + let + advertised = initTable.collect: + for b in (blocks & manifestBlock): {b.cid: newFuture[void]()} + + blockDiscovery + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = + if cid in advertised and not advertised[cid].finished(): + advertised[cid].complete() + + discovery.advertiseType = BlockType.Both + await engine.start() # fire up advertise loop + await allFuturesThrowing( + allFinished(toSeq(advertised.values))) + await engine.stop() + + test "Should advertise local manifests": + let + advertised = newFuture[Cid]() + + blockDiscovery + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = + check manifestBlock.cid == cid + advertised.complete(cid) + + discovery.advertiseType = BlockType.Manifest + await engine.start() # fire up advertise loop + check (await advertised.wait(10.millis)) == manifestBlock.cid + await engine.stop() + + test "Should advertise local blocks": let advertised = initTable.collect: for b in blocks: {b.cid: newFuture[void]()} - blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - if cid in advertised and not advertised[cid].finished(): - advertised[cid].complete() + blockDiscovery + .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = + if cid in advertised and not advertised[cid].finished(): + advertised[cid].complete() + discovery.advertiseType = BlockType.Block await engine.start() # fire up advertise loop await allFuturesThrowing( allFinished(toSeq(advertised.values))) - await engine.stop() test "Should not launch discovery if remote peer has block": diff --git a/tests/codex/stores/commonstoretests.nim b/tests/codex/stores/commonstoretests.nim new file mode 100644 index 00000000..2fa87943 --- /dev/null +++ b/tests/codex/stores/commonstoretests.nim @@ -0,0 +1,149 @@ +import std/sequtils +import std/strutils +import std/options + +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils +import pkg/questionable/results +import pkg/codex/stores/cachestore +import pkg/codex/chunker +import pkg/codex/manifest + +import ../helpers + +type + StoreProvider* = proc(): BlockStore {.gcsafe.} + Before* = proc(): Future[void] {.gcsafe.} + After* = proc(): Future[void] {.gcsafe.} + +proc commonBlockStoreTests*( + name: string, + provider: StoreProvider, + before: Before = nil, + after: After = nil) = + + suite name & " Store Common": + var + newBlock, newBlock1, newBlock2, newBlock3: Block + store: BlockStore + + setup: + newBlock = Block.new("New Kids on the Block".toBytes()).tryGet() + newBlock1 = Block.new("1".repeat(100).toBytes()).tryGet() + newBlock2 = Block.new("2".repeat(100).toBytes()).tryGet() + newBlock3 = Block.new("3".repeat(100).toBytes()).tryGet() + + if not isNil(before): + await before() + + store = provider() + + teardown: + await store.close() + + if not isNil(after): + await after() + + test "putBlock": + (await store.putBlock(newBlock1)).tryGet() + check (await store.hasBlock(newBlock1.cid)).tryGet() + + test "getBlock": + (await store.putBlock(newBlock)).tryGet() + let blk = await store.getBlock(newBlock.cid) + check blk.tryGet() == newBlock + + test "fail getBlock": + expect BlockNotFoundError: + discard (await store.getBlock(newBlock.cid)).tryGet() + + test "hasBlock": + (await store.putBlock(newBlock)).tryGet() + + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store + + test "fail hasBlock": + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) + + test "delBlock": + (await store.putBlock(newBlock1)).tryGet() + check (await store.hasBlock(newBlock1.cid)).tryGet() + + (await store.delBlock(newBlock1.cid)).tryGet() + + check not (await store.hasBlock(newBlock1.cid)).tryGet() + + test "listBlocks Blocks": + let + blocks = @[newBlock1, newBlock2, newBlock3] + + putHandles = await allFinished( + blocks.mapIt( store.putBlock( it ) )) + + for handle in putHandles: + check not handle.failed + check handle.read.isOK + + let + cids = (await store.listBlocks(blockType = BlockType.Block)).tryGet() + + var count = 0 + for c in cids: + if cid =? (await c): + check (await store.hasBlock(cid)).tryGet() + count.inc + + check count == 3 + + test "listBlocks Manifest": + let + blocks = @[newBlock1, newBlock2, newBlock3] + manifest = Manifest.new(blocks = blocks.mapIt( it.cid )).tryGet() + manifestBlock = Block.new(manifest.encode().tryGet(), codec = DagPBCodec).tryGet() + putHandles = await allFinished( + (manifestBlock & blocks).mapIt( store.putBlock( it ) )) + + for handle in putHandles: + check not handle.failed + check handle.read.isOK + + let + cids = (await store.listBlocks(blockType = BlockType.Manifest)).tryGet() + + var count = 0 + for c in cids: + if cid =? (await c): + check manifestBlock.cid == cid + check (await store.hasBlock(cid)).tryGet() + count.inc + + check count == 1 + + test "listBlocks Both": + let + blocks = @[newBlock1, newBlock2, newBlock3] + manifest = Manifest.new(blocks = blocks.mapIt( it.cid )).tryGet() + manifestBlock = Block.new(manifest.encode().tryGet(), codec = DagPBCodec).tryGet() + putHandles = await allFinished( + (manifestBlock & blocks).mapIt( store.putBlock( it ) )) + + for handle in putHandles: + check not handle.failed + check handle.read.isOK + + let + cids = (await store.listBlocks(blockType = BlockType.Both)).tryGet() + + var count = 0 + for c in cids: + if cid =? (await c): + check (await store.hasBlock(cid)).tryGet() + count.inc + + check count == 4 diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index 1e5dc043..11bbff84 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -9,6 +9,8 @@ import pkg/questionable/results import pkg/codex/stores/cachestore import pkg/codex/chunker +import ./commonstoretests + import ../helpers suite "Cache Store": @@ -30,6 +32,7 @@ suite "Cache Store": store = CacheStore.new(cacheSize = 100, chunkSize = 1) check store.currentSize == 0 + store = CacheStore.new(@[newBlock1, newBlock2, newBlock3]) check store.currentSize == 300 @@ -48,7 +51,6 @@ suite "Cache Store": chunkSize = 100) test "putBlock": - (await store.putBlock(newBlock1)).tryGet() check (await store.hasBlock(newBlock1.cid)).tryGet() @@ -68,60 +70,6 @@ suite "Cache Store": (await store.hasBlock(newBlock2.cid)).tryGet() store.currentSize == newBlock2.data.len + newBlock3.data.len # 200 - test "getBlock": - store = CacheStore.new(@[newBlock]) - - let blk = await store.getBlock(newBlock.cid) - check blk.tryGet() == newBlock - - test "fail getBlock": - let blk = await store.getBlock(newBlock.cid) - check: - blk.isErr - blk.error of BlockNotFoundError - - test "hasBlock": - let store = CacheStore.new(@[newBlock]) - check: - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store - - test "fail hasBlock": - check: - not (await store.hasBlock(newBlock.cid)).tryGet() - not (await newBlock.cid in store) - - test "delBlock": - # empty cache - (await store.delBlock(newBlock1.cid)).tryGet() - check not (await store.hasBlock(newBlock1.cid)).tryGet() - - (await store.putBlock(newBlock1)).tryGet() - check (await store.hasBlock(newBlock1.cid)).tryGet() - - # successfully deleted - (await store.delBlock(newBlock1.cid)).tryGet() - check not (await store.hasBlock(newBlock1.cid)).tryGet() - - # deletes item should decrement size - store = CacheStore.new(@[newBlock1, newBlock2, newBlock3]) - check: - store.currentSize == 300 - - (await store.delBlock(newBlock2.cid)).tryGet() - - check: - store.currentSize == 200 - not (await store.hasBlock(newBlock2.cid)).tryGet() - - test "listBlocks": - (await store.putBlock(newBlock1)).tryGet() - - var listed = false - (await store.listBlocks( - proc(cid: Cid) {.gcsafe, async.} = - check (await store.hasBlock(cid)).tryGet() - listed = true - )).tryGet() - - check listed +commonBlockStoreTests( + "Cache", proc: BlockStore = + BlockStore(CacheStore.new(cacheSize = 500, chunkSize = 1))) diff --git a/tests/codex/stores/testfsstore.nim b/tests/codex/stores/testfsstore.nim deleted file mode 100644 index e178f9a4..00000000 --- a/tests/codex/stores/testfsstore.nim +++ /dev/null @@ -1,88 +0,0 @@ -import std/os -import std/options - -import pkg/questionable -import pkg/questionable/results - -import pkg/chronos -import pkg/asynctest -import pkg/libp2p -import pkg/stew/byteutils - -import pkg/codex/stores/cachestore -import pkg/codex/chunker -import pkg/codex/stores -import pkg/codex/blocktype as bt - -import ../helpers - -proc runSuite(cache: bool) = - suite "FS Store " & (if cache: "(cache enabled)" else: "(cache disabled)"): - var - store: FSStore - repoDir: string - newBlock = bt.Block.new("New Block".toBytes()).tryGet() - - setup: - repoDir = getAppDir() / "repo" - createDir(repoDir) - - if cache: - store = FSStore.new(repoDir) - else: - store = FSStore.new(repoDir, postfixLen = 2, cache = nil) - - teardown: - removeDir(repoDir) - - test "putBlock": - (await store.putBlock(newBlock)).tryGet() - check: - fileExists(store.blockPath(newBlock.cid)) - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store - - test "getBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) - let blk = await store.getBlock(newBlock.cid) - check blk.tryGet() == newBlock - - test "fail getBlock": - let blk = await store.getBlock(newBlock.cid) - check: - blk.isErr - blk.error of BlockNotFoundError - - test "hasBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) - - check: - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store - - test "fail hasBlock": - check: - not (await store.hasBlock(newBlock.cid)).tryGet() - not (await newBlock.cid in store) - - test "listBlocks": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) - - (await store.listBlocks( - proc(cid: Cid) {.gcsafe, async.} = - check cid == newBlock.cid - )).tryGet() - - test "delBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) - - (await store.delBlock(newBlock.cid)).tryGet() - - check not fileExists(store.blockPath(newBlock.cid)) - -runSuite(cache = true) -runSuite(cache = false) diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim new file mode 100644 index 00000000..d576cd9b --- /dev/null +++ b/tests/codex/stores/testrepostore.nim @@ -0,0 +1,170 @@ +import std/os +import std/options +import std/strutils + +import pkg/questionable +import pkg/questionable/results + +import pkg/chronos +import pkg/asynctest +import pkg/libp2p +import pkg/stew/byteutils +import pkg/stew/endians2 +import pkg/datastore + +import pkg/codex/stores/cachestore +import pkg/codex/chunker +import pkg/codex/stores +import pkg/codex/blocktype as bt + +import ../helpers +import ./commonstoretests + +suite "Test RepoStore Quota": + + var + repoDs: Datastore + metaDs: Datastore + + setup: + repoDs = SQLiteDatastore.new(Memory).tryGet() + metaDs = SQLiteDatastore.new(Memory).tryGet() + + teardown: + (await repoDs.close()).tryGet + (await metaDs.close()).tryGet + + test "Should update current used bytes on block put": + let + blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100) + + check repo.quotaUsedBytes == 0 + (await repo.putBlock(blk)).tryGet + + check: + repo.quotaUsedBytes == 100 + uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u + + test "Should update current used bytes on block delete": + let + blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100) + + check repo.quotaUsedBytes == 0 + (await repo.putBlock(blk)).tryGet + check repo.quotaUsedBytes == 100 + + (await repo.delBlock(blk.cid)).tryGet + + check: + repo.quotaUsedBytes == 0 + uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u + + test "Should fail storing passed the quota": + let + blk = bt.Block.new('a'.repeat(200).toBytes).tryGet() + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100) + + check repo.totalUsed == 0 + expect QuotaUsedError: + (await repo.putBlock(blk)).tryGet + + test "Should reserve bytes": + let + blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + + check repo.totalUsed == 0 + (await repo.putBlock(blk)).tryGet + check repo.totalUsed == 100 + + (await repo.reserve(100)).tryGet + + check: + repo.totalUsed == 200 + repo.quotaUsedBytes == 100 + repo.quotaReservedBytes == 100 + uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + + test "Should not reserve bytes over max quota": + let + blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + + check repo.totalUsed == 0 + (await repo.putBlock(blk)).tryGet + check repo.totalUsed == 100 + + expect QuotaNotEnoughError: + (await repo.reserve(101)).tryGet + + check: + repo.totalUsed == 100 + repo.quotaUsedBytes == 100 + repo.quotaReservedBytes == 0 + + expect DatastoreKeyNotFound: + discard (await metaDs.get(QuotaReservedKey)).tryGet + + test "Should release bytes": + let + blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + + check repo.totalUsed == 0 + (await repo.reserve(100)).tryGet + check repo.totalUsed == 100 + + (await repo.release(100)).tryGet + + check: + repo.totalUsed == 0 + repo.quotaUsedBytes == 0 + repo.quotaReservedBytes == 0 + uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u + + test "Should not release bytes less than quota": + let + repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + + check repo.totalUsed == 0 + (await repo.reserve(100)).tryGet + check repo.totalUsed == 100 + + expect CatchableError: + (await repo.release(101)).tryGet + + check: + repo.totalUsed == 100 + repo.quotaUsedBytes == 0 + repo.quotaReservedBytes == 100 + uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + +commonBlockStoreTests( + "RepoStore Sql backend", proc: BlockStore = + BlockStore( + RepoStore.new( + SQLiteDatastore.new(Memory).tryGet(), + SQLiteDatastore.new(Memory).tryGet()))) + +const + path = currentSourcePath().parentDir / "test" + +proc before() {.async.} = + createDir(path) + +proc after() {.async.} = + removeDir(path) + +let + depth = path.split(DirSep).len + +commonBlockStoreTests( + "RepoStore FS backend", proc: BlockStore = + BlockStore( + RepoStore.new( + FSDatastore.new(path, depth).tryGet(), + SQLiteDatastore.new(Memory).tryGet())), + before = before, + after = after) diff --git a/tests/codex/teststores.nim b/tests/codex/teststores.nim index 3e83116c..355897c6 100644 --- a/tests/codex/teststores.nim +++ b/tests/codex/teststores.nim @@ -1,4 +1,4 @@ import ./stores/testcachestore -import ./stores/testfsstore +import ./stores/testrepostore {.warning[UnusedImport]: off.} diff --git a/vendor/nim-datastore b/vendor/nim-datastore index 6c06a3b0..44c198b9 160000 --- a/vendor/nim-datastore +++ b/vendor/nim-datastore @@ -1 +1 @@ -Subproject commit 6c06a3b095d1935aaf5eec66295862c9c3b4bac5 +Subproject commit 44c198b96a2d687f94c9971f4a3ece02b330347b diff --git a/vendor/nim-libp2p-dht b/vendor/nim-libp2p-dht index e4e7a3e1..4375b922 160000 --- a/vendor/nim-libp2p-dht +++ b/vendor/nim-libp2p-dht @@ -1 +1 @@ -Subproject commit e4e7a3e11fe635de3f15e37164b3ace96f588993 +Subproject commit 4375b9229815c332a3b1a9d0091d5cf5a74adb2e