Fixes active cancellation for pending want requests (#714)

* add block cancellation support + tests

* tie issueCancellations into resolveBlocks for proper exception tracking, address comments

* pull cancellation as separate primitive in BlockExcNetwork

* use allFutures, rename issueBlockCancellations -> cancelBlocks

* use trc instead of wrn to register send error

* do not log peer IDs
This commit is contained in:
Giuliano Mega 2024-02-22 11:54:45 -03:00 committed by GitHub
parent 1eebaa4ce3
commit 457567531f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 213 additions and 82 deletions

View File

@ -120,7 +120,6 @@ proc stop*(b: BlockExcEngine) {.async.} =
trace "NetworkStore stopped" trace "NetworkStore stopped"
proc sendWantHave( proc sendWantHave(
b: BlockExcEngine, b: BlockExcEngine,
address: BlockAddress, address: BlockAddress,
@ -146,16 +145,6 @@ proc sendWantBlock(
@[address], @[address],
wantType = WantType.WantBlock) # we want this remote to send us a block wantType = WantType.WantBlock) # we want this remote to send us a block
proc findCheapestPeerForBlock(b: BlockExcEngine, cheapestPeers: seq[BlockExcPeerCtx]): ?BlockExcPeerCtx =
if cheapestPeers.len <= 0:
trace "No cheapest peers, selecting first in list"
let
peers = toSeq(b.peers) # Get any peer
if peers.len <= 0:
return none(BlockExcPeerCtx)
return some(peers[0])
return some(cheapestPeers[0]) # get cheapest
proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId) {.async.} = proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: BlockAddress, peerId: PeerId) {.async.} =
try: try:
trace "Monitoring block handle", address, peerId trace "Monitoring block handle", address, peerId
@ -231,7 +220,7 @@ proc blockPresenceHandler*(
have = presence.have have = presence.have
price = presence.price price = presence.price
trace "Updating precense" trace "Updating presence"
peerCtx.setPresence(presence) peerCtx.setPresence(presence)
let let
@ -280,6 +269,20 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn
break # do next peer break # do next peer
proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} =
## Tells neighboring peers that we're no longer interested in a block.
trace "Sending block request cancellations to peers", addrs = addrs.len
let failed = (await allFinished(
b.peers.mapIt(
b.network.request.sendWantCancellations(
peer = it.id,
addresses = addrs))))
.filterIt(it.failed)
if failed.len > 0:
trace "Failed to send block request cancellations to peers", peers = failed.len
proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} =
trace "Resolving blocks", blocks = blocksDelivery.len trace "Resolving blocks", blocks = blocksDelivery.len
@ -290,6 +293,8 @@ proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asy
cids.incl(bd.blk.cid) cids.incl(bd.blk.cid)
if bd.address.leaf: if bd.address.leaf:
cids.incl(bd.address.treeCid) cids.incl(bd.address.treeCid)
await b.cancelBlocks(blocksDelivery.mapIt(it.address))
b.discovery.queueProvideBlocksReq(cids.toSeq) b.discovery.queueProvideBlocksReq(cids.toSeq)
proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} =
@ -398,7 +403,7 @@ proc wantListHandler*(
for e in wantList.entries: for e in wantList.entries:
let let
idx = peerCtx.peerWants.find(e) idx = peerCtx.peerWants.findIt(it.address == e.address)
logScope: logScope:
peer = peerCtx.id peer = peerCtx.id

View File

@ -39,14 +39,6 @@ type
BlockPresenceHandler* = proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.} BlockPresenceHandler* = proc(peer: PeerId, precense: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} AccountHandler* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.}
PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.} PaymentHandler* = proc(peer: PeerId, payment: SignedState): Future[void] {.gcsafe.}
WantListSender* = proc(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.}
BlockExcHandlers* = object BlockExcHandlers* = object
onWantList*: WantListHandler onWantList*: WantListHandler
@ -55,6 +47,15 @@ type
onAccount*: AccountHandler onAccount*: AccountHandler
onPayment*: PaymentHandler onPayment*: PaymentHandler
WantListSender* = proc(
id: PeerId,
addresses: seq[BlockAddress],
priority: int32 = 0,
cancel: bool = false,
wantType: WantType = WantType.WantHave,
full: bool = false,
sendDontHave: bool = false): Future[void] {.gcsafe.}
WantCancellationSender* = proc(peer: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.}
BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} BlocksDeliverySender* = proc(peer: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.}
PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} PresenceSender* = proc(peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.}
AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.} AccountSender* = proc(peer: PeerId, account: Account): Future[void] {.gcsafe.}
@ -62,6 +63,7 @@ type
BlockExcRequest* = object BlockExcRequest* = object
sendWantList*: WantListSender sendWantList*: WantListSender
sendWantCancellations*: WantCancellationSender
sendBlocksDelivery*: BlocksDeliverySender sendBlocksDelivery*: BlocksDeliverySender
sendPresence*: PresenceSender sendPresence*: PresenceSender
sendAccount*: AccountSender sendAccount*: AccountSender
@ -139,6 +141,17 @@ proc sendWantList*(
b.send(id, Message(wantlist: msg)) b.send(id, Message(wantlist: msg))
proc sendWantCancellations*(
b: BlockExcNetwork,
id: PeerId,
addresses: seq[BlockAddress]): Future[void] {.async.} =
## Informs a remote peer that we're no longer interested in a set of blocks
##
trace "Sending block request cancellation to peer", addrs = addresses.len, peer = id
await b.sendWantList(id = id, addresses = addresses, cancel = true)
proc handleBlocksDelivery( proc handleBlocksDelivery(
b: BlockExcNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
@ -340,6 +353,9 @@ proc new*(
id, cids, priority, cancel, id, cids, priority, cancel,
wantType, full, sendDontHave) wantType, full, sendDontHave)
proc sendWantCancellations(id: PeerId, addresses: seq[BlockAddress]): Future[void] {.gcsafe.} =
self.sendWantCancellations(id, addresses)
proc sendBlocksDelivery(id: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} = proc sendBlocksDelivery(id: PeerId, blocksDelivery: seq[BlockDelivery]): Future[void] {.gcsafe.} =
self.sendBlocksDelivery(id, blocksDelivery) self.sendBlocksDelivery(id, blocksDelivery)
@ -354,6 +370,7 @@ proc new*(
self.request = BlockExcRequest( self.request = BlockExcRequest(
sendWantList: sendWantList, sendWantList: sendWantList,
sendWantCancellations: sendWantCancellations,
sendBlocksDelivery: sendBlocksDelivery, sendBlocksDelivery: sendBlocksDelivery,
sendPresence: sendPresence, sendPresence: sendPresence,
sendAccount: sendAccount, sendAccount: sendAccount,

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
## ##
import std/enumerate
import std/parseutils import std/parseutils
import std/options import std/options
@ -17,7 +18,7 @@ import ./utils/asyncheapqueue
import ./utils/fileutils import ./utils/fileutils
import ./utils/asynciter import ./utils/asynciter
export asyncheapqueue, fileutils, asynciter export asyncheapqueue, fileutils, asynciter, chronos
func divUp*[T: SomeInteger](a, b : T): T = func divUp*[T: SomeInteger](a, b : T): T =
@ -35,6 +36,24 @@ proc orElse*[A](a, b: Option[A]): Option[A] =
else: else:
b b
template findIt*(s, pred: untyped): untyped =
## Returns the index of the first object matching a predicate, or -1 if no
## object matches it.
runnableExamples:
type MyType = object
att: int
var s = @[MyType(att: 1), MyType(att: 2), MyType(att: 3)]
doAssert s.findIt(it.att == 2) == 1
doAssert s.findIt(it.att == 4) == -1
var index = -1
for i, it {.inject.} in enumerate(items(s)):
if pred:
index = i
break
index
when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine when not declared(parseDuration): # Odd code formatting to minimize diff v. mainLine
const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'} const Whitespace = {' ', '\t', '\v', '\r', '\l', '\f'}
@ -75,18 +94,3 @@ when not declared(parseDuration): # Odd code formatting to minimize diff v. main
result = start #..is no unit to the end of `s`. result = start #..is no unit to the end of `s`.
var sizeF = number * scale + 0.5 # Saturate to int64.high when too big var sizeF = number * scale + 0.5 # Saturate to int64.high when too big
size = seconds(int(sizeF)) size = seconds(int(sizeF))
when isMainModule:
import unittest2
suite "time parse":
test "parseDuration":
var res: Duration # caller must still know if 'b' refers to bytes|bits
check parseDuration("10Hr", res) == 3
check res == hours(10)
check parseDuration("64min", res) == 3
check res == minutes(64)
check parseDuration("7m/block", res) == 2 # '/' stops parse
check res == minutes(7) # 1 shl 30, forced binary metric
check parseDuration("3d", res) == 2 # '/' stops parse
check res == days(3) # 1 shl 30, forced binary metric

View File

@ -16,10 +16,6 @@ import ../../examples
import ../../helpers import ../../helpers
asyncchecksuite "NetworkStore engine - 2 nodes": asyncchecksuite "NetworkStore engine - 2 nodes":
let
chunker1 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
chunker2 = RandomChunker.new(Rng.instance(), size = 2048, chunkSize = 256'nb)
var var
nodeCmps1, nodeCmps2: NodesComponents nodeCmps1, nodeCmps2: NodesComponents
peerCtx1, peerCtx2: BlockExcPeerCtx peerCtx1, peerCtx2: BlockExcPeerCtx
@ -28,20 +24,8 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]]
setup: setup:
while true: blocks1 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
let chunk = await chunker1.getBytes() blocks2 = await makeRandomBlocks(datasetSize = 2048, blockSize = 256'nb)
if chunk.len <= 0:
break
blocks1.add(bt.Block.new(chunk).tryGet())
while true:
let chunk = await chunker2.getBytes()
if chunk.len <= 0:
break
blocks2.add(bt.Block.new(chunk).tryGet())
nodeCmps1 = generateNodes(1, blocks1)[0] nodeCmps1 = generateNodes(1, blocks1)[0]
nodeCmps2 = generateNodes(1, blocks2)[0] nodeCmps2 = generateNodes(1, blocks2)[0]
@ -180,42 +164,30 @@ asyncchecksuite "NetworkStore engine - 2 nodes":
check eventually wallet.balance(channel, Asset) > 0 check eventually wallet.balance(channel, Asset) > 0
asyncchecksuite "NetworkStore - multiple nodes": asyncchecksuite "NetworkStore - multiple nodes":
let
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256'nb)
var var
switch: seq[Switch] nodes: seq[NodesComponents]
networkStore: seq[NetworkStore]
blocks: seq[bt.Block] blocks: seq[bt.Block]
setup: setup:
while true: blocks = await makeRandomBlocks(datasetSize = 4096, blockSize = 256'nb)
let chunk = await chunker.getBytes() nodes = generateNodes(5)
if chunk.len <= 0: for e in nodes:
break
blocks.add(bt.Block.new(chunk).tryGet())
for e in generateNodes(5):
switch.add(e.switch)
networkStore.add(e.networkStore)
await e.engine.start() await e.engine.start()
await allFuturesThrowing( await allFuturesThrowing(
switch.mapIt( it.start() ) nodes.mapIt( it.switch.start() )
) )
teardown: teardown:
await allFuturesThrowing( await allFuturesThrowing(
switch.mapIt( it.stop() ) nodes.mapIt( it.switch.stop() )
) )
switch = @[] nodes = @[]
networkStore = @[]
test "Should receive blocks for own want list": test "Should receive blocks for own want list":
let let
downloader = networkStore[4] downloader = nodes[4].networkStore
engine = downloader.engine engine = downloader.engine
# Add blocks from 1st peer to want list # Add blocks from 1st peer to want list
@ -233,9 +205,9 @@ asyncchecksuite "NetworkStore - multiple nodes":
) )
for i in 0..15: for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(switch) await connectNodes(nodes)
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await allFuturesThrowing( await allFuturesThrowing(
@ -251,7 +223,7 @@ asyncchecksuite "NetworkStore - multiple nodes":
test "Should exchange blocks with multiple nodes": test "Should exchange blocks with multiple nodes":
let let
downloader = networkStore[4] downloader = nodes[4].networkStore
engine = downloader.engine engine = downloader.engine
# Add blocks from 1st peer to want list # Add blocks from 1st peer to want list
@ -264,9 +236,9 @@ asyncchecksuite "NetworkStore - multiple nodes":
) )
for i in 0..15: for i in 0..15:
(await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() (await nodes[i div 4].networkStore.engine.localStore.putBlock(blocks[i])).tryGet()
await connectNodes(switch) await connectNodes(nodes)
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
await allFuturesThrowing( await allFuturesThrowing(
@ -275,3 +247,47 @@ asyncchecksuite "NetworkStore - multiple nodes":
check pendingBlocks1.mapIt( it.read ) == blocks[0..3] check pendingBlocks1.mapIt( it.read ) == blocks[0..3]
check pendingBlocks2.mapIt( it.read ) == blocks[12..15] check pendingBlocks2.mapIt( it.read ) == blocks[12..15]
test "Should actively cancel want-haves if block received from elsewhere":
let
# Peer wanting to download blocks
downloader = nodes[4]
# Bystander peer - gets block request but can't satisfy them
bystander = nodes[3]
# Holder of actual blocks
blockHolder = nodes[1]
let aBlock = blocks[0]
(await blockHolder.engine.localStore.putBlock(aBlock)).tryGet()
await connectNodes(@[downloader, bystander])
# Downloader asks for block...
let blockRequest = downloader.engine.requestBlock(aBlock.cid)
# ... and bystander learns that downloader wants it, but can't provide it.
check eventually(
bystander
.engine
.peers
.get(downloader.switch.peerInfo.peerId)
.peerWants
.filterIt( it.address == aBlock.address )
.len == 1
)
# As soon as we connect the downloader to the blockHolder, the block should
# propagate to the downloader...
await connectNodes(@[downloader, blockHolder])
check (await blockRequest).cid == aBlock.cid
check (await downloader.engine.localStore.hasBlock(aBlock.cid)).tryGet()
# ... and the bystander should have cancelled the want-have
check eventually(
bystander
.engine
.peers
.get(downloader.switch.peerInfo.peerId)
.peerWants
.filterIt( it.address == aBlock.address )
.len == 0
)

View File

@ -142,6 +142,11 @@ asyncchecksuite "NetworkStore engine handlers":
localStore: BlockStore localStore: BlockStore
blocks: seq[Block] blocks: seq[Block]
const NopSendWantCancellationsProc = proc(
id: PeerId,
addresses: seq[BlockAddress]
) {.gcsafe, async.} = discard
setup: setup:
rng = Rng.instance() rng = Rng.instance()
chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb) chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
@ -275,6 +280,10 @@ asyncchecksuite "NetworkStore engine handlers":
let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address)) let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
# Install NOP for want list cancellations so they don't cause a crash
engine.network = BlockExcNetwork(
request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc))
await engine.blocksDeliveryHandler(peerId, blocksDelivery) await engine.blocksDeliveryHandler(peerId, blocksDelivery)
let resolved = await allFinished(pending) let resolved = await allFinished(pending)
check resolved.mapIt( it.read ) == blocks check resolved.mapIt( it.read ) == blocks
@ -306,10 +315,14 @@ asyncchecksuite "NetworkStore engine handlers":
check receiver == peerId check receiver == peerId
check balances[account.address.toDestination] == amount check balances[account.address.toDestination] == amount
done.complete() done.complete(),
# Install NOP for want list cancellations so they don't cause a crash
sendWantCancellations: NopSendWantCancellationsProc
)) ))
await engine.blocksDeliveryHandler(peerId, blocks.mapIt(BlockDelivery(blk: it, address: it.address))) await engine.blocksDeliveryHandler(peerId, blocks.mapIt(
BlockDelivery(blk: it, address: it.address)))
await done.wait(100.millis) await done.wait(100.millis)
test "Should handle block presence": test "Should handle block presence":
@ -352,6 +365,30 @@ asyncchecksuite "NetworkStore engine handlers":
check a in peerCtx.peerHave check a in peerCtx.peerHave
check peerCtx.blocks[a].price == price check peerCtx.blocks[a].price == price
test "Should send cancellations for received blocks":
let
pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
cancellations = newTable(
blocks.mapIt((it.address, newFuture[void]())).toSeq
)
proc sendWantCancellations(
id: PeerId,
addresses: seq[BlockAddress]
) {.gcsafe, async.} =
for address in addresses:
cancellations[address].complete()
engine.network = BlockExcNetwork(
request: BlockExcRequest(
sendWantCancellations: sendWantCancellations
))
await engine.blocksDeliveryHandler(peerId, blocksDelivery)
discard await allFinished(pending)
await allFuturesThrowing(cancellations.values().toSeq)
asyncchecksuite "Task Handler": asyncchecksuite "Task Handler":
var var
rng: Rng rng: Rng

