diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 4ed2ad78..c79e1eab 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -130,16 +130,15 @@ proc stop*(b: BlockExcEngine) {.async.} = proc sendWantHave( b: BlockExcEngine, addresses: seq[BlockAddress], - excluded: seq[BlockExcPeerCtx], peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = for p in peers: - if p notin excluded: - let toAsk = addresses.filterIt(it notin p.peerHave) - trace "Sending wantHave request", toAsk, peer = p.id - await b.network.request.sendWantList( - p.id, - toAsk, - wantType = WantType.WantHave) + let toAsk = addresses.filterIt(it notin p.peerHave) + trace "Sending wantHave request", toAsk, peer = p.id + await b.network.request.sendWantList( + p.id, + toAsk, + wantType = WantType.WantHave) + codex_block_exchange_want_have_lists_sent.inc() proc sendWantBlock( b: BlockExcEngine, @@ -150,6 +149,7 @@ proc sendWantBlock( blockPeer.id, addresses, wantType = WantType.WantBlock) # we want this remote to send us a block + codex_block_exchange_want_block_lists_sent.inc() proc monitorBlockHandle( b: BlockExcEngine, @@ -175,6 +175,9 @@ proc monitorBlockHandle( await b.network.switch.disconnect(peerId) b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) +proc pickPseudoRandom(address: BlockAddress, peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx = + return peers[hash(address) mod peers.len] + proc requestBlock*( b: BlockExcEngine, address: BlockAddress, @@ -182,26 +185,17 @@ proc requestBlock*( let blockFuture = b.pendingBlocks.getWantHandle(address, b.blockFetchTimeout) if not b.pendingBlocks.isInFlight(address): - let peers = b.peers.selectCheapest(address) - if peers.len == 0: + let peers = b.peers.getPeersForBlock(address) + + if peers.with.len == 0: b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) - - let maybePeer = - if peers.len > 0: - peers[hash(address) mod peers.len].some - elif b.peers.len > 0: - toSeq(b.peers)[hash(address) mod b.peers.len].some - else: - BlockExcPeerCtx.none - - if peer =? maybePeer: - asyncSpawn b.monitorBlockHandle(blockFuture, address, peer.id) + else: + let selected = pickPseudoRandom(address, peers.with) + asyncSpawn b.monitorBlockHandle(blockFuture, address, selected.id) b.pendingBlocks.setInFlight(address) - # TODO: Send more block addresses if at all sensible. - await b.sendWantBlock(@[address], peer) - codex_block_exchange_want_block_lists_sent.inc() - await b.sendWantHave(@[address], @[peer], toSeq(b.peers)) - codex_block_exchange_want_have_lists_sent.inc() + await b.sendWantBlock(@[address], selected) + + await b.sendWantHave(@[address], peers.without) # Don't let timeouts bubble up. We can't be too broad here or we break # cancellations. @@ -246,7 +240,7 @@ proc blockPresenceHandler*( ) if wantCids.len > 0: - trace "Peer has blocks in our wantList", peer, wantCount = wantCids.len + trace "Peer has blocks in our wantList", peer, wants = wantCids await b.sendWantBlock(wantCids, peerCtx) # if none of the connected peers report our wants in their have list, @@ -276,7 +270,7 @@ proc scheduleTasks(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.asyn proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = ## Tells neighboring peers that we're no longer interested in a block. - trace "Sending block request cancellations to peers", addrs = addrs.len + trace "Sending block request cancellations to peers", addrs, peers = b.peers.mapIt($it.id) let failed = (await allFinished( b.peers.mapIt( @@ -342,13 +336,13 @@ proc blocksDeliveryHandler*( b: BlockExcEngine, peer: PeerId, blocksDelivery: seq[BlockDelivery]) {.async.} = - trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt($it.address)).join(",") + trace "Received blocks from peer", peer, blocks = (blocksDelivery.mapIt(it.address)) var validatedBlocksDelivery: seq[BlockDelivery] for bd in blocksDelivery: logScope: - peer = peer - address = bd.address + peer = peer + address = bd.address if err =? b.validateBlockDelivery(bd).errorOption: warn "Block validation failed", msg = err.msg @@ -390,11 +384,13 @@ proc wantListHandler*( wantList: WantList) {.async.} = let peerCtx = b.peers.get(peer) - if isNil(peerCtx): + + if peerCtx.isNil: return var presence: seq[BlockPresence] + schedulePeer = false for e in wantList.entries: let @@ -405,7 +401,7 @@ proc wantListHandler*( address = e.address wantType = $e.wantType - if idx < 0: # updating entry + if idx < 0: # Adding new entry to peer wants let have = await e.address in b.localStore price = @( @@ -413,24 +409,27 @@ proc wantListHandler*( .price.toBytesBE) if e.wantType == WantType.WantHave: - codex_block_exchange_want_have_lists_received.inc() + if have: + presence.add( + BlockPresence( + address: e.address, + `type`: BlockPresenceType.Have, + price: price)) + else: + if e.sendDontHave: + presence.add( + BlockPresence( + address: e.address, + `type`: BlockPresenceType.DontHave, + price: price)) + peerCtx.peerWants.add(e) - if not have and e.sendDontHave: - presence.add( - BlockPresence( - address: e.address, - `type`: BlockPresenceType.DontHave, - price: price)) - elif have and e.wantType == WantType.WantHave: - presence.add( - BlockPresence( - address: e.address, - `type`: BlockPresenceType.Have, - price: price)) + codex_block_exchange_want_have_lists_received.inc() elif e.wantType == WantType.WantBlock: peerCtx.peerWants.add(e) + schedulePeer = true codex_block_exchange_want_block_lists_received.inc() - else: + else: # Updating existing entry in peer wants # peer doesn't want this block anymore if e.cancel: peerCtx.peerWants.del(idx) @@ -443,8 +442,9 @@ proc wantListHandler*( trace "Sending presence to remote", items = presence.mapIt($it).join(",") await b.network.request.sendPresence(peer, presence) - if not b.scheduleTask(peerCtx): - warn "Unable to schedule task for peer", peer + if schedulePeer: + if not b.scheduleTask(peerCtx): + warn "Unable to schedule task for peer", peer proc accountHandler*( engine: BlockExcEngine, @@ -555,7 +555,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = updateInFlight(failedAddresses, false) if blocksDelivery.len > 0: - trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt($it.address)).join(",") + trace "Sending blocks to peer", peer = task.id, blocks = (blocksDelivery.mapIt(it.address)) await b.network.request.sendBlocksDelivery( task.id, blocksDelivery diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index a64ecd22..4b65d849 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -32,6 +32,9 @@ logScope: type PeerCtxStore* = ref object of RootObj peers*: OrderedTable[PeerId, BlockExcPeerCtx] + PeersForBlock* = object of RootObj + with*: seq[BlockExcPeerCtx] + without*: seq[BlockExcPeerCtx] iterator items*(self: PeerCtxStore): BlockExcPeerCtx = for p in self.peers.values: @@ -70,32 +73,14 @@ func peersWant*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] func peersWant*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = toSeq(self.peers.values).filterIt( it.peerWants.anyIt( it.address.cidOrTreeCid == cid ) ) -func selectCheapest*(self: PeerCtxStore, address: BlockAddress): seq[BlockExcPeerCtx] = - # assume that the price for all leaves in a tree is the same - let rootAddress = BlockAddress(leaf: false, cid: address.cidOrTreeCid) - var peers = self.peersHave(rootAddress) - - func cmp(a, b: BlockExcPeerCtx): int = - var - priceA = 0.u256 - priceB = 0.u256 - - a.blocks.withValue(rootAddress, precense): - priceA = precense[].price - - b.blocks.withValue(rootAddress, precense): - priceB = precense[].price - - if priceA == priceB: - 0 - elif priceA > priceB: - 1 +proc getPeersForBlock*(self: PeerCtxStore, address: BlockAddress): PeersForBlock = + var res = PeersForBlock() + for peer in self: + if peer.peerHave.anyIt( it == address ): + res.with.add(peer) else: - -1 - - peers.sort(cmp) - trace "Selected cheapest peers", peers = peers.len - return peers + res.without.add(peer) + res proc new*(T: type PeerCtxStore): PeerCtxStore = ## create new instance of a peer context store diff --git a/tests/codex/blockexchange/testpeerctxstore.nim b/tests/codex/blockexchange/testpeerctxstore.nim index 6d7a1af3..dc77fbd8 100644 --- a/tests/codex/blockexchange/testpeerctxstore.nim +++ b/tests/codex/blockexchange/testpeerctxstore.nim @@ -69,27 +69,6 @@ checksuite "Peer Context Store Peer Selection": check peerCtxs[0] in peers check peerCtxs[5] in peers - test "Should select cheapest peers for Cid": - peerCtxs[0].blocks = collect(initTable): - for i, a in addresses: - { a: Presence(address: a, price: (5 + i).u256) } - - peerCtxs[5].blocks = collect(initTable): - for i, a in addresses: - { a: Presence(address: a, price: (2 + i).u256) } - - peerCtxs[9].blocks = collect(initTable): - for i, a in addresses: - { a: Presence(address: a, price: i.u256) } - - let - peers = store.selectCheapest(addresses[0]) - - check peers.len == 3 - check peers[0] == peerCtxs[9] - check peers[1] == peerCtxs[5] - check peers[2] == peerCtxs[0] - test "Should select peers that want Cid": let entries = addresses.mapIt( @@ -109,3 +88,19 @@ checksuite "Peer Context Store Peer Selection": check peers.len == 2 check peerCtxs[0] in peers check peerCtxs[5] in peers + + test "Should return peers with and without block": + let address = addresses[2] + + peerCtxs[1].blocks[address] = Presence(address: address, price: 0.u256) + peerCtxs[2].blocks[address] = Presence(address: address, price: 0.u256) + + let peers = store.getPeersForBlock(address) + + for i, pc in peerCtxs: + if i == 1 or i == 2: + check pc in peers.with + check pc notin peers.without + else: + check pc notin peers.with + check pc in peers.without diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim index 26397f6d..f9af76e5 100644 --- a/tests/integration/testsales.nim +++ b/tests/integration/testsales.nim @@ -19,7 +19,7 @@ multinodesuite "Sales": clients: CodexConfigs.init(nodes=1).some, providers: CodexConfigs.init(nodes=1).some, ) - + var host: CodexClient var client: CodexClient