mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
feat: Block exchange optimizations (#1325)
Signed-off-by: Giuliano Mega <giuliano.mega@gmail.com> Signed-off-by: Chrysostomos Nanakos <chris@include.gr> Co-authored-by: gmega <giuliano.mega@gmail.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
This commit is contained in:
parent
6147a751f1
commit
be759baf4d
@ -8,6 +8,7 @@
|
||||
## those terms.
|
||||
|
||||
import std/sequtils
|
||||
import std/algorithm
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/libp2p/cid
|
||||
@ -38,6 +39,7 @@ const
|
||||
DefaultConcurrentDiscRequests = 10
|
||||
DefaultDiscoveryTimeout = 1.minutes
|
||||
DefaultMinPeersPerBlock = 3
|
||||
DefaultMaxPeersPerBlock = 8
|
||||
DefaultDiscoveryLoopSleep = 3.seconds
|
||||
|
||||
type DiscoveryEngine* = ref object of RootObj
|
||||
@ -51,11 +53,32 @@ type DiscoveryEngine* = ref object of RootObj
|
||||
discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle
|
||||
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
|
||||
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
|
||||
minPeersPerBlock*: int # Max number of peers with block
|
||||
minPeersPerBlock*: int # Min number of peers with block
|
||||
maxPeersPerBlock*: int # Max number of peers with block
|
||||
discoveryLoopSleep: Duration # Discovery loop sleep
|
||||
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]]
|
||||
# Inflight discovery requests
|
||||
|
||||
proc cleanupExcessPeers(b: DiscoveryEngine, cid: Cid) {.gcsafe, raises: [].} =
|
||||
var haves = b.peers.peersHave(cid)
|
||||
let count = haves.len - b.maxPeersPerBlock
|
||||
if count <= 0:
|
||||
return
|
||||
|
||||
haves.sort(
|
||||
proc(a, b: BlockExcPeerCtx): int =
|
||||
cmp(a.lastExchange, b.lastExchange)
|
||||
)
|
||||
|
||||
let toRemove = haves[0 ..< count]
|
||||
for peer in toRemove:
|
||||
try:
|
||||
peer.cleanPresence(BlockAddress.init(cid))
|
||||
trace "Removed block presence from peer", cid, peer = peer.id
|
||||
except CatchableError as exc:
|
||||
error "Failed to clean presence for peer",
|
||||
cid, peer = peer.id, error = exc.msg, name = exc.name
|
||||
|
||||
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
|
||||
try:
|
||||
while b.discEngineRunning:
|
||||
@ -78,8 +101,16 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
|
||||
trace "Discovery request already in progress", cid
|
||||
continue
|
||||
|
||||
trace "Running discovery task for cid", cid
|
||||
|
||||
let haves = b.peers.peersHave(cid)
|
||||
|
||||
if haves.len > b.maxPeersPerBlock:
|
||||
trace "Cleaning up excess peers",
|
||||
cid, peers = haves.len, max = b.maxPeersPerBlock
|
||||
b.cleanupExcessPeers(cid)
|
||||
continue
|
||||
|
||||
if haves.len < b.minPeersPerBlock:
|
||||
let request = b.discovery.find(cid)
|
||||
b.inFlightDiscReqs[cid] = request
|
||||
@ -156,6 +187,7 @@ proc new*(
|
||||
concurrentDiscReqs = DefaultConcurrentDiscRequests,
|
||||
discoveryLoopSleep = DefaultDiscoveryLoopSleep,
|
||||
minPeersPerBlock = DefaultMinPeersPerBlock,
|
||||
maxPeersPerBlock = DefaultMaxPeersPerBlock,
|
||||
): DiscoveryEngine =
|
||||
## Create a discovery engine instance for advertising services
|
||||
##
|
||||
@ -171,4 +203,5 @@ proc new*(
|
||||
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
|
||||
discoveryLoopSleep: discoveryLoopSleep,
|
||||
minPeersPerBlock: minPeersPerBlock,
|
||||
maxPeersPerBlock: maxPeersPerBlock,
|
||||
)
|
||||
|
||||
@ -12,12 +12,14 @@ import std/sets
|
||||
import std/options
|
||||
import std/algorithm
|
||||
import std/sugar
|
||||
import std/random
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/libp2p/[cid, switch, multihash, multicodec]
|
||||
import pkg/metrics
|
||||
import pkg/stint
|
||||
import pkg/questionable
|
||||
import pkg/stew/shims/sets
|
||||
|
||||
import ../../rng
|
||||
import ../../stores/blockstore
|
||||
@ -63,30 +65,59 @@ declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sen
|
||||
declareCounter(
|
||||
codex_block_exchange_blocks_received, "codex blockexchange blocks received"
|
||||
)
|
||||
declareCounter(
|
||||
codex_block_exchange_spurious_blocks_received,
|
||||
"codex blockexchange unrequested/duplicate blocks received",
|
||||
)
|
||||
declareCounter(
|
||||
codex_block_exchange_discovery_requests_total,
|
||||
"Total number of peer discovery requests sent",
|
||||
)
|
||||
declareCounter(
|
||||
codex_block_exchange_peer_timeouts_total, "Total number of peer activity timeouts"
|
||||
)
|
||||
declareCounter(
|
||||
codex_block_exchange_requests_failed_total,
|
||||
"Total number of block requests that failed after exhausting retries",
|
||||
)
|
||||
|
||||
const
|
||||
DefaultMaxPeersPerRequest* = 10
|
||||
# The default max message length of nim-libp2p is 100 megabytes, meaning we can
|
||||
# in principle fit up to 1600 64k blocks per message, so 20 is well under
|
||||
# that number.
|
||||
DefaultMaxBlocksPerMessage = 20
|
||||
DefaultTaskQueueSize = 100
|
||||
DefaultConcurrentTasks = 10
|
||||
# Don't do more than one discovery request per `DiscoveryRateLimit` seconds.
|
||||
DiscoveryRateLimit = 3.seconds
|
||||
DefaultPeerActivityTimeout = 1.minutes
|
||||
# Match MaxWantListBatchSize to efficiently respond to incoming WantLists
|
||||
PresenceBatchSize = MaxWantListBatchSize
|
||||
CleanupBatchSize = 2048
|
||||
|
||||
type
|
||||
TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
|
||||
TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
|
||||
PeerSelector* =
|
||||
proc(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx {.gcsafe, raises: [].}
|
||||
|
||||
BlockExcEngine* = ref object of RootObj
|
||||
localStore*: BlockStore # Local block store for this instance
|
||||
network*: BlockExcNetwork # Petwork interface
|
||||
network*: BlockExcNetwork # Network interface
|
||||
peers*: PeerCtxStore # Peers we're currently actively exchanging with
|
||||
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx]
|
||||
# Peers we're currently processing tasks for
|
||||
selectPeer*: PeerSelector # Peers we're currently processing tasks for
|
||||
concurrentTasks: int # Number of concurrent peers we're serving at any given time
|
||||
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
|
||||
blockexcRunning: bool # Indicates if the blockexc task is running
|
||||
maxBlocksPerMessage: int
|
||||
# Maximum number of blocks we can squeeze in a single message
|
||||
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
|
||||
wallet*: WalletRef # Nitro wallet for micropayments
|
||||
pricing*: ?Pricing # Optional bandwidth pricing
|
||||
discovery*: DiscoveryEngine
|
||||
advertiser*: Advertiser
|
||||
lastDiscRequest: Moment # time of last discovery request
|
||||
|
||||
Pricing* = object
|
||||
address*: EthAddress
|
||||
@ -104,7 +135,6 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).}
|
||||
proc start*(self: BlockExcEngine) {.async: (raises: []).} =
|
||||
## Start the blockexc task
|
||||
##
|
||||
|
||||
await self.discovery.start()
|
||||
await self.advertiser.start()
|
||||
|
||||
@ -154,8 +184,145 @@ proc sendWantBlock(
|
||||
) # we want this remote to send us a block
|
||||
codex_block_exchange_want_block_lists_sent.inc()
|
||||
|
||||
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
|
||||
Rng.instance.sample(peers)
|
||||
proc sendBatchedWantList(
|
||||
self: BlockExcEngine,
|
||||
peer: BlockExcPeerCtx,
|
||||
addresses: seq[BlockAddress],
|
||||
full: bool,
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
var offset = 0
|
||||
while offset < addresses.len:
|
||||
let batchEnd = min(offset + MaxWantListBatchSize, addresses.len)
|
||||
let batch = addresses[offset ..< batchEnd]
|
||||
|
||||
trace "Sending want list batch",
|
||||
peer = peer.id,
|
||||
batchSize = batch.len,
|
||||
offset = offset,
|
||||
total = addresses.len,
|
||||
full = full
|
||||
|
||||
await self.network.request.sendWantList(
|
||||
peer.id, batch, full = (full and offset == 0)
|
||||
)
|
||||
for address in batch:
|
||||
peer.lastSentWants.incl(address)
|
||||
|
||||
offset = batchEnd
|
||||
|
||||
proc refreshBlockKnowledge(
|
||||
self: BlockExcEngine, peer: BlockExcPeerCtx, skipDelta = false, resetBackoff = false
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
if peer.lastSentWants.len > 0:
|
||||
var toRemove: seq[BlockAddress]
|
||||
|
||||
for address in peer.lastSentWants:
|
||||
if address notin self.pendingBlocks:
|
||||
toRemove.add(address)
|
||||
|
||||
if toRemove.len >= CleanupBatchSize:
|
||||
await idleAsync()
|
||||
break
|
||||
|
||||
for addr in toRemove:
|
||||
peer.lastSentWants.excl(addr)
|
||||
|
||||
if self.pendingBlocks.wantListLen == 0:
|
||||
if peer.lastSentWants.len > 0:
|
||||
trace "Clearing want list tracking, no pending blocks", peer = peer.id
|
||||
peer.lastSentWants.clear()
|
||||
return
|
||||
|
||||
# We send only blocks that the peer hasn't already told us that they already have.
|
||||
let
|
||||
peerHave = peer.peerHave
|
||||
toAsk = toHashSet(self.pendingBlocks.wantList.toSeq.filterIt(it notin peerHave))
|
||||
|
||||
if toAsk.len == 0:
|
||||
if peer.lastSentWants.len > 0:
|
||||
trace "Clearing want list tracking, peer has all blocks", peer = peer.id
|
||||
peer.lastSentWants.clear()
|
||||
return
|
||||
|
||||
let newWants = toAsk - peer.lastSentWants
|
||||
|
||||
if peer.lastSentWants.len > 0 and not skipDelta:
|
||||
if newWants.len > 0:
|
||||
trace "Sending delta want list update",
|
||||
peer = peer.id, newWants = newWants.len, totalWants = toAsk.len
|
||||
|
||||
await self.sendBatchedWantList(peer, newWants.toSeq, full = false)
|
||||
|
||||
if resetBackoff:
|
||||
peer.wantsUpdated
|
||||
else:
|
||||
trace "No changes in want list, skipping send", peer = peer.id
|
||||
peer.lastSentWants = toAsk
|
||||
else:
|
||||
trace "Sending full want list", peer = peer.id, length = toAsk.len
|
||||
|
||||
await self.sendBatchedWantList(peer, toAsk.toSeq, full = true)
|
||||
|
||||
if resetBackoff:
|
||||
peer.wantsUpdated
|
||||
|
||||
proc refreshBlockKnowledge(self: BlockExcEngine) {.async: (raises: [CancelledError]).} =
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
for peer in self.peers.peers.values.toSeq:
|
||||
# We refresh block knowledge if:
|
||||
# 1. the peer hasn't been refreshed in a while;
|
||||
# 2. the list of blocks we care about has changed.
|
||||
#
|
||||
# Note that because of (2), it is important that we update our
|
||||
# want list in the coarsest way possible instead of over many
|
||||
# small updates.
|
||||
#
|
||||
|
||||
# In dynamic swarms, staleness will dominate latency.
|
||||
let
|
||||
hasNewBlocks = peer.lastRefresh < self.pendingBlocks.lastInclusion
|
||||
isKnowledgeStale = peer.isKnowledgeStale
|
||||
|
||||
if isKnowledgeStale or hasNewBlocks:
|
||||
if not peer.refreshInProgress:
|
||||
peer.refreshRequested()
|
||||
await self.refreshBlockKnowledge(
|
||||
peer, skipDelta = isKnowledgeStale, resetBackoff = hasNewBlocks
|
||||
)
|
||||
else:
|
||||
trace "Not refreshing: peer is up to date", peer = peer.id
|
||||
|
||||
if (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
try:
|
||||
await idleAsync()
|
||||
except CancelledError:
|
||||
discard
|
||||
lastIdle = Moment.now()
|
||||
|
||||
proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
|
||||
if self.lastDiscRequest + DiscoveryRateLimit < Moment.now():
|
||||
trace "Searching for new peers for", cid = cid
|
||||
codex_block_exchange_discovery_requests_total.inc()
|
||||
self.lastDiscRequest = Moment.now() # always refresh before calling await!
|
||||
self.discovery.queueFindBlocksReq(@[cid])
|
||||
else:
|
||||
trace "Not searching for new peers, rate limit not expired", cid = cid
|
||||
|
||||
proc evictPeer(self: BlockExcEngine, peer: PeerId) =
|
||||
## Cleanup disconnected peer
|
||||
##
|
||||
|
||||
trace "Evicting disconnected/departed peer", peer
|
||||
|
||||
let peerCtx = self.peers.get(peer)
|
||||
if not peerCtx.isNil:
|
||||
for address in peerCtx.blocksRequested:
|
||||
self.pendingBlocks.clearRequest(address, peer.some)
|
||||
|
||||
# drop the peer from the peers table
|
||||
self.peers.remove(peer)
|
||||
|
||||
proc downloadInternal(
|
||||
self: BlockExcEngine, address: BlockAddress
|
||||
@ -173,41 +340,147 @@ proc downloadInternal(
|
||||
|
||||
if self.pendingBlocks.retriesExhausted(address):
|
||||
trace "Error retries exhausted"
|
||||
codex_block_exchange_requests_failed_total.inc()
|
||||
handle.fail(newException(RetriesExhaustedError, "Error retries exhausted"))
|
||||
break
|
||||
|
||||
trace "Running retry handle"
|
||||
let peers = self.peers.getPeersForBlock(address)
|
||||
logScope:
|
||||
peersWith = peers.with.len
|
||||
peersWithout = peers.without.len
|
||||
|
||||
trace "Peers for block"
|
||||
if peers.with.len > 0:
|
||||
self.pendingBlocks.setInFlight(address, true)
|
||||
await self.sendWantBlock(@[address], peers.with.randomPeer)
|
||||
else:
|
||||
self.pendingBlocks.setInFlight(address, false)
|
||||
if peers.with.len == 0:
|
||||
# We know of no peers that have the block.
|
||||
if peers.without.len > 0:
|
||||
await self.sendWantHave(@[address], peers.without)
|
||||
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
||||
# If we have peers connected but none of them have the block, this
|
||||
# could be because our knowledge about what they have has run stale.
|
||||
# Tries to refresh it.
|
||||
await self.refreshBlockKnowledge()
|
||||
# Also tries to look for new peers for good measure.
|
||||
# TODO: in the future, peer search and knowledge maintenance should
|
||||
# be completely decoupled from one another. It is very hard to
|
||||
# control what happens and how many neighbors we get like this.
|
||||
self.searchForNewPeers(address.cidOrTreeCid)
|
||||
|
||||
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
|
||||
let nextDiscovery =
|
||||
if self.lastDiscRequest + DiscoveryRateLimit > Moment.now():
|
||||
(self.lastDiscRequest + DiscoveryRateLimit - Moment.now())
|
||||
else:
|
||||
0.milliseconds
|
||||
|
||||
let retryDelay =
|
||||
max(secs(rand(self.pendingBlocks.retryInterval.secs)), nextDiscovery)
|
||||
|
||||
# We now wait for a bit and then retry. If the handle gets completed in the
|
||||
# meantime (cause the presence handler might have requested the block and
|
||||
# received it in the meantime), we are done. Retry delays are randomized
|
||||
# so we don't get all block loops spinning at the same time.
|
||||
await handle or sleepAsync(retryDelay)
|
||||
if handle.finished:
|
||||
break
|
||||
|
||||
# Without decrementing the retries count, this would infinitely loop
|
||||
# trying to find peers.
|
||||
self.pendingBlocks.decRetries(address)
|
||||
|
||||
# If we still don't have the block, we'll go for another cycle.
|
||||
trace "No peers for block, will retry shortly"
|
||||
continue
|
||||
|
||||
# Once again, it might happen that the block was requested to a peer
|
||||
# in the meantime. If so, we don't need to do anything. Otherwise,
|
||||
# we'll be the ones placing the request.
|
||||
let scheduledPeer =
|
||||
if not self.pendingBlocks.isRequested(address):
|
||||
let peer = self.selectPeer(peers.with)
|
||||
discard self.pendingBlocks.markRequested(address, peer.id)
|
||||
peer.blockRequestScheduled(address)
|
||||
trace "Request block from block retry loop"
|
||||
await self.sendWantBlock(@[address], peer)
|
||||
peer
|
||||
else:
|
||||
let peerId = self.pendingBlocks.getRequestPeer(address).get()
|
||||
self.peers.get(peerId)
|
||||
|
||||
if scheduledPeer.isNil:
|
||||
trace "Scheduled peer no longer available, clearing stale request", address
|
||||
self.pendingBlocks.clearRequest(address)
|
||||
continue
|
||||
|
||||
# Parks until either the block is received, or the peer times out.
|
||||
let activityTimer = scheduledPeer.activityTimer()
|
||||
await handle or activityTimer # TODO: or peerDropped
|
||||
activityTimer.cancel()
|
||||
|
||||
# XXX: we should probably not have this. Blocks should be retried
|
||||
# to infinity unless cancelled by the client.
|
||||
self.pendingBlocks.decRetries(address)
|
||||
|
||||
if handle.finished:
|
||||
trace "Handle for block finished", failed = handle.failed
|
||||
break
|
||||
else:
|
||||
# If the peer timed out, retries immediately.
|
||||
trace "Peer timed out during block request", peer = scheduledPeer.id
|
||||
codex_block_exchange_peer_timeouts_total.inc()
|
||||
await self.network.dropPeer(scheduledPeer.id)
|
||||
# Evicts peer immediately or we may end up picking it again in the
|
||||
# next retry.
|
||||
self.evictPeer(scheduledPeer.id)
|
||||
except CancelledError as exc:
|
||||
trace "Block download cancelled"
|
||||
if not handle.finished:
|
||||
await handle.cancelAndWait()
|
||||
except RetriesExhaustedError as exc:
|
||||
warn "Retries exhausted for block", address, exc = exc.msg
|
||||
codex_block_exchange_requests_failed_total.inc()
|
||||
if not handle.finished:
|
||||
handle.fail(exc)
|
||||
finally:
|
||||
self.pendingBlocks.setInFlight(address, false)
|
||||
self.pendingBlocks.clearRequest(address)
|
||||
|
||||
proc requestBlocks*(
|
||||
self: BlockExcEngine, addresses: seq[BlockAddress]
|
||||
): SafeAsyncIter[Block] =
|
||||
var handles: seq[BlockHandle]
|
||||
|
||||
# Adds all blocks to pendingBlocks before calling the first downloadInternal. This will
|
||||
# ensure that we don't send incomplete want lists.
|
||||
for address in addresses:
|
||||
if address notin self.pendingBlocks:
|
||||
handles.add(self.pendingBlocks.getWantHandle(address))
|
||||
|
||||
for address in addresses:
|
||||
self.trackedFutures.track(self.downloadInternal(address))
|
||||
|
||||
let totalHandles = handles.len
|
||||
var completed = 0
|
||||
|
||||
proc isFinished(): bool =
|
||||
completed == totalHandles
|
||||
|
||||
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
# Be it success or failure, we're completing this future.
|
||||
let value =
|
||||
try:
|
||||
# FIXME: this is super expensive. We're doing several linear scans,
|
||||
# not to mention all the copying and callback fumbling in `one`.
|
||||
let
|
||||
handle = await one(handles)
|
||||
i = handles.find(handle)
|
||||
handles.del(i)
|
||||
success await handle
|
||||
except CancelledError as err:
|
||||
warn "Block request cancelled", addresses, err = err.msg
|
||||
raise err
|
||||
except CatchableError as err:
|
||||
error "Error getting blocks from exchange engine", addresses, err = err.msg
|
||||
failure err
|
||||
|
||||
inc(completed)
|
||||
return value
|
||||
|
||||
return SafeAsyncIter[Block].new(genNext, isFinished)
|
||||
|
||||
proc requestBlock*(
|
||||
self: BlockExcEngine, address: BlockAddress
|
||||
@ -239,60 +512,64 @@ proc completeBlock*(self: BlockExcEngine, address: BlockAddress, blk: Block) =
|
||||
proc blockPresenceHandler*(
|
||||
self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
|
||||
) {.async: (raises: []).} =
|
||||
trace "Received block presence from peer", peer, blocks = blocks.mapIt($it)
|
||||
trace "Received block presence from peer", peer, len = blocks.len
|
||||
let
|
||||
peerCtx = self.peers.get(peer)
|
||||
ourWantList = toSeq(self.pendingBlocks.wantList)
|
||||
ourWantList = toHashSet(self.pendingBlocks.wantList.toSeq)
|
||||
|
||||
if peerCtx.isNil:
|
||||
return
|
||||
|
||||
peerCtx.refreshReplied()
|
||||
|
||||
for blk in blocks:
|
||||
if presence =? Presence.init(blk):
|
||||
peerCtx.setPresence(presence)
|
||||
|
||||
let
|
||||
peerHave = peerCtx.peerHave
|
||||
dontWantCids = peerHave.filterIt(it notin ourWantList)
|
||||
dontWantCids = peerHave - ourWantList
|
||||
|
||||
if dontWantCids.len > 0:
|
||||
peerCtx.cleanPresence(dontWantCids)
|
||||
peerCtx.cleanPresence(dontWantCids.toSeq)
|
||||
|
||||
let ourWantCids = ourWantList.filterIt(
|
||||
it in peerHave and not self.pendingBlocks.retriesExhausted(it) and
|
||||
not self.pendingBlocks.isInFlight(it)
|
||||
)
|
||||
self.pendingBlocks.markRequested(it, peer)
|
||||
).toSeq
|
||||
|
||||
for address in ourWantCids:
|
||||
self.pendingBlocks.setInFlight(address, true)
|
||||
self.pendingBlocks.decRetries(address)
|
||||
peerCtx.blockRequestScheduled(address)
|
||||
|
||||
if ourWantCids.len > 0:
|
||||
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
|
||||
# FIXME: this will result in duplicate requests for blocks
|
||||
if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption:
|
||||
warn "Failed to send wantBlock to peer", peer, err = err.msg
|
||||
for address in ourWantCids:
|
||||
self.pendingBlocks.clearRequest(address, peer.some)
|
||||
|
||||
proc scheduleTasks(
|
||||
self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
let cids = blocksDelivery.mapIt(it.blk.cid)
|
||||
|
||||
# schedule any new peers to provide blocks to
|
||||
for p in self.peers:
|
||||
for c in cids: # for each cid
|
||||
for blockDelivery in blocksDelivery: # for each cid
|
||||
# schedule a peer if it wants at least one cid
|
||||
# and we have it in our local store
|
||||
if c in p.peerWantsCids:
|
||||
if blockDelivery.address in p.wantedBlocks:
|
||||
let cid = blockDelivery.blk.cid
|
||||
try:
|
||||
if await (c in self.localStore):
|
||||
if await (cid in self.localStore):
|
||||
# TODO: the try/except should go away once blockstore tracks exceptions
|
||||
self.scheduleTask(p)
|
||||
break
|
||||
except CancelledError as exc:
|
||||
warn "Checking local store canceled", cid = c, err = exc.msg
|
||||
warn "Checking local store canceled", cid = cid, err = exc.msg
|
||||
return
|
||||
except CatchableError as exc:
|
||||
error "Error checking local store for cid", cid = c, err = exc.msg
|
||||
error "Error checking local store for cid", cid = cid, err = exc.msg
|
||||
raiseAssert "Unexpected error checking local store for cid"
|
||||
|
||||
proc cancelBlocks(
|
||||
@ -301,28 +578,45 @@ proc cancelBlocks(
|
||||
## Tells neighboring peers that we're no longer interested in a block.
|
||||
##
|
||||
|
||||
let blocksDelivered = toHashSet(addrs)
|
||||
var scheduledCancellations: Table[PeerId, HashSet[BlockAddress]]
|
||||
|
||||
if self.peers.len == 0:
|
||||
return
|
||||
|
||||
trace "Sending block request cancellations to peers",
|
||||
addrs, peers = self.peers.peerIds
|
||||
|
||||
proc processPeer(peerCtx: BlockExcPeerCtx): Future[BlockExcPeerCtx] {.async.} =
|
||||
proc dispatchCancellations(
|
||||
entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]]
|
||||
): Future[PeerId] {.async: (raises: [CancelledError]).} =
|
||||
trace "Sending block request cancellations to peer",
|
||||
peer = entry.peerId, addresses = entry.addresses.len
|
||||
await self.network.request.sendWantCancellations(
|
||||
peer = peerCtx.id, addresses = addrs.filterIt(it in peerCtx)
|
||||
peer = entry.peerId, addresses = entry.addresses.toSeq
|
||||
)
|
||||
|
||||
return peerCtx
|
||||
return entry.peerId
|
||||
|
||||
try:
|
||||
let (succeededFuts, failedFuts) = await allFinishedFailed[BlockExcPeerCtx](
|
||||
toSeq(self.peers.peers.values).filterIt(it.peerHave.anyIt(it in addrs)).map(
|
||||
processPeer
|
||||
)
|
||||
for peerCtx in self.peers.peers.values:
|
||||
# Do we have pending requests, towards this peer, for any of the blocks
|
||||
# that were just delivered?
|
||||
let intersection = peerCtx.blocksRequested.intersection(blocksDelivered)
|
||||
if intersection.len > 0:
|
||||
# If so, schedules a cancellation.
|
||||
scheduledCancellations[peerCtx.id] = intersection
|
||||
|
||||
if scheduledCancellations.len == 0:
|
||||
return
|
||||
|
||||
let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
|
||||
toSeq(scheduledCancellations.pairs).map(dispatchCancellations)
|
||||
)
|
||||
|
||||
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerCtx: BlockExcPeerCtx):
|
||||
peerCtx.cleanPresence(addrs)
|
||||
(await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId):
|
||||
let ctx = self.peers.get(peerId)
|
||||
if not ctx.isNil:
|
||||
ctx.cleanPresence(addrs)
|
||||
for address in scheduledCancellations[peerId]:
|
||||
ctx.blockRequestCancelled(address)
|
||||
|
||||
if failedFuts.len > 0:
|
||||
warn "Failed to send block request cancellations to peers", peers = failedFuts.len
|
||||
@ -392,17 +686,31 @@ proc validateBlockDelivery(self: BlockExcEngine, bd: BlockDelivery): ?!void =
|
||||
return success()
|
||||
|
||||
proc blocksDeliveryHandler*(
|
||||
self: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]
|
||||
self: BlockExcEngine,
|
||||
peer: PeerId,
|
||||
blocksDelivery: seq[BlockDelivery],
|
||||
allowSpurious: bool = false,
|
||||
) {.async: (raises: []).} =
|
||||
trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address))
|
||||
|
||||
var validatedBlocksDelivery: seq[BlockDelivery]
|
||||
let peerCtx = self.peers.get(peer)
|
||||
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
for bd in blocksDelivery:
|
||||
logScope:
|
||||
peer = peer
|
||||
address = bd.address
|
||||
|
||||
try:
|
||||
# Unknown peers and unrequested blocks are dropped with a warning.
|
||||
if not allowSpurious and (peerCtx == nil or not peerCtx.blockReceived(bd.address)):
|
||||
warn "Dropping unrequested or duplicate block received from peer"
|
||||
codex_block_exchange_spurious_blocks_received.inc()
|
||||
continue
|
||||
|
||||
if err =? self.validateBlockDelivery(bd).errorOption:
|
||||
warn "Block validation failed", msg = err.msg
|
||||
continue
|
||||
@ -422,15 +730,25 @@ proc blocksDeliveryHandler*(
|
||||
).errorOption:
|
||||
warn "Unable to store proof and cid for a block"
|
||||
continue
|
||||
except CancelledError:
|
||||
trace "Block delivery handling cancelled"
|
||||
except CatchableError as exc:
|
||||
warn "Error handling block delivery", error = exc.msg
|
||||
continue
|
||||
|
||||
validatedBlocksDelivery.add(bd)
|
||||
|
||||
if (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
try:
|
||||
await idleAsync()
|
||||
except CancelledError:
|
||||
discard
|
||||
except CatchableError:
|
||||
discard
|
||||
lastIdle = Moment.now()
|
||||
|
||||
codex_block_exchange_blocks_received.inc(validatedBlocksDelivery.len.int64)
|
||||
|
||||
let peerCtx = self.peers.get(peer)
|
||||
if peerCtx != nil:
|
||||
if err =? catch(await self.payForBlocks(peerCtx, blocksDelivery)).errorOption:
|
||||
warn "Error paying for blocks", err = err.msg
|
||||
@ -454,16 +772,17 @@ proc wantListHandler*(
|
||||
presence: seq[BlockPresence]
|
||||
schedulePeer = false
|
||||
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
try:
|
||||
for e in wantList.entries:
|
||||
let idx = peerCtx.peerWants.findIt(it.address == e.address)
|
||||
|
||||
logScope:
|
||||
peer = peerCtx.id
|
||||
address = e.address
|
||||
wantType = $e.wantType
|
||||
|
||||
if idx < 0: # Adding new entry to peer wants
|
||||
if e.address notin peerCtx.wantedBlocks: # Adding new entry to peer wants
|
||||
let
|
||||
have =
|
||||
try:
|
||||
@ -474,6 +793,8 @@ proc wantListHandler*(
|
||||
price = @(self.pricing.get(Pricing(price: 0.u256)).price.toBytesBE)
|
||||
|
||||
if e.cancel:
|
||||
# This is sort of expected if we sent the block to the peer, as we have removed
|
||||
# it from the peer's wantlist ourselves.
|
||||
trace "Received cancelation for untracked block, skipping",
|
||||
address = e.address
|
||||
continue
|
||||
@ -482,12 +803,14 @@ proc wantListHandler*(
|
||||
case e.wantType
|
||||
of WantType.WantHave:
|
||||
if have:
|
||||
trace "We HAVE the block", address = e.address
|
||||
presence.add(
|
||||
BlockPresence(
|
||||
address: e.address, `type`: BlockPresenceType.Have, price: price
|
||||
)
|
||||
)
|
||||
else:
|
||||
trace "We DON'T HAVE the block", address = e.address
|
||||
if e.sendDontHave:
|
||||
presence.add(
|
||||
BlockPresence(
|
||||
@ -497,28 +820,35 @@ proc wantListHandler*(
|
||||
|
||||
codex_block_exchange_want_have_lists_received.inc()
|
||||
of WantType.WantBlock:
|
||||
peerCtx.peerWants.add(e)
|
||||
peerCtx.wantedBlocks.incl(e.address)
|
||||
schedulePeer = true
|
||||
codex_block_exchange_want_block_lists_received.inc()
|
||||
else: # Updating existing entry in peer wants
|
||||
# peer doesn't want this block anymore
|
||||
if e.cancel:
|
||||
trace "Canceling want for block", address = e.address
|
||||
peerCtx.peerWants.del(idx)
|
||||
peerCtx.wantedBlocks.excl(e.address)
|
||||
trace "Canceled block request",
|
||||
address = e.address, len = peerCtx.peerWants.len
|
||||
address = e.address, len = peerCtx.wantedBlocks.len
|
||||
else:
|
||||
trace "Peer has requested a block more than once", address = e.address
|
||||
if e.wantType == WantType.WantBlock:
|
||||
schedulePeer = true
|
||||
# peer might want to ask for the same cid with
|
||||
# different want params
|
||||
trace "Updating want for block", address = e.address
|
||||
peerCtx.peerWants[idx] = e # update entry
|
||||
trace "Updated block request",
|
||||
address = e.address, len = peerCtx.peerWants.len
|
||||
|
||||
if presence.len >= PresenceBatchSize or (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
if presence.len > 0:
|
||||
trace "Sending presence batch to remote", items = presence.len
|
||||
await self.network.request.sendPresence(peer, presence)
|
||||
presence = @[]
|
||||
try:
|
||||
await idleAsync()
|
||||
except CancelledError:
|
||||
discard
|
||||
lastIdle = Moment.now()
|
||||
|
||||
# Send any remaining presence messages
|
||||
if presence.len > 0:
|
||||
trace "Sending presence to remote", items = presence.mapIt($it).join(",")
|
||||
trace "Sending final presence to remote", items = presence.len
|
||||
await self.network.request.sendPresence(peer, presence)
|
||||
|
||||
if schedulePeer:
|
||||
@ -550,7 +880,7 @@ proc paymentHandler*(
|
||||
else:
|
||||
context.paymentChannel = self.wallet.acceptChannel(payment).option
|
||||
|
||||
proc setupPeer*(
|
||||
proc peerAddedHandler*(
|
||||
self: BlockExcEngine, peer: PeerId
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
## Perform initial setup, such as want
|
||||
@ -560,88 +890,85 @@ proc setupPeer*(
|
||||
trace "Setting up peer", peer
|
||||
|
||||
if peer notin self.peers:
|
||||
let peerCtx = BlockExcPeerCtx(id: peer, activityTimeout: DefaultPeerActivityTimeout)
|
||||
trace "Setting up new peer", peer
|
||||
self.peers.add(BlockExcPeerCtx(id: peer))
|
||||
self.peers.add(peerCtx)
|
||||
trace "Added peer", peers = self.peers.len
|
||||
|
||||
# broadcast our want list, the other peer will do the same
|
||||
if self.pendingBlocks.wantListLen > 0:
|
||||
trace "Sending our want list to a peer", peer
|
||||
let cids = toSeq(self.pendingBlocks.wantList)
|
||||
await self.network.request.sendWantList(peer, cids, full = true)
|
||||
await self.refreshBlockKnowledge(peerCtx)
|
||||
|
||||
if address =? self.pricing .? address:
|
||||
trace "Sending account to peer", peer
|
||||
await self.network.request.sendAccount(peer, Account(address: address))
|
||||
|
||||
proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
|
||||
## Cleanup disconnected peer
|
||||
##
|
||||
proc localLookup(
|
||||
self: BlockExcEngine, address: BlockAddress
|
||||
): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
|
||||
if address.leaf:
|
||||
(await self.localStore.getBlockAndProof(address.treeCid, address.index)).map(
|
||||
(blkAndProof: (Block, CodexProof)) =>
|
||||
BlockDelivery(address: address, blk: blkAndProof[0], proof: blkAndProof[1].some)
|
||||
)
|
||||
else:
|
||||
(await self.localStore.getBlock(address)).map(
|
||||
(blk: Block) => BlockDelivery(address: address, blk: blk, proof: CodexProof.none)
|
||||
)
|
||||
|
||||
trace "Dropping peer", peer
|
||||
iterator splitBatches[T](sequence: seq[T], batchSize: int): seq[T] =
|
||||
var batch: seq[T]
|
||||
for element in sequence:
|
||||
if batch.len == batchSize:
|
||||
yield batch
|
||||
batch = @[]
|
||||
batch.add(element)
|
||||
|
||||
# drop the peer from the peers table
|
||||
self.peers.remove(peer)
|
||||
if batch.len > 0:
|
||||
yield batch
|
||||
|
||||
proc taskHandler*(
|
||||
self: BlockExcEngine, task: BlockExcPeerCtx
|
||||
self: BlockExcEngine, peerCtx: BlockExcPeerCtx
|
||||
) {.gcsafe, async: (raises: [CancelledError, RetriesExhaustedError]).} =
|
||||
# Send to the peer blocks he wants to get,
|
||||
# if they present in our local store
|
||||
|
||||
# TODO: There should be all sorts of accounting of
|
||||
# bytes sent/received here
|
||||
# Blocks that have been sent have already been picked up by other tasks and
|
||||
# should not be re-sent.
|
||||
var
|
||||
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it))
|
||||
sent: HashSet[BlockAddress]
|
||||
|
||||
var wantsBlocks =
|
||||
task.peerWants.filterIt(it.wantType == WantType.WantBlock and not it.inFlight)
|
||||
trace "Running task for peer", peer = peerCtx.id
|
||||
|
||||
proc updateInFlight(addresses: seq[BlockAddress], inFlight: bool) =
|
||||
for peerWant in task.peerWants.mitems:
|
||||
if peerWant.address in addresses:
|
||||
peerWant.inFlight = inFlight
|
||||
for wantedBlock in wantedBlocks:
|
||||
peerCtx.markBlockAsSent(wantedBlock)
|
||||
|
||||
if wantsBlocks.len > 0:
|
||||
# Mark wants as in-flight.
|
||||
let wantAddresses = wantsBlocks.mapIt(it.address)
|
||||
updateInFlight(wantAddresses, true)
|
||||
wantsBlocks.sort(SortOrder.Descending)
|
||||
try:
|
||||
for batch in wantedBlocks.toSeq.splitBatches(self.maxBlocksPerMessage):
|
||||
var blockDeliveries: seq[BlockDelivery]
|
||||
for wantedBlock in batch:
|
||||
# I/O is blocking so looking up blocks sequentially is fine.
|
||||
without blockDelivery =? await self.localLookup(wantedBlock), err:
|
||||
error "Error getting block from local store",
|
||||
err = err.msg, address = wantedBlock
|
||||
peerCtx.markBlockAsNotSent(wantedBlock)
|
||||
continue
|
||||
blockDeliveries.add(blockDelivery)
|
||||
sent.incl(wantedBlock)
|
||||
|
||||
proc localLookup(e: WantListEntry): Future[?!BlockDelivery] {.async.} =
|
||||
if e.address.leaf:
|
||||
(await self.localStore.getBlockAndProof(e.address.treeCid, e.address.index)).map(
|
||||
(blkAndProof: (Block, CodexProof)) =>
|
||||
BlockDelivery(
|
||||
address: e.address, blk: blkAndProof[0], proof: blkAndProof[1].some
|
||||
)
|
||||
)
|
||||
else:
|
||||
(await self.localStore.getBlock(e.address)).map(
|
||||
(blk: Block) =>
|
||||
BlockDelivery(address: e.address, blk: blk, proof: CodexProof.none)
|
||||
)
|
||||
if blockDeliveries.len == 0:
|
||||
continue
|
||||
|
||||
let
|
||||
blocksDeliveryFut = await allFinished(wantsBlocks.map(localLookup))
|
||||
blocksDelivery = blocksDeliveryFut.filterIt(it.completed and it.value.isOk).mapIt:
|
||||
if bd =? it.value:
|
||||
bd
|
||||
else:
|
||||
raiseAssert "Unexpected error in local lookup"
|
||||
|
||||
# All the wants that failed local lookup must be set to not-in-flight again.
|
||||
let
|
||||
successAddresses = blocksDelivery.mapIt(it.address)
|
||||
failedAddresses = wantAddresses.filterIt(it notin successAddresses)
|
||||
updateInFlight(failedAddresses, false)
|
||||
|
||||
if blocksDelivery.len > 0:
|
||||
trace "Sending blocks to peer",
|
||||
peer = task.id, blocks = (blocksDelivery.mapIt(it.address))
|
||||
await self.network.request.sendBlocksDelivery(task.id, blocksDelivery)
|
||||
|
||||
codex_block_exchange_blocks_sent.inc(blocksDelivery.len.int64)
|
||||
|
||||
task.peerWants.keepItIf(it.address notin successAddresses)
|
||||
await self.network.request.sendBlocksDelivery(peerCtx.id, blockDeliveries)
|
||||
codex_block_exchange_blocks_sent.inc(blockDeliveries.len.int64)
|
||||
# Drops the batch from the peer's set of wanted blocks; i.e. assumes that after
|
||||
# we send the blocks, then the peer no longer wants them, so we don't need to
|
||||
# re-send them. Note that the send might still fail down the line and we will
|
||||
# have removed those anyway. At that point, we rely on the requester performing
|
||||
# a retry for the request to succeed.
|
||||
peerCtx.wantedBlocks.keepItIf(it notin sent)
|
||||
finally:
|
||||
# Better safe than sorry: if an exception does happen, we don't want to keep
|
||||
# those as sent, as it'll effectively prevent the blocks from ever being sent again.
|
||||
peerCtx.blocksSent.keepItIf(it notin wantedBlocks)
|
||||
|
||||
proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
|
||||
## process tasks
|
||||
@ -652,11 +979,47 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
|
||||
while self.blockexcRunning:
|
||||
let peerCtx = await self.taskQueue.pop()
|
||||
await self.taskHandler(peerCtx)
|
||||
except CancelledError:
|
||||
trace "block exchange task runner cancelled"
|
||||
except CatchableError as exc:
|
||||
error "error running block exchange task", error = exc.msg
|
||||
|
||||
info "Exiting blockexc task runner"
|
||||
|
||||
proc selectRandom*(
|
||||
peers: seq[BlockExcPeerCtx]
|
||||
): BlockExcPeerCtx {.gcsafe, raises: [].} =
|
||||
if peers.len == 1:
|
||||
return peers[0]
|
||||
|
||||
proc evalPeerScore(peer: BlockExcPeerCtx): float =
|
||||
let
|
||||
loadPenalty = peer.blocksRequested.len.float * 2.0
|
||||
successRate =
|
||||
if peer.exchanged > 0:
|
||||
peer.exchanged.float / (peer.exchanged + peer.blocksRequested.len).float
|
||||
else:
|
||||
0.5
|
||||
failurePenalty = (1.0 - successRate) * 5.0
|
||||
return loadPenalty + failurePenalty
|
||||
|
||||
let
|
||||
scores = peers.mapIt(evalPeerScore(it))
|
||||
maxScore = scores.max() + 1.0
|
||||
weights = scores.mapIt(maxScore - it)
|
||||
|
||||
var totalWeight = 0.0
|
||||
for w in weights:
|
||||
totalWeight += w
|
||||
|
||||
var r = rand(totalWeight)
|
||||
for i, weight in weights:
|
||||
r -= weight
|
||||
if r <= 0.0:
|
||||
return peers[i]
|
||||
|
||||
return peers[^1]
|
||||
|
||||
proc new*(
|
||||
T: type BlockExcEngine,
|
||||
localStore: BlockStore,
|
||||
@ -666,7 +1029,9 @@ proc new*(
|
||||
advertiser: Advertiser,
|
||||
peerStore: PeerCtxStore,
|
||||
pendingBlocks: PendingBlocksManager,
|
||||
maxBlocksPerMessage = DefaultMaxBlocksPerMessage,
|
||||
concurrentTasks = DefaultConcurrentTasks,
|
||||
selectPeer: PeerSelector = selectRandom,
|
||||
): BlockExcEngine =
|
||||
## Create new block exchange engine instance
|
||||
##
|
||||
@ -679,23 +1044,13 @@ proc new*(
|
||||
wallet: wallet,
|
||||
concurrentTasks: concurrentTasks,
|
||||
trackedFutures: TrackedFutures(),
|
||||
maxBlocksPerMessage: maxBlocksPerMessage,
|
||||
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
|
||||
discovery: discovery,
|
||||
advertiser: advertiser,
|
||||
selectPeer: selectPeer,
|
||||
)
|
||||
|
||||
proc peerEventHandler(
|
||||
peerId: PeerId, event: PeerEvent
|
||||
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
|
||||
if event.kind == PeerEventKind.Joined:
|
||||
await self.setupPeer(peerId)
|
||||
else:
|
||||
self.dropPeer(peerId)
|
||||
|
||||
if not isNil(network.switch):
|
||||
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
|
||||
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
|
||||
|
||||
proc blockWantListHandler(
|
||||
peer: PeerId, wantList: WantList
|
||||
): Future[void] {.async: (raises: []).} =
|
||||
@ -721,12 +1076,24 @@ proc new*(
|
||||
): Future[void] {.async: (raises: []).} =
|
||||
self.paymentHandler(peer, payment)
|
||||
|
||||
proc peerAddedHandler(
|
||||
peer: PeerId
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
await self.peerAddedHandler(peer)
|
||||
|
||||
proc peerDepartedHandler(
|
||||
peer: PeerId
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
self.evictPeer(peer)
|
||||
|
||||
network.handlers = BlockExcHandlers(
|
||||
onWantList: blockWantListHandler,
|
||||
onBlocksDelivery: blocksDeliveryHandler,
|
||||
onPresence: blockPresenceHandler,
|
||||
onAccount: accountHandler,
|
||||
onPayment: paymentHandler,
|
||||
onPeerJoined: peerAddedHandler,
|
||||
onPeerDeparted: peerDepartedHandler,
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@ -34,7 +34,7 @@ declareGauge(
|
||||
|
||||
const
|
||||
DefaultBlockRetries* = 3000
|
||||
DefaultRetryInterval* = 500.millis
|
||||
DefaultRetryInterval* = 2.seconds
|
||||
|
||||
type
|
||||
RetriesExhaustedError* = object of CatchableError
|
||||
@ -42,7 +42,7 @@ type
|
||||
|
||||
BlockReq* = object
|
||||
handle*: BlockHandle
|
||||
inFlight*: bool
|
||||
requested*: ?PeerId
|
||||
blockRetries*: int
|
||||
startTime*: int64
|
||||
|
||||
@ -50,12 +50,13 @@ type
|
||||
blockRetries*: int = DefaultBlockRetries
|
||||
retryInterval*: Duration = DefaultRetryInterval
|
||||
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
|
||||
lastInclusion*: Moment # time at which we last included a block into our wantlist
|
||||
|
||||
proc updatePendingBlockGauge(p: PendingBlocksManager) =
|
||||
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
|
||||
|
||||
proc getWantHandle*(
|
||||
self: PendingBlocksManager, address: BlockAddress, inFlight = false
|
||||
self: PendingBlocksManager, address: BlockAddress, requested: ?PeerId = PeerId.none
|
||||
): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
|
||||
## Add an event for a block
|
||||
##
|
||||
@ -65,11 +66,13 @@ proc getWantHandle*(
|
||||
do:
|
||||
let blk = BlockReq(
|
||||
handle: newFuture[Block]("pendingBlocks.getWantHandle"),
|
||||
inFlight: inFlight,
|
||||
requested: requested,
|
||||
blockRetries: self.blockRetries,
|
||||
startTime: getMonoTime().ticks,
|
||||
)
|
||||
self.blocks[address] = blk
|
||||
self.lastInclusion = Moment.now()
|
||||
|
||||
let handle = blk.handle
|
||||
|
||||
proc cleanUpBlock(data: pointer) {.raises: [].} =
|
||||
@ -86,9 +89,9 @@ proc getWantHandle*(
|
||||
return handle
|
||||
|
||||
proc getWantHandle*(
|
||||
self: PendingBlocksManager, cid: Cid, inFlight = false
|
||||
self: PendingBlocksManager, cid: Cid, requested: ?PeerId = PeerId.none
|
||||
): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
|
||||
self.getWantHandle(BlockAddress.init(cid), inFlight)
|
||||
self.getWantHandle(BlockAddress.init(cid), requested)
|
||||
|
||||
proc completeWantHandle*(
|
||||
self: PendingBlocksManager, address: BlockAddress, blk: Block
|
||||
@ -121,9 +124,6 @@ proc resolve*(
|
||||
blockReq.handle.complete(bd.blk)
|
||||
|
||||
codex_block_exchange_retrieval_time_us.set(retrievalDurationUs)
|
||||
|
||||
if retrievalDurationUs > 500000:
|
||||
warn "High block retrieval time", retrievalDurationUs, address = bd.address
|
||||
else:
|
||||
trace "Block handle already finished", address = bd.address
|
||||
|
||||
@ -141,19 +141,40 @@ func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool
|
||||
self.blocks.withValue(address, pending):
|
||||
result = pending[].blockRetries <= 0
|
||||
|
||||
func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) =
|
||||
## Set inflight status for a block
|
||||
func isRequested*(self: PendingBlocksManager, address: BlockAddress): bool =
|
||||
## Check if a block has been requested to a peer
|
||||
##
|
||||
result = false
|
||||
self.blocks.withValue(address, pending):
|
||||
result = pending[].requested.isSome
|
||||
|
||||
func getRequestPeer*(self: PendingBlocksManager, address: BlockAddress): ?PeerId =
|
||||
## Returns the peer that requested this block
|
||||
##
|
||||
result = PeerId.none
|
||||
self.blocks.withValue(address, pending):
|
||||
result = pending[].requested
|
||||
|
||||
proc markRequested*(
|
||||
self: PendingBlocksManager, address: BlockAddress, peer: PeerId
|
||||
): bool =
|
||||
## Marks this block as having been requested to a peer
|
||||
##
|
||||
|
||||
self.blocks.withValue(address, pending):
|
||||
pending[].inFlight = inFlight
|
||||
|
||||
func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool =
|
||||
## Check if a block is in flight
|
||||
##
|
||||
if self.isRequested(address):
|
||||
return false
|
||||
|
||||
self.blocks.withValue(address, pending):
|
||||
result = pending[].inFlight
|
||||
pending[].requested = peer.some
|
||||
return true
|
||||
|
||||
proc clearRequest*(
|
||||
self: PendingBlocksManager, address: BlockAddress, peer: ?PeerId = PeerId.none
|
||||
) =
|
||||
self.blocks.withValue(address, pending):
|
||||
if peer.isSome:
|
||||
assert peer == pending[].requested
|
||||
pending[].requested = PeerId.none
|
||||
|
||||
func contains*(self: PendingBlocksManager, cid: Cid): bool =
|
||||
BlockAddress.init(cid) in self.blocks
|
||||
|
||||
@ -44,6 +44,7 @@ type
|
||||
AccountHandler* = proc(peer: PeerId, account: Account) {.gcsafe, async: (raises: []).}
|
||||
PaymentHandler* =
|
||||
proc(peer: PeerId, payment: SignedState) {.gcsafe, async: (raises: []).}
|
||||
PeerEventHandler* = proc(peer: PeerId) {.gcsafe, async: (raises: [CancelledError]).}
|
||||
|
||||
BlockExcHandlers* = object
|
||||
onWantList*: WantListHandler
|
||||
@ -51,6 +52,9 @@ type
|
||||
onPresence*: BlockPresenceHandler
|
||||
onAccount*: AccountHandler
|
||||
onPayment*: PaymentHandler
|
||||
onPeerJoined*: PeerEventHandler
|
||||
onPeerDeparted*: PeerEventHandler
|
||||
onPeerDropped*: PeerEventHandler
|
||||
|
||||
WantListSender* = proc(
|
||||
id: PeerId,
|
||||
@ -240,86 +244,104 @@ proc handlePayment(
|
||||
await network.handlers.onPayment(peer.id, payment)
|
||||
|
||||
proc rpcHandler(
|
||||
b: BlockExcNetwork, peer: NetworkPeer, msg: Message
|
||||
self: BlockExcNetwork, peer: NetworkPeer, msg: Message
|
||||
) {.async: (raises: []).} =
|
||||
## handle rpc messages
|
||||
##
|
||||
if msg.wantList.entries.len > 0:
|
||||
b.trackedFutures.track(b.handleWantList(peer, msg.wantList))
|
||||
self.trackedFutures.track(self.handleWantList(peer, msg.wantList))
|
||||
|
||||
if msg.payload.len > 0:
|
||||
b.trackedFutures.track(b.handleBlocksDelivery(peer, msg.payload))
|
||||
self.trackedFutures.track(self.handleBlocksDelivery(peer, msg.payload))
|
||||
|
||||
if msg.blockPresences.len > 0:
|
||||
b.trackedFutures.track(b.handleBlockPresence(peer, msg.blockPresences))
|
||||
self.trackedFutures.track(self.handleBlockPresence(peer, msg.blockPresences))
|
||||
|
||||
if account =? Account.init(msg.account):
|
||||
b.trackedFutures.track(b.handleAccount(peer, account))
|
||||
self.trackedFutures.track(self.handleAccount(peer, account))
|
||||
|
||||
if payment =? SignedState.init(msg.payment):
|
||||
b.trackedFutures.track(b.handlePayment(peer, payment))
|
||||
self.trackedFutures.track(self.handlePayment(peer, payment))
|
||||
|
||||
proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
|
||||
proc getOrCreatePeer(self: BlockExcNetwork, peer: PeerId): NetworkPeer =
|
||||
## Creates or retrieves a BlockExcNetwork Peer
|
||||
##
|
||||
|
||||
if peer in b.peers:
|
||||
return b.peers.getOrDefault(peer, nil)
|
||||
if peer in self.peers:
|
||||
return self.peers.getOrDefault(peer, nil)
|
||||
|
||||
var getConn: ConnProvider = proc(): Future[Connection] {.
|
||||
async: (raises: [CancelledError])
|
||||
.} =
|
||||
try:
|
||||
trace "Getting new connection stream", peer
|
||||
return await b.switch.dial(peer, Codec)
|
||||
return await self.switch.dial(peer, Codec)
|
||||
except CancelledError as error:
|
||||
raise error
|
||||
except CatchableError as exc:
|
||||
trace "Unable to connect to blockexc peer", exc = exc.msg
|
||||
|
||||
if not isNil(b.getConn):
|
||||
getConn = b.getConn
|
||||
if not isNil(self.getConn):
|
||||
getConn = self.getConn
|
||||
|
||||
let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async: (raises: []).} =
|
||||
await b.rpcHandler(p, msg)
|
||||
await self.rpcHandler(p, msg)
|
||||
|
||||
# create new pubsub peer
|
||||
let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler)
|
||||
debug "Created new blockexc peer", peer
|
||||
|
||||
b.peers[peer] = blockExcPeer
|
||||
self.peers[peer] = blockExcPeer
|
||||
|
||||
return blockExcPeer
|
||||
|
||||
proc setupPeer*(b: BlockExcNetwork, peer: PeerId) =
|
||||
## Perform initial setup, such as want
|
||||
## list exchange
|
||||
##
|
||||
|
||||
discard b.getOrCreatePeer(peer)
|
||||
|
||||
proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
|
||||
proc dialPeer*(self: BlockExcNetwork, peer: PeerRecord) {.async.} =
|
||||
## Dial a peer
|
||||
##
|
||||
|
||||
if b.isSelf(peer.peerId):
|
||||
if self.isSelf(peer.peerId):
|
||||
trace "Skipping dialing self", peer = peer.peerId
|
||||
return
|
||||
|
||||
if peer.peerId in b.peers:
|
||||
if peer.peerId in self.peers:
|
||||
trace "Already connected to peer", peer = peer.peerId
|
||||
return
|
||||
|
||||
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
|
||||
await self.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
|
||||
|
||||
proc dropPeer*(b: BlockExcNetwork, peer: PeerId) =
|
||||
proc dropPeer*(
|
||||
self: BlockExcNetwork, peer: PeerId
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
trace "Dropping peer", peer
|
||||
|
||||
try:
|
||||
if not self.switch.isNil:
|
||||
await self.switch.disconnect(peer)
|
||||
except CatchableError as error:
|
||||
warn "Error attempting to disconnect from peer", peer = peer, error = error.msg
|
||||
|
||||
if not self.handlers.onPeerDropped.isNil:
|
||||
await self.handlers.onPeerDropped(peer)
|
||||
|
||||
proc handlePeerJoined*(
|
||||
self: BlockExcNetwork, peer: PeerId
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
discard self.getOrCreatePeer(peer)
|
||||
if not self.handlers.onPeerJoined.isNil:
|
||||
await self.handlers.onPeerJoined(peer)
|
||||
|
||||
proc handlePeerDeparted*(
|
||||
self: BlockExcNetwork, peer: PeerId
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
## Cleanup disconnected peer
|
||||
##
|
||||
|
||||
trace "Dropping peer", peer
|
||||
b.peers.del(peer)
|
||||
trace "Cleaning up departed peer", peer
|
||||
self.peers.del(peer)
|
||||
if not self.handlers.onPeerDeparted.isNil:
|
||||
await self.handlers.onPeerDeparted(peer)
|
||||
|
||||
method init*(self: BlockExcNetwork) =
|
||||
method init*(self: BlockExcNetwork) {.raises: [].} =
|
||||
## Perform protocol initialization
|
||||
##
|
||||
|
||||
@ -327,9 +349,11 @@ method init*(self: BlockExcNetwork) =
|
||||
peerId: PeerId, event: PeerEvent
|
||||
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
|
||||
if event.kind == PeerEventKind.Joined:
|
||||
self.setupPeer(peerId)
|
||||
await self.handlePeerJoined(peerId)
|
||||
elif event.kind == PeerEventKind.Left:
|
||||
await self.handlePeerDeparted(peerId)
|
||||
else:
|
||||
self.dropPeer(peerId)
|
||||
warn "Unknown peer event", event
|
||||
|
||||
self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
|
||||
self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
|
||||
|
||||
@ -65,7 +65,9 @@ proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} =
|
||||
except CatchableError as err:
|
||||
warn "Exception in blockexc read loop", msg = err.msg
|
||||
finally:
|
||||
trace "Detaching read loop", peer = self.id, connId = conn.oid
|
||||
warn "Detaching read loop", peer = self.id, connId = conn.oid
|
||||
if self.sendConn == conn:
|
||||
self.sendConn = nil
|
||||
await conn.close()
|
||||
|
||||
proc connect*(
|
||||
@ -89,7 +91,12 @@ proc send*(
|
||||
return
|
||||
|
||||
trace "Sending message", peer = self.id, connId = conn.oid
|
||||
await conn.writeLp(protobufEncode(msg))
|
||||
try:
|
||||
await conn.writeLp(protobufEncode(msg))
|
||||
except CatchableError as err:
|
||||
if self.sendConn == conn:
|
||||
self.sendConn = nil
|
||||
raise newException(LPStreamError, "Failed to send message: " & err.msg)
|
||||
|
||||
func new*(
|
||||
T: type NetworkPeer,
|
||||
|
||||
@ -25,28 +25,77 @@ import ../../logutils
|
||||
|
||||
export payments, nitro
|
||||
|
||||
const
|
||||
MinRefreshInterval = 1.seconds
|
||||
MaxRefreshBackoff = 36 # 36 seconds
|
||||
MaxWantListBatchSize* = 1024 # Maximum blocks to send per WantList message
|
||||
|
||||
type BlockExcPeerCtx* = ref object of RootObj
|
||||
id*: PeerId
|
||||
blocks*: Table[BlockAddress, Presence] # remote peer have list including price
|
||||
peerWants*: seq[WantListEntry] # remote peers want lists
|
||||
wantedBlocks*: HashSet[BlockAddress] # blocks that the peer wants
|
||||
exchanged*: int # times peer has exchanged with us
|
||||
lastExchange*: Moment # last time peer has exchanged with us
|
||||
refreshInProgress*: bool # indicates if a refresh is in progress
|
||||
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
|
||||
refreshBackoff*: int = 1 # backoff factor for refresh requests
|
||||
account*: ?Account # ethereum account of this peer
|
||||
paymentChannel*: ?ChannelId # payment channel id
|
||||
blocksSent*: HashSet[BlockAddress] # blocks sent to peer
|
||||
blocksRequested*: HashSet[BlockAddress] # pending block requests to this peer
|
||||
lastExchange*: Moment # last time peer has sent us a block
|
||||
activityTimeout*: Duration
|
||||
lastSentWants*: HashSet[BlockAddress]
|
||||
# track what wantList we last sent for delta updates
|
||||
|
||||
proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
|
||||
toSeq(self.blocks.keys)
|
||||
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
|
||||
let staleness =
|
||||
self.lastRefresh + self.refreshBackoff * MinRefreshInterval < Moment.now()
|
||||
|
||||
proc peerHaveCids*(self: BlockExcPeerCtx): HashSet[Cid] =
|
||||
self.blocks.keys.toSeq.mapIt(it.cidOrTreeCid).toHashSet
|
||||
if staleness and self.refreshInProgress:
|
||||
trace "Cleaning up refresh state", peer = self.id
|
||||
self.refreshInProgress = false
|
||||
self.refreshBackoff = 1
|
||||
|
||||
proc peerWantsCids*(self: BlockExcPeerCtx): HashSet[Cid] =
|
||||
self.peerWants.mapIt(it.address.cidOrTreeCid).toHashSet
|
||||
staleness
|
||||
|
||||
proc isBlockSent*(self: BlockExcPeerCtx, address: BlockAddress): bool =
|
||||
address in self.blocksSent
|
||||
|
||||
proc markBlockAsSent*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
self.blocksSent.incl(address)
|
||||
|
||||
proc markBlockAsNotSent*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
self.blocksSent.excl(address)
|
||||
|
||||
proc refreshRequested*(self: BlockExcPeerCtx) =
|
||||
trace "Refresh requested for peer", peer = self.id, backoff = self.refreshBackoff
|
||||
self.refreshInProgress = true
|
||||
self.lastRefresh = Moment.now()
|
||||
|
||||
proc refreshReplied*(self: BlockExcPeerCtx) =
|
||||
self.refreshInProgress = false
|
||||
self.lastRefresh = Moment.now()
|
||||
self.refreshBackoff = min(self.refreshBackoff * 2, MaxRefreshBackoff)
|
||||
|
||||
proc havesUpdated(self: BlockExcPeerCtx) =
|
||||
self.refreshBackoff = 1
|
||||
|
||||
proc wantsUpdated*(self: BlockExcPeerCtx) =
|
||||
self.refreshBackoff = 1
|
||||
|
||||
proc peerHave*(self: BlockExcPeerCtx): HashSet[BlockAddress] =
|
||||
# XXX: this is ugly an inefficient, but since those will typically
|
||||
# be used in "joins", it's better to pay the price here and have
|
||||
# a linear join than to not do it and have a quadratic join.
|
||||
toHashSet(self.blocks.keys.toSeq)
|
||||
|
||||
proc contains*(self: BlockExcPeerCtx, address: BlockAddress): bool =
|
||||
address in self.blocks
|
||||
|
||||
func setPresence*(self: BlockExcPeerCtx, presence: Presence) =
|
||||
if presence.address notin self.blocks:
|
||||
self.havesUpdated()
|
||||
|
||||
self.blocks[presence.address] = presence
|
||||
|
||||
func cleanPresence*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]) =
|
||||
@ -63,3 +112,36 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 =
|
||||
price += precense[].price
|
||||
|
||||
price
|
||||
|
||||
proc blockRequestScheduled*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
## Adds a block the set of blocks that have been requested to this peer
|
||||
## (its request schedule).
|
||||
if self.blocksRequested.len == 0:
|
||||
self.lastExchange = Moment.now()
|
||||
self.blocksRequested.incl(address)
|
||||
|
||||
proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
## Removes a block from the set of blocks that have been requested to this peer
|
||||
## (its request schedule).
|
||||
self.blocksRequested.excl(address)
|
||||
|
||||
proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress): bool =
|
||||
let wasRequested = address in self.blocksRequested
|
||||
self.blocksRequested.excl(address)
|
||||
self.lastExchange = Moment.now()
|
||||
wasRequested
|
||||
|
||||
proc activityTimer*(
|
||||
self: BlockExcPeerCtx
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
## This is called by the block exchange when a block is scheduled for this peer.
|
||||
## If the peer sends no blocks for a while, it is considered inactive/uncooperative
|
||||
## and the peer is dropped. Note that ANY block that the peer sends will reset this
|
||||
## timer for all blocks.
|
||||
##
|
||||
while true:
|
||||
let idleTime = Moment.now() - self.lastExchange
|
||||
if idleTime > self.activityTimeout:
|
||||
return
|
||||
|
||||
await sleepAsync(self.activityTimeout - idleTime)
|
||||
|
||||
@ -62,21 +62,23 @@ func len*(self: PeerCtxStore): int =
|
||||
self.peers.len
|
||||
|
||||
func peersHave*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
|
||||
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it == address))
|
||||
toSeq(self.peers.values).filterIt(address in it.peerHave)
|
||||
|
||||
func peersHave*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
|
||||
# FIXME: this is way slower and can end up leading to unexpected performance loss.
|
||||
toSeq(self.peers.values).filterIt(it.peerHave.anyIt(it.cidOrTreeCid == cid))
|
||||
|
||||
func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] =
|
||||
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it == address))
|
||||
toSeq(self.peers.values).filterIt(address in it.wantedBlocks)
|
||||
|
||||
func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] =
|
||||
toSeq(self.peers.values).filterIt(it.peerWants.anyIt(it.address.cidOrTreeCid == cid))
|
||||
# FIXME: this is way slower and can end up leading to unexpected performance loss.
|
||||
toSeq(self.peers.values).filterIt(it.wantedBlocks.anyIt(it.cidOrTreeCid == cid))
|
||||
|
||||
proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock =
|
||||
var res: PeersForBlock = (@[], @[])
|
||||
for peer in self:
|
||||
if peer.peerHave.anyIt(it == address):
|
||||
if address in peer:
|
||||
res.with.add(peer)
|
||||
else:
|
||||
res.without.add(peer)
|
||||
|
||||
@ -9,7 +9,6 @@
|
||||
|
||||
import std/hashes
|
||||
import std/sequtils
|
||||
import pkg/stew/endians2
|
||||
|
||||
import message
|
||||
|
||||
@ -20,13 +19,6 @@ export Wantlist, WantType, WantListEntry
|
||||
export BlockDelivery, BlockPresenceType, BlockPresence
|
||||
export AccountMessage, StateChannelUpdate
|
||||
|
||||
proc hash*(a: BlockAddress): Hash =
|
||||
if a.leaf:
|
||||
let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE)
|
||||
hash(data)
|
||||
else:
|
||||
hash(a.cid.data.buffer)
|
||||
|
||||
proc hash*(e: WantListEntry): Hash =
|
||||
hash(e.address)
|
||||
|
||||
|
||||
@ -25,11 +25,15 @@ type
|
||||
|
||||
WantListEntry* = object
|
||||
address*: BlockAddress
|
||||
# XXX: I think explicit priority is pointless as the peer will request
|
||||
# the blocks in the order it wants to receive them, and all we have to
|
||||
# do is process those in the same order as we send them back. It also
|
||||
# complicates things for no reason at the moment, as the priority is
|
||||
# always set to 0.
|
||||
priority*: int32 # The priority (normalized). default to 1
|
||||
cancel*: bool # Whether this revokes an entry
|
||||
wantType*: WantType # Note: defaults to enum 0, ie Block
|
||||
sendDontHave*: bool # Note: defaults to false
|
||||
inFlight*: bool # Whether block sending is in progress. Not serialized.
|
||||
|
||||
WantList* = object
|
||||
entries*: seq[WantListEntry] # A list of wantList entries
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
|
||||
import std/tables
|
||||
import std/sugar
|
||||
import std/hashes
|
||||
|
||||
export tables
|
||||
|
||||
@ -18,7 +19,7 @@ push:
|
||||
{.upraises: [].}
|
||||
|
||||
import pkg/libp2p/[cid, multicodec, multihash]
|
||||
import pkg/stew/byteutils
|
||||
import pkg/stew/[byteutils, endians2]
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
@ -67,6 +68,13 @@ proc `$`*(a: BlockAddress): string =
|
||||
else:
|
||||
"cid: " & $a.cid
|
||||
|
||||
proc hash*(a: BlockAddress): Hash =
|
||||
if a.leaf:
|
||||
let data = a.treeCid.data.buffer & @(a.index.uint64.toBytesBE)
|
||||
hash(data)
|
||||
else:
|
||||
hash(a.cid.data.buffer)
|
||||
|
||||
proc cidOrTreeCid*(a: BlockAddress): Cid =
|
||||
if a.leaf: a.treeCid else: a.cid
|
||||
|
||||
|
||||
@ -211,7 +211,7 @@ proc new*(
|
||||
.withMaxConnections(config.maxPeers)
|
||||
.withAgentVersion(config.agentString)
|
||||
.withSignedPeerRecord(true)
|
||||
.withTcpTransport({ServerFlags.ReuseAddr})
|
||||
.withTcpTransport({ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay})
|
||||
.build()
|
||||
|
||||
var
|
||||
|
||||
@ -44,7 +44,7 @@ import ./indexingstrategy
|
||||
import ./utils
|
||||
import ./errors
|
||||
import ./logutils
|
||||
import ./utils/asynciter
|
||||
import ./utils/safeasynciter
|
||||
import ./utils/trackedfutures
|
||||
|
||||
export logutils
|
||||
@ -52,7 +52,10 @@ export logutils
|
||||
logScope:
|
||||
topics = "codex node"
|
||||
|
||||
const DefaultFetchBatch = 10
|
||||
const
|
||||
DefaultFetchBatch = 1024
|
||||
MaxOnBatchBlocks = 128
|
||||
BatchRefillThreshold = 0.75 # Refill when 75% of window completes
|
||||
|
||||
type
|
||||
Contracts* =
|
||||
@ -186,34 +189,62 @@ proc fetchBatched*(
|
||||
# (i: int) => self.networkStore.getBlock(BlockAddress.init(cid, i))
|
||||
# )
|
||||
|
||||
while not iter.finished:
|
||||
let blockFutures = collect:
|
||||
for i in 0 ..< batchSize:
|
||||
if not iter.finished:
|
||||
let address = BlockAddress.init(cid, iter.next())
|
||||
if not (await address in self.networkStore) or fetchLocal:
|
||||
self.networkStore.getBlock(address)
|
||||
# Sliding window: maintain batchSize blocks in-flight
|
||||
let
|
||||
refillThreshold = int(float(batchSize) * BatchRefillThreshold)
|
||||
refillSize = max(refillThreshold, 1)
|
||||
maxCallbackBlocks = min(batchSize, MaxOnBatchBlocks)
|
||||
|
||||
if blockFutures.len == 0:
|
||||
var
|
||||
blockData: seq[bt.Block]
|
||||
failedBlocks = 0
|
||||
successfulBlocks = 0
|
||||
completedInWindow = 0
|
||||
|
||||
var addresses = newSeqOfCap[BlockAddress](batchSize)
|
||||
for i in 0 ..< batchSize:
|
||||
if not iter.finished:
|
||||
let address = BlockAddress.init(cid, iter.next())
|
||||
if fetchLocal or not (await address in self.networkStore):
|
||||
addresses.add(address)
|
||||
|
||||
var blockResults = await self.networkStore.getBlocks(addresses)
|
||||
|
||||
while not blockResults.finished:
|
||||
without blk =? await blockResults.next(), err:
|
||||
inc(failedBlocks)
|
||||
continue
|
||||
|
||||
without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
|
||||
trace "Some blocks failed to fetch", err = err.msg
|
||||
return failure(err)
|
||||
inc(successfulBlocks)
|
||||
inc(completedInWindow)
|
||||
|
||||
let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
|
||||
if not onBatch.isNil:
|
||||
blockData.add(blk)
|
||||
if blockData.len >= maxCallbackBlocks:
|
||||
if batchErr =? (await onBatch(blockData)).errorOption:
|
||||
return failure(batchErr)
|
||||
blockData = @[]
|
||||
|
||||
let numOfFailedBlocks = blockResults.len - blocks.len
|
||||
if numOfFailedBlocks > 0:
|
||||
return
|
||||
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
|
||||
if completedInWindow >= refillThreshold and not iter.finished:
|
||||
var refillAddresses = newSeqOfCap[BlockAddress](refillSize)
|
||||
for i in 0 ..< refillSize:
|
||||
if not iter.finished:
|
||||
let address = BlockAddress.init(cid, iter.next())
|
||||
if fetchLocal or not (await address in self.networkStore):
|
||||
refillAddresses.add(address)
|
||||
|
||||
if not onBatch.isNil and batchErr =? (await onBatch(blocks)).errorOption:
|
||||
if refillAddresses.len > 0:
|
||||
blockResults =
|
||||
chain(blockResults, await self.networkStore.getBlocks(refillAddresses))
|
||||
completedInWindow = 0
|
||||
|
||||
if failedBlocks > 0:
|
||||
return failure("Some blocks failed (Result) to fetch (" & $failedBlocks & ")")
|
||||
|
||||
if not onBatch.isNil and blockData.len > 0:
|
||||
if batchErr =? (await onBatch(blockData)).errorOption:
|
||||
return failure(batchErr)
|
||||
|
||||
if not iter.finished:
|
||||
await sleepAsync(1.millis)
|
||||
|
||||
success()
|
||||
|
||||
proc fetchBatched*(
|
||||
|
||||
@ -70,6 +70,14 @@ method completeBlock*(
|
||||
) {.base, gcsafe.} =
|
||||
discard
|
||||
|
||||
method getBlocks*(
|
||||
self: BlockStore, addresses: seq[BlockAddress]
|
||||
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
|
||||
## Gets a set of blocks from the blockstore. Blocks might
|
||||
## be returned in any order.
|
||||
|
||||
raiseAssert("getBlocks not implemented!")
|
||||
|
||||
method getBlockAndProof*(
|
||||
self: BlockStore, treeCid: Cid, index: Natural
|
||||
): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} =
|
||||
|
||||
@ -66,6 +66,21 @@ method getBlock*(
|
||||
trace "Error requesting block from cache", cid, error = exc.msg
|
||||
return failure exc
|
||||
|
||||
method getBlocks*(
|
||||
self: CacheStore, addresses: seq[BlockAddress]
|
||||
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
|
||||
var i = 0
|
||||
|
||||
proc isFinished(): bool =
|
||||
i == addresses.len
|
||||
|
||||
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
let value = await self.getBlock(addresses[i])
|
||||
inc(i)
|
||||
return value
|
||||
|
||||
return SafeAsyncIter[Block].new(genNext, isFinished)
|
||||
|
||||
method getCidAndProof*(
|
||||
self: CacheStore, treeCid: Cid, index: Natural
|
||||
): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
@ -31,6 +31,31 @@ type NetworkStore* = ref object of BlockStore
|
||||
engine*: BlockExcEngine # blockexc decision engine
|
||||
localStore*: BlockStore # local block store
|
||||
|
||||
method getBlocks*(
|
||||
self: NetworkStore, addresses: seq[BlockAddress]
|
||||
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
|
||||
var
|
||||
localAddresses: seq[BlockAddress]
|
||||
remoteAddresses: seq[BlockAddress]
|
||||
|
||||
let runtimeQuota = 10.milliseconds
|
||||
var lastIdle = Moment.now()
|
||||
|
||||
for address in addresses:
|
||||
if not (await address in self.localStore):
|
||||
remoteAddresses.add(address)
|
||||
else:
|
||||
localAddresses.add(address)
|
||||
|
||||
if (Moment.now() - lastIdle) >= runtimeQuota:
|
||||
await idleAsync()
|
||||
lastIdle = Moment.now()
|
||||
|
||||
return chain(
|
||||
await self.localStore.getBlocks(localAddresses),
|
||||
self.engine.requestBlocks(remoteAddresses),
|
||||
)
|
||||
|
||||
method getBlock*(
|
||||
self: NetworkStore, address: BlockAddress
|
||||
): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
@ -38,6 +38,21 @@ logScope:
|
||||
# BlockStore API
|
||||
###########################################################
|
||||
|
||||
method getBlocks*(
|
||||
self: RepoStore, addresses: seq[BlockAddress]
|
||||
): Future[SafeAsyncIter[Block]] {.async: (raises: [CancelledError]).} =
|
||||
var i = 0
|
||||
|
||||
proc isFinished(): bool =
|
||||
i == addresses.len
|
||||
|
||||
proc genNext(): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
let value = await self.getBlock(addresses[i])
|
||||
inc(i)
|
||||
return value
|
||||
|
||||
return SafeAsyncIter[Block].new(genNext, isFinished)
|
||||
|
||||
method getBlock*(
|
||||
self: RepoStore, cid: Cid
|
||||
): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||
|
||||
@ -232,3 +232,28 @@ proc empty*[T](_: type SafeAsyncIter[T]): SafeAsyncIter[T] =
|
||||
true
|
||||
|
||||
SafeAsyncIter[T].new(genNext, isFinished)
|
||||
|
||||
proc chain*[T](iters: seq[SafeAsyncIter[T]]): SafeAsyncIter[T] =
|
||||
if iters.len == 0:
|
||||
return SafeAsyncIter[T].empty
|
||||
|
||||
var curIdx = 0
|
||||
|
||||
proc ensureNext(): void =
|
||||
while curIdx < iters.len and iters[curIdx].finished:
|
||||
inc(curIdx)
|
||||
|
||||
proc isFinished(): bool =
|
||||
curIdx == iters.len
|
||||
|
||||
proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} =
|
||||
let item = await iters[curIdx].next()
|
||||
ensureNext()
|
||||
return item
|
||||
|
||||
ensureNext()
|
||||
|
||||
return SafeAsyncIter[T].new(genNext, isFinished)
|
||||
|
||||
proc chain*[T](iters: varargs[SafeAsyncIter[T]]): SafeAsyncIter[T] =
|
||||
chain(iters.toSeq)
|
||||
|
||||
@ -54,7 +54,7 @@ asyncchecksuite "Block Advertising and Discovery":
|
||||
peerStore = PeerCtxStore.new()
|
||||
pendingBlocks = PendingBlocksManager.new()
|
||||
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
manifestBlock =
|
||||
bt.Block.new(manifest.encode().tryGet(), codec = ManifestCodec).tryGet()
|
||||
|
||||
@ -172,7 +172,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
break
|
||||
|
||||
blocks.add(bt.Block.new(chunk).tryGet())
|
||||
let (manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
let (_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
manifests.add(manifest)
|
||||
mBlocks.add(manifest.asBlock())
|
||||
trees.add(tree)
|
||||
@ -216,7 +216,6 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
test "E2E - Should advertise and discover blocks":
|
||||
# Distribute the manifests and trees amongst 1..3
|
||||
# Ask 0 to download everything without connecting him beforehand
|
||||
|
||||
var advertised: Table[Cid, SignedPeerRecord]
|
||||
|
||||
MockDiscovery(blockexc[1].engine.discovery.discovery).publishBlockProvideHandler = proc(
|
||||
@ -242,6 +241,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid)
|
||||
)
|
||||
],
|
||||
allowSpurious = true,
|
||||
)
|
||||
|
||||
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
|
||||
@ -252,6 +252,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid)
|
||||
)
|
||||
],
|
||||
allowSpurious = true,
|
||||
)
|
||||
|
||||
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
|
||||
@ -262,6 +263,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid)
|
||||
)
|
||||
],
|
||||
allowSpurious = true,
|
||||
)
|
||||
|
||||
MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc(
|
||||
@ -311,6 +313,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid)
|
||||
)
|
||||
],
|
||||
allowSpurious = true,
|
||||
)
|
||||
|
||||
discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid)
|
||||
@ -321,6 +324,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid)
|
||||
)
|
||||
],
|
||||
allowSpurious = true,
|
||||
)
|
||||
|
||||
discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid)
|
||||
@ -331,6 +335,7 @@ asyncchecksuite "E2E - Multiple Nodes Discovery":
|
||||
blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid)
|
||||
)
|
||||
],
|
||||
allowSpurious = true,
|
||||
)
|
||||
|
||||
MockDiscovery(blockexc[0].engine.discovery.discovery).findBlockProvidersHandler = proc(
|
||||
|
||||
@ -43,7 +43,7 @@ asyncchecksuite "Test Discovery Engine":
|
||||
|
||||
blocks.add(bt.Block.new(chunk).tryGet())
|
||||
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
manifestBlock = manifest.asBlock()
|
||||
blocks.add(manifestBlock)
|
||||
|
||||
|
||||
@ -29,14 +29,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
nodeCmps1 = generateNodes(1, blocks1).components[0]
|
||||
nodeCmps2 = generateNodes(1, blocks2).components[0]
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodeCmps1.switch.start(),
|
||||
nodeCmps1.blockDiscovery.start(),
|
||||
nodeCmps1.engine.start(),
|
||||
nodeCmps2.switch.start(),
|
||||
nodeCmps2.blockDiscovery.start(),
|
||||
nodeCmps2.engine.start(),
|
||||
)
|
||||
await allFuturesThrowing(nodeCmps1.start(), nodeCmps2.start())
|
||||
|
||||
# initialize our want lists
|
||||
pendingBlocks1 =
|
||||
@ -65,14 +58,7 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
check isNil(peerCtx2).not
|
||||
|
||||
teardown:
|
||||
await allFuturesThrowing(
|
||||
nodeCmps1.blockDiscovery.stop(),
|
||||
nodeCmps1.engine.stop(),
|
||||
nodeCmps1.switch.stop(),
|
||||
nodeCmps2.blockDiscovery.stop(),
|
||||
nodeCmps2.engine.stop(),
|
||||
nodeCmps2.switch.stop(),
|
||||
)
|
||||
await allFuturesThrowing(nodeCmps1.stop(), nodeCmps2.stop())
|
||||
|
||||
test "Should exchange blocks on connect":
|
||||
await allFuturesThrowing(allFinished(pendingBlocks1)).wait(10.seconds)
|
||||
@ -96,17 +82,11 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
test "Should send want-have for block":
|
||||
let blk = bt.Block.new("Block 1".toBytes).tryGet()
|
||||
let blkFut = nodeCmps1.pendingBlocks.getWantHandle(blk.cid)
|
||||
peerCtx2.blockRequestScheduled(blk.address)
|
||||
|
||||
(await nodeCmps2.localStore.putBlock(blk)).tryGet()
|
||||
|
||||
let entry = WantListEntry(
|
||||
address: blk.address,
|
||||
priority: 1,
|
||||
cancel: false,
|
||||
wantType: WantType.WantBlock,
|
||||
sendDontHave: false,
|
||||
)
|
||||
|
||||
peerCtx1.peerWants.add(entry)
|
||||
peerCtx1.wantedBlocks.incl(blk.address)
|
||||
check nodeCmps2.engine.taskQueue.pushOrUpdateNoWait(peerCtx1).isOk
|
||||
|
||||
check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet()
|
||||
@ -209,3 +189,38 @@ asyncchecksuite "NetworkStore - multiple nodes":
|
||||
|
||||
check pendingBlocks1.mapIt(it.read) == blocks[0 .. 3]
|
||||
check pendingBlocks2.mapIt(it.read) == blocks[12 .. 15]
|
||||
|
||||
asyncchecksuite "NetworkStore - dissemination":
|
||||
var nodes: seq[NodesComponents]
|
||||
|
||||
teardown:
|
||||
if nodes.len > 0:
|
||||
await nodes.stop()
|
||||
|
||||
test "Should disseminate blocks across large diameter swarm":
|
||||
let dataset = makeDataset(await makeRandomBlocks(60 * 256, 256'nb)).tryGet()
|
||||
|
||||
nodes = generateNodes(
|
||||
6,
|
||||
config = NodeConfig(
|
||||
useRepoStore: false,
|
||||
findFreePorts: false,
|
||||
basePort: 8080,
|
||||
createFullNode: false,
|
||||
enableBootstrap: false,
|
||||
enableDiscovery: true,
|
||||
),
|
||||
)
|
||||
|
||||
await assignBlocks(nodes[0], dataset, 0 .. 9)
|
||||
await assignBlocks(nodes[1], dataset, 10 .. 19)
|
||||
await assignBlocks(nodes[2], dataset, 20 .. 29)
|
||||
await assignBlocks(nodes[3], dataset, 30 .. 39)
|
||||
await assignBlocks(nodes[4], dataset, 40 .. 49)
|
||||
await assignBlocks(nodes[5], dataset, 50 .. 59)
|
||||
|
||||
await nodes.start()
|
||||
await nodes.linearTopology()
|
||||
|
||||
let downloads = nodes.mapIt(downloadDataset(it, dataset))
|
||||
await allFuturesThrowing(downloads).wait(30.seconds)
|
||||
|
||||
@ -27,8 +27,6 @@ const NopSendWantCancellationsProc = proc(
|
||||
|
||||
asyncchecksuite "NetworkStore engine basic":
|
||||
var
|
||||
rng: Rng
|
||||
seckey: PrivateKey
|
||||
peerId: PeerId
|
||||
chunker: Chunker
|
||||
wallet: WalletRef
|
||||
@ -39,9 +37,7 @@ asyncchecksuite "NetworkStore engine basic":
|
||||
done: Future[void]
|
||||
|
||||
setup:
|
||||
rng = Rng.instance()
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
peerId = PeerId.example
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
|
||||
wallet = WalletRef.example
|
||||
blockDiscovery = Discovery.new()
|
||||
@ -83,7 +79,7 @@ asyncchecksuite "NetworkStore engine basic":
|
||||
|
||||
for b in blocks:
|
||||
discard engine.pendingBlocks.getWantHandle(b.cid)
|
||||
await engine.setupPeer(peerId)
|
||||
await engine.peerAddedHandler(peerId)
|
||||
|
||||
await done.wait(100.millis)
|
||||
|
||||
@ -111,14 +107,12 @@ asyncchecksuite "NetworkStore engine basic":
|
||||
)
|
||||
|
||||
engine.pricing = pricing.some
|
||||
await engine.setupPeer(peerId)
|
||||
await engine.peerAddedHandler(peerId)
|
||||
|
||||
await done.wait(100.millis)
|
||||
|
||||
asyncchecksuite "NetworkStore engine handlers":
|
||||
var
|
||||
rng: Rng
|
||||
seckey: PrivateKey
|
||||
peerId: PeerId
|
||||
chunker: Chunker
|
||||
wallet: WalletRef
|
||||
@ -134,8 +128,7 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
blocks: seq[Block]
|
||||
|
||||
setup:
|
||||
rng = Rng.instance()
|
||||
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
@ -144,8 +137,7 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
|
||||
blocks.add(Block.new(chunk).tryGet())
|
||||
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
peerId = PeerId.example
|
||||
wallet = WalletRef.example
|
||||
blockDiscovery = Discovery.new()
|
||||
peerStore = PeerCtxStore.new()
|
||||
@ -174,7 +166,7 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
let ctx = await engine.taskQueue.pop()
|
||||
check ctx.id == peerId
|
||||
# only `wantBlock` scheduled
|
||||
check ctx.peerWants.mapIt(it.address.cidOrTreeCid) == blocks.mapIt(it.cid)
|
||||
check ctx.wantedBlocks == blocks.mapIt(it.address).toHashSet
|
||||
|
||||
let done = handler()
|
||||
await engine.wantListHandler(peerId, wantList)
|
||||
@ -249,6 +241,9 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
test "Should store blocks in local store":
|
||||
let pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
|
||||
|
||||
for blk in blocks:
|
||||
peerCtx.blockRequestScheduled(blk.address)
|
||||
|
||||
let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
|
||||
|
||||
# Install NOP for want list cancellations so they don't cause a crash
|
||||
@ -274,6 +269,9 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
(it.address, Presence(address: it.address, price: rand(uint16).u256, have: true))
|
||||
).toTable
|
||||
|
||||
for blk in blocks:
|
||||
peerContext.blockRequestScheduled(blk.address)
|
||||
|
||||
engine.network = BlockExcNetwork(
|
||||
request: BlockExcRequest(
|
||||
sendPayment: proc(
|
||||
@ -337,33 +335,44 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
check a in peerCtx.peerHave
|
||||
check peerCtx.blocks[a].price == price
|
||||
|
||||
test "Should send cancellations for received blocks":
|
||||
test "Should send cancellations for requested blocks only":
|
||||
let
|
||||
pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
|
||||
blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
|
||||
cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq)
|
||||
pendingPeer = peerId # peer towards which we have pending block requests
|
||||
pendingPeerCtx = peerCtx
|
||||
senderPeer = PeerId.example # peer that will actually send the blocks
|
||||
senderPeerCtx = BlockExcPeerCtx(id: senderPeer)
|
||||
reqBlocks = @[blocks[0], blocks[4]] # blocks that we requested to pendingPeer
|
||||
reqBlockAddrs = reqBlocks.mapIt(it.address)
|
||||
blockHandles = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
|
||||
|
||||
peerCtx.blocks = blocks.mapIt(
|
||||
(it.address, Presence(address: it.address, have: true, price: UInt256.example))
|
||||
).toTable
|
||||
var cancelled: HashSet[BlockAddress]
|
||||
|
||||
engine.peers.add(senderPeerCtx)
|
||||
for address in reqBlockAddrs:
|
||||
pendingPeerCtx.blockRequestScheduled(address)
|
||||
|
||||
for address in blocks.mapIt(it.address):
|
||||
senderPeerCtx.blockRequestScheduled(address)
|
||||
|
||||
proc sendWantCancellations(
|
||||
id: PeerId, addresses: seq[BlockAddress]
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
assert id == pendingPeer
|
||||
for address in addresses:
|
||||
cancellations[address].catch.expect("address should exist").complete()
|
||||
cancelled.incl(address)
|
||||
|
||||
engine.network = BlockExcNetwork(
|
||||
request: BlockExcRequest(sendWantCancellations: sendWantCancellations)
|
||||
)
|
||||
|
||||
await engine.blocksDeliveryHandler(peerId, blocksDelivery)
|
||||
discard await allFinished(pending).wait(100.millis)
|
||||
await allFuturesThrowing(cancellations.values().toSeq)
|
||||
let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
|
||||
await engine.blocksDeliveryHandler(senderPeer, blocksDelivery)
|
||||
discard await allFinished(blockHandles).wait(100.millis)
|
||||
|
||||
check cancelled == reqBlockAddrs.toHashSet()
|
||||
|
||||
asyncchecksuite "Block Download":
|
||||
var
|
||||
rng: Rng
|
||||
seckey: PrivateKey
|
||||
peerId: PeerId
|
||||
chunker: Chunker
|
||||
@ -380,8 +389,7 @@ asyncchecksuite "Block Download":
|
||||
blocks: seq[Block]
|
||||
|
||||
setup:
|
||||
rng = Rng.instance()
|
||||
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
@ -390,8 +398,7 @@ asyncchecksuite "Block Download":
|
||||
|
||||
blocks.add(Block.new(chunk).tryGet())
|
||||
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
peerId = PeerId.example
|
||||
wallet = WalletRef.example
|
||||
blockDiscovery = Discovery.new()
|
||||
peerStore = PeerCtxStore.new()
|
||||
@ -409,13 +416,27 @@ asyncchecksuite "Block Download":
|
||||
localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
|
||||
)
|
||||
|
||||
peerCtx = BlockExcPeerCtx(id: peerId)
|
||||
peerCtx = BlockExcPeerCtx(id: peerId, activityTimeout: 100.milliseconds)
|
||||
engine.peers.add(peerCtx)
|
||||
|
||||
test "Should exhaust retries":
|
||||
test "Should reschedule blocks on peer timeout":
|
||||
let
|
||||
slowPeer = peerId
|
||||
fastPeer = PeerId.example
|
||||
slowPeerCtx = peerCtx
|
||||
# "Fast" peer has in fact a generous timeout. This should avoid timing issues
|
||||
# in the test.
|
||||
fastPeerCtx = BlockExcPeerCtx(id: fastPeer, activityTimeout: 60.seconds)
|
||||
requestedBlock = blocks[0]
|
||||
|
||||
var
|
||||
retries = 2
|
||||
address = BlockAddress.init(blocks[0].cid)
|
||||
slowPeerWantList = newFuture[void]("slowPeerWantList")
|
||||
fastPeerWantList = newFuture[void]("fastPeerWantList")
|
||||
slowPeerDropped = newFuture[void]("slowPeerDropped")
|
||||
slowPeerBlockRequest = newFuture[void]("slowPeerBlockRequest")
|
||||
fastPeerBlockRequest = newFuture[void]("fastPeerBlockRequest")
|
||||
|
||||
engine.peers.add(fastPeerCtx)
|
||||
|
||||
proc sendWantList(
|
||||
id: PeerId,
|
||||
@ -426,68 +447,63 @@ asyncchecksuite "Block Download":
|
||||
full: bool = false,
|
||||
sendDontHave: bool = false,
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
check wantType == WantHave
|
||||
check not engine.pendingBlocks.isInFlight(address)
|
||||
check engine.pendingBlocks.retries(address) == retries
|
||||
retries -= 1
|
||||
check addresses == @[requestedBlock.address]
|
||||
|
||||
engine.pendingBlocks.blockRetries = 2
|
||||
engine.pendingBlocks.retryInterval = 10.millis
|
||||
if wantType == WantBlock:
|
||||
if id == slowPeer:
|
||||
slowPeerBlockRequest.complete()
|
||||
else:
|
||||
fastPeerBlockRequest.complete()
|
||||
|
||||
if wantType == WantHave:
|
||||
if id == slowPeer:
|
||||
slowPeerWantList.complete()
|
||||
else:
|
||||
fastPeerWantList.complete()
|
||||
|
||||
proc onPeerDropped(
|
||||
peer: PeerId
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
assert peer == slowPeer
|
||||
slowPeerDropped.complete()
|
||||
|
||||
proc selectPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
|
||||
# Looks for the slow peer.
|
||||
for peer in peers:
|
||||
if peer.id == slowPeer:
|
||||
return peer
|
||||
|
||||
return peers[0]
|
||||
|
||||
engine.selectPeer = selectPeer
|
||||
engine.pendingBlocks.retryInterval = 200.milliseconds
|
||||
engine.network =
|
||||
BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList))
|
||||
engine.network.handlers.onPeerDropped = onPeerDropped
|
||||
|
||||
let pending = engine.requestBlock(address)
|
||||
let blockHandle = engine.requestBlock(requestedBlock.address)
|
||||
|
||||
expect RetriesExhaustedError:
|
||||
discard (await pending).tryGet()
|
||||
# Waits for the peer to send its want list to both peers.
|
||||
await slowPeerWantList.wait(5.seconds)
|
||||
await fastPeerWantList.wait(5.seconds)
|
||||
|
||||
test "Should retry block request":
|
||||
var
|
||||
address = BlockAddress.init(blocks[0].cid)
|
||||
steps = newAsyncEvent()
|
||||
let blockPresence =
|
||||
@[BlockPresence(address: requestedBlock.address, type: BlockPresenceType.Have)]
|
||||
|
||||
proc sendWantList(
|
||||
id: PeerId,
|
||||
addresses: seq[BlockAddress],
|
||||
priority: int32 = 0,
|
||||
cancel: bool = false,
|
||||
wantType: WantType = WantType.WantHave,
|
||||
full: bool = false,
|
||||
sendDontHave: bool = false,
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
case wantType
|
||||
of WantHave:
|
||||
check engine.pendingBlocks.isInFlight(address) == false
|
||||
check engine.pendingBlocks.retriesExhausted(address) == false
|
||||
steps.fire()
|
||||
of WantBlock:
|
||||
check engine.pendingBlocks.isInFlight(address) == true
|
||||
check engine.pendingBlocks.retriesExhausted(address) == false
|
||||
steps.fire()
|
||||
|
||||
engine.pendingBlocks.blockRetries = 10
|
||||
engine.pendingBlocks.retryInterval = 10.millis
|
||||
engine.network = BlockExcNetwork(
|
||||
request: BlockExcRequest(
|
||||
sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc
|
||||
)
|
||||
)
|
||||
|
||||
let pending = engine.requestBlock(address)
|
||||
await steps.wait()
|
||||
|
||||
# add blocks precense
|
||||
peerCtx.blocks = blocks.mapIt(
|
||||
(it.address, Presence(address: it.address, have: true, price: UInt256.example))
|
||||
).toTable
|
||||
|
||||
steps.clear()
|
||||
await steps.wait()
|
||||
await engine.blockPresenceHandler(slowPeer, blockPresence)
|
||||
await engine.blockPresenceHandler(fastPeer, blockPresence)
|
||||
# Waits for the peer to ask for the block.
|
||||
await slowPeerBlockRequest.wait(5.seconds)
|
||||
# Don't reply and wait for the peer to be dropped by timeout.
|
||||
await slowPeerDropped.wait(5.seconds)
|
||||
|
||||
# The engine should retry and ask the fast peer for the block.
|
||||
await fastPeerBlockRequest.wait(5.seconds)
|
||||
await engine.blocksDeliveryHandler(
|
||||
peerId, @[BlockDelivery(blk: blocks[0], address: address)]
|
||||
fastPeer, @[BlockDelivery(blk: requestedBlock, address: requestedBlock.address)]
|
||||
)
|
||||
check (await pending).tryGet() == blocks[0]
|
||||
|
||||
discard await blockHandle.wait(5.seconds)
|
||||
|
||||
test "Should cancel block request":
|
||||
var
|
||||
@ -522,8 +538,6 @@ asyncchecksuite "Block Download":
|
||||
|
||||
asyncchecksuite "Task Handler":
|
||||
var
|
||||
rng: Rng
|
||||
seckey: PrivateKey
|
||||
peerId: PeerId
|
||||
chunker: Chunker
|
||||
wallet: WalletRef
|
||||
@ -541,8 +555,7 @@ asyncchecksuite "Task Handler":
|
||||
blocks: seq[Block]
|
||||
|
||||
setup:
|
||||
rng = Rng.instance()
|
||||
chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256'nb)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256'nb)
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
if chunk.len <= 0:
|
||||
@ -550,8 +563,7 @@ asyncchecksuite "Task Handler":
|
||||
|
||||
blocks.add(Block.new(chunk).tryGet())
|
||||
|
||||
seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
|
||||
peerId = PeerId.example
|
||||
wallet = WalletRef.example
|
||||
blockDiscovery = Discovery.new()
|
||||
peerStore = PeerCtxStore.new()
|
||||
@ -571,138 +583,72 @@ asyncchecksuite "Task Handler":
|
||||
peersCtx = @[]
|
||||
|
||||
for i in 0 .. 3:
|
||||
let seckey = PrivateKey.random(rng[]).tryGet()
|
||||
peers.add(PeerId.init(seckey.getPublicKey().tryGet()).tryGet())
|
||||
|
||||
peers.add(PeerId.example)
|
||||
peersCtx.add(BlockExcPeerCtx(id: peers[i]))
|
||||
peerStore.add(peersCtx[i])
|
||||
|
||||
engine.pricing = Pricing.example.some
|
||||
|
||||
test "Should send want-blocks in priority order":
|
||||
# FIXME: this is disabled for now: I've dropped block priorities to make
|
||||
# my life easier as I try to optimize the protocol, and also because
|
||||
# they were not being used anywhere.
|
||||
#
|
||||
# test "Should send want-blocks in priority order":
|
||||
# proc sendBlocksDelivery(
|
||||
# id: PeerId, blocksDelivery: seq[BlockDelivery]
|
||||
# ) {.async: (raises: [CancelledError]).} =
|
||||
# check blocksDelivery.len == 2
|
||||
# check:
|
||||
# blocksDelivery[1].address == blocks[0].address
|
||||
# blocksDelivery[0].address == blocks[1].address
|
||||
|
||||
# for blk in blocks:
|
||||
# (await engine.localStore.putBlock(blk)).tryGet()
|
||||
# engine.network.request.sendBlocksDelivery = sendBlocksDelivery
|
||||
|
||||
# # second block to send by priority
|
||||
# peersCtx[0].peerWants.add(
|
||||
# WantListEntry(
|
||||
# address: blocks[0].address,
|
||||
# priority: 49,
|
||||
# cancel: false,
|
||||
# wantType: WantType.WantBlock,
|
||||
# sendDontHave: false,
|
||||
# )
|
||||
# )
|
||||
|
||||
# # first block to send by priority
|
||||
# peersCtx[0].peerWants.add(
|
||||
# WantListEntry(
|
||||
# address: blocks[1].address,
|
||||
# priority: 50,
|
||||
# cancel: false,
|
||||
# wantType: WantType.WantBlock,
|
||||
# sendDontHave: false,
|
||||
# )
|
||||
# )
|
||||
|
||||
# await engine.taskHandler(peersCtx[0])
|
||||
|
||||
test "Should mark outgoing blocks as sent":
|
||||
proc sendBlocksDelivery(
|
||||
id: PeerId, blocksDelivery: seq[BlockDelivery]
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
check blocksDelivery.len == 2
|
||||
check:
|
||||
blocksDelivery[1].address == blocks[0].address
|
||||
blocksDelivery[0].address == blocks[1].address
|
||||
let blockAddress = peersCtx[0].wantedBlocks.toSeq[0]
|
||||
check peersCtx[0].isBlockSent(blockAddress)
|
||||
|
||||
for blk in blocks:
|
||||
(await engine.localStore.putBlock(blk)).tryGet()
|
||||
engine.network.request.sendBlocksDelivery = sendBlocksDelivery
|
||||
|
||||
# second block to send by priority
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: blocks[0].address,
|
||||
priority: 49,
|
||||
cancel: false,
|
||||
wantType: WantType.WantBlock,
|
||||
sendDontHave: false,
|
||||
)
|
||||
)
|
||||
|
||||
# first block to send by priority
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: blocks[1].address,
|
||||
priority: 50,
|
||||
cancel: false,
|
||||
wantType: WantType.WantBlock,
|
||||
sendDontHave: false,
|
||||
)
|
||||
)
|
||||
peersCtx[0].wantedBlocks.incl(blocks[0].address)
|
||||
|
||||
await engine.taskHandler(peersCtx[0])
|
||||
|
||||
test "Should set in-flight for outgoing blocks":
|
||||
proc sendBlocksDelivery(
|
||||
id: PeerId, blocksDelivery: seq[BlockDelivery]
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
check peersCtx[0].peerWants[0].inFlight
|
||||
|
||||
for blk in blocks:
|
||||
(await engine.localStore.putBlock(blk)).tryGet()
|
||||
engine.network.request.sendBlocksDelivery = sendBlocksDelivery
|
||||
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: blocks[0].address,
|
||||
priority: 50,
|
||||
cancel: false,
|
||||
wantType: WantType.WantBlock,
|
||||
sendDontHave: false,
|
||||
inFlight: false,
|
||||
)
|
||||
)
|
||||
await engine.taskHandler(peersCtx[0])
|
||||
|
||||
test "Should clear in-flight when local lookup fails":
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: blocks[0].address,
|
||||
priority: 50,
|
||||
cancel: false,
|
||||
wantType: WantType.WantBlock,
|
||||
sendDontHave: false,
|
||||
inFlight: false,
|
||||
)
|
||||
)
|
||||
await engine.taskHandler(peersCtx[0])
|
||||
|
||||
check not peersCtx[0].peerWants[0].inFlight
|
||||
|
||||
test "Should send presence":
|
||||
let present = blocks
|
||||
let missing = @[Block.new("missing".toBytes).tryGet()]
|
||||
let price = (!engine.pricing).price
|
||||
|
||||
proc sendPresence(
|
||||
id: PeerId, presence: seq[BlockPresence]
|
||||
) {.async: (raises: [CancelledError]).} =
|
||||
check presence.mapIt(!Presence.init(it)) ==
|
||||
@[
|
||||
Presence(address: present[0].address, have: true, price: price),
|
||||
Presence(address: present[1].address, have: true, price: price),
|
||||
Presence(address: missing[0].address, have: false),
|
||||
]
|
||||
|
||||
for blk in blocks:
|
||||
(await engine.localStore.putBlock(blk)).tryGet()
|
||||
engine.network.request.sendPresence = sendPresence
|
||||
|
||||
# have block
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: present[0].address,
|
||||
priority: 1,
|
||||
cancel: false,
|
||||
wantType: WantType.WantHave,
|
||||
sendDontHave: false,
|
||||
)
|
||||
)
|
||||
|
||||
# have block
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: present[1].address,
|
||||
priority: 1,
|
||||
cancel: false,
|
||||
wantType: WantType.WantHave,
|
||||
sendDontHave: false,
|
||||
)
|
||||
)
|
||||
|
||||
# don't have block
|
||||
peersCtx[0].peerWants.add(
|
||||
WantListEntry(
|
||||
address: missing[0].address,
|
||||
priority: 1,
|
||||
cancel: false,
|
||||
wantType: WantType.WantHave,
|
||||
sendDontHave: false,
|
||||
)
|
||||
)
|
||||
test "Should not mark blocks for which local look fails as sent":
|
||||
peersCtx[0].wantedBlocks.incl(blocks[0].address)
|
||||
|
||||
await engine.taskHandler(peersCtx[0])
|
||||
|
||||
let blockAddress = peersCtx[0].wantedBlocks.toSeq[0]
|
||||
check not peersCtx[0].isBlockSent(blockAddress)
|
||||
|
||||
@ -40,7 +40,7 @@ asyncchecksuite "Network - Handlers":
|
||||
done = newFuture[void]()
|
||||
buffer = BufferStream.new()
|
||||
network = BlockExcNetwork.new(switch = newStandardSwitch(), connProvider = getConn)
|
||||
network.setupPeer(peerId)
|
||||
await network.handlePeerJoined(peerId)
|
||||
networkPeer = network.peers[peerId]
|
||||
discard await networkPeer.connect()
|
||||
|
||||
|
||||
@ -81,8 +81,9 @@ suite "Peer Context Store Peer Selection":
|
||||
)
|
||||
)
|
||||
|
||||
peerCtxs[0].peerWants = entries
|
||||
peerCtxs[5].peerWants = entries
|
||||
for address in addresses:
|
||||
peerCtxs[0].wantedBlocks.incl(address)
|
||||
peerCtxs[5].wantedBlocks.incl(address)
|
||||
|
||||
let peers = store.peersWant(addresses[4])
|
||||
|
||||
|
||||
@ -38,8 +38,7 @@ proc example*(_: type Pricing): Pricing =
|
||||
Pricing(address: EthAddress.example, price: uint32.rand.u256)
|
||||
|
||||
proc example*(_: type bt.Block, size: int = 4096): bt.Block =
|
||||
let length = rand(size)
|
||||
let bytes = newSeqWith(length, rand(uint8))
|
||||
let bytes = newSeqWith(size, rand(uint8))
|
||||
bt.Block.new(bytes).tryGet()
|
||||
|
||||
proc example*(_: type PeerId): PeerId =
|
||||
|
||||
@ -12,13 +12,16 @@ import pkg/codex/rng
|
||||
import pkg/codex/utils
|
||||
|
||||
import ./helpers/nodeutils
|
||||
import ./helpers/datasetutils
|
||||
import ./helpers/randomchunker
|
||||
import ./helpers/mockchunker
|
||||
import ./helpers/mockdiscovery
|
||||
import ./helpers/always
|
||||
import ../checktest
|
||||
|
||||
export randomchunker, nodeutils, mockdiscovery, mockchunker, always, checktest, manifest
|
||||
export
|
||||
randomchunker, nodeutils, datasetutils, mockdiscovery, mockchunker, always, checktest,
|
||||
manifest
|
||||
|
||||
export libp2p except setup, eventually
|
||||
|
||||
@ -46,23 +49,6 @@ proc lenPrefix*(msg: openArray[byte]): seq[byte] =
|
||||
|
||||
return buf
|
||||
|
||||
proc makeManifestAndTree*(blocks: seq[Block]): ?!(Manifest, CodexTree) =
|
||||
if blocks.len == 0:
|
||||
return failure("Blocks list was empty")
|
||||
|
||||
let
|
||||
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
|
||||
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
|
||||
tree = ?CodexTree.init(blocks.mapIt(it.cid))
|
||||
treeCid = ?tree.rootCid
|
||||
manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = NBytes(blockSize),
|
||||
datasetSize = NBytes(datasetSize),
|
||||
)
|
||||
|
||||
return success((manifest, tree))
|
||||
|
||||
proc makeWantList*(
|
||||
cids: seq[Cid],
|
||||
priority: int = 0,
|
||||
@ -91,7 +77,7 @@ proc storeDataGetManifest*(
|
||||
(await store.putBlock(blk)).tryGet()
|
||||
|
||||
let
|
||||
(manifest, tree) = makeManifestAndTree(blocks).tryGet()
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
|
||||
for i in 0 ..< tree.leavesCount:
|
||||
@ -110,19 +96,6 @@ proc storeDataGetManifest*(
|
||||
|
||||
return await storeDataGetManifest(store, blocks)
|
||||
|
||||
proc makeRandomBlocks*(
|
||||
datasetSize: int, blockSize: NBytes
|
||||
): Future[seq[Block]] {.async.} =
|
||||
var chunker =
|
||||
RandomChunker.new(Rng.instance(), size = datasetSize, chunkSize = blockSize)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
result.add(Block.new(chunk).tryGet())
|
||||
|
||||
proc corruptBlocks*(
|
||||
store: BlockStore, manifest: Manifest, blks, bytes: int
|
||||
): Future[seq[int]] {.async.} =
|
||||
@ -147,4 +120,5 @@ proc corruptBlocks*(
|
||||
|
||||
bytePos.add(ii)
|
||||
blk.data[ii] = byte 0
|
||||
|
||||
return pos
|
||||
|
||||
45
tests/codex/helpers/datasetutils.nim
Normal file
45
tests/codex/helpers/datasetutils.nim
Normal file
@ -0,0 +1,45 @@
|
||||
import std/random
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/codex/blocktype as bt
|
||||
import pkg/codex/merkletree
|
||||
import pkg/codex/manifest
|
||||
import pkg/codex/rng
|
||||
|
||||
import ./randomchunker
|
||||
|
||||
type TestDataset* = tuple[blocks: seq[Block], tree: CodexTree, manifest: Manifest]
|
||||
|
||||
proc makeRandomBlock*(size: NBytes): Block =
|
||||
let bytes = newSeqWith(size.int, rand(uint8))
|
||||
Block.new(bytes).tryGet()
|
||||
|
||||
proc makeRandomBlocks*(
|
||||
datasetSize: int, blockSize: NBytes
|
||||
): Future[seq[Block]] {.async.} =
|
||||
var chunker =
|
||||
RandomChunker.new(Rng.instance(), size = datasetSize, chunkSize = blockSize)
|
||||
|
||||
while true:
|
||||
let chunk = await chunker.getBytes()
|
||||
if chunk.len <= 0:
|
||||
break
|
||||
|
||||
result.add(Block.new(chunk).tryGet())
|
||||
|
||||
proc makeDataset*(blocks: seq[Block]): ?!TestDataset =
|
||||
if blocks.len == 0:
|
||||
return failure("Blocks list was empty")
|
||||
|
||||
let
|
||||
datasetSize = blocks.mapIt(it.data.len).foldl(a + b)
|
||||
blockSize = blocks.mapIt(it.data.len).foldl(max(a, b))
|
||||
tree = ?CodexTree.init(blocks.mapIt(it.cid))
|
||||
treeCid = ?tree.rootCid
|
||||
manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = NBytes(blockSize),
|
||||
datasetSize = NBytes(datasetSize),
|
||||
)
|
||||
|
||||
return success((blocks, tree, manifest))
|
||||
@ -70,3 +70,31 @@ method provide*(
|
||||
return
|
||||
|
||||
await d.publishHostProvideHandler(d, host)
|
||||
|
||||
proc nullDiscovery*(): MockDiscovery =
|
||||
proc findBlockProvidersHandler(
|
||||
d: MockDiscovery, cid: Cid
|
||||
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
return @[]
|
||||
|
||||
proc publishBlockProvideHandler(
|
||||
d: MockDiscovery, cid: Cid
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
return
|
||||
|
||||
proc findHostProvidersHandler(
|
||||
d: MockDiscovery, host: ca.Address
|
||||
): Future[seq[SignedPeerRecord]] {.async: (raises: [CancelledError]).} =
|
||||
return @[]
|
||||
|
||||
proc publishHostProvideHandler(
|
||||
d: MockDiscovery, host: ca.Address
|
||||
): Future[void] {.async: (raises: [CancelledError]).} =
|
||||
return
|
||||
|
||||
return MockDiscovery(
|
||||
findBlockProvidersHandler: findBlockProvidersHandler,
|
||||
publishBlockProvideHandler: publishBlockProvideHandler,
|
||||
findHostProvidersHandler: findHostProvidersHandler,
|
||||
publishHostProvideHandler: publishHostProvideHandler,
|
||||
)
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import std/sequtils
|
||||
import std/sets
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/taskpools
|
||||
@ -12,10 +13,15 @@ import pkg/codex/blockexchange
|
||||
import pkg/codex/systemclock
|
||||
import pkg/codex/nat
|
||||
import pkg/codex/utils/natutils
|
||||
import pkg/codex/utils/safeasynciter
|
||||
import pkg/codex/slots
|
||||
import pkg/codex/merkletree
|
||||
import pkg/codex/manifest
|
||||
|
||||
import pkg/codex/node
|
||||
|
||||
import ./datasetutils
|
||||
import ./mockdiscovery
|
||||
import ../examples
|
||||
import ../../helpers
|
||||
|
||||
@ -58,6 +64,7 @@ type
|
||||
basePort*: int = 8080
|
||||
createFullNode*: bool = false
|
||||
enableBootstrap*: bool = false
|
||||
enableDiscovery*: bool = true
|
||||
|
||||
converter toTuple*(
|
||||
nc: NodesComponents
|
||||
@ -90,6 +97,36 @@ proc localStores*(cluster: NodesCluster): seq[BlockStore] =
|
||||
proc switches*(cluster: NodesCluster): seq[Switch] =
|
||||
cluster.components.mapIt(it.switch)
|
||||
|
||||
proc assignBlocks*(
|
||||
node: NodesComponents,
|
||||
dataset: TestDataset,
|
||||
indices: seq[int],
|
||||
putMerkleProofs = true,
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
let rootCid = dataset.tree.rootCid.tryGet()
|
||||
|
||||
for i in indices:
|
||||
assert (await node.networkStore.putBlock(dataset.blocks[i])).isOk
|
||||
if putMerkleProofs:
|
||||
assert (
|
||||
await node.networkStore.putCidAndProof(
|
||||
rootCid, i, dataset.blocks[i].cid, dataset.tree.getProof(i).tryGet()
|
||||
)
|
||||
).isOk
|
||||
|
||||
proc assignBlocks*(
|
||||
node: NodesComponents,
|
||||
dataset: TestDataset,
|
||||
indices: HSlice[int, int],
|
||||
putMerkleProofs = true,
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
await assignBlocks(node, dataset, indices.toSeq, putMerkleProofs)
|
||||
|
||||
proc assignBlocks*(
|
||||
node: NodesComponents, dataset: TestDataset, putMerkleProofs = true
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
await assignBlocks(node, dataset, 0 ..< dataset.blocks.len, putMerkleProofs)
|
||||
|
||||
proc generateNodes*(
|
||||
num: Natural, blocks: openArray[bt.Block] = [], config: NodeConfig = NodeConfig()
|
||||
): NodesCluster =
|
||||
@ -145,13 +182,18 @@ proc generateNodes*(
|
||||
store =
|
||||
RepoStore.new(repoStore.newDb(), mdStore.newDb(), clock = SystemClock.new())
|
||||
blockDiscoveryStore = bdStore.newDb()
|
||||
discovery = Discovery.new(
|
||||
switch.peerInfo.privateKey,
|
||||
announceAddrs = @[listenAddr],
|
||||
bindPort = bindPort.Port,
|
||||
store = blockDiscoveryStore,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
)
|
||||
discovery =
|
||||
if config.enableDiscovery:
|
||||
Discovery.new(
|
||||
switch.peerInfo.privateKey,
|
||||
announceAddrs = @[listenAddr],
|
||||
bindPort = bindPort.Port,
|
||||
store = blockDiscoveryStore,
|
||||
bootstrapNodes = bootstrapNodes,
|
||||
)
|
||||
else:
|
||||
nullDiscovery()
|
||||
|
||||
waitFor store.start()
|
||||
(store.BlockStore, @[bdStore, repoStore, mdStore], discovery)
|
||||
else:
|
||||
@ -225,6 +267,26 @@ proc generateNodes*(
|
||||
|
||||
return NodesCluster(components: components, taskpool: taskpool)
|
||||
|
||||
proc start*(nodes: NodesComponents) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(
|
||||
nodes.switch.start(),
|
||||
#nodes.blockDiscovery.start(),
|
||||
nodes.engine.start(),
|
||||
)
|
||||
|
||||
proc stop*(nodes: NodesComponents) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(
|
||||
nodes.switch.stop(),
|
||||
# nodes.blockDiscovery.stop(),
|
||||
nodes.engine.stop(),
|
||||
)
|
||||
|
||||
proc start*(nodes: seq[NodesComponents]) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(nodes.mapIt(it.start()).toSeq)
|
||||
|
||||
proc stop*(nodes: seq[NodesComponents]) {.async: (raises: [CatchableError]).} =
|
||||
await allFuturesThrowing(nodes.mapIt(it.stop()).toSeq)
|
||||
|
||||
proc connectNodes*(nodes: seq[Switch]) {.async.} =
|
||||
for dialer in nodes:
|
||||
for node in nodes:
|
||||
@ -234,6 +296,15 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} =
|
||||
proc connectNodes*(nodes: seq[NodesComponents]) {.async.} =
|
||||
await connectNodes(nodes.mapIt(it.switch))
|
||||
|
||||
proc connectNodes*(nodes: varargs[NodesComponents]): Future[void] =
|
||||
# varargs can't be captured on closures, and async procs are closures,
|
||||
# so we have to do this mess
|
||||
let copy = nodes.toSeq
|
||||
(
|
||||
proc() {.async.} =
|
||||
await connectNodes(copy.mapIt(it.switch))
|
||||
)()
|
||||
|
||||
proc connectNodes*(cluster: NodesCluster) {.async.} =
|
||||
await connectNodes(cluster.components)
|
||||
|
||||
@ -252,3 +323,26 @@ proc cleanup*(cluster: NodesCluster) {.async.} =
|
||||
await RepoStore(component.localStore).stop()
|
||||
|
||||
cluster.taskpool.shutdown()
|
||||
|
||||
proc linearTopology*(nodes: seq[NodesComponents]) {.async.} =
|
||||
for i in 0 .. nodes.len - 2:
|
||||
await connectNodes(nodes[i], nodes[i + 1])
|
||||
|
||||
proc downloadDataset*(
|
||||
node: NodesComponents, dataset: TestDataset
|
||||
): Future[void] {.async.} =
|
||||
# This is the same as fetchBatched, but we don't construct CodexNodes so I can't use
|
||||
# it here.
|
||||
let requestAddresses = collect:
|
||||
for i in 0 ..< dataset.manifest.blocksCount:
|
||||
BlockAddress.init(dataset.manifest.treeCid, i)
|
||||
|
||||
let blockCids = dataset.blocks.mapIt(it.cid).toHashSet()
|
||||
|
||||
var count = 0
|
||||
for blockFut in (await node.networkStore.getBlocks(requestAddresses)):
|
||||
let blk = (await blockFut).tryGet()
|
||||
assert blk.cid in blockCids, "Unknown block CID: " & $blk.cid
|
||||
count += 1
|
||||
|
||||
assert count == dataset.blocks.len, "Incorrect number of blocks downloaded"
|
||||
|
||||
@ -82,7 +82,7 @@ asyncchecksuite "Test Node - Basic":
|
||||
).tryGet()
|
||||
|
||||
test "Block Batching with corrupted blocks":
|
||||
let blocks = await makeRandomBlocks(datasetSize = 64.KiBs.int, blockSize = 64.KiBs)
|
||||
let blocks = await makeRandomBlocks(datasetSize = 65536, blockSize = 64.KiBs)
|
||||
assert blocks.len == 1
|
||||
|
||||
let blk = blocks[0]
|
||||
|
||||
@ -48,6 +48,7 @@ asyncchecksuite "Test Node - Slot Repair":
|
||||
findFreePorts: true,
|
||||
createFullNode: true,
|
||||
enableBootstrap: true,
|
||||
enableDiscovery: true,
|
||||
)
|
||||
var
|
||||
manifest: Manifest
|
||||
|
||||
@ -38,8 +38,8 @@ proc commonBlockStoreTests*(
|
||||
newBlock2 = Block.new("2".repeat(100).toBytes()).tryGet()
|
||||
newBlock3 = Block.new("3".repeat(100).toBytes()).tryGet()
|
||||
|
||||
(manifest, tree) =
|
||||
makeManifestAndTree(@[newBlock, newBlock1, newBlock2, newBlock3]).tryGet()
|
||||
(_, tree, manifest) =
|
||||
makeDataset(@[newBlock, newBlock1, newBlock2, newBlock3]).tryGet()
|
||||
|
||||
if not isNil(before):
|
||||
await before()
|
||||
|
||||
@ -364,9 +364,11 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
(blocks, tree, manifest) = makeDataset(
|
||||
await makeRandomBlocks(datasetSize = 2 * 256, blockSize = 256'nb)
|
||||
)
|
||||
.tryGet()
|
||||
blk = blocks[0]
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(0).tryGet()
|
||||
|
||||
@ -381,9 +383,11 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
(blocks, tree, manifest) = makeDataset(
|
||||
await makeRandomBlocks(datasetSize = 2 * 256, blockSize = 256'nb)
|
||||
)
|
||||
.tryGet()
|
||||
blk = blocks[0]
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(0).tryGet()
|
||||
|
||||
@ -406,9 +410,9 @@ asyncchecksuite "RepoStore":
|
||||
let sharedBlock = blockPool[1]
|
||||
|
||||
let
|
||||
(manifest1, tree1) = makeManifestAndTree(dataset1).tryGet()
|
||||
(_, tree1, manifest1) = makeDataset(dataset1).tryGet()
|
||||
treeCid1 = tree1.rootCid.tryGet()
|
||||
(manifest2, tree2) = makeManifestAndTree(dataset2).tryGet()
|
||||
(_, tree2, manifest2) = makeDataset(dataset2).tryGet()
|
||||
treeCid2 = tree2.rootCid.tryGet()
|
||||
|
||||
(await repo.putBlock(sharedBlock)).tryGet()
|
||||
@ -435,9 +439,9 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
blocks = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = blocks[0]
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(1).tryGet()
|
||||
|
||||
@ -455,9 +459,9 @@ asyncchecksuite "RepoStore":
|
||||
let
|
||||
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes =
|
||||
1000'nb)
|
||||
dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = dataset[0]
|
||||
(manifest, tree) = makeManifestAndTree(dataset).tryGet()
|
||||
blocks = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb)
|
||||
blk = blocks[0]
|
||||
(_, tree, manifest) = makeDataset(blocks).tryGet()
|
||||
treeCid = tree.rootCid.tryGet()
|
||||
proof = tree.getProof(1).tryGet()
|
||||
|
||||
|
||||
44
tests/codex/testblocktype.nim
Normal file
44
tests/codex/testblocktype.nim
Normal file
@ -0,0 +1,44 @@
|
||||
import pkg/unittest2
|
||||
import pkg/libp2p/cid
|
||||
|
||||
import pkg/codex/blocktype
|
||||
|
||||
import ./examples
|
||||
|
||||
suite "blocktype":
|
||||
test "should hash equal non-leaf block addresses onto the same hash":
|
||||
let
|
||||
cid1 = Cid.example
|
||||
nonLeaf1 = BlockAddress.init(cid1)
|
||||
nonLeaf2 = BlockAddress.init(cid1)
|
||||
|
||||
check nonLeaf1 == nonLeaf2
|
||||
check nonLeaf1.hash == nonLeaf2.hash
|
||||
|
||||
test "should hash equal leaf block addresses onto the same hash":
|
||||
let
|
||||
cid1 = Cid.example
|
||||
leaf1 = BlockAddress.init(cid1, 0)
|
||||
leaf2 = BlockAddress.init(cid1, 0)
|
||||
|
||||
check leaf1 == leaf2
|
||||
check leaf1.hash == leaf2.hash
|
||||
|
||||
test "should hash different non-leaf block addresses onto different hashes":
|
||||
let
|
||||
cid1 = Cid.example
|
||||
cid2 = Cid.example
|
||||
nonLeaf1 = BlockAddress.init(cid1)
|
||||
nonLeaf2 = BlockAddress.init(cid2)
|
||||
|
||||
check nonLeaf1 != nonLeaf2
|
||||
check nonLeaf1.hash != nonLeaf2.hash
|
||||
|
||||
test "should hash different leaf block addresses onto different hashes":
|
||||
let
|
||||
cid1 = Cid.example
|
||||
leaf1 = BlockAddress.init(cid1, 0)
|
||||
leaf2 = BlockAddress.init(cid1, 1)
|
||||
|
||||
check leaf1 != leaf2
|
||||
check leaf1.hash != leaf2.hash
|
||||
@ -373,7 +373,7 @@ asyncchecksuite "Test SafeAsyncIter":
|
||||
# Now, to make sure that this mechanism works, and to document its
|
||||
# cancellation semantics, this test shows that when the async predicate
|
||||
# function is cancelled, this cancellation has immediate effect, which means
|
||||
# that `next()` (or more precisely `getNext()` in `mapFilter` function), is
|
||||
# that `next()` (or more precisely `getNext()` in `mapFilter` function), is
|
||||
# interrupted immediately. If this is the case, the the iterator be interrupted
|
||||
# before `next()` returns this locally captured value from the previous
|
||||
# iteration and this is exactly the reason why at the end of the test
|
||||
@ -415,3 +415,20 @@ asyncchecksuite "Test SafeAsyncIter":
|
||||
# will not be returned because of the cancellation.
|
||||
collected == @["0", "1"]
|
||||
iter2.finished
|
||||
|
||||
test "should allow chaining":
|
||||
let
|
||||
iter1 = SafeAsyncIter[int].new(0 ..< 5)
|
||||
iter2 = SafeAsyncIter[int].new(5 ..< 10)
|
||||
iter3 = chain[int](iter1, SafeAsyncIter[int].empty, iter2)
|
||||
|
||||
var collected: seq[int]
|
||||
|
||||
for fut in iter3:
|
||||
without i =? (await fut), err:
|
||||
fail()
|
||||
collected.add(i)
|
||||
|
||||
check:
|
||||
iter3.finished
|
||||
collected == @[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||
|
||||
@ -7,12 +7,11 @@ import std/sequtils, chronos
|
||||
export multisetup, trackers, templeveldb
|
||||
|
||||
### taken from libp2p errorhelpers.nim
|
||||
proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
|
||||
proc allFuturesThrowing(futs: seq[FutureBase]): Future[void] =
|
||||
# This proc is only meant for use in tests / not suitable for general use.
|
||||
# - Swallowing errors arbitrarily instead of aggregating them is bad design
|
||||
# - It raises `CatchableError` instead of the union of the `futs` errors,
|
||||
# inflating the caller's `raises` list unnecessarily. `macro` could fix it
|
||||
let futs = @args
|
||||
(
|
||||
proc() {.async: (raises: [CatchableError]).} =
|
||||
await allFutures(futs)
|
||||
@ -28,6 +27,9 @@ proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
|
||||
raise firstErr
|
||||
)()
|
||||
|
||||
proc allFuturesThrowing*(args: varargs[FutureBase]): Future[void] =
|
||||
allFuturesThrowing(@args)
|
||||
|
||||
proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] =
|
||||
allFuturesThrowing(futs.mapIt(FutureBase(it)))
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user