diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 6ca712e70..0308b3fd5 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -67,18 +67,18 @@ when networkBackend in [libp2p, libp2pDaemon]: import os, random, stew/io, eth/async_utils, - libp2p/[multiaddress, multicodec], + libp2p/[multiaddress, multicodec, peerinfo], ssz export - multiaddress + multiaddress, peerinfo when networkBackend == libp2p: import - libp2p/standard_setup, libp2p_backend + libp2p/standard_setup, libp2p_backend, peer_pool export - libp2p_backend + libp2p_backend, peer_pool else: import @@ -244,7 +244,10 @@ when networkBackend in [libp2p, libp2pDaemon]: writeFile(filename, $id.addresses[0] & "/p2p/" & id.peer.pretty) func peersCount*(node: Eth2Node): int = - node.peers.len + when networkBackend == libp2p: + len(node.peerPool) + else: + node.peers.len proc subscribe*[MsgType](node: Eth2Node, topic: string, diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 1336eac35..52a7dde5d 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -13,7 +13,8 @@ import libp2p/protocols/secure/[secure, secio], libp2p/protocols/pubsub/[pubsub, floodsub], libp2p/transports/[transport, tcptransport], - libp2p_json_serialization, eth2_discovery, conf, ssz + libp2p_json_serialization, eth2_discovery, conf, ssz, + peer_pool import eth/p2p/discoveryv5/protocol as discv5_protocol @@ -29,7 +30,7 @@ type switch*: Switch discovery*: Eth2DiscoveryProtocol wantedPeers*: int - peers*: Table[PeerID, Peer] + peerPool*: PeerPool[Peer, PeerID] protocolStates*: seq[RootRef] libp2pTransportLoops*: seq[Future[void]] @@ -43,6 +44,7 @@ type connectionState*: ConnectionState protocolStates*: seq[RootRef] maxInactivityAllowed*: Duration + score*: int ConnectionState* = enum None, @@ -125,31 +127,68 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.} proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} = let peerId = peerInfo.peerId - result = node.peers.getOrDefault(peerId) + result = node.peerPool.getOrDefault(peerId) if result == nil: result = Peer.init(node, peerInfo) - node.peers[peerId] = result proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} = # TODO: Can this be `nil`? return network.getPeer(stream.peerInfo) -proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = +proc getKey*(peer: Peer): PeerID {.inline.} = + result = peer.info.peerId + +proc getFuture*(peer: Peer): Future[void] {.inline.} = + result = peer.info.lifeFuture() + +proc `<`*(a, b: Peer): bool = + result = `<`(a.score, b.score) + +proc disconnect*(peer: Peer, reason: DisconnectionReason, + notifyOtherPeer = false) {.async.} = # TODO: How should we notify the other peer? if peer.connectionState notin {Disconnecting, Disconnected}: peer.connectionState = Disconnecting await peer.network.switch.disconnect(peer.info) peer.connectionState = Disconnected - peer.network.peers.del(peer.info.peerId) + peer.network.peerPool.release(peer) proc safeClose(stream: P2PStream) {.async.} = if not stream.closed: await close(stream) +proc handleIncomingPeer*(peer: Peer) + include eth/p2p/p2p_backends_helpers include eth/p2p/p2p_tracing include libp2p_backends_common +proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} = + let network = peer.network + + proc onPeerClosed(udata: pointer) {.gcsafe.} = + debug "Peer (outgoing) lost", peer = $peer.info + libp2p_peers.set int64(len(network.peerPool)) + + let res = await network.peerPool.addOutgoingPeer(peer) + if res: + debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info + peer.getFuture().addCallback(onPeerClosed) + libp2p_peers.set int64(len(network.peerPool)) + +proc handleIncomingPeer*(peer: Peer) = + let network = peer.network + + proc onPeerClosed(udata: pointer) {.gcsafe.} = + debug "Peer (incoming) lost", peer = $peer.info + libp2p_peers.set int64(len(network.peerPool)) + + let res = network.peerPool.addIncomingPeerNoWait(peer) + if res: + debug "Peer (incoming) has been added to PeerPool", peer = $peer.info + peer.getFuture().addCallback(onPeerClosed) + libp2p_peers.set int64(len(network.peerPool)) + proc toPeerInfo*(r: enr.TypedRecord): PeerInfo = if r.secp256k1.isSome: var pubKey: keys.PublicKey @@ -195,6 +234,8 @@ proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} = inc libp2p_successful_dials debug "Network handshakes completed" + await handleOutgoingPeer(peer) + proc runDiscoveryLoop*(node: Eth2Node) {.async.} = debug "Starting discovery loop" @@ -206,7 +247,6 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} = let discoveredPeers = await node.discovery.lookupRandom() debug "Discovered peers", peer = $discoveredPeers for peer in discoveredPeers: - debug "Discovered peer", peer = $peer try: let peerInfo = peer.record.toTypedRecord.toPeerInfo if peerInfo != nil and peerInfo.id notin node.switch.connections: @@ -223,9 +263,9 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, switch: Switch, ip: IpAddress, privKey: keys.PrivateKey): T = new result result.switch = switch - result.peers = initTable[PeerID, Peer]() result.discovery = Eth2DiscoveryProtocol.new(conf, ip, privKey.data) result.wantedPeers = conf.maxPeers + result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers) newSeq result.protocolStates, allProtocols.len for proto in allProtocols: diff --git a/beacon_chain/libp2p_backends_common.nim b/beacon_chain/libp2p_backends_common.nim index 9bd09638b..552ef2263 100644 --- a/beacon_chain/libp2p_backends_common.nim +++ b/beacon_chain/libp2p_backends_common.nim @@ -369,13 +369,16 @@ proc handleIncomingStream(network: Eth2Node, stream: P2PStream, # defer: setLogLevel(LogLevel.DEBUG) # trace "incoming " & `msgNameLit` & " stream" + let peer = peerFromStream(network, stream) + + handleIncomingPeer(peer) + defer: await safeClose(stream) let deadline = sleepAsync RESP_TIMEOUT msgBytes = await readMsgBytes(stream, false, deadline) - peer = peerFromStream(network, stream) if msgBytes.len == 0: await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg) diff --git a/beacon_chain/request_manager.nim b/beacon_chain/request_manager.nim index 70f98b815..f7bedad1d 100644 --- a/beacon_chain/request_manager.nim +++ b/beacon_chain/request_manager.nim @@ -33,6 +33,24 @@ proc fetchAncestorBlocksFromPeer( debug "Error while fetching ancestor blocks", err = err.msg, root = rec.root, peer +proc fetchAncestorBlocksFromNetwork( + network: Eth2Node, + rec: FetchRecord, + responseHandler: FetchAncestorsResponseHandler) {.async.} = + var peer: Peer + try: + peer = await network.peerPool.acquire() + let blocks = await peer.beaconBlocksByRoot([rec.root]) + if blocks.isSome: + for b in blocks.get: + responseHandler(b) + except CatchableError as err: + debug "Error while fetching ancestor blocks", + err = err.msg, root = rec.root, peer = peer.info + finally: + if not(isNil(peer)): + network.peerPool.release(peer) + proc fetchAncestorBlocks*(requestManager: RequestManager, roots: seq[FetchRecord], responseHandler: FetchAncestorsResponseHandler) = @@ -44,8 +62,8 @@ proc fetchAncestorBlocks*(requestManager: RequestManager, # * Keep track of the average latency of each peer # (we can give priority to peers with better latency) # - const ParallelRequests = 2 - - for peer in requestManager.network.randomPeers(ParallelRequests, BeaconSync): - traceAsyncErrors peer.fetchAncestorBlocksFromPeer(roots.sample(), responseHandler) + for i in 0 ..< ParallelRequests: + traceAsyncErrors fetchAncestorBlocksFromNetwork(requestManager.network, + roots.sample(), + responseHandler)