Replace Table[PeerID, Peer] with PeerPool.

Add `score` Peer.
This commit is contained in:
cheatfate 2020-02-26 14:40:12 +02:00 committed by zah
parent 2623ac191f
commit 547c8a44d0
4 changed files with 82 additions and 18 deletions

View File

@ -67,18 +67,18 @@ when networkBackend in [libp2p, libp2pDaemon]:
import
os, random,
stew/io, eth/async_utils,
libp2p/[multiaddress, multicodec],
libp2p/[multiaddress, multicodec, peerinfo],
ssz
export
multiaddress
multiaddress, peerinfo
when networkBackend == libp2p:
import
libp2p/standard_setup, libp2p_backend
libp2p/standard_setup, libp2p_backend, peer_pool
export
libp2p_backend
libp2p_backend, peer_pool
else:
import
@ -244,7 +244,10 @@ when networkBackend in [libp2p, libp2pDaemon]:
writeFile(filename, $id.addresses[0] & "/p2p/" & id.peer.pretty)
func peersCount*(node: Eth2Node): int =
node.peers.len
when networkBackend == libp2p:
len(node.peerPool)
else:
node.peers.len
proc subscribe*[MsgType](node: Eth2Node,
topic: string,

View File

@ -13,7 +13,8 @@ import
libp2p/protocols/secure/[secure, secio],
libp2p/protocols/pubsub/[pubsub, floodsub],
libp2p/transports/[transport, tcptransport],
libp2p_json_serialization, eth2_discovery, conf, ssz
libp2p_json_serialization, eth2_discovery, conf, ssz,
peer_pool
import
eth/p2p/discoveryv5/protocol as discv5_protocol
@ -29,7 +30,7 @@ type
switch*: Switch
discovery*: Eth2DiscoveryProtocol
wantedPeers*: int
peers*: Table[PeerID, Peer]
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
libp2pTransportLoops*: seq[Future[void]]
@ -43,6 +44,7 @@ type
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
score*: int
ConnectionState* = enum
None,
@ -125,31 +127,68 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
let peerId = peerInfo.peerId
result = node.peers.getOrDefault(peerId)
result = node.peerPool.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerInfo)
node.peers[peerId] = result
proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} =
# TODO: Can this be `nil`?
return network.getPeer(stream.peerInfo)
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
proc getKey*(peer: Peer): PeerID {.inline.} =
result = peer.info.peerId
proc getFuture*(peer: Peer): Future[void] {.inline.} =
result = peer.info.lifeFuture()
proc `<`*(a, b: Peer): bool =
result = `<`(a.score, b.score)
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
await peer.network.switch.disconnect(peer.info)
peer.connectionState = Disconnected
peer.network.peers.del(peer.info.peerId)
peer.network.peerPool.release(peer)
proc safeClose(stream: P2PStream) {.async.} =
if not stream.closed:
await close(stream)
proc handleIncomingPeer*(peer: Peer)
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
let network = peer.network
proc onPeerClosed(udata: pointer) {.gcsafe.} =
debug "Peer (outgoing) lost", peer = $peer.info
libp2p_peers.set int64(len(network.peerPool))
let res = await network.peerPool.addOutgoingPeer(peer)
if res:
debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
peer.getFuture().addCallback(onPeerClosed)
libp2p_peers.set int64(len(network.peerPool))
proc handleIncomingPeer*(peer: Peer) =
let network = peer.network
proc onPeerClosed(udata: pointer) {.gcsafe.} =
debug "Peer (incoming) lost", peer = $peer.info
libp2p_peers.set int64(len(network.peerPool))
let res = network.peerPool.addIncomingPeerNoWait(peer)
if res:
debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
peer.getFuture().addCallback(onPeerClosed)
libp2p_peers.set int64(len(network.peerPool))
proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
if r.secp256k1.isSome:
var pubKey: keys.PublicKey
@ -195,6 +234,8 @@ proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
inc libp2p_successful_dials
debug "Network handshakes completed"
await handleOutgoingPeer(peer)
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
debug "Starting discovery loop"
@ -206,7 +247,6 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
let discoveredPeers = await node.discovery.lookupRandom()
debug "Discovered peers", peer = $discoveredPeers
for peer in discoveredPeers:
debug "Discovered peer", peer = $peer
try:
let peerInfo = peer.record.toTypedRecord.toPeerInfo
if peerInfo != nil and peerInfo.id notin node.switch.connections:
@ -223,9 +263,9 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf,
switch: Switch, ip: IpAddress, privKey: keys.PrivateKey): T =
new result
result.switch = switch
result.peers = initTable[PeerID, Peer]()
result.discovery = Eth2DiscoveryProtocol.new(conf, ip, privKey.data)
result.wantedPeers = conf.maxPeers
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
newSeq result.protocolStates, allProtocols.len
for proto in allProtocols:

View File

@ -369,13 +369,16 @@ proc handleIncomingStream(network: Eth2Node, stream: P2PStream,
# defer: setLogLevel(LogLevel.DEBUG)
# trace "incoming " & `msgNameLit` & " stream"
let peer = peerFromStream(network, stream)
handleIncomingPeer(peer)
defer:
await safeClose(stream)
let
deadline = sleepAsync RESP_TIMEOUT
msgBytes = await readMsgBytes(stream, false, deadline)
peer = peerFromStream(network, stream)
if msgBytes.len == 0:
await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg)

View File

@ -33,6 +33,24 @@ proc fetchAncestorBlocksFromPeer(
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer
proc fetchAncestorBlocksFromNetwork(
network: Eth2Node,
rec: FetchRecord,
responseHandler: FetchAncestorsResponseHandler) {.async.} =
var peer: Peer
try:
peer = await network.peerPool.acquire()
let blocks = await peer.beaconBlocksByRoot([rec.root])
if blocks.isSome:
for b in blocks.get:
responseHandler(b)
except CatchableError as err:
debug "Error while fetching ancestor blocks",
err = err.msg, root = rec.root, peer = peer.info
finally:
if not(isNil(peer)):
network.peerPool.release(peer)
proc fetchAncestorBlocks*(requestManager: RequestManager,
roots: seq[FetchRecord],
responseHandler: FetchAncestorsResponseHandler) =
@ -44,8 +62,8 @@ proc fetchAncestorBlocks*(requestManager: RequestManager,
# * Keep track of the average latency of each peer
# (we can give priority to peers with better latency)
#
const ParallelRequests = 2
for peer in requestManager.network.randomPeers(ParallelRequests, BeaconSync):
traceAsyncErrors peer.fetchAncestorBlocksFromPeer(roots.sample(), responseHandler)
for i in 0 ..< ParallelRequests:
traceAsyncErrors fetchAncestorBlocksFromNetwork(requestManager.network,
roots.sample(),
responseHandler)