From 1519100ea4dc44fa1d39ebab5e68fb67a367bcae Mon Sep 17 00:00:00 2001
From: munna0908
Date: Fri, 28 Mar 2025 22:20:26 +0530
Subject: [PATCH] improve store batch

putCidAndProofBatch now takes the tree index of the first leaf
(startIndex) plus a seq of (Cid, CodexProof) entries instead of two
parallel seqs of cids and proofs.

Callers flush leaves in chunks instead of collecting the whole tree
first: node.store flushes every DefaultLeafBatch (100) leaves and
buildSlot flushes every 50 cells. RepoStore writes each chunk to the
metadata store with a single put, and the delBlocks batch size is
raised from 100 to 1000.
---
 codex/node.nim                   | 19 ++++++++++++----
 codex/slots/builder/builder.nim  | 16 +++++++------
 codex/stores/blockstore.nim      |  2 +-
 codex/stores/networkstore.nim    |  4 ++--
 codex/stores/repostore/store.nim | 39 ++++++++++++++++----------------
 5 files changed, 46 insertions(+), 34 deletions(-)

diff --git a/codex/node.nim b/codex/node.nim
index dba0dd80..99eefb43 100644
--- a/codex/node.nim
+++ b/codex/node.nim
@@ -53,6 +53,7 @@ logScope:
   topics = "codex node"
 
 const DefaultFetchBatch = 10
+const DefaultLeafBatch = 100
 
 type
   Contracts* =
