mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-09 08:53:10 +00:00
fix: fix block exchange test to stricter protocol; minor refactor
This commit is contained in:
parent
97fd68e4a3
commit
572b44856b
@ -279,7 +279,7 @@ proc downloadInternal(
|
||||
if not self.pendingBlocks.isRequested(address):
|
||||
let peer = self.selectPeer(peers.with)
|
||||
self.pendingBlocks.markRequested(address, peer.id)
|
||||
peer.blockRequested(address)
|
||||
peer.blockRequestScheduled(address)
|
||||
trace "Request block from block retry loop"
|
||||
await self.sendWantBlock(@[address], peer)
|
||||
peer
|
||||
@ -412,7 +412,7 @@ proc blockPresenceHandler*(
|
||||
for address in ourWantCids:
|
||||
self.pendingBlocks.decRetries(address)
|
||||
self.pendingBlocks.markRequested(address, peer)
|
||||
peerCtx.blockRequested(address)
|
||||
peerCtx.blockRequestScheduled(address)
|
||||
|
||||
if ourWantCids.len > 0:
|
||||
trace "Peer has blocks in our wantList", peer, wants = ourWantCids
|
||||
@ -773,6 +773,8 @@ proc taskHandler*(
|
||||
wantedBlocks = peerCtx.wantedBlocks.filterIt(not peerCtx.isBlockSent(it))
|
||||
sent: HashSet[BlockAddress]
|
||||
|
||||
trace "Running task for peer", peer = peerCtx.id
|
||||
|
||||
for wantedBlock in wantedBlocks:
|
||||
peerCtx.markBlockAsSent(wantedBlock)
|
||||
|
||||
|
||||
@ -80,13 +80,16 @@ func price*(self: BlockExcPeerCtx, addresses: seq[BlockAddress]): UInt256 =
|
||||
|
||||
price
|
||||
|
||||
proc blockRequested*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
# We start counting the timeout from the first block requested.
|
||||
proc blockRequestScheduled*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
## Adds a block the set of blocks that have been requested to this peer
|
||||
## (its request schedule).
|
||||
if self.blocksRequested.len == 0:
|
||||
self.lastExchange = Moment.now()
|
||||
self.blocksRequested.incl(address)
|
||||
|
||||
proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) =
|
||||
## Removes a block from the set of blocks that have been requested to this peer
|
||||
## (its request schedule).
|
||||
self.blocksRequested.excl(address)
|
||||
|
||||
proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress): bool =
|
||||
|
||||
@ -96,6 +96,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
|
||||
test "Should send want-have for block":
|
||||
let blk = bt.Block.new("Block 1".toBytes).tryGet()
|
||||
let blkFut = nodeCmps1.pendingBlocks.getWantHandle(blk.cid)
|
||||
peerCtx2.blockRequestScheduled(blk.address)
|
||||
|
||||
(await nodeCmps2.localStore.putBlock(blk)).tryGet()
|
||||
|
||||
peerCtx1.wantedBlocks.incl(blk.address)
|
||||
|
||||
@ -242,7 +242,7 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
let pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
|
||||
|
||||
for blk in blocks:
|
||||
peerCtx.blockRequested(blk.address)
|
||||
peerCtx.blockRequestScheduled(blk.address)
|
||||
|
||||
let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
|
||||
|
||||
@ -270,7 +270,7 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
).toTable
|
||||
|
||||
for blk in blocks:
|
||||
peerContext.blockRequested(blk.address)
|
||||
peerContext.blockRequestScheduled(blk.address)
|
||||
|
||||
engine.network = BlockExcNetwork(
|
||||
request: BlockExcRequest(
|
||||
@ -349,10 +349,10 @@ asyncchecksuite "NetworkStore engine handlers":
|
||||
|
||||
engine.peers.add(senderPeerCtx)
|
||||
for address in reqBlockAddrs:
|
||||
pendingPeerCtx.blockRequested(address)
|
||||
pendingPeerCtx.blockRequestScheduled(address)
|
||||
|
||||
for address in blocks.mapIt(it.address):
|
||||
senderPeerCtx.blockRequested(address)
|
||||
senderPeerCtx.blockRequestScheduled(address)
|
||||
|
||||
proc sendWantCancellations(
|
||||
id: PeerId, addresses: seq[BlockAddress]
|
||||
|
||||
@ -40,7 +40,7 @@ asyncchecksuite "Network - Handlers":
|
||||
done = newFuture[void]()
|
||||
buffer = BufferStream.new()
|
||||
network = BlockExcNetwork.new(switch = newStandardSwitch(), connProvider = getConn)
|
||||
network.setupPeer(peerId)
|
||||
await network.handlePeerJoined(peerId)
|
||||
networkPeer = network.peers[peerId]
|
||||
discard await networkPeer.connect()
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user