sync only from prysm

This commit is contained in:
Agnish Ghosh 2024-10-17 17:09:07 +05:30
parent a479c7bdf4
commit b2bb6ac863
4 changed files with 191 additions and 18 deletions

View File

@ -0,0 +1,142 @@
# beacon_chain
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import stew/base10
import std/tables
import libp2p/[multiaddress, multicodec, peerstore]
type
Eth2Agent* {.pure.} = enum
Unknown,
Nimbus,
Lighthouse,
Prysm,
Teku,
Lodestar,
Grandine
func `$`*(a: Eth2Agent): string =
case a
of Eth2Agent.Unknown:
"pending/unknown"
of Eth2Agent.Nimbus:
"nimbus"
of Eth2Agent.Lighthouse:
"lighthouse"
of Eth2Agent.Prysm:
"prysm"
of Eth2Agent.Teku:
"teku"
of Eth2Agent.Lodestar:
"lodestar"
of Eth2Agent.Grandine:
"grandine"
const
# Lighthouse errors could be found here
# https://github.com/sigp/lighthouse/blob/5fdd3b39bb8150d1ea8622e42e0166ed46af7693/beacon_node/lighthouse_network/src/rpc/methods.rs#L171
LighthouseErrors = [
(128'u64, "Unable to verify network"),
(129'u64, "The node has too many connected peers"),
(250'u64, "Peer score is too low"),
(251'u64, "The peer is banned"),
(252'u64, "The IP address the peer is using is banned"),
].toTable()
# Prysm errors could be found here
# https://github.com/prysmaticlabs/prysm/blob/7a394062e1054d73014e793819cb9cf0d20ff2e3/beacon-chain/p2p/types/rpc_goodbye_codes.go#L12
PrysmErrors = [
(128'u64, "Unable to verify network"),
(129'u64, "The node has too many connected peers"),
(250'u64, "Peer score is too low"),
(251'u64, "The peer is banned")
].toTable()
# Lodestar errors could be found here
# https://github.com/ChainSafe/lodestar/blob/7280234bea66b49da3900b916a1b54c4666e4173/packages/beacon-node/src/constants/network.ts#L20
LodestarErrors = [
(128'u64, "Unable to verify network"),
(129'u64, "The node has too many connected peers"),
(250'u64, "Peer score is too low"),
(251'u64, "The peer is banned")
].toTable()
# Teku errors could be found here
# https://github.com/Consensys/teku/blob/a3f7ebc75f24ec942286b0c1ae192e411f84aa7e/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/GoodbyeMessage.java#L42
TekuErrors = [
(128'u64, "Unable to verify network"),
(129'u64, "The node has too many connected peers"),
(130'u64, "Too many requests from the peer")
].toTable()
# Nimbus errors could be found here
# https://github.com/status-im/nimbus-eth2/blob/9b6b42c8f9792e657397bb3669a80b57da470c04/beacon_chain/networking/eth2_network.nim#L176
NimbusErrors = [
(237'u64, "Peer score is too low")
].toTable()
# Grandine errors could be found here
# https://github.com/grandinetech/eth2_libp2p/blob/63a0c5e662847b86b1d5617478e39bccd39df0a9/src/rpc/methods.rs#L246
GrandineErrors = [
(128'u64, "Unable to verify network"),
(129'u64, "The node has too many connected peers"),
(250'u64, "Peer score is too low"),
(251'u64, "The peer is banned"),
(252'u64, "The IP address the peer is using is banned"),
].toTable()
# This is combination of all the errors, we need it when remote agent is not
# identified yet.
UnknownErrors = [
(128'u64, "Unable to verify network"),
(129'u64, "The node has too many connected peers"),
(130'u64, "Too many requests from the peer"),
(237'u64, "Peer score is too low"),
(250'u64, "Peer score is too low"),
(251'u64, "The peer is banned"),
(252'u64, "The IP address the peer is using is banned"),
].toTable()
func disconnectReasonName*(agent: Eth2Agent, code: uint64): string =
if code < 128'u64:
case code
of 0'u64:
"Unknown error (0)"
of 1'u64:
"Client shutdown (1)"
of 2'u64:
"Irrelevant network (2)"
of 3'u64:
"Fault or error (3)"
else:
let
scode = " (" & Base10.toString(code) & ")"
defaultMessage = "Disconnected"
defaultMessage & scode
else:
let
scode = " (" & Base10.toString(code) & ")"
defaultMessage = "Disconnected"
case agent
of Eth2Agent.Unknown:
UnknownErrors.getOrDefault(code, defaultMessage) & scode
of Eth2Agent.Nimbus:
NimbusErrors.getOrDefault(code, defaultMessage) & scode
of Eth2Agent.Lighthouse:
LighthouseErrors.getOrDefault(code, defaultMessage) & scode
of Eth2Agent.Prysm:
PrysmErrors.getOrDefault(code, defaultMessage) & scode
of Eth2Agent.Teku:
TekuErrors.getOrDefault(code, defaultMessage) & scode
of Eth2Agent.Lodestar:
LodestarErrors.getOrDefault(code, defaultMessage) & scode
of Eth2Agent.Grandine:
GrandineErrors.getOrDefault(code, defaultMessage) & scode

View File

@ -30,12 +30,13 @@ import
../spec/[eth2_ssz_serialization, network, ../spec/[eth2_ssz_serialization, network,
helpers, forks], helpers, forks],
../validators/keystore_management, ../validators/keystore_management,
"."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] "."/[eth2_discovery, eth2_protocol_dsl, eth2_agents,
libp2p_json_serialization, peer_pool, peer_scores]
export export
tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol, tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol,
connection, libp2p_json_serialization, eth2_ssz_serialization, results, connection, libp2p_json_serialization, eth2_ssz_serialization, results,
eth2_discovery, peer_pool, peer_scores eth2_discovery, peer_pool, peer_scores, eth2_agents
logScope: logScope:
topics = "networking" topics = "networking"
@ -97,6 +98,7 @@ type
Peer* = ref object Peer* = ref object
network*: Eth2Node network*: Eth2Node
peerId*: PeerId peerId*: PeerId
remoteAgent*: Eth2Agent
discoveryId*: Eth2DiscoveryId discoveryId*: Eth2DiscoveryId
connectionState*: ConnectionState connectionState*: ConnectionState
protocolStates*: seq[RootRef] protocolStates*: seq[RootRef]
@ -337,6 +339,31 @@ func shortProtocolId(protocolId: string): string =
protocolId.high protocolId.high
protocolId[start..ends] protocolId[start..ends]
proc updateAgent*(peer: Peer) =
let
agent = toLowerAscii(peer.network.switch.peerStore[AgentBook][peer.peerId])
# proto = peer.network.switch.peerStore[ProtoVersionBook][peer.peerId]
if "nimbus" in agent:
peer.remoteAgent = Eth2Agent.Nimbus
elif "lighthouse" in agent:
peer.remoteAgent = Eth2Agent.Lighthouse
elif "teku" in agent:
peer.remoteAgent = Eth2Agent.Teku
elif "lodestar" in agent:
peer.remoteAgent = Eth2Agent.Lodestar
elif "prysm" in agent:
peer.remoteAgent = Eth2Agent.Prysm
elif "grandine" in agent:
peer.remoteAgent = Eth2Agent.Grandine
else:
peer.remoteAgent = Eth2Agent.Unknown
proc getRemoteAgent*(peer: Peer): Eth2Agent =
if peer.remoteAgent == Eth2Agent.Unknown:
peer.updateAgent()
peer.remoteAgent
proc openStream(node: Eth2Node, proc openStream(node: Eth2Node,
peer: Peer, peer: Peer,
protocolId: string): Future[NetRes[Connection]] protocolId: string): Future[NetRes[Connection]]

View File

@ -8,7 +8,7 @@
{.push raises: [].} {.push raises: [].}
import import
chronicles, chronicles, stew/base10, metrics,
../spec/network, ../spec/network,
../spec/datatypes/[eip7594], ../spec/datatypes/[eip7594],
".."/[beacon_clock], ".."/[beacon_clock],
@ -38,6 +38,10 @@ type
statusLastTime: chronos.Moment statusLastTime: chronos.Moment
statusMsg: StatusMsg statusMsg: StatusMsg
declareCounter nbc_disconnects_count,
"Number disconnected peers", labels = ["agent", "reason"]
func shortLog*(s: StatusMsg): auto = func shortLog*(s: StatusMsg): auto =
( (
forkDigest: s.forkDigest, forkDigest: s.forkDigest,
@ -48,13 +52,6 @@ func shortLog*(s: StatusMsg): auto =
) )
chronicles.formatIt(StatusMsg): shortLog(it) chronicles.formatIt(StatusMsg): shortLog(it)
func disconnectReasonName(reason: uint64): string =
# haha, nim doesn't support uint64 in `case`!
if reason == uint64(ClientShutDown): "Client shutdown"
elif reason == uint64(IrrelevantNetwork): "Irrelevant network"
elif reason == uint64(FaultOrError): "Fault or error"
else: "Disconnected (" & $reason & ")"
func forkDigestAtEpoch(state: PeerSyncNetworkState, func forkDigestAtEpoch(state: PeerSyncNetworkState,
epoch: Epoch): ForkDigest = epoch: Epoch): ForkDigest =
state.forkDigests[].atEpoch(epoch, state.cfg) state.forkDigests[].atEpoch(epoch, state.cfg)
@ -132,9 +129,9 @@ p2pProtocol PeerSync(version = 1,
networkState = PeerSyncNetworkState, networkState = PeerSyncNetworkState,
peerState = PeerSyncPeerState): peerState = PeerSyncPeerState):
onPeerConnected do (peer: Peer, incoming: bool) {.async: (raises: [CancelledError]).}: onPeerConnected do (peer: Peer, incoming: bool) {.
debug "Peer connected", async: (raises: [CancelledError]).}:
peer, peerId = shortLog(peer.peerId), incoming debug "Peer connected", peer, peerId = shortLog(peer.peerId), incoming
# Per the eth2 protocol, whoever dials must send a status message when # Per the eth2 protocol, whoever dials must send a status message when
# connected for the first time, but because of how libp2p works, there may # connected for the first time, but because of how libp2p works, there may
# be a race between incoming and outgoing connections and disconnects that # be a race between incoming and outgoing connections and disconnects that
@ -153,6 +150,7 @@ p2pProtocol PeerSync(version = 1,
if theirStatus.isOk: if theirStatus.isOk:
discard await peer.handleStatus(peer.networkState, theirStatus.get()) discard await peer.handleStatus(peer.networkState, theirStatus.get())
peer.updateAgent()
else: else:
debug "Status response not received in time", debug "Status response not received in time",
peer, errorKind = theirStatus.error.kind peer, errorKind = theirStatus.error.kind
@ -188,9 +186,13 @@ p2pProtocol PeerSync(version = 1,
{.libp2pProtocol("metadata", 3).} = {.libp2pProtocol("metadata", 3).} =
peer.network.metadata peer.network.metadata
proc goodbye(peer: Peer, reason: uint64) proc goodbye(peer: Peer, reason: uint64) {.
{.async, libp2pProtocol("goodbye", 1).} = async, libp2pProtocol("goodbye", 1).} =
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer let remoteAgent = peer.getRemoteAgent()
nbc_disconnects_count.inc(1, [$remoteAgent, Base10.toString(reason)])
debug "Received Goodbye message",
reason = disconnectReasonName(remoteAgent, reason),
remote_agent = $remoteAgent, peer
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) = proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
debug "Peer status", peer, statusMsg debug "Peer status", peer, statusMsg

View File

@ -14,7 +14,8 @@ import
../spec/datatypes/[phase0, altair], ../spec/datatypes/[phase0, altair],
../spec/eth2_apis/rest_types, ../spec/eth2_apis/rest_types,
../spec/[helpers, forks, network, eip7594_helpers], ../spec/[helpers, forks, network, eip7594_helpers],
../networking/[peer_pool, peer_scores, eth2_network], ../networking/[
peer_pool, peer_scores, eth2_network, eth2_agents],
../gossip_processing/block_processor, ../gossip_processing/block_processor,
../beacon_clock, ../beacon_clock,
"."/[sync_protocol, sync_queue] "."/[sync_protocol, sync_queue]
@ -707,6 +708,7 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [Can
await man.notInSyncEvent.wait() await man.notInSyncEvent.wait()
man.workers[index].status = SyncWorkerStatus.WaitingPeer man.workers[index].status = SyncWorkerStatus.WaitingPeer
peer = await man.pool.acquire() peer = await man.pool.acquire()
if peer.remoteAgent == Eth2Agent.Prysm:
await man.syncStep(index, peer) await man.syncStep(index, peer)
man.pool.release(peer) man.pool.release(peer)
peer = nil peer = nil