Addressing review comments

This commit is contained in:
Tomasz Bekas 2023-10-23 14:54:48 +02:00
parent ac2fc71c23
commit 252b4451b7
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
11 changed files with 46 additions and 48 deletions

View File

@ -121,7 +121,11 @@ proc stop*(b: BlockExcEngine) {.async.} =
trace "NetworkStore stopped"
proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
proc sendWantHave(
b: BlockExcEngine,
address: BlockAddress,
selectedPeer: BlockExcPeerCtx,
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
trace "Sending wantHave request to peers", address
for p in peers:
if p != selectedPeer:
@ -132,11 +136,10 @@ proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockE
@[address],
wantType = WantType.WantHave) # we only want to know if the peer has the block
proc sendWantBlock(b: BlockExcEngine, address: BlockAddress, blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
let cid = if address.leaf:
address.treeCid
else:
address.cid
proc sendWantBlock(
b: BlockExcEngine,
address: BlockAddress,
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
trace "Sending wantBlock request to", peer = blockPeer.id, address
await b.network.request.sendWantList(
blockPeer.id,
@ -376,12 +379,15 @@ proc blocksDeliveryHandler*(
blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Got blocks from peer", peer, len = blocksDelivery.len
var storedBlocks: seq[BlockDelivery]
for bd in blocksDelivery:
if isErr (await b.localStore.putBlock(bd.blk)):
trace "Unable to store block", cid = bd.blk.cid
if err =? (await b.localStore.putBlock(bd.blk)).errorOption:
trace "Unable to store block", cid = bd.blk.cid, err = err.msg
else:
storedBlocks.add(bd)
await b.resolveBlocks(blocksDelivery)
codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64)
await b.resolveBlocks(storedBlocks)
codexBlockExchangeBlocksReceived.inc(storedBlocks.len.int64)
let
peerCtx = b.peers.get(peer)

View File

@ -59,7 +59,7 @@ type
treeRoot*: MultiHash
treeCid*: Cid
TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.}
TreeHandler* = proc(tree: MerkleTree): Future[void] {.raises:[], gcsafe.}
PendingBlocksManager* = ref object of RootObj
blocks*: Table[Cid, BlockReq] # pending Block requests
@ -107,10 +107,7 @@ proc checkIfAllDelivered(p: PendingBlocksManager, treeReq: TreeReq): void =
p.trees.del(treeReq.treeCid)
return
p.trees.del(treeReq.treeCid)
try:
asyncSpawn p.onTree(tree)
except Exception as err:
error "Exception when handling tree", msg = err.msg
asyncSpawn p.onTree(tree)
proc getWantHandleOrCid*(
treeReq: TreeReq,
@ -221,9 +218,7 @@ proc resolve*(
trace "Block retrieval time", retrievalDurationUs
else:
warn "Delivery cid doesn't match block cid", deliveryCid = bd.address.cid, blockCid = bd.blk.cid
# resolve any pending blocks
if bd.address.leaf:
else: # when block.address.leaf == true
p.trees.withValue(bd.address.treeCid, treeReq):
treeReq.leaves.withValue(bd.address.index, leafReq):
if not leafReq.delivered:
@ -253,7 +248,7 @@ proc resolve*(
else:
warn "Missing proof for a block", address = bd.address
else:
trace "Ignore veryfing proof for already delivered block", address = bd.address
trace "Ignore verifying proof for already delivered block", address = bd.address
proc setInFlight*(p: PendingBlocksManager,
cid: Cid,

View File

@ -134,17 +134,17 @@ proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!C
DagPB = multiCodec("dag-pb")
DagJson = multiCodec("dag-json")
var index {.global, threadvar.}: Table[(CIDv0, Sha256, DagPB), Result[Cid, CidError]]
var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid]
once:
index = {
# source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty
(CIDv0, Sha256, DagPB): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"),
(CIDv1, Sha256, DagPB): Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi"), # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
(CIDv1, Sha256, DagJson): Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP"), # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
(CIDv1, Sha256, Raw): Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW"),
(CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure,
(CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku
(CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta
(CIDv1, Sha256, Raw): ? Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure,
}.toTable
index[(version, hcodec, dcodec)].catch.flatMap((a: Result[Cid, CidError]) => a.mapFailure)
index[(version, hcodec, dcodec)].catch
proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash =
emptyCid(version, hcodec, dcodec)

View File

@ -231,8 +231,6 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
treeReader = TreeReader.new()
repoData = case config.repoKind
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
.expect("Should create repo file data store!"))
@ -243,7 +241,6 @@ proc new*(
repoDs = repoData,
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!"),
treeReader = treeReader,
quotaMaxBytes = config.storageQuota.uint,
blockTtl = config.blockTtl)

View File

@ -118,16 +118,11 @@ proc fetchBatched*(
return failure(err)
for batchNum in 0..<batchCount:
let blocks = collect:
for i in 0..<batchSize:
if not iter.finished:
iter.next()
# let
# indexRange = (batchNum * batchSize)..<(min((batchNum + 1) * batchSize, manifest.blocksCount))
# blocks = indexRange.mapIt(node.blockStore.getBlock(manifest.treeCid, it))
try:
await allFuturesThrowing(allFinished(blocks))
if not onBatch.isNil:

View File

@ -230,21 +230,17 @@ proc new*(
if cacheSize < chunkSize:
raise newException(ValueError, "cacheSize cannot be less than chunkSize")
var treeReader = TreeReader.new()
let
currentSize = 0'nb
size = int(cacheSize div chunkSize)
cache = newLruCache[Cid, Block](size)
store = CacheStore(
treeReader: treeReader,
cache: cache,
currentSize: currentSize,
size: cacheSize)
proc getBlockFromStore(cid: Cid): Future[?!Block] = store.getBlock(cid)
treeReader.getBlockFromStore = getBlockFromStore
store.treeReader = TreeReader.new(getBlockFromStore)
for blk in blocks:
discard store.putBlockSync(blk)

View File

@ -483,11 +483,10 @@ proc stop*(self: RepoStore): Future[void] {.async.} =
self.started = false
func new*(
proc new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
treeReader: TreeReader = TreeReader.new(),
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
@ -499,13 +498,11 @@ func new*(
let store = RepoStore(
repoDs: repoDs,
metaDs: metaDs,
treeReader: treeReader,
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl)
proc getBlockFromStore(cid: Cid): Future[?!Block] = store.getBlock(cid)
treeReader.getBlockFromStore = getBlockFromStore
store.treeReader = TreeReader.new(getBlockFromStore, treeCacheCapacity)
store

View File

@ -97,5 +97,11 @@ proc getBlocks*(self: TreeReader, treeCid: Cid, leavesCount: Natural): Future[?!
)
return success(iter)
proc new*(T: type TreeReader, treeCacheCap = DefaultTreeCacheCapacity): TreeReader =
TreeReader(treeCache: newLruCache[Cid, MerkleTree](treeCacheCap))
func new*(
T: type TreeReader,
getBlockFromStore: GetBlock,
treeCacheCap = DefaultTreeCacheCapacity
): TreeReader {.noSideEffect.} =
TreeReader(
getBlockFromStore: getBlockFromStore,
treeCache: newLruCache[Cid, MerkleTree](treeCacheCap))

View File

@ -1,5 +1,5 @@
## Nim-Dagger
## Copyright (c) 2022 Status Research & Development GmbH
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))

View File

@ -37,7 +37,7 @@ type
store*: BlockStore # Store where to lookup block contents
manifest*: Manifest # List of block CIDs
pad*: bool # Pad last block to manifest.blockSize?
iter*: AsyncIter[?!Block]
iter: AsyncIter[?!Block]
lastBlock: Block
lastIndex: int
offset: int

View File

@ -23,6 +23,12 @@ iterator items*[T](self: Iter[T]): T =
while not self.finished:
yield self.next()
iterator pairs*[T](self: Iter[T]): tuple[key: int, val: T] {.inline.} =
var i = 0
while not self.finished:
yield (i, self.next())
inc(i)
proc map*[T, U](fut: Future[T], fn: Function[T, U]): Future[U] {.async.} =
let t = await fut
fn(t)