Merge branch 'master' into 808-scheduling-prover-on-another-thread

This commit is contained in:
Jaremy Creechley 2024-05-20 21:24:03 +03:00 committed by GitHub
commit 7e6c3a31e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 32 additions and 96 deletions

View File

@ -154,9 +154,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
let
haves = b.peers.peersHave(cid)
trace "Current number of peers for block", cid, peers = haves.len
if haves.len < b.minPeersPerBlock:
trace "Discovering block", cid
try:
let
request = b.discovery
@ -168,7 +166,6 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
let
peers = await request
trace "Discovered peers for block", peers = peers.len, cid
let
dialed = await allFinished(
peers.mapIt( b.network.dialPeer(it.data) ))
@ -189,10 +186,9 @@ proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:
if cid notin b.discoveryQueue:
try:
trace "Queueing find block", cid, queue = b.discoveryQueue.len
b.discoveryQueue.putNoWait(cid)
except CatchableError as exc:
trace "Exception queueing discovery request", exc = exc.msg
warn "Exception queueing discovery request", exc = exc.msg
proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} =
for cid in cids:

View File

@ -125,7 +125,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
proc sendWantHave(
b: BlockExcEngine,
address: BlockAddress,
address: BlockAddress, # pluralize this entire call chain, please
excluded: seq[BlockExcPeerCtx],
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
trace "Sending wantHave request to peers", address
@ -139,7 +139,7 @@ proc sendWantHave(
proc sendWantBlock(
b: BlockExcEngine,
address: BlockAddress,
address: BlockAddress, # pluralize this entire call chain, please
blockPeer: BlockExcPeerCtx): Future[void] {.async.} =
trace "Sending wantBlock request to", peer = blockPeer.id, address
await b.network.request.sendWantList(
@ -154,13 +154,11 @@ proc monitorBlockHandle(
peerId: PeerId) {.async.} =
try:
trace "Monitoring block handle", address, peerId
discard await handle
trace "Block handle success", address, peerId
except CancelledError as exc:
trace "Block handle cancelled", address, peerId
except CatchableError as exc:
trace "Error block handle, disconnecting peer", address, exc = exc.msg, peerId
warn "Error block handle, disconnecting peer", address, exc = exc.msg, peerId
# TODO: really, this is just a quick and dirty way of
# preventing hitting the same "bad" peer every time, however,
@ -217,7 +215,6 @@ proc blockPresenceHandler*(
b: BlockExcEngine,
peer: PeerId,
blocks: seq[BlockPresence]) {.async.} =
trace "Received presence update for peer", peer, blocks = blocks.len
let
peerCtx = b.peers.get(peer)
wantList = toSeq(b.pendingBlocks.wantList)
@ -227,12 +224,6 @@ proc blockPresenceHandler*(
for blk in blocks:
if presence =? Presence.init(blk):
logScope:
address = $presence.address
have = presence.have
price = presence.price
trace "Updating presence"
peerCtx.setPresence(presence)
let
@ -323,14 +314,12 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
proc payForBlocks(engine: BlockExcEngine,
peer: BlockExcPeerCtx,
blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Paying for blocks", len = blocksDelivery.len
let
sendPayment = engine.network.request.sendPayment
price = peer.price(blocksDelivery.mapIt(it.address))
if payment =? engine.wallet.pay(peer, price):
trace "Sending payment for blocks", price
trace "Sending payment for blocks", price, len = blocksDelivery.len
await sendPayment(peer.id, payment)
proc validateBlockDelivery(
@ -365,7 +354,7 @@ proc blocksDeliveryHandler*(
b: BlockExcEngine,
peer: PeerId,
blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Got blocks from peer", peer, len = blocksDelivery.len
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",")
var validatedBlocksDelivery: seq[BlockDelivery]
for bd in blocksDelivery:
@ -411,7 +400,6 @@ proc wantListHandler*(
b: BlockExcEngine,
peer: PeerId,
wantList: WantList) {.async.} =
trace "Got wantList for peer", peer, items = wantList.entries.len
let
peerCtx = b.peers.get(peer)
if isNil(peerCtx):
@ -430,8 +418,6 @@ proc wantListHandler*(
wantType = $e.wantType
if idx < 0: # updating entry
trace "Processing new want list entry"
let
have = await e.address in b.localStore
price = @(
@ -442,41 +428,35 @@ proc wantListHandler*(
codex_block_exchange_want_have_lists_received.inc()
if not have and e.sendDontHave:
trace "Adding dont have entry to presence response"
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.DontHave,
price: price))
elif have and e.wantType == WantType.WantHave:
trace "Adding have entry to presence response"
presence.add(
BlockPresence(
address: e.address,
`type`: BlockPresenceType.Have,
price: price))
elif e.wantType == WantType.WantBlock:
trace "Added entry to peer's want blocks list"
peerCtx.peerWants.add(e)
codex_block_exchange_want_block_lists_received.inc()
else:
# peer doesn't want this block anymore
if e.cancel:
trace "Removing entry from peer want list"
peerCtx.peerWants.del(idx)
else:
trace "Updating entry in peer want list"
# peer might want to ask for the same cid with
# different want params
peerCtx.peerWants[idx] = e # update entry
if presence.len > 0:
trace "Sending presence to remote", items = presence.len
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await b.network.request.sendPresence(peer, presence)
trace "Scheduling a task to check want-list", peer
if not b.scheduleTask(peerCtx):
trace "Unable to schedule task for peer", peer
warn "Unable to schedule task for peer", peer
proc accountHandler*(
engine: BlockExcEngine,
@ -541,8 +521,6 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerId) =
b.peers.remove(peer)
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
trace "Handling task for peer", peer = task.id
# Send to the peer blocks he wants to get,
# if they present in our local store
@ -559,7 +537,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
if peerWant.address in addresses:
peerWant.inFlight = inFlight
trace "wantsBlocks", peer = task.id, n = wantsBlocks.len
if wantsBlocks.len > 0:
# Mark wants as in-flight.
let wantAddresses = wantsBlocks.mapIt(it.address)
@ -567,7 +544,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
wantsBlocks.sort(SortOrder.Descending)
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
trace "Handling lookup for entry", address = e.address
if e.address.leaf:
(await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
(blkAndProof: (Block, CodexProof)) =>
@ -591,7 +567,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
updateInFlight(failedAddresses, false)
if blocksDelivery.len > 0:
trace "Sending blocks to peer", peer = task.id, blocks = blocksDelivery.len
trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",")
await b.network.request.sendBlocksDelivery(
task.id,
blocksDelivery
@ -600,7 +576,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
task.peerWants.keepItIf(it.address notin successAddresses)
trace "Removed entries from peerWants", peerWants = task.peerWants.len
proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
## process tasks
@ -611,7 +586,6 @@ proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
let
peerCtx = await b.taskQueue.pop()
trace "Got new task from queue", peerId = peerCtx.id
await b.taskHandler(peerCtx)
info "Exiting blockexc task runner"

View File

@ -58,15 +58,13 @@ proc getWantHandle*(
inFlight: inFlight,
startTime: getMonoTime().ticks)
trace "Adding pending future for block", address, inFlight = p.blocks[address].inFlight
p.updatePendingBlockGauge()
return await p.blocks[address].handle.wait(timeout)
except CancelledError as exc:
trace "Blocks cancelled", exc = exc.msg, address
raise exc
except CatchableError as exc:
trace "Pending WANT failed or expired", exc = exc.msg
error "Pending WANT failed or expired", exc = exc.msg
# no need to cancel, it is already cancelled by wait()
raise exc
finally:
@ -88,8 +86,6 @@ proc resolve*(
for bd in blocksDelivery:
p.blocks.withValue(bd.address, blockReq):
trace "Resolving block", address = bd.address
if not blockReq.handle.finished:
let
startTime = blockReq.startTime
@ -99,7 +95,9 @@ proc resolve*(
blockReq.handle.complete(bd.blk)
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
trace "Block retrieval time", retrievalDurationUs, address = bd.address
if retrievalDurationUs > 500000:
warn "High block retrieval time", retrievalDurationUs, address = bd.address
else:
trace "Block handle already finished", address = bd.address
@ -112,7 +110,6 @@ proc setInFlight*(
p.blocks.withValue(address, pending):
pending[].inFlight = inFlight
trace "Setting inflight", address, inFlight = pending[].inFlight
proc isInFlight*(
p: PendingBlocksManager,
@ -122,7 +119,6 @@ proc isInFlight*(
p.blocks.withValue(address, pending):
result = pending[].inFlight
trace "Getting inflight", address, inFlight = result
proc contains*(p: PendingBlocksManager, cid: Cid): bool =
BlockAddress.init(cid) in p.blocks

View File

@ -96,7 +96,6 @@ proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
b.peers.withValue(id, peer):
try:
await b.inflightSema.acquire()
trace "Sending message to peer", peer = id
await peer[].send(msg)
except CatchableError as err:
error "Error sending message", peer = id, msg = err.msg
@ -113,7 +112,6 @@ proc handleWantList(
##
if not b.handlers.onWantList.isNil:
trace "Handling want list for peer", peer = peer.id, items = list.entries.len
await b.handlers.onWantList(peer.id, list)
proc sendWantList*(
@ -128,7 +126,6 @@ proc sendWantList*(
## Send a want message to peer
##
trace "Sending want list to peer", peer = id, `type` = $wantType, items = addresses.len
let msg = WantList(
entries: addresses.mapIt(
WantListEntry(
@ -157,7 +154,6 @@ proc handleBlocksDelivery(
##
if not b.handlers.onBlocksDelivery.isNil:
trace "Handling blocks for peer", peer = peer.id, items = blocksDelivery.len
await b.handlers.onBlocksDelivery(peer.id, blocksDelivery)
@ -178,7 +174,6 @@ proc handleBlockPresence(
##
if not b.handlers.onPresence.isNil:
trace "Handling block presence for peer", peer = peer.id, items = presence.len
await b.handlers.onPresence(peer.id, presence)
proc sendBlockPresence*(

View File

@ -45,7 +45,6 @@ proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
let
data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet()
trace "Got message for peer", peer = b.id
await b.handler(b, msg)
except CatchableError as err:
warn "Exception in blockexc read loop", msg = err.msg
@ -64,10 +63,9 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
let conn = await b.connect()
if isNil(conn):
trace "Unable to get send connection for peer message not sent", peer = b.id
warn "Unable to get send connection for peer message not sent", peer = b.id
return
trace "Sending message to remote", peer = b.id
await conn.writeLp(protobufEncode(msg))
proc broadcast*(b: NetworkPeer, msg: Message) =
@ -75,7 +73,7 @@ proc broadcast*(b: NetworkPeer, msg: Message) =
try:
await b.send(msg)
except CatchableError as exc:
trace "Exception broadcasting message to peer", peer = b.id, exc = exc.msg
warn "Exception broadcasting message to peer", peer = b.id, exc = exc.msg
asyncSpawn sendAwaiter()

View File

@ -25,9 +25,6 @@ import ../../logutils
export payments, nitro
logScope:
topics = "codex peercontext"
type
BlockExcPeerCtx* = ref object of RootObj
id*: PeerId
@ -66,5 +63,4 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 =
self.blocks.withValue(a, precense):
price += precense[].price
trace "Blocks price", price
price

View File

@ -47,15 +47,12 @@ func contains*(self: PeerCtxStore, peerId: PeerId): bool =
peerId in self.peers
func add*(self: PeerCtxStore, peer: BlockExcPeerCtx) =
trace "Adding peer to peer context store", peer = peer.id
self.peers[peer.id] = peer
func remove*(self: PeerCtxStore, peerId: PeerId) =
trace "Removing peer from peer context store", peer = peerId
self.peers.del(peerId)
func get*(self: PeerCtxStore, peerId: PeerId): BlockExcPeerCtx =
trace "Retrieving peer from peer context store", peer = peerId
self.peers.getOrDefault(peerId, nil)
func len*(self: PeerCtxStore): int =

View File

@ -73,16 +73,14 @@ method find*(
cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
## Find block providers
##
trace "Finding providers for block", cid
without providers =?
(await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
trace "Error finding providers for block", cid, error = error.msg
warn "Error finding providers for block", cid, error = error.msg
return providers.filterIt( not (it.data.peerId == d.peerId) )
method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid
## Provide a block Cid
##
let
nodes = await d.protocol.addProvider(

View File

@ -95,15 +95,14 @@ proc retrieveCid(
break
bytes += buff.len
trace "Sending chunk", size = buff.len
await resp.sendChunk(addr buff[0], buff.len)
await resp.finish()
codex_api_downloads.inc()
except CatchableError as exc:
trace "Excepting streaming blocks", exc = exc.msg
warn "Excepting streaming blocks", exc = exc.msg
return RestApiResponse.error(Http500)
finally:
trace "Sent bytes", cid = cid, bytes
info "Sent bytes", cid = cid, bytes
if not stream.isNil:
await stream.close()

View File

@ -34,17 +34,13 @@ type
localStore*: BlockStore # local block store
method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} =
trace "Getting block from local store or network", address
without blk =? (await self.localStore.getBlock(address)), err:
if not (err of BlockNotFoundError):
trace "Error getting block from local store", address, err = err.msg
error "Error getting block from local store", address, err = err.msg
return failure err
trace "Block not in local store", address, err = err.msg
without newBlock =? (await self.engine.requestBlock(address)), err:
trace "Unable to get block from exchange engine", address, err = err.msg
error "Unable to get block from exchange engine", address, err = err.msg
return failure err
return success newBlock

View File

@ -139,10 +139,9 @@ method getCidAndProof*(
return failure(err)
without (cid, proof) =? (Cid, CodexProof).decode(value), err:
trace "Unable to decode cid and proof", err = err.msg
error "Unable to decode cid and proof", err = err.msg
return failure(err)
trace "Got cid and proof for block", cid, proof = $proof
return success (cid, proof)
method getCid*(
@ -154,10 +153,11 @@ method getCid*(
without value =? await self.metaDs.get(key), err:
if err of DatastoreKeyNotFound:
trace "Cid not found", treeCid, index
# This failure is expected to happen frequently:
# NetworkStore.getBlock will call RepoStore.getBlock before starting the block exchange engine.
return failure(newException(BlockNotFoundError, err.msg))
else:
trace "Error getting cid from datastore", err = err.msg, key
error "Error getting cid from datastore", err = err.msg, key
return failure(err)
return (Cid, CodexProof).decodeCid(value)
@ -170,21 +170,19 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
cid = cid
if cid.isEmpty:
trace "Empty block, ignoring"
return cid.emptyBlock
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
error "Error getting key from provider", err = err.msg
return failure(err)
without data =? await self.repoDs.get(key), err:
if not (err of DatastoreKeyNotFound):
trace "Error getting block from datastore", err = err.msg, key
error "Error getting block from datastore", err = err.msg, key
return failure(err)
return failure(newException(BlockNotFoundError, err.msg))
trace "Got block for cid", cid
return Block.new(cid, data, verify = true)

View File

@ -82,7 +82,6 @@ method readOnce*(
## Raise exception if we are already at EOF.
##
trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.blocksCount
if self.atEof:
raise newLPStreamEOFError()
@ -104,7 +103,7 @@ method readOnce*(
without blk =? await self.store.getBlock(address), error:
raise newLPStreamReadError(error)
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
trace "Reading bytes from store stream", manifestCid = self.manifest.cid.get(), numBlocks = self.manifest.blocksCount, blockNum, blkCid = blk.cid, bytes = readBytes, blockOffset
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
if blk.isEmpty:

View File

@ -34,12 +34,6 @@ proc new*(
proc slots*(validation: Validation): seq[SlotId] =
validation.slots.toSeq
proc iterateSlots(validation: Validation, action: proc(s: SlotId): Future[void] {.async.}) {.async.} =
# Copy of hashSet, for iteration.
let slots = validation.slots
for slotId in slots:
await action(slotId)
proc getCurrentPeriod(validation: Validation): UInt256 =
return validation.periodicity.periodOf(validation.clock.now().u256)
@ -61,12 +55,12 @@ proc subscribeSlotFilled(validation: Validation) {.async.} =
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
var ended: HashSet[SlotId]
proc onSlot(slotId: SlotId) {.async.} =
let slots = validation.slots
for slotId in slots:
let state = await validation.market.slotState(slotId)
if state != SlotState.Filled:
trace "Removing slot", slotId
ended.incl(slotId)
await validation.iterateSlots(onSlot)
validation.slots.excl(ended)
proc markProofAsMissing(validation: Validation,
@ -88,10 +82,10 @@ proc markProofAsMissing(validation: Validation,
error "Marking proof as missing failed", msg = e.msg
proc markProofsAsMissing(validation: Validation) {.async.} =
proc onSlot(slotId: SlotId) {.async.} =
let slots = validation.slots
for slotId in slots:
let previousPeriod = validation.getCurrentPeriod() - 1
await validation.markProofAsMissing(slotId, previousPeriod)
await validation.iterateSlots(onSlot)
proc run(validation: Validation) {.async.} =
trace "Validation started"