View File

@ -106,6 +106,19 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest
return manifest return manifest
proc makeRandomBlocks*(
datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} =
var chunker = RandomChunker.new(Rng.instance(), size = datasetSize,
chunkSize = blockSize)
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
result.add(Block.new(chunk).tryGet())
proc corruptBlocks*( proc corruptBlocks*(
store: BlockStore, store: BlockStore,
manifest: Manifest, manifest: Manifest,

View File

@ -62,3 +62,6 @@ proc connectNodes*(nodes: seq[Switch]) {.async.} =
for node in nodes: for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId: if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.connect(node.peerInfo.peerId, node.peerInfo.addrs) await dialer.connect(node.peerInfo.peerId, node.peerInfo.addrs)
proc connectNodes*(nodes: seq[NodesComponents]) {.async.} =
await connectNodes(nodes.mapIt( it.switch ))

View File

@ -0,0 +1,36 @@
import std/unittest
import pkg/codex/utils
suite "findIt":
setup:
type AnObject = object
attribute1*: int
var objList = @[
AnObject(attribute1: 1),
AnObject(attribute1: 3),
AnObject(attribute1: 5),
AnObject(attribute1: 3),
]
test "should retur index of first object matching predicate":
assert objList.findIt(it.attribute1 == 3) == 1
test "should return -1 when no object matches predicate":
assert objList.findIt(it.attribute1 == 15) == -1
suite "parseDuration":
test "should parse durations":
var res: Duration # caller must still know if 'b' refers to bytes|bits
check parseDuration("10Hr", res) == 3
check res == hours(10)
check parseDuration("64min", res) == 3
check res == minutes(64)
check parseDuration("7m/block", res) == 2 # '/' stops parse
check res == minutes(7) # 1 shl 30, forced binary metric
check parseDuration("3d", res) == 2 # '/' stops parse
check res == days(3) # 1 shl 30, forced binary metric