@@ -422,15 +423,23 @@ proc store*(
   without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
     return failure(err)
 
+  var
+    batch = newSeq[(Cid, CodexProof)]()
+    batchStartIndex = 0
+
   for index, cid in cids:
     without proof =? tree.getProof(index), err:
       return failure(err)
-    proofs.add(proof)
+    batch.add((cid, proof))
 
-  if err =?
-      (await self.networkStore.putCidAndProofBatch(treeCid, cids, proofs)).errorOption:
-    # TODO add log here
-    return failure(err)
+    if batch.len >= DefaultLeafBatch or index == cids.len - 1:
+      if err =? (
+        await self.networkStore.putCidAndProofBatch(treeCid, batchStartIndex, batch)
+      ).errorOption:
+        # TODO add log here
+        return failure(err)
+      batch.setLen(0)
+      batchStartIndex = index + 1
 
   let manifest = Manifest.new(
     treeCid = treeCid,
diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim
index 37c2ccd3..e5832fa7 100644
--- a/codex/slots/builder/builder.nim
+++ b/codex/slots/builder/builder.nim
@@ -225,8 +225,8 @@ proc buildSlot*[T, H](
   trace "Building slot tree"
 
   var
-    cids = newSeq[Cid]()
-    proofs = newSeq[CodexProof]()
+    batch = newSeq[(Cid, CodexProof)]()
+    batchStartIndex = 0
 
   without tree =? (await self.buildSlotTree(slotIndex)) and
     treeCid =? tree.root .? toSlotCid, err:
@@ -243,12 +243,14 @@ proc buildSlot*[T, H](
       error "Failed to get proof for slot tree", err = err.msg
       return failure(err)
 
-    cids.add(cellCid)
-    proofs.add(encodableProof)
+    batch.add((cellCid, encodableProof))
 
-  if err =? (await self.store.putCidAndProofBatch(treeCid, cids, proofs)).errorOption:
-    error "Failed to store slot tree", err = err.msg
-    return failure(err)
+    if batch.len >= 50 or i == tree.leaves.len - 1:
+      if err =? (await self.store.putCidAndProofBatch(treeCid, batchStartIndex, batch)).errorOption:
+        error "Failed to store slot tree", err = err.msg
+        return failure(err)
+      batch.setLen(0)
+      batchStartIndex = i + 1
 
   tree.root()
 
diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim
index 14af3089..5cbd7056 100644
--- a/codex/stores/blockstore.nim
+++ b/codex/stores/blockstore.nim
@@ -88,7 +88,7 @@ method putCidAndProof*(
   raiseAssert("putCidAndProof not implemented!")
 
 method putCidAndProofBatch*(
-    self: BlockStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof]
+    self: BlockStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
 ): Future[?!void] {.base, gcsafe.} =
   ## Put a batch of block proofs to the blockstore
   ##
diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim
index a078f0b1..e36fb585 100644
--- a/codex/stores/networkstore.nim
+++ b/codex/stores/networkstore.nim
@@ -70,9 +70,9 @@ method putBlock*(
   return success()
 
 method putCidAndProofBatch*(
-    self: NetworkStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof]
+    self: NetworkStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
 ): Future[?!void] =
-  self.localStore.putCidAndProofBatch(treeCid, blkCids, proofs)
+  self.localStore.putCidAndProofBatch(treeCid, startIndex, entries)
 
 method putCidAndProof*(
     self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim
index bc83ce73..5aa3a440 100644
--- a/codex/stores/repostore/store.nim
+++ b/codex/stores/repostore/store.nim
@@ -117,18 +117,18 @@ method ensureExpiry*(
   await self.ensureExpiry(leafMd.blkCid, expiry)
 
 method putCidAndProofBatch*(
-    self: RepoStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof]
+    self: RepoStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
 ): Future[?!void] {.async.} =
   var
     batch = newSeq[BatchEntry]()
-    results = newSeq[StoreResult](blkCids.len)
+    results = newSeq[StoreResult](entries.len)
     lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
-    batchSize = 50
-
+    treeIndex = startIndex
+  trace "Adding cid and proofs", count = entries.len
   try:
     await lock.acquire()
-    for i, cid in blkCids:
-      without key =? createBlockCidAndProofMetadataKey(treeCid, i), err:
+    for i, entry in entries:
+      without key =? createBlockCidAndProofMetadataKey(treeCid, treeIndex), err:
         return failure(err)
 
       # Check existence before adding to batch
@@ -138,29 +138,29 @@ method putCidAndProofBatch*(
         results[i] = StoreResult(kind: AlreadyInStore)
       else:
         results[i] = StoreResult(kind: Stored)
-        var metadata = LeafMetadata(blkCid: cid, proof: proofs[i])
+        var metadata = LeafMetadata(blkCid: entry[0], proof: entry[1])
         batch.add((key: key, data: metadata.encode))
+      treeIndex += 1 # advance for every entry so entry i maps to key startIndex + i
 
-      if batch.len >= batchSize or i == blkCids.len - 1:
-        try:
-          if err =? (await self.metaDs.ds.put(batch)).errorOption:
-            return failure(err)
-        except CatchableError as e:
-          return failure(e.msg)
-        batch = newSeq[BatchEntry]()
+    try:
+      if err =? (await self.metaDs.ds.put(batch)).errorOption:
+        return failure(err)
+    except CatchableError as e:
+      return failure(e.msg)
   finally:
     lock.release()
     if not lock.locked:
       self.locks.del(treeCid)
 
-  for i, res in results:
-    if res.kind == Stored and blkCids[i].mcodec == BlockCodec:
-      if err =?
-          (await self.updateBlockMetadata(blkCids[i], plusRefCount = 1)).errorOption:
+  # Update reference counts for blocks that were stored
+  for i, entry in entries:
+    if results[i].kind == Stored and entry[0].mcodec == BlockCodec:
+      if err =? (await self.updateBlockMetadata(entry[0], plusRefCount = 1)).errorOption:
         return failure(err)
       trace "Leaf metadata stored, block refCount incremented"
     else:
       trace "Leaf metadata already exists"
+
   return success()
 
 method putCidAndProof*(
@@ -169,6 +169,7 @@ method putCidAndProof*(
   ## Put a block to the blockstore
   ##
   # TODO: Add locking for treeCid
+
   logScope:
     treeCid = treeCid
     index = index
@@ -302,7 +303,7 @@ method delBlocks*(
     lastIdle = getTime()
     batch = newSeq[Key]()
     blckCids = newSeq[Cid]()
-    batchSize = 100
+    batchSize = 1000
 
   for i in 0 ..< blocksCount:
     if (getTime() - lastIdle) >= runtimeQuota: