Add basic retry functionality (#1119)

* adding basic retry functionality

* avoid duplicate requests and batch them

* fix cancelling blocks

* properly resolve blocks

* minor cleanup - use `self`

* avoid useless asyncSpawn

* track retries

* limit max inflight and set libp2p maxIncomingStreams

* cleanup

* add basic yield in readLoop

* use tuple instead of object

* cleanup imports and logs

* increase defaults

* wip

* fix prefetch batching

* cleanup

* decrease timeouts to speedup tests

* remove outdated test

* add retry tests

* should track retries

* remove useless test

* use correct block address (index was off by 1)

* remove duplicate noop proc

* add BlockHandle type

* Use BlockHandle type

* add fetchLocal to control batching from local store

* add format target

* revert deps

* adjust quotaMaxBytes

* cleanup imports and logs

* revert deps

* cleanup blocks on cancelled

* terminate erasure and prefetch jobs on stream end

* split storing and retrieving data into separate tests

* track `b.discoveryLoop` future

* misc

* remove useless check
Author: Dmitriy Ryajov
Date: 2025-02-24 15:01:23 -06:00 (committed by GitHub)
Parent: f6aee4ff6e
Commit: a609baea26
28 changed files with 697 additions and 406 deletions
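
The first bullet above is the core of the change: instead of waiting on a single want guarded by a timeout, each pending block now gets its own retry loop that re-sends wants on an interval until the block arrives or a retry budget (by default 3000 retries, 500 ms apart) runs out. A minimal, illustrative sketch of that pattern, with placeholder names and error handling elided (this is not the commit's code, which follows in the diff below):

import pkg/chronos

proc retryUntilResolved[T](
    handle: Future[T],
    resend: proc(): Future[void] {.gcsafe.},
    retries = 3000,
    interval = 500.millis,
) {.async.} =
  ## Re-issue a want periodically until the pending handle resolves
  ## or the retry budget is spent.
  var left = retries
  while not handle.finished:
    if left <= 0:
      handle.fail(newException(CatchableError, "retries exhausted"))
      break
    await resend()                         # re-send wantBlock / wantHave
    await (handle or sleepAsync(interval)) # block delivered or timer fired
    dec left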

.gitignore

@ -45,3 +45,5 @@ docker/prometheus-data
.DS_Store .DS_Store
nim.cfg nim.cfg
tests/integration/logs tests/integration/logs
data/


@ -229,6 +229,11 @@ nph/%: build-nph
echo -e $(FORMAT_MSG) "nph/$*" && \ echo -e $(FORMAT_MSG) "nph/$*" && \
$(NPH) $* $(NPH) $*
format:
$(NPH) *.nim
$(NPH) codex/
$(NPH) tests/
clean-nph: clean-nph:
rm -f $(NPH) rm -f $(NPH)


@ -144,7 +144,6 @@ proc start*(b: DiscoveryEngine) {.async.} =
b.discoveryLoop = b.discoveryQueueLoop() b.discoveryLoop = b.discoveryQueueLoop()
b.trackedFutures.track(b.discoveryLoop) b.trackedFutures.track(b.discoveryLoop)
asyncSpawn b.discoveryLoop
proc stop*(b: DiscoveryEngine) {.async.} = proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine ## Stop the discovery engine


@ -19,6 +19,7 @@ import pkg/metrics
import pkg/stint import pkg/stint
import pkg/questionable import pkg/questionable
import ../../rng
import ../../stores/blockstore import ../../stores/blockstore
import ../../blocktype import ../../blocktype
import ../../utils import ../../utils
@ -67,12 +68,6 @@ const
DefaultMaxPeersPerRequest* = 10 DefaultMaxPeersPerRequest* = 10
DefaultTaskQueueSize = 100 DefaultTaskQueueSize = 100
DefaultConcurrentTasks = 10 DefaultConcurrentTasks = 10
# DefaultMaxRetries = 3
# DefaultConcurrentDiscRequests = 10
# DefaultConcurrentAdvertRequests = 10
# DefaultDiscoveryTimeout = 1.minutes
# DefaultMaxQueriedBlocksCache = 1000
# DefaultMinPeersPerBlock = 3
type type
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
@ -88,10 +83,8 @@ type
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
wallet*: WalletRef # Nitro wallet for micropayments wallet*: WalletRef # Nitro wallet for micropayments
pricing*: ?Pricing # Optional bandwidth pricing pricing*: ?Pricing # Optional bandwidth pricing
blockFetchTimeout*: Duration # Timeout for fetching blocks over the network
discovery*: DiscoveryEngine discovery*: DiscoveryEngine
advertiser*: Advertiser advertiser*: Advertiser
@ -100,124 +93,147 @@ type
price*: UInt256 price*: UInt256
# attach task scheduler to engine # attach task scheduler to engine
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} = proc scheduleTask(self: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe.} =
b.taskQueue.pushOrUpdateNoWait(task).isOk() self.taskQueue.pushOrUpdateNoWait(task).isOk()
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
proc start*(b: BlockExcEngine) {.async.} = proc start*(self: BlockExcEngine) {.async.} =
## Start the blockexc task ## Start the blockexc task
## ##
await b.discovery.start() await self.discovery.start()
await b.advertiser.start() await self.advertiser.start()
trace "Blockexc starting with concurrent tasks", tasks = b.concurrentTasks trace "Blockexc starting with concurrent tasks", tasks = self.concurrentTasks
if b.blockexcRunning: if self.blockexcRunning:
warn "Starting blockexc twice" warn "Starting blockexc twice"
return return
b.blockexcRunning = true self.blockexcRunning = true
for i in 0 ..< b.concurrentTasks: for i in 0 ..< self.concurrentTasks:
let fut = b.blockexcTaskRunner() let fut = self.blockexcTaskRunner()
b.trackedFutures.track(fut) self.trackedFutures.track(fut)
asyncSpawn fut
proc stop*(b: BlockExcEngine) {.async.} = proc stop*(self: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc ## Stop the blockexc blockexc
## ##
await b.discovery.stop() await self.trackedFutures.cancelTracked()
await b.advertiser.stop() await self.network.stop()
await self.discovery.stop()
await self.advertiser.stop()
trace "NetworkStore stop" trace "NetworkStore stop"
if not b.blockexcRunning: if not self.blockexcRunning:
warn "Stopping blockexc without starting it" warn "Stopping blockexc without starting it"
return return
b.blockexcRunning = false self.blockexcRunning = false
await b.trackedFutures.cancelTracked()
trace "NetworkStore stopped" trace "NetworkStore stopped"
proc sendWantHave( proc sendWantHave(
b: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx] self: BlockExcEngine, addresses: seq[BlockAddress], peers: seq[BlockExcPeerCtx]
): Future[void] {.async.} = ): Future[void] {.async.} =
for p in peers: for p in peers:
let toAsk = addresses.filterIt(it notin p.peerHave) let toAsk = addresses.filterIt(it notin p.peerHave)
trace "Sending wantHave request", toAsk, peer = p.id trace "Sending wantHave request", toAsk, peer = p.id
await b.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave) await self.network.request.sendWantList(p.id, toAsk, wantType = WantType.WantHave)
codex_block_exchange_want_have_lists_sent.inc() codex_block_exchange_want_have_lists_sent.inc()
proc sendWantBlock( proc sendWantBlock(
b: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx self: BlockExcEngine, addresses: seq[BlockAddress], blockPeer: BlockExcPeerCtx
): Future[void] {.async.} = ): Future[void] {.async.} =
trace "Sending wantBlock request to", addresses, peer = blockPeer.id trace "Sending wantBlock request to", addresses, peer = blockPeer.id
await b.network.request.sendWantList( await self.network.request.sendWantList(
blockPeer.id, addresses, wantType = WantType.WantBlock blockPeer.id, addresses, wantType = WantType.WantBlock
) # we want this remote to send us a block ) # we want this remote to send us a block
codex_block_exchange_want_block_lists_sent.inc() codex_block_exchange_want_block_lists_sent.inc()
-proc monitorBlockHandle(
-    b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId
-) {.async.} =
-  try:
-    discard await handle
-  except CancelledError as exc:
-    trace "Block handle cancelled", address, peerId
-  except CatchableError as exc:
-    warn "Error block handle, disconnecting peer", address, exc = exc.msg
-    # TODO: really, this is just a quick and dirty way of
-    # preventing hitting the same "bad" peer every time, however,
-    # we might as well discover this on or next iteration, so
-    # it doesn't mean that we're never talking to this peer again.
-    # TODO: we need a lot more work around peer selection and
-    # prioritization
-    # drop unresponsive peer
-    await b.network.switch.disconnect(peerId)
-    b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
-
-proc pickPseudoRandom(
-    address: BlockAddress, peers: seq[BlockExcPeerCtx]
-): BlockExcPeerCtx =
-  return peers[hash(address) mod peers.len]

+proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
+  Rng.instance.sample(peers)
+
+proc downloadInternal(
+    self: BlockExcEngine, address: BlockAddress
+) {.async: (raises: []).} =
+  logScope:
+    address = address
+
+  let handle = self.pendingBlocks.getWantHandle(address)
+  trace "Downloading block"
+  try:
+    while address in self.pendingBlocks:
+      logScope:
+        retries = self.pendingBlocks.retries(address)
+        interval = self.pendingBlocks.retryInterval
+
+      if self.pendingBlocks.retriesExhausted(address):
+        trace "Error retries exhausted"
+        handle.fail(newException(RetriesExhaustedError, "Error retries exhausted"))
+        break
+
+      trace "Running retry handle"
+      let peers = self.peers.getPeersForBlock(address)
+      logScope:
+        peersWith = peers.with.len
+        peersWithout = peers.without.len
+
+      trace "Peers for block"
+      if peers.with.len > 0:
+        self.pendingBlocks.setInFlight(address, true)
+        await self.sendWantBlock(@[address], peers.with.randomPeer)
+      else:
+        self.pendingBlocks.setInFlight(address, false)
+        if peers.without.len > 0:
+          await self.sendWantHave(@[address], peers.without)
+        self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
+
+      await (handle or sleepAsync(self.pendingBlocks.retryInterval))
+      self.pendingBlocks.decRetries(address)
+
+      if handle.finished:
+        trace "Handle for block finished", failed = handle.failed
+        break
+  except CancelledError as exc:
+    trace "Block download cancelled"
+    if not handle.finished:
+      await handle.cancelAndWait()
+  except CatchableError as exc:
+    warn "Error downloadloading block", exc = exc.msg
+    if not handle.finished:
+      handle.fail(exc)
+  finally:
+    self.pendingBlocks.setInFlight(address, false)
-proc requestBlock*(
-    b: BlockExcEngine, address: BlockAddress
-): Future[?!Block] {.async.} =
-  let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout)
-
-  if not b.pendingBlocks.isInFlight(address):
-    let peers = b.peers.getPeersForBlock(address)
-
-    if peers.with.len == 0:
-      b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
-    else:
-      let selected = pickPseudoRandom(address, peers.with)
-      asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id)
-      b.pendingBlocks.setInFlight(address)
-      await b.sendWantBlock(@[address], selected)
-
-    await b.sendWantHave(@[address], peers.without)
-
-  # Don't let timeouts bubble up. We can't be too broad here or we break
-  # cancellations.
-  try:
-    success await blockFuture
-  except AsyncTimeoutError as err:
-    failure err
-
-proc requestBlock*(b: BlockExcEngine, cid: Cid): Future[?!Block] =
-  b.requestBlock(BlockAddress.init(cid))

+proc requestBlock*(
+    self: BlockExcEngine, address: BlockAddress
+): Future[?!Block] {.async: (raises: [CancelledError]).} =
+  if address notin self.pendingBlocks:
+    self.trackedFutures.track(self.downloadInternal(address))
+
+  try:
+    let handle = self.pendingBlocks.getWantHandle(address)
+    success await handle
+  except CancelledError as err:
+    warn "Block request cancelled", address
+    raise err
+  except CatchableError as err:
+    error "Block request failed", address, err = err.msg
+    failure err
+
+proc requestBlock*(
+    self: BlockExcEngine, cid: Cid
+): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
+  self.requestBlock(BlockAddress.init(cid))
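
For illustration only (not part of the diff; `engine` and `cid` are assumed to be in scope): the reworked `requestBlock` returns a `?!Block` and raises only `CancelledError`, so a caller can unwrap it with the usual `questionable` idiom:

without blk =? await engine.requestBlock(cid), err:
  warn "Unable to retrieve block", err = err.msg
  return
trace "Retrieved block", cid = blk.cid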
proc blockPresenceHandler*( proc blockPresenceHandler*(
b: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence] self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
) {.async.} = ) {.async.} =
trace "Received block presence from peer", peer, blocks = blocks.mapIt($it)
let let
peerCtx = b.peers.get(peer) peerCtx = self.peers.get(peer)
wantList = toSeq(b.pendingBlocks.wantList) ourWantList = toSeq(self.pendingBlocks.wantList)
if peerCtx.isNil: if peerCtx.isNil:
return return
@ -228,82 +244,99 @@ proc blockPresenceHandler*(
let let
peerHave = peerCtx.peerHave peerHave = peerCtx.peerHave
dontWantCids = peerHave.filterIt(it notin wantList) dontWantCids = peerHave.filterIt(it notin ourWantList)
if dontWantCids.len > 0: if dontWantCids.len > 0:
peerCtx.cleanPresence(dontWantCids) peerCtx.cleanPresence(dontWantCids)
-  let wantCids = wantList.filterIt(it in peerHave)
-
-  if wantCids.len > 0:
-    trace "Peer has blocks in our wantList", peer, wants = wantCids
-    await b.sendWantBlock(wantCids, peerCtx)
-
-  # if none of the connected peers report our wants in their have list,
-  # fire up discovery
-  b.discovery.queueFindBlocksReq(
-    toSeq(b.pendingBlocks.wantListCids).filter do(cid: Cid) -> bool:
-      not b.peers.anyIt(cid in it.peerHaveCids)
-  )
-
-proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =

+  let ourWantCids = ourWantList.filter do(address: BlockAddress) -> bool:
+    if address in peerHave and not self.pendingBlocks.retriesExhausted(address) and
+        not self.pendingBlocks.isInFlight(address):
+      self.pendingBlocks.setInFlight(address, true)
+      self.pendingBlocks.decRetries(address)
+      true
+    else:
+      false
+
+  if ourWantCids.len > 0:
+    trace "Peer has blocks in our wantList", peer, wants = ourWantCids
+    await self.sendWantBlock(ourWantCids, peerCtx)
+
+proc scheduleTasks(self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
let cids = blocksDelivery.mapIt(it.blk.cid) let cids = blocksDelivery.mapIt(it.blk.cid)
# schedule any new peers to provide blocks to # schedule any new peers to provide blocks to
for p in b.peers: for p in self.peers:
for c in cids: # for each cid for c in cids: # for each cid
# schedule a peer if it wants at least one cid # schedule a peer if it wants at least one cid
# and we have it in our local store # and we have it in our local store
if c in p.peerWantsCids: if c in p.peerWantsCids:
if await (c in b.localStore): if await (c in self.localStore):
if b.scheduleTask(p): if self.scheduleTask(p):
trace "Task scheduled for peer", peer = p.id trace "Task scheduled for peer", peer = p.id
else: else:
warn "Unable to schedule task for peer", peer = p.id warn "Unable to schedule task for peer", peer = p.id
break # do next peer break # do next peer
-proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
-  ## Tells neighboring peers that we're no longer interested in a block.
-  trace "Sending block request cancellations to peers",
-    addrs, peers = b.peers.mapIt($it.id)
-
-  let failed = (
-    await allFinished(
-      b.peers.mapIt(
-        b.network.request.sendWantCancellations(peer = it.id, addresses = addrs)
-      )
-    )
-  ).filterIt(it.failed)
-
-  if failed.len > 0:
-    warn "Failed to send block request cancellations to peers", peers = failed.len

+proc cancelBlocks(self: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
+  ## Tells neighboring peers that we're no longer interested in a block.
+  ##
+
+  if self.peers.len == 0:
+    return
+
+  trace "Sending block request cancellations to peers",
+    addrs, peers = self.peers.peerIds
+
+  proc mapPeers(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
+    let blocks = addrs.filter do(a: BlockAddress) -> bool:
+      a in peerCtx.blocks
+
+    if blocks.len > 0:
+      trace "Sending block request cancellations to peer", peer = peerCtx.id, blocks
+      await self.network.request.sendWantCancellations(
+        peer = peerCtx.id, addresses = blocks
+      )
+      peerCtx.cleanPresence(addrs)
+    peerCtx
+
+  let failed = (await allFinished(map(toSeq(self.peers.peers.values), mapPeers))).filterIt(
+    it.failed
+  )
+
+  if failed.len > 0:
+    warn "Failed to send block request cancellations to peers", peers = failed.len
+  else:
+    trace "Block request cancellations sent to peers", peers = self.peers.len
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = proc resolveBlocks*(
b.pendingBlocks.resolve(blocksDelivery) self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
await b.scheduleTasks(blocksDelivery) ) {.async.} =
await b.cancelBlocks(blocksDelivery.mapIt(it.address)) self.pendingBlocks.resolve(blocksDelivery)
await self.scheduleTasks(blocksDelivery)
await self.cancelBlocks(blocksDelivery.mapIt(it.address))
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = proc resolveBlocks*(self: BlockExcEngine, blocks: seq[Block]) {.async.} =
await b.resolveBlocks( await self.resolveBlocks(
blocks.mapIt( blocks.mapIt(
BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid)) BlockDelivery(blk: it, address: BlockAddress(leaf: false, cid: it.cid))
) )
) )
proc payForBlocks( proc payForBlocks(
engine: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery] self: BlockExcEngine, peer: BlockExcPeerCtx, blocksDelivery: seq[BlockDelivery]
) {.async.} = ) {.async.} =
let let
sendPayment = engine.network.request.sendPayment sendPayment = self.network.request.sendPayment
price = peer.price(blocksDelivery.mapIt(it.address)) price = peer.price(blocksDelivery.mapIt(it.address))
if payment =? engine.wallet.pay(peer, price): if payment =? self.wallet.pay(peer, price):
trace "Sending payment for blocks", price, len = blocksDelivery.len trace "Sending payment for blocks", price, len = blocksDelivery.len
await sendPayment(peer.id, payment) await sendPayment(peer.id, payment)
proc validateBlockDelivery(b: BlockExcEngine, bd: BlockDelivery): ?!void = proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void =
if bd.address notin b.pendingBlocks: if bd.address notin self.pendingBlocks:
return failure("Received block is not currently a pending block") return failure("Received block is not currently a pending block")
if bd.address.leaf: if bd.address.leaf:
@ -333,7 +366,7 @@ proc validateBlockDelivery(b: BlockExcEngine, bd: BlockDelivery): ?!void =
return success() return success()
proc blocksDeliveryHandler*( proc blocksDeliveryHandler*(
b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery] self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]
) {.async.} = ) {.async.} =
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))
@ -343,11 +376,11 @@ proc blocksDeliveryHandler*(
peer = peer peer = peer
address = bd.address address = bd.address
if err =? b.validateBlockDelivery(bd).errorOption: if err =? self.validateBlockDelivery(bd).errorOption:
warn "Block validation failed", msg = err.msg warn "Block validation failed", msg = err.msg
continue continue
if err =? (await b.localStore.putBlock(bd.blk)).errorOption: if err =? (await self.localStore.putBlock(bd.blk)).errorOption:
error "Unable to store block", err = err.msg error "Unable to store block", err = err.msg
continue continue
@ -356,7 +389,7 @@ proc blocksDeliveryHandler*(
error "Proof expected for a leaf block delivery" error "Proof expected for a leaf block delivery"
continue continue
if err =? ( if err =? (
await b.localStore.putCidAndProof( await self.localStore.putCidAndProof(
bd.address.treeCid, bd.address.index, bd.blk.cid, proof bd.address.treeCid, bd.address.index, bd.blk.cid, proof
) )
).errorOption: ).errorOption:
@ -365,18 +398,22 @@ proc blocksDeliveryHandler*(
validatedBlocksDelivery.add(bd) validatedBlocksDelivery.add(bd)
await b.resolveBlocks(validatedBlocksDelivery) await self.resolveBlocks(validatedBlocksDelivery)
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64) codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
let peerCtx = b.peers.get(peer) let peerCtx = self.peers.get(peer)
if peerCtx != nil: if peerCtx != nil:
await b.payForBlocks(peerCtx, blocksDelivery) await self.payForBlocks(peerCtx, blocksDelivery)
## shouldn't we remove them from the want-list instead of this: ## shouldn't we remove them from the want-list instead of this:
peerCtx.cleanPresence(blocksDelivery.mapIt(it.address)) peerCtx.cleanPresence(blocksDelivery.mapIt(it.address))
proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.async.} = proc wantListHandler*(
let peerCtx = b.peers.get(peer) self: BlockExcEngine, peer: PeerId, wantList: WantList
) {.async.} =
trace "Received want list from peer", peer, wantList = wantList.entries.len
let peerCtx = self.peers.get(peer)
if peerCtx.isNil: if peerCtx.isNil:
return return
@ -395,9 +432,14 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy
if idx < 0: # Adding new entry to peer wants if idx < 0: # Adding new entry to peer wants
let let
have = await e.address in b.localStore have = await e.address in self.localStore
price = @(b.pricing.get(Pricing(price: 0.u256)).price.toBytesBE) price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
if e.cancel:
trace "Received cancelation for untracked block, skipping", address = e.address
continue
trace "Processing want list entry", wantList = $e
case e.wantType case e.wantType
of WantType.WantHave: of WantType.WantHave:
if have: if have:
@ -413,7 +455,6 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy
address: e.address, `type`: BlockPresenceType.DontHave, price: price address: e.address, `type`: BlockPresenceType.DontHave, price: price
) )
) )
peerCtx.peerWants.add(e)
codex_block_exchange_want_have_lists_received.inc() codex_block_exchange_want_have_lists_received.inc()
of WantType.WantBlock: of WantType.WantBlock:
@ -425,73 +466,76 @@ proc wantListHandler*(b: BlockExcEngine, peer: PeerId, wantList: WantList) {.asy
if e.cancel: if e.cancel:
trace "Canceling want for block", address = e.address trace "Canceling want for block", address = e.address
peerCtx.peerWants.del(idx) peerCtx.peerWants.del(idx)
trace "Canceled block request", address = e.address, len = peerCtx.peerWants.len
else: else:
if e.wantType == WantType.WantBlock:
schedulePeer = true
# peer might want to ask for the same cid with # peer might want to ask for the same cid with
# different want params # different want params
trace "Updating want for block", address = e.address trace "Updating want for block", address = e.address
peerCtx.peerWants[idx] = e # update entry peerCtx.peerWants[idx] = e # update entry
trace "Updated block request", address = e.address, len = peerCtx.peerWants.len
if presence.len > 0: if presence.len > 0:
trace "Sending presence to remote", items = presence.mapIt($it).join(",") trace "Sending presence to remote", items = presence.mapIt($it).join(",")
await b.network.request.sendPresence(peer, presence) await self.network.request.sendPresence(peer, presence)
if schedulePeer: if schedulePeer and not self.scheduleTask(peerCtx):
if not b.scheduleTask(peerCtx):
warn "Unable to schedule task for peer", peer warn "Unable to schedule task for peer", peer
proc accountHandler*(engine: BlockExcEngine, peer: PeerId, account: Account) {.async.} = proc accountHandler*(self: BlockExcEngine, peer: PeerId, account: Account) {.async.} =
let context = engine.peers.get(peer) let context = self.peers.get(peer)
if context.isNil: if context.isNil:
return return
context.account = account.some context.account = account.some
proc paymentHandler*( proc paymentHandler*(
engine: BlockExcEngine, peer: PeerId, payment: SignedState self: BlockExcEngine, peer: PeerId, payment: SignedState
) {.async.} = ) {.async.} =
trace "Handling payments", peer trace "Handling payments", peer
without context =? engine.peers.get(peer).option and account =? context.account: without context =? self.peers.get(peer).option and account =? context.account:
trace "No context or account for peer", peer trace "No context or account for peer", peer
return return
if channel =? context.paymentChannel: if channel =? context.paymentChannel:
let sender = account.address let sender = account.address
discard engine.wallet.acceptPayment(channel, Asset, sender, payment) discard self.wallet.acceptPayment(channel, Asset, sender, payment)
else: else:
context.paymentChannel = engine.wallet.acceptChannel(payment).option context.paymentChannel = self.wallet.acceptChannel(payment).option
proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = proc setupPeer*(self: BlockExcEngine, peer: PeerId) {.async.} =
## Perform initial setup, such as want ## Perform initial setup, such as want
## list exchange ## list exchange
## ##
trace "Setting up peer", peer trace "Setting up peer", peer
if peer notin b.peers: if peer notin self.peers:
trace "Setting up new peer", peer trace "Setting up new peer", peer
b.peers.add(BlockExcPeerCtx(id: peer)) self.peers.add(BlockExcPeerCtx(id: peer))
trace "Added peer", peers = b.peers.len trace "Added peer", peers = self.peers.len
# broadcast our want list, the other peer will do the same # broadcast our want list, the other peer will do the same
if b.pendingBlocks.wantListLen > 0: if self.pendingBlocks.wantListLen > 0:
trace "Sending our want list to a peer", peer trace "Sending our want list to a peer", peer
let cids = toSeq(b.pendingBlocks.wantList) let cids = toSeq(self.pendingBlocks.wantList)
await b.network.request.sendWantList(peer, cids, full = true) await self.network.request.sendWantList(peer, cids, full = true)
if address =? b.pricing .? address: if address =? self.pricing .? address:
await b.network.request.sendAccount(peer, Account(address: address)) await self.network.request.sendAccount(peer, Account(address: address))
proc dropPeer*(b: BlockExcEngine, peer: PeerId) = proc dropPeer*(self: BlockExcEngine, peer: PeerId) =
## Cleanup disconnected peer ## Cleanup disconnected peer
## ##
trace "Dropping peer", peer trace "Dropping peer", peer
# drop the peer from the peers table # drop the peer from the peers table
b.peers.remove(peer) self.peers.remove(peer)
proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = proc taskHandler*(self: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
# Send to the peer blocks he wants to get, # Send to the peer blocks he wants to get,
# if they present in our local store # if they present in our local store
@ -514,14 +558,14 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} = proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
if e.address.leaf: if e.address.leaf:
(await b.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map( (await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
(blkAndProof: (Block, CodexProof)) => (blkAndProof: (Block, CodexProof)) =>
BlockDelivery( BlockDelivery(
address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some
) )
) )
else: else:
(await b.localStore.getBlock(e.address)).map( (await self.localStore.getBlock(e.address)).map(
(blk: Block) => (blk: Block) =>
BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none) BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none)
) )
@ -540,22 +584,22 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
if blocksDelivery.len > 0: if blocksDelivery.len > 0:
trace "Sending blocks to peer", trace "Sending blocks to peer",
peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) peer = task.id, blocks = (blocksDelivery.mapIt(it.address))
await b.network.request.sendBlocksDelivery(task.id, blocksDelivery) await self.network.request.sendBlocksDelivery(task.id, blocksDelivery)
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64) codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
task.peerWants.keepItIf(it.address notin successAddresses) task.peerWants.keepItIf(it.address notin successAddresses)
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} = proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
## process tasks ## process tasks
## ##
trace "Starting blockexc task runner" trace "Starting blockexc task runner"
while b.blockexcRunning: while self.blockexcRunning:
try: try:
let peerCtx = await b.taskQueue.pop() let peerCtx = await self.taskQueue.pop()
await b.taskHandler(peerCtx) await self.taskHandler(peerCtx)
except CancelledError: except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e: except CatchableError as e:
@ -573,55 +617,51 @@ proc new*(
peerStore: PeerCtxStore, peerStore: PeerCtxStore,
pendingBlocks: PendingBlocksManager, pendingBlocks: PendingBlocksManager,
concurrentTasks = DefaultConcurrentTasks, concurrentTasks = DefaultConcurrentTasks,
peersPerRequest = DefaultMaxPeersPerRequest,
blockFetchTimeout = DefaultBlockTimeout,
): BlockExcEngine = ): BlockExcEngine =
## Create new block exchange engine instance ## Create new block exchange engine instance
## ##
let engine = BlockExcEngine( let self = BlockExcEngine(
localStore: localStore, localStore: localStore,
peers: peerStore, peers: peerStore,
pendingBlocks: pendingBlocks, pendingBlocks: pendingBlocks,
peersPerRequest: peersPerRequest,
network: network, network: network,
wallet: wallet, wallet: wallet,
concurrentTasks: concurrentTasks, concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(), trackedFutures: TrackedFutures(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery, discovery: discovery,
advertiser: advertiser, advertiser: advertiser,
blockFetchTimeout: blockFetchTimeout,
) )
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined:
await engine.setupPeer(peerId) await self.setupPeer(peerId)
else: else:
engine.dropPeer(peerId) self.dropPeer(peerId)
if not isNil(network.switch): if not isNil(network.switch):
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc blockWantListHandler(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} = proc blockWantListHandler(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} =
engine.wantListHandler(peer, wantList) self.wantListHandler(peer, wantList)
proc blockPresenceHandler( proc blockPresenceHandler(
peer: PeerId, presence: seq[BlockPresence] peer: PeerId, presence: seq[BlockPresence]
): Future[void] {.gcsafe.} = ): Future[void] {.gcsafe.} =
engine.blockPresenceHandler(peer, presence) self.blockPresenceHandler(peer, presence)
proc blocksDeliveryHandler( proc blocksDeliveryHandler(
peer: PeerId, blocksDelivery: seq[BlockDelivery] peer: PeerId, blocksDelivery: seq[BlockDelivery]
): Future[void] {.gcsafe.} = ): Future[void] {.gcsafe.} =
engine.blocksDeliveryHandler(peer, blocksDelivery) self.blocksDeliveryHandler(peer, blocksDelivery)
proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} = proc accountHandler(peer: PeerId, account: Account): Future[void] {.gcsafe.} =
engine.accountHandler(peer, account) self.accountHandler(peer, account)
proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} = proc paymentHandler(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} =
engine.paymentHandler(peer, payment) self.paymentHandler(peer, payment)
network.handlers = BlockExcHandlers( network.handlers = BlockExcHandlers(
onWantList: blockWantListHandler, onWantList: blockWantListHandler,
@ -631,4 +671,4 @@ proc new*(
onPayment: paymentHandler, onPayment: paymentHandler,
) )
return engine return self


@ -7,13 +7,11 @@
## This file may not be copied, modified, or distributed except according to ## This file may not be copied, modified, or distributed except according to
## those terms. ## those terms.
{.push raises: [].}
import std/tables import std/tables
import std/monotimes import std/monotimes
import std/strutils
import pkg/upraises
push:
{.upraises: [].}
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
@ -34,66 +32,76 @@ declareGauge(
codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us" codex_block_exchange_retrieval_time_us, "codex blockexchange block retrieval time us"
) )
const DefaultBlockTimeout* = 10.minutes const
DefaultBlockRetries* = 3000
DefaultRetryInterval* = 500.millis
type type
RetriesExhaustedError* = object of CatchableError
BlockHandle* = Future[Block].Raising([CancelledError, RetriesExhaustedError])
BlockReq* = object BlockReq* = object
handle*: Future[Block] handle*: BlockHandle
inFlight*: bool inFlight*: bool
blockRetries*: int
startTime*: int64 startTime*: int64
PendingBlocksManager* = ref object of RootObj PendingBlocksManager* = ref object of RootObj
blockRetries*: int = DefaultBlockRetries
retryInterval*: Duration = DefaultRetryInterval
blocks*: Table[BlockAddress, BlockReq] # pending Block requests blocks*: Table[BlockAddress, BlockReq] # pending Block requests
proc updatePendingBlockGauge(p: PendingBlocksManager) = proc updatePendingBlockGauge(p: PendingBlocksManager) =
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64) codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
-proc getWantHandle*(
-    p: PendingBlocksManager,
-    address: BlockAddress,
-    timeout = DefaultBlockTimeout,
-    inFlight = false,
-): Future[Block] {.async.} =
-  ## Add an event for a block
-  ##
-  try:
-    if address notin p.blocks:
-      p.blocks[address] = BlockReq(
-        handle: newFuture[Block]("pendingBlocks.getWantHandle"),
-        inFlight: inFlight,
-        startTime: getMonoTime().ticks,
-      )
-
-      p.updatePendingBlockGauge()
-    return await p.blocks[address].handle.wait(timeout)
-  except CancelledError as exc:
-    trace "Blocks cancelled", exc = exc.msg, address
-    raise exc
-  except CatchableError as exc:
-    error "Pending WANT failed or expired", exc = exc.msg
-    # no need to cancel, it is already cancelled by wait()
-    raise exc
-  finally:
-    p.blocks.del(address)
-    p.updatePendingBlockGauge()
-
-proc getWantHandle*(
-    p: PendingBlocksManager, cid: Cid, timeout = DefaultBlockTimeout, inFlight = false
-): Future[Block] =
-  p.getWantHandle(BlockAddress.init(cid), timeout, inFlight)

+proc getWantHandle*(
+    self: PendingBlocksManager, address: BlockAddress, inFlight = false
+): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
+  ## Add an event for a block
+  ##
+  self.blocks.withValue(address, blk):
+    return blk[].handle
+  do:
+    let blk = BlockReq(
+      handle: newFuture[Block]("pendingBlocks.getWantHandle"),
+      inFlight: inFlight,
+      blockRetries: self.blockRetries,
+      startTime: getMonoTime().ticks,
+    )
+    self.blocks[address] = blk
+    let handle = blk.handle
+
+    proc cleanUpBlock(data: pointer) {.raises: [].} =
+      self.blocks.del(address)
+      self.updatePendingBlockGauge()
+
+    handle.addCallback(cleanUpBlock)
+    handle.cancelCallback = proc(data: pointer) {.raises: [].} =
+      if not handle.finished:
+        handle.removeCallback(cleanUpBlock)
+        cleanUpBlock(nil)
+
+    self.updatePendingBlockGauge()
+    return handle
+
+proc getWantHandle*(
+    self: PendingBlocksManager, cid: Cid, inFlight = false
+): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
+  self.getWantHandle(BlockAddress.init(cid), inFlight)
proc resolve*( proc resolve*(
p: PendingBlocksManager, blocksDelivery: seq[BlockDelivery] self: PendingBlocksManager, blocksDelivery: seq[BlockDelivery]
) {.gcsafe, raises: [].} = ) {.gcsafe, raises: [].} =
## Resolve pending blocks ## Resolve pending blocks
## ##
for bd in blocksDelivery: for bd in blocksDelivery:
p.blocks.withValue(bd.address, blockReq): self.blocks.withValue(bd.address, blockReq):
if not blockReq.handle.finished: if not blockReq[].handle.finished:
trace "Resolving pending block", address = bd.address
let let
startTime = blockReq.startTime startTime = blockReq[].startTime
stopTime = getMonoTime().ticks stopTime = getMonoTime().ticks
retrievalDurationUs = (stopTime - startTime) div 1000 retrievalDurationUs = (stopTime - startTime) div 1000
@ -106,52 +114,70 @@ proc resolve*(
else: else:
trace "Block handle already finished", address = bd.address trace "Block handle already finished", address = bd.address
proc setInFlight*(p: PendingBlocksManager, address: BlockAddress, inFlight = true) = func retries*(self: PendingBlocksManager, address: BlockAddress): int =
self.blocks.withValue(address, pending):
result = pending[].blockRetries
do:
result = 0
func decRetries*(self: PendingBlocksManager, address: BlockAddress) =
self.blocks.withValue(address, pending):
pending[].blockRetries -= 1
func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool =
self.blocks.withValue(address, pending):
result = pending[].blockRetries <= 0
func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) =
## Set inflight status for a block ## Set inflight status for a block
## ##
p.blocks.withValue(address, pending): self.blocks.withValue(address, pending):
pending[].inFlight = inFlight pending[].inFlight = inFlight
proc isInFlight*(p: PendingBlocksManager, address: BlockAddress): bool = func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool =
## Check if a block is in flight ## Check if a block is in flight
## ##
p.blocks.withValue(address, pending): self.blocks.withValue(address, pending):
result = pending[].inFlight result = pending[].inFlight
proc contains*(p: PendingBlocksManager, cid: Cid): bool = func contains*(self: PendingBlocksManager, cid: Cid): bool =
BlockAddress.init(cid) in p.blocks BlockAddress.init(cid) in self.blocks
proc contains*(p: PendingBlocksManager, address: BlockAddress): bool = func contains*(self: PendingBlocksManager, address: BlockAddress): bool =
address in p.blocks address in self.blocks
iterator wantList*(p: PendingBlocksManager): BlockAddress = iterator wantList*(self: PendingBlocksManager): BlockAddress =
for a in p.blocks.keys: for a in self.blocks.keys:
yield a yield a
iterator wantListBlockCids*(p: PendingBlocksManager): Cid = iterator wantListBlockCids*(self: PendingBlocksManager): Cid =
for a in p.blocks.keys: for a in self.blocks.keys:
if not a.leaf: if not a.leaf:
yield a.cid yield a.cid
iterator wantListCids*(p: PendingBlocksManager): Cid = iterator wantListCids*(self: PendingBlocksManager): Cid =
var yieldedCids = initHashSet[Cid]() var yieldedCids = initHashSet[Cid]()
for a in p.blocks.keys: for a in self.blocks.keys:
let cid = a.cidOrTreeCid let cid = a.cidOrTreeCid
if cid notin yieldedCids: if cid notin yieldedCids:
yieldedCids.incl(cid) yieldedCids.incl(cid)
yield cid yield cid
iterator wantHandles*(p: PendingBlocksManager): Future[Block] = iterator wantHandles*(self: PendingBlocksManager): Future[Block] =
for v in p.blocks.values: for v in self.blocks.values:
yield v.handle yield v.handle
proc wantListLen*(p: PendingBlocksManager): int = proc wantListLen*(self: PendingBlocksManager): int =
p.blocks.len self.blocks.len
func len*(p: PendingBlocksManager): int = func len*(self: PendingBlocksManager): int =
p.blocks.len self.blocks.len
func new*(T: type PendingBlocksManager): PendingBlocksManager = func new*(
PendingBlocksManager() T: type PendingBlocksManager,
retries = DefaultBlockRetries,
interval = DefaultRetryInterval,
): PendingBlocksManager =
PendingBlocksManager(blockRetries: retries, retryInterval: interval)
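
For illustration only (not part of the diff; `cid` is assumed to be a `Cid` already in scope), the retry bookkeeping added above can be exercised like this:

let pending = PendingBlocksManager.new(retries = 5, interval = 250.millis)
let address = BlockAddress.init(cid)
let handle = pending.getWantHandle(address)

pending.decRetries(address)
assert pending.retries(address) == 4
assert not pending.retriesExhausted(address)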


@ -21,17 +21,18 @@ import ../../blocktype as bt
import ../../logutils import ../../logutils
import ../protobuf/blockexc as pb import ../protobuf/blockexc as pb
import ../protobuf/payments import ../protobuf/payments
import ../../utils/trackedfutures
import ./networkpeer import ./networkpeer
export network, payments export networkpeer, payments
logScope: logScope:
topics = "codex blockexcnetwork" topics = "codex blockexcnetwork"
const const
Codec* = "/codex/blockexc/1.0.0" Codec* = "/codex/blockexc/1.0.0"
MaxInflight* = 100 DefaultMaxInflight* = 100
type type
WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} WantListHandler* = proc(peer: PeerId, wantList: WantList): Future[void] {.gcsafe.}
@ -82,6 +83,8 @@ type
request*: BlockExcRequest request*: BlockExcRequest
getConn: ConnProvider getConn: ConnProvider
inflightSema: AsyncSemaphore inflightSema: AsyncSemaphore
maxInflight: int = DefaultMaxInflight
trackedFutures*: TrackedFutures = TrackedFutures()
proc peerId*(b: BlockExcNetwork): PeerId = proc peerId*(b: BlockExcNetwork): PeerId =
## Return peer id ## Return peer id
@ -220,23 +223,25 @@ proc handlePayment(
if not network.handlers.onPayment.isNil: if not network.handlers.onPayment.isNil:
await network.handlers.onPayment(peer.id, payment) await network.handlers.onPayment(peer.id, payment)
proc rpcHandler(b: BlockExcNetwork, peer: NetworkPeer, msg: Message) {.raises: [].} = proc rpcHandler(
b: BlockExcNetwork, peer: NetworkPeer, msg: Message
) {.async: (raises: [CatchableError]).} =
## handle rpc messages ## handle rpc messages
## ##
if msg.wantList.entries.len > 0: if msg.wantList.entries.len > 0:
asyncSpawn b.handleWantList(peer, msg.wantList) b.trackedFutures.track(b.handleWantList(peer, msg.wantList))
if msg.payload.len > 0: if msg.payload.len > 0:
asyncSpawn b.handleBlocksDelivery(peer, msg.payload) b.trackedFutures.track(b.handleBlocksDelivery(peer, msg.payload))
if msg.blockPresences.len > 0: if msg.blockPresences.len > 0:
asyncSpawn b.handleBlockPresence(peer, msg.blockPresences) b.trackedFutures.track(b.handleBlockPresence(peer, msg.blockPresences))
if account =? Account.init(msg.account): if account =? Account.init(msg.account):
asyncSpawn b.handleAccount(peer, account) b.trackedFutures.track(b.handleAccount(peer, account))
if payment =? SignedState.init(msg.payment): if payment =? SignedState.init(msg.payment):
asyncSpawn b.handlePayment(peer, payment) b.trackedFutures.track(b.handlePayment(peer, payment))
proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer = proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
## Creates or retrieves a BlockExcNetwork Peer ## Creates or retrieves a BlockExcNetwork Peer
@ -247,6 +252,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} = var getConn: ConnProvider = proc(): Future[Connection] {.async, gcsafe, closure.} =
try: try:
trace "Getting new connection stream", peer
return await b.switch.dial(peer, Codec) return await b.switch.dial(peer, Codec)
except CancelledError as error: except CancelledError as error:
raise error raise error
@ -256,8 +262,10 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
if not isNil(b.getConn): if not isNil(b.getConn):
getConn = b.getConn getConn = b.getConn
let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async.} = let rpcHandler = proc(
b.rpcHandler(p, msg) p: NetworkPeer, msg: Message
) {.async: (raises: [CatchableError]).} =
await b.rpcHandler(p, msg)
# create new pubsub peer # create new pubsub peer
let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler) let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler)
@ -282,48 +290,61 @@ proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
trace "Skipping dialing self", peer = peer.peerId trace "Skipping dialing self", peer = peer.peerId
return return
if peer.peerId in b.peers:
trace "Already connected to peer", peer = peer.peerId
return
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = proc dropPeer*(b: BlockExcNetwork, peer: PeerId) =
## Cleanup disconnected peer ## Cleanup disconnected peer
## ##
trace "Dropping peer", peer
b.peers.del(peer) b.peers.del(peer)
method init*(b: BlockExcNetwork) = method init*(self: BlockExcNetwork) =
## Perform protocol initialization ## Perform protocol initialization
## ##
proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} =
if event.kind == PeerEventKind.Joined: if event.kind == PeerEventKind.Joined:
b.setupPeer(peerId) self.setupPeer(peerId)
else: else:
b.dropPeer(peerId) self.dropPeer(peerId)
b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
b.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = proc handler(conn: Connection, proto: string) {.async.} =
let peerId = conn.peerId let peerId = conn.peerId
let blockexcPeer = b.getOrCreatePeer(peerId) let blockexcPeer = self.getOrCreatePeer(peerId)
await blockexcPeer.readLoop(conn) # attach read loop await blockexcPeer.readLoop(conn) # attach read loop
b.handler = handle self.handler = handler
b.codec = Codec self.codec = Codec
proc stop*(self: BlockExcNetwork) {.async: (raises: []).} =
await self.trackedFutures.cancelTracked()
proc new*( proc new*(
T: type BlockExcNetwork, T: type BlockExcNetwork,
switch: Switch, switch: Switch,
connProvider: ConnProvider = nil, connProvider: ConnProvider = nil,
maxInflight = MaxInflight, maxInflight = DefaultMaxInflight,
): BlockExcNetwork = ): BlockExcNetwork =
## Create a new BlockExcNetwork instance ## Create a new BlockExcNetwork instance
## ##
let self = BlockExcNetwork( let self = BlockExcNetwork(
switch: switch, getConn: connProvider, inflightSema: newAsyncSemaphore(maxInflight) switch: switch,
getConn: connProvider,
inflightSema: newAsyncSemaphore(maxInflight),
maxInflight: maxInflight,
) )
self.maxIncomingStreams = self.maxInflight
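
Illustrative use of the new knob (assumes a configured libp2p `switch` is in scope); the same limit now also caps the protocol's incoming streams:

let network = BlockExcNetwork.new(switch, maxInflight = 50)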
proc sendWantList( proc sendWantList(
id: PeerId, id: PeerId,
cids: seq[BlockAddress], cids: seq[BlockAddress],


@ -22,39 +22,56 @@ import ../../logutils
logScope: logScope:
topics = "codex blockexcnetworkpeer" topics = "codex blockexcnetworkpeer"
const DefaultYieldInterval = 50.millis
type type
ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.} ConnProvider* = proc(): Future[Connection] {.gcsafe, closure.}
RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} RPCHandler* = proc(
peer: NetworkPeer, msg: Message
): Future[void].Raising(CatchableError) {.gcsafe.}
NetworkPeer* = ref object of RootObj NetworkPeer* = ref object of RootObj
id*: PeerId id*: PeerId
handler*: RPCHandler handler*: RPCHandler
sendConn: Connection sendConn: Connection
getConn: ConnProvider getConn: ConnProvider
yieldInterval*: Duration = DefaultYieldInterval
proc connected*(b: NetworkPeer): bool = proc connected*(b: NetworkPeer): bool =
not (isNil(b.sendConn)) and not (b.sendConn.closed or b.sendConn.atEof) not (isNil(b.sendConn)) and not (b.sendConn.closed or b.sendConn.atEof)
proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} = proc readLoop*(b: NetworkPeer, conn: Connection) {.async.} =
if isNil(conn): if isNil(conn):
trace "No connection to read from", peer = b.id
return return
trace "Attaching read loop", peer = b.id, connId = conn.oid
try: try:
var nextYield = Moment.now() + b.yieldInterval
while not conn.atEof or not conn.closed: while not conn.atEof or not conn.closed:
if Moment.now() > nextYield:
nextYield = Moment.now() + b.yieldInterval
trace "Yielding in read loop",
peer = b.id, nextYield = nextYield, interval = b.yieldInterval
await sleepAsync(10.millis)
let let
data = await conn.readLp(MaxMessageSize.int) data = await conn.readLp(MaxMessageSize.int)
msg = Message.protobufDecode(data).mapFailure().tryGet() msg = Message.protobufDecode(data).mapFailure().tryGet()
trace "Received message", peer = b.id, connId = conn.oid
await b.handler(b, msg) await b.handler(b, msg)
except CancelledError: except CancelledError:
trace "Read loop cancelled" trace "Read loop cancelled"
except CatchableError as err: except CatchableError as err:
warn "Exception in blockexc read loop", msg = err.msg warn "Exception in blockexc read loop", msg = err.msg
finally: finally:
trace "Detaching read loop", peer = b.id, connId = conn.oid
await conn.close() await conn.close()
proc connect*(b: NetworkPeer): Future[Connection] {.async.} = proc connect*(b: NetworkPeer): Future[Connection] {.async.} =
if b.connected: if b.connected:
trace "Already connected", peer = b.id, connId = b.sendConn.oid
return b.sendConn return b.sendConn
b.sendConn = await b.getConn() b.sendConn = await b.getConn()
@ -68,17 +85,9 @@ proc send*(b: NetworkPeer, msg: Message) {.async.} =
warn "Unable to get send connection for peer message not sent", peer = b.id warn "Unable to get send connection for peer message not sent", peer = b.id
return return
trace "Sending message", peer = b.id, connId = conn.oid
await conn.writeLp(protobufEncode(msg)) await conn.writeLp(protobufEncode(msg))
proc broadcast*(b: NetworkPeer, msg: Message) =
proc sendAwaiter() {.async.} =
try:
await b.send(msg)
except CatchableError as exc:
warn "Exception broadcasting message to peer", peer = b.id, exc = exc.msg
asyncSpawn sendAwaiter()
func new*( func new*(
T: type NetworkPeer, T: type NetworkPeer,
peer: PeerId, peer: PeerId,


@ -10,6 +10,7 @@
import std/sequtils import std/sequtils
import std/tables import std/tables
import std/algorithm import std/algorithm
import std/sequtils
import pkg/upraises import pkg/upraises
@ -33,9 +34,7 @@ type
PeerCtxStore* = ref object of RootObj PeerCtxStore* = ref object of RootObj
peers*: OrderedTable[PeerId, BlockExcPeerCtx] peers*: OrderedTable[PeerId, BlockExcPeerCtx]
PeersForBlock* = object of RootObj PeersForBlock* = tuple[with: seq[BlockExcPeerCtx], without: seq[BlockExcPeerCtx]]
with*: seq[BlockExcPeerCtx]
without*: seq[BlockExcPeerCtx]
iterator items*(self: PeerCtxStore): BlockExcPeerCtx = iterator items*(self: PeerCtxStore): BlockExcPeerCtx =
for p in self.peers.values: for p in self.peers.values:
@ -47,6 +46,9 @@ proc contains*(a: openArray[BlockExcPeerCtx], b: PeerId): bool =
a.anyIt(it.id == b) a.anyIt(it.id == b)
func peerIds*(self: PeerCtxStore): seq[PeerId] =
toSeq(self.peers.keys)
func contains*(self: PeerCtxStore, peerId: PeerId): bool = func contains*(self: PeerCtxStore, peerId: PeerId): bool =
peerId in self.peers peerId in self.peers
@ -75,7 +77,7 @@ func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid)) toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
var res = PeersForBlock() var res: PeersForBlock = (@[], @[])
for peer in self: for peer in self:
if peer.peerHave.anyIt(it == address): if peer.peerHave.anyIt(it == address):
res.with.add(peer) res.with.add(peer)
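
A small illustrative consequence of the tuple change (assuming `store: PeerCtxStore` and `address: BlockAddress` are in scope) is that the result can be unpacked directly:

let (has, hasNot) = store.getPeersForBlock(address)
trace "Peers for block", has = has.len, hasNot = hasNot.len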


@ -311,7 +311,7 @@ proc new*(
bufferSize = (1024 * 64), bufferSize = (1024 * 64),
maxRequestBodySize = int.high, maxRequestBodySize = int.high,
) )
.expect("Should start rest server!") .expect("Should create rest server!")
switch.mount(network) switch.mount(network)


@ -51,8 +51,8 @@ export units, net, codextypes, logutils, completeCmdArg, parseCmdArg, NatConfig
export ValidationGroups, MaxSlots export ValidationGroups, MaxSlots
export export
DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockInterval, DefaultNumBlocksPerInterval,
DefaultNumberOfBlocksToMaintainPerInterval, DefaultRequestCacheSize DefaultRequestCacheSize
type ThreadCount* = distinct Natural type ThreadCount* = distinct Natural
@ -251,15 +251,15 @@ type
desc: desc:
"Time interval in seconds - determines frequency of block " & "Time interval in seconds - determines frequency of block " &
"maintenance cycle: how often blocks are checked " & "for expiration and cleanup", "maintenance cycle: how often blocks are checked " & "for expiration and cleanup",
defaultValue: DefaultBlockMaintenanceInterval, defaultValue: DefaultBlockInterval,
defaultValueDesc: $DefaultBlockMaintenanceInterval, defaultValueDesc: $DefaultBlockInterval,
name: "block-mi" name: "block-mi"
.}: Duration .}: Duration
blockMaintenanceNumberOfBlocks* {. blockMaintenanceNumberOfBlocks* {.
desc: "Number of blocks to check every maintenance cycle", desc: "Number of blocks to check every maintenance cycle",
defaultValue: DefaultNumberOfBlocksToMaintainPerInterval, defaultValue: DefaultNumBlocksPerInterval,
defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval, defaultValueDesc: $DefaultNumBlocksPerInterval,
name: "block-mn" name: "block-mn"
.}: int .}: int


@ -152,7 +152,7 @@ proc formatTextLineSeq*(val: seq[string]): string =
template formatIt*(format: LogFormat, T: typedesc, body: untyped) = template formatIt*(format: LogFormat, T: typedesc, body: untyped) =
# Provides formatters for logging with Chronicles for the given type and # Provides formatters for logging with Chronicles for the given type and
# `LogFormat`. # `LogFormat`.
# NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overriddden # NOTE: `seq[T]`, `Option[T]`, and `seq[Option[T]]` are overridden
# since the base `setProperty` is generic using `auto` and conflicts with # since the base `setProperty` is generic using `auto` and conflicts with
# providing a generic `seq` and `Option` override. # providing a generic `seq` and `Option` override.
when format == LogFormat.json: when format == LogFormat.json:


@ -45,13 +45,14 @@ import ./utils
import ./errors import ./errors
import ./logutils import ./logutils
import ./utils/asynciter import ./utils/asynciter
import ./utils/trackedfutures
export logutils export logutils
logScope: logScope:
topics = "codex node" topics = "codex node"
const FetchBatch = 200 const DefaultFetchBatch = 10
type type
Contracts* = Contracts* =
@ -72,6 +73,7 @@ type
clock*: Clock clock*: Clock
storage*: Contracts storage*: Contracts
taskpool: Taskpool taskpool: Taskpool
trackedFutures: TrackedFutures
CodexNodeRef* = ref CodexNode CodexNodeRef* = ref CodexNode
@ -163,8 +165,9 @@ proc fetchBatched*(
self: CodexNodeRef, self: CodexNodeRef,
cid: Cid, cid: Cid,
iter: Iter[int], iter: Iter[int],
batchSize = FetchBatch, batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil, onBatch: BatchProc = nil,
fetchLocal = true,
): Future[?!void] {.async, gcsafe.} = ): Future[?!void] {.async, gcsafe.} =
## Fetch blocks in batches of `batchSize` ## Fetch blocks in batches of `batchSize`
## ##
@ -179,7 +182,9 @@ proc fetchBatched*(
let blocks = collect: let blocks = collect:
for i in 0 ..< batchSize: for i in 0 ..< batchSize:
if not iter.finished: if not iter.finished:
self.networkStore.getBlock(BlockAddress.init(cid, iter.next())) let address = BlockAddress.init(cid, iter.next())
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)
if blocksErr =? (await allFutureResult(blocks)).errorOption: if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr) return failure(blocksErr)
@ -188,21 +193,25 @@ proc fetchBatched*(
batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption: batchErr =? (await onBatch(blocks.mapIt(it.read.get))).errorOption:
return failure(batchErr) return failure(batchErr)
await sleepAsync(1.millis)
success() success()
proc fetchBatched*( proc fetchBatched*(
self: CodexNodeRef, self: CodexNodeRef,
manifest: Manifest, manifest: Manifest,
batchSize = FetchBatch, batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil, onBatch: BatchProc = nil,
fetchLocal = true,
): Future[?!void] = ): Future[?!void] =
## Fetch manifest in batches of `batchSize` ## Fetch manifest in batches of `batchSize`
## ##
trace "Fetching blocks in batches of", size = batchSize trace "Fetching blocks in batches of",
size = batchSize, blocksCount = manifest.blocksCount
let iter = Iter[int].new(0 ..< manifest.blocksCount) let iter = Iter[int].new(0 ..< manifest.blocksCount)
self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch) self.fetchBatched(manifest.treeCid, iter, batchSize, onBatch, fetchLocal)
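
For illustration only (assumes `node: CodexNodeRef` and a decoded `manifest` are in scope), a caller that only wants to warm the network path can now skip blocks already present locally:

if err =? (
  await node.fetchBatched(manifest, batchSize = DefaultFetchBatch, fetchLocal = false)
).errorOption:
  error "Prefetch failed", err = err.msg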
proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} =
## Streams the contents of a single block. ## Streams the contents of a single block.
@ -223,35 +232,64 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async
finally: finally:
await stream.pushEof() await stream.pushEof()
asyncSpawn streamOneBlock() self.trackedFutures.track(streamOneBlock())
LPStream(stream).success LPStream(stream).success
proc streamEntireDataset( proc streamEntireDataset(
self: CodexNodeRef, manifest: Manifest, manifestCid: Cid self: CodexNodeRef,
manifest: Manifest,
manifestCid: Cid,
prefetchBatch = DefaultFetchBatch,
): Future[?!LPStream] {.async.} = ): Future[?!LPStream] {.async.} =
## Streams the contents of the entire dataset described by the manifest. ## Streams the contents of the entire dataset described by the manifest.
## Background jobs (erasure decoding and prefetching) will be cancelled when
## the stream is closed.
## ##
trace "Retrieving blocks from manifest", manifestCid trace "Retrieving blocks from manifest", manifestCid
let stream = LPStream(StoreStream.new(self.networkStore, manifest, pad = false))
var jobs: seq[Future[void]]
if manifest.protected: if manifest.protected:
# Retrieve, decode and save to the local store all EC groups # Retrieve, decode and save to the local store all EC groups
proc erasureJob(): Future[?!void] {.async.} = proc erasureJob(): Future[void] {.async.} =
try:
# Spawn an erasure decoding job # Spawn an erasure decoding job
let erasure = Erasure.new( let erasure = Erasure.new(
self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool self.networkStore, leoEncoderProvider, leoDecoderProvider, self.taskpool
) )
without _ =? (await erasure.decode(manifest)), error: without _ =? (await erasure.decode(manifest)), error:
error "Unable to erasure decode manifest", manifestCid, exc = error.msg error "Unable to erasure decode manifest", manifestCid, exc = error.msg
return failure(error) except CancelledError:
trace "Erasure job cancelled", manifestCid
except CatchableError as exc:
trace "Error erasure decoding manifest", manifestCid, exc = exc.msg
return success() jobs.add(erasureJob())
if err =? (await erasureJob()).errorOption: proc prefetch(): Future[void] {.async.} =
return failure(err) try:
if err =?
(await self.fetchBatched(manifest, prefetchBatch, fetchLocal = false)).errorOption:
error "Unable to fetch blocks", err = err.msg
except CancelledError:
trace "Prefetch job cancelled"
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg
jobs.add(prefetch())
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async.} =
try:
await stream.join()
finally:
await allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid trace "Creating store stream for manifest", manifestCid
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success stream.success
proc retrieve*( proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true self: CodexNodeRef, cid: Cid, local: bool = true
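A rough consumer-side sketch of the new lifetime behaviour: once the returned stream is closed, `monitorStream` joins on it and cancels the erasure and prefetch jobs. The wrapper proc and buffer size below are hypothetical; `retrieve`, `readOnce` and `close` are the calls already used by the surrounding code.

proc readSome(node: CodexNodeRef, manifestCid: Cid) {.async.} =
  let stream = (await node.retrieve(manifestCid)).tryGet()
  var buf = newSeq[byte](4096)
  discard await stream.readOnce(addr buf[0], buf.len)
  # Closing the stream is enough to stop the background jobs.
  await stream.close()
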
@ -758,6 +796,11 @@ proc start*(self: CodexNodeRef) {.async.} =
proc stop*(self: CodexNodeRef) {.async.} = proc stop*(self: CodexNodeRef) {.async.} =
trace "Stopping node" trace "Stopping node"
if not self.taskpool.isNil:
self.taskpool.shutdown()
await self.trackedFutures.cancelTracked()
if not self.engine.isNil: if not self.engine.isNil:
await self.engine.stop() await self.engine.stop()
@ -779,9 +822,6 @@ proc stop*(self: CodexNodeRef) {.async.} =
if not self.networkStore.isNil: if not self.networkStore.isNil:
await self.networkStore.close await self.networkStore.close
if not self.taskpool.isNil:
self.taskpool.shutdown()
proc new*( proc new*(
T: type CodexNodeRef, T: type CodexNodeRef,
switch: Switch, switch: Switch,
@ -803,4 +843,5 @@ proc new*(
discovery: discovery, discovery: discovery,
taskPool: taskpool, taskPool: taskpool,
contracts: contracts, contracts: contracts,
trackedFutures: TrackedFutures(),
) )

View File

@ -13,8 +13,8 @@ push:
{.upraises: [].} {.upraises: [].}
import std/sequtils import std/sequtils
import mimetypes import std/mimetypes
import os import std/os
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
@ -120,7 +120,7 @@ proc retrieveCid(
await resp.finish() await resp.finish()
codex_api_downloads.inc() codex_api_downloads.inc()
except CatchableError as exc: except CatchableError as exc:
warn "Excepting streaming blocks", exc = exc.msg warn "Error streaming blocks", exc = exc.msg
resp.status = Http500 resp.status = Http500
return await resp.sendBody("") return await resp.sendBody("")
finally: finally:

View File

@ -55,6 +55,15 @@ proc sample*[T](
break break
proc sample*[T](
rng: Rng, sample: openArray[T], limit: int
): seq[T] {.raises: [Defect, RngSampleError].} =
if limit > sample.len:
raise newException(RngSampleError, "Limit cannot be larger than sample!")
for _ in 0 ..< min(sample.len, limit):
result.add(rng.sample(sample, result))
proc shuffle*[T](rng: Rng, a: var openArray[T]) = proc shuffle*[T](rng: Rng, a: var openArray[T]) =
for i in countdown(a.high, 1): for i in countdown(a.high, 1):
let j = rng.rand(i) let j = rng.rand(i)
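A small usage sketch for the new bounded `sample` overload; the peer list and limit are illustrative, and `Rng.instance()` is how the test suites obtain an rng.

let
  rng = Rng.instance()
  peers = @["peerA", "peerB", "peerC", "peerD"]
  # Three distinct entries; raises RngSampleError if the limit exceeds peers.len.
  picked = rng.sample(peers, 3)
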

View File

@ -189,7 +189,7 @@ proc getCellHashes*[T, H](
blkIdx = blkIdx blkIdx = blkIdx
pos = i pos = i
trace "Getting block CID for tree at index", index = blkIdx trace "Getting block CID for tree at index"
without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root, without (_, tree) =? (await self.buildBlockTree(blkIdx, i)) and digest =? tree.root,
err: err:
error "Failed to get block CID for tree at index", err = err.msg error "Failed to get block CID for tree at index", err = err.msg

View File

@ -22,8 +22,8 @@ import ../logutils
import ../systemclock import ../systemclock
const const
DefaultBlockMaintenanceInterval* = 10.minutes DefaultBlockInterval* = 10.minutes
DefaultNumberOfBlocksToMaintainPerInterval* = 1000 DefaultNumBlocksPerInterval* = 1000
type BlockMaintainer* = ref object of RootObj type BlockMaintainer* = ref object of RootObj
repoStore: RepoStore repoStore: RepoStore

View File

@ -137,6 +137,14 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
trace "Checking network store for block existence", cid trace "Checking network store for block existence", cid
return await self.localStore.hasBlock(cid) return await self.localStore.hasBlock(cid)
method hasBlock*(
self: NetworkStore, tree: Cid, index: Natural
): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
trace "Checking network store for block existence", tree, index
return await self.localStore.hasBlock(tree, index)
method close*(self: NetworkStore): Future[void] {.async.} = method close*(self: NetworkStore): Future[void] {.async.} =
## Close the underlying local blockstore ## Close the underlying local blockstore
## ##
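A minimal sketch using the new (tree, index) overload to decide whether a leaf block is already present before scheduling a fetch. It assumes a `store: NetworkStore` and a decoded `manifest` are available; the wrapper proc is hypothetical.

proc haveLeaf(store: NetworkStore, manifest: Manifest, i: Natural): Future[bool] {.async.} =
  # Only the local store is consulted, mirroring the cid-based overload above.
  without present =? (await store.hasBlock(manifest.treeCid, i)), err:
    error "hasBlock failed", err = err.msg
    return false
  return present
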

View File

@ -21,8 +21,8 @@ import ../../systemclock
import ../../units import ../../units
const const
DefaultBlockTtl* = 24.hours DefaultBlockTtl* = 30.days
DefaultQuotaBytes* = 8.GiBs DefaultQuotaBytes* = 20.GiBs
type type
QuotaNotEnoughError* = object of CodexError QuotaNotEnoughError* = object of CodexError

View File

@ -1,10 +0,0 @@
import pkg/chronos
proc asyncSpawn*(future: Future[void], ignore: type CatchableError) =
proc ignoringError() {.async.} =
try:
await future
except ignore:
discard
asyncSpawn ignoringError()
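The ignore-and-spawn helper above is removed; a rough sketch of the tracking pattern that replaces it elsewhere in this diff. `doWork` is a hypothetical unit of work, and `trace` comes from the logging already imported by the modules that use TrackedFutures.

proc job(self: CodexNodeRef) {.async.} =
  try:
    await self.doWork() # hypothetical work
  except CancelledError:
    trace "Job cancelled"

# Tracked futures are cancelled in stop() via `await self.trackedFutures.cancelTracked()`.
self.trackedFutures.track(job(self))
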

View File

@ -1,6 +1,7 @@
{.push raises: [].} {.push raises: [].}
import std/[tables, hashes], pkg/results, stew/shims/net as stewNet, chronos, chronicles import
std/[tables, hashes], pkg/results, pkg/stew/shims/net as stewNet, chronos, chronicles
import pkg/libp2p import pkg/libp2p

View File

@ -76,7 +76,7 @@ asyncchecksuite "Test Discovery Engine":
) )
await discoveryEngine.start() await discoveryEngine.start()
await allFuturesThrowing(allFinished(wants)).wait(1.seconds) await allFuturesThrowing(allFinished(wants)).wait(100.millis)
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should queue discovery request": test "Should queue discovery request":
@ -101,7 +101,7 @@ asyncchecksuite "Test Discovery Engine":
await discoveryEngine.start() await discoveryEngine.start()
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid]) discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
await want.wait(1.seconds) await want.wait(100.millis)
await discoveryEngine.stop() await discoveryEngine.stop()
test "Should not request more than minPeersPerBlock": test "Should not request more than minPeersPerBlock":

View File

@ -1,5 +1,6 @@
import std/sequtils import std/sequtils
import std/algorithm import std/algorithm
import std/importutils
import pkg/chronos import pkg/chronos
import pkg/stew/byteutils import pkg/stew/byteutils
@ -20,7 +21,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
peerCtx1, peerCtx2: BlockExcPeerCtx peerCtx1, peerCtx2: BlockExcPeerCtx
pricing1, pricing2: Pricing pricing1, pricing2: Pricing
blocks1, blocks2: seq[bt.Block] blocks1, blocks2: seq[bt.Block]
pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] pendingBlocks1, pendingBlocks2: seq[BlockHandle]
setup: setup:
blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb) blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
@ -56,7 +57,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
nodeCmps2.switch.peerInfo.peerId, nodeCmps2.switch.peerInfo.addrs nodeCmps2.switch.peerInfo.peerId, nodeCmps2.switch.peerInfo.addrs
) )
await sleepAsync(1.seconds) # give some time to exchange lists await sleepAsync(100.millis) # give some time to exchange lists
peerCtx2 = nodeCmps1.peerStore.get(nodeCmps2.switch.peerInfo.peerId) peerCtx2 = nodeCmps1.peerStore.get(nodeCmps2.switch.peerInfo.peerId)
peerCtx1 = nodeCmps2.peerStore.get(nodeCmps1.switch.peerInfo.peerId) peerCtx1 = nodeCmps2.peerStore.get(nodeCmps1.switch.peerInfo.peerId)
@ -75,7 +76,6 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
test "Should exchange blocks on connect": test "Should exchange blocks on connect":
await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds) await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds)
await allFuturesThrowing(allFinished(pendingBlocks2)).wait(10.seconds) await allFuturesThrowing(allFinished(pendingBlocks2)).wait(10.seconds)
check: check:
@ -178,7 +178,7 @@ asyncchecksuite "NetworkStore - multiple nodes":
(await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(nodes) await connectNodes(nodes)
await sleepAsync(1.seconds) await sleepAsync(100.millis)
await allFuturesThrowing(allFinished(pendingBlocks)) await allFuturesThrowing(allFinished(pendingBlocks))
@ -203,45 +203,9 @@ asyncchecksuite "NetworkStore - multiple nodes":
(await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet() (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(nodes) await connectNodes(nodes)
await sleepAsync(1.seconds) await sleepAsync(100.millis)
await allFuturesThrowing(allFinished(pendingBlocks1), allFinished(pendingBlocks2)) await allFuturesThrowing(allFinished(pendingBlocks1), allFinished(pendingBlocks2))
check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3] check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3]
check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15] check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15]
test "Should actively cancel want-haves if block received from elsewhere":
let
# Peer wanting to download blocks
downloader = nodes[4]
# Bystander peer - gets block request but can't satisfy them
bystander = nodes[3]
# Holder of actual blocks
blockHolder = nodes[1]
let aBlock = blocks[0]
(await blockHolder.engine.localStore.putBlock(aBlock)).tryGet()
await connectNodes(@[downloader, bystander])
# Downloader asks for block...
let blockRequest = downloader.engine.requestBlock(aBlock.cid)
# ... and bystander learns that downloader wants it, but can't provide it.
check eventually(
bystander.engine.peers
.get(downloader.switch.peerInfo.peerId).peerWants
.filterIt(it.address == aBlock.address).len == 1
)
# As soon as we connect the downloader to the blockHolder, the block should
# propagate to the downloader...
await connectNodes(@[downloader, blockHolder])
check (await blockRequest).tryGet().cid == aBlock.cid
check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet()
# ... and the bystander should have cancelled the want-have
check eventually(
bystander.engine.peers
.get(downloader.switch.peerInfo.peerId).peerWants
.filterIt(it.address == aBlock.address).len == 0
)

View File

@ -20,6 +20,11 @@ import ../../../asynctest
import ../../helpers import ../../helpers
import ../../examples import ../../examples
const NopSendWantCancellationsProc = proc(
id: PeerId, addresses: seq[BlockAddress]
) {.gcsafe, async.} =
discard
asyncchecksuite "NetworkStore engine basic": asyncchecksuite "NetworkStore engine basic":
var var
rng: Rng rng: Rng
@ -129,11 +134,6 @@ asyncchecksuite "NetworkStore engine handlers":
localStore: BlockStore localStore: BlockStore
blocks: seq[Block] blocks: seq[Block]
const NopSendWantCancellationsProc = proc(
id: PeerId, addresses: seq[BlockAddress]
) {.gcsafe, async.} =
discard
setup: setup:
rng = Rng.instance() rng = Rng.instance()
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
@ -292,7 +292,8 @@ asyncchecksuite "NetworkStore engine handlers":
await done.wait(100.millis) await done.wait(100.millis)
test "Should handle block presence": test "Should handle block presence":
var handles: Table[Cid, Future[Block]] var handles:
Table[Cid, Future[Block].Raising([CancelledError, RetriesExhaustedError])]
proc sendWantList( proc sendWantList(
id: PeerId, id: PeerId,
@ -333,6 +334,10 @@ asyncchecksuite "NetworkStore engine handlers":
blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq) cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq)
peerCtx.blocks = blocks.mapIt(
(it.address, Presence(address: it.address, have: true, price: UInt256.example))
).toTable
proc sendWantCancellations( proc sendWantCancellations(
id: PeerId, addresses: seq[BlockAddress] id: PeerId, addresses: seq[BlockAddress]
) {.gcsafe, async.} = ) {.gcsafe, async.} =
@ -344,9 +349,168 @@ asyncchecksuite "NetworkStore engine handlers":
) )
await engine.blocksDeliveryHandler(peerId, blocksDelivery) await engine.blocksDeliveryHandler(peerId, blocksDelivery)
discard await allFinished(pending) discard await allFinished(pending).wait(100.millis)
await allFuturesThrowing(cancellations.values().toSeq) await allFuturesThrowing(cancellations.values().toSeq)
asyncchecksuite "Block Download":
var
rng: Rng
seckey: PrivateKey
peerId: PeerId
chunker: Chunker
wallet: WalletRef
blockDiscovery: Discovery
peerStore: PeerCtxStore
pendingBlocks: PendingBlocksManager
network: BlockExcNetwork
engine: BlockExcEngine
discovery: DiscoveryEngine
advertiser: Advertiser
peerCtx: BlockExcPeerCtx
localStore: BlockStore
blocks: seq[Block]
setup:
rng = Rng.instance()
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(Block.new(chunk).tryGet())
seckey = PrivateKey.random(rng[]).tryGet()
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
wallet = WalletRef.example
blockDiscovery = Discovery.new()
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
localStore = CacheStore.new()
network = BlockExcNetwork()
discovery =
DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
advertiser = Advertiser.new(localStore, blockDiscovery)
engine = BlockExcEngine.new(
localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
)
peerCtx = BlockExcPeerCtx(id: peerId)
engine.peers.add(peerCtx)
test "Should exhaust retries":
var
retries = 2
address = BlockAddress.init(blocks[0].cid)
proc sendWantList(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
check wantType == WantHave
check not engine.pendingBlocks.isInFlight(address)
check engine.pendingBlocks.retries(address) == retries
retries -= 1
engine.pendingBlocks.blockRetries = 2
engine.pendingBlocks.retryInterval = 10.millis
engine.network =
BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList))
let pending = engine.requestBlock(address)
expect RetriesExhaustedError:
discard (await pending).tryGet()
test "Should retry block request":
let
address = BlockAddress.init(blocks[0].cid)
steps = newAsyncEvent()
proc sendWantList(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
case wantType
of WantHave:
check engine.pendingBlocks.isInFlight(address) == false
check engine.pendingBlocks.retriesExhausted(address) == false
steps.fire()
of WantBlock:
check engine.pendingBlocks.isInFlight(address) == true
check engine.pendingBlocks.retriesExhausted(address) == false
steps.fire()
engine.pendingBlocks.blockRetries = 10
engine.pendingBlocks.retryInterval = 10.millis
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc
)
)
let pending = engine.requestBlock(address)
await steps.wait()
# add blocks presence
peerCtx.blocks = blocks.mapIt(
(it.address, Presence(address: it.address, have: true, price: UInt256.example))
).toTable
steps.clear()
await steps.wait()
await engine.blocksDeliveryHandler(
peerId, @[BlockDelivery(blk: blocks[0], address: address)]
)
check (await pending).tryGet() == blocks[0]
test "Should cancel block request":
var
address = BlockAddress.init(blocks[0].cid)
done = newFuture[void]()
proc sendWantList(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false,
) {.gcsafe, async.} =
done.complete()
engine.pendingBlocks.blockRetries = 10
engine.pendingBlocks.retryInterval = 1.seconds
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc
)
)
let pending = engine.requestBlock(address)
await done.wait(100.millis)
pending.cancel()
expect CancelledError:
discard (await pending).tryGet()
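For orientation, a rough caller-side sketch of the behaviour these tests pin down. The retry knobs are the same fields the tests set above; the wrapper proc and its option-returning shape are hypothetical.

proc fetchOrGiveUp(
    engine: BlockExcEngine, address: BlockAddress
): Future[?Block] {.async.} =
  engine.pendingBlocks.blockRetries = 3 # give up after three want-list rounds
  engine.pendingBlocks.retryInterval = 500.millis
  try:
    return some((await engine.requestBlock(address)).tryGet())
  except RetriesExhaustedError:
    return none(Block)
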
asyncchecksuite "Task Handler": asyncchecksuite "Task Handler":
var var
rng: Rng rng: Rng

View File

@ -28,7 +28,10 @@ checksuite "Pending Blocks":
check blk.cid in pendingBlocks check blk.cid in pendingBlocks
pendingBlocks.resolve(@[blk].mapIt(BlockDelivery(blk: it, address: it.address))) pendingBlocks.resolve(@[blk].mapIt(BlockDelivery(blk: it, address: it.address)))
check (await handle) == blk await sleepAsync(0.millis)
# trigger the event loop, otherwise the block finishes before poll runs
let resolved = await handle
check resolved == blk
check blk.cid notin pendingBlocks check blk.cid notin pendingBlocks
test "Should cancel want handle": test "Should cancel want handle":
@ -41,20 +44,6 @@ checksuite "Pending Blocks":
await handle.cancelAndWait() await handle.cancelAndWait()
check blk.cid notin pendingBlocks check blk.cid notin pendingBlocks
test "Should expire want handle":
let
pendingBlocks = PendingBlocksManager.new()
blk = bt.Block.new("Hello".toBytes).tryGet
handle = pendingBlocks.getWantHandle(blk.cid, 1.millis)
check blk.cid in pendingBlocks
await sleepAsync(10.millis)
expect AsyncTimeoutError:
discard await handle
check blk.cid notin pendingBlocks
test "Should get wants list": test "Should get wants list":
let let
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
@ -79,3 +68,19 @@ checksuite "Pending Blocks":
check: check:
(await allFinished(wantHandles)).mapIt($it.read.cid).sorted(cmp[string]) == (await allFinished(wantHandles)).mapIt($it.read.cid).sorted(cmp[string]) ==
(await allFinished(handles)).mapIt($it.read.cid).sorted(cmp[string]) (await allFinished(handles)).mapIt($it.read.cid).sorted(cmp[string])
test "Should handle retry counters":
let
pendingBlocks = PendingBlocksManager.new(3)
blk = bt.Block.new("Hello".toBytes).tryGet
address = BlockAddress.init(blk.cid)
handle = pendingBlocks.getWantHandle(blk.cid)
check pendingBlocks.retries(address) == 3
pendingBlocks.decRetries(address)
check pendingBlocks.retries(address) == 2
pendingBlocks.decRetries(address)
check pendingBlocks.retries(address) == 1
pendingBlocks.decRetries(address)
check pendingBlocks.retries(address) == 0
check pendingBlocks.retriesExhausted(address)

View File

@ -123,7 +123,7 @@ template setupAndTearDown*() {.dirty.} =
) )
teardown: teardown:
close(file) file.close()
await node.stop() await node.stop()
await metaTmp.destroyDb() await metaTmp.destroyDb()
await repoTmp.destroyDb() await repoTmp.destroyDb()

View File

@ -64,21 +64,6 @@ asyncchecksuite "Test Node - Basic":
check: check:
fetched == manifest fetched == manifest
test "Should not lookup non-existing blocks twice":
# https://github.com/codex-storage/nim-codex/issues/699
let
cstore = CountingStore.new(engine, localStore)
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery, Taskpool.new())
missingCid =
Cid.init("zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()
engine.blockFetchTimeout = timer.milliseconds(100)
discard await node.retrieve(missingCid, local = false)
let lookupCount = cstore.lookups.getOrDefault(missingCid)
check lookupCount == 1
test "Block Batching": test "Block Batching":
let manifest = await storeDataGetManifest(localStore, chunker) let manifest = await storeDataGetManifest(localStore, chunker)
@ -93,17 +78,15 @@ asyncchecksuite "Test Node - Basic":
) )
).tryGet() ).tryGet()
test "Store and retrieve Data Stream": test "Should store Data Stream":
let let
stream = BufferStream.new() stream = BufferStream.new()
storeFut = node.store(stream) storeFut = node.store(stream)
oddChunkSize = math.trunc(DefaultBlockSize.float / 3.14).NBytes
# Let's check that node.store can correctly rechunk these odd chunks # Let's check that node.store can correctly rechunk these odd chunks
oddChunker = FileChunker.new(file = file, chunkSize = oddChunkSize, pad = false) oddChunker = FileChunker.new(file = file, chunkSize = 1024.NBytes, pad = false)
# TODO: doesn't work with pad=tue # don't pad, so `node.store` gets the correct size
var original: seq[byte] var original: seq[byte]
try: try:
while (let chunk = await oddChunker.getBytes(); chunk.len > 0): while (let chunk = await oddChunker.getBytes(); chunk.len > 0):
original &= chunk original &= chunk
@ -116,13 +99,35 @@ asyncchecksuite "Test Node - Basic":
manifestCid = (await storeFut).tryGet() manifestCid = (await storeFut).tryGet()
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
localManifest = Manifest.decode(manifestBlock).tryGet() localManifest = Manifest.decode(manifestBlock).tryGet()
data = await (await node.retrieve(manifestCid)).drain()
var data: seq[byte]
for i in 0 ..< localManifest.blocksCount:
let blk = (await localStore.getBlock(localManifest.treeCid, i)).tryGet()
data &= blk.data
data.setLen(localManifest.datasetSize.int) # truncate data to original size
check: check:
data.len == localManifest.datasetSize.int
data.len == original.len data.len == original.len
sha256.digest(data) == sha256.digest(original) sha256.digest(data) == sha256.digest(original)
test "Should retrieve a Data Stream":
let
manifest = await storeDataGetManifest(localStore, chunker)
manifestBlk =
bt.Block.new(data = manifest.encode().tryGet, codec = ManifestCodec).tryGet()
(await localStore.putBlock(manifestBlk)).tryGet()
let data = await ((await node.retrieve(manifestBlk.cid)).tryGet()).drain()
var storedData: seq[byte]
for i in 0 ..< manifest.blocksCount:
let blk = (await localStore.getBlock(manifest.treeCid, i)).tryGet()
storedData &= blk.data
storedData.setLen(manifest.datasetSize.int) # truncate data to original size
check:
storedData == data
test "Retrieve One Block": test "Retrieve One Block":
let let
testString = "Block 1" testString = "Block 1"

View File

@ -37,7 +37,7 @@ twonodessuite "REST API":
let space = client1.space().tryGet() let space = client1.space().tryGet()
check: check:
space.totalBlocks == 2 space.totalBlocks == 2
space.quotaMaxBytes == 8589934592.NBytes space.quotaMaxBytes == 21474836480.NBytes
space.quotaUsedBytes == 65592.NBytes space.quotaUsedBytes == 65592.NBytes
space.quotaReservedBytes == 12.NBytes space.quotaReservedBytes == 12.NBytes

2
vendor/nim-serde vendored

@ -1 +1 @@
Subproject commit 69a7a0111addaa4aad885dd4bd7b5ee4684a06de Subproject commit c82e85c62436218592fbe876df5ac389ef8b964b