From 5abf80cc69c818a1a24cf3ce686fc698214774f6 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 15 Nov 2022 09:46:21 -0600 Subject: [PATCH] Block download (#304) * track inflight requests * preperly handle precense updates * trace number of of scheduled blocks * invoke `payForBlocks` at the correct time * reduntant block info on want list updates * don't update prices in task handler * PeerID -> PeerId * cleanup * proper log topic * better chronicles topic filtering * more trace logging * sort want blocks * wip - fix tests * wip - fix tests, presence changes * fix small test issue * return price * payment related changes * misc * re-enable payment tests * fix warn wording * fix `u256` conversion * minor misc changes * don't idle for so long on `encode` * logging * move buff * disable cache by default * disable cache by default * fix streamOneBlock * log node stopping/exiting * trace logging * don't stringify cid * use `self` * quick cleanup * rename enums * rename enums * turns out we don't needs this test * fix wording --- codex.nim | 5 +- codex/blockexchange/engine/discovery.nim | 2 +- codex/blockexchange/engine/engine.nim | 249 +++++++++++------- codex/blockexchange/engine/pendingblocks.nim | 11 +- codex/blockexchange/network/network.nim | 19 +- codex/blockexchange/network/networkpeer.nim | 2 +- codex/blockexchange/peers/peercontext.nim | 45 ++-- codex/blockexchange/peers/peerctxstore.nim | 29 +- codex/blockexchange/protobuf/message.nim | 8 +- codex/blockexchange/protobuf/presence.nim | 19 +- codex/chunker.nim | 2 +- codex/codex.nim | 4 +- codex/conf.nim | 6 +- codex/erasure/erasure.nim | 2 +- codex/node.nim | 39 +-- codex/stores/cachestore.nim | 2 +- codex/stores/fsstore.nim | 5 +- codex/streams/storestream.nim | 4 +- .../blockexchange/discovery/testdiscovery.nim | 5 +- .../discovery/testdiscoveryengine.nim | 3 +- .../blockexchange/engine/testblockexc.nim | 105 +++++--- .../codex/blockexchange/engine/testengine.nim | 135 +++++++--- .../blockexchange/protobuf/testpresence.nim | 8 +- tests/codex/blockexchange/testnetwork.nim | 12 +- .../codex/blockexchange/testpeerctxstore.nim | 23 +- 25 files changed, 477 insertions(+), 267 deletions(-) diff --git a/codex.nim b/codex.nim index af39ba03..528e48d2 100644 --- a/codex.nim +++ b/codex.nim @@ -43,7 +43,7 @@ when isMainModule: quit QuitFailure if config.nat == ValidIpAddress.init("127.0.0.1"): - warn "`--nat` is set to local loopback, your node wont be properly announce over the DHT" + warn "`--nat` is set to loopback, your node wont properly announce over the DHT" if not(checkAndCreateDataDir((config.dataDir).string)): # We are unable to access/create data folder or data folder's @@ -89,9 +89,12 @@ when isMainModule: proc SIGTERMHandler(signal: cint) {.noconv.} = notice "Shutting down after having received SIGTERM" waitFor server.stop() + notice "Stopped Codex" c_signal(ansi_c.SIGTERM, SIGTERMHandler) waitFor server.start() + notice "Exited codex" + of StartUpCommand.initNode: discard diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index c05c0034..aef9a4bc 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -27,7 +27,7 @@ import ../../stores/blockstore import ./pendingblocks logScope: - topics = "codex discovery engine" + topics = "codex discoveryengine" declareGauge(codex_inflight_discovery, "inflight discovery requests") diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 3c9c2bdd..91a3e8d4 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -15,6 +15,7 @@ import std/algorithm import pkg/chronos import pkg/chronicles import pkg/libp2p +import pkg/stint import ../../stores/blockstore import ../../blocktype as bt @@ -33,7 +34,7 @@ import ./pendingblocks export peers, pendingblocks, payments, discovery logScope: - topics = "codex blockexc engine" + topics = "codex blockexcengine" const DefaultMaxPeersPerRequest* = 10 @@ -122,9 +123,10 @@ proc requestBlock*( ## Request a block from remotes ## - trace "Requesting block", cid + trace "Requesting block", cid, peers = b.peers.len - if cid in b.pendingBlocks: + if b.pendingBlocks.isInFlight(cid): + trace "Request handle already pending", cid return await b.pendingBlocks.getWantHandle(cid, timeout) let @@ -134,6 +136,7 @@ proc requestBlock*( peers = b.peers.selectCheapest(cid) if peers.len <= 0: + trace "No cheapest peers, selecting first in list", cid peers = toSeq(b.peers) # Get any peer if peers.len <= 0: trace "No peers to request blocks from", cid @@ -143,40 +146,67 @@ proc requestBlock*( let blockPeer = peers[0] # get cheapest + proc blockHandleMonitor() {.async.} = + try: + trace "Monigoring block handle", cid + b.pendingBlocks.setInFlight(cid, true) + discard await blk + trace "Block handle success", cid + except CatchableError as exc: + trace "Error block handle, disconnecting peer", cid, exc = exc.msg + + # TODO: really, this is just a quick and dirty way of + # preventing hitting the same "bad" peer every time, however, + # we might as well discover this on or next iteration, so + # it doesn't mean that we're never talking to this peer again. + # TODO: we need a lot more work around peer selection and + # prioritization + + # drop unresponsive peer + await b.network.switch.disconnect(blockPeer.id) + + trace "Sending block request to peer", peer = blockPeer.id, cid + + # monitor block handle + asyncSpawn blockHandleMonitor() + # request block await b.network.request.sendWantList( blockPeer.id, @[cid], - wantType = WantType.wantBlock) # we want this remote to send us a block + wantType = WantType.WantBlock) # we want this remote to send us a block if (peers.len - 1) == 0: - trace "Not enough peers to send want list to", cid + trace "No peers to send want list to", cid b.discovery.queueFindBlocksReq(@[cid]) return await blk # no peers to send wants to # filter out the peer we've already requested from - let stop = min(peers.high, b.peersPerRequest) - trace "Sending want list requests to remaining peers", count = stop + 1 - for p in peers[1..stop]: + let remaining = peers[1..min(peers.high, b.peersPerRequest)] + trace "Sending want list to remaining peers", count = remaining.len + for p in remaining: if cid notin p.peerHave: # just send wants await b.network.request.sendWantList( p.id, @[cid], - wantType = WantType.wantHave) # we only want to know if the peer has the block + wantType = WantType.WantHave) # we only want to know if the peer has the block return await blk proc blockPresenceHandler*( b: BlockExcEngine, - peer: PeerID, + peer: PeerId, blocks: seq[BlockPresence]) {.async.} = ## Handle block presence ## - trace "Received presence update for peer", peer - let peerCtx = b.peers.get(peer) - if isNil(peerCtx): + trace "Received presence update for peer", peer, blocks = blocks.len + let + peerCtx = b.peers.get(peer) + wantList = toSeq(b.pendingBlocks.wantList) + + if peerCtx.isNil: return for blk in blocks: @@ -187,19 +217,29 @@ proc blockPresenceHandler*( price = presence.price trace "Updating precense" - peerCtx.updatePresence(presence) + peerCtx.setPresence(presence) - var - cids = toSeq(b.pendingBlocks.wantList).filterIt( - it in peerCtx.peerHave + let + peerHave = peerCtx.peerHave + dontWantCids = peerHave.filterIt( + it notin wantList ) - trace "Received presence update for cids", peer, count = cids.len - if cids.len > 0: - await b.network.request.sendWantList( - peer, - cids, - wantType = WantType.wantBlock) # we want this remote to send us a block + if dontWantCids.len > 0: + trace "Cleaning peer haves", peer, count = dontWantCids.len + peerCtx.cleanPresence(dontWantCids) + + trace "Peer want/have", items = peerHave.len, wantList = wantList.len + let + wantCids = wantList.filterIt( + it in peerHave + ) + + if wantCids.len > 0: + trace "Getting blocks based on updated precense", peer, count = wantCids.len + discard await allFinished( + wantCids.mapIt(b.requestBlock(it))) + trace "Requested blocks based on updated precense", peer, count = wantCids.len # if none of the connected peers report our wants in their have list, # fire up discovery @@ -209,7 +249,7 @@ proc blockPresenceHandler*( not b.peers.anyIt( cid in it.peerHave )) proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = - trace "Schedule a task for new blocks" + trace "Schedule a task for new blocks", items = blocks.len let cids = blocks.mapIt( it.cid ) @@ -242,17 +282,19 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, blocks: seq[bt.Block]) {.async.} = - let sendPayment = engine.network.request.sendPayment - if sendPayment.isNil: - return + trace "Paying for blocks", blocks = blocks.len - let cids = blocks.mapIt(it.cid) - if payment =? engine.wallet.pay(peer, peer.price(cids)): + let + sendPayment = engine.network.request.sendPayment + price = peer.price(blocks.mapIt(it.cid)) + + if payment =? engine.wallet.pay(peer, price): + trace "Sending payment for blocks", price await sendPayment(peer.id, payment) proc blocksHandler*( b: BlockExcEngine, - peer: PeerID, + peer: PeerId, blocks: seq[bt.Block]) {.async.} = ## handle incoming blocks ## @@ -263,59 +305,85 @@ proc blocksHandler*( trace "Unable to store block", cid = blk.cid await b.resolveBlocks(blocks) - let peerCtx = b.peers.get(peer) + let + peerCtx = b.peers.get(peer) + if peerCtx != nil: + # we don't care about this blocks anymore, lets cleanup the list await b.payForBlocks(peerCtx, blocks) + peerCtx.cleanPresence(blocks.mapIt( it.cid )) proc wantListHandler*( b: BlockExcEngine, - peer: PeerID, + peer: PeerId, wantList: WantList) {.async.} = ## Handle incoming want lists ## - trace "Got want list for peer", peer + trace "Got want list for peer", peer, items = wantList.entries.len let peerCtx = b.peers.get(peer) if isNil(peerCtx): return - var dontHaves: seq[Cid] - let entries = wantList.entries - for e in entries: - let idx = peerCtx.peerWants.find(e) - if idx > -1: + var + precense: seq[BlockPresence] + + for e in wantList.entries: + let + idx = peerCtx.peerWants.find(e) + + logScope: + peer = peerCtx.id + cid = e.cid + wantType = $e.wantType + + if idx < 0: # updating entry + trace "Processing new want list entry", cid = e.cid + + let + have = await e.cid in b.localStore + price = @( + b.pricing.get(Pricing(price: 0.u256)) + .price.toBytesBE) + + if not have and e.sendDontHave: + trace "Adding dont have entry to precense response", cid = e.cid + precense.add( + BlockPresence( + cid: e.cid.data.buffer, + `type`: BlockPresenceType.DontHave, + price: price)) + elif have and e.wantType == WantType.WantHave: + trace "Adding have entry to precense response", cid = e.cid + precense.add( + BlockPresence( + cid: e.cid.data.buffer, + `type`: BlockPresenceType.Have, + price: price)) + elif e.wantType == WantType.WantBlock: + trace "Added entry to peer's want blocks list", cid = e.cid + peerCtx.peerWants.add(e) + else: # peer doesn't want this block anymore if e.cancel: + trace "Removing entry from peer want list" peerCtx.peerWants.del(idx) - continue + else: + trace "Updating entry in peer want list" + # peer might want to ask for the same cid with + # different want params + peerCtx.peerWants[idx] = e # update entry - peerCtx.peerWants[idx] = e # update entry - else: - peerCtx.peerWants.add(e) - - trace "Added entry to peer's want list", peer = peerCtx.id, cid = $e.cid - - # peer might want to ask for the same cid with - # different want params - if e.sendDontHave: - if not(await e.cid in b.localStore): - dontHaves.add(e.cid) - - # send don't have's to remote - if dontHaves.len > 0: - await b.network.request.sendPresence( - peer, - dontHaves.mapIt( - BlockPresence( - cid: it.data.buffer, - `type`: BlockPresenceType.presenceDontHave))) + if precense.len > 0: + trace "Sending precense to remote", items = precense.len + await b.network.request.sendPresence(peer, precense) if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer proc accountHandler*( engine: BlockExcEngine, - peer: PeerID, + peer: PeerId, account: Account) {.async.} = let context = engine.peers.get(peer) if context.isNil: @@ -327,8 +395,11 @@ proc paymentHandler*( engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} = + trace "Handling payments", peer + without context =? engine.peers.get(peer).option and account =? context.account: + trace "No context or account for peer", peer return if channel =? context.paymentChannel: @@ -337,16 +408,17 @@ proc paymentHandler*( else: context.paymentChannel = engine.wallet.acceptChannel(payment).option -proc setupPeer*(b: BlockExcEngine, peer: PeerID) {.async.} = +proc setupPeer*(b: BlockExcEngine, peer: PeerId) {.async.} = ## Perform initial setup, such as want ## list exchange ## - trace "Setting up new peer", peer if peer notin b.peers: + trace "Setting up new peer", peer b.peers.add(BlockExcPeerCtx( id: peer )) + trace "Added peer", peers = b.peers.len # broadcast our want list, the other peer will do the same if b.pendingBlocks.len > 0: @@ -356,7 +428,7 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) {.async.} = if address =? b.pricing.?address: await b.network.request.sendAccount(peer, Account(address: address)) -proc dropPeer*(b: BlockExcEngine, peer: PeerID) = +proc dropPeer*(b: BlockExcEngine, peer: PeerId) = ## Cleanup disconnected peer ## @@ -368,25 +440,32 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id - # PART 1: Send to the peer blocks he wants to get, + # Send to the peer blocks he wants to get, # if they present in our local store # TODO: There should be all sorts of accounting of # bytes sent/received here - var wantsBlocks = task.peerWants.filterIt(it.wantType == WantType.wantBlock) + var + wantsBlocks = task.peerWants.filterIt( + it.wantType == WantType.WantBlock + ) if wantsBlocks.len > 0: + trace "Got peer want blocks list", items = wantsBlocks.len + wantsBlocks.sort(SortOrder.Descending) - let blockFuts = await allFinished(wantsBlocks.mapIt( + let + blockFuts = await allFinished(wantsBlocks.mapIt( b.localStore.getBlock(it.cid) - )) + )) - # Extract succesfully received blocks - let blocks = blockFuts - .filterIt(it.completed and it.read.isOk) - .mapIt(it.read.get) + # Extract successfully received blocks + let + blocks = blockFuts + .filterIt(it.completed and it.read.isOk) + .mapIt(it.read.get) if blocks.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = blocks.len @@ -394,29 +473,13 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = task.id, blocks) + trace "About to remove entries from peerWants", blocks = blocks.len, items = task.peerWants.len # Remove successfully sent blocks task.peerWants.keepIf( proc(e: Entry): bool = not blocks.anyIt( it.cid == e.cid ) ) - - - # PART 2: Send to the peer prices of the blocks he wants to discover, - # if they present in our local store - - var wants: seq[BlockPresence] - # do not remove wants from the queue unless - # we send the block or get a cancel - for e in task.peerWants: - if e.wantType == WantType.wantHave: - var presence = Presence(cid: e.cid) - presence.have = await (presence.cid in b.localStore) - if presence.have and price =? b.pricing.?price: - presence.price = price - wants.add(BlockPresence.init(presence)) - - if wants.len > 0: - await b.network.request.sendPresence(task.id, wants) + trace "Removed entries from peerWants", items = task.peerWants.len proc blockexcTaskRunner(b: BlockExcEngine) {.async.} = ## process tasks @@ -455,7 +518,7 @@ proc new*( taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), discovery: discovery) - proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = + proc peerEventHandler(peerId: PeerId, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: await engine.setupPeer(peerId) else: @@ -466,17 +529,17 @@ proc new*( network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) proc blockWantListHandler( - peer: PeerID, + peer: PeerId, wantList: WantList): Future[void] {.gcsafe.} = engine.wantListHandler(peer, wantList) proc blockPresenceHandler( - peer: PeerID, + peer: PeerId, presence: seq[BlockPresence]): Future[void] {.gcsafe.} = engine.blockPresenceHandler(peer, presence) proc blocksHandler( - peer: PeerID, + peer: PeerId, blocks: seq[bt.Block]): Future[void] {.gcsafe.} = engine.blocksHandler(peer, blocks) diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim index c1d32cf5..dd24b1d5 100644 --- a/codex/blockexchange/engine/pendingblocks.nim +++ b/codex/blockexchange/engine/pendingblocks.nim @@ -21,7 +21,7 @@ import pkg/libp2p import ../../blocktype logScope: - topics = "codex blockexc pendingblocks" + topics = "codex pendingblocks" const DefaultBlockTimeout* = 10.minutes @@ -48,7 +48,7 @@ proc getWantHandle*( handle: newFuture[Block]("pendingBlocks.getWantHandle"), inFlight: inFlight) - trace "Adding pending future for block", cid + trace "Adding pending future for block", cid, inFlight = p.blocks[cid].inFlight return await p.blocks[cid].handle.wait(timeout) except CancelledError as exc: @@ -76,15 +76,18 @@ proc resolve*( proc setInFlight*( p: PendingBlocksManager, - cid: Cid) = + cid: Cid, + inFlight = true) = p.blocks.withValue(cid, pending): - pending[].inFlight = true + pending[].inFlight = inFlight + trace "Setting inflight", cid, inFlight = pending[].inFlight proc isInFlight*( p: PendingBlocksManager, cid: Cid): bool = p.blocks.withValue(cid, pending): result = pending[].inFlight + trace "Getting inflight", cid, inFlight = result proc pending*( p: PendingBlocksManager, diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 4960a2fc..ee835fd5 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -27,7 +27,7 @@ import ./networkpeer export network, payments logScope: - topics = "codex blockexc network" + topics = "codex blockexcnetwork" const Codec* = "/codex/blockexc/1.0.0" @@ -44,7 +44,7 @@ type cids: seq[Cid], priority: int32 = 0, cancel: bool = false, - wantType: WantType = WantType.wantHave, + wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false): Future[void] {.gcsafe.} @@ -105,7 +105,7 @@ proc makeWantList*( cids: seq[Cid], priority: int = 0, cancel: bool = false, - wantType: WantType = WantType.wantHave, + wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false): WantList = WantList( @@ -124,7 +124,7 @@ proc sendWantList*( cids: seq[Cid], priority: int32 = 0, cancel: bool = false, - wantType: WantType = WantType.wantHave, + wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false): Future[void] = ## Send a want message to peer @@ -332,17 +332,18 @@ proc new*( ## Create a new BlockExcNetwork instance ## - let self = BlockExcNetwork( - switch: switch, - getConn: connProvider, - inflightSema: newAsyncSemaphore(maxInflight)) + let + self = BlockExcNetwork( + switch: switch, + getConn: connProvider, + inflightSema: newAsyncSemaphore(maxInflight)) proc sendWantList( id: PeerID, cids: seq[Cid], priority: int32 = 0, cancel: bool = false, - wantType: WantType = WantType.wantHave, + wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false): Future[void] {.gcsafe.} = self.sendWantList( diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 33d233fd..cef58bc5 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -18,7 +18,7 @@ import ../protobuf/blockexc import ../../errors logScope: - topics = "codex blockexc networkpeer" + topics = "codex blockexcnetworkpeer" const MaxMessageSize = 100 * 1 shl 20 # manifest files can be big diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim index 9aab35b2..b9504c90 100644 --- a/codex/blockexchange/peers/peercontext.nim +++ b/codex/blockexchange/peers/peercontext.nim @@ -9,6 +9,8 @@ import std/sequtils import std/tables + +import pkg/chronicles import pkg/libp2p import pkg/chronos import pkg/nitro @@ -20,35 +22,40 @@ import ../protobuf/presence export payments, nitro +logScope: + topics = "codex peercontext" + type BlockExcPeerCtx* = ref object of RootObj id*: PeerID - peerPrices*: Table[Cid, UInt256] # remote peer have list including price + blocks*: Table[Cid, Presence] # remote peer have list including price peerWants*: seq[Entry] # remote peers want lists exchanged*: int # times peer has exchanged with us lastExchange*: Moment # last time peer has exchanged with us account*: ?Account # ethereum account of this peer paymentChannel*: ?ChannelId # payment channel id -proc peerHave*(context: BlockExcPeerCtx): seq[Cid] = - toSeq(context.peerPrices.keys) +proc peerHave*(self: BlockExcPeerCtx): seq[Cid] = + toSeq(self.blocks.keys) -proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool = - ## Convenience method to check for peer prepense - ## +proc contains*(self: BlockExcPeerCtx, cid: Cid): bool = + cid in self.blocks - a.anyIt( it.id == b ) +func setPresence*(self: BlockExcPeerCtx, presence: Presence) = + self.blocks[presence.cid] = presence -func updatePresence*(context: BlockExcPeerCtx, presence: Presence) = - let cid = presence.cid - let price = presence.price - - if cid notin context.peerHave and presence.have: - context.peerPrices[cid] = price - elif cid in context.peerHave and not presence.have: - context.peerPrices.del(cid) - -func price*(context: BlockExcPeerCtx, cids: seq[Cid]): UInt256 = +func cleanPresence*(self: BlockExcPeerCtx, cids: seq[Cid]) = for cid in cids: - if price =? context.peerPrices.?[cid]: - result += price + self.blocks.del(cid) + +func cleanPresence*(self: BlockExcPeerCtx, cid: Cid) = + self.cleanPresence(@[cid]) + +func price*(self: BlockExcPeerCtx, cids: seq[Cid]): UInt256 = + var price = 0.u256 + for cid in cids: + self.blocks.withValue(cid, precense): + price += precense[].price + + trace "Blocks price", price + price diff --git a/codex/blockexchange/peers/peerctxstore.nim b/codex/blockexchange/peers/peerctxstore.nim index 442dd206..3d17fdbe 100644 --- a/codex/blockexchange/peers/peerctxstore.nim +++ b/codex/blockexchange/peers/peerctxstore.nim @@ -25,7 +25,7 @@ import ./peercontext export peercontext logScope: - topics = "codex blockexc peerctxstore" + topics = "codex peerctxstore" type PeerCtxStore* = ref object of RootObj @@ -35,6 +35,12 @@ iterator items*(self: PeerCtxStore): BlockExcPeerCtx = for p in self.peers.values: yield p +proc contains*(a: openArray[BlockExcPeerCtx], b: PeerID): bool = + ## Convenience method to check for peer precense + ## + + a.anyIt( it.id == b ) + func contains*(self: PeerCtxStore, peerId: PeerID): bool = peerId in self.peers @@ -63,13 +69,21 @@ func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = var peers = self.peersHave(cid) + trace "Selecting cheapest peers", peers = peers.len func cmp(a, b: BlockExcPeerCtx): int = - # Can't do (a - b) without cast[int](a - b) - if a.peerPrices.getOrDefault(cid, 0.u256) == - b.peerPrices.getOrDefault(cid, 0.u256): + var + priceA = 0.u256 + priceB = 0.u256 + + a.blocks.withValue(cid, precense): + priceA = precense[].price + + b.blocks.withValue(cid, precense): + priceB = precense[].price + + if priceA == priceB: 0 - elif a.peerPrices.getOrDefault(cid, 0.u256) > - b.peerPrices.getOrDefault(cid, 0.u256): + elif priceA > priceB: 1 else: -1 @@ -79,4 +93,5 @@ func selectCheapest*(self: PeerCtxStore, cid: Cid): seq[BlockExcPeerCtx] = return peers proc new*(T: type PeerCtxStore): PeerCtxStore = - T(peers: initOrderedTable[PeerID, BlockExcPeerCtx]()) + T( + peers: initOrderedTable[PeerID, BlockExcPeerCtx]()) diff --git a/codex/blockexchange/protobuf/message.nim b/codex/blockexchange/protobuf/message.nim index a8849aae..e3d44155 100644 --- a/codex/blockexchange/protobuf/message.nim +++ b/codex/blockexchange/protobuf/message.nim @@ -8,8 +8,8 @@ import pkg/libp2p/protobuf/minprotobuf type WantType* = enum - wantBlock = 0, - wantHave = 1 + WantBlock = 0, + WantHave = 1 Entry* = object `block`*: seq[byte] # The block cid @@ -27,8 +27,8 @@ type data*: seq[byte] BlockPresenceType* = enum - presenceHave = 0, - presenceDontHave = 1 + Have = 0, + DontHave = 1 BlockPresence* = object cid*: seq[byte] # The block cid diff --git a/codex/blockexchange/protobuf/presence.nim b/codex/blockexchange/protobuf/presence.nim index cc53edcb..1a1c6c5c 100644 --- a/codex/blockexchange/protobuf/presence.nim +++ b/codex/blockexchange/protobuf/presence.nim @@ -18,13 +18,6 @@ type have*: bool price*: UInt256 -func init*(_: type PresenceMessage, presence: Presence): PresenceMessage = - PresenceMessage( - cid: presence.cid.data.buffer, - `type`: if presence.have: presenceHave else: presenceDontHave, - price: @(presence.price.toBytesBE) - ) - func parse(_: type UInt256, bytes: seq[byte]): ?UInt256 = if bytes.len > 32: return UInt256.none @@ -37,6 +30,16 @@ func init*(_: type Presence, message: PresenceMessage): ?Presence = some Presence( cid: cid, - have: message.`type` == presenceHave, + have: message.`type` == BlockPresenceType.Have, price: price ) + +func init*(_: type PresenceMessage, presence: Presence): PresenceMessage = + PresenceMessage( + cid: presence.cid.data.buffer, + `type`: if presence.have: + BlockPresenceType.Have + else: + BlockPresenceType.DontHave, + price: @(presence.price.toBytesBE) + ) diff --git a/codex/chunker.nim b/codex/chunker.nim index 4f25214e..9d4d0fd2 100644 --- a/codex/chunker.nim +++ b/codex/chunker.nim @@ -57,7 +57,7 @@ proc getBytes*(c: Chunker): Future[seq[byte]] {.async.} = if not c.pad and buff.len > read: buff.setLen(read) - return buff + return move buff func new*( T: type Chunker, diff --git a/codex/codex.nim b/codex/codex.nim index b3753b44..a6273785 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -75,7 +75,7 @@ proc start*(s: CodexServer) {.async.} = s.codexNode.discovery.updateAnnounceRecord(announceAddrs) s.codexNode.discovery.updateDhtRecord(s.config.nat, s.config.discoveryPort) - s.runHandle = newFuture[void]() + s.runHandle = newFuture[void]("codex.runHandle") await s.runHandle proc stop*(s: CodexServer) {.async.} = @@ -116,7 +116,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): .build() var - cache: CacheStore + cache: CacheStore = nil if config.cacheSize > 0: cache = CacheStore.new(cacheSize = config.cacheSize * MiB) diff --git a/codex/conf.nim b/codex/conf.nim index 9c1a966f..2e211d99 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -143,9 +143,9 @@ type abbr: "p" }: int cacheSize* {. - desc: "The size in MiB of the block cache, 0 disables the cache" - defaultValue: DefaultCacheSizeMiB - defaultValueDesc: $DefaultCacheSizeMiB + desc: "The size in MiB of the block cache, 0 disables the cache - might help on slow hardrives" + defaultValue: 0 + defaultValueDesc: "0" name: "cache-size" abbr: "c" }: Natural diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index e1252b43..ccc793d5 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -109,7 +109,7 @@ proc encode*( # TODO: this is a tight blocking loop so we sleep here to allow # other events to be processed, this should be addressed # by threading - await sleepAsync(100.millis) + await sleepAsync(10.millis) for j in 0.. 0 @@ -194,15 +217,24 @@ suite "NetworkStore - multiple nodes": switch = @[] networkStore = @[] - test "Should receive haves for own want list": + test "Should receive blocks for own want list": let downloader = networkStore[4] engine = downloader.engine # Add blocks from 1st peer to want list let - pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) - pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) + downloadCids = + blocks[0..3].mapIt( + it.cid + ) & + blocks[12..15].mapIt( + it.cid + ) + + pendingBlocks = downloadCids.mapIt( + engine.pendingBlocks.getWantHandle( it ) + ) for i in 0..15: (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() @@ -211,18 +243,15 @@ suite "NetworkStore - multiple nodes": await sleepAsync(1.seconds) await allFuturesThrowing( - allFinished(pendingBlocks1), - allFinished(pendingBlocks2)) - - let - peers = toSeq(engine.peers) + allFinished(pendingBlocks)) check: - peers[0].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string]) - - peers[3].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string]) + (await allFinished( + downloadCids.mapIt( + downloader.localStore.getBlock( it ) ))) + .filterIt( it.completed and it.read.isOk ) + .mapIt( $it.read.get.cid ).sorted(cmp[string]) == + downloadCids.mapIt( $it ).sorted(cmp[string]) test "Should exchange blocks with multiple nodes": let @@ -231,8 +260,12 @@ suite "NetworkStore - multiple nodes": # Add blocks from 1st peer to want list let - pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) - pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) + pendingBlocks1 = blocks[0..3].mapIt( + engine.pendingBlocks.getWantHandle( it.cid ) + ) + pendingBlocks2 = blocks[12..15].mapIt( + engine.pendingBlocks.getWantHandle( it.cid ) + ) for i in 0..15: (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 7a360597..212dc8c3 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -58,7 +58,7 @@ suite "NetworkStore engine basic": cids: seq[Cid], priority: int32 = 0, cancel: bool = false, - wantType: WantType = WantType.wantHave, + wantType: WantType = WantType.WantHave, full: bool = false, sendDontHave: bool = false) {.gcsafe, async.} = check cids.mapIt($it).sorted == blocks.mapIt( $it.cid ).sorted @@ -183,26 +183,55 @@ suite "NetworkStore engine handlers": id: peerId ) engine.peers.add(peerCtx) - done = newFuture[void]() - test "Should handle want list": - let wantList = makeWantList(blocks.mapIt( it.cid )) + test "Should schedule block requests": + let + wantList = makeWantList( + blocks.mapIt( it.cid ), + wantType = WantType.WantBlock) # only `wantBlock` are stored in `peerWants` + proc handler() {.async.} = let ctx = await engine.taskQueue.pop() check ctx.id == peerId + # only `wantBlock` scheduled check ctx.peerWants.mapIt( it.cid ) == blocks.mapIt( it.cid ) let done = handler() await engine.wantListHandler(peerId, wantList) await done + test "Should handle want list": + let + done = newFuture[void]() + wantList = makeWantList(blocks.mapIt( it.cid )) + + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = + check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) + done.complete() + + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendPresence: sendPresence + )) + + await allFuturesThrowing( + allFinished(blocks.mapIt( localStore.putBlock(it) ))) + + await engine.wantListHandler(peerId, wantList) + await done + test "Should handle want list - `dont-have`": - let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) + let + done = newFuture[void]() + wantList = makeWantList( + blocks.mapIt( it.cid ), + sendDontHave = true) + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = check presence.mapIt( it.cid ) == wantList.entries.mapIt( it.`block` ) for p in presence: check: - p.`type` == BlockPresenceType.presenceDontHave + p.`type` == BlockPresenceType.DontHave done.complete() @@ -211,21 +240,31 @@ suite "NetworkStore engine handlers": )) await engine.wantListHandler(peerId, wantList) - await done test "Should handle want list - `dont-have` some blocks": - let wantList = makeWantList(blocks.mapIt( it.cid ), sendDontHave = true) + let + done = newFuture[void]() + wantList = makeWantList( + blocks.mapIt( it.cid ), + sendDontHave = true) + proc sendPresence(peerId: PeerID, presence: seq[BlockPresence]) {.gcsafe, async.} = - check presence.mapIt( it.cid ) == blocks[2..blocks.high].mapIt( it.cid.data.buffer ) + let + cid1Buf = blocks[0].cid.data.buffer + cid2Buf = blocks[1].cid.data.buffer + for p in presence: - check: - p.`type` == BlockPresenceType.presenceDontHave + if p.cid != cid1Buf and p.cid != cid2Buf: + check p.`type` == BlockPresenceType.DontHave + else: + check p.`type` == BlockPresenceType.Have done.complete() - engine.network = BlockExcNetwork(request: BlockExcRequest( - sendPresence: sendPresence + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendPresence: sendPresence )) (await engine.localStore.putBlock(blocks[0])).tryGet() @@ -247,24 +286,58 @@ suite "NetworkStore engine handlers": check present.tryGet() test "Should send payments for received blocks": - let account = Account(address: EthAddress.example) - let peerContext = peerStore.get(peerId) - peerContext.account = account.some - peerContext.peerPrices = blocks.mapIt((it.cid, rand(uint16).u256)).toTable + let + done = newFuture[void]() + account = Account(address: EthAddress.example) + peerContext = peerStore.get(peerId) - engine.network = BlockExcNetwork(request: BlockExcRequest( - sendPayment: proc(receiver: PeerID, payment: SignedState) {.gcsafe, async.} = - let amount = blocks.mapIt(peerContext.peerPrices[it.cid]).foldl(a+b) - let balances = !payment.state.outcome.balances(Asset) - check receiver == peerId - check balances[account.address.toDestination] == amount - done.complete() + peerContext.account = account.some + peerContext.blocks = blocks.mapIt( + (it.cid, Presence(cid: it.cid, price: rand(uint16).u256)) + ).toTable + + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendPayment: proc(receiver: PeerID, payment: SignedState) {.gcsafe, async.} = + let + amount = + blocks.mapIt( + peerContext.blocks[it.cid].price + ).foldl(a + b) + + balances = !payment.state.outcome.balances(Asset) + + check receiver == peerId + check balances[account.address.toDestination] == amount + done.complete() )) await engine.blocksHandler(peerId, blocks) await done.wait(100.millis) test "Should handle block presence": + var + handles: Table[Cid, Future[bt.Block]] + + proc sendWantList( + id: PeerID, + cids: seq[Cid], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.WantHave, + full: bool = false, + sendDontHave: bool = false) {.gcsafe, async.} = + engine.pendingBlocks.resolve(blocks.filterIt( it.cid in cids )) + + engine.network = BlockExcNetwork( + request: BlockExcRequest( + sendWantList: sendWantList + )) + + # only Cids in peer want lists are requested + handles = blocks.mapIt( + (it.cid, engine.pendingBlocks.getWantHandle( it.cid ))).toTable + let price = UInt256.example await engine.blockPresenceHandler( peerId, @@ -277,8 +350,8 @@ suite "NetworkStore engine handlers": )))) for cid in blocks.mapIt(it.cid): - check peerCtx.peerHave.contains(cid) - check peerCtx.peerPrices[cid] == price + check cid in peerCtx.peerHave + check peerCtx.blocks[cid].price == price suite "Task Handler": var @@ -366,7 +439,7 @@ suite "Task Handler": `block`: blocks[0].cid.data.buffer, priority: 49, cancel: false, - wantType: WantType.wantBlock, + wantType: WantType.WantBlock, sendDontHave: false) ) @@ -376,7 +449,7 @@ suite "Task Handler": `block`: blocks[1].cid.data.buffer, priority: 50, cancel: false, - wantType: WantType.wantBlock, + wantType: WantType.WantBlock, sendDontHave: false) ) @@ -404,7 +477,7 @@ suite "Task Handler": `block`: present[0].cid.data.buffer, priority: 1, cancel: false, - wantType: WantType.wantHave, + wantType: WantType.WantHave, sendDontHave: false) ) @@ -414,7 +487,7 @@ suite "Task Handler": `block`: present[1].cid.data.buffer, priority: 1, cancel: false, - wantType: WantType.wantHave, + wantType: WantType.WantHave, sendDontHave: false) ) @@ -424,7 +497,7 @@ suite "Task Handler": `block`: missing[0].cid.data.buffer, priority: 1, cancel: false, - wantType: WantType.wantHave, + wantType: WantType.WantHave, sendDontHave: false) ) diff --git a/tests/codex/blockexchange/protobuf/testpresence.nim b/tests/codex/blockexchange/protobuf/testpresence.nim index 43593e13..7d02269e 100644 --- a/tests/codex/blockexchange/protobuf/testpresence.nim +++ b/tests/codex/blockexchange/protobuf/testpresence.nim @@ -18,9 +18,9 @@ suite "block presence protobuf messages": test "encodes have/donthave": var presence = presence presence.have = true - check PresenceMessage.init(presence).`type` == presenceHave + check PresenceMessage.init(presence).`type` == Have presence.have = false - check PresenceMessage.init(presence).`type` == presenceDontHave + check PresenceMessage.init(presence).`type` == DontHave test "encodes price": check message.price == @(price.toBytesBE) @@ -35,9 +35,9 @@ suite "block presence protobuf messages": test "decodes have/donthave": var message = message - message.`type` = presenceHave + message.`type` = BlockPresenceType.Have check Presence.init(message).?have == true.some - message.`type` = presenceDontHave + message.`type` = BlockPresenceType.DontHave check Presence.init(message).?have == false.some test "decodes price": diff --git a/tests/codex/blockexchange/testnetwork.nim b/tests/codex/blockexchange/testnetwork.nim index f26f5ae6..9f3c399e 100644 --- a/tests/codex/blockexchange/testnetwork.nim +++ b/tests/codex/blockexchange/testnetwork.nim @@ -56,7 +56,7 @@ suite "Network - Handlers": for b in blocks: check b.cid in wantList.entries let entry = wantList.entries[wantList.entries.find(b.cid)] - check entry.wantType == WantType.wantHave + check entry.wantType == WantType.WantHave check entry.priority == 1 check entry.cancel == true check entry.sendDontHave == true @@ -67,7 +67,7 @@ suite "Network - Handlers": let wantList = makeWantList( blocks.mapIt( it.cid ), - 1, true, WantType.wantHave, + 1, true, WantType.WantHave, true, true) let msg = Message(wantlist: wantList) @@ -103,7 +103,7 @@ suite "Network - Handlers": blockPresences: blocks.mapIt( BlockPresence( cid: it.cid.data.buffer, - type: BlockPresenceType.presenceHave + type: BlockPresenceType.Have ))) await buffer.pushData(lenPrefix(ProtobufEncode(msg))) @@ -186,7 +186,7 @@ suite "Network - Senders": for b in blocks: check b.cid in wantList.entries let entry = wantList.entries[wantList.entries.find(b.cid)] - check entry.wantType == WantType.wantHave + check entry.wantType == WantType.WantHave check entry.priority == 1 check entry.cancel == true check entry.sendDontHave == true @@ -197,7 +197,7 @@ suite "Network - Senders": await network1.sendWantList( switch2.peerInfo.peerId, blocks.mapIt( it.cid ), - 1, true, WantType.wantHave, + 1, true, WantType.WantHave, true, true) await done.wait(500.millis) @@ -231,7 +231,7 @@ suite "Network - Senders": blocks.mapIt( BlockPresence( cid: it.cid.data.buffer, - type: BlockPresenceType.presenceHave + type: BlockPresenceType.Have ))) await done.wait(500.millis) diff --git a/tests/codex/blockexchange/testpeerctxstore.nim b/tests/codex/blockexchange/testpeerctxstore.nim index a1c826e3..1cf66a09 100644 --- a/tests/codex/blockexchange/testpeerctxstore.nim +++ b/tests/codex/blockexchange/testpeerctxstore.nim @@ -6,6 +6,7 @@ import pkg/libp2p import pkg/codex/blockexchange/peers import pkg/codex/blockexchange/protobuf/blockexc +import pkg/codex/blockexchange/protobuf/presence import ../examples @@ -52,13 +53,13 @@ suite "Peer Context Store Peer Selection": peerCtxs = @[] test "Should select peers that have Cid": - peerCtxs[0].peerPrices = collect(initTable): + peerCtxs[0].blocks = collect(initTable): for i, c in cids: - { c: i.u256 } + { c: Presence(cid: c, price: i.u256) } - peerCtxs[5].peerPrices = collect(initTable): + peerCtxs[5].blocks = collect(initTable): for i, c in cids: - { c: i.u256 } + { c: Presence(cid: c, price: i.u256) } let peers = store.peersHave(cids[0]) @@ -68,17 +69,17 @@ suite "Peer Context Store Peer Selection": check peerCtxs[5] in peers test "Should select cheapest peers for Cid": - peerCtxs[0].peerPrices = collect(initTable): + peerCtxs[0].blocks = collect(initTable): for i, c in cids: - { c: (5 + i).u256 } + { c: Presence(cid: c, price: (5 + i).u256) } - peerCtxs[5].peerPrices = collect(initTable): + peerCtxs[5].blocks = collect(initTable): for i, c in cids: - { c: (2 + i).u256 } + { c: Presence(cid: c, price: (2 + i).u256) } - peerCtxs[9].peerPrices = collect(initTable): + peerCtxs[9].blocks = collect(initTable): for i, c in cids: - { c: i.u256 } + { c: Presence(cid: c, price: i.u256) } let peers = store.selectCheapest(cids[0]) @@ -95,7 +96,7 @@ suite "Peer Context Store Peer Selection": `block`: it.data.buffer, priority: 1, cancel: false, - wantType: WantType.wantBlock, + wantType: WantType.WantBlock, sendDontHave: false)) peerCtxs[0].peerWants = entries