From 252b4451b71965f900e0f91e58ebc210d496f324 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Mon, 23 Oct 2023 14:54:48 +0200 Subject: [PATCH] Addressing review comments --- codex/blockexchange/engine/engine.nim | 26 ++++++++++++-------- codex/blockexchange/engine/pendingblocks.nim | 13 +++------- codex/blocktype.nim | 12 ++++----- codex/codex.nim | 3 --- codex/node.nim | 5 ---- codex/stores/cachestore.nim | 6 +---- codex/stores/repostore.nim | 7 ++---- codex/stores/treereader.nim | 10 ++++++-- codex/streams/seekablestorestream.nim | 4 +-- codex/streams/storestream.nim | 2 +- codex/utils/asynciter.nim | 6 +++++ 11 files changed, 46 insertions(+), 48 deletions(-) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index f5c9b6ca..d72151e8 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -121,7 +121,11 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" -proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockExcPeerCtx, peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = +proc sendWantHave( + b: BlockExcEngine, + address: BlockAddress, + selectedPeer: BlockExcPeerCtx, + peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = trace "Sending wantHave request to peers", address for p in peers: if p != selectedPeer: @@ -132,11 +136,10 @@ proc sendWantHave(b: BlockExcEngine, address: BlockAddress, selectedPeer: BlockE @[address], wantType = WantType.WantHave) # we only want to know if the peer has the block -proc sendWantBlock(b: BlockExcEngine, address: BlockAddress, blockPeer: BlockExcPeerCtx): Future[void] {.async.} = - let cid = if address.leaf: - address.treeCid - else: - address.cid +proc sendWantBlock( + b: BlockExcEngine, + address: BlockAddress, + blockPeer: BlockExcPeerCtx): Future[void] {.async.} = trace "Sending wantBlock request to", peer = blockPeer.id, address await b.network.request.sendWantList( blockPeer.id, @@ -376,12 +379,15 @@ proc blocksDeliveryHandler*( blocksDelivery: seq[BlockDelivery]) {.async.} = trace "Got blocks from peer", peer, len = blocksDelivery.len + var storedBlocks: seq[BlockDelivery] for bd in blocksDelivery: - if isErr (await b.localStore.putBlock(bd.blk)): - trace "Unable to store block", cid = bd.blk.cid + if err =? (await b.localStore.putBlock(bd.blk)).errorOption: + trace "Unable to store block", cid = bd.blk.cid, err = err.msg + else: + storedBlocks.add(bd) - await b.resolveBlocks(blocksDelivery) - codexBlockExchangeBlocksReceived.inc(blocksDelivery.len.int64) + await b.resolveBlocks(storedBlocks) + codexBlockExchangeBlocksReceived.inc(storedBlocks.len.int64) let peerCtx = b.peers.get(peer) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index a99b74f8..81d7f079 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -59,7 +59,7 @@ type treeRoot*: MultiHash treeCid*: Cid - TreeHandler* = proc(tree: MerkleTree): Future[void] {.gcsafe.} + TreeHandler* = proc(tree: MerkleTree): Future[void] {.raises:[], gcsafe.} PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, BlockReq] # pending Block requests @@ -107,10 +107,7 @@ proc checkIfAllDelivered(p: PendingBlocksManager, treeReq: TreeReq): void = p.trees.del(treeReq.treeCid) return p.trees.del(treeReq.treeCid) - try: - asyncSpawn p.onTree(tree) - except Exception as err: - error "Exception when handling tree", msg = err.msg + asyncSpawn p.onTree(tree) proc getWantHandleOrCid*( treeReq: TreeReq, @@ -221,9 +218,7 @@ proc resolve*( trace "Block retrieval time", retrievalDurationUs else: warn "Delivery cid doesn't match block cid", deliveryCid = bd.address.cid, blockCid = bd.blk.cid - - # resolve any pending blocks - if bd.address.leaf: + else: # when block.address.leaf == true p.trees.withValue(bd.address.treeCid, treeReq): treeReq.leaves.withValue(bd.address.index, leafReq): if not leafReq.delivered: @@ -253,7 +248,7 @@ proc resolve*( else: warn "Missing proof for a block", address = bd.address else: - trace "Ignore veryfing proof for already delivered block", address = bd.address + trace "Ignore verifying proof for already delivered block", address = bd.address proc setInFlight*(p: PendingBlocksManager, cid: Cid, diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 3d4c8fff..75b25890 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -134,17 +134,17 @@ proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!C DagPB = multiCodec("dag-pb") DagJson = multiCodec("dag-json") - var index {.global, threadvar.}: Table[(CIDv0, Sha256, DagPB), Result[Cid, CidError]] + var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid] once: index = { # source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty - (CIDv0, Sha256, DagPB): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"), - (CIDv1, Sha256, DagPB): Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi"), # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku - (CIDv1, Sha256, DagJson): Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP"), # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta - (CIDv1, Sha256, Raw): Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW"), + (CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure, + (CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku + (CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta + (CIDv1, Sha256, Raw): ? Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure, }.toTable - index[(version, hcodec, dcodec)].catch.flatMap((a: Result[Cid, CidError]) => a.mapFailure) + index[(version, hcodec, dcodec)].catch proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash = emptyCid(version, hcodec, dcodec) diff --git a/codex/codex.nim b/codex/codex.nim index f2369c82..0b725bb7 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -231,8 +231,6 @@ proc new*( wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) - treeReader = TreeReader.new() - repoData = case config.repoKind of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5) .expect("Should create repo file data store!")) @@ -243,7 +241,6 @@ proc new*( repoDs = repoData, metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create meta data store!"), - treeReader = treeReader, quotaMaxBytes = config.storageQuota.uint, blockTtl = config.blockTtl) diff --git a/codex/node.nim b/codex/node.nim index 74d3f501..f3888ec0 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -118,16 +118,11 @@ proc fetchBatched*( return failure(err) for batchNum in 0..