Resolving several connection issues:
- Adding peers to peer pool on incoming connection - Exclude bootnodes from nodes to connect to - Check if peer is already connected to before adding to peer pool - Check if peer is still connected to before removing from peer pool
This commit is contained in:
parent
58d6c9c208
commit
19cd8201b0
11
eth_p2p.nim
11
eth_p2p.nim
|
@ -58,10 +58,15 @@ proc processIncoming(server: StreamServer,
|
||||||
yield peerfut
|
yield peerfut
|
||||||
if not peerfut.failed:
|
if not peerfut.failed:
|
||||||
let peer = peerfut.read()
|
let peer = peerfut.read()
|
||||||
echo "TODO: Add peer to the pool..."
|
if peer.remote notin node.peerPool.connectedNodes:
|
||||||
|
node.peerPool.connectedNodes[peer.remote] = peer
|
||||||
|
for o in node.peerPool.observers.values:
|
||||||
|
if not o.onPeerConnected.isNil:
|
||||||
|
o.onPeerConnected(peer)
|
||||||
|
else:
|
||||||
|
debug "Disconnecting already connected node"
|
||||||
|
await peer.disconnect(AlreadyConnected)
|
||||||
else:
|
else:
|
||||||
echo "Could not establish connection with incoming peer ",
|
|
||||||
$remote.remoteAddress()
|
|
||||||
remote.close()
|
remote.close()
|
||||||
|
|
||||||
proc startListening*(node: EthereumNode) =
|
proc startListening*(node: EthereumNode) =
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# on the given network.
|
# on the given network.
|
||||||
|
|
||||||
import
|
import
|
||||||
os, tables, times, random,
|
os, tables, times, random, sequtils,
|
||||||
asyncdispatch2, chronicles, rlp, eth_keys,
|
asyncdispatch2, chronicles, rlp, eth_keys,
|
||||||
private/types, discovery, kademlia, rlpx
|
private/types, discovery, kademlia, rlpx
|
||||||
|
|
||||||
|
@ -28,7 +28,7 @@ proc newPeerPool*(network: EthereumNode,
|
||||||
template ensureFuture(f: untyped) = asyncCheck f
|
template ensureFuture(f: untyped) = asyncCheck f
|
||||||
|
|
||||||
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
|
proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} =
|
||||||
p.discovery.randomNodes(p.minPeers)
|
p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes)
|
||||||
|
|
||||||
proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
|
proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) =
|
||||||
assert(observerId notin p.observers)
|
assert(observerId notin p.observers)
|
||||||
|
@ -99,17 +99,22 @@ proc lookupRandomNode(p: PeerPool) {.async.} =
|
||||||
discard
|
discard
|
||||||
p.lastLookupTime = epochTime()
|
p.lastLookupTime = epochTime()
|
||||||
|
|
||||||
proc getRandomBootnode(p: PeerPool): seq[Node] =
|
proc getRandomBootnode(p: PeerPool): Node =
|
||||||
@[p.discovery.bootstrapNodes.rand()]
|
p.discovery.bootstrapNodes.rand()
|
||||||
|
|
||||||
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
|
proc connectToNode*(p: PeerPool, n: Node) {.async.} =
|
||||||
let peer = await p.connect(n)
|
let peer = await p.connect(n)
|
||||||
if not peer.isNil:
|
if not peer.isNil:
|
||||||
info "Connection established", peer
|
info "Connection established", peer
|
||||||
p.connectedNodes[peer.remote] = peer
|
if peer.remote notin p.connectedNodes:
|
||||||
for o in p.observers.values:
|
p.connectedNodes[peer.remote] = peer
|
||||||
if not o.onPeerConnected.isNil:
|
for o in p.observers.values:
|
||||||
o.onPeerConnected(peer)
|
if not o.onPeerConnected.isNil:
|
||||||
|
o.onPeerConnected(peer)
|
||||||
|
else:
|
||||||
|
# In case an incoming connection was added in the meanwhile
|
||||||
|
debug "Disconnecting peer", reason = AlreadyConnected
|
||||||
|
await peer.disconnect(AlreadyConnected)
|
||||||
|
|
||||||
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
|
proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} =
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
|
@ -149,7 +154,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} =
|
||||||
# be full of bad peers, so if we can't connect to any peers we try a random
|
# be full of bad peers, so if we can't connect to any peers we try a random
|
||||||
# bootstrap node as well.
|
# bootstrap node as well.
|
||||||
if p.connectedNodes.len == 0:
|
if p.connectedNodes.len == 0:
|
||||||
await p.connectToNodes(p.getRandomBootnode())
|
await p.connectToNode(p.getRandomBootnode())
|
||||||
|
|
||||||
proc run(p: PeerPool) {.async.} =
|
proc run(p: PeerPool) {.async.} =
|
||||||
info "Running PeerPool..."
|
info "Running PeerPool..."
|
||||||
|
|
|
@ -1108,7 +1108,7 @@ rlpxProtocol p2p(version = 0):
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc removePeer(network: EthereumNode, peer: Peer) =
|
proc removePeer(network: EthereumNode, peer: Peer) =
|
||||||
if network.peerPool != nil:
|
if network.peerPool != nil and not peer.remote.isNil:
|
||||||
network.peerPool.connectedNodes.del(peer.remote)
|
network.peerPool.connectedNodes.del(peer.remote)
|
||||||
|
|
||||||
for observer in network.peerPool.observers.values:
|
for observer in network.peerPool.observers.values:
|
||||||
|
@ -1365,12 +1365,22 @@ proc rlpxAccept*(node: EthereumNode,
|
||||||
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
|
result.remote = newNode(initEnode(handshake.remoteHPubkey, address))
|
||||||
|
|
||||||
await postHelloSteps(result, response)
|
await postHelloSteps(result, response)
|
||||||
|
except PeerDisconnected as e:
|
||||||
|
if e.reason == AlreadyConnected:
|
||||||
|
debug "Disconnect during rlpxAccept", reason = e.reason
|
||||||
|
else:
|
||||||
|
debug "Unexpected disconnect during rlpxAccept", reason = e.reason
|
||||||
|
transport.close()
|
||||||
|
result = nil
|
||||||
|
raise e
|
||||||
except:
|
except:
|
||||||
|
let e = getCurrentException()
|
||||||
error "Exception in rlpxAccept",
|
error "Exception in rlpxAccept",
|
||||||
err = getCurrentExceptionMsg(),
|
err = getCurrentExceptionMsg(),
|
||||||
stackTrace = getCurrentException().getStackTrace()
|
stackTrace = getCurrentException().getStackTrace()
|
||||||
transport.close()
|
transport.close()
|
||||||
result = nil
|
result = nil
|
||||||
|
raise e
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue