diff --git a/eth/p2p/kademlia.nim b/eth/p2p/kademlia.nim index 3496cf3..0ec3324 100644 --- a/eth/p2p/kademlia.nim +++ b/eth/p2p/kademlia.nim @@ -9,12 +9,15 @@ import std/[tables, hashes, times, algorithm, sets, sequtils], - chronos, chronicles, stint, nimcrypto/keccak, + chronos, chronicles, stint, nimcrypto/keccak, metrics, ../keys, ./discoveryv5/random2, ./enode export sets # TODO: This should not be needed, but compilation fails otherwise +declarePublicGauge routing_table_nodes, + "Discovery routing table nodes" + logScope: topics = "eth p2p kademlia" @@ -215,6 +218,7 @@ proc add(k: KBucket, n: Node): Node = k.nodes.add(n) elif k.len < BUCKET_SIZE: k.nodes.add(n) + routing_table_nodes.inc() else: k.replacementCache.add(n) return k.head @@ -222,7 +226,9 @@ proc add(k: KBucket, n: Node): Node = proc removeNode(k: KBucket, n: Node) = let i = k.nodes.find(n) - if i != -1: k.nodes.delete(i) + if i != -1: + routing_table_nodes.dec() + k.nodes.delete(i) proc split(k: KBucket): tuple[lower, upper: KBucket] = ## Split at the median id diff --git a/eth/p2p/peer_pool.nim b/eth/p2p/peer_pool.nim index 98a587b..93570d4 100644 --- a/eth/p2p/peer_pool.nim +++ b/eth/p2p/peer_pool.nim @@ -26,6 +26,42 @@ const maxConcurrentConnectionRequests = 40 sleepBeforeTryingARandomBootnode = chronos.milliseconds(3000) + ## Period of time for dead / unreachable peers. + SeenTableTimeDeadPeer = chronos.minutes(10) + ## Period of time for Useless peers, either because of no matching + ## capabilities or on an irrelevant network. + SeenTableTimeUselessPeer = chronos.hours(24) + ## Period of time for peers with a protocol error. + SeenTableTimeProtocolError = chronos.minutes(30) + ## Period of time for peers with general disconnections / transport errors. + SeenTableTimeReconnect = chronos.minutes(5) + +proc isSeen(p: PeerPool, nodeId: NodeId): bool = + ## Returns ``true`` if ``nodeId`` present in SeenTable and time period is not + ## yet expired. 
+ let currentTime = now(chronos.Moment) + if nodeId notin p.seenTable: + false + else: + let item = try: p.seenTable[nodeId] + except KeyError: raiseAssert "checked with notin" + if currentTime >= item.stamp: + # Peer is in SeenTable, but the time period has expired. + p.seenTable.del(nodeId) + false + else: + true + +proc addSeen( + p: PeerPool, nodeId: NodeId, period: chronos.Duration) = + ## Adds peer with NodeId ``nodeId`` to SeenTable and timeout ``period``. + let item = SeenNode(nodeId: nodeId, stamp: now(chronos.Moment) + period) + withValue(p.seenTable, nodeId, entry) do: + if entry.stamp < item.stamp: + entry.stamp = item.stamp + do: + p.seenTable[nodeId] = item + proc newPeerPool*( network: EthereumNode, networkId: NetworkId, keyPair: KeyPair, discovery: DiscoveryProtocol, clientId: string, minPeers = 10): PeerPool = @@ -84,28 +120,36 @@ proc connect(p: PeerPool, remote: Node): Future[Peer] {.async.} = # debug "skipping connection" return nil + if p.isSeen(remote.id): + return nil + trace "Connecting to node", remote p.connectingNodes.incl(remote) - result = await p.network.rlpxConnect(remote) + let res = await p.network.rlpxConnect(remote) p.connectingNodes.excl(remote) - # expected_exceptions = ( - # UnreachablePeer, TimeoutError, PeerConnectionLost, HandshakeFailure) - # try: - # self.logger.debug("Connecting to %s...", remote) - # peer = await wait_with_token( - # handshake(remote, self.privkey, self.peer_class, self.network_id), - # token=self.cancel_token, - # timeout=HANDSHAKE_TIMEOUT) - # return peer - # except OperationCancelled: - # # Pass it on to instruct our main loop to stop. 
- # raise - # except expected_exceptions as e: - # self.logger.debug("Could not complete handshake with %s: %s", remote, repr(e)) - # except Exception: - # self.logger.exception("Unexpected error during auth/p2p handshake with %s", remote) - # return None + # TODO: Probably should move all this logic to rlpx.nim + if res.isOk(): + rlpx_connect_success.inc() + return res.get() + else: + rlpx_connect_failure.inc() + rlpx_connect_failure.inc(labelValues = [$res.error]) + case res.error(): + of UselessRlpxPeerError: + p.addSeen(remote.id, SeenTableTimeUselessPeer) + of TransportConnectError: + p.addSeen(remote.id, SeenTableTimeDeadPeer) + of RlpxHandshakeError, ProtocolError, InvalidIdentityError: + p.addSeen(remote.id, SeenTableTimeProtocolError) + of RlpxHandshakeTransportError, + P2PHandshakeError, + P2PTransportError, + PeerDisconnectedError, + TooManyPeersError: + p.addSeen(remote.id, SeenTableTimeReconnect) + + return nil proc lookupRandomNode(p: PeerPool) {.async.} = discard await p.discovery.lookupRandom() @@ -118,7 +162,7 @@ proc getRandomBootnode(p: PeerPool): Option[Node] = proc addPeer*(pool: PeerPool, peer: Peer) {.gcsafe.} = doAssert(peer.remote notin pool.connectedNodes) pool.connectedNodes[peer.remote] = peer - connected_peers.inc() + rlpx_connected_peers.inc() for o in pool.observers.values: if not o.onPeerConnected.isNil: if o.protocol.isNil or peer.supports(o.protocol): diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index 91d1ead..a84068f 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -53,6 +53,10 @@ type when useSnappy: snappyEnabled*: bool + SeenNode* = object + nodeId*: NodeId + stamp*: chronos.Moment + PeerPool* = ref object # Private fields: network*: EthereumNode @@ -63,6 +67,7 @@ type discovery*: DiscoveryProtocol lastLookupTime*: float connQueue*: AsyncQueue[Node] + seenTable*: Table[NodeId, SeenNode] connectedNodes*: Table[Node, Peer] connectingNodes*: HashSet[Node] running*: 
bool diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index d4616ad..d00a330 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -8,8 +8,8 @@ {.push raises: [Defect].} import - std/[tables, algorithm, deques, hashes, options, typetraits], - stew/shims/macros, chronicles, nimcrypto/utils, chronos, + std/[tables, algorithm, deques, hashes, options, typetraits, os], + stew/shims/macros, chronicles, nimcrypto/utils, chronos, metrics, ".."/[rlp, common, keys, async_utils], ./private/p2p_types, "."/[kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl] @@ -24,6 +24,9 @@ const # github.com/ethereum/devp2p/blob/master/rlpx.md#framing allowObsoletedChunkedMessages = defined(chunked_rlpx_enabled) +# TODO: This doesn't get enabled currently in any of the builds, so we send a +# devp2p protocol handshake message with version. Need to check if some peers +# drop us because of this. when useSnappy: import snappy const devp2pSnappyVersion* = 5 @@ -34,7 +37,20 @@ when useSnappy: export options, p2pProtocol, rlp, chronicles -declarePublicGauge connected_peers, "number of peers in the pool" +declarePublicGauge rlpx_connected_peers, + "Number of connected peers in the pool" + +declarePublicCounter rlpx_connect_success, + "Number of successfull rlpx connects" + +declarePublicCounter rlpx_connect_failure, + "Number of rlpx connects that failed", labels = ["reason"] + +declarePublicCounter rlpx_accept_success, + "Number of successful rlpx accepted peers" + +declarePublicCounter rlpx_accept_failure, + "Number of rlpx accept attempts that failed", labels = ["reason"] logScope: topics = "eth p2p rlpx" @@ -185,6 +201,10 @@ proc handshakeImpl[T](peer: Peer, doAssert timeout.milliseconds > 0 yield responseFut or sleepAsync(timeout) if not responseFut.finished: + # TODO: Really shouldn't disconnect and raise everywhere. In order to avoid + # understanding what error occurred where. 
+ # And also, incoming and outgoing disconnect errors should be separated, + # probably by separating the actual disconnect call to begin with. await disconnectAndRaise(peer, HandshakeTimeout, "Protocol handshake was not received in time.") elif responseFut.failed: @@ -708,6 +728,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} = "Invalid RLPx message body") elif nextMsgId == 1: # p2p.disconnect + # TODO: can still raise RlpError here...? let reasonList = nextMsgData.read(DisconnectionReasonList) let reason = reasonList.value await peer.disconnect(reason) @@ -716,6 +737,7 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} = else: warn "Dropped RLPX message", msg = peer.dispatcher.messages[nextMsgId].name + # TODO: This is a breach of protocol? proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = ## This procs awaits a specific RLPx message. @@ -1048,7 +1070,7 @@ proc removePeer(network: EthereumNode, peer: Peer) = if network.peerPool != nil and not peer.remote.isNil and peer.remote in network.peerPool.connectedNodes: network.peerPool.connectedNodes.del(peer.remote) - connected_peers.dec() + rlpx_connected_peers.dec() # Note: we need to do this check as disconnect (and thus removePeer) # currently can get called before the dispatcher is initialized. 
@@ -1104,11 +1126,11 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, peer.connectionState = Disconnected removePeer(peer.network, peer) -proc validatePubKeyInHello(msg: DevP2P.hello, pubKey: PublicKey): bool = +func validatePubKeyInHello(msg: DevP2P.hello, pubKey: PublicKey): bool = let pk = PublicKey.fromRaw(msg.nodeId) pk.isOk and pk[] == pubKey -proc checkUselessPeer(peer: Peer) {.raises: [UselessPeerError, Defect].} = +func checkUselessPeer(peer: Peer) {.raises: [UselessPeerError, Defect].} = if peer.dispatcher.numProtocols == 0: # XXX: Send disconnect + UselessPeer raise newException(UselessPeerError, "Useless peer") @@ -1183,13 +1205,12 @@ template `^`(arr): auto = # variable as an open array arr.toOpenArray(0, `arr Len` - 1) -proc initSecretState(hs: var Handshake, authMsg, ackMsg: openArray[byte], - p: Peer) = +proc initSecretState(p: Peer, hs: Handshake, authMsg, ackMsg: openArray[byte]) = var secrets = hs.getSecrets(authMsg, ackMsg) initSecretState(secrets, p.secretsState) burnMem(secrets) -template checkSnappySupport(node: EthereumNode, handshake: Handshake, peer: Peer) = +template setSnappySupport(peer: Peer, node: EthereumNode, handshake: Handshake) = when useSnappy: peer.snappyEnabled = node.protocolVersion >= devp2pSnappyVersion.uint and handshake.version >= devp2pSnappyVersion.uint @@ -1213,106 +1234,170 @@ template baseProtocolVersion(peer: Peer): uint = else: devp2pVersion -proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = +type + RlpxError* = enum + TransportConnectError, + RlpxHandshakeTransportError, + RlpxHandshakeError, + ProtocolError, + P2PHandshakeError, + P2PTransportError, + InvalidIdentityError, + UselessRlpxPeerError, + PeerDisconnectedError, + TooManyPeersError + +proc rlpxConnect*(node: EthereumNode, remote: Node): + Future[Result[Peer, RlpxError]] {.async.} = + # TODO: Should we not set some timeouts on the `connect` and `readExactly`s? 
+ # Or should we have a general timeout on the whole rlpxConnect where it gets + # called? + # Now, some parts could potentially hang until a TCP timeout is hit? initTracing(devp2pInfo, node.protocols) let peer = Peer(remote: remote, network: node) let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort) - var ok = false - try: - peer.transport = await connect(ta) - var handshake = Handshake.init( + var error = true + + defer: + if error: # TODO: Not sure if I like this much + if not isNil(peer.transport): + if not peer.transport.closed: + peer.transport.close() + + peer.transport = + try: + await connect(ta) + except TransportError: + return err(TransportConnectError) + except CatchableError as e: + # Aside from TransportOsError, seems raw CatchableError can also occur? + debug "TCP connect with peer failed", err = $e.name, errMsg = $e.msg + return err(TransportConnectError) + + # RLPx initial handshake + var + handshake = Handshake.init( node.rng[], node.keys, {Initiator, EIP8}, node.baseProtocolVersion) + authMsg: array[AuthMessageMaxEIP8, byte] + authMsgLen = 0 + # TODO: Rework this so we won't have to pass an array as parameter? + authMessage( + handshake, node.rng[], remote.node.pubkey, authMsg, authMsgLen).tryGet() - var authMsg: array[AuthMessageMaxEIP8, byte] - var authMsgLen = 0 - authMessage( - handshake, node.rng[], remote.node.pubkey, authMsg, authMsgLen).tryGet() - var res = await peer.transport.write(addr authMsg[0], authMsgLen) - if res != authMsgLen: - raisePeerDisconnected("Unexpected disconnect while authenticating", - TcpError) + let writeRes = + try: + await peer.transport.write(addr authMsg[0], authMsgLen) + except TransportError: + return err(RlpxHandshakeTransportError) + except CatchableError as e: # TODO: Only TransportErrors can occur? 
+ raiseAssert($e.name & " " & $e.msg) + if writeRes != authMsgLen: + return err(RlpxHandshakeTransportError) - let initialSize = handshake.expectedLength - var ackMsg = newSeqOfCap[byte](1024) - ackMsg.setLen(initialSize) + let initialSize = handshake.expectedLength + var ackMsg = newSeqOfCap[byte](1024) + ackMsg.setLen(initialSize) - # TODO: Should we not set some timeouts on these `readExactly`s? + try: await peer.transport.readExactly(addr ackMsg[0], len(ackMsg)) + except TransportError: + return err(RlpxHandshakeTransportError) + except CatchableError as e: + raiseAssert($e.name & " " & $e.msg) - var ret = handshake.decodeAckMessage(ackMsg) - if ret.isErr and ret.error == AuthError.IncompleteError: - ackMsg.setLen(handshake.expectedLength) + let res = handshake.decodeAckMessage(ackMsg) + if res.isErr and res.error == AuthError.IncompleteError: + ackMsg.setLen(handshake.expectedLength) + try: await peer.transport.readExactly(addr ackMsg[initialSize], len(ackMsg) - initialSize) - ret = handshake.decodeAckMessage(ackMsg) + except TransportError: + return err(RlpxHandshakeTransportError) + except CatchableError as e: # TODO: Only TransportErrors can occur? 
+ raiseAssert($e.name & " " & $e.msg) - if ret.isErr(): - debug "rlpxConnect handshake error", error = ret.error - if not isNil(peer.transport): - peer.transport.close() - return nil + # TODO: Bullet 1 of https://github.com/status-im/nim-eth/issues/559 + let res = handshake.decodeAckMessage(ackMsg) + if res.isErr(): + debug "rlpxConnect handshake error", error = res.error + return err(RlpxHandshakeError) - ret.get() + peer.setSnappySupport(node, handshake) + peer.initSecretState(handshake, ^authMsg, ackMsg) - node.checkSnappySupport(handshake, peer) - initSecretState(handshake, ^authMsg, ackMsg, peer) + logConnectedPeer peer - # if handshake.remoteHPubkey != remote.node.pubKey: - # raise newException(Exception, "Remote pubkey is wrong") - logConnectedPeer peer - - var sendHelloFut = peer.hello( + # RLPx p2p capability handshake: After the initial handshake, both sides of + # the connection must send either Hello or a Disconnect message. + let + sendHelloFut = peer.hello( handshake.getVersion(), node.clientId, node.capabilities, uint(node.address.tcpPort), node.keys.pubkey.toRaw()) - var response = await peer.handshakeImpl( - sendHelloFut, - peer.waitSingleMsg(DevP2P.hello), - 10.seconds) + receiveHelloFut = peer.waitSingleMsg(DevP2P.hello) - if not validatePubKeyInHello(response, remote.node.pubkey): - warn "Remote nodeId is not its public key" # XXX: Do we care? + response = + try: + await peer.handshakeImpl( + sendHelloFut, + receiveHelloFut, + 10.seconds) + except RlpError: + return err(ProtocolError) + except PeerDisconnected as e: + return err(PeerDisconnectedError) + # TODO: Strange compiler error + # case e.reason: + # of HandshakeTimeout: + # # Yeah, a bit odd but in this case PeerDisconnected comes from a + # # timeout on the P2P Hello message. 
TODO: Clean-up that handshakeImpl + # return err(P2PHandshakeError) + # of TooManyPeers: + # return err(TooManyPeersError) + # else: + # return err(PeerDisconnectedError) + except TransportError: + return err(P2PTransportError) + except CatchableError as e: + raiseAssert($e.name & " " & $e.msg) - trace "DevP2P handshake completed", peer = remote, - clientId = response.clientId + if not validatePubKeyInHello(response, remote.node.pubkey): + warn "Wrong devp2p identity in Hello message" + return err(InvalidIdentityError) + trace "DevP2P handshake completed", peer = remote, + clientId = response.clientId + + try: await postHelloSteps(peer, response) - ok = true - trace "Peer fully connected", peer = remote, clientId = response.clientId + except RlpError: + return err(ProtocolError) except PeerDisconnected as e: - case e.reason - of AlreadyConnected, TooManyPeers, MessageTimeout: - trace "Disconnect during rlpxConnect", reason = e.reason, peer = remote + case e.reason: + of TooManyPeers: + return err(TooManyPeersError) else: - debug "Unexpected disconnect during rlpxConnect", reason = e.reason, - msg = e.msg, peer = remote - except TransportIncompleteError: - trace "Connection dropped in rlpxConnect", remote + return err(PeerDisconnectedError) except UselessPeerError: - trace "Disconnecting useless peer", peer = remote - except RlpTypeMismatch: - # Some peers report capabilities with names longer than 3 chars. We ignore - # those for now. Maybe we should allow this though. 
- debug "Rlp error in rlpxConnect" - except TransportOsError as e: - trace "TransportOsError", err = e.msg + return err(UselessRlpxPeerError) + except TransportError: + return err(P2PTransportError) except CatchableError as e: - error "Unexpected exception in rlpxConnect", remote, exc = e.name, - err = e.msg + raiseAssert($e.name & " " & $e.msg) - if not ok: - if not isNil(peer.transport): - peer.transport.close() - return nil - else: - return peer + debug "Peer fully connected", peer = remote, clientId = response.clientId -proc rlpxAccept*(node: EthereumNode, - transport: StreamTransport): Future[Peer] {.async.} = + error = false + + return ok(peer) + +# TODO: rework rlpxAccept similar to rlpxConnect. +proc rlpxAccept*( + node: EthereumNode, transport: StreamTransport): Future[Peer] {.async.} = initTracing(devp2pInfo, node.protocols) let peer = Peer(transport: transport, network: node) @@ -1340,11 +1425,14 @@ proc rlpxAccept*(node: EthereumNode, trace "rlpxAccept handshake error", error = ret.error if not isNil(peer.transport): peer.transport.close() + + rlpx_accept_failure.inc() + rlpx_accept_failure.inc(labelValues = ["handshake_error"]) return nil ret.get() - node.checkSnappySupport(handshake, peer) + peer.setSnappySupport(node, handshake) handshake.version = uint8(peer.baseProtocolVersion) var ackMsg: array[AckMessageMaxEIP8, byte] @@ -1355,7 +1443,7 @@ proc rlpxAccept*(node: EthereumNode, raisePeerDisconnected("Unexpected disconnect while authenticating", TcpError) - initSecretState(handshake, authMsg, ^ackMsg, peer) + peer.initSecretState(handshake, authMsg, ^ackMsg) let listenPort = transport.localAddress().port @@ -1407,24 +1495,37 @@ proc rlpxAccept*(node: EthereumNode, else: debug "Unexpected disconnect during rlpxAccept", reason = e.reason, msg = e.msg, peer = peer.remote + + rlpx_accept_failure.inc(labelValues = [$e.reason]) except TransportIncompleteError: trace "Connection dropped in rlpxAccept", remote = peer.remote + 
rlpx_accept_failure.inc(labelValues = [$TransportIncompleteError]) except UselessPeerError: trace "Disconnecting useless peer", peer = peer.remote - except RlpTypeMismatch: + rlpx_accept_failure.inc(labelValues = [$UselessPeerError]) + except RlpTypeMismatch as e: # Some peers report capabilities with names longer than 3 chars. We ignore # those for now. Maybe we should allow this though. - debug "Rlp error in rlpxAccept" + debug "Rlp error in rlpxAccept", err = e.msg, errName = e.name + rlpx_accept_failure.inc(labelValues = [$RlpTypeMismatch]) except TransportOsError as e: - trace "TransportOsError", err = e.msg + trace "TransportOsError", err = e.msg, errName = e.name + if e.code == OSErrorCode(110): + rlpx_accept_failure.inc(labelValues = ["tcp_timeout"]) + else: + rlpx_accept_failure.inc(labelValues = [$e.name]) except CatchableError as e: error "Unexpected exception in rlpxAccept", exc = e.name, err = e.msg + rlpx_accept_failure.inc(labelValues = [$e.name]) if not ok: if not isNil(peer.transport): peer.transport.close() + + rlpx_accept_failure.inc() return nil else: + rlpx_accept_success.inc() return peer when isMainModule: diff --git a/tests/fuzzing/rlpx/thunk.nim b/tests/fuzzing/rlpx/thunk.nim index adcf980..6764af4 100644 --- a/tests/fuzzing/rlpx/thunk.nim +++ b/tests/fuzzing/rlpx/thunk.nim @@ -18,7 +18,11 @@ init: node2 = setupTestNode(rng, eth) node2.startListening() - peer = waitFor node1.rlpxConnect(newNode(node2.toENode())) + let res = waitFor node1.rlpxConnect(newNode(node2.toENode())) + if res.isErr(): + quit 1 + else: + peer = res.get() test: aflLoop: # This appears to have unstable results with afl-clang-fast, probably diff --git a/tests/p2p/test_protocol_handlers.nim b/tests/p2p/test_protocol_handlers.nim index 85c24d9..b15c97d 100644 --- a/tests/p2p/test_protocol_handlers.nim +++ b/tests/p2p/test_protocol_handlers.nim @@ -65,9 +65,11 @@ suite "Testing protocol handlers": var node2 = setupTestNode(rng, abc, xyz) node2.startListening() - let peer = 
await node1.rlpxConnect(newNode(node2.toENode())) + let res = await node1.rlpxConnect(newNode(node2.toENode())) check: - peer.isNil == false + res.isOk() + + let peer = res.get() await peer.disconnect(SubprotocolReason, true) check: @@ -82,9 +84,9 @@ suite "Testing protocol handlers": var node1 = setupTestNode(rng, hah) var node2 = setupTestNode(rng, hah) node2.startListening() - let peer = await node1.rlpxConnect(newNode(node2.toENode())) + let res = await node1.rlpxConnect(newNode(node2.toENode())) check: - peer.isNil == true + res.isErr() # To check if the disconnection handler did not run node1.protocolState(hah).count == 0 diff --git a/tests/p2p/test_rlpx_thunk.nim b/tests/p2p/test_rlpx_thunk.nim index f73ad2c..344e9d0 100644 --- a/tests/p2p/test_rlpx_thunk.nim +++ b/tests/p2p/test_rlpx_thunk.nim @@ -16,7 +16,10 @@ var node2 = setupTestNode(rng, eth) node2.startListening() -var peer = waitFor node1.rlpxConnect(newNode(node2.toENode())) +let res = waitFor node1.rlpxConnect(newNode(node2.toENode())) +check res.isOk() + +let peer = res.get() proc testThunk(payload: openArray[byte]) = var (msgId, msgData) = recvMsgMock(payload)