Dont dial self (#633)

* don't dial self

* revert style changes

* filter out self on dial

* add helpers to `peerId` and `isSelf`

* don't fire up discovery eaguerly

* allow excluding multiple peers in sendWantHave

* revert style changes

* move self check to discovery.find

* readd eaguer dht lookup is required in some cases

* revert style changes

* misc

* drop peer first, before queueing a dht lookup

* moar style changes

* use isSelf
This commit is contained in:
Dmitriy Ryajov 2023-11-27 12:25:53 -06:00 committed by GitHub
parent e871859a96
commit 22c31046a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 86 additions and 86 deletions

View File

@ -124,11 +124,11 @@ proc stop*(b: BlockExcEngine) {.async.} =
proc sendWantHave( proc sendWantHave(
b: BlockExcEngine, b: BlockExcEngine,
address: BlockAddress, address: BlockAddress,
selectedPeer: BlockExcPeerCtx, excluded: seq[BlockExcPeerCtx],
peers: seq[BlockExcPeerCtx]): Future[void] {.async.} = peers: seq[BlockExcPeerCtx]): Future[void] {.async.} =
trace "Sending wantHave request to peers", address trace "Sending wantHave request to peers", address
for p in peers: for p in peers:
if p != selectedPeer: if p notin excluded:
if address notin p.peerHave: if address notin p.peerHave:
trace " wantHave > ", peer = p.id trace " wantHave > ", peer = p.id
await b.network.request.sendWantList( await b.network.request.sendWantList(
@ -172,14 +172,13 @@ proc monitorBlockHandle(b: BlockExcEngine, handle: Future[Block], address: Block
# prioritization # prioritization
# drop unresponsive peer # drop unresponsive peer
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
await b.network.switch.disconnect(peerId) await b.network.switch.disconnect(peerId)
b.discovery.queueFindBlocksReq(@[address.cidOrTreeCid])
proc requestBlock*( proc requestBlock*(
b: BlockExcEngine, b: BlockExcEngine,
address: BlockAddress, address: BlockAddress,
timeout = DefaultBlockTimeout timeout = DefaultBlockTimeout): Future[Block] {.async.} =
): Future[Block] {.async.} =
let blockFuture = b.pendingBlocks.getWantHandle(address, timeout) let blockFuture = b.pendingBlocks.getWantHandle(address, timeout)
if b.pendingBlocks.isInFlight(address): if b.pendingBlocks.isInFlight(address):
@ -202,7 +201,7 @@ proc requestBlock*(
b.pendingBlocks.setInFlight(address) b.pendingBlocks.setInFlight(address)
await b.sendWantBlock(address, peer) await b.sendWantBlock(address, peer)
codex_block_exchange_want_block_lists_sent.inc() codex_block_exchange_want_block_lists_sent.inc()
await b.sendWantHave(address, peer, toSeq(b.peers)) await b.sendWantHave(address, @[peer], toSeq(b.peers))
codex_block_exchange_want_have_lists_sent.inc() codex_block_exchange_want_have_lists_sent.inc()
return await blockFuture return await blockFuture
@ -210,8 +209,7 @@ proc requestBlock*(
proc requestBlock*( proc requestBlock*(
b: BlockExcEngine, b: BlockExcEngine,
cid: Cid, cid: Cid,
timeout = DefaultBlockTimeout timeout = DefaultBlockTimeout): Future[Block] =
): Future[Block] =
b.requestBlock(BlockAddress.init(cid)) b.requestBlock(BlockAddress.init(cid))
proc blockPresenceHandler*( proc blockPresenceHandler*(
@ -230,8 +228,8 @@ proc blockPresenceHandler*(
if presence =? Presence.init(blk): if presence =? Presence.init(blk):
logScope: logScope:
address = $presence.address address = $presence.address
have = presence.have have = presence.have
price = presence.price price = presence.price
trace "Updating precense" trace "Updating precense"
peerCtx.setPresence(presence) peerCtx.setPresence(presence)

View File

@ -75,6 +75,18 @@ type
getConn: ConnProvider getConn: ConnProvider
inflightSema: AsyncSemaphore inflightSema: AsyncSemaphore
proc peerId*(b: BlockExcNetwork): PeerId =
## Return peer id
##
return b.switch.peerInfo.peerId
proc isSelf*(b: BlockExcNetwork, peer: PeerId): bool =
## Check if peer is self
##
return b.peerId == peer
proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} = proc send*(b: BlockExcNetwork, id: PeerId, msg: pb.Message) {.async.} =
## Send message to peer ## Send message to peer
## ##
@ -103,15 +115,14 @@ proc handleWantList(
await b.handlers.onWantList(peer.id, list) await b.handlers.onWantList(peer.id, list)
proc sendWantList*( proc sendWantList*(
b: BlockExcNetwork, b: BlockExcNetwork,
id: PeerId, id: PeerId,
addresses: seq[BlockAddress], addresses: seq[BlockAddress],
priority: int32 = 0, priority: int32 = 0,
cancel: bool = false, cancel: bool = false,
wantType: WantType = WantType.WantHave, wantType: WantType = WantType.WantHave,
full: bool = false, full: bool = false,
sendDontHave: bool = false sendDontHave: bool = false): Future[void] =
): Future[void] =
## Send a want message to peer ## Send a want message to peer
## ##
@ -125,14 +136,13 @@ proc sendWantList*(
wantType: wantType, wantType: wantType,
sendDontHave: sendDontHave) ), sendDontHave: sendDontHave) ),
full: full) full: full)
b.send(id, Message(wantlist: msg)) b.send(id, Message(wantlist: msg))
proc handleBlocksDelivery( proc handleBlocksDelivery(
b: BlockExcNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
blocksDelivery: seq[BlockDelivery] blocksDelivery: seq[BlockDelivery]) {.async.} =
) {.async.} =
## Handle incoming blocks ## Handle incoming blocks
## ##
@ -171,10 +181,9 @@ proc sendBlockPresence*(
b.send(id, Message(blockPresences: @presence)) b.send(id, Message(blockPresences: @presence))
proc handleAccount( proc handleAccount(
network: BlockExcNetwork, network: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
account: Account account: Account) {.async.} =
) {.async.} =
## Handle account info ## Handle account info
## ##
@ -182,30 +191,27 @@ proc handleAccount(
await network.handlers.onAccount(peer.id, account) await network.handlers.onAccount(peer.id, account)
proc sendAccount*( proc sendAccount*(
b: BlockExcNetwork, b: BlockExcNetwork,
id: PeerId, id: PeerId,
account: Account account: Account): Future[void] =
): Future[void] =
## Send account info to remote ## Send account info to remote
## ##
b.send(id, Message(account: AccountMessage.init(account))) b.send(id, Message(account: AccountMessage.init(account)))
proc sendPayment*( proc sendPayment*(
b: BlockExcNetwork, b: BlockExcNetwork,
id: PeerId, id: PeerId,
payment: SignedState payment: SignedState): Future[void] =
): Future[void] =
## Send payment to remote ## Send payment to remote
## ##
b.send(id, Message(payment: StateChannelUpdate.init(payment))) b.send(id, Message(payment: StateChannelUpdate.init(payment)))
proc handlePayment( proc handlePayment(
network: BlockExcNetwork, network: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
payment: SignedState payment: SignedState) {.async.} =
) {.async.} =
## Handle payment ## Handle payment
## ##
@ -213,12 +219,11 @@ proc handlePayment(
await network.handlers.onPayment(peer.id, payment) await network.handlers.onPayment(peer.id, payment)
proc rpcHandler( proc rpcHandler(
b: BlockExcNetwork, b: BlockExcNetwork,
peer: NetworkPeer, peer: NetworkPeer,
msg: Message msg: Message) {.async.} =
) {.async.} =
## handle rpc messages ## handle rpc messages
## ##
try: try:
if msg.wantList.entries.len > 0: if msg.wantList.entries.len > 0:
asyncSpawn b.handleWantList(peer, msg.wantList) asyncSpawn b.handleWantList(peer, msg.wantList)
@ -273,6 +278,13 @@ proc setupPeer*(b: BlockExcNetwork, peer: PeerId) =
discard b.getOrCreatePeer(peer) discard b.getOrCreatePeer(peer)
proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} = proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
## Dial a peer
##
if b.isSelf(peer.peerId):
trace "Skipping dialing self", peer = peer.peerId
return
await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address)) await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
proc dropPeer*(b: BlockExcNetwork, peer: PeerId) = proc dropPeer*(b: BlockExcNetwork, peer: PeerId) =
@ -303,11 +315,10 @@ method init*(b: BlockExcNetwork) =
b.codec = Codec b.codec = Codec
proc new*( proc new*(
T: type BlockExcNetwork, T: type BlockExcNetwork,
switch: Switch, switch: Switch,
connProvider: ConnProvider = nil, connProvider: ConnProvider = nil,
maxInflight = MaxInflight maxInflight = MaxInflight): BlockExcNetwork =
): BlockExcNetwork =
## Create a new BlockExcNetwork instance ## Create a new BlockExcNetwork instance
## ##

View File

@ -83,8 +83,7 @@ func new*(
T: type NetworkPeer, T: type NetworkPeer,
peer: PeerId, peer: PeerId,
connProvider: ConnProvider, connProvider: ConnProvider,
rpcHandler: RPCHandler rpcHandler: RPCHandler): NetworkPeer =
): NetworkPeer =
doAssert(not isNil(connProvider), doAssert(not isNil(connProvider),
"should supply connection provider") "should supply connection provider")

View File

@ -8,6 +8,7 @@
## those terms. ## those terms.
import std/algorithm import std/algorithm
import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/chronicles import pkg/chronicles
@ -33,10 +34,10 @@ logScope:
type type
Discovery* = ref object of RootObj Discovery* = ref object of RootObj
protocol*: discv5.Protocol # dht protocol protocol*: discv5.Protocol # dht protocol
key: PrivateKey # private key key: PrivateKey # private key
peerId: PeerId # the peer id of the local node peerId: PeerId # the peer id of the local node
announceAddrs*: seq[MultiAddress] # addresses announced as part of the provider records announceAddrs*: seq[MultiAddress] # addresses announced as part of the provider records
providerRecord*: ?SignedPeerRecord # record to advertice node connection information, this carry any providerRecord*: ?SignedPeerRecord # record to advertice node connection information, this carry any
# address that the node can be connected on # address that the node can be connected on
dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information dhtRecord*: ?SignedPeerRecord # record to advertice DHT connection information
@ -54,9 +55,8 @@ proc toNodeId*(host: ca.Address): NodeId =
readUintBE[256](keccak256.digest(host.toArray).data) readUintBE[256](keccak256.digest(host.toArray).data)
proc findPeer*( proc findPeer*(
d: Discovery, d: Discovery,
peerId: PeerId peerId: PeerId): Future[?PeerRecord] {.async.} =
): Future[?PeerRecord] {.async.} =
trace "protocol.resolve..." trace "protocol.resolve..."
## Find peer using the given Discovery object ## Find peer using the given Discovery object
## ##
@ -70,9 +70,8 @@ proc findPeer*(
PeerRecord.none PeerRecord.none
method find*( method find*(
d: Discovery, d: Discovery,
cid: Cid cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
): Future[seq[SignedPeerRecord]] {.async, base.} =
## Find block providers ## Find block providers
## ##
@ -81,7 +80,7 @@ method find*(
(await d.protocol.getProviders(cid.toNodeId())).mapFailure, error: (await d.protocol.getProviders(cid.toNodeId())).mapFailure, error:
trace "Error finding providers for block", cid, error = error.msg trace "Error finding providers for block", cid, error = error.msg
return providers return providers.filterIt( not (it.data.peerId == d.peerId) )
method provide*(d: Discovery, cid: Cid) {.async, base.} = method provide*(d: Discovery, cid: Cid) {.async, base.} =
## Provide a bock Cid ## Provide a bock Cid
@ -98,9 +97,8 @@ method provide*(d: Discovery, cid: Cid) {.async, base.} =
trace "Provided to nodes", nodes = nodes.len trace "Provided to nodes", nodes = nodes.len
method find*( method find*(
d: Discovery, d: Discovery,
host: ca.Address host: ca.Address): Future[seq[SignedPeerRecord]] {.async, base.} =
): Future[seq[SignedPeerRecord]] {.async, base.} =
## Find host providers ## Find host providers
## ##
@ -131,9 +129,8 @@ method provide*(d: Discovery, host: ca.Address) {.async, base.} =
trace "Provided to nodes", nodes = nodes.len trace "Provided to nodes", nodes = nodes.len
method removeProvider*( method removeProvider*(
d: Discovery, d: Discovery,
peerId: PeerId peerId: PeerId): Future[void] {.base.} =
): Future[void] {.base.} =
## Remove provider from providers table ## Remove provider from providers table
## ##

View File

@ -16,16 +16,16 @@ import std/sugar
import pkg/chronicles import pkg/chronicles
import pkg/chronos import pkg/chronos
import pkg/libp2p import pkg/libp2p
import pkg/questionable/results
import ../blocktype
import ../utils/asyncheapqueue import ../utils/asyncheapqueue
import ../utils/asynciter import ../utils/asynciter
import ../clock import ../clock
import ../blocktype
import ./blockstore import ./blockstore
import ../blockexchange import ../blockexchange
import ../merkletree import ../merkletree
import ../blocktype
export blockstore, blockexchange, asyncheapqueue export blockstore, blockexchange, asyncheapqueue
@ -67,14 +67,13 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo
self.getBlock(BlockAddress.init(treeCid, index)) self.getBlock(BlockAddress.init(treeCid, index))
method putBlock*( method putBlock*(
self: NetworkStore, self: NetworkStore,
blk: Block, blk: Block,
ttl = Duration.none ttl = Duration.none): Future[?!void] {.async.} =
): Future[?!void] {.async.} =
## Store block locally and notify the network ## Store block locally and notify the network
## ##
trace "Puting block into network store", cid = blk.cid trace "Putting block into network store", cid = blk.cid
let res = await self.localStore.putBlock(blk, ttl) let res = await self.localStore.putBlock(blk, ttl)
if res.isErr: if res.isErr:
@ -88,15 +87,13 @@ method putBlockCidAndProof*(
treeCid: Cid, treeCid: Cid,
index: Natural, index: Natural,
blockCid: Cid, blockCid: Cid,
proof: MerkleProof proof: MerkleProof): Future[?!void] =
): Future[?!void] =
self.localStore.putBlockCidAndProof(treeCid, index, blockCid, proof) self.localStore.putBlockCidAndProof(treeCid, index, blockCid, proof)
method ensureExpiry*( method ensureExpiry*(
self: NetworkStore, self: NetworkStore,
cid: Cid, cid: Cid,
expiry: SecondsSince1970 expiry: SecondsSince1970): Future[?!void] {.async.} =
): Future[?!void] {.async.} =
## Ensure that block's assosicated expiry is at least given timestamp ## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
## ##
@ -112,11 +109,10 @@ method ensureExpiry*(
return success() return success()
method ensureExpiry*( method ensureExpiry*(
self: NetworkStore, self: NetworkStore,
treeCid: Cid, treeCid: Cid,
index: Natural, index: Natural,
expiry: SecondsSince1970 expiry: SecondsSince1970): Future[?!void] {.async.} =
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp ## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
## ##

View File

@ -229,7 +229,6 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check responsePast.status == "400 Bad Request" check responsePast.status == "400 Bad Request"
check responsePast.body == "Expiry needs to be in future" check responsePast.body == "Expiry needs to be in future"
test "expired request partially pays out for stored time": test "expired request partially pays out for stored time":
let marketplace = Marketplace.new(Marketplace.address, provider.getSigner()) let marketplace = Marketplace.new(Marketplace.address, provider.getSigner())
let tokenAddress = await marketplace.token() let tokenAddress = await marketplace.token()