From a0b12e85bfc4954f68d7ee012f0debab6a4f0a72 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 16 May 2024 19:06:12 +0200 Subject: [PATCH 1/2] Update logging for download (#799) * Updates logging for file upload * Restores trace for placing block and proof in repo store * Reduces logging while transmitting blocks * unnecessary formatter * Clean up some more download related traces * much better * Review comment by dryajov --- codex/blockexchange/engine/discovery.nim | 6 +-- codex/blockexchange/engine/engine.nim | 42 ++++---------------- codex/blockexchange/engine/pendingblocks.nim | 12 ++---- codex/blockexchange/network/network.nim | 5 --- codex/blockexchange/network/networkpeer.nim | 6 +-- codex/blockexchange/peers/peercontext.nim | 4 -- codex/blockexchange/peers/peerctxstore.nim | 3 -- codex/discovery.nim | 6 +-- codex/rest/api.nim | 5 +-- codex/stores/networkstore.nim | 8 +--- codex/stores/repostore.nim | 14 +++---- codex/streams/storestream.nim | 3 +- 12 files changed, 28 insertions(+), 86 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index eb68bce8..2771d52c 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -154,9 +154,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = let haves = b.peers.peersHave(cid) - trace "Current number of peers for block", cid, peers = haves.len if haves.len < b.minPeersPerBlock: - trace "Discovering block", cid try: let request = b.discovery @@ -168,7 +166,6 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = let peers = await request - trace "Discovered peers for block", peers = peers.len, cid let dialed = await allFinished( peers.mapIt( b.network.dialPeer(it.data) )) @@ -189,10 +186,9 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: if cid notin b.discoveryQueue: try: - trace "Queueing find block", cid, queue = b.discoveryQueue.len b.discoveryQueue.putNoWait(cid) except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg + warn "Exception queueing discovery request", exc = exc.msg proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index a4565b10..e34d3e93 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -125,7 +125,7 @@ proc stop*(b: BlockExcEngine) {.async.} = proc sendWantHave( b: BlockExcEngine, - address: BlockAddress, + address: BlockAddress, # pluralize this entire call chain, please excluded: seq[BlockExcPeerCtx], peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = trace "Sending wantHave request to peers", address @@ -139,7 +139,7 @@ proc sendWantHave( proc sendWantBlock( b: BlockExcEngine, - address: BlockAddress, + address: BlockAddress, # pluralize this entire call chain, please blockPeer: BlockExcPeerCtx): Future[void] {.async.} = trace "Sending wantBlock request to", peer = blockPeer.id, address await b.network.request.sendWantList( @@ -154,13 +154,11 @@ proc monitorBlockHandle( peerId: PeerId) {.async.} = try: - trace "Monitoring block handle", address, peerId discard await handle - trace "Block handle success", address, peerId except CancelledError as exc: trace "Block handle cancelled", address, peerId except CatchableError as exc: - trace "Error block handle, disconnecting peer", address, exc = exc.msg, peerId + warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId # TODO: really, this is just a quick and dirty way of # preventing hitting the same "bad" peer every time, however, @@ -217,7 +215,6 @@ proc blockPresenceHandler*( b: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]) {.async.} = - trace "Received presence update for peer", peer, blocks = blocks.len let peerCtx = b.peers.get(peer) wantList = toSeq(b.pendingBlocks.wantList) @@ -227,12 +224,6 @@ proc blockPresenceHandler*( for blk in blocks: if presence =? Presence.init(blk): - logScope: - address = $presence.address - have = presence.have - price = presence.price - - trace "Updating presence" peerCtx.setPresence(presence) let @@ -323,14 +314,12 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Paying for blocks", len = blocksDelivery.len - let sendPayment = engine.network.request.sendPayment price = peer.price(blocksDelivery.mapIt(it.address)) if payment =? engine.wallet.pay(peer, price): - trace "Sending payment for blocks", price + trace "Sending payment for blocks", price, len = blocksDelivery.len await sendPayment(peer.id, payment) proc validateBlockDelivery( @@ -365,7 +354,7 @@ proc blocksDeliveryHandler*( b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Got blocks from peer", peer, len = blocksDelivery.len + trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",") var validatedBlocksDelivery: seq[BlockDelivery] for bd in blocksDelivery: @@ -411,7 +400,6 @@ proc wantListHandler*( b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} = - trace "Got wantList for peer", peer, items = wantList.entries.len let peerCtx = b.peers.get(peer) if isNil(peerCtx): @@ -430,8 +418,6 @@ proc wantListHandler*( wantType = $e.wantType if idx < 0: # updating entry - trace "Processing new want list entry" - let have = await e.address in b.localStore price = @( @@ -442,41 +428,35 @@ proc wantListHandler*( codex_block_exchange_want_have_lists_received.inc() if not have and e.sendDontHave: - trace "Adding dont have entry to presence response" presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to presence response" presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.Have, price: price)) elif e.wantType == WantType.WantBlock: - trace "Added entry to peer's want blocks list" peerCtx.peerWants.add(e) codex_block_exchange_want_block_lists_received.inc() else: # peer doesn't want this block anymore if e.cancel: - trace "Removing entry from peer want list" peerCtx.peerWants.del(idx) else: - trace "Updating entry in peer want list" # peer might want to ask for the same cid with # different want params peerCtx.peerWants[idx] = e # update entry if presence.len > 0: - trace "Sending presence to remote", items = presence.len + trace "Sending presence to remote", items = presence.mapIt($it).join(",") await b.network.request.sendPresence(peer, presence) - trace "Scheduling a task to check want-list", peer if not b.scheduleTask(peerCtx): - trace "Unable to schedule task for peer", peer + warn "Unable to schedule task for peer", peer proc accountHandler*( engine: BlockExcEngine, @@ -541,8 +521,6 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerId) = b.peers.remove(peer) proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = - trace "Handling task for peer", peer = task.id - # Send to the peer blocks he wants to get, # if they present in our local store @@ -559,7 +537,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = if peerWant.address in addresses: peerWant.inFlight = inFlight - trace "wantsBlocks", peer = task.id, n = wantsBlocks.len if wantsBlocks.len > 0: # Mark wants as in-flight. let wantAddresses = wantsBlocks.mapIt(it.address) @@ -567,7 +544,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = wantsBlocks.sort(SortOrder.Descending) proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = - trace "Handling lookup for entry", address = e.address if e.address.leaf: (await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( (blkAndProof: (Block, CodexProof)) => @@ -591,7 +567,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = updateInFlight(failedAddresses, false) if blocksDelivery.len > 0: - trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len + trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",") await b.network.request.sendBlocksDelivery( task.id, blocksDelivery @@ -600,7 +576,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) task.peerWants.keepItIf(it.address notin successAddresses) - trace "Removed entries from peerWants", peerWants = task.peerWants.len proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = ## process tasks @@ -611,7 +586,6 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = let peerCtx = await b.taskQueue.pop() - trace "Got new task from queue", peerId = peerCtx.id await b.taskHandler(peerCtx) info "Exiting blockexc task runner" diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index 0f8cd439..9c5efc0b 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -58,15 +58,13 @@ proc getWantHandle*( inFlight: inFlight, startTime: getMonoTime().ticks) - trace "Adding pending future for block", address, inFlight = p.blocks[address].inFlight - p.updatePendingBlockGauge() return await p.blocks[address].handle.wait(timeout) except CancelledError as exc: trace "Blocks cancelled", exc = exc.msg, address raise exc except CatchableError as exc: - trace "Pending WANT failed or expired", exc = exc.msg + error "Pending WANT failed or expired", exc = exc.msg # no need to cancel, it is already cancelled by wait() raise exc finally: @@ -88,8 +86,6 @@ proc resolve*( for bd in blocksDelivery: p.blocks.withValue(bd.address, blockReq): - trace "Resolving block", address = bd.address - if not blockReq.handle.finished: let startTime = blockReq.startTime @@ -99,7 +95,9 @@ proc resolve*( blockReq.handle.complete(bd.blk) codex_block_exchange_retrieval_time_us.set(retrievalDurationUs) - trace "Block retrieval time", retrievalDurationUs, address = bd.address + + if retrievalDurationUs > 500000: + warn "High block retrieval time", retrievalDurationUs, address = bd.address else: trace "Block handle already finished", address = bd.address @@ -112,7 +110,6 @@ proc setInFlight*( p.blocks.withValue(address, pending): pending[].inFlight = inFlight - trace "Setting inflight", address, inFlight = pending[].inFlight proc isInFlight*( p: PendingBlocksManager, @@ -122,7 +119,6 @@ proc isInFlight*( p.blocks.withValue(address, pending): result = pending[].inFlight - trace "Getting inflight", address, inFlight = result proc contains*(p: PendingBlocksManager, cid: Cid): bool = BlockAddress.init(cid) in p.blocks diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index d193779d..f64cf8cc 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -96,7 +96,6 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = b.peers.withValue(id, peer): try: await b.inflightSema.acquire() - trace "Sending message to peer", peer = id await peer[].send(msg) except CatchableError as err: error "Error sending message", peer = id, msg = err.msg @@ -113,7 +112,6 @@ proc handleWantList( ## if not b.handlers.onWantList.isNil: - trace "Handling want list for peer", peer = peer.id, items = list.entries.len await b.handlers.onWantList(peer.id, list) proc sendWantList*( @@ -128,7 +126,6 @@ proc sendWantList*( ## Send a want message to peer ## - trace "Sending want list to peer", peer = id, `type` = $wantType, items = addresses.len let msg = WantList( entries: addresses.mapIt( WantListEntry( @@ -157,7 +154,6 @@ proc handleBlocksDelivery( ## if not b.handlers.onBlocksDelivery.isNil: - trace "Handling blocks for peer", peer = peer.id, items = blocksDelivery.len await b.handlers.onBlocksDelivery(peer.id, blocksDelivery) @@ -178,7 +174,6 @@ proc handleBlockPresence( ## if not b.handlers.onPresence.isNil: - trace "Handling block presence for peer", peer = peer.id, items = presence.len await b.handlers.onPresence(peer.id, presence) proc sendBlockPresence*( diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 69a19386..f9ff0b25 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -45,7 +45,6 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = let data = await conn.readLp(MaxMessageSize.int) msg = Message.protobufDecode(data).mapFailure().tryGet() - trace "Got message for peer", peer = b.id await b.handler(b, msg) except CatchableError as err: warn "Exception in blockexc read loop", msg = err.msg @@ -64,10 +63,9 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} = let conn = await b.connect() if isNil(conn): - trace "Unable to get send connection for peer message not sent", peer = b.id + warn "Unable to get send connection for peer message not sent", peer = b.id return - trace "Sending message to remote", peer = b.id await conn.writeLp(protobufEncode(msg)) proc broadcast*(b: NetworkPeer, msg: Message) = @@ -75,7 +73,7 @@ proc broadcast*(b: NetworkPeer, msg: Message) = try: await b.send(msg) except CatchableError as exc: - trace "Exception broadcasting message to peer", peer = b.id, exc = exc.msg + warn "Exception broadcasting message to peer", peer = b.id, exc = exc.msg asyncSpawn sendAwaiter() diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index ef8fa64c..727676de 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -25,9 +25,6 @@ import ../../logutils export payments, nitro -logScope: - topics = "codex peercontext" - type BlockExcPeerCtx* = ref object of RootObj id*: PeerId @@ -66,5 +63,4 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 = self.blocks.withValue(a, precense): price += precense[].price - trace "Blocks price", price price diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index b07265b4..a64ecd22 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -47,15 +47,12 @@ func contains*(self: PeerCtxStore, peerId: PeerId): bool = peerId in self.peers func add*(self: PeerCtxStore, peer: BlockExcPeerCtx) = - trace "Adding peer to peer context store", peer = peer.id self.peers[peer.id] = peer func remove*(self: PeerCtxStore, peerId: PeerId) = - trace "Removing peer from peer context store", peer = peerId self.peers.del(peerId) func get*(self: PeerCtxStore, peerId: PeerId): BlockExcPeerCtx = - trace "Retrieving peer from peer context store", peer = peerId self.peers.getOrDefault(peerId, nil) func len*(self: PeerCtxStore): int = diff --git a/codex/discovery.nim b/codex/discovery.nim index d3cefe05..47ac950d 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -73,16 +73,14 @@ method find*( cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = ## Find block providers ## - - trace "Finding providers for block", cid without providers =? (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error: - trace "Error finding providers for block", cid, error = error.msg + warn "Error finding providers for block", cid, error = error.msg return providers.filterIt( not (it.data.peerId == d.peerId) ) method provide*(d: Discovery, cid: Cid) {.async, base.} = - ## Provide a bock Cid + ## Provide a block Cid ## let nodes = await d.protocol.addProvider( diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 5478516d..1cdc8ab9 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -95,15 +95,14 @@ proc retrieveCid( break bytes += buff.len - trace "Sending chunk", size = buff.len await resp.sendChunk(addr buff[0], buff.len) await resp.finish() codex_api_downloads.inc() except CatchableError as exc: - trace "Excepting streaming blocks", exc = exc.msg + warn "Excepting streaming blocks", exc = exc.msg return RestApiResponse.error(Http500) finally: - trace "Sent bytes", cid = cid, bytes + info "Sent bytes", cid = cid, bytes if not stream.isNil: await stream.close() diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index ce0761d6..40758b94 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -34,17 +34,13 @@ type localStore*: BlockStore # local block store method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} = - trace "Getting block from local store or network", address - without blk =? (await self.localStore.getBlock(address)), err: if not (err of BlockNotFoundError): - trace "Error getting block from local store", address, err = err.msg + error "Error getting block from local store", address, err = err.msg return failure err - trace "Block not in local store", address, err = err.msg - without newBlock =? (await self.engine.requestBlock(address)), err: - trace "Unable to get block from exchange engine", address, err = err.msg + error "Unable to get block from exchange engine", address, err = err.msg return failure err return success newBlock diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 5cabc726..c129710f 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -139,10 +139,9 @@ method getCidAndProof*( return failure(err) without (cid, proof) =? (Cid, CodexProof).decode(value), err: - trace "Unable to decode cid and proof", err = err.msg + error "Unable to decode cid and proof", err = err.msg return failure(err) - trace "Got cid and proof for block", cid, proof = $proof return success (cid, proof) method getCid*( @@ -154,10 +153,11 @@ method getCid*( without value =? await self.metaDs.get(key), err: if err of DatastoreKeyNotFound: - trace "Cid not found", treeCid, index + # This failure is expected to happen frequently: + # NetworkStore.getBlock will call RepoStore.getBlock before starting the block exchange engine. return failure(newException(BlockNotFoundError, err.msg)) else: - trace "Error getting cid from datastore", err = err.msg, key + error "Error getting cid from datastore", err = err.msg, key return failure(err) return (Cid, CodexProof).decodeCid(value) @@ -170,21 +170,19 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = cid = cid if cid.isEmpty: - trace "Empty block, ignoring" return cid.emptyBlock without key =? makePrefixKey(self.postFixLen, cid), err: - trace "Error getting key from provider", err = err.msg + error "Error getting key from provider", err = err.msg return failure(err) without data =? await self.repoDs.get(key), err: if not (err of DatastoreKeyNotFound): - trace "Error getting block from datastore", err = err.msg, key + error "Error getting block from datastore", err = err.msg, key return failure(err) return failure(newException(BlockNotFoundError, err.msg)) - trace "Got block for cid", cid return Block.new(cid, data, verify = true) diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index a20921dd..ce89171c 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -82,7 +82,6 @@ method readOnce*( ## Raise exception if we are already at EOF. ## - trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount if self.atEof: raise newLPStreamEOFError() @@ -104,7 +103,7 @@ method readOnce*( without blk =? await self.store.getBlock(address), error: raise newLPStreamReadError(error) - trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset + trace "Reading bytes from store stream", manifestCid = self.manifest.cid.get(), numBlocks = self.manifest.blocksCount, blockNum, blkCid = blk.cid, bytes = readBytes, blockOffset # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf if blk.isEmpty: From 8bf44e24ee9d7f582141cee38da7812d3783dde6 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Fri, 17 May 2024 02:57:30 +0200 Subject: [PATCH 2/2] inlines copying of hashset (#809) --- codex/validation.nim | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/codex/validation.nim b/codex/validation.nim index 7b1cdd1a..011c6737 100644 --- a/codex/validation.nim +++ b/codex/validation.nim @@ -34,12 +34,6 @@ proc new*( proc slots*(validation: Validation): seq[SlotId] = validation.slots.toSeq -proc iterateSlots(validation: Validation, action: proc(s: SlotId): Future[void] {.async.}) {.async.} = - # Copy of hashSet, for iteration. - let slots = validation.slots - for slotId in slots: - await action(slotId) - proc getCurrentPeriod(validation: Validation): UInt256 = return validation.periodicity.periodOf(validation.clock.now().u256) @@ -61,12 +55,12 @@ proc subscribeSlotFilled(validation: Validation) {.async.} = proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = var ended: HashSet[SlotId] - proc onSlot(slotId: SlotId) {.async.} = + let slots = validation.slots + for slotId in slots: let state = await validation.market.slotState(slotId) if state != SlotState.Filled: trace "Removing slot", slotId ended.incl(slotId) - await validation.iterateSlots(onSlot) validation.slots.excl(ended) proc markProofAsMissing(validation: Validation, @@ -88,10 +82,10 @@ proc markProofAsMissing(validation: Validation, error "Marking proof as missing failed", msg = e.msg proc markProofsAsMissing(validation: Validation) {.async.} = - proc onSlot(slotId: SlotId) {.async.} = + let slots = validation.slots + for slotId in slots: let previousPeriod = validation.getCurrentPeriod() - 1 await validation.markProofAsMissing(slotId, previousPeriod) - await validation.iterateSlots(onSlot) proc run(validation: Validation) {.async.} = trace "Validation started"