mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-02-25 07:53:19 +00:00
fix race conditions
This commit is contained in:
parent
8ce9224275
commit
3848f7b8d3
@ -122,6 +122,7 @@ method putCidAndProofBatch*(
|
||||
results = newSeq[StoreResult](blkCids.len)
|
||||
lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
|
||||
batchSize = 50
|
||||
|
||||
try:
|
||||
await lock.acquire()
|
||||
for i, cid in blkCids:
|
||||
@ -129,13 +130,14 @@ method putCidAndProofBatch*(
|
||||
return failure(err)
|
||||
|
||||
# Check existence before adding to batch
|
||||
if exists =? await self.metaDs.has(key):
|
||||
without exists =? await self.metaDs.has(key), err:
|
||||
return failure(err)
|
||||
if exists:
|
||||
results[i] = StoreResult(kind: AlreadyInStore)
|
||||
else:
|
||||
results[i] = StoreResult(kind: Stored)
|
||||
|
||||
let metadata = LeafMetadata(blkCid: cid, proof: proofs[i])
|
||||
batch.add((key: key, data: metadata.encode))
|
||||
var metadata = LeafMetadata(blkCid: cid, proof: proofs[i])
|
||||
batch.add((key: key, data: metadata.encode))
|
||||
|
||||
if batch.len >= batchSize:
|
||||
try:
|
||||
@ -178,17 +180,33 @@ method putCidAndProof*(
|
||||
blkCid = blkCid
|
||||
|
||||
trace "Storing LeafMetadata"
|
||||
var lock = self.locks.mgetOrPut(treeCid, newAsyncLock())
|
||||
try:
|
||||
await lock.acquire()
|
||||
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
|
||||
return failure(err)
|
||||
|
||||
without res =? await self.putLeafMetadata(treeCid, index, blkCid, proof), err:
|
||||
return failure(err)
|
||||
without exists =? await self.metaDs.has(key), err:
|
||||
return failure(err)
|
||||
|
||||
if blkCid.mcodec == BlockCodec:
|
||||
if res == Stored:
|
||||
if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
|
||||
return failure(err)
|
||||
trace "Leaf metadata stored, block refCount incremented"
|
||||
else:
|
||||
if exists:
|
||||
trace "Leaf metadata already exists"
|
||||
return success()
|
||||
else:
|
||||
var metadata = LeafMetadata(blkCid: blkCid, proof: proof)
|
||||
if err =? (await self.metaDs.put(key, metadata)).errorOption:
|
||||
return failure(err)
|
||||
|
||||
if blkCid.mcodec == BlockCodec:
|
||||
if err =? (await self.updateBlockMetadata(blkCid, plusRefCount = 1)).errorOption:
|
||||
echo "updateBlockMetadata failed"
|
||||
return failure(err)
|
||||
|
||||
trace "Leaf metadata stored, block refCount incremented"
|
||||
finally:
|
||||
lock.release()
|
||||
if not lock.locked:
|
||||
self.locks.del(treeCid)
|
||||
|
||||
return success()
|
||||
|
||||
@ -211,7 +229,6 @@ method putBlock*(
|
||||
): Future[?!void] {.async.} =
|
||||
## Put a block to the blockstore
|
||||
##
|
||||
|
||||
logScope:
|
||||
cid = blk.cid
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user