diff --git a/eth_p2p.nim b/eth_p2p.nim index caba69a..07caf12 100644 --- a/eth_p2p.nim +++ b/eth_p2p.nim @@ -58,10 +58,15 @@ proc processIncoming(server: StreamServer, yield peerfut if not peerfut.failed: let peer = peerfut.read() - echo "TODO: Add peer to the pool..." + if peer.remote notin node.peerPool.connectedNodes: + node.peerPool.connectedNodes[peer.remote] = peer + for o in node.peerPool.observers.values: + if not o.onPeerConnected.isNil: + o.onPeerConnected(peer) + else: + debug "Disconnecting already connected node" + await peer.disconnect(AlreadyConnected) else: - echo "Could not establish connection with incoming peer ", - $remote.remoteAddress() remote.close() proc startListening*(node: EthereumNode) = diff --git a/eth_p2p/peer_pool.nim b/eth_p2p/peer_pool.nim index 9c5924e..b2c2866 100644 --- a/eth_p2p/peer_pool.nim +++ b/eth_p2p/peer_pool.nim @@ -2,7 +2,7 @@ # on the given network. import - os, tables, times, random, + os, tables, times, random, sequtils, asyncdispatch2, chronicles, rlp, eth_keys, private/types, discovery, kademlia, rlpx @@ -28,7 +28,7 @@ proc newPeerPool*(network: EthereumNode, template ensureFuture(f: untyped) = asyncCheck f proc nodesToConnect(p: PeerPool): seq[Node] {.inline.} = - p.discovery.randomNodes(p.minPeers) + p.discovery.randomNodes(p.minPeers).filterIt(it notin p.discovery.bootstrapNodes) proc addObserver(p: PeerPool, observerId: int, observer: PeerObserver) = assert(observerId notin p.observers) @@ -99,17 +99,22 @@ proc lookupRandomNode(p: PeerPool) {.async.} = discard p.lastLookupTime = epochTime() -proc getRandomBootnode(p: PeerPool): seq[Node] = - @[p.discovery.bootstrapNodes.rand()] +proc getRandomBootnode(p: PeerPool): Node = + p.discovery.bootstrapNodes.rand() proc connectToNode*(p: PeerPool, n: Node) {.async.} = let peer = await p.connect(n) if not peer.isNil: info "Connection established", peer - p.connectedNodes[peer.remote] = peer - for o in p.observers.values: - if not o.onPeerConnected.isNil: - o.onPeerConnected(peer) + if peer.remote notin p.connectedNodes: + p.connectedNodes[peer.remote] = peer + for o in p.observers.values: + if not o.onPeerConnected.isNil: + o.onPeerConnected(peer) + else: + # In case an incoming connection was added in the meanwhile + debug "Disconnecting peer", reason = AlreadyConnected + await peer.disconnect(AlreadyConnected) proc connectToNodes(p: PeerPool, nodes: seq[Node]) {.async.} = for node in nodes: @@ -149,7 +154,7 @@ proc maybeConnectToMorePeers(p: PeerPool) {.async.} = # be full of bad peers, so if we can't connect to any peers we try a random # bootstrap node as well. if p.connectedNodes.len == 0: - await p.connectToNodes(p.getRandomBootnode()) + await p.connectToNode(p.getRandomBootnode()) proc run(p: PeerPool) {.async.} = info "Running PeerPool..." diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim index 8b5a778..2002dd5 100644 --- a/eth_p2p/rlpx.nim +++ b/eth_p2p/rlpx.nim @@ -1108,7 +1108,7 @@ rlpxProtocol p2p(version = 0): discard proc removePeer(network: EthereumNode, peer: Peer) = - if network.peerPool != nil: + if network.peerPool != nil and not peer.remote.isNil: network.peerPool.connectedNodes.del(peer.remote) for observer in network.peerPool.observers.values: @@ -1365,12 +1365,22 @@ proc rlpxAccept*(node: EthereumNode, result.remote = newNode(initEnode(handshake.remoteHPubkey, address)) await postHelloSteps(result, response) + except PeerDisconnected as e: + if e.reason == AlreadyConnected: + debug "Disconnect during rlpxAccept", reason = e.reason + else: + debug "Unexpected disconnect during rlpxAccept", reason = e.reason + transport.close() + result = nil + raise e except: + let e = getCurrentException() error "Exception in rlpxAccept", err = getCurrentExceptionMsg(), stackTrace = getCurrentException().getStackTrace() transport.close() result = nil + raise e when isMainModule: