mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-06 07:23:10 +00:00
feat: add dataset request batching
This commit is contained in:
parent
2dd436bfb7
commit
41f94d7a73
@ -78,6 +78,8 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
|
|||||||
trace "Discovery request already in progress", cid
|
trace "Discovery request already in progress", cid
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
trace "Running discovery task for cid", cid
|
||||||
|
|
||||||
let haves = b.peers.peersHave(cid)
|
let haves = b.peers.peersHave(cid)
|
||||||
|
|
||||||
if haves.len < b.minPeersPerBlock:
|
if haves.len < b.minPeersPerBlock:
|
||||||
|
|||||||
@ -154,6 +154,28 @@ proc sendWantBlock(
|
|||||||
) # we want this remote to send us a block
|
) # we want this remote to send us a block
|
||||||
codex_block_exchange_want_block_lists_sent.inc()
|
codex_block_exchange_want_block_lists_sent.inc()
|
||||||
|
|
||||||
|
proc refreshBlockKnowledge(
    self: BlockExcEngine, peer: BlockExcPeerCtx
) {.async: (raises: [CancelledError]).} =
  ## Sends the current contents of our want list to a single `peer`,
  ## flagged as a full list (`full = true`).
  ##
  ## Nothing is sent when the pending-blocks want list is empty.
  # broadcast our want list, the other peer will do the same
  if self.pendingBlocks.wantListLen <= 0:
    return

  let wanted = toSeq(self.pendingBlocks.wantList)
  trace "Sending our want list to a peer", peer = peer.id, length = wanted.len
  await self.network.request.sendWantList(peer.id, wanted, full = true)
|
||||||
|
|
||||||
|
proc refreshBlockKnowledge(
    self: BlockExcEngine
) {.async: (raises: [CancelledError]).} =
  ## Re-sends our want list to every known peer whose view of it may be
  ## out of date, then marks that peer as refreshed.
  ##
  ## Raises `CancelledError` if the surrounding task is cancelled while a
  ## want list is being sent; peers not yet reached are left unrefreshed.

  # Iterate over a snapshot rather than the live collection: the loop body
  # awaits, so other tasks may add or drop peers while we are suspended, and
  # mutating a table while iterating over it is not allowed.
  # NOTE(review): a peer dropped during the await is still visited from the
  # snapshot - presumably harmless, confirm against sendWantList.
  let knownPeers = toSeq(self.peers.peers.values)

  for peer in knownPeers:
    # We refresh block knowledge if:
    # 1. the peer hasn't been refreshed in a while;
    # 2. the list of blocks we care about has actually changed.
    #
    # Note that because of (2), it is important that we update our
    # want list in the coarsest way possible instead of over many
    # small updates.
    #
    # In dynamic swarms, staleness will dominate latency.
    if peer.lastRefresh < self.pendingBlocks.lastInclusion or peer.isKnowledgeStale:
      await self.refreshBlockKnowledge(peer)
      # Only stamped after a successful send, so a cancelled send is retried
      # on the next pass.
      peer.refreshed()
|
||||||
|
|
||||||
proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
  ## Returns one entry of `peers`, picked by the shared `Rng` instance.
  let rng = Rng.instance
  result = rng.sample(peers)
|
||||||
|
|
||||||
@ -189,7 +211,7 @@ proc downloadInternal(
|
|||||||
else:
|
else:
|
||||||
self.pendingBlocks.setInFlight(address, false)
|
self.pendingBlocks.setInFlight(address, false)
|
||||||
if peers.without.len > 0:
|
if peers.without.len > 0:
|
||||||
await self.sendWantHave(@[address], peers.without)
|
await self.refreshBlockKnowledge()
|
||||||
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
self.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
|
||||||
|
|
||||||
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
|
await (handle or sleepAsync(self.pendingBlocks.retryInterval))
|
||||||
@ -209,6 +231,32 @@ proc downloadInternal(
|
|||||||
finally:
|
finally:
|
||||||
self.pendingBlocks.setInFlight(address, false)
|
self.pendingBlocks.setInFlight(address, false)
|
||||||
|
|
||||||
|
proc requestBlocks*(
    self: BlockExcEngine, addresses: seq[BlockAddress]
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
  ## Requests a batch of blocks from the exchange engine and waits for all
  ## of them.
  ##
  ## Returns one `?!Block` per address that was newly added to
  ## `pendingBlocks` by this call, in the order those addresses appear in
  ## `addresses`. A failed request yields a failure entry instead of
  ## aborting the batch.
  ##
  ## NOTE(review): addresses that are already pending when this is called
  ## get no want handle here and therefore no entry in the result, so the
  ## result may be shorter than `addresses`. Callers must not index the
  ## result by input position - confirm this is intended.
  ##
  ## Raises `CancelledError` if the request is cancelled while waiting.

  # Each handle is kept together with its address so that a failure can be
  # reported for the block it concerns. Logging the whole `addresses` list on
  # every failure does not scale with large batches.
  var requests: seq[tuple[address: BlockAddress, handle: BlockHandle]]

  # Adds all blocks to pendingBlocks before calling the first downloadInternal. This will
  # ensure that we don't send incomplete want lists.
  for address in addresses:
    if address notin self.pendingBlocks:
      requests.add((address: address, handle: self.pendingBlocks.getWantHandle(address)))

  for address in addresses:
    self.trackedFutures.track(self.downloadInternal(address))

  # TODO: we can reduce latency and improve download times
  # by returning blocks out of order as futures complete.
  var blocks: seq[?!Block]
  for request in requests:
    try:
      blocks.add(success await request.handle)
    except CancelledError as err:
      warn "Block request cancelled",
        address = request.address, count = addresses.len, err = err.msg
      raise err
    except CatchableError as err:
      error "Error getting blocks from exchange engine",
        address = request.address, err = err.msg
      blocks.add(Block.failure err)

  return blocks
|
||||||
|
|
||||||
proc requestBlock*(
|
proc requestBlock*(
|
||||||
self: BlockExcEngine, address: BlockAddress
|
self: BlockExcEngine, address: BlockAddress
|
||||||
): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||||
@ -233,7 +281,7 @@ proc requestBlock*(
|
|||||||
proc blockPresenceHandler*(
|
proc blockPresenceHandler*(
|
||||||
self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
|
self: BlockExcEngine, peer: PeerId, blocks: seq[BlockPresence]
|
||||||
) {.async: (raises: []).} =
|
) {.async: (raises: []).} =
|
||||||
trace "Received block presence from peer", peer, blocks = blocks.mapIt($it)
|
trace "Received block presence from peer", peer, len = blocks.len
|
||||||
let
|
let
|
||||||
peerCtx = self.peers.get(peer)
|
peerCtx = self.peers.get(peer)
|
||||||
ourWantList = toSeq(self.pendingBlocks.wantList)
|
ourWantList = toSeq(self.pendingBlocks.wantList)
|
||||||
@ -476,12 +524,14 @@ proc wantListHandler*(
|
|||||||
case e.wantType
|
case e.wantType
|
||||||
of WantType.WantHave:
|
of WantType.WantHave:
|
||||||
if have:
|
if have:
|
||||||
|
trace "We HAVE the block", address = e.address
|
||||||
presence.add(
|
presence.add(
|
||||||
BlockPresence(
|
BlockPresence(
|
||||||
address: e.address, `type`: BlockPresenceType.Have, price: price
|
address: e.address, `type`: BlockPresenceType.Have, price: price
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
trace "We DON'T HAVE the block", address = e.address
|
||||||
if e.sendDontHave:
|
if e.sendDontHave:
|
||||||
presence.add(
|
presence.add(
|
||||||
BlockPresence(
|
BlockPresence(
|
||||||
@ -554,15 +604,11 @@ proc setupPeer*(
|
|||||||
trace "Setting up peer", peer
|
trace "Setting up peer", peer
|
||||||
|
|
||||||
if peer notin self.peers:
|
if peer notin self.peers:
|
||||||
|
let peerCtx = BlockExcPeerCtx(id: peer)
|
||||||
trace "Setting up new peer", peer
|
trace "Setting up new peer", peer
|
||||||
self.peers.add(BlockExcPeerCtx(id: peer))
|
self.peers.add(peerCtx)
|
||||||
trace "Added peer", peers = self.peers.len
|
trace "Added peer", peers = self.peers.len
|
||||||
|
await self.refreshBlockKnowledge(peerCtx)
|
||||||
# broadcast our want list, the other peer will do the same
|
|
||||||
if self.pendingBlocks.wantListLen > 0:
|
|
||||||
trace "Sending our want list to a peer", peer
|
|
||||||
let cids = toSeq(self.pendingBlocks.wantList)
|
|
||||||
await self.network.request.sendWantList(peer, cids, full = true)
|
|
||||||
|
|
||||||
if address =? self.pricing .? address:
|
if address =? self.pricing .? address:
|
||||||
trace "Sending account to peer", peer
|
trace "Sending account to peer", peer
|
||||||
|
|||||||
@ -34,7 +34,7 @@ declareGauge(
|
|||||||
|
|
||||||
const
|
const
|
||||||
DefaultBlockRetries* = 3000
|
DefaultBlockRetries* = 3000
|
||||||
DefaultRetryInterval* = 500.millis
|
DefaultRetryInterval* = 10.seconds
|
||||||
|
|
||||||
type
|
type
|
||||||
RetriesExhaustedError* = object of CatchableError
|
RetriesExhaustedError* = object of CatchableError
|
||||||
@ -50,6 +50,7 @@ type
|
|||||||
blockRetries*: int = DefaultBlockRetries
|
blockRetries*: int = DefaultBlockRetries
|
||||||
retryInterval*: Duration = DefaultRetryInterval
|
retryInterval*: Duration = DefaultRetryInterval
|
||||||
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
|
blocks*: Table[BlockAddress, BlockReq] # pending Block requests
|
||||||
|
lastInclusion*: Moment # time at which we last included a block into our wantlist
|
||||||
|
|
||||||
proc updatePendingBlockGauge(p: PendingBlocksManager) =
|
proc updatePendingBlockGauge(p: PendingBlocksManager) =
|
||||||
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
|
codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
|
||||||
@ -70,6 +71,8 @@ proc getWantHandle*(
|
|||||||
startTime: getMonoTime().ticks,
|
startTime: getMonoTime().ticks,
|
||||||
)
|
)
|
||||||
self.blocks[address] = blk
|
self.blocks[address] = blk
|
||||||
|
self.lastInclusion = Moment.now()
|
||||||
|
|
||||||
let handle = blk.handle
|
let handle = blk.handle
|
||||||
|
|
||||||
proc cleanUpBlock(data: pointer) {.raises: [].} =
|
proc cleanUpBlock(data: pointer) {.raises: [].} =
|
||||||
|
|||||||
@ -31,9 +31,16 @@ type BlockExcPeerCtx* = ref object of RootObj
|
|||||||
peerWants*: seq[WantListEntry] # remote peers want lists
|
peerWants*: seq[WantListEntry] # remote peers want lists
|
||||||
exchanged*: int # times peer has exchanged with us
|
exchanged*: int # times peer has exchanged with us
|
||||||
lastExchange*: Moment # last time peer has exchanged with us
|
lastExchange*: Moment # last time peer has exchanged with us
|
||||||
|
lastRefresh*: Moment # last time we refreshed our knowledge of the blocks this peer has
|
||||||
account*: ?Account # ethereum account of this peer
|
account*: ?Account # ethereum account of this peer
|
||||||
paymentChannel*: ?ChannelId # payment channel id
|
paymentChannel*: ?ChannelId # payment channel id
|
||||||
|
|
||||||
|
proc isKnowledgeStale*(self: BlockExcPeerCtx): bool =
  ## True once more than 15 seconds have passed since `lastRefresh`.
  const maxKnowledgeAge = 15.seconds
  let staleFrom = self.lastRefresh + maxKnowledgeAge
  result = Moment.now() > staleFrom
|
||||||
|
|
||||||
|
proc refreshed*(self: BlockExcPeerCtx) =
  ## Stamps `lastRefresh` with the current time.
  let now = Moment.now()
  self.lastRefresh = now
|
||||||
|
|
||||||
proc peerHave*(self: BlockExcPeerCtx): seq[BlockAddress] =
  ## Collects every key of `self.blocks` into a new sequence.
  for address in self.blocks.keys:
    result.add(address)
|
||||||
|
|
||||||
|
|||||||
@ -52,7 +52,7 @@ export logutils
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "codex node"
|
topics = "codex node"
|
||||||
|
|
||||||
const DefaultFetchBatch = 10
|
const DefaultFetchBatch = 1_000_000
|
||||||
|
|
||||||
type
|
type
|
||||||
Contracts* =
|
Contracts* =
|
||||||
@ -187,23 +187,18 @@ proc fetchBatched*(
|
|||||||
# )
|
# )
|
||||||
|
|
||||||
while not iter.finished:
|
while not iter.finished:
|
||||||
let blockFutures = collect:
|
let addresses = collect:
|
||||||
for i in 0 ..< batchSize:
|
for i in 0 ..< batchSize:
|
||||||
if not iter.finished:
|
if not iter.finished:
|
||||||
let address = BlockAddress.init(cid, iter.next())
|
let address = BlockAddress.init(cid, iter.next())
|
||||||
if not (await address in self.networkStore) or fetchLocal:
|
if not (await address in self.networkStore) or fetchLocal:
|
||||||
self.networkStore.getBlock(address)
|
address
|
||||||
|
|
||||||
if blockFutures.len == 0:
|
let
|
||||||
continue
|
blockResults = await self.networkStore.getBlocks(addresses)
|
||||||
|
blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
|
||||||
|
numOfFailedBlocks = blockResults.len - blocks.len
|
||||||
|
|
||||||
without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
|
|
||||||
trace "Some blocks failed to fetch", err = err.msg
|
|
||||||
return failure(err)
|
|
||||||
|
|
||||||
let blocks = blockResults.filterIt(it.isSuccess()).mapIt(it.value)
|
|
||||||
|
|
||||||
let numOfFailedBlocks = blockResults.len - blocks.len
|
|
||||||
if numOfFailedBlocks > 0:
|
if numOfFailedBlocks > 0:
|
||||||
return
|
return
|
||||||
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
|
failure("Some blocks failed (Result) to fetch (" & $numOfFailedBlocks & ")")
|
||||||
|
|||||||
@ -31,6 +31,26 @@ type NetworkStore* = ref object of BlockStore
|
|||||||
engine*: BlockExcEngine # blockexc decision engine
|
engine*: BlockExcEngine # blockexc decision engine
|
||||||
localStore*: BlockStore # local block store
|
localStore*: BlockStore # local block store
|
||||||
|
|
||||||
|
proc getBlocks*(
    self: NetworkStore, addresses: seq[BlockAddress]
): Future[seq[?!Block]] {.async: (raises: [CancelledError]).} =
  ## Fetches a batch of blocks, reading from the local store where possible
  ## and requesting the rest from the exchange engine in a single call.
  ##
  ## The result lists all locally resolved blocks first, followed by the
  ## engine's results; it is therefore grouped by source, not ordered like
  ## `addresses`.
  var
    fromLocal: seq[?!Block]
    missing: seq[BlockAddress]

  # We can resolve local blocks sequentially as for now those are blocking anyway. Still:
  # TODO: implement getBlocks for local store so we can delegate it here.
  for address in addresses:
    if (await address in self.localStore):
      fromLocal.add(await self.localStore.getBlock(address))
    else:
      missing.add(address)

  let fromNetwork = await self.engine.requestBlocks(missing)

  return fromLocal & fromNetwork
|
||||||
|
|
||||||
method getBlock*(
|
method getBlock*(
|
||||||
self: NetworkStore, address: BlockAddress
|
self: NetworkStore, address: BlockAddress
|
||||||
): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
): Future[?!Block] {.async: (raises: [CancelledError]).} =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user