use libp2p peer events to track peer (#1468)

This resolves some peer counting issues that arose because the lifetime
future in PeerInfo was unreliable: multiple PeerInfo instances could
exist for the same peer, each with its own future.

In addition, it fixes another race condition: when connecting to a peer
and later dialling a protocol on it, there is no guarantee that the same
connection will be used if a concurrent incoming connection from that
peer is in progress - better not to make assumptions about who sends
status messages, and when.
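
In miniature, the fix is to derive peer liveness from libp2p connection
events rather than from PeerInfo lifetimes. A toy sketch of the counting
idea, with stand-in types rather than the client's actual API:

    import std/tables

    var connCount: Table[string, int]  # PeerID stand-in -> live connections

    proc onConnected(peerId: string) =
      connCount[peerId] = connCount.getOrDefault(peerId) + 1
      if connCount[peerId] == 1:
        echo peerId, ": first connection up - run handshakes once"

    proc onDisconnected(peerId: string) =
      connCount[peerId] = connCount[peerId] - 1
      if connCount[peerId] == 0:
        echo peerId, ": last connection down - complete the disconnect future"

Only the 0 -> 1 and 1 -> 0 transitions matter; extra concurrent
connections to the same peer are counted but otherwise left alone.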
Author: Jacek Sieka, 2020-08-10 12:58:34 +02:00, committed by GitHub
parent 49428f4925
commit 936440fccd
6 changed files with 110 additions and 76 deletions


@@ -729,7 +729,6 @@ proc installDebugApiHandlers(rpcServer: RpcServer, node: BeaconNode) =
peers.add(
%(
info: shortLog(peer.info),
wasDialed: peer.wasDialed,
connectionState: $peer.connectionState,
score: peer.score,
)


@@ -66,6 +66,7 @@ type
connTable: HashSet[PeerID]
forkId: ENRForkID
rng*: ref BrHmacDrbgContext
peers: Table[PeerID, Peer]
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
@@ -85,7 +86,6 @@ type
Peer* = ref object
network*: Eth2Node
info*: PeerInfo
wasDialed*: bool
discoveryId*: Eth2DiscoveryId
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
@@ -93,6 +93,8 @@ type
netThroughput: AverageThroughput
score*: int
lacksSnappy: bool
connections*: int
disconnectedFut: Future[void]
PeerAddr* = object
peerId*: PeerID
@@ -148,7 +150,7 @@ type
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
OnPeerConnectedHandler* = proc(peer: Peer, conn: Connection): Future[void] {.gcsafe.}
OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe.}
OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe.}
@@ -295,10 +297,11 @@ proc openStream(node: Eth2Node,
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peerPool.getOrDefault(peerId)
if result == nil:
# TODO: We should register this peer in the pool!
result = Peer.init(node, PeerInfo.init(peerId))
node.peers.withValue(peerId, peer) do:
return peer[]
do:
let peer = Peer.init(node, PeerInfo.init(peerId))
return node.peers.mGetOrPut(peerId, peer)
proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} =
# TODO: Can this be `nil`?
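
The point of routing lookups through the new `peers` table is that every
caller gets the same `Peer` instance, so per-peer state like `connections`
and `disconnectedFut` is shared. A toy demonstration of the
`withValue`/`mgetOrPut` pattern from `std/tables`, with stand-in types:

    import std/tables

    type Peer = ref object
      id: string

    var peers: Table[string, Peer]

    proc getPeer(id: string): Peer =
      peers.withValue(id, existing):
        return existing[]          # existing is a ptr to the stored value
      do:
        return peers.mgetOrPut(id, Peer(id: id))

    assert getPeer("peerA") == getPeer("peerA")  # same ref both times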
@@ -308,7 +311,9 @@ proc getKey*(peer: Peer): PeerID {.inline.} =
result = peer.info.peerId
proc getFuture*(peer: Peer): Future[void] {.inline.} =
result = peer.info.lifeFuture()
if peer.disconnectedFut.isNil:
peer.disconnectedFut = newFuture[void]()
result = peer.disconnectedFut
proc getScore*(a: Peer): int =
## Returns current score value for peer ``peer``.
@@ -390,7 +395,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
of FaultOrError:
SeemTableTimeFaultOrError
peer.network.addSeen(peer.info.peerId, seenTime)
peer.info.close()
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
@@ -537,13 +541,12 @@ template send*[M](r: SingleChunkResponse[M], val: auto): untyped =
doAssert UntypedResponse(r).writtenChunks == 0
sendResponseChunkObj(UntypedResponse(r), val)
proc performProtocolHandshakes*(peer: Peer) {.async.} =
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
proc performProtocolHandshakes*(peer: Peer, incoming: bool) {.async.} =
# Loop down serially because it's easier to reason about the connection state
# when there are fewer async races, especially during setup
for protocol in allProtocols:
if protocol.onPeerConnected != nil:
subProtocolsHandshakes.add protocol.onPeerConnected(peer, nil)
await allFuturesThrowing(subProtocolsHandshakes)
await protocol.onPeerConnected(peer, incoming)
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
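
The switch from `allFuturesThrowing` over a seq of handshake futures to a
plain serial loop trades a little latency for determinism: each
sub-protocol's `onPeerConnected` sees the connection state left behind by
the previous one. A minimal chronos sketch of the serial shape (toy
handshakes, not the real sub-protocols):

    import chronos

    proc handshake(name: string) {.async.} =
      await sleepAsync(10.milliseconds)
      echo name, " done"

    proc performAll() {.async.} =
      for name in ["status", "metadata", "ping"]:
        await handshake(name)  # the next one starts only after this completes

    waitFor performAll()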
@@ -688,7 +691,7 @@ proc handleIncomingStream(network: Eth2Node,
finally:
await safeClose(conn)
proc handleOutgoingPeer*(peer: Peer): Future[bool] {.async.} =
proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
let network = peer.network
proc onPeerClosed(udata: pointer) {.gcsafe.} =
@@ -704,7 +707,7 @@ proc handleOutgoingPeer*(peer: Peer): Future[bool] {.async.} =
nbc_peers.set int64(len(network.peerPool))
proc handleIncomingPeer*(peer: Peer): Future[bool] {.async.} =
proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
let network = peer.network
proc onPeerClosed(udata: pointer) {.gcsafe.} =
@@ -754,18 +757,18 @@ proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr) {.async.} =
logScope: peer = peerAddr.peerId
debug "Connecting to discovered peer"
# TODO connect is called here, but there's no guarantee that the connection
# we get when using dialPeer later on is the one we just connected
let peer = node.getPeer(peerAddr.peerId)
await node.switch.connect(peerAddr.peerId, peerAddr.addrs)
var peer = node.getPeer(peerAddr.peerId)
peer.wasDialed = true
#let msDial = newMultistream()
#let conn = node.switch.connections.getOrDefault(peerInfo.id)
#let ls = await msDial.list(conn)
#debug "Supported protocols", ls
debug "Initializing connection"
await performProtocolHandshakes(peer)
inc nbc_successful_dials
successfullyDialledAPeer = true
debug "Network handshakes completed"
@@ -850,6 +853,50 @@ proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata =
else:
result = Json.loadFile(metadataPath, Eth2Metadata)
proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
let peer = node.getPeer(peerId)
case event.kind
of ConnEventKind.Connected:
inc peer.connections
debug "Peer upgraded", peer = peerId, connections = peer.connections
if peer.connections == 1:
# Libp2p may connect multiple times to the same peer - using different
# transports or both incoming and outgoing. For now, we'll count our
# "fist" encounter with the peer as the true connection, leaving the
# other connections be - libp2p limits the number of concurrent
# connections to the same peer, and only one of these connections will be
# active. Nonetheless, this quirk will cause a number of odd behaviours:
# * For peer limits, we might miscount the incoming vs outgoing quota
# * Protocol handshakes are wonky: we'll not necessarily use the newly
# connected transport - instead we'll just pick a random one!
await performProtocolHandshakes(peer, event.incoming)
# While performing the handshake, the peer might have been disconnected -
# there's still a slim chance of a race condition here if a reconnect
# happens quickly
if peer.connections == 1:
# TODO when the pool is full, adding it will block - this means peers
# will be left in limbo until some other peer makes room for it
let added = if event.incoming:
await handleIncomingPeer(peer)
else:
await handleOutgoingPeer(peer)
if not added:
# We must have hit a limit!
await peer.disconnect(FaultOrError)
of ConnEventKind.Disconnected:
dec peer.connections
debug "Peer disconnected", peer = peerId, connections = peer.connections
if peer.connections == 0:
let fut = peer.disconnectedFut
if fut != nil:
peer.disconnectedFut = nil
fut.complete()
proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
switch: Switch, ip: Option[ValidIpAddress], tcpPort, udpPort: Port,
privKey: keys.PrivateKey, rng: ref BrHmacDrbgContext): T =
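
The repeated `connections == 1` test above is the usual
revalidate-after-await pattern: anything read before an `await` may be
stale once the proc resumes. In isolation (toy chronos code):

    import chronos

    var connections = 0

    proc onConnected() {.async.} =
      inc connections
      if connections == 1:
        await sleepAsync(5.milliseconds)  # handshakes run here; peer may drop
        if connections == 1:              # re-check before adding to the pool
          echo "peer added"

    waitFor onConnected()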
@@ -878,12 +925,16 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
if msg.protocolMounter != nil:
msg.protocolMounter result
let node = result
proc peerHook(peerId: PeerID, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(node, peerId, event)
switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
template publicKey*(node: Eth2Node): keys.PublicKey =
node.discovery.privKey.toPublicKey
template addKnownPeer*(node: Eth2Node, peer: enr.Record) =
node.discovery.addNode peer
proc startListening*(node: Eth2Node) {.async.} =
node.discovery.open()
node.libp2pTransportLoops = await node.switch.start()
@@ -956,9 +1007,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
result.SerializationFormat = Format
result.RequestResultsWrapper = ident "NetRes"
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(streamVar, Connection)
result.implementMsg = proc (msg: p2p_protocol_dsl.Message) =
if msg.kind == msgResponse:
return


@@ -40,7 +40,6 @@ type
forkDigest*: ForkDigest
BeaconSyncPeerState* = ref object
initialStatusReceived*: bool
statusMsg*: StatusMsg
BlockRootSlot* = object
@@ -89,21 +88,32 @@ p2pProtocol BeaconSync(version = 1,
networkState = BeaconSyncNetworkState,
peerState = BeaconSyncPeerState):
onPeerConnected do (peer: Peer) {.async.}:
onPeerConnected do (peer: Peer, incoming: bool) {.async.}:
debug "Peer connected",
peer, peerInfo = shortLog(peer.info), wasDialed = peer.wasDialed
if peer.wasDialed:
let
ourStatus = peer.networkState.getCurrentStatus()
# TODO: The timeout here is so high only because we fail to
# respond in time due to high CPU load in our single thread.
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
peer, peerInfo = shortLog(peer.info), incoming
# Per the eth2 protocol, whoever dials must send a status message when
# connected for the first time, but because of how libp2p works, there may
# be a race between incoming and outgoing connections and disconnects that
# makes the incoming flag unreliable / obsolete by the time we get to
# this point - instead of making assumptions, we'll just send a status
# message redundantly.
# TODO the spec does not prohibit sending the extra status message on
# incoming connections, but it should not be necessary - this would
# need a dedicated flow in libp2p that resolves the race conditions -
# this needs more thinking around the ordering of events and the
# given incoming flag
let
ourStatus = peer.networkState.getCurrentStatus()
# TODO: The timeout here is so high only because we fail to
# respond in time due to high CPU load in our single thread.
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
if theirStatus.isOk:
await peer.handleStatus(peer.networkState,
ourStatus, theirStatus.get())
else:
warn "Status response not received in time", peer
if theirStatus.isOk:
await peer.handleStatus(peer.networkState,
ourStatus, theirStatus.get())
else:
warn "Status response not received in time",
peer, error = theirStatus.error
proc status(peer: Peer,
theirStatus: StatusMsg,
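
In effect, the handler above replaces "only the dialer sends Status" with
"both sides send Status"; the cost is at most one redundant round-trip.
Schematically, with a hypothetical `exchangeStatus` helper standing in for
the status/handleStatus sequence above:

    # before: trust the (racy) direction flag
    if peer.wasDialed:
      await peer.exchangeStatus()

    # after: send unconditionally - correct whichever side dialed, and
    # permitted (though not required) by the eth2 spec
    await peer.exchangeStatus()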
@@ -178,7 +188,6 @@ p2pProtocol BeaconSync(version = 1,
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
debug "Peer status", peer, statusMsg
peer.state(BeaconSync).initialStatusReceived = true
peer.state(BeaconSync).statusMsg = statusMsg
proc updateStatus*(peer: Peer): Future[bool] {.async.} =
@@ -197,10 +206,6 @@ proc updateStatus*(peer: Peer): Future[bool] {.async.} =
peer.setStatusMsg(theirStatus.get)
result = true
proc hasInitialStatus*(peer: Peer): bool {.inline.} =
## Returns true when the initial status exchange with ``peer`` has completed.
peer.state(BeaconSync).initialStatusReceived
proc getHeadSlot*(peer: Peer): Slot {.inline.} =
## Returns head slot for specific peer ``peer``.
result = peer.state(BeaconSync).statusMsg.headSlot
@@ -213,22 +218,6 @@ proc handleStatus(peer: Peer,
notice "Irrelevant peer", peer, theirStatus, ourStatus
await peer.disconnect(IrrelevantNetwork)
else:
if not peer.state(BeaconSync).initialStatusReceived:
# Initial/handshake status message handling
peer.state(BeaconSync).initialStatusReceived = true
debug "Peer connected", peer, ourStatus = shortLog(ourStatus),
theirStatus = shortLog(theirStatus)
var res: bool
if peer.wasDialed:
res = await handleOutgoingPeer(peer)
else:
res = await handleIncomingPeer(peer)
if not res:
debug "Peer is dead or already in pool", peer
# TODO: DO NOT DROP THE PEER!
# await peer.disconnect(ClientShutDown)
peer.setStatusMsg(theirStatus)
proc initBeaconSync*(network: Eth2Node, chainDag: ChainDAGRef,


@@ -1,5 +1,5 @@
## Generated at line 88
## Generated at line 87
type
BeaconSync* = object
template State*(PROTO: type BeaconSync): type =
@@ -365,7 +365,7 @@ registerMsg(BeaconSyncProtocol, "beaconBlocksByRoot", beaconBlocksByRootMounter,
"/eth2/beacon_chain/req/beacon_blocks_by_root/1/")
registerMsg(BeaconSyncProtocol, "goodbye", goodbyeMounter,
"/eth2/beacon_chain/req/goodbye/1/")
proc BeaconSyncPeerConnected(peer: Peer; stream: Connection) {.async, gcsafe.} =
proc BeaconSyncPeerConnected(peer: Peer; incoming: bool) {.async, gcsafe.} =
type
CurrentProtocol = BeaconSync
template state(peer: Peer): ref[BeaconSyncPeerState:ObjectType] =
@@ -375,16 +375,14 @@ proc BeaconSyncPeerConnected(peer: Peer; stream: Connection) {.async, gcsafe.} =
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
BeaconSyncProtocol))
debug "Peer connected", peer, peerInfo = shortLog(peer.info),
wasDialed = peer.wasDialed
if peer.wasDialed:
let
ourStatus = peer.networkState.getCurrentStatus()
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
if theirStatus.isOk:
await peer.handleStatus(peer.networkState, ourStatus, theirStatus.get())
else:
warn "Status response not received in time", peer
debug "Peer connected", peer, peerInfo = shortLog(peer.info), incoming
let
ourStatus = peer.networkState.getCurrentStatus()
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
if theirStatus.isOk:
await peer.handleStatus(peer.networkState, ourStatus, theirStatus.get())
else:
warn "Status response not received in time", peer, error = theirStatus.error
setEventHandlers(BeaconSyncProtocol, BeaconSyncPeerConnected, nil)
registerProtocol(BeaconSyncProtocol)

vendor/nim-libp2p (vendored)

@@ -1 +1 @@
Subproject commit 7c2ab38da113f4974663c75ef818f0e59ab24126
Subproject commit 6ffd5be0598d8c1043c456a2b87bfdb753906c2e

vendor/nim-stew (vendored)

@@ -1 +1 @@
Subproject commit ec2f52b0cea1f1daa33f38ca4ba289d8f40f4104
Subproject commit 4c695e59338d752c178934f4b215d43ba72244e4