Addressing PR comments

This commit is contained in:
Tomasz Bekas 2023-11-09 19:29:00 +01:00
parent 7b3a3a96d9
commit 41b5723554
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
5 changed files with 39 additions and 30 deletions

View File

@ -349,21 +349,24 @@ proc blocksDeliveryHandler*(
var validatedBlocksDelivery: seq[BlockDelivery]
for bd in blocksDelivery:
logScope:
peer = peer
address = bd.address
if err =? b.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", address = bd.address, msg = err.msg
warn "Block validation failed", msg = err.msg
continue
if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
error "Unable to store block", address = bd.address, err = err.msg
error "Unable to store block", err = err.msg
continue
if bd.address.leaf:
without proof =? bd.proof:
error "Proof expected for a leaf block delivery", address = bd.address
error "Proof expected for a leaf block delivery"
continue
if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption:
error "Unable to store proof and cid for a block", address = bd.address
error "Unable to store proof and cid for a block"
continue
validatedBlocksDelivery.add(bd)
@ -398,11 +401,11 @@ proc wantListHandler*(
logScope:
peer = peerCtx.id
# cid = e.cid
address = e.address
wantType = $e.wantType
if idx < 0: # updating entry
trace "Processing new want list entry", address = e.address
trace "Processing new want list entry"
let
have = await e.address in b.localStore
@ -414,21 +417,21 @@ proc wantListHandler*(
codex_block_exchange_want_have_lists_received.inc()
if not have and e.sendDontHave:
trace "Adding dont have entry to presence response", address = e.address
trace "Adding dont have entry to presence response"
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.DontHave,
price: price))
elif have and e.wantType == WantType.WantHave:
trace "Adding have entry to presence response", address = e.address
trace "Adding have entry to presence response"
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.Have,
price: price))
elif e.wantType == WantType.WantBlock:
trace "Added entry to peer's want blocks list", address = e.address
trace "Added entry to peer's want blocks list"
peerCtx.peerWants.add(e)
codex_block_exchange_want_block_lists_received.inc()
else:

View File

@ -104,8 +104,12 @@ proc getPendingBlocks(
proc genNext(): Future[(?!bt.Block, int)] {.async.} =
let completedFut = await one(pendingBlocks)
pendingBlocks.del(pendingBlocks.find(completedFut))
return await completedFut
if (let i = pendingBlocks.find(completedFut); i >= 0):
pendingBlocks.del(i)
return await completedFut
else:
let (_, index) = await completedFut
raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index)
Iter.new(genNext, isFinished)
@ -128,7 +132,7 @@ proc prepareEncodingData(
for fut in pendingBlocksIter:
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
warn "Failed retreiving a block", idx, treeCid = manifest.treeCid
warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg
continue
let pos = indexToPos(params.steps, idx, step)
@ -180,7 +184,7 @@ proc prepareDecodingData(
let (blkOrErr, idx) = await fut
without blk =? blkOrErr, err:
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid
trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg
continue
let
@ -368,7 +372,6 @@ proc decode*(
data = seq[seq[byte]].new()
parityData = seq[seq[byte]].new()
recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int))
resolved = 0
data[].setLen(encoded.ecK) # set len to K
parityData[].setLen(encoded.ecM) # set len to M

View File

@ -78,26 +78,27 @@ func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] =
## Encodes a tuple of cid and merkle proof in a following format:
## | 8-bytes | n-bytes | remaining bytes |
## | n | cid | proof |
##
## where n is a size of cid
##
let
(cid, proof) = cidAndProof
cidBytes = cid.data.buffer
proofBytes = proof.encode
n = cidBytes.len
nBytes = n.uint64.toBytesBE
var buf = newSeq[byte](1 + cidBytes.len + proofBytes.len)
buf[0] = cid.data.buffer.len.byte # cid shouldnt be more than 255 bytes?
buf[1..cidBytes.len] = cidBytes
buf[cidBytes.len + 1..^1] = proofBytes
buf
@nBytes & cidBytes & proofBytes
proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) =
let cidLen = data[0].int
let
n = uint64.fromBytesBE(data[0..<sizeof(uint64)]).int
let
cid = ? Cid.init(data[1..cidLen]).mapFailure
proof = ? MerkleProof.decode(data[cidLen + 1..^1])
cid = ? Cid.init(data[sizeof(uint64)..<sizeof(uint64) + n]).mapFailure
proof = ? MerkleProof.decode(data[sizeof(uint64) + n..^1])
success((cid, proof))
method putBlockCidAndProof*(
@ -608,7 +609,7 @@ proc stop*(self: RepoStore): Future[void] {.async.} =
self.started = false
proc new*(
func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,

View File

@ -24,9 +24,12 @@ asyncchecksuite "Erasure encode/decode":
var erasure: Erasure
setup:
let
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
store = CacheStore.new(cacheSize = (dataSetSize * 8), chunkSize = BlockSize)
store = RepoStore.new(repoDs, metaDs)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
manifest = await storeDataGetManifest(store, chunker)

View File

@ -102,8 +102,7 @@ asyncchecksuite "Test Node":
fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet()
check:
fetched.cid == manifest.cid
# fetched.blocks == manifest.blocks
fetched == manifest
test "Block Batching":
let