improve store batch

This commit is contained in:
munna0908 2025-03-28 22:20:26 +05:30
parent 44074f7915
commit 1519100ea4
No known key found for this signature in database
GPG Key ID: 2FFCD637E937D3E6
5 changed files with 46 additions and 34 deletions

View File

@ -53,6 +53,7 @@ logScope:
topics = "codex node" topics = "codex node"
const DefaultFetchBatch = 10 const DefaultFetchBatch = 10
const DefaultLeafBatch = 100
type type
Contracts* = Contracts* =
@ -422,15 +423,23 @@ proc store*(
without treeCid =? tree.rootCid(CIDv1, dataCodec), err: without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
return failure(err) return failure(err)
var
batch = newSeq[(Cid, CodexProof)]()
batchStartIndex = 0
for index, cid in cids: for index, cid in cids:
without proof =? tree.getProof(index), err: without proof =? tree.getProof(index), err:
return failure(err) return failure(err)
proofs.add(proof) batch.add((cid, proof))
if err =? if batch.len >= DefaultLeafBatch or index == cids.len - 1:
(await self.networkStore.putCidAndProofBatch(treeCid, cids, proofs)).errorOption: if err =? (
# TODO add log here await self.networkStore.putCidAndProofBatch(treeCid, batchStartIndex, batch)
return failure(err) ).errorOption:
# TODO add log here
return failure(err)
batch.setLen(0)
batchStartIndex = index + 1
let manifest = Manifest.new( let manifest = Manifest.new(
treeCid = treeCid, treeCid = treeCid,

View File

@ -225,8 +225,8 @@ proc buildSlot*[T, H](
trace "Building slot tree" trace "Building slot tree"
var var
cids = newSeq[Cid]() batch = newSeq[(Cid, CodexProof)]()
proofs = newSeq[CodexProof]() batchStartIndex = 0
without tree =? (await self.buildSlotTree(slotIndex)) and without tree =? (await self.buildSlotTree(slotIndex)) and
treeCid =? tree.root .? toSlotCid, err: treeCid =? tree.root .? toSlotCid, err:
@ -243,12 +243,14 @@ proc buildSlot*[T, H](
error "Failed to get proof for slot tree", err = err.msg error "Failed to get proof for slot tree", err = err.msg
return failure(err) return failure(err)
cids.add(cellCid) batch.add((cellCid, encodableProof))
proofs.add(encodableProof)
if err =? (await self.store.putCidAndProofBatch(treeCid, cids, proofs)).errorOption: if batch.len >= 50 or i == tree.leaves.len - 1:
error "Failed to store slot tree", err = err.msg if err =? (await self.store.putCidAndProofBatch(treeCid, batchStartIndex, batch)).errorOption:
return failure(err) error "Failed to store slot tree", err = err.msg
return failure(err)
batch.setLen(0)
batchStartIndex = i + 1
tree.root() tree.root()

View File

@ -88,7 +88,7 @@ method putCidAndProof*(
raiseAssert("putCidAndProof not implemented!") raiseAssert("putCidAndProof not implemented!")
method putCidAndProofBatch*( method putCidAndProofBatch*(
self: BlockStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof] self: BlockStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
): Future[?!void] {.base, gcsafe.} = ): Future[?!void] {.base, gcsafe.} =
## Put a batch of block proofs to the blockstore ## Put a batch of block proofs to the blockstore
## ##

View File

@ -70,9 +70,9 @@ method putBlock*(
return success() return success()
method putCidAndProofBatch*( method putCidAndProofBatch*(
self: NetworkStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof] self: NetworkStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
): Future[?!void] = ): Future[?!void] =
self.localStore.putCidAndProofBatch(treeCid, blkCids, proofs) self.localStore.putCidAndProofBatch(treeCid, startIndex, entries)
method putCidAndProof*( method putCidAndProof*(
self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof

View File

@ -117,18 +117,18 @@ method ensureExpiry*(
await self.ensureExpiry(leafMd.blkCid, expiry) await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProofBatch*( method putCidAndProofBatch*(
self: RepoStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof] self: RepoStore, treeCid: Cid, startIndex: int, entries: seq[(Cid, CodexProof)]
): Future[?!void] {.async.} = ): Future[?!void] {.async.} =
var var
batch = newSeq[BatchEntry]() batch = newSeq[BatchEntry]()
results = newSeq[StoreResult](blkCids.len) results = newSeq[StoreResult](entries.len)
lock = self.locks.mgetOrPut(treeCid, newAsyncLock()) lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
batchSize = 50 treeIndex = startIndex
echo "Adding cid and proofs", entries.len
try: try:
await lock.acquire() await lock.acquire()
for i, cid in blkCids: for i, entry in entries:
without key =? createBlockCidAndProofMetadataKey(treeCid, i), err: without key =? createBlockCidAndProofMetadataKey(treeCid, treeIndex), err:
return failure(err) return failure(err)
# Check existence before adding to batch # Check existence before adding to batch
@ -138,29 +138,29 @@ method putCidAndProofBatch*(
results[i] = StoreResult(kind: AlreadyInStore) results[i] = StoreResult(kind: AlreadyInStore)
else: else:
results[i] = StoreResult(kind: Stored) results[i] = StoreResult(kind: Stored)
var metadata = LeafMetadata(blkCid: cid, proof: proofs[i]) var metadata = LeafMetadata(blkCid: entry[0], proof: entry[1])
batch.add((key: key, data: metadata.encode)) batch.add((key: key, data: metadata.encode))
treeIndex += 1
if batch.len >= batchSize or i == blkCids.len - 1: try:
try: if err =? (await self.metaDs.ds.put(batch)).errorOption:
if err =? (await self.metaDs.ds.put(batch)).errorOption: return failure(err)
return failure(err) except CatchableError as e:
except CatchableError as e: return failure(e.msg)
return failure(e.msg)
batch = newSeq[BatchEntry]()
finally: finally:
lock.release() lock.release()
if not lock.locked: if not lock.locked:
self.locks.del(treeCid) self.locks.del(treeCid)
for i, res in results: # Update reference counts for blocks that were stored
if res.kind == Stored and blkCids[i].mcodec == BlockCodec: for i, entry in entries:
if err =? if results[i].kind == Stored and entry[0].mcodec == BlockCodec:
(await self.updateBlockMetadata(blkCids[i], plusRefCount = 1)).errorOption: if err =? (await self.updateBlockMetadata(entry[0], plusRefCount = 1)).errorOption:
return failure(err) return failure(err)
trace "Leaf metadata stored, block refCount incremented" trace "Leaf metadata stored, block refCount incremented"
else: else:
trace "Leaf metadata already exists" trace "Leaf metadata already exists"
return success() return success()
method putCidAndProof*( method putCidAndProof*(
@ -169,6 +169,7 @@ method putCidAndProof*(
## Put a block to the blockstore ## Put a block to the blockstore
## ##
# TODO: Add locking for treeCid # TODO: Add locking for treeCid
logScope: logScope:
treeCid = treeCid treeCid = treeCid
index = index index = index
@ -302,7 +303,7 @@ method delBlocks*(
lastIdle = getTime() lastIdle = getTime()
batch = newSeq[Key]() batch = newSeq[Key]()
blckCids = newSeq[Cid]() blckCids = newSeq[Cid]()
batchSize = 100 batchSize = 1000
for i in 0 ..< blocksCount: for i in 0 ..< blocksCount:
if (getTime() - lastIdle) >= runtimeQuota: if (getTime() - lastIdle) >= runtimeQuota: