- batch leafMetaData updates

- use single db entry for storing quota and blocks count
This commit is contained in:
munna0908 2025-03-26 17:04:22 +05:30
parent 60b6996eb0
commit c63be3b30e
No known key found for this signature in database
GPG Key ID: 2FFCD637E937D3E6
9 changed files with 191 additions and 69 deletions

View File

@ -399,6 +399,8 @@ proc store*(
dataCodec = BlockCodec
chunker = LPStreamChunker.new(stream, chunkSize = blockSize)
var proofs = newSeq[CodexProof]()
var cids: seq[Cid]
try:
@ -433,10 +435,12 @@ proc store*(
for index, cid in cids:
without proof =? tree.getProof(index), err:
return failure(err)
if err =?
(await self.networkStore.putCidAndProof(treeCid, index, cid, proof)).errorOption:
# TODO add log here
return failure(err)
proofs.add(proof)
if err =?
(await self.networkStore.putCidAndProofBatch(treeCid, cids, proofs)).errorOption:
# TODO add log here
return failure(err)
let manifest = Manifest.new(
treeCid = treeCid,

View File

@ -366,7 +366,7 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse:
let json =
%RestRepoStore(
totalBlocks: repoStore.totalBlocks,
totalBlocks: repoStore.storageStats.totalBlocks,
quotaMaxBytes: repoStore.quotaMaxBytes,
quotaUsedBytes: repoStore.quotaUsedBytes,
quotaReservedBytes: repoStore.quotaReservedBytes,

View File

@ -87,6 +87,14 @@ method putCidAndProof*(
raiseAssert("putCidAndProof not implemented!")
method putCidAndProofBatch*(
self: BlockStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof]
): Future[?!void] {.base, gcsafe.} =
## Put a batch of block proofs to the blockstore
##
raiseAssert("putCidAndProofBatch not implemented!")
method getCidAndProof*(
self: BlockStore, treeCid: Cid, index: Natural
): Future[?!(Cid, CodexProof)] {.base, gcsafe.} =

View File

@ -69,6 +69,11 @@ method putBlock*(
await self.engine.resolveBlocks(@[blk])
return success()
method putCidAndProofBatch*(
self: NetworkStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof]
): Future[?!void] =
self.localStore.putCidAndProofBatch(treeCid, blkCids, proofs)
method putCidAndProof*(
self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof
): Future[?!void] =

View File

@ -19,10 +19,10 @@ import ../../errors
import ../../merkletree
import ../../utils/json
proc encode*(t: QuotaUsage): seq[byte] =
proc encode*(t: StorageStats): seq[byte] =
t.toJson().toBytes()
proc decode*(T: type QuotaUsage, bytes: seq[byte]): ?!T =
proc decode*(T: type StorageStats, bytes: seq[byte]): ?!T =
T.fromJson(bytes)
proc encode*(t: BlockMetadata): seq[byte] =

View File

@ -82,57 +82,97 @@ proc getLeafMetadata*(
success(leafMd)
proc updateTotalBlocksCount*(
self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
): Future[?!void] {.async.} =
await self.metaDs.modify(
CodexTotalBlocksKey,
proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
let count: Natural =
if currCount =? maybeCurrCount:
currCount + plusCount - minusCount
else:
plusCount - minusCount
self.totalBlocks = count
codex_repostore_blocks.set(count.int64)
count.some,
)
proc updateQuotaUsage*(
proc updateQuotaAndBlockCount*(
self: RepoStore,
plusCount: Natural = 0,
minusCount: Natural = 0,
plusUsed: NBytes = 0.NBytes,
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes,
): Future[?!void] {.async.} =
await self.metaDs.modify(
QuotaUsedKey,
proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
var usage: QuotaUsage
if currUsage =? maybeCurrUsage:
usage = QuotaUsage(
used: currUsage.used + plusUsed - minusUsed,
reserved: currUsage.reserved + plusReserved - minusReserved,
CodexTotalBlocksKey,
proc(maybeCurrStats: ?StorageStats): Future[?StorageStats] {.async.} =
var stats: StorageStats
if currStats =? maybeCurrStats:
stats = StorageStats(
quotaUsed: currStats.quotaUsed + plusUsed - minusUsed,
quotaReserved: currStats.quotaReserved + plusReserved - minusReserved,
totalBlocks: currStats.totalBlocks + plusCount - minusCount,
)
else:
usage =
QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
stats = StorageStats(
quotaUsed: plusUsed - minusUsed,
quotaReserved: plusReserved - minusReserved,
totalBlocks: plusCount - minusCount,
)
if usage.used + usage.reserved > self.quotaMaxBytes:
if stats.quotaUsed + stats.quotaReserved > self.quotaMaxBytes:
raise newException(
QuotaNotEnoughError,
"Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
$usage.reserved & ", limit: " & $self.quotaMaxBytes,
"Quota usage would exceed the limit. Used: " & $stats.quotaUsed &
", reserved: " & $stats.quotaReserved & ", limit: " & $self.quotaMaxBytes,
)
else:
self.quotaUsage = usage
codex_repostore_bytes_used.set(usage.used.int64)
codex_repostore_bytes_reserved.set(usage.reserved.int64)
return usage.some,
self.storageStats = stats
codex_repostore_bytes_used.set(stats.quotaUsed.int64)
codex_repostore_bytes_reserved.set(stats.quotaReserved.int64)
codex_repostore_blocks.set(stats.totalBlocks.int64)
return stats.some,
)
# proc updateTotalBlocksCount*(
# self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0
# ): Future[?!void] {.async.} =
# await self.metaDs.modify(
# CodexTotalBlocksKey,
# proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
# let count: Natural =
# if currCount =? maybeCurrCount:
# currCount + plusCount - minusCount
# else:
# plusCount - minusCount
# self.totalBlocks = count
# codex_repostore_blocks.set(count.int64)
# count.some,
# )
# proc updateQuotaUsage*(
# self: RepoStore,
# plusUsed: NBytes = 0.NBytes,
# minusUsed: NBytes = 0.NBytes,
# plusReserved: NBytes = 0.NBytes,
# minusReserved: NBytes = 0.NBytes,
# ): Future[?!void] {.async.} =
# await self.metaDs.modify(
# QuotaUsedKey,
# proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =
# var usage: QuotaUsage
# if currUsage =? maybeCurrUsage:
# usage = QuotaUsage(
# used: currUsage.used + plusUsed - minusUsed,
# reserved: currUsage.reserved + plusReserved - minusReserved,
# )
# else:
# usage =
# QuotaUsage(used: plusUsed - minusUsed, reserved: plusReserved - minusReserved)
# if usage.used + usage.reserved > self.quotaMaxBytes:
# raise newException(
# QuotaNotEnoughError,
# "Quota usage would exceed the limit. Used: " & $usage.used & ", reserved: " &
# $usage.reserved & ", limit: " & $self.quotaMaxBytes,
# )
# else:
# self.quotaUsage = usage
# codex_repostore_bytes_used.set(usage.used.int64)
# codex_repostore_bytes_reserved.set(usage.reserved.int64)
# return usage.some,
# )
proc updateBlockMetadata*(
self: RepoStore,
cid: Cid,

View File

@ -114,12 +114,64 @@ method ensureExpiry*(
await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProofBatch*(
self: RepoStore, treeCid: Cid, blkCids: seq[Cid], proofs: seq[CodexProof]
): Future[?!void] {.async.} =
var
batch = newSeq[BatchEntry]()
results = newSeq[StoreResult](blkCids.len)
lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
batchSize = 50
try:
await lock.acquire()
for i, cid in blkCids:
without key =? createBlockCidAndProofMetadataKey(treeCid, i), err:
return failure(err)
# Check existence before adding to batch
if exists =? await self.metaDs.has(key):
results[i] = StoreResult(kind: AlreadyInStore)
else:
results[i] = StoreResult(kind: Stored)
let metadata = LeafMetadata(blkCid: cid, proof: proofs[i])
batch.add((key: key, data: metadata.encode))
if batch.len >= batchSize:
try:
if err =? (await self.metaDs.ds.put(batch)).errorOption:
return failure(err)
except CatchableError as e:
return failure(e.msg)
batch = newSeq[BatchEntry]()
if batch.len > 0:
try:
if err =? (await self.metaDs.ds.put(batch)).errorOption:
return failure(err)
except CatchableError as e:
return failure(e.msg)
finally:
lock.release()
if not lock.locked:
self.locks.del(treeCid)
for i, res in results:
if res.kind == Stored:
if err =?
(await self.updateBlockMetadata(blkCids[i], plusRefCount = 1)).errorOption:
return failure(err)
trace "Leaf metadata stored, block refCount incremented"
else:
trace "Leaf metadata already exists"
return success()
method putCidAndProof*(
self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof
): Future[?!void] {.async.} =
## Put a block to the blockstore
##
# TODO: Add locking for treeCid
logScope:
treeCid = treeCid
index = index
@ -170,13 +222,19 @@ method putBlock*(
if res.kind == Stored:
trace "Block Stored"
if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
# rollback changes
# if err =? (await self.updateQuotaUsage(plusUsed = res.used)).errorOption:
# # rollback changes
# without delRes =? await self.tryDeleteBlock(blk.cid), err:
# return failure(err)
# return failure(err)
# if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
# return failure(err)
if err =? (await self.updateQuotaAndBlockCount(plusCount = 1, plusUsed = res.used)).errorOption:
without delRes =? await self.tryDeleteBlock(blk.cid), err:
return failure(err)
return failure(err)
if err =? (await self.updateTotalBlocksCount(plusCount = 1)).errorOption:
return failure(err)
if onBlock =? self.onBlockStored:
@ -200,10 +258,15 @@ proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.a
if res.kind == Deleted:
trace "Block deleted"
if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
return failure(err)
# if err =? (await self.updateTotalBlocksCount(minusCount = 1)).errorOption:
# return failure(err)
if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
# if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
# return failure(err)
if err =? (
await self.updateQuotaAndBlockCount(minusCount = 1, minusUsed = res.released)
).errorOption:
return failure(err)
success(res.kind)
@ -386,7 +449,7 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
trace "Reserving bytes", bytes
await self.updateQuotaUsage(plusReserved = bytes)
await self.updateQuotaAndBlockCount(plusReserved = bytes)
proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
## Release bytes
@ -394,7 +457,7 @@ proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
trace "Releasing bytes", bytes
await self.updateQuotaUsage(minusReserved = bytes)
await self.updateQuotaAndBlockCount(minusReserved = bytes)
proc start*(self: RepoStore): Future[void] {.async.} =
## Start repo
@ -405,10 +468,7 @@ proc start*(self: RepoStore): Future[void] {.async.} =
return
trace "Starting rep"
if err =? (await self.updateTotalBlocksCount()).errorOption:
raise newException(CodexError, err.msg)
if err =? (await self.updateQuotaUsage()).errorOption:
if err =? (await self.updateQuotaAndBlockCount()).errorOption:
raise newException(CodexError, err.msg)
self.started = true

View File

@ -33,14 +33,15 @@ type
metaDs*: TypedDatastore
clock*: Clock
quotaMaxBytes*: NBytes
quotaUsage*: QuotaUsage
totalBlocks*: Natural
storageStats*: StorageStats
blockTtl*: Duration
started*: bool
locks*: TableRef[Cid, AsyncLock]
QuotaUsage* {.serialize.} = object
used*: NBytes
reserved*: NBytes
StorageStats* {.serialize.} = object
quotaUsed*: NBytes
quotaReserved*: NBytes
totalBlocks*: Natural
BlockMetadata* {.serialize.} = object
expiry*: SecondsSince1970
@ -73,10 +74,10 @@ type
used*: NBytes
func quotaUsedBytes*(self: RepoStore): NBytes =
self.quotaUsage.used
self.storageStats.quotaUsed
func quotaReservedBytes*(self: RepoStore): NBytes =
self.quotaUsage.reserved
self.storageStats.quotaReserved
func totalUsed*(self: RepoStore): NBytes =
(self.quotaUsedBytes + self.quotaReservedBytes)
@ -106,4 +107,5 @@ func new*(
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl,
onBlockStored: CidCallback.none,
locks: newTable[Cid, AsyncLock](),
)

View File

@ -19,8 +19,10 @@ suite "Test coders":
let ordinals = enumRangeInt64(E)
E(ordinals[rand(ordinals.len - 1)])
proc rand(T: type QuotaUsage): T =
QuotaUsage(used: rand(NBytes), reserved: rand(NBytes))
proc rand(T: type StorageStats): T =
StorageStats(
quotaUsed: rand(NBytes), quotaReserved: rand(NBytes), totalBlocks: rand(Natural)
)
proc rand(T: type BlockMetadata): T =
BlockMetadata(
@ -38,10 +40,11 @@ suite "Test coders":
check:
success(val) == Natural.decode(encode(val))
test "QuotaUsage encode/decode":
for val in newSeqWith[QuotaUsage](100, rand(QuotaUsage)):
check:
success(val) == QuotaUsage.decode(encode(val))
test "StorageStats encode/decode":
for val in newSeqWith[StorageStats](100, rand(StorageStats)):
check:
success(val) == StorageStats.decode(encode(val))
test "BlockMetadata encode/decode":
for val in newSeqWith[BlockMetadata](100, rand(BlockMetadata)):