diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index e42851c5..ddff3c49 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -124,11 +124,11 @@ proc stop*(b: BlockExcEngine) {.async.} = proc sendWantHave( b: BlockExcEngine, address: BlockAddress, - selectedPeer: BlockExcPeerCtx, + excluded: seq[BlockExcPeerCtx], peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = trace "Sending wantHave request to peers", address for p in peers: - if p != selectedPeer: + if p notin excluded: if address notin p.peerHave: trace " wantHave > ", peer = p.id await b.network.request.sendWantList( @@ -172,14 +172,13 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block # prioritization # drop unresponsive peer - b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) await b.network.switch.disconnect(peerId) + b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid]) proc requestBlock*( b: BlockExcEngine, address: BlockAddress, - timeout = DefaultBlockTimeout -): Future[Block] {.async.} = + timeout = DefaultBlockTimeout): Future[Block] {.async.} = let blockFuture = b.pendingBlocks.getWantHandle(address, timeout) if b.pendingBlocks.isInFlight(address): @@ -202,7 +201,7 @@ proc requestBlock*( b.pendingBlocks.setInFlight(address) await b.sendWantBlock(address, peer) codex_block_exchange_want_block_lists_sent.inc() - await b.sendWantHave(address, peer, toSeq(b.peers)) + await b.sendWantHave(address, @[peer], toSeq(b.peers)) codex_block_exchange_want_have_lists_sent.inc() return await blockFuture @@ -210,8 +209,7 @@ proc requestBlock*( proc requestBlock*( b: BlockExcEngine, cid: Cid, - timeout = DefaultBlockTimeout -): Future[Block] = + timeout = DefaultBlockTimeout): Future[Block] = b.requestBlock(BlockAddress.init(cid)) proc blockPresenceHandler*( @@ -230,8 +228,8 @@ proc blockPresenceHandler*( if presence =? Presence.init(blk): logScope: address = $presence.address - have = presence.have - price = presence.price + have = presence.have + price = presence.price trace "Updating precense" peerCtx.setPresence(presence) diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim index 0c7d2e2c..0ce0c9ce 100644 --- a/codex/blockexchange/network/network.nim +++ b/codex/blockexchange/network/network.nim @@ -75,6 +75,18 @@ type getConn: ConnProvider inflightSema: AsyncSemaphore +proc peerId*(b: BlockExcNetwork): PeerId = + ## Return peer id + ## + + return b.switch.peerInfo.peerId + +proc isSelf*(b: BlockExcNetwork, peer: PeerId): bool = + ## Check if peer is self + ## + + return b.peerId == peer + proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = ## Send message to peer ## @@ -103,15 +115,14 @@ proc handleWantList( await b.handlers.onWantList(peer.id, list) proc sendWantList*( - b: BlockExcNetwork, - id: PeerId, - addresses: seq[BlockAddress], - priority: int32 = 0, - cancel: bool = false, - wantType: WantType = WantType.WantHave, - full: bool = false, - sendDontHave: bool = false -): Future[void] = + b: BlockExcNetwork, + id: PeerId, + addresses: seq[BlockAddress], + priority: int32 = 0, + cancel: bool = false, + wantType: WantType = WantType.WantHave, + full: bool = false, + sendDontHave: bool = false): Future[void] = ## Send a want message to peer ## @@ -125,14 +136,13 @@ proc sendWantList*( wantType: wantType, sendDontHave: sendDontHave) ), full: full) - + b.send(id, Message(wantlist: msg)) proc handleBlocksDelivery( - b: BlockExcNetwork, - peer: NetworkPeer, - blocksDelivery: seq[BlockDelivery] -) {.async.} = + b: BlockExcNetwork, + peer: NetworkPeer, + blocksDelivery: seq[BlockDelivery]) {.async.} = ## Handle incoming blocks ## @@ -171,10 +181,9 @@ proc sendBlockPresence*( b.send(id, Message(blockPresences: @presence)) proc handleAccount( - network: BlockExcNetwork, - peer: NetworkPeer, - account: Account -) {.async.} = + network: BlockExcNetwork, + peer: NetworkPeer, + account: Account) {.async.} = ## Handle account info ## @@ -182,30 +191,27 @@ proc handleAccount( await network.handlers.onAccount(peer.id, account) proc sendAccount*( - b: BlockExcNetwork, - id: PeerId, - account: Account -): Future[void] = + b: BlockExcNetwork, + id: PeerId, + account: Account): Future[void] = ## Send account info to remote ## b.send(id, Message(account: AccountMessage.init(account))) proc sendPayment*( - b: BlockExcNetwork, - id: PeerId, - payment: SignedState -): Future[void] = + b: BlockExcNetwork, + id: PeerId, + payment: SignedState): Future[void] = ## Send payment to remote ## b.send(id, Message(payment: StateChannelUpdate.init(payment))) proc handlePayment( - network: BlockExcNetwork, - peer: NetworkPeer, - payment: SignedState -) {.async.} = + network: BlockExcNetwork, + peer: NetworkPeer, + payment: SignedState) {.async.} = ## Handle payment ## @@ -213,12 +219,11 @@ proc handlePayment( await network.handlers.onPayment(peer.id, payment) proc rpcHandler( - b: BlockExcNetwork, - peer: NetworkPeer, - msg: Message -) {.async.} = + b: BlockExcNetwork, + peer: NetworkPeer, + msg: Message) {.async.} = ## handle rpc messages - ## + ## try: if msg.wantList.entries.len > 0: asyncSpawn b.handleWantList(peer, msg.wantList) @@ -273,6 +278,13 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerId) = discard b.getOrCreatePeer(peer) proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = + ## Dial a peer + ## + + if b.isSelf(peer.peerId): + trace "Skipping dialing self", peer = peer.peerId + return + await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = @@ -303,11 +315,10 @@ method init*(b: BlockExcNetwork) = b.codec = Codec proc new*( - T: type BlockExcNetwork, - switch: Switch, - connProvider: ConnProvider = nil, - maxInflight = MaxInflight -): BlockExcNetwork = + T: type BlockExcNetwork, + switch: Switch, + connProvider: ConnProvider = nil, + maxInflight = MaxInflight): BlockExcNetwork = ## Create a new BlockExcNetwork instance ## diff --git a/codex/blockexchange/network/networkpeer.nim b/codex/blockexchange/network/networkpeer.nim index 1ed5f46b..c3536f61 100644 --- a/codex/blockexchange/network/networkpeer.nim +++ b/codex/blockexchange/network/networkpeer.nim @@ -83,8 +83,7 @@ func new*( T: type NetworkPeer, peer: PeerId, connProvider: ConnProvider, - rpcHandler: RPCHandler -): NetworkPeer = + rpcHandler: RPCHandler): NetworkPeer = doAssert(not isNil(connProvider), "should supply connection provider") diff --git a/codex/discovery.nim b/codex/discovery.nim index 45430af5..67aacd17 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -8,6 +8,7 @@ ## those terms. import std/algorithm +import std/sequtils import pkg/chronos import pkg/chronicles @@ -33,10 +34,10 @@ logScope: type Discovery* = ref object of RootObj - protocol*: discv5.Protocol # dht protocol + protocol*: discv5.Protocol # dht protocol key: PrivateKey # private key peerId: PeerId # the peer id of the local node - announceAddrs*: seq[MultiAddress] # addresses announced as part of the provider records + announceAddrs*: seq[MultiAddress] # addresses announced as part of the provider records providerRecord*: ?SignedPeerRecord # record to advertice node connection information, this carry any # address that the node can be connected on dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information @@ -54,9 +55,8 @@ proc toNodeId*(host: ca.Address): NodeId = readUintBE[256](keccak256.digest(host.toArray).data) proc findPeer*( - d: Discovery, - peerId: PeerId -): Future[?PeerRecord] {.async.} = + d: Discovery, + peerId: PeerId): Future[?PeerRecord] {.async.} = trace "protocol.resolve..." ## Find peer using the given Discovery object ## @@ -70,9 +70,8 @@ proc findPeer*( PeerRecord.none method find*( - d: Discovery, - cid: Cid -): Future[seq[SignedPeerRecord]] {.async, base.} = + d: Discovery, + cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = ## Find block providers ## @@ -81,7 +80,7 @@ method find*( (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error: trace "Error finding providers for block", cid, error = error.msg - return providers + return providers.filterIt( not (it.data.peerId == d.peerId) ) method provide*(d: Discovery, cid: Cid) {.async, base.} = ## Provide a bock Cid @@ -98,9 +97,8 @@ method provide*(d: Discovery, cid: Cid) {.async, base.} = trace "Provided to nodes", nodes = nodes.len method find*( - d: Discovery, - host: ca.Address -): Future[seq[SignedPeerRecord]] {.async, base.} = + d: Discovery, + host: ca.Address): Future[seq[SignedPeerRecord]] {.async, base.} = ## Find host providers ## @@ -131,9 +129,8 @@ method provide*(d: Discovery, host: ca.Address) {.async, base.} = trace "Provided to nodes", nodes = nodes.len method removeProvider*( - d: Discovery, - peerId: PeerId -): Future[void] {.base.} = + d: Discovery, + peerId: PeerId): Future[void] {.base.} = ## Remove provider from providers table ## diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index eec84611..6da1465c 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -16,16 +16,16 @@ import std/sugar import pkg/chronicles import pkg/chronos import pkg/libp2p +import pkg/questionable/results -import ../blocktype import ../utils/asyncheapqueue import ../utils/asynciter import ../clock +import ../blocktype import ./blockstore import ../blockexchange import ../merkletree -import ../blocktype export blockstore, blockexchange, asyncheapqueue @@ -67,14 +67,13 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo self.getBlock(BlockAddress.init(treeCid, index)) method putBlock*( - self: NetworkStore, - blk: Block, - ttl = Duration.none -): Future[?!void] {.async.} = + self: NetworkStore, + blk: Block, + ttl = Duration.none): Future[?!void] {.async.} = ## Store block locally and notify the network ## - trace "Puting block into network store", cid = blk.cid + trace "Putting block into network store", cid = blk.cid let res = await self.localStore.putBlock(blk, ttl) if res.isErr: @@ -88,15 +87,13 @@ method putBlockCidAndProof*( treeCid: Cid, index: Natural, blockCid: Cid, - proof: MerkleProof -): Future[?!void] = + proof: MerkleProof): Future[?!void] = self.localStore.putBlockCidAndProof(treeCid, index, blockCid, proof) method ensureExpiry*( - self: NetworkStore, - cid: Cid, - expiry: SecondsSince1970 -): Future[?!void] {.async.} = + self: NetworkStore, + cid: Cid, + expiry: SecondsSince1970): Future[?!void] {.async.} = ## Ensure that block's assosicated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -112,11 +109,10 @@ method ensureExpiry*( return success() method ensureExpiry*( - self: NetworkStore, - treeCid: Cid, - index: Natural, - expiry: SecondsSince1970 -): Future[?!void] {.async.} = + self: NetworkStore, + treeCid: Cid, + index: Natural, + expiry: SecondsSince1970): Future[?!void] {.async.} = ## Ensure that block's associated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## diff --git a/tests/integration/testIntegration.nim b/tests/integration/testIntegration.nim index 1f51d513..3708a4f5 100644 --- a/tests/integration/testIntegration.nim +++ b/tests/integration/testIntegration.nim @@ -229,7 +229,6 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false: check responsePast.status == "400 Bad Request" check responsePast.body == "Expiry needs to be in future" - test "expired request partially pays out for stored time": let marketplace = Marketplace.new(Marketplace.address, provider.getSigner()) let tokenAddress = await marketplace.token()