diff --git a/codex/node.nim b/codex/node.nim index 9932deb6..7732c3f3 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -399,6 +399,8 @@ proc store*( dataCodec = BlockCodec chunker = LPStreamChunker.new(stream, chunkSize = blockSize) + var proofs = newSeq[CodexProof]() + var cids: seq[Cid] try: @@ -433,10 +435,12 @@ proc store*( for index, cid in cids: without proof =? tree.getProof(index), err: return failure(err) - if err =? - (await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption: - # TODO add log here - return failure(err) + proofs.add(proof) + + if err =? + (await self.networkStore.putCidAndProofBatch(treeCid, cids, proofs)).errorOption: + # TODO add log here + return failure(err) let manifest = Manifest.new( treeCid = treeCid, diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 243d4ed6..cbf36bc3 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -366,7 +366,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse: let json = %RestRepoStore( - totalBlocks: repoStore.totalBlocks, + totalBlocks: repoStore.storageStats.totalBlocks, quotaMaxBytes: repoStore.quotaMaxBytes, quotaUsedBytes: repoStore.quotaUsedBytes, quotaReservedBytes: repoStore.quotaReservedBytes, diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 78fab0da..686744e0 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -87,6 +87,14 @@ method putCidAndProof*( raiseAssert("putCidAndProof not implemented!") +method putCidAndProofBatch*( + self: BlockStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof] +): Future[?!void] {.base, gcsafe.} = + ## Put a batch of block proofs to the blockstore + ## + + raiseAssert("putCidAndProofBatch not implemented!") + method getCidAndProof*( self: BlockStore, treeCid: Cid, index: Natural ): Future[?!(Cid, CodexProof)] {.base, gcsafe.} = diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index f94bca33..4e326a22 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -69,6 +69,11 @@ method putBlock*( await self.engine.resolveBlocks(@[blk]) return success() +method putCidAndProofBatch*( + self: NetworkStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof] +): Future[?!void] = + self.localStore.putCidAndProofBatch(treeCid, blkCids, proofs) + method putCidAndProof*( self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof ): Future[?!void] = diff --git a/codex/stores/repostore/coders.nim b/codex/stores/repostore/coders.nim index 47df7219..e656886d 100644 --- a/codex/stores/repostore/coders.nim +++ b/codex/stores/repostore/coders.nim @@ -19,10 +19,10 @@ import ../../errors import ../../merkletree import ../../utils/json -proc encode*(t: QuotaUsage): seq[byte] = +proc encode*(t: StorageStats): seq[byte] = t.toJson().toBytes() -proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T = +proc decode*(T: type StorageStats, bytes: seq[byte]): ?!T = T.fromJson(bytes) proc encode*(t: BlockMetadata): seq[byte] = diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index 125741e1..4926fb19 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -82,57 +82,97 @@ proc getLeafMetadata*( success(leafMd) -proc updateTotalBlocksCount*( - self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0 -): Future[?!void] {.async.} = - await self.metaDs.modify( - CodexTotalBlocksKey, - proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} = - let count: Natural = - if currCount =? maybeCurrCount: - currCount + plusCount - minusCount - else: - plusCount - minusCount - - self.totalBlocks = count - codex_repostore_blocks.set(count.int64) - count.some, - ) - -proc updateQuotaUsage*( +proc updateQuotaAndBlockCount*( self: RepoStore, + plusCount: Natural = 0, + minusCount: Natural = 0, plusUsed: NBytes = 0.NBytes, minusUsed: NBytes = 0.NBytes, plusReserved: NBytes = 0.NBytes, minusReserved: NBytes = 0.NBytes, ): Future[?!void] {.async.} = await self.metaDs.modify( - QuotaUsedKey, - proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} = - var usage: QuotaUsage - - if currUsage =? maybeCurrUsage: - usage = QuotaUsage( - used: currUsage.used + plusUsed - minusUsed, - reserved: currUsage.reserved + plusReserved - minusReserved, + CodexTotalBlocksKey, + proc(maybeCurrStats: ?StorageStats): Future[?StorageStats] {.async.} = + var stats: StorageStats + if currStats =? maybeCurrStats: + stats = StorageStats( + quotaUsed: currStats.quotaUsed + plusUsed - minusUsed, + quotaReserved: currStats.quotaReserved + plusReserved - minusReserved, + totalBlocks: currStats.totalBlocks + plusCount - minusCount, ) else: - usage = - QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved) + stats = StorageStats( + quotaUsed: plusUsed - minusUsed, + quotaReserved: plusReserved - minusReserved, + totalBlocks: plusCount - minusCount, + ) - if usage.used + usage.reserved > self.quotaMaxBytes: + if stats.quotaUsed + stats.quotaReserved > self.quotaMaxBytes: raise newException( QuotaNotEnoughError, - "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " & - $usage.reserved & ", limit: " & $self.quotaMaxBytes, + "Quota usage would exceed the limit. Used: " & $stats.quotaUsed & + ", reserved: " & $stats.quotaReserved & ", limit: " & $self.quotaMaxBytes, ) else: - self.quotaUsage = usage - codex_repostore_bytes_used.set(usage.used.int64) - codex_repostore_bytes_reserved.set(usage.reserved.int64) - return usage.some, + self.storageStats = stats + codex_repostore_bytes_used.set(stats.quotaUsed.int64) + codex_repostore_bytes_reserved.set(stats.quotaReserved.int64) + codex_repostore_blocks.set(stats.totalBlocks.int64) + return stats.some, ) +# proc updateTotalBlocksCount*( +# self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0 +# ): Future[?!void] {.async.} = +# await self.metaDs.modify( +# CodexTotalBlocksKey, +# proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} = +# let count: Natural = +# if currCount =? maybeCurrCount: +# currCount + plusCount - minusCount +# else: +# plusCount - minusCount + +# self.totalBlocks = count +# codex_repostore_blocks.set(count.int64) +# count.some, +# ) + +# proc updateQuotaUsage*( +# self: RepoStore, +# plusUsed: NBytes = 0.NBytes, +# minusUsed: NBytes = 0.NBytes, +# plusReserved: NBytes = 0.NBytes, +# minusReserved: NBytes = 0.NBytes, +# ): Future[?!void] {.async.} = +# await self.metaDs.modify( +# QuotaUsedKey, +# proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} = +# var usage: QuotaUsage + +# if currUsage =? maybeCurrUsage: +# usage = QuotaUsage( +# used: currUsage.used + plusUsed - minusUsed, +# reserved: currUsage.reserved + plusReserved - minusReserved, +# ) +# else: +# usage = +# QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved) + +# if usage.used + usage.reserved > self.quotaMaxBytes: +# raise newException( +# QuotaNotEnoughError, +# "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " & +# $usage.reserved & ", limit: " & $self.quotaMaxBytes, +# ) +# else: +# self.quotaUsage = usage +# codex_repostore_bytes_used.set(usage.used.int64) +# codex_repostore_bytes_reserved.set(usage.reserved.int64) +# return usage.some, +# ) + proc updateBlockMetadata*( self: RepoStore, cid: Cid, diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index d7305107..9225b0d5 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -114,12 +114,64 @@ method ensureExpiry*( await self.ensureExpiry(leafMd.blkCid, expiry) +method putCidAndProofBatch*( + self: RepoStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof] +): Future[?!void] {.async.} = + var + batch = newSeq[BatchEntry]() + results = newSeq[StoreResult](blkCids.len) + lock = self.locks.mgetOrPut(treeCid, newAsyncLock()) + batchSize = 50 + try: + await lock.acquire() + for i, cid in blkCids: + without key =? createBlockCidAndProofMetadataKey(treeCid, i), err: + return failure(err) + + # Check existence before adding to batch + if exists =? await self.metaDs.has(key): + results[i] = StoreResult(kind: AlreadyInStore) + else: + results[i] = StoreResult(kind: Stored) + + let metadata = LeafMetadata(blkCid: cid, proof: proofs[i]) + batch.add((key: key, data: metadata.encode)) + + if batch.len >= batchSize: + try: + if err =? (await self.metaDs.ds.put(batch)).errorOption: + return failure(err) + except CatchableError as e: + return failure(e.msg) + batch = newSeq[BatchEntry]() + + if batch.len > 0: + try: + if err =? (await self.metaDs.ds.put(batch)).errorOption: + return failure(err) + except CatchableError as e: + return failure(e.msg) + finally: + lock.release() + if not lock.locked: + self.locks.del(treeCid) + + for i, res in results: + if res.kind == Stored: + if err =? + (await self.updateBlockMetadata(blkCids[i], plusRefCount = 1)).errorOption: + return failure(err) + trace "Leaf metadata stored, block refCount incremented" + else: + trace "Leaf metadata already exists" + return success() + method putCidAndProof*( self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof ): Future[?!void] {.async.} = ## Put a block to the blockstore ## - + # TODO: Add locking for treeCid logScope: treeCid = treeCid index = index @@ -170,13 +222,19 @@ method putBlock*( if res.kind == Stored: trace "Block Stored" - if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption: - # rollback changes + # if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption: + # # rollback changes + # without delRes =? await self.tryDeleteBlock(blk.cid), err: + # return failure(err) + # return failure(err) + + # if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: + # return failure(err) + + if err =? (await self.updateQuotaAndBlockCount(plusCount = 1, plusUsed = res.used)).errorOption: without delRes =? await self.tryDeleteBlock(blk.cid), err: return failure(err) - return failure(err) - if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption: return failure(err) if onBlock =? self.onBlockStored: @@ -200,10 +258,15 @@ proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.a if res.kind == Deleted: trace "Block deleted" - if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption: - return failure(err) + # if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption: + # return failure(err) - if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: + # if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: + # return failure(err) + + if err =? ( + await self.updateQuotaAndBlockCount(minusCount = 1, minusUsed = res.released) + ).errorOption: return failure(err) success(res.kind) @@ -386,7 +449,7 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = trace "Reserving bytes", bytes - await self.updateQuotaUsage(plusReserved = bytes) + await self.updateQuotaAndBlockCount(plusReserved = bytes) proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = ## Release bytes @@ -394,7 +457,7 @@ proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = trace "Releasing bytes", bytes - await self.updateQuotaUsage(minusReserved = bytes) + await self.updateQuotaAndBlockCount(minusReserved = bytes) proc start*(self: RepoStore): Future[void] {.async.} = ## Start repo @@ -405,10 +468,7 @@ proc start*(self: RepoStore): Future[void] {.async.} = return trace "Starting rep" - if err =? (await self.updateTotalBlocksCount()).errorOption: - raise newException(CodexError, err.msg) - - if err =? (await self.updateQuotaUsage()).errorOption: + if err =? (await self.updateQuotaAndBlockCount()).errorOption: raise newException(CodexError, err.msg) self.started = true diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim index 42f528e9..a98c8f93 100644 --- a/codex/stores/repostore/types.nim +++ b/codex/stores/repostore/types.nim @@ -33,14 +33,15 @@ type metaDs*: TypedDatastore clock*: Clock quotaMaxBytes*: NBytes - quotaUsage*: QuotaUsage - totalBlocks*: Natural + storageStats*: StorageStats blockTtl*: Duration started*: bool + locks*: TableRef[Cid, AsyncLock] - QuotaUsage* {.serialize.} = object - used*: NBytes - reserved*: NBytes + StorageStats* {.serialize.} = object + quotaUsed*: NBytes + quotaReserved*: NBytes + totalBlocks*: Natural BlockMetadata* {.serialize.} = object expiry*: SecondsSince1970 @@ -73,10 +74,10 @@ type used*: NBytes func quotaUsedBytes*(self: RepoStore): NBytes = - self.quotaUsage.used + self.storageStats.quotaUsed func quotaReservedBytes*(self: RepoStore): NBytes = - self.quotaUsage.reserved + self.storageStats.quotaReserved func totalUsed*(self: RepoStore): NBytes = (self.quotaUsedBytes + self.quotaReservedBytes) @@ -106,4 +107,5 @@ func new*( quotaMaxBytes: quotaMaxBytes, blockTtl: blockTtl, onBlockStored: CidCallback.none, + locks: newTable[Cid, AsyncLock](), ) diff --git a/tests/codex/stores/repostore/testcoders.nim b/tests/codex/stores/repostore/testcoders.nim index 9d341af0..1ab2eba8 100644 --- a/tests/codex/stores/repostore/testcoders.nim +++ b/tests/codex/stores/repostore/testcoders.nim @@ -19,8 +19,10 @@ suite "Test coders": let ordinals = enumRangeInt64(E) E(ordinals[rand(ordinals.len - 1)]) - proc rand(T: type QuotaUsage): T = - QuotaUsage(used: rand(NBytes), reserved: rand(NBytes)) + proc rand(T: type StorageStats): T = + StorageStats( + quotaUsed: rand(NBytes), quotaReserved: rand(NBytes), totalBlocks: rand(Natural) + ) proc rand(T: type BlockMetadata): T = BlockMetadata( @@ -37,11 +39,12 @@ suite "Test coders": for val in newSeqWith[Natural](100, rand(Natural)) & @[Natural.low, Natural.high]: check: success(val) == Natural.decode(encode(val)) + + test "StorageStats encode/decode": + for val in newSeqWith[StorageStats](100, rand(StorageStats)): + check: + success(val) == StorageStats.decode(encode(val)) - test "QuotaUsage encode/decode": - for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)): - check: - success(val) == QuotaUsage.decode(encode(val)) test "BlockMetadata encode/decode": for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)):