Add rlpx metrics and avoid immediate peer reconnections (#585)

* Add metrics related to devp2p peer connections

* Avoid reconnecting to peers that just failed connection

- Add SeenTable to avoid reconnecting to peers immediately after
a failed connect. Depending on the failure, the amount of time is
different. This is similar to what is done in nimbus-eth2.
- Attempt to rework rlpxConnect at the same time, in order to
make sure that errors are properly handled. The current structure
is far from ideal, but it is hopefully a small step in the right
direction. To many oddities in there right now to really rework
rlpxConnect properply.

* Fix rlpx thunk fuzzer
This commit is contained in:
Kim De Mey 2023-03-16 16:45:12 +01:00 committed by GitHub
parent 29b14749fa
commit d2ba753792
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 274 additions and 109 deletions

View File

@ -9,12 +9,15 @@
import
std/[tables, hashes, times, algorithm, sets, sequtils],
chronos, chronicles, stint, nimcrypto/keccak,
chronos, chronicles, stint, nimcrypto/keccak, metrics,
../keys, ./discoveryv5/random2,
./enode
export sets # TODO: This should not be needed, but compilation fails otherwise
declarePublicGauge routing_table_nodes,
"Discovery routing table nodes"
logScope:
topics = "eth p2p kademlia"
@ -215,6 +218,7 @@ proc add(k: KBucket, n: Node): Node =
k.nodes.add(n)
elif k.len < BUCKET_SIZE:
k.nodes.add(n)
routing_table_nodes.inc()
else:
k.replacementCache.add(n)
return k.head
@ -222,7 +226,9 @@ proc add(k: KBucket, n: Node): Node =
proc removeNode(k: KBucket, n: Node) =
let i = k.nodes.find(n)
if i != -1: k.nodes.delete(i)
if i != -1:
routing_table_nodes.dec()
k.nodes.delete(i)
proc split(k: KBucket): tuple[lower, upper: KBucket] =
## Split at the median id

View File

@ -26,6 +26,42 @@ const
maxConcurrentConnectionRequests = 40
sleepBeforeTryingARandomBootnode = chronos.milliseconds(3000)
## Period of time for dead / unreachable peers.
SeenTableTimeDeadPeer = chronos.minutes(10)
## Period of time for Useless peers, either because of no matching
## capabilities or on an irrelevant network.
SeenTableTimeUselessPeer = chronos.hours(24)
## Period of time for peers with a protocol error.
SeenTableTimeProtocolError = chronos.minutes(30)
## Period of time for peers with general disconnections / transport errors.
SeenTableTimeReconnect = chronos.minutes(5)
proc isSeen(p: PeerPool, nodeId: NodeId): bool =
## Returns ``true`` if ``nodeId`` present in SeenTable and time period is not
## yet expired.
let currentTime = now(chronos.Moment)
if nodeId notin p.seenTable:
false
else:
let item = try: p.seenTable[nodeId]
except KeyError: raiseAssert "checked with notin"
if currentTime >= item.stamp:
# Peer is in SeenTable, but the time period has expired.
p.seenTable.del(nodeId)
false
else:
true
proc addSeen(
p: PeerPool, nodeId: NodeId, period: chronos.Duration) =
## Adds peer with NodeId ``nodeId`` to SeenTable and timeout ``period``.
let item = SeenNode(nodeId: nodeId, stamp: now(chronos.Moment) + period)
withValue(p.seenTable, nodeId, entry) do:
if entry.stamp < item.stamp:
entry.stamp = item.stamp
do:
p.seenTable[nodeId] = item
proc newPeerPool*(
network: EthereumNode, networkId: NetworkId, keyPair: KeyPair,
discovery: DiscoveryProtocol, clientId: string, minPeers = 10): PeerPool =
@ -84,28 +120,36 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} =
# debug "skipping connection"
return nil
if p.isSeen(remote.id):
return nil
trace "Connecting to node", remote
p.connectingNodes.incl(remote)
result = await p.network.rlpxConnect(remote)
let res = await p.network.rlpxConnect(remote)
p.connectingNodes.excl(remote)
# expected_exceptions = (
# UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure)
# try:
# self.logger.debug("Connecting to %s...", remote)
# peer = await wait_with_token(
# handshake(remote, self.privkey, self.peer_class, self.network_id),
# token=self.cancel_token,
# timeout=HANDSHAKE_TIMEOUT)
# return peer
# except OperationCancelled:
# # Pass it on to instruct our main loop to stop.
# raise
# except expected_exceptions as e:
# self.logger.debug("Could not complete handshake with %s: %s", remote, repr(e))
# except Exception:
# self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote)
# return None
# TODO: Probably should move all this logic to rlpx.nim
if res.isOk():
rlpx_connect_success.inc()
return res.get()
else:
rlpx_connect_failure.inc()
rlpx_connect_failure.inc(labelValues = [$res.error])
case res.error():
of UselessRlpxPeerError:
p.addSeen(remote.id, SeenTableTimeUselessPeer)
of TransportConnectError:
p.addSeen(remote.id, SeenTableTimeDeadPeer)
of RlpxHandshakeError, ProtocolError, InvalidIdentityError:
p.addSeen(remote.id, SeenTableTimeProtocolError)
of RlpxHandshakeTransportError,
P2PHandshakeError,
P2PTransportError,
PeerDisconnectedError,
TooManyPeersError:
p.addSeen(remote.id, SeenTableTimeReconnect)
return nil
proc lookupRandomNode(p: PeerPool) {.async.} =
discard await p.discovery.lookupRandom()
@ -118,7 +162,7 @@ proc getRandomBootnode(p: PeerPool): Option[Node] =
proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe.} =
doAssert(peer.remote notin pool.connectedNodes)
pool.connectedNodes[peer.remote] = peer
connected_peers.inc()
rlpx_connected_peers.inc()
for o in pool.observers.values:
if not o.onPeerConnected.isNil:
if o.protocol.isNil or peer.supports(o.protocol):

View File

@ -53,6 +53,10 @@ type
when useSnappy:
snappyEnabled*: bool
SeenNode* = object
nodeId*: NodeId
stamp*: chronos.Moment
PeerPool* = ref object
# Private fields:
network*: EthereumNode
@ -63,6 +67,7 @@ type
discovery*: DiscoveryProtocol
lastLookupTime*: float
connQueue*: AsyncQueue[Node]
seenTable*: Table[NodeId, SeenNode]
connectedNodes*: Table[Node, Peer]
connectingNodes*: HashSet[Node]
running*: bool

View File

@ -8,8 +8,8 @@
{.push raises: [Defect].}
import
std/[tables, algorithm, deques, hashes, options, typetraits],
stew/shims/macros, chronicles, nimcrypto/utils, chronos,
std/[tables, algorithm, deques, hashes, options, typetraits, os],
stew/shims/macros, chronicles, nimcrypto/utils, chronos, metrics,
".."/[rlp, common, keys, async_utils],
./private/p2p_types, "."/[kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl]
@ -24,6 +24,9 @@ const
# github.com/ethereum/devp2p/blob/master/rlpx.md#framing
allowObsoletedChunkedMessages = defined(chunked_rlpx_enabled)
# TODO: This doesn't get enabled currently in any of the builds, so we send a
# devp2p protocol handshake message with version. Need to check if some peers
# drop us because of this.
when useSnappy:
import snappy
const devp2pSnappyVersion* = 5
@ -34,7 +37,20 @@ when useSnappy:
export
options, p2pProtocol, rlp, chronicles
declarePublicGauge connected_peers, "number of peers in the pool"
declarePublicGauge rlpx_connected_peers,
"Number of connected peers in the pool"
declarePublicCounter rlpx_connect_success,
"Number of successfull rlpx connects"
declarePublicCounter rlpx_connect_failure,
"Number of rlpx connects that failed", labels = ["reason"]
declarePublicCounter rlpx_accept_success,
"Number of successful rlpx accepted peers"
declarePublicCounter rlpx_accept_failure,
"Number of rlpx accept attempts that failed", labels = ["reason"]
logScope:
topics = "eth p2p rlpx"
@ -185,6 +201,10 @@ proc handshakeImpl[T](peer: Peer,
doAssert timeout.milliseconds > 0
yield responseFut or sleepAsync(timeout)
if not responseFut.finished:
# TODO: Really shouldn't disconnect and raise everywhere. In order to avoid
# understanding what error occured where.
# And also, incoming and outgoing disconnect errors should be seperated,
# probably by seperating the actual disconnect call to begin with.
await disconnectAndRaise(peer, HandshakeTimeout,
"Protocol handshake was not received in time.")
elif responseFut.failed:
@ -708,6 +728,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
"Invalid RLPx message body")
elif nextMsgId == 1: # p2p.disconnect
# TODO: can still raise RlpError here...?
let reasonList = nextMsgData.read(DisconnectionReasonList)
let reason = reasonList.value
await peer.disconnect(reason)
@ -716,6 +737,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} =
else:
warn "Dropped RLPX message",
msg = peer.dispatcher.messages[nextMsgId].name
# TODO: This is breach of protocol?
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific RLPx message.
@ -1048,7 +1070,7 @@ proc removePeer(network: EthereumNode, peer: Peer) =
if network.peerPool != nil and not peer.remote.isNil and
peer.remote in network.peerPool.connectedNodes:
network.peerPool.connectedNodes.del(peer.remote)
connected_peers.dec()
rlpx_connected_peers.dec()
# Note: we need to do this check as disconnect (and thus removePeer)
# currently can get called before the dispatcher is initialized.
@ -1104,11 +1126,11 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
peer.connectionState = Disconnected
removePeer(peer.network, peer)
proc validatePubKeyInHello(msg: DevP2P.hello, pubKey: PublicKey): bool =
func validatePubKeyInHello(msg: DevP2P.hello, pubKey: PublicKey): bool =
let pk = PublicKey.fromRaw(msg.nodeId)
pk.isOk and pk[] == pubKey
proc checkUselessPeer(peer: Peer) {.raises: [UselessPeerError, Defect].} =
func checkUselessPeer(peer: Peer) {.raises: [UselessPeerError, Defect].} =
if peer.dispatcher.numProtocols == 0:
# XXX: Send disconnect + UselessPeer
raise newException(UselessPeerError, "Useless peer")
@ -1183,13 +1205,12 @@ template `^`(arr): auto =
# variable as an open array
arr.toOpenArray(0, `arr Len` - 1)
proc initSecretState(hs: var Handshake, authMsg, ackMsg: openArray[byte],
p: Peer) =
proc initSecretState(p: Peer, hs: Handshake, authMsg, ackMsg: openArray[byte]) =
var secrets = hs.getSecrets(authMsg, ackMsg)
initSecretState(secrets, p.secretsState)
burnMem(secrets)
template checkSnappySupport(node: EthereumNode, handshake: Handshake, peer: Peer) =
template setSnappySupport(peer: Peer, node: EthereumNode, handshake: Handshake) =
when useSnappy:
peer.snappyEnabled = node.protocolVersion >= devp2pSnappyVersion.uint and
handshake.version >= devp2pSnappyVersion.uint
@ -1213,106 +1234,170 @@ template baseProtocolVersion(peer: Peer): uint =
else:
devp2pVersion
proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} =
type
RlpxError* = enum
TransportConnectError,
RlpxHandshakeTransportError,
RlpxHandshakeError,
ProtocolError,
P2PHandshakeError,
P2PTransportError,
InvalidIdentityError,
UselessRlpxPeerError,
PeerDisconnectedError,
TooManyPeersError
proc rlpxConnect*(node: EthereumNode, remote: Node):
Future[Result[Peer, RlpxError]] {.async.} =
# TODO: Should we not set some timeouts on the `connect` and `readExactly`s?
# Or should we have a general timeout on the whole rlpxConnect where it gets
# called?
# Now, some parts could potential hang until a tcp timeout is hit?
initTracing(devp2pInfo, node.protocols)
let peer = Peer(remote: remote, network: node)
let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort)
var ok = false
try:
peer.transport = await connect(ta)
var handshake = Handshake.init(
var error = true
defer:
if error: # TODO: Not sure if I like this much
if not isNil(peer.transport):
if not peer.transport.closed:
peer.transport.close()
peer.transport =
try:
await connect(ta)
except TransportError:
return err(TransportConnectError)
except CatchableError as e:
# Aside from TransportOsError, seems raw CatchableError can also occur?
debug "TCP connect with peer failed", err = $e.name, errMsg = $e.msg
return err(TransportConnectError)
# RLPx initial handshake
var
handshake = Handshake.init(
node.rng[], node.keys, {Initiator, EIP8}, node.baseProtocolVersion)
authMsg: array[AuthMessageMaxEIP8, byte]
authMsgLen = 0
# TODO: Rework this so we won't have to pass an array as parameter?
authMessage(
handshake, node.rng[], remote.node.pubkey, authMsg, authMsgLen).tryGet()
var authMsg: array[AuthMessageMaxEIP8, byte]
var authMsgLen = 0
authMessage(
handshake, node.rng[], remote.node.pubkey, authMsg, authMsgLen).tryGet()
var res = await peer.transport.write(addr authMsg[0], authMsgLen)
if res != authMsgLen:
raisePeerDisconnected("Unexpected disconnect while authenticating",
TcpError)
let writeRes =
try:
await peer.transport.write(addr authMsg[0], authMsgLen)
except TransportError:
return err(RlpxHandshakeTransportError)
except CatchableError as e: # TODO: Only TransportErrors can occur?
raiseAssert($e.name & " " & $e.msg)
if writeRes != authMsgLen:
return err(RlpxHandshakeTransportError)
let initialSize = handshake.expectedLength
var ackMsg = newSeqOfCap[byte](1024)
ackMsg.setLen(initialSize)
let initialSize = handshake.expectedLength
var ackMsg = newSeqOfCap[byte](1024)
ackMsg.setLen(initialSize)
# TODO: Should we not set some timeouts on these `readExactly`s?
try:
await peer.transport.readExactly(addr ackMsg[0], len(ackMsg))
except TransportError:
return err(RlpxHandshakeTransportError)
except CatchableError as e:
raiseAssert($e.name & " " & $e.msg)
var ret = handshake.decodeAckMessage(ackMsg)
if ret.isErr and ret.error == AuthError.IncompleteError:
ackMsg.setLen(handshake.expectedLength)
let res = handshake.decodeAckMessage(ackMsg)
if res.isErr and res.error == AuthError.IncompleteError:
ackMsg.setLen(handshake.expectedLength)
try:
await peer.transport.readExactly(addr ackMsg[initialSize],
len(ackMsg) - initialSize)
ret = handshake.decodeAckMessage(ackMsg)
except TransportError:
return err(RlpxHandshakeTransportError)
except CatchableError as e: # TODO: Only TransportErrors can occur?
raiseAssert($e.name & " " & $e.msg)
if ret.isErr():
debug "rlpxConnect handshake error", error = ret.error
if not isNil(peer.transport):
peer.transport.close()
return nil
# TODO: Bullet 1 of https://github.com/status-im/nim-eth/issues/559
let res = handshake.decodeAckMessage(ackMsg)
if res.isErr():
debug "rlpxConnect handshake error", error = res.error
return err(RlpxHandshakeError)
ret.get()
peer.setSnappySupport(node, handshake)
peer.initSecretState(handshake, ^authMsg, ackMsg)
node.checkSnappySupport(handshake, peer)
initSecretState(handshake, ^authMsg, ackMsg, peer)
logConnectedPeer peer
# if handshake.remoteHPubkey != remote.node.pubKey:
# raise newException(Exception, "Remote pubkey is wrong")
logConnectedPeer peer
var sendHelloFut = peer.hello(
# RLPx p2p capability handshake: After the initial handshake, both sides of
# the connection must send either Hello or a Disconnect message.
let
sendHelloFut = peer.hello(
handshake.getVersion(),
node.clientId,
node.capabilities,
uint(node.address.tcpPort),
node.keys.pubkey.toRaw())
var response = await peer.handshakeImpl(
sendHelloFut,
peer.waitSingleMsg(DevP2P.hello),
10.seconds)
receiveHelloFut = peer.waitSingleMsg(DevP2P.hello)
if not validatePubKeyInHello(response, remote.node.pubkey):
warn "Remote nodeId is not its public key" # XXX: Do we care?
response =
try:
await peer.handshakeImpl(
sendHelloFut,
receiveHelloFut,
10.seconds)
except RlpError:
return err(ProtocolError)
except PeerDisconnected as e:
return err(PeerDisconnectedError)
# TODO: Strange compiler error
# case e.reason:
# of HandshakeTimeout:
# # Yeah, a bit odd but in this case PeerDisconnected comes from a
# # timeout on the P2P Hello message. TODO: Clean-up that handshakeImpl
# return err(P2PHandshakeError)
# of TooManyPeers:
# return err(TooManyPeersError)
# else:
# return err(PeerDisconnectedError)
except TransportError:
return err(P2PTransportError)
except CatchableError as e:
raiseAssert($e.name & " " & $e.msg)
trace "DevP2P handshake completed", peer = remote,
clientId = response.clientId
if not validatePubKeyInHello(response, remote.node.pubkey):
warn "Wrong devp2p identity in Hello message"
return err(InvalidIdentityError)
trace "DevP2P handshake completed", peer = remote,
clientId = response.clientId
try:
await postHelloSteps(peer, response)
ok = true
trace "Peer fully connected", peer = remote, clientId = response.clientId
except RlpError:
return err(ProtocolError)
except PeerDisconnected as e:
case e.reason
of AlreadyConnected, TooManyPeers, MessageTimeout:
trace "Disconnect during rlpxConnect", reason = e.reason, peer = remote
case e.reason:
of TooManyPeers:
return err(TooManyPeersError)
else:
debug "Unexpected disconnect during rlpxConnect", reason = e.reason,
msg = e.msg, peer = remote
except TransportIncompleteError:
trace "Connection dropped in rlpxConnect", remote
return err(PeerDisconnectedError)
except UselessPeerError:
trace "Disconnecting useless peer", peer = remote
except RlpTypeMismatch:
# Some peers report capabilities with names longer than 3 chars. We ignore
# those for now. Maybe we should allow this though.
debug "Rlp error in rlpxConnect"
except TransportOsError as e:
trace "TransportOsError", err = e.msg
return err(UselessRlpxPeerError)
except TransportError:
return err(P2PTransportError)
except CatchableError as e:
error "Unexpected exception in rlpxConnect", remote, exc = e.name,
err = e.msg
raiseAssert($e.name & " " & $e.msg)
if not ok:
if not isNil(peer.transport):
peer.transport.close()
return nil
else:
return peer
debug "Peer fully connected", peer = remote, clientId = response.clientId
proc rlpxAccept*(node: EthereumNode,
transport: StreamTransport): Future[Peer] {.async.} =
error = false
return ok(peer)
# TODO: rework rlpxAccept similar to rlpxConnect.
proc rlpxAccept*(
node: EthereumNode, transport: StreamTransport): Future[Peer] {.async.} =
initTracing(devp2pInfo, node.protocols)
let peer = Peer(transport: transport, network: node)
@ -1340,11 +1425,14 @@ proc rlpxAccept*(node: EthereumNode,
trace "rlpxAccept handshake error", error = ret.error
if not isNil(peer.transport):
peer.transport.close()
rlpx_accept_failure.inc()
rlpx_accept_failure.inc(labelValues = ["handshake_error"])
return nil
ret.get()
node.checkSnappySupport(handshake, peer)
peer.setSnappySupport(node, handshake)
handshake.version = uint8(peer.baseProtocolVersion)
var ackMsg: array[AckMessageMaxEIP8, byte]
@ -1355,7 +1443,7 @@ proc rlpxAccept*(node: EthereumNode,
raisePeerDisconnected("Unexpected disconnect while authenticating",
TcpError)
initSecretState(handshake, authMsg, ^ackMsg, peer)
peer.initSecretState(handshake, authMsg, ^ackMsg)
let listenPort = transport.localAddress().port
@ -1407,24 +1495,37 @@ proc rlpxAccept*(node: EthereumNode,
else:
debug "Unexpected disconnect during rlpxAccept", reason = e.reason,
msg = e.msg, peer = peer.remote
rlpx_accept_failure.inc(labelValues = [$e.reason])
except TransportIncompleteError:
trace "Connection dropped in rlpxAccept", remote = peer.remote
rlpx_accept_failure.inc(labelValues = [$TransportIncompleteError])
except UselessPeerError:
trace "Disconnecting useless peer", peer = peer.remote
except RlpTypeMismatch:
rlpx_accept_failure.inc(labelValues = [$UselessPeerError])
except RlpTypeMismatch as e:
# Some peers report capabilities with names longer than 3 chars. We ignore
# those for now. Maybe we should allow this though.
debug "Rlp error in rlpxAccept"
debug "Rlp error in rlpxAccept", err = e.msg, errName = e.name
rlpx_accept_failure.inc(labelValues = [$RlpTypeMismatch])
except TransportOsError as e:
trace "TransportOsError", err = e.msg
trace "TransportOsError", err = e.msg, errName = e.name
if e.code == OSErrorCode(110):
rlpx_accept_failure.inc(labelValues = ["tcp_timeout"])
else:
rlpx_accept_failure.inc(labelValues = [$e.name])
except CatchableError as e:
error "Unexpected exception in rlpxAccept", exc = e.name, err = e.msg
rlpx_accept_failure.inc(labelValues = [$e.name])
if not ok:
if not isNil(peer.transport):
peer.transport.close()
rlpx_accept_failure.inc()
return nil
else:
rlpx_accept_success.inc()
return peer
when isMainModule:

View File

@ -18,7 +18,11 @@ init:
node2 = setupTestNode(rng, eth)
node2.startListening()
peer = waitFor node1.rlpxConnect(newNode(node2.toENode()))
let res = waitFor node1.rlpxConnect(newNode(node2.toENode()))
if res.isErr():
quit 1
else:
peer = res.get()
test:
aflLoop: # This appears to have unstable results with afl-clang-fast, probably

View File

@ -65,9 +65,11 @@ suite "Testing protocol handlers":
var node2 = setupTestNode(rng, abc, xyz)
node2.startListening()
let peer = await node1.rlpxConnect(newNode(node2.toENode()))
let res = await node1.rlpxConnect(newNode(node2.toENode()))
check:
peer.isNil == false
res.isOk()
let peer = res.get()
await peer.disconnect(SubprotocolReason, true)
check:
@ -82,9 +84,9 @@ suite "Testing protocol handlers":
var node1 = setupTestNode(rng, hah)
var node2 = setupTestNode(rng, hah)
node2.startListening()
let peer = await node1.rlpxConnect(newNode(node2.toENode()))
let res = await node1.rlpxConnect(newNode(node2.toENode()))
check:
peer.isNil == true
res.isErr()
# To check if the disconnection handler did not run
node1.protocolState(hah).count == 0

View File

@ -16,7 +16,10 @@ var
node2 = setupTestNode(rng, eth)
node2.startListening()
var peer = waitFor node1.rlpxConnect(newNode(node2.toENode()))
let res = waitFor node1.rlpxConnect(newNode(node2.toENode()))
check res.isOk()
let peer = res.get()
proc testThunk(payload: openArray[byte]) =
var (msgId, msgData) = recvMsgMock(payload)