Avoid global in p2p macro (fixes #4578) (#5719)

* Avoid global in p2p macro (fixes #4578)

* copy p2p macro to this repo and start de-crufting it
* make protocol registration dynamic, removing light client hacks et al
* split out light client protocol into its own file

* cleanups

* Option -> Opt
* remove more cruft

* further split beacon_sync

this allows the light client to respond to peer metadata messages
without exposing the block sync protocol

* better protocol init

* "constant" protocol index

* avoid casts

* copyright

* move some discovery code to discovery

* avoid extraneous data copy when sending chunks

* remove redundant forkdigest field

* document how to connect to a specific peer
This commit is contained in:
Jacek Sieka 2024-01-13 10:54:24 +01:00 committed by GitHub
parent 69af8f943e
commit b98f46c04d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1673 additions and 686 deletions

View File

@ -8,21 +8,24 @@
{.push raises: [].} {.push raises: [].}
import import
chronicles, stew/shims/net, stew/results, std/[algorithm, sequtils],
eth/p2p/discoveryv5/[enr, protocol, node], chronos, chronicles, stew/results,
eth/p2p/discoveryv5/[enr, protocol, node, random2],
../spec/datatypes/altair,
../spec/eth2_ssz_serialization,
".."/[conf, conf_light_client] ".."/[conf, conf_light_client]
from std/os import splitFile from std/os import splitFile
from std/strutils import cmpIgnoreCase, split, startsWith, strip, toLowerAscii from std/strutils import cmpIgnoreCase, split, startsWith, strip, toLowerAscii
export protocol export protocol, node
type type
Eth2DiscoveryProtocol* = protocol.Protocol Eth2DiscoveryProtocol* = protocol.Protocol
Eth2DiscoveryId* = NodeId Eth2DiscoveryId* = NodeId
export export
Eth2DiscoveryProtocol, open, start, close, closeWait, queryRandom, Eth2DiscoveryProtocol, open, start, close, closeWait,
updateRecord, results updateRecord, results
func parseBootstrapAddress*(address: string): func parseBootstrapAddress*(address: string):
@ -100,3 +103,82 @@ proc new*(T: type Eth2DiscoveryProtocol,
newProtocol(pk, enrIp, enrTcpPort, enrUdpPort, enrFields, bootstrapEnrs, newProtocol(pk, enrIp, enrTcpPort, enrUdpPort, enrFields, bootstrapEnrs,
bindPort = config.udpPort, bindIp = config.listenAddress, bindPort = config.udpPort, bindIp = config.listenAddress,
enrAutoUpdate = config.enrAutoUpdate, rng = rng) enrAutoUpdate = config.enrAutoUpdate, rng = rng)
func isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool =
if discoveryForkId.fork_digest == peerForkId.fork_digest:
if discoveryForkId.next_fork_version < peerForkId.next_fork_version:
# Peer knows about a fork and we don't
true
elif discoveryForkId.next_fork_version == peerForkId.next_fork_version:
# We should have the same next_fork_epoch
discoveryForkId.next_fork_epoch == peerForkId.next_fork_epoch
else:
# Our next fork version is bigger than the peer's one
false
else:
# Wrong fork digest
false
proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits,
minScore: int): Future[seq[Node]] {.async.} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.
let nodes = await d.queryRandom()
var filtered: seq[(int, Node)]
for n in nodes:
var score: int = 0
let
eth2FieldBytes = n.record.get(enrForkIdField, seq[byte]).valueOr:
continue
peerForkId =
try:
SSZ.decode(eth2FieldBytes, ENRForkID)
except SszError as e:
debug "Could not decode the eth2 field of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
if not forkId.isCompatibleForkId(peerForkId):
continue
let attnetsBytes = n.record.get(enrAttestationSubnetsField, seq[byte])
if attnetsBytes.isOk():
let attnetsNode =
try:
SSZ.decode(attnetsBytes.get(), AttnetBits)
except SszError as e:
debug "Could not decode the attnets ERN bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
for i in 0..<ATTESTATION_SUBNET_COUNT:
if wantedAttnets[i] and attnetsNode[i]:
score += 1
let syncnetsBytes = n.record.get(enrSyncSubnetsField, seq[byte])
if syncnetsBytes.isOk():
let syncnetsNode =
try:
SSZ.decode(syncnetsBytes.get(), SyncnetBits)
except SszError as e:
debug "Could not decode the syncnets ENR bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
for i in SyncSubcommitteeIndex:
if wantedSyncnets[i] and syncnetsNode[i]:
score += 10 # connecting to the right syncnet is urgent
if score >= minScore:
filtered.add((score, n))
d.rng[].shuffle(filtered)
return filtered.sortedByIt(-it[0]).mapIt(it[1])

View File

@ -15,7 +15,7 @@ import
stew/[leb128, endians2, results, byteutils, io2, bitops2], stew/[leb128, endians2, results, byteutils, io2, bitops2],
stew/shims/net as stewNet, stew/shims/net as stewNet,
stew/shims/[macros], stew/shims/[macros],
faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams, snappy,
json_serialization, json_serialization/std/[net, sets, options], json_serialization, json_serialization/std/[net, sets, options],
chronos, chronos/ratelimit, chronicles, metrics, chronos, chronos/ratelimit, chronicles, metrics,
libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto,
@ -23,13 +23,13 @@ import
libp2p/protocols/pubsub/[ libp2p/protocols/pubsub/[
pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer], pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer],
libp2p/stream/connection, libp2p/stream/connection,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, eth/[keys, async_utils],
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[version, conf, beacon_clock, conf_light_client], ".."/[version, conf, beacon_clock, conf_light_client],
../spec/datatypes/[phase0, altair, bellatrix], ../spec/datatypes/[phase0, altair, bellatrix],
../spec/[eth2_ssz_serialization, network, helpers, forks], ../spec/[eth2_ssz_serialization, network, helpers, forks],
../validators/keystore_management, ../validators/keystore_management,
"."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores] "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores]
export export
tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol, tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol,
@ -44,7 +44,6 @@ type
PublicKey* = crypto.PublicKey PublicKey* = crypto.PublicKey
PrivateKey* = crypto.PrivateKey PrivateKey* = crypto.PrivateKey
Bytes = seq[byte]
ErrorMsg = List[byte, 256] ErrorMsg = List[byte, 256]
SendResult* = Result[void, cstring] SendResult* = Result[void, cstring]
@ -66,6 +65,8 @@ type
wantedPeers*: int wantedPeers*: int
hardMaxPeers*: int hardMaxPeers*: int
peerPool*: PeerPool[Peer, PeerId] peerPool*: PeerPool[Peer, PeerId]
protocols: seq[ProtocolInfo]
## Protocols managed by the DSL and mounted on the switch
protocolStates*: seq[RootRef] protocolStates*: seq[RootRef]
metadata*: altair.MetaData metadata*: altair.MetaData
connectTimeout*: chronos.Duration connectTimeout*: chronos.Duration
@ -88,8 +89,6 @@ type
quota: TokenBucket ## Global quota mainly for high-bandwidth stuff quota: TokenBucket ## Global quota mainly for high-bandwidth stuff
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
AverageThroughput* = object AverageThroughput* = object
count*: uint64 count*: uint64
average*: float average*: float
@ -105,8 +104,8 @@ type
quota*: TokenBucket quota*: TokenBucket
lastReqTime*: Moment lastReqTime*: Moment
connections*: int connections*: int
enr*: Option[enr.Record] enr*: Opt[enr.Record]
metadata*: Option[altair.MetaData] metadata*: Opt[altair.MetaData]
failedMetadataRequests: int failedMetadataRequests: int
lastMetadataTime*: Moment lastMetadataTime*: Moment
direction*: PeerType direction*: PeerType
@ -144,7 +143,6 @@ type
# Private fields: # Private fields:
libp2pCodecName: string libp2pCodecName: string
protocolMounter*: MounterProc protocolMounter*: MounterProc
isRequired, isLightClientRequest: bool
ProtocolInfoObj* = object ProtocolInfoObj* = object
name*: string name*: string
@ -167,11 +165,11 @@ type
ResourceUnavailable ResourceUnavailable
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [].} PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [].}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe, raises: [].} NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe, raises: [].}
OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe, raises: [].} OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe, raises: [].}
OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe, raises: [].} OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe, raises: [].}
ThunkProc* = LPProtoHandler ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [CatchableError].} MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [].}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [].} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [].}
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#goodbye # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#goodbye
@ -324,9 +322,7 @@ when libp2p_pki_schemes != "secp256k1":
const const
NetworkInsecureKeyPassword = "INSECUREPASSWORD" NetworkInsecureKeyPassword = "INSECUREPASSWORD"
template libp2pProtocol*(name: string, version: int, template libp2pProtocol*(name: string, version: int) {.pragma.}
isRequired = false,
isLightClientRequest = false) {.pragma.}
func shortLog*(peer: Peer): string = shortLog(peer.peerId) func shortLog*(peer: Peer): string = shortLog(peer.peerId)
chronicles.formatIt(Peer): shortLog(it) chronicles.formatIt(Peer): shortLog(it)
@ -354,6 +350,37 @@ proc openStream(node: Eth2Node,
proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.}
proc getState*(peer: Peer, proto: ProtocolInfo): RootRef =
doAssert peer.protocolStates[proto.index] != nil, $proto.index
peer.protocolStates[proto.index]
template state*(peer: Peer, Protocol: type): untyped =
## Returns the state object of a particular protocol for a
## particular connection.
mixin State
bind getState
type S = Protocol.State
S(getState(peer, Protocol.protocolInfo))
proc getNetworkState*(node: Eth2Node, proto: ProtocolInfo): RootRef =
doAssert node.protocolStates[proto.index] != nil, $proto.index
node.protocolStates[proto.index]
template protocolState*(node: Eth2Node, Protocol: type): untyped =
mixin NetworkState
bind getNetworkState
type S = Protocol.NetworkState
S(getNetworkState(node, Protocol.protocolInfo))
proc initProtocolState*[T](state: T, x: Peer|Eth2Node)
{.gcsafe, raises: [].} =
discard
template networkState*(connection: Peer, Protocol: type): untyped =
## Returns the network state object of a particular protocol for a
## particular connection.
protocolState(connection.network, Protocol)
func peerId*(node: Eth2Node): PeerId = func peerId*(node: Eth2Node): PeerId =
node.switch.peerInfo.peerId node.switch.peerInfo.peerId
@ -528,9 +555,6 @@ proc releasePeer*(peer: Peer) =
score_high_limit = PeerScoreHighLimit score_high_limit = PeerScoreHighLimit
asyncSpawn(peer.disconnect(PeerScoreLow)) asyncSpawn(peer.disconnect(PeerScoreLow))
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
proc getRequestProtoName(fn: NimNode): NimNode = proc getRequestProtoName(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue) # (TODO: file as an issue)
@ -547,110 +571,53 @@ proc getRequestProtoName(fn: NimNode): NimNode =
return newLit("") return newLit("")
proc isRequiredProto(fn: NimNode): NimNode = proc add(s: var seq[byte], pos: var int, bytes: openArray[byte]) =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes s[pos..<pos+bytes.len] = bytes
# (TODO: file as an issue) pos += bytes.len
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
try:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
if pragma.len <= 3:
return newLit(false)
for i in 3 ..< pragma.len:
let param = pragma[i]
case param.kind
of nnkExprEqExpr:
if $param[0] == "isRequired":
if $param[1] == "true":
return newLit(true)
if $param[1] == "false":
return newLit(false)
raiseAssert "Unexpected value: " & $param
if $param[0] != "isLightClientRequest":
raiseAssert "Unexpected param: " & $param
of nnkIdent:
if i == 3:
return newLit(param.boolVal)
else: raiseAssert "Unexpected kind: " & param.kind.repr
return newLit(false)
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454
return newLit(false)
proc isLightClientRequestProto(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
try:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
if pragma.len <= 3:
return newLit(false)
for i in 3 ..< pragma.len:
let param = pragma[i]
case param.kind
of nnkExprEqExpr:
if $param[0] == "isLightClientRequest":
if $param[1] == "true":
return newLit(true)
if $param[1] == "false":
return newLit(false)
raiseAssert "Unexpected value: " & $param
if $param[0] != "isRequired":
raiseAssert "Unexpected param: " & $param
of nnkIdent:
if i == 4:
return newLit(param.boolVal)
else: raiseAssert "Unexpected kind: " & param.kind.repr
return newLit(false)
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454
return newLit(false)
proc writeChunkSZ( proc writeChunkSZ(
conn: Connection, responseCode: Option[ResponseCode], conn: Connection, responseCode: Opt[ResponseCode],
uncompressedLen: uint64, payloadSZ: openArray[byte], uncompressedLen: uint64, payloadSZ: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
# max 10 bytes varint length + 1 byte response code + data let
const numOverheadBytes = sizeof(byte) + Leb128.maxLen(typeof(uncompressedLen)) uncompressedLenBytes = toBytes(uncompressedLen, Leb128)
var output = memoryOutput(payloadSZ.len + contextBytes.len + numOverheadBytes)
try: var
data = newSeqUninitialized[byte](
ord(responseCode.isSome) + contextBytes.len + uncompressedLenBytes.len +
payloadSZ.len)
pos = 0
if responseCode.isSome: if responseCode.isSome:
output.write byte(responseCode.get) data.add(pos, [byte responseCode.get])
data.add(pos, contextBytes)
if contextBytes.len > 0: data.add(pos, uncompressedLenBytes.toOpenArray())
output.write contextBytes data.add(pos, payloadSZ)
conn.write(data)
output.write toBytes(uncompressedLen, Leb128).toOpenArray()
output.write payloadSZ
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise
conn.write(output.getOutput)
proc writeChunk(conn: Connection, proc writeChunk(conn: Connection,
responseCode: Option[ResponseCode], responseCode: Opt[ResponseCode],
payload: openArray[byte], payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
var output = memoryOutput() let
uncompressedLenBytes = toBytes(payload.lenu64, Leb128)
var
data = newSeqUninitialized[byte](
ord(responseCode.isSome) + contextBytes.len + uncompressedLenBytes.len +
snappy.maxCompressedLenFramed(payload.len).int)
pos = 0
try:
if responseCode.isSome: if responseCode.isSome:
output.write byte(responseCode.get) data.add(pos, [byte responseCode.get])
data.add(pos, contextBytes)
data.add(pos, uncompressedLenBytes.toOpenArray())
let
pre = pos
written = snappy.compressFramed(payload, data.toOpenArray(pos, data.high))
.expect("compression shouldn't fail with correctly preallocated buffer")
data.setLen(pre + written)
if contextBytes.len > 0: conn.write(data)
output.write contextBytes
output.write toBytes(payload.lenu64, Leb128).toOpenArray()
compressFramed(payload, output)
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise
conn.write(output.getOutput)
template errorMsgLit(x: static string): ErrorMsg = template errorMsgLit(x: static string): ErrorMsg =
const val = ErrorMsg toBytes(x) const val = ErrorMsg toBytes(x)
@ -671,9 +638,9 @@ proc sendErrorResponse(peer: Peer,
errMsg: ErrorMsg): Future[void] = errMsg: ErrorMsg): Future[void] =
debug "Error processing request", debug "Error processing request",
peer, responseCode, errMsg = formatErrorMsg(errMsg) peer, responseCode, errMsg = formatErrorMsg(errMsg)
conn.writeChunk(some responseCode, SSZ.encode(errMsg)) conn.writeChunk(Opt.some responseCode, SSZ.encode(errMsg))
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async.} = proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte]) {.async.} =
var var
deadline = sleepAsync RESP_TIMEOUT_DUR deadline = sleepAsync RESP_TIMEOUT_DUR
streamFut = peer.network.openStream(peer, protocolId) streamFut = peer.network.openStream(peer, protocolId)
@ -686,7 +653,7 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
let stream = streamFut.read let stream = streamFut.read
try: try:
await stream.writeChunk(none ResponseCode, requestBytes) await stream.writeChunk(Opt.none ResponseCode, requestBytes)
finally: finally:
await stream.close() await stream.close()
@ -696,13 +663,13 @@ proc sendResponseChunkBytesSZ(
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
inc response.writtenChunks inc response.writtenChunks
response.stream.writeChunkSZ( response.stream.writeChunkSZ(
some ResponseCode.Success, uncompressedLen, payloadSZ, contextBytes) Opt.some ResponseCode.Success, uncompressedLen, payloadSZ, contextBytes)
proc sendResponseChunkBytes( proc sendResponseChunkBytes(
response: UntypedResponse, payload: openArray[byte], response: UntypedResponse, payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] = contextBytes: openArray[byte] = []): Future[void] =
inc response.writtenChunks inc response.writtenChunks
response.stream.writeChunk(some ResponseCode.Success, payload, contextBytes) response.stream.writeChunk(Opt.some ResponseCode.Success, payload, contextBytes)
proc sendResponseChunk( proc sendResponseChunk(
response: UntypedResponse, val: auto, response: UntypedResponse, val: auto,
@ -712,11 +679,11 @@ proc sendResponseChunk(
template sendUserHandlerResultAsChunkImpl*(stream: Connection, template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResultFut: Future): untyped = handlerResultFut: Future): untyped =
let handlerRes = await handlerResultFut let handlerRes = await handlerResultFut
writeChunk(stream, some ResponseCode.Success, SSZ.encode(handlerRes)) writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerRes))
template sendUserHandlerResultAsChunkImpl*(stream: Connection, template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResult: auto): untyped = handlerResult: auto): untyped =
writeChunk(stream, some ResponseCode.Success, SSZ.encode(handlerResult)) writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerResult))
proc uncompressFramedStream(conn: Connection, proc uncompressFramedStream(conn: Connection,
expectedSize: int): Future[Result[seq[byte], cstring]] expectedSize: int): Future[Result[seq[byte], cstring]]
@ -948,7 +915,7 @@ proc readResponse(conn: Connection, peer: Peer,
return neterr(ReadResponseTimeout) return neterr(ReadResponseTimeout)
return nextFut.read() return nextFut.read()
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte],
ResponseMsg: type, ResponseMsg: type,
timeout: Duration): Future[NetRes[ResponseMsg]] timeout: Duration): Future[NetRes[ResponseMsg]]
{.async.} = {.async.} =
@ -960,7 +927,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# Some clients don't want a length sent for empty requests # Some clients don't want a length sent for empty requests
# So don't send anything on empty requests # So don't send anything on empty requests
if requestBytes.len > 0: if requestBytes.len > 0:
await stream.writeChunk(none ResponseCode, requestBytes) await stream.writeChunk(Opt.none ResponseCode, requestBytes)
# Half-close the stream to mark the end of the request - if this is not # Half-close the stream to mark the end of the request - if this is not
# done, the other peer might never send us the response. # done, the other peer might never send us the response.
await stream.close() await stream.close()
@ -1023,26 +990,21 @@ template sendSSZ*[M](
proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} = proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} =
# Loop down serially because it's easier to reason about the connection state # Loop down serially because it's easier to reason about the connection state
# when there are fewer async races, specially during setup # when there are fewer async races, specially during setup
for protocol in allProtocols: for protocol in peer.network.protocols:
if protocol.onPeerConnected != nil: if protocol.onPeerConnected != nil:
await protocol.onPeerConnected(peer, incoming) await protocol.onPeerConnected(peer, incoming)
proc initProtocol(name: string, proc initProtocol(name: string,
peerInit: PeerStateInitializer, peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj = networkInit: NetworkStateInitializer,
index: int): ProtocolInfoObj =
ProtocolInfoObj( ProtocolInfoObj(
name: name, name: name,
messages: @[], messages: @[],
index: index,
peerStateInitializer: peerInit, peerStateInitializer: peerInit,
networkStateInitializer: networkInit) networkStateInitializer: networkInit)
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo, proc setEventHandlers(p: ProtocolInfo,
onPeerConnected: OnPeerConnectedHandler, onPeerConnected: OnPeerConnectedHandler,
onPeerDisconnected: OnPeerDisconnectedHandler) = onPeerDisconnected: OnPeerDisconnectedHandler) =
@ -1155,9 +1117,6 @@ proc handleIncomingStream(network: Eth2Node,
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.formatMsg("msg") returnInvalidRequest err.formatMsg("msg")
except SnappyError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
returnInvalidRequest err.msg
finally: finally:
# The request quota is shared between all requests - it represents the # The request quota is shared between all requests - it represents the
# cost to perform a service on behalf of a client and is incurred # cost to perform a service on behalf of a client and is incurred
@ -1226,7 +1185,7 @@ proc handleIncomingStream(network: Eth2Node,
return return
try: try:
logReceivedMsg(peer, MsgType(msg.get)) # logReceivedMsg(peer, MsgType(msg.get))
await callUserHandler(MsgType, peer, conn, msg.get) await callUserHandler(MsgType, peer, conn, msg.get)
except InvalidInputsError as err: except InvalidInputsError as err:
nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)])
@ -1360,85 +1319,6 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] =
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol) let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr) ok(peerAddr)
func isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool =
if discoveryForkId.fork_digest == peerForkId.fork_digest:
if discoveryForkId.next_fork_version < peerForkId.next_fork_version:
# Peer knows about a fork and we don't
true
elif discoveryForkId.next_fork_version == peerForkId.next_fork_version:
# We should have the same next_fork_epoch
discoveryForkId.next_fork_epoch == peerForkId.next_fork_epoch
else:
# Our next fork version is bigger than the peer's one
false
else:
# Wrong fork digest
false
proc queryRandom*(
d: Eth2DiscoveryProtocol,
forkId: ENRForkID,
wantedAttnets: AttnetBits,
wantedSyncnets: SyncnetBits,
minScore: int): Future[seq[Node]] {.async.} =
## Perform a discovery query for a random target
## (forkId) and matching at least one of the attestation subnets.
let nodes = await d.queryRandom()
var filtered: seq[(int, Node)]
for n in nodes:
var score: int = 0
let eth2FieldBytes = n.record.tryGet(enrForkIdField, seq[byte])
if eth2FieldBytes.isNone():
continue
let peerForkId =
try:
SSZ.decode(eth2FieldBytes.get(), ENRForkID)
except SszError as e:
debug "Could not decode the eth2 field of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
if not forkId.isCompatibleForkId(peerForkId):
continue
let attnetsBytes = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
if attnetsBytes.isSome():
let attnetsNode =
try:
SSZ.decode(attnetsBytes.get(), AttnetBits)
except SszError as e:
debug "Could not decode the attnets ERN bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
for i in 0..<ATTESTATION_SUBNET_COUNT:
if wantedAttnets[i] and attnetsNode[i]:
score += 1
let syncnetsBytes = n.record.tryGet(enrSyncSubnetsField, seq[byte])
if syncnetsBytes.isSome():
let syncnetsNode =
try:
SSZ.decode(syncnetsBytes.get(), SyncnetBits)
except SszError as e:
debug "Could not decode the syncnets ENR bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
for i in SyncSubcommitteeIndex:
if wantedSyncnets[i] and syncnetsNode[i]:
score += 10 # connecting to the right syncnet is urgent
if score >= minScore:
filtered.add((score, n))
d.rng[].shuffle(filtered)
return filtered.sortedByIt(-it[0]).mapIt(it[1])
proc trimConnections(node: Eth2Node, count: int) = proc trimConnections(node: Eth2Node, count: int) =
# Kill `count` peers, scoring them to remove the least useful ones # Kill `count` peers, scoring them to remove the least useful ones
@ -1688,7 +1568,7 @@ proc resolvePeer(peer: Peer) =
# already has most recent ENR information about this peer. # already has most recent ENR information about this peer.
let gnode = peer.network.discovery.getNode(nodeId) let gnode = peer.network.discovery.getNode(nodeId)
if gnode.isSome(): if gnode.isSome():
peer.enr = some(gnode.get().record) peer.enr = Opt.some(gnode.get().record)
inc(nbc_successful_discoveries) inc(nbc_successful_discoveries)
let delay = now(chronos.Moment) - startTime let delay = now(chronos.Moment) - startTime
nbc_resolve_time.observe(delay.toFloatSeconds()) nbc_resolve_time.observe(delay.toFloatSeconds())
@ -1854,21 +1734,6 @@ proc new(T: type Eth2Node,
quota: TokenBucket.new(maxGlobalQuota, fullReplenishTime) quota: TokenBucket.new(maxGlobalQuota, fullReplenishTime)
) )
newSeq node.protocolStates, allProtocols.len
for proto in allProtocols:
if proto.networkStateInitializer != nil:
node.protocolStates[proto.index] = proto.networkStateInitializer(node)
for msg in proto.messages:
when config is BeaconNodeConf:
if msg.isLightClientRequest and not config.lightClientDataServe:
continue
elif config is LightClientConf:
if not msg.isRequired:
continue
if msg.protocolMounter != nil:
msg.protocolMounter node
proc peerHook(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe.} = proc peerHook(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(node, peerId, event) onConnEvent(node, peerId, event)
@ -1886,6 +1751,18 @@ proc new(T: type Eth2Node,
node node
proc registerProtocol*(node: Eth2Node, Proto: type, state: Proto.NetworkState) =
# This convoluted registration process is a leftover from the shared p2p macro
# and should be refactored
let proto = Proto.protocolInfo()
node.protocols.add(proto)
node.protocolStates.setLen(max(proto.index + 1, node.protocolStates.len))
node.protocolStates[proto.index] = state
for msg in proto.messages:
if msg.protocolMounter != nil:
msg.protocolMounter node
proc startListening*(node: Eth2Node) {.async.} = proc startListening*(node: Eth2Node) {.async.} =
if node.discoveryEnabled: if node.discoveryEnabled:
try: try:
@ -1958,30 +1835,25 @@ proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer =
connectionState: ConnectionState.None, connectionState: ConnectionState.None,
lastReqTime: now(chronos.Moment), lastReqTime: now(chronos.Moment),
lastMetadataTime: now(chronos.Moment), lastMetadataTime: now(chronos.Moment),
protocolStates: newSeq[RootRef](len(allProtocols)),
quota: TokenBucket.new(maxRequestQuota.int, fullReplenishTime) quota: TokenBucket.new(maxRequestQuota.int, fullReplenishTime)
) )
for i in 0 ..< len(allProtocols): res.protocolStates.setLen(network.protocolStates.len())
let proto = allProtocols[i] for proto in network.protocols:
if not(isNil(proto.peerStateInitializer)): if not(isNil(proto.peerStateInitializer)):
res.protocolStates[i] = proto.peerStateInitializer(res) res.protocolStates[proto.index] = proto.peerStateInitializer(res)
res res
proc registerMsg(protocol: ProtocolInfo, proc registerMsg(protocol: ProtocolInfo,
name: string, name: string,
mounter: MounterProc, mounter: MounterProc,
libp2pCodecName: string, libp2pCodecName: string) =
isRequired, isLightClientRequest: bool) =
protocol.messages.add MessageInfo(name: name, protocol.messages.add MessageInfo(name: name,
protocolMounter: mounter, protocolMounter: mounter,
libp2pCodecName: libp2pCodecName, libp2pCodecName: libp2pCodecName)
isRequired: isRequired,
isLightClientRequest: isLightClientRequest)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var var
Format = ident "SSZ" Format = ident "SSZ"
Bool = bindSym "bool"
Connection = bindSym "Connection" Connection = bindSym "Connection"
Peer = bindSym "Peer" Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node" Eth2Node = bindSym "Eth2Node"
@ -1992,19 +1864,15 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
callUserHandler = ident "callUserHandler" callUserHandler = ident "callUserHandler"
MSG = ident "MSG" MSG = ident "MSG"
p.useRequestIds = false
p.useSingleRecordInlining = true
new result new result
result.PeerType = Peer result.PeerType = Peer
result.NetworkType = Eth2Node result.NetworkType = Eth2Node
result.registerProtocol = bindSym "registerProtocol"
result.setEventHandlers = bindSym "setEventHandlers" result.setEventHandlers = bindSym "setEventHandlers"
result.SerializationFormat = Format result.SerializationFormat = Format
result.RequestResultsWrapper = ident "NetRes" result.RequestResultsWrapper = ident "NetRes"
result.implementMsg = proc (msg: p2p_protocol_dsl.Message) = result.implementMsg = proc (msg: eth2_protocol_dsl.Message) =
if msg.kind == msgResponse: if msg.kind == msgResponse:
return return
@ -2015,8 +1883,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
MsgRecName = msg.recName MsgRecName = msg.recName
MsgStrongRecName = msg.strongRecName MsgStrongRecName = msg.strongRecName
codecNameLit = getRequestProtoName(msg.procDef) codecNameLit = getRequestProtoName(msg.procDef)
isRequiredLit = isRequiredProto(msg.procDef)
isLightClientRequestLit = isLightClientRequestProto(msg.procDef)
protocolMounterName = ident(msgName & "Mounter") protocolMounterName = ident(msgName & "Mounter")
## ##
@ -2068,15 +1934,19 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
`msgVar`: `MsgRecName`): untyped = `msgVar`: `MsgRecName`): untyped =
`userHandlerCall` `userHandlerCall`
proc `protocolMounterName`(`networkVar`: `Eth2Node`) = proc `protocolMounterName`(`networkVar`: `Eth2Node`) {.raises: [].} =
proc snappyThunk(`streamVar`: `Connection`, proc snappyThunk(`streamVar`: `Connection`,
`protocolVar`: string): Future[void] {.gcsafe.} = `protocolVar`: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`, return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`,
`MsgStrongRecName`) `MsgStrongRecName`)
try:
mount `networkVar`.switch, mount `networkVar`.switch,
LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk) LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk)
except LPError as exc:
# Failure here indicates that the mounting was done incorrectly which
# would be a programming error
raiseAssert exc.msg
## ##
## Implement Senders and Handshake ## Implement Senders and Handshake
## ##
@ -2091,16 +1961,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
protocol.protocolInfoVar, protocol.protocolInfoVar,
msgNameLit, msgNameLit,
protocolMounterName, protocolMounterName,
codecNameLit, codecNameLit))
isRequiredLit,
isLightClientRequestLit))
result.implementProtocolInit = proc (p: P2PProtocol): NimNode = result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit) var id {.global.}: int
let tmp = id
id += 1
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit, newLit(tmp))
#Must import here because of cyclicity #Must import here because of cyclicity
import ../sync/sync_protocol import ./peer_protocol
export sync_protocol export peer_protocol
proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} = proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} =
trace "updating peer metadata", peerId trace "updating peer metadata", peerId
@ -2116,7 +1987,7 @@ proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} =
peer.failedMetadataRequests.inc() peer.failedMetadataRequests.inc()
return return
peer.metadata = some(newMetadata) peer.metadata = Opt.some(newMetadata)
peer.failedMetadataRequests = 0 peer.failedMetadataRequests = 0
peer.lastMetadataTime = Moment.now() peer.lastMetadataTime = Moment.now()
@ -2201,9 +2072,9 @@ proc getPersistentNetKeys*(
# Insecure password used only for automated testing. # Insecure password used only for automated testing.
insecurePassword = insecurePassword =
if netKeyInsecurePassword: if netKeyInsecurePassword:
some(NetworkInsecureKeyPassword) Opt.some(NetworkInsecureKeyPassword)
else: else:
none[string]() Opt.none(string)
keyPath = keyPath =
if isAbsolute(netKeyFile): if isAbsolute(netKeyFile):
@ -2286,7 +2157,6 @@ proc newBeaconSwitch(config: BeaconNodeConf | LightClientConf,
.withTcpTransport({ServerFlags.ReuseAddr}) .withTcpTransport({ServerFlags.ReuseAddr})
.build() .build()
proc createEth2Node*(rng: ref HmacDrbgContext, proc createEth2Node*(rng: ref HmacDrbgContext,
config: BeaconNodeConf | LightClientConf, config: BeaconNodeConf | LightClientConf,
netKeys: NetKeyPair, netKeys: NetKeyPair,
@ -2601,7 +2471,7 @@ proc updateForkId*(node: Eth2Node, epoch: Epoch, genesis_validators_root: Eth2Di
node.updateForkId(getENRForkID(node.cfg, epoch, genesis_validators_root)) node.updateForkId(getENRForkID(node.cfg, epoch, genesis_validators_root))
node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesis_validators_root) node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesis_validators_root)
func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest = func forkDigestAtEpoch*(node: Eth2Node, epoch: Epoch): ForkDigest =
node.forkDigests[].atEpoch(epoch, node.cfg) node.forkDigests[].atEpoch(epoch, node.cfg)
proc getWallEpoch(node: Eth2Node): Epoch = proc getWallEpoch(node: Eth2Node): Epoch =

View File

@ -0,0 +1,961 @@
# beacon_chain
# Copyright (c) 2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
std/[sequtils],
results,
stew/shims/macros, chronos, faststreams/outputs
export chronos, results
type
MessageKind* = enum
msgHandshake
msgNotification
msgRequest
msgResponse
Message* = ref object
id*: int
ident*: NimNode
kind*: MessageKind
procDef*: NimNode
timeoutParam*: NimNode
recName*: NimNode
strongRecName*: NimNode
recBody*: NimNode
protocol*: P2PProtocol
response*: Message
userHandler*: NimNode
initResponderCall*: NimNode
outputParamDef*: NimNode
Request* = ref object
queries*: seq[Message]
response*: Message
SendProc* = object
## A `SendProc` is a proc used to send a single P2P message.
## If it's a Request, then the return type will be a Future
## of the respective Response type. All send procs also have
## an automatically inserted `timeout` parameter.
msg*: Message
## The message being implemented
def*: NimNode
## The definition of the proc
peerParam*: NimNode
## Cached ident for the peer param
msgParams*: seq[NimNode]
## Cached param ident for all values that must be written
## on the wire. The automatically inserted `timeout` is not
## included.
timeoutParam*: NimNode
## Cached ident for the timeout parameter
extraDefs*: NimNode
## The response procs have extra templates that must become
## part of the generated code
P2PProtocol* = ref object
# Settings
name*: string
version*: int
timeouts*: int64
outgoingRequestDecorator*: NimNode
incomingRequestDecorator*: NimNode
incomingRequestThunkDecorator*: NimNode
incomingResponseDecorator*: NimNode
incomingResponseThunkDecorator*: NimNode
PeerStateType*: NimNode
NetworkStateType*: NimNode
backend*: Backend
# Cached properties
nameIdent*: NimNode
protocolInfoVar*: NimNode
# All messages
messages*: seq[Message]
# Messages by type:
handshake*: Message
notifications*: seq[Message]
requests*: seq[Request]
# Output procs
outSendProcs*: NimNode
outRecvProcs*: NimNode
outProcRegistrations*: NimNode
# Event handlers
onPeerConnected*: NimNode
onPeerDisconnected*: NimNode
Backend* = ref object
# Code generators
implementMsg*: proc (msg: Message)
implementProtocolInit*: proc (protocol: P2PProtocol): NimNode
afterProtocolInit*: proc (protocol: P2PProtocol)
# Bound symbols to the back-end run-time types and procs
PeerType*: NimNode
NetworkType*: NimNode
SerializationFormat*: NimNode
ResponderType*: NimNode
RequestResultsWrapper*: NimNode
registerProtocol*: NimNode
setEventHandlers*: NimNode
BackendFactory* = proc (p: P2PProtocol): Backend
P2PBackendError* = object of CatchableError
InvalidMsgError* = object of P2PBackendError
const
defaultReqTimeout = 10.seconds
tracingEnabled = defined(p2pdump)
let
# Variable names affecting the public interface of the library:
peerVar* {.compileTime.} = ident "peer"
responseVar* {.compileTime.} = ident "response"
streamVar* {.compileTime.} = ident "stream"
protocolVar* {.compileTime.} = ident "protocol"
deadlineVar* {.compileTime.} = ident "deadline"
timeoutVar* {.compileTime.} = ident "timeout"
currentProtocolSym* {.compileTime.} = ident "CurrentProtocol"
resultIdent* {.compileTime.} = ident "result"
# Locally used symbols:
Opt {.compileTime.} = ident "Opt"
Future {.compileTime.} = ident "Future"
Void {.compileTime.} = ident "void"
writeField {.compileTime.} = ident "writeField"
PROTO {.compileTime.} = ident "PROTO"
MSG {.compileTime.} = ident "MSG"
template Fut(T): auto = newTree(nnkBracketExpr, Future, T)
proc initFuture*[T](loc: var Future[T]) =
loc = newFuture[T]()
template applyDecorator(p: NimNode, decorator: NimNode) =
if decorator.kind != nnkNilLit:
p.pragma.insert(0, decorator)
when tracingEnabled:
proc logSentMsgFields(peer: NimNode,
protocolInfo: NimNode,
msgName: string,
fields: openArray[NimNode]): NimNode =
## This generates the tracing code inserted in the message sending procs
## `fields` contains all the params that were serialized in the message
let
tracer = ident "tracer"
tracerStream = ident "tracerStream"
logMsgEventImpl = ident "logMsgEventImpl"
result = quote do:
var `tracerStream` = memoryOutput()
var `tracer` = JsonWriter.init(`tracerStream`)
beginRecord(`tracer`)
for f in fields:
result.add newCall(writeField, tracer, newLit($f), f)
result.add quote do:
endRecord(`tracer`)
`logMsgEventImpl`("outgoing_msg", `peer`,
`protocolInfo`, `msgName`,
getOutput(`tracerStream`, string))
proc createPeerState[Peer, ProtocolState](peer: Peer): RootRef =
var res = new ProtocolState
mixin initProtocolState
initProtocolState(res, peer)
return cast[RootRef](res)
proc expectBlockWithProcs*(n: NimNode): seq[NimNode] =
template helperName: auto = $n[0]
if n.len != 2 or n[1].kind != nnkStmtList:
error(helperName & " expects a block", n)
for p in n[1]:
if p.kind == nnkProcDef:
result.add p
elif p.kind == nnkCommentStmt:
continue
else:
error(helperName & " expects a proc definition.", p)
proc nameOrNil*(procDef: NimNode): NimNode =
if procDef != nil:
procDef.name
else:
newNilLit()
proc isOutputParamName(paramName: NimNode): bool =
eqIdent(paramName, "output") or eqIdent(paramName, "response")
proc isOutputParam(param: NimNode): bool =
param.len > 0 and param[0].skipPragma.isOutputParamName
proc getOutputParam(procDef: NimNode): NimNode =
let params = procDef.params
for i in countdown(params.len - 1, 1):
let param = params[i]
if isOutputParam(param):
return param
proc outputParam*(msg: Message): NimNode =
case msg.kind
of msgRequest:
outputParam(msg.response)
of msgResponse:
msg.outputParamDef
else:
raiseAssert "Only requests (and the attached responses) can have output parameters"
proc outputParamIdent*(msg: Message): NimNode =
let outputParam = msg.outputParam
if outputParam != nil:
return outputParam[0].skipPragma
proc outputParamType*(msg: Message): NimNode =
let outputParam = msg.outputParam
if outputParam != nil:
return outputParam[1]
proc refreshParam(n: NimNode): NimNode =
result = copyNimTree(n)
if n.kind == nnkIdentDefs:
for i in 0..<n.len-2:
if n[i].kind == nnkSym:
result[i] = genSym(symKind(n[i]), $n[i])
iterator typedInputParams(procDef: NimNode, skip = 0): (NimNode, NimNode) =
for paramName, paramType in typedParams(procDef, skip):
if not isOutputParamName(paramName):
yield (paramName, paramType)
proc copyInputParams(params: NimNode): NimNode =
result = newTree(params.kind)
for param in params:
if not isOutputParam(param):
result.add param.refreshParam
proc chooseFieldType(n: NimNode): NimNode =
## Examines the parameter types used in the message signature
## and selects the corresponding field type for use in the
## message object type (i.e. `p2p.hello`).
##
## For now, only openArray types are remapped to sequences.
result = n
if n.kind == nnkBracketExpr and eqIdent(n[0], "openArray"):
result = n.copyNimTree
result[0] = ident("seq")
proc verifyStateType(t: NimNode): NimNode =
result = t[1]
if result.kind == nnkSym and $result == "nil":
return nil
if result.kind != nnkBracketExpr or $result[0] != "ref":
error $result & " must be a ref type"
proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode)
proc init*(T: type P2PProtocol, backendFactory: BackendFactory,
name: string, version: int, body: NimNode,
timeouts: int64,
outgoingRequestDecorator: NimNode,
incomingRequestDecorator: NimNode,
incomingRequestThunkDecorator: NimNode,
incomingResponseDecorator: NimNode,
incomingResponseThunkDecorator: NimNode,
peerState, networkState: NimNode): P2PProtocol =
result = P2PProtocol(
name: name,
version: version,
timeouts: timeouts,
outgoingRequestDecorator: outgoingRequestDecorator,
incomingRequestDecorator: incomingRequestDecorator,
incomingRequestThunkDecorator: incomingRequestThunkDecorator,
incomingResponseDecorator: incomingResponseDecorator,
incomingResponseThunkDecorator: incomingResponseThunkDecorator,
PeerStateType: verifyStateType peerState,
NetworkStateType: verifyStateType networkState,
nameIdent: ident(name),
protocolInfoVar: ident(name & "Protocol"),
outSendProcs: newStmtList(),
outRecvProcs: newStmtList(),
outProcRegistrations: newStmtList())
result.backend = backendFactory(result)
assert(not result.backend.implementProtocolInit.isNil)
result.processProtocolBody body
if not result.backend.afterProtocolInit.isNil:
result.backend.afterProtocolInit(result)
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode) =
## This procs adds a set of common helpers available in all messages handlers
## (e.g. `peer.state`, etc).
userHandlerProc.addPragma ident"gcsafe"
var
getState = ident"getState"
getNetworkState = ident"getNetworkState"
protocolInfoVar = p.protocolInfoVar
protocolNameIdent = p.nameIdent
PeerType = p.backend.PeerType
PeerStateType = p.PeerStateType
NetworkStateType = p.NetworkStateType
prelude = newStmtList()
userHandlerProc.body.insert 0, prelude
# We allow the user handler to use `openArray` params, but we turn
# those into sequences to make the `async` pragma happy.
for i in 1 ..< userHandlerProc.params.len:
var param = userHandlerProc.params[i]
param[^2] = chooseFieldType(param[^2])
prelude.add quote do:
type `currentProtocolSym` {.used.} = `protocolNameIdent`
# Define local accessors for the peer and the network protocol states
# inside each user message handler proc (e.g. peer.state.foo = bar)
if PeerStateType != nil:
prelude.add quote do:
template state(`peerVar`: `PeerType`): `PeerStateType` =
cast[`PeerStateType`](`getState`(`peerVar`, `protocolInfoVar`))
if NetworkStateType != nil:
prelude.add quote do:
template networkState(`peerVar`: `PeerType`): `NetworkStateType` =
cast[`NetworkStateType`](`getNetworkState`(`peerVar`.network, `protocolInfoVar`))
proc addPreludeDefs*(userHandlerProc: NimNode, definitions: NimNode) =
userHandlerProc.body[0].add definitions
proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode =
## Turns a "named" do block to a regular async proc
## (e.g. onPeerConnected do ...)
result = newTree(nnkProcDef)
doBlock.copyChildrenTo(result)
result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName)
p.augmentUserHandler result
proc addTimeoutParam(procDef: NimNode, defaultValue: int64) =
var
Duration = bindSym"Duration"
milliseconds = bindSym"milliseconds"
procDef.params.add newTree(nnkIdentDefs,
timeoutVar,
Duration,
newCall(milliseconds, newLit(defaultValue)))
proc ResponderType(msg: Message): NimNode =
var resp = if msg.kind == msgRequest: msg.response else: msg
newTree(nnkBracketExpr,
msg.protocol.backend.ResponderType, resp.strongRecName)
proc needsSingleParamInlining(msg: Message): bool =
msg.recBody.kind == nnkDistinctTy
proc newMsg(protocol: P2PProtocol, kind: MessageKind,
procDef: NimNode, response: Message = nil): Message =
if procDef[0].kind == nnkPostfix:
error("p2pProtocol procs are public by default. " &
"Please remove the postfix `*`.", procDef)
var
msgIdent = procDef.name
msgName = $msgIdent
recFields = newTree(nnkRecList)
recBody = newTree(nnkObjectTy, newEmptyNode(), newEmptyNode(), recFields)
strongRecName = ident(msgName & "Obj")
recName = strongRecName
for param, paramType in procDef.typedInputParams(skip = 1):
recFields.add newTree(nnkIdentDefs,
newTree(nnkPostfix, ident("*"), param), # The fields are public
chooseFieldType(paramType), # some types such as openArray
newEmptyNode()) # are automatically remapped
if recFields.len == 1:
# When we have a single parameter, it's treated as the transferred message
# type. `recName` will be resolved to the message type that's intended
# for serialization while `strongRecName` will be a distinct type over
# which overloads such as `msgId` can be defined. We must use a distinct
# type because otherwise Nim may see multiple overloads defined over the
# same request parameter type and this will be an ambiguity error.
recName = recFields[0][1]
recBody = newTree(nnkDistinctTy, recName)
result = Message(protocol: protocol,
ident: msgIdent,
kind: kind,
procDef: procDef,
recName: recName,
strongRecName: strongRecName,
recBody: recBody,
response: response)
if procDef.body.kind != nnkEmpty:
var userHandler = copy procDef
protocol.augmentUserHandler userHandler
userHandler.name = ident(msgName & "UserHandler")
case kind
of msgRequest: userHandler.applyDecorator protocol.incomingRequestDecorator
of msgResponse: userHandler.applyDecorator protocol.incomingResponseDecorator
else: discard
result.userHandler = userHandler
protocol.outRecvProcs.add result.userHandler
protocol.messages.add result
proc isVoid(t: NimNode): bool =
t.kind == nnkEmpty or eqIdent(t, "void")
proc addMsg(p: P2PProtocol, procDef: NimNode) =
var
returnType = procDef.params[0]
hasReturnValue = not isVoid(returnType)
outputParam = procDef.getOutputParam()
if outputParam != nil:
if hasReturnValue:
error "A request proc should either use a return value or an output parameter"
returnType = outputParam[1]
hasReturnValue = true
if hasReturnValue:
let
responseIdent = ident($procDef.name & "Response")
response = Message(protocol: p,
ident: responseIdent,
kind: msgResponse,
recName: returnType,
strongRecName: returnType,
recBody: returnType,
outputParamDef: outputParam)
p.messages.add response
let msg = p.newMsg(msgRequest, procDef, response = response)
p.requests.add Request(queries: @[msg], response: response)
else:
p.notifications.add p.newMsg(msgNotification, procDef)
proc identWithExportMarker*(msg: Message): NimNode =
newTree(nnkPostfix, ident("*"), msg.ident)
proc requestResultType*(msg: Message): NimNode =
let
protocol = msg.protocol
backend = protocol.backend
responseRec = msg.response.recName
var wrapperType = backend.RequestResultsWrapper
if wrapperType != nil:
if eqIdent(wrapperType, "void"):
return responseRec
else:
return newTree(nnkBracketExpr, wrapperType, responseRec)
else:
return newTree(nnkBracketExpr, Opt, responseRec)
proc createSendProc*(msg: Message,
procType = nnkProcDef,
isRawSender = false,
nameSuffix = ""): SendProc =
# TODO: file an issue:
# macros.newProc and macros.params doesn't work with nnkMacroDef
let
nameSuffix = if nameSuffix.len == 0: (if isRawSender: "RawSender" else: "")
else: nameSuffix
name = if nameSuffix.len == 0: msg.identWithExportMarker
else: ident($msg.ident & nameSuffix)
pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
else: newEmptyNode()
var def = newNimNode(procType).add(
name,
newEmptyNode(),
newEmptyNode(),
copyInputParams msg.procDef.params,
pragmas,
newEmptyNode(),
newStmtList()) ## body
if procType == nnkProcDef:
for p in msg.procDef.pragma:
if not eqIdent(p, "async"):
def.addPragma p
result.msg = msg
result.def = def
for param, paramType in def.typedInputParams():
if result.peerParam.isNil:
result.peerParam = param
else:
result.msgParams.add param
case msg.kind
of msgHandshake, msgRequest:
# Add a timeout parameter for all request procs
def.addTimeoutParam(msg.protocol.timeouts)
of msgResponse:
if msg.ResponderType != nil:
# A response proc must be called with a response object that originates
# from a certain request. Here we change the Peer parameter at position
# 1 to the correct strongly-typed ResponderType. The incoming procs still
# gets the normal Peer parameter.
let
ResponderType = msg.ResponderType
sendProcName = msg.ident
def[3][1][1] = ResponderType
# We create a helper that enables the `response.send()` syntax
# inside the user handler of the request proc:
result.extraDefs = quote do:
template send*(r: `ResponderType`, args: varargs[untyped]): auto =
`sendProcName`(r, args)
of msgNotification:
discard
def[3][0] = if procType == nnkMacroDef:
ident "untyped"
elif msg.kind == msgRequest and not isRawSender:
Fut(msg.requestResultType)
elif msg.kind == msgHandshake and not isRawSender:
Fut(msg.recName)
else:
Fut(Void)
proc setBody*(sendProc: SendProc, body: NimNode) =
var
msg = sendProc.msg
protocol = msg.protocol
def = sendProc.def
# TODO: macros.body triggers an assertion error when the proc type is nnkMacroDef
def[6] = body
if msg.kind == msgRequest:
def.applyDecorator protocol.outgoingRequestDecorator
msg.protocol.outSendProcs.add def
if sendProc.extraDefs != nil:
msg.protocol.outSendProcs.add sendProc.extraDefs
proc writeParamsAsRecord*(params: openArray[NimNode],
outputStream, Format, RecordType: NimNode): NimNode =
if params.len == 0:
return newStmtList()
var
appendParams = newStmtList()
recordWriterCtx = ident "recordWriterCtx"
writer = ident "writer"
for param in params:
appendParams.add newCall(writeField,
writer, recordWriterCtx,
newLit($param), param)
if params.len > 1:
result = quote do:
mixin init, writerType, beginRecord, endRecord
var `writer` = init(WriterType(`Format`), `outputStream`)
var `recordWriterCtx` = beginRecord(`writer`, `RecordType`)
`appendParams`
endRecord(`writer`, `recordWriterCtx`)
else:
let param = params[0]
result = quote do:
var `writer` = init(WriterType(`Format`), `outputStream`)
writeValue(`writer`, `param`)
proc useStandardBody*(sendProc: SendProc,
preSerializationStep: proc(stream: NimNode): NimNode,
postSerializationStep: proc(stream: NimNode): NimNode,
sendCallGenerator: proc (peer, bytes: NimNode): NimNode) =
let
msg = sendProc.msg
msgBytes = ident "msgBytes"
recipient = sendProc.peerParam
sendCall = sendCallGenerator(recipient, msgBytes)
if sendProc.msgParams.len == 0:
sendProc.setBody quote do:
var `msgBytes`: seq[byte]
`sendCall`
return
let
outputStream = ident "outputStream"
msgRecName = msg.recName
Format = msg.protocol.backend.SerializationFormat
preSerialization = if preSerializationStep.isNil: newStmtList()
else: preSerializationStep(outputStream)
serialization = writeParamsAsRecord(sendProc.msgParams,
outputStream, Format, msgRecName)
postSerialization = if postSerializationStep.isNil: newStmtList()
else: postSerializationStep(outputStream)
tracing = when not tracingEnabled:
newStmtList()
else:
logSentMsgFields(recipient,
msg.protocol.protocolInfoVar,
$msg.ident,
sendProc.msgParams)
sendProc.setBody quote do:
mixin init, WriterType, beginRecord, endRecord, getOutput
var `outputStream` = memoryOutput()
`preSerialization`
`serialization`
`postSerialization`
`tracing`
let `msgBytes` = getOutput(`outputStream`)
`sendCall`
proc correctSerializerProcParams(params: NimNode) =
# A serializer proc is just like a send proc, but:
# 1. it has a void return type
params[0] = ident "void"
# 2. The peer params is replaced with OutputStream
params[1] = newIdentDefs(streamVar, bindSym "OutputStream")
# 3. The timeout param is removed
params.del(params.len - 1)
proc createSerializer*(msg: Message, procType = nnkProcDef): NimNode =
var serializer = msg.createSendProc(procType, nameSuffix = "Serializer")
correctSerializerProcParams serializer.def.params
serializer.setBody writeParamsAsRecord(
serializer.msgParams,
streamVar,
msg.protocol.backend.SerializationFormat,
msg.recName)
return serializer.def
proc defineThunk*(msg: Message, thunk: NimNode) =
let protocol = msg.protocol
case msg.kind
of msgRequest: thunk.applyDecorator protocol.incomingRequestThunkDecorator
of msgResponse: thunk.applyDecorator protocol.incomingResponseThunkDecorator
else: discard
protocol.outRecvProcs.add thunk
proc genUserHandlerCall*(msg: Message, receivedMsg: NimNode,
leadingParams: openArray[NimNode],
outputParam: NimNode = nil): NimNode =
if msg.userHandler == nil:
return newStmtList()
result = newCall(msg.userHandler.name, leadingParams)
if msg.needsSingleParamInlining:
result.add receivedMsg
else:
var params = toSeq(msg.procDef.typedInputParams(skip = 1))
for p in params:
result.add newDotExpr(receivedMsg, p[0])
if outputParam != nil:
result.add outputParam
proc genAwaitUserHandler*(msg: Message, receivedMsg: NimNode,
leadingParams: openArray[NimNode],
outputParam: NimNode = nil): NimNode =
result = msg.genUserHandlerCall(receivedMsg, leadingParams, outputParam)
if result.len > 0: result = newCall("await", result)
proc appendAllInputParams*(node: NimNode, procDef: NimNode): NimNode =
result = node
for p, _ in procDef.typedInputParams():
result.add p
proc paramNames*(procDef: NimNode, skipFirst = 0): seq[NimNode] =
result = newSeq[NimNode]()
for name, _ in procDef.typedParams(skip = skipFirst):
result.add name
proc netInit*(p: P2PProtocol): NimNode =
newNilLit()
# if p.NetworkStateType == nil:
# newNilLit()
# else:
# newTree(nnkBracketExpr, bindSym"createNetworkState",
# p.backend.NetworkType,
# p.NetworkStateType)
proc createHandshakeTemplate*(msg: Message,
rawSendProc, handshakeImpl,
nextMsg: NimNode): SendProc =
let
handshakeExchanger = msg.createSendProc(procType = nnkTemplateDef)
forwardCall = newCall(rawSendProc).appendAllInputParams(handshakeExchanger.def)
peerValue = forwardCall[1]
msgRecName = msg.recName
forwardCall[1] = peerVar
forwardCall.del(forwardCall.len - 1)
let peerVar = genSym(nskLet ,"peer")
handshakeExchanger.setBody quote do:
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`(`peerVar`,
sendingFuture,
`nextMsg`(`peerVar`, `msgRecName`),
`timeoutVar`)
return handshakeExchanger
proc peerInit*(p: P2PProtocol): NimNode =
if p.PeerStateType == nil:
newNilLit()
else:
newTree(nnkBracketExpr, bindSym"createPeerState",
p.backend.PeerType,
p.PeerStateType)
proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
## This procs handles all DSL statements valid inside a p2pProtocol.
##
## It will populate the protocol's fields such as:
## * handshake
## * requests
## * notifications
## * onPeerConnected
## * onPeerDisconnected
##
## All messages will have properly computed numeric IDs
##
for n in protocolBody:
case n.kind
of {nnkCall, nnkCommand}:
if eqIdent(n[0], "requestResponse"):
# `requestResponse` can be given a block of 2 or more procs.
# The last one is considered to be a response message, while
# all preceding ones are requests triggering the response.
# The system makes sure to automatically insert a hidden `reqId`
# parameter used to discriminate the individual messages.
let procs = expectBlockWithProcs(n)
if procs.len < 2:
error "requestResponse expects a block with at least two proc definitions"
var queries = newSeq[Message]()
let responseMsg = p.newMsg(msgResponse, procs[^1])
for i in 0 .. procs.len - 2:
queries.add p.newMsg(msgRequest, procs[i], response = responseMsg)
p.requests.add Request(queries: queries, response: responseMsg)
elif eqIdent(n[0], "handshake"):
let procs = expectBlockWithProcs(n)
if procs.len != 1:
error "handshake expects a block with a single proc definition", n
if p.handshake != nil:
error "The handshake for the protocol is already defined", n
p.handshake = p.newMsg(msgHandshake, procs[0])
elif eqIdent(n[0], "onPeerConnected"):
p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected")
elif eqIdent(n[0], "onPeerDisconnected"):
p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected")
else:
error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
of nnkProcDef, nnkIteratorDef:
p.addMsg(n)
of nnkCommentStmt:
discard
else:
error "Illegal syntax in a P2P protocol definition", n
proc genTypeSection*(p: P2PProtocol): NimNode =
var
protocolName = p.nameIdent
peerState = p.PeerStateType
networkState= p.NetworkStateType
result = newStmtList()
result.add quote do:
# Create a type acting as a pseudo-object representing the protocol
# (e.g. p2p)
type `protocolName`* = object
if peerState != nil:
result.add quote do:
template State*(`PROTO`: type `protocolName`): type = `peerState`
if networkState != nil:
result.add quote do:
template NetworkState*(`PROTO`: type `protocolName`): type = `networkState`
for msg in p.messages:
if msg.procDef == nil:
continue
let
msgName = msg.ident
msgRecName = msg.recName
msgStrongRecName = msg.strongRecName
msgRecBody = msg.recBody
result.add quote do:
# This is a type featuring a single field for each message param:
type `msgStrongRecName`* = `msgRecBody`
# Add a helper template for accessing the message type:
# e.g. p2p.hello:
template `msgName`*(`PROTO`: type `protocolName`): type = `msgRecName`
# Add a helper template for obtaining the message Id for
# a particular message type:
template msgProtocol*(`MSG`: type `msgStrongRecName`): type = `protocolName`
template RecType*(`MSG`: type `msgStrongRecName`): untyped = `msgRecName`
proc genCode*(p: P2PProtocol): NimNode =
for msg in p.messages:
p.backend.implementMsg msg
result = newStmtList()
result.add p.genTypeSection()
let
protocolInfoVar = p.protocolInfoVar
protocolInfoVarObj = ident($protocolInfoVar & "Obj")
protocolName = p.nameIdent
protocolInit = p.backend.implementProtocolInit(p)
result.add quote do:
# One global variable per protocol holds the protocol run-time data
var `protocolInfoVarObj` = `protocolInit`
let `protocolInfoVar` = addr `protocolInfoVarObj`
# The protocol run-time data is available as a pseudo-field
# (e.g. `p2p.protocolInfo`)
template protocolInfo*(`PROTO`: type `protocolName`): auto = `protocolInfoVar`
result.add p.outSendProcs,
p.outRecvProcs,
p.outProcRegistrations
if p.onPeerConnected != nil: result.add p.onPeerConnected
if p.onPeerDisconnected != nil: result.add p.onPeerDisconnected
result.add newCall(p.backend.setEventHandlers,
protocolInfoVar,
nameOrNil p.onPeerConnected,
nameOrNil p.onPeerDisconnected)
macro emitForSingleBackend(
name: static[string],
version: static[int],
backend: static[BackendFactory],
body: untyped,
# TODO Nim can't handle a proper duration parameter here
timeouts: static[int64] = defaultReqTimeout.milliseconds,
outgoingRequestDecorator: untyped = nil,
incomingRequestDecorator: untyped = nil,
incomingRequestThunkDecorator: untyped = nil,
incomingResponseDecorator: untyped = nil,
incomingResponseThunkDecorator: untyped = nil,
peerState = type(nil),
networkState = type(nil)): untyped =
var p = P2PProtocol.init(
backend,
name, version, body, timeouts,
outgoingRequestDecorator,
incomingRequestDecorator,
incomingRequestThunkDecorator,
incomingResponseDecorator,
incomingResponseThunkDecorator,
peerState.getType, networkState.getType)
result = p.genCode()
try:
result.storeMacroResult true
except IOError:
# IO error so the generated nim code might not be stored, don't sweat it.
discard
macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped =
let name = $(options[0])
var backends = newSeq[NimNode]()
if backendSyms.kind == nnkSym:
backends.add backendSyms
else:
for backend in backendSyms:
backends.add backend
result = newStmtList()
for backend in backends:
let call = copy options
call[0] = bindSym"emitForSingleBackend"
call.add newTree(nnkExprEqExpr, ident("name"), newLit(name))
call.add newTree(nnkExprEqExpr, ident("backend"), backend)
call.add newTree(nnkExprEqExpr, ident("body"), body)
result.add call
template p2pProtocol*(options: untyped, body: untyped) {.dirty.} =
bind emitForAllBackends
emitForAllBackends(p2pProtocolBackendImpl, options, body)

View File

@ -0,0 +1,247 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
chronicles,
../spec/network,
".."/[beacon_clock],
../networking/eth2_network,
../consensus_object_pools/blockchain_dag,
../rpc/rest_constants
logScope:
topics = "peer_proto"
type
StatusMsg* = object
forkDigest*: ForkDigest
finalizedRoot*: Eth2Digest
finalizedEpoch*: Epoch
headRoot*: Eth2Digest
headSlot*: Slot
PeerSyncNetworkState* {.final.} = ref object of RootObj
dag: ChainDAGRef
cfg: RuntimeConfig
forkDigests: ref ForkDigests
genesisBlockRoot: Eth2Digest
getBeaconTime: GetBeaconTimeFn
PeerSyncPeerState* {.final.} = ref object of RootObj
statusLastTime: chronos.Moment
statusMsg: StatusMsg
func shortLog*(s: StatusMsg): auto =
(
forkDigest: s.forkDigest,
finalizedRoot: shortLog(s.finalizedRoot),
finalizedEpoch: shortLog(s.finalizedEpoch),
headRoot: shortLog(s.headRoot),
headSlot: shortLog(s.headSlot)
)
chronicles.formatIt(StatusMsg): shortLog(it)
func disconnectReasonName(reason: uint64): string =
# haha, nim doesn't support uint64 in `case`!
if reason == uint64(ClientShutDown): "Client shutdown"
elif reason == uint64(IrrelevantNetwork): "Irrelevant network"
elif reason == uint64(FaultOrError): "Fault or error"
else: "Disconnected (" & $reason & ")"
func forkDigestAtEpoch(state: PeerSyncNetworkState,
epoch: Epoch): ForkDigest =
state.forkDigests[].atEpoch(epoch, state.cfg)
proc getCurrentStatus(state: PeerSyncNetworkState): StatusMsg =
let
dag = state.dag
wallSlot = state.getBeaconTime().slotOrZero
if dag != nil:
StatusMsg(
forkDigest: state.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: dag.finalizedHead.blck.root,
finalizedEpoch: dag.finalizedHead.slot.epoch,
headRoot: dag.head.root,
headSlot: dag.head.slot)
else:
StatusMsg(
forkDigest: state.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: state.genesisBlockRoot,
finalizedEpoch: GENESIS_EPOCH,
headRoot: state.genesisBlockRoot,
headSlot: GENESIS_SLOT)
proc checkStatusMsg(state: PeerSyncNetworkState, status: StatusMsg):
Result[void, cstring] =
let
dag = state.dag
wallSlot = (state.getBeaconTime() + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero
if status.finalizedEpoch > status.headSlot.epoch:
# Can be equal during genesis or checkpoint start
return err("finalized epoch newer than head")
if status.headSlot > wallSlot:
return err("head more recent than wall clock")
if state.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest:
return err("fork digests differ")
if dag != nil:
if status.finalizedEpoch <= dag.finalizedHead.slot.epoch:
let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot())
if blockId.isSome and
(not status.finalizedRoot.isZero) and
status.finalizedRoot != blockId.get().bid.root:
return err("peer following different finality")
else:
if status.finalizedEpoch == GENESIS_EPOCH:
if status.finalizedRoot != state.genesisBlockRoot:
return err("peer following different finality")
ok()
proc handleStatus(peer: Peer,
state: PeerSyncNetworkState,
theirStatus: StatusMsg): Future[bool] {.gcsafe.}
{.pop.} # TODO fix p2p macro for raises
p2pProtocol PeerSync(version = 1,
networkState = PeerSyncNetworkState,
peerState = PeerSyncPeerState):
onPeerConnected do (peer: Peer, incoming: bool) {.async.}:
debug "Peer connected",
peer, peerId = shortLog(peer.peerId), incoming
# Per the eth2 protocol, whoever dials must send a status message when
# connected for the first time, but because of how libp2p works, there may
# be a race between incoming and outgoing connections and disconnects that
# makes the incoming flag unreliable / obsolete by the time we get to
# this point - instead of making assumptions, we'll just send a status
# message redundantly.
# TODO(zah)
# the spec does not prohibit sending the extra status message on
# incoming connections, but it should not be necessary - this would
# need a dedicated flow in libp2p that resolves the race conditions -
# this needs more thinking around the ordering of events and the
# given incoming flag
let
ourStatus = peer.networkState.getCurrentStatus()
theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
if theirStatus.isOk:
discard await peer.handleStatus(peer.networkState, theirStatus.get())
else:
debug "Status response not received in time",
peer, errorKind = theirStatus.error.kind
await peer.disconnect(FaultOrError)
proc status(peer: Peer,
theirStatus: StatusMsg,
response: SingleChunkResponse[StatusMsg])
{.async, libp2pProtocol("status", 1).} =
let ourStatus = peer.networkState.getCurrentStatus()
trace "Sending status message", peer = peer, status = ourStatus
await response.send(ourStatus)
discard await peer.handleStatus(peer.networkState, theirStatus)
proc ping(peer: Peer, value: uint64): uint64
{.libp2pProtocol("ping", 1).} =
return peer.network.metadata.seq_number
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2
proc getMetaData(peer: Peer): uint64
{.libp2pProtocol("metadata", 1).} =
raise newException(InvalidInputsError, "GetMetaData v1 unsupported")
proc getMetadata_v2(peer: Peer): altair.MetaData
{.libp2pProtocol("metadata", 2).} =
return peer.network.metadata
proc goodbye(peer: Peer,
reason: uint64)
{.async, libp2pProtocol("goodbye", 1).} =
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
debug "Peer status", peer, statusMsg
peer.state(PeerSync).statusMsg = statusMsg
peer.state(PeerSync).statusLastTime = Moment.now()
proc handleStatus(peer: Peer,
state: PeerSyncNetworkState,
theirStatus: StatusMsg): Future[bool] {.async.} =
let
res = checkStatusMsg(state, theirStatus)
return if res.isErr():
debug "Irrelevant peer", peer, theirStatus, err = res.error()
await peer.disconnect(IrrelevantNetwork)
false
else:
peer.setStatusMsg(theirStatus)
if peer.connectionState == Connecting:
# As soon as we get here it means that we passed handshake succesfully. So
# we can add this peer to PeerPool.
await peer.handlePeer()
true
proc updateStatus*(peer: Peer): Future[bool] {.async.} =
## Request `status` of remote peer ``peer``.
let
nstate = peer.networkState(PeerSync)
ourStatus = getCurrentStatus(nstate)
let theirFut = awaitne peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
if theirFut.failed():
return false
else:
let theirStatus = theirFut.read()
if theirStatus.isOk:
return await peer.handleStatus(nstate, theirStatus.get())
else:
return false
proc getHeadSlot*(peer: Peer): Slot =
## Returns head slot for specific peer ``peer``.
peer.state(PeerSync).statusMsg.headSlot
proc getFinalizedEpoch*(peer: Peer): Epoch =
## Returns head slot for specific peer ``peer``.
peer.state(PeerSync).statusMsg.finalizedEpoch
proc getStatusLastTime*(peer: Peer): chronos.Moment =
## Returns head slot for specific peer ``peer``.
peer.state(PeerSync).statusLastTime
proc init*(T: type PeerSync.NetworkState,
dag: ChainDAGRef, getBeaconTime: GetBeaconTimeFn): T =
T(
dag: dag,
cfg: dag.cfg,
forkDigests: dag.forkDigests,
genesisBlockRoot: dag.genesisBlockRoot,
getBeaconTime: getBeaconTime,
)
proc init*(T: type PeerSync.NetworkState,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
genesisBlockRoot: Eth2Digest,
getBeaconTime: GetBeaconTimeFn): T =
T(
dag: nil,
cfg: cfg,
forkDigests: forkDigests,
genesisBlockRoot: genesisBlockRoot,
getBeaconTime: getBeaconTime,
)

View File

@ -19,6 +19,7 @@ import
./rpc/[rest_api, state_ttl_cache], ./rpc/[rest_api, state_ttl_cache],
./spec/datatypes/[altair, bellatrix, phase0], ./spec/datatypes/[altair, bellatrix, phase0],
./spec/[deposit_snapshots, engine_authentication, weak_subjectivity], ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity],
./sync/[sync_protocol, light_client_protocol],
./validators/[keystore_management, beacon_validators], ./validators/[keystore_management, beacon_validators],
"."/[ "."/[
beacon_node, beacon_node_light_client, deposits, beacon_node, beacon_node_light_client, deposits,
@ -516,7 +517,18 @@ proc initFullNode(
# Here, we also set the correct ENR should we be in all subnets mode! # Here, we also set the correct ENR should we be in all subnets mode!
node.network.updateStabilitySubnetMetadata(stabilitySubnets) node.network.updateStabilitySubnetMetadata(stabilitySubnets)
node.network.initBeaconSync(dag, getBeaconTime) node.network.registerProtocol(
PeerSync, PeerSync.NetworkState.init(
node.dag,
node.beaconClock.getBeaconTimeFn(),
))
node.network.registerProtocol(
BeaconSync, BeaconSync.NetworkState.init(node.dag))
if node.dag.lcDataStore.serve:
node.network.registerProtocol(
LightClientSync, LightClientSync.NetworkState.init(node.dag))
node.updateValidatorMetrics() node.updateValidatorMetrics()

View File

@ -135,7 +135,10 @@ programMain:
elManager.start(syncChain = false) elManager.start(syncChain = false)
info "Listening to incoming network requests" info "Listening to incoming network requests"
network.initBeaconSync(cfg, forkDigests, genesisBlockRoot, getBeaconTime) network.registerProtocol(
PeerSync, PeerSync.NetworkState.init(
cfg, forkDigests, genesisBlockRoot, getBeaconTime))
withAll(ConsensusFork): withAll(ConsensusFork):
let forkDigest = forkDigests[].atConsensusFork(consensusFork) let forkDigest = forkDigests[].atConsensusFork(consensusFork)
network.addValidator( network.addValidator(

View File

@ -62,7 +62,7 @@
import import
std/[macros, hashes, sets, strutils, tables, typetraits], std/[macros, hashes, sets, strutils, tables, typetraits],
stew/[assign2, byteutils, results], stew/[assign2, byteutils, endians2, results],
chronicles, chronicles,
json_serialization, json_serialization,
ssz_serialization/types as sszTypes, ssz_serialization/types as sszTypes,
@ -70,7 +70,7 @@ import
".."/[beacon_time, crypto, digest, presets] ".."/[beacon_time, crypto, digest, presets]
export export
tables, results, json_serialization, sszTypes, beacon_time, crypto, tables, results, endians2, json_serialization, sszTypes, beacon_time, crypto,
digest, presets digest, presets
const SPEC_VERSION* = "1.4.0-beta.5" const SPEC_VERSION* = "1.4.0-beta.5"

View File

@ -1,4 +1,4 @@
# Copyright (c) 2023 Status Research & Development GmbH # Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -7,11 +7,11 @@
{.push raises: [].} {.push raises: [].}
import import
chronos, presto/client, chronicles, presto/client, chronicles,
".."/[helpers, forks, eth2_ssz_serialization], ".."/[helpers, forks, eth2_ssz_serialization],
"."/[rest_types, rest_common, eth2_rest_serialization] "."/[rest_types, rest_common, eth2_rest_serialization]
export chronos, client, rest_types, eth2_rest_serialization export client, rest_types, eth2_rest_serialization
proc getNextWithdrawals*(state_id: StateIdent proc getNextWithdrawals*(state_id: StateIdent
): RestResponse[GetNextWithdrawalsResponse] {. ): RestResponse[GetNextWithdrawalsResponse] {.

View File

@ -7,10 +7,10 @@
{.push raises: [].} {.push raises: [].}
import import
chronos, presto/client, presto/client,
"."/[rest_types, eth2_rest_serialization] "."/[rest_types, eth2_rest_serialization]
export chronos, client, rest_types, eth2_rest_serialization export client, rest_types, eth2_rest_serialization
proc getForkSchedulePlain*(): RestPlainResponse {. proc getForkSchedulePlain*(): RestPlainResponse {.
rest, endpoint: "/eth/v1/config/fork_schedule", meth: MethodGet.} rest, endpoint: "/eth/v1/config/fork_schedule", meth: MethodGet.}

View File

@ -1,4 +1,4 @@
# Copyright (c) 2023 Status Research & Development GmbH # Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -7,7 +7,7 @@
{.push raises: [].} {.push raises: [].}
import import
chronos, presto/client, presto/client,
"."/[rest_types, eth2_rest_serialization] "."/[rest_types, eth2_rest_serialization]
proc subscribeEventStream*(topics: set[EventTopic]): RestHttpResponseRef {. proc subscribeEventStream*(topics: set[EventTopic]): RestHttpResponseRef {.

View File

@ -7,13 +7,13 @@
{.push raises: [].} {.push raises: [].}
import import
chronos, presto/client, chronicles, presto/client, chronicles,
".."/".."/validators/slashing_protection_common, ".."/".."/validators/slashing_protection_common,
".."/datatypes/[phase0, altair], ".."/datatypes/[phase0, altair],
".."/[helpers, forks, keystore, eth2_ssz_serialization], ".."/[helpers, forks, keystore, eth2_ssz_serialization],
"."/[rest_types, rest_common, rest_keymanager_types, eth2_rest_serialization] "."/[rest_types, rest_common, rest_keymanager_types, eth2_rest_serialization]
export chronos, client, rest_types, eth2_rest_serialization, export client, rest_types, eth2_rest_serialization,
rest_keymanager_types rest_keymanager_types
UUID.serializesAsBaseIn RestJson UUID.serializesAsBaseIn RestJson

View File

@ -1,4 +1,4 @@
# Copyright (c) 2023 Status Research & Development GmbH # Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed and distributed under either of # Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).

View File

@ -7,7 +7,7 @@
{.push raises: [].} {.push raises: [].}
import import
chronos, chronicles, presto/client, chronicles, presto/client,
"."/[rest_types, eth2_rest_serialization, rest_common] "."/[rest_types, eth2_rest_serialization, rest_common]
proc getValidatorsActivity*(epoch: Epoch, proc getValidatorsActivity*(epoch: Epoch,

View File

@ -8,13 +8,11 @@
import import
chronicles, metrics, chronicles, metrics,
chronos, chronos/apps/http/httpclient, presto, presto/client, chronos, presto/client,
serialization, json_serialization,
json_serialization/std/[net, sets],
stew/[results, base10, byteutils], stew/[results, base10, byteutils],
"."/[rest_types, eth2_rest_serialization] "."/[rest_types, eth2_rest_serialization]
export chronos, httpclient, client, rest_types, eth2_rest_serialization, results export chronos, client, rest_types, eth2_rest_serialization, results
type type
Web3SignerErrorKind* {.pure.} = enum Web3SignerErrorKind* {.pure.} = enum

View File

@ -7,10 +7,10 @@
{.push raises: [].} {.push raises: [].}
import import
chronos, presto/client, presto/client,
"."/[rest_types, eth2_rest_serialization] "."/[rest_types, eth2_rest_serialization]
export chronos, client, rest_types, eth2_rest_serialization export client, rest_types, eth2_rest_serialization
proc getAttesterDutiesPlain*( proc getAttesterDutiesPlain*(
epoch: Epoch, epoch: Epoch,

View File

@ -12,7 +12,7 @@ import
../spec/network, ../spec/network,
../networking/eth2_network, ../networking/eth2_network,
../beacon_clock, ../beacon_clock,
"."/[light_client_sync_helpers, sync_protocol, sync_manager] "."/[light_client_sync_helpers, light_client_protocol, sync_manager]
export sync_manager export sync_manager
logScope: logScope:

View File

@ -0,0 +1,192 @@
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
import
chronicles, chronos, snappy, snappy/codec,
../spec/[helpers, forks, network],
../networking/eth2_network,
../consensus_object_pools/blockchain_dag,
../rpc/rest_constants
logScope:
topics = "lc_proto"
const
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1)
## Only one bootstrap per peer should ever be needed - no need to allow more
lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000)
## Updates are tiny - we can allow lots of them
lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100)
type
LightClientNetworkState* {.final.} = ref object of RootObj
dag*: ChainDAGRef
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject):
Future[NetRes[MsgType]] {.async.} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError:
return neterr UnexpectedEOF
let contextFork =
peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr:
return neterr InvalidContextBytes
withLcDataFork(lcDataForkAtConsensusFork(contextFork)):
when lcDataFork > LightClientDataFork.None:
let res = await eth2_network.readChunkPayload(
conn, peer, MsgType.Forky(lcDataFork))
if res.isOk:
if contextFork !=
peer.network.cfg.consensusForkAtEpoch(res.get.contextEpoch):
return neterr InvalidContextBytes
return ok MsgType.init(res.get)
else:
return err(res.error)
else:
return neterr InvalidContextBytes
{.pop.}
func forkDigestAtEpoch(state: LightClientNetworkState,
epoch: Epoch): ForkDigest =
state.dag.forkDigests[].atEpoch(epoch, state.dag.cfg)
p2pProtocol LightClientSync(version = 1,
networkState = LightClientNetworkState):
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap
proc lightClientBootstrap(
peer: Peer,
blockRoot: Eth2Digest,
response: SingleChunkResponse[ForkedLightClientBootstrap])
{.async, libp2pProtocol("light_client_bootstrap", 1).} =
trace "Received LC bootstrap request", peer, blockRoot
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let bootstrap = dag.getLightClientBootstrap(blockRoot)
withForkyBootstrap(bootstrap):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyBootstrap.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientBootstrapResponseCost,
"light_client_bootstrap/1")
await response.sendSSZ(forkyBootstrap, contextBytes)
else:
raise newException(ResourceUnavailableError, LCBootstrapUnavailable)
debug "LC bootstrap request done", peer, blockRoot
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
proc lightClientUpdatesByRange(
peer: Peer,
startPeriod: SyncCommitteePeriod,
reqCount: uint64,
response: MultipleChunksResponse[
ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES])
{.async, libp2pProtocol("light_client_updates_by_range", 1).} =
trace "Received LC updates by range request", peer, startPeriod, reqCount
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let
headPeriod = dag.head.slot.sync_committee_period
# Limit number of updates in response
maxSupportedCount =
if startPeriod > headPeriod:
0'u64
else:
min(headPeriod + 1 - startPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
count = min(reqCount, maxSupportedCount)
onePastPeriod = startPeriod + count
var found = 0
for period in startPeriod..<onePastPeriod:
let update = dag.getLightClientUpdateForPeriod(period)
withForkyUpdate(update):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyUpdate.contextEpoch
contextBytes =
peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientUpdateResponseCost,
"light_client_updates_by_range/1")
await response.writeSSZ(forkyUpdate, contextBytes)
inc found
else:
discard
debug "LC updates by range request done", peer, startPeriod, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
proc lightClientFinalityUpdate(
peer: Peer,
response: SingleChunkResponse[ForkedLightClientFinalityUpdate])
{.async, libp2pProtocol("light_client_finality_update", 1).} =
trace "Received LC finality update request", peer
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let finality_update = dag.getLightClientFinalityUpdate()
withForkyFinalityUpdate(finality_update):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyFinalityUpdate.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientFinalityUpdateResponseCost,
"light_client_finality_update/1")
await response.sendSSZ(forkyFinalityUpdate, contextBytes)
else:
raise newException(ResourceUnavailableError, LCFinUpdateUnavailable)
debug "LC finality update request done", peer
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
proc lightClientOptimisticUpdate(
peer: Peer,
response: SingleChunkResponse[ForkedLightClientOptimisticUpdate])
{.async, libp2pProtocol("light_client_optimistic_update", 1).} =
trace "Received LC optimistic update request", peer
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let optimistic_update = dag.getLightClientOptimisticUpdate()
withForkyOptimisticUpdate(optimistic_update):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyOptimisticUpdate.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientOptimisticUpdateResponseCost,
"light_client_optimistic_update/1")
await response.sendSSZ(forkyOptimisticUpdate, contextBytes)
else:
raise newException(ResourceUnavailableError, LCOptUpdateUnavailable)
debug "LC optimistic update request done", peer
proc init*(T: type LightClientSync.NetworkState, dag: ChainDAGRef): T =
T(
dag: dag,
)

View File

@ -7,7 +7,7 @@
{.push raises: [].} {.push raises: [].}
import std/[heapqueue, tables, strutils, sequtils, algorithm] import std/[strutils, sequtils, algorithm]
import stew/[results, base10], chronos, chronicles import stew/[results, base10], chronos, chronicles
import import
../spec/datatypes/[phase0, altair], ../spec/datatypes/[phase0, altair],
@ -287,7 +287,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
remote_head_slot = peerSlot, local_head_slot = headSlot remote_head_slot = peerSlot, local_head_slot = headSlot
let let
peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime peerStatusAge = Moment.now() - peer.getStatusLastTime()
needsUpdate = needsUpdate =
# Latest status we got is old # Latest status we got is old
peerStatusAge >= StatusExpirationTime or peerStatusAge >= StatusExpirationTime or

View File

@ -8,9 +8,7 @@
{.push raises: [].} {.push raises: [].}
import import
std/[tables, sets, macros],
chronicles, chronos, snappy, snappy/codec, chronicles, chronos, snappy, snappy/codec,
libp2p/switch,
../spec/datatypes/[phase0, altair, bellatrix, capella, deneb], ../spec/datatypes/[phase0, altair, bellatrix, capella, deneb],
../spec/[helpers, forks, network], ../spec/[helpers, forks, network],
".."/[beacon_clock], ".."/[beacon_clock],
@ -19,47 +17,16 @@ import
../rpc/rest_constants ../rpc/rest_constants
logScope: logScope:
topics = "sync" topics = "sync_proto"
const const
blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs) blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs)
lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1)
## Only one bootstrap per peer should ever be needed - no need to allow more
lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000)
## Updates are tiny - we can allow lots of them
lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100)
type type
StatusMsg* = object BeaconSyncNetworkState* {.final.} = ref object of RootObj
forkDigest*: ForkDigest
finalizedRoot*: Eth2Digest
finalizedEpoch*: Epoch
headRoot*: Eth2Digest
headSlot*: Slot
ValidatorSetDeltaFlags {.pure.} = enum
Activation = 0
Exit = 1
ValidatorChangeLogEntry* = object
case kind*: ValidatorSetDeltaFlags
of Activation:
pubkey: ValidatorPubKey
else:
index: uint32
BeaconSyncNetworkState = ref object
dag: ChainDAGRef dag: ChainDAGRef
cfg: RuntimeConfig cfg: RuntimeConfig
forkDigests: ref ForkDigests
genesisBlockRoot: Eth2Digest genesisBlockRoot: Eth2Digest
getBeaconTime: GetBeaconTimeFn
BeaconSyncPeerState* = ref object
statusLastTime*: chronos.Moment
statusMsg*: StatusMsg
BlockRootSlot* = object BlockRootSlot* = object
blockRoot: Eth2Digest blockRoot: Eth2Digest
@ -133,163 +100,10 @@ proc readChunkPayload*(
else: else:
return neterr InvalidContextBytes return neterr InvalidContextBytes
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject):
Future[NetRes[MsgType]] {.async.} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError:
return neterr UnexpectedEOF
let contextFork =
peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr:
return neterr InvalidContextBytes
withLcDataFork(lcDataForkAtConsensusFork(contextFork)):
when lcDataFork > LightClientDataFork.None:
let res = await eth2_network.readChunkPayload(
conn, peer, MsgType.Forky(lcDataFork))
if res.isOk:
if contextFork !=
peer.network.cfg.consensusForkAtEpoch(res.get.contextEpoch):
return neterr InvalidContextBytes
return ok MsgType.init(res.get)
else:
return err(res.error)
else:
return neterr InvalidContextBytes
func shortLog*(s: StatusMsg): auto =
(
forkDigest: s.forkDigest,
finalizedRoot: shortLog(s.finalizedRoot),
finalizedEpoch: shortLog(s.finalizedEpoch),
headRoot: shortLog(s.headRoot),
headSlot: shortLog(s.headSlot)
)
chronicles.formatIt(StatusMsg): shortLog(it)
func disconnectReasonName(reason: uint64): string =
# haha, nim doesn't support uint64 in `case`!
if reason == uint64(ClientShutDown): "Client shutdown"
elif reason == uint64(IrrelevantNetwork): "Irrelevant network"
elif reason == uint64(FaultOrError): "Fault or error"
else: "Disconnected (" & $reason & ")"
func forkDigestAtEpoch(state: BeaconSyncNetworkState,
epoch: Epoch): ForkDigest =
state.forkDigests[].atEpoch(epoch, state.cfg)
proc getCurrentStatus(state: BeaconSyncNetworkState): StatusMsg =
let
dag = state.dag
wallSlot = state.getBeaconTime().slotOrZero
if dag != nil:
StatusMsg(
forkDigest: state.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: dag.finalizedHead.blck.root,
finalizedEpoch: dag.finalizedHead.slot.epoch,
headRoot: dag.head.root,
headSlot: dag.head.slot)
else:
StatusMsg(
forkDigest: state.forkDigestAtEpoch(wallSlot.epoch),
finalizedRoot: state.genesisBlockRoot,
finalizedEpoch: GENESIS_EPOCH,
headRoot: state.genesisBlockRoot,
headSlot: GENESIS_SLOT)
proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg):
Result[void, cstring] =
let
dag = state.dag
wallSlot = (state.getBeaconTime() + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero
if status.finalizedEpoch > status.headSlot.epoch:
# Can be equal during genesis or checkpoint start
return err("finalized epoch newer than head")
if status.headSlot > wallSlot:
return err("head more recent than wall clock")
if state.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest:
return err("fork digests differ")
if dag != nil:
if status.finalizedEpoch <= dag.finalizedHead.slot.epoch:
let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot())
if blockId.isSome and
(not status.finalizedRoot.isZero) and
status.finalizedRoot != blockId.get().bid.root:
return err("peer following different finality")
else:
if status.finalizedEpoch == GENESIS_EPOCH:
if status.finalizedRoot != state.genesisBlockRoot:
return err("peer following different finality")
ok()
proc handleStatus(peer: Peer,
state: BeaconSyncNetworkState,
theirStatus: StatusMsg): Future[bool] {.gcsafe.}
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.}
{.pop.} # TODO fix p2p macro for raises {.pop.} # TODO fix p2p macro for raises
p2pProtocol BeaconSync(version = 1, p2pProtocol BeaconSync(version = 1,
networkState = BeaconSyncNetworkState, networkState = BeaconSyncNetworkState):
peerState = BeaconSyncPeerState):
onPeerConnected do (peer: Peer, incoming: bool) {.async.}:
debug "Peer connected",
peer, peerId = shortLog(peer.peerId), incoming
# Per the eth2 protocol, whoever dials must send a status message when
# connected for the first time, but because of how libp2p works, there may
# be a race between incoming and outgoing connections and disconnects that
# makes the incoming flag unreliable / obsolete by the time we get to
# this point - instead of making assumptions, we'll just send a status
# message redundantly.
# TODO(zah)
# the spec does not prohibit sending the extra status message on
# incoming connections, but it should not be necessary - this would
# need a dedicated flow in libp2p that resolves the race conditions -
# this needs more thinking around the ordering of events and the
# given incoming flag
let
ourStatus = peer.networkState.getCurrentStatus()
theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
if theirStatus.isOk:
discard await peer.handleStatus(peer.networkState, theirStatus.get())
else:
debug "Status response not received in time",
peer, errorKind = theirStatus.error.kind
await peer.disconnect(FaultOrError)
proc status(peer: Peer,
theirStatus: StatusMsg,
response: SingleChunkResponse[StatusMsg])
{.async, libp2pProtocol("status", 1, isRequired = true).} =
let ourStatus = peer.networkState.getCurrentStatus()
trace "Sending status message", peer = peer, status = ourStatus
await response.send(ourStatus)
discard await peer.handleStatus(peer.networkState, theirStatus)
proc ping(peer: Peer, value: uint64): uint64
{.libp2pProtocol("ping", 1, isRequired = true).} =
return peer.network.metadata.seq_number
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2
proc getMetaData(peer: Peer): uint64
{.libp2pProtocol("metadata", 1, isRequired = true).} =
raise newException(InvalidInputsError, "GetMetaData v1 unsupported")
proc getMetadata_v2(peer: Peer): altair.MetaData
{.libp2pProtocol("metadata", 2, isRequired = true).} =
return peer.network.metadata
proc beaconBlocksByRange_v2( proc beaconBlocksByRange_v2(
peer: Peer, peer: Peer,
startSlot: Slot, startSlot: Slot,
@ -352,7 +166,7 @@ p2pProtocol BeaconSync(version = 1,
await response.writeBytesSZ( await response.writeBytesSZ(
uncompressedLen, bytes, uncompressedLen, bytes,
peer.networkState.forkDigestAtEpoch(blocks[i].slot.epoch).data) peer.network.forkDigestAtEpoch(blocks[i].slot.epoch).data)
inc found inc found
@ -414,14 +228,13 @@ p2pProtocol BeaconSync(version = 1,
await response.writeBytesSZ( await response.writeBytesSZ(
uncompressedLen, bytes, uncompressedLen, bytes,
peer.networkState.forkDigestAtEpoch(blockRef.slot.epoch).data) peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found inc found
debug "Block root request done", debug "Block root request done",
peer, roots = blockRoots.len, count, found peer, roots = blockRoots.len, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1 # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
proc blobSidecarsByRoot( proc blobSidecarsByRoot(
peer: Peer, peer: Peer,
@ -468,7 +281,7 @@ p2pProtocol BeaconSync(version = 1,
await response.writeBytesSZ( await response.writeBytesSZ(
uncompressedLen, bytes, uncompressedLen, bytes,
peer.networkState.forkDigestAtEpoch(blockRef.slot.epoch).data) peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data)
inc found inc found
debug "Blob root request done", debug "Blob root request done",
@ -538,7 +351,7 @@ p2pProtocol BeaconSync(version = 1,
await response.writeBytesSZ( await response.writeBytesSZ(
uncompressedLen, bytes, uncompressedLen, bytes,
peer.networkState.forkDigestAtEpoch(blockIds[i].slot.epoch).data) peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data)
inc found inc found
else: else:
break break
@ -546,203 +359,7 @@ p2pProtocol BeaconSync(version = 1,
debug "BlobSidecar range request done", debug "BlobSidecar range request done",
peer, startSlot, count = reqCount, found peer, startSlot, count = reqCount, found
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T =
proc lightClientBootstrap( T(
peer: Peer, dag: dag,
blockRoot: Eth2Digest, )
response: SingleChunkResponse[ForkedLightClientBootstrap])
{.async, libp2pProtocol("light_client_bootstrap", 1,
isLightClientRequest = true).} =
trace "Received LC bootstrap request", peer, blockRoot
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let bootstrap = dag.getLightClientBootstrap(blockRoot)
withForkyBootstrap(bootstrap):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyBootstrap.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientBootstrapResponseCost,
"light_client_bootstrap/1")
await response.sendSSZ(forkyBootstrap, contextBytes)
else:
raise newException(ResourceUnavailableError, LCBootstrapUnavailable)
debug "LC bootstrap request done", peer, blockRoot
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange
proc lightClientUpdatesByRange(
peer: Peer,
startPeriod: SyncCommitteePeriod,
reqCount: uint64,
response: MultipleChunksResponse[
ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES])
{.async, libp2pProtocol("light_client_updates_by_range", 1,
isLightClientRequest = true).} =
trace "Received LC updates by range request", peer, startPeriod, reqCount
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let
headPeriod = dag.head.slot.sync_committee_period
# Limit number of updates in response
maxSupportedCount =
if startPeriod > headPeriod:
0'u64
else:
min(headPeriod + 1 - startPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
count = min(reqCount, maxSupportedCount)
onePastPeriod = startPeriod + count
var found = 0
for period in startPeriod..<onePastPeriod:
let update = dag.getLightClientUpdateForPeriod(period)
withForkyUpdate(update):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyUpdate.contextEpoch
contextBytes =
peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientUpdateResponseCost,
"light_client_updates_by_range/1")
await response.writeSSZ(forkyUpdate, contextBytes)
inc found
else:
discard
debug "LC updates by range request done", peer, startPeriod, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate
proc lightClientFinalityUpdate(
peer: Peer,
response: SingleChunkResponse[ForkedLightClientFinalityUpdate])
{.async, libp2pProtocol("light_client_finality_update", 1,
isLightClientRequest = true).} =
trace "Received LC finality update request", peer
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let finality_update = dag.getLightClientFinalityUpdate()
withForkyFinalityUpdate(finality_update):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyFinalityUpdate.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientFinalityUpdateResponseCost,
"light_client_finality_update/1")
await response.sendSSZ(forkyFinalityUpdate, contextBytes)
else:
raise newException(ResourceUnavailableError, LCFinUpdateUnavailable)
debug "LC finality update request done", peer
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate
proc lightClientOptimisticUpdate(
peer: Peer,
response: SingleChunkResponse[ForkedLightClientOptimisticUpdate])
{.async, libp2pProtocol("light_client_optimistic_update", 1,
isLightClientRequest = true).} =
trace "Received LC optimistic update request", peer
let dag = peer.networkState.dag
doAssert dag.lcDataStore.serve
let optimistic_update = dag.getLightClientOptimisticUpdate()
withForkyOptimisticUpdate(optimistic_update):
when lcDataFork > LightClientDataFork.None:
let
contextEpoch = forkyOptimisticUpdate.contextEpoch
contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data
# TODO extract from libp2pProtocol
peer.awaitQuota(
lightClientOptimisticUpdateResponseCost,
"light_client_optimistic_update/1")
await response.sendSSZ(forkyOptimisticUpdate, contextBytes)
else:
raise newException(ResourceUnavailableError, LCOptUpdateUnavailable)
debug "LC optimistic update request done", peer
proc goodbye(peer: Peer,
reason: uint64)
{.async, libp2pProtocol("goodbye", 1, isRequired = true).} =
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
debug "Peer status", peer, statusMsg
peer.state(BeaconSync).statusMsg = statusMsg
peer.state(BeaconSync).statusLastTime = Moment.now()
proc handleStatus(peer: Peer,
state: BeaconSyncNetworkState,
theirStatus: StatusMsg): Future[bool] {.async, gcsafe.} =
let
res = checkStatusMsg(state, theirStatus)
return if res.isErr():
debug "Irrelevant peer", peer, theirStatus, err = res.error()
await peer.disconnect(IrrelevantNetwork)
false
else:
peer.setStatusMsg(theirStatus)
if peer.connectionState == Connecting:
# As soon as we get here it means that we passed handshake succesfully. So
# we can add this peer to PeerPool.
await peer.handlePeer()
true
proc updateStatus*(peer: Peer): Future[bool] {.async.} =
## Request `status` of remote peer ``peer``.
let
nstate = peer.networkState(BeaconSync)
ourStatus = getCurrentStatus(nstate)
let theirFut = awaitne peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR)
if theirFut.failed():
return false
else:
let theirStatus = theirFut.read()
if theirStatus.isOk:
return await peer.handleStatus(nstate, theirStatus.get())
else:
return false
proc getHeadSlot*(peer: Peer): Slot =
## Returns head slot for specific peer ``peer``.
peer.state(BeaconSync).statusMsg.headSlot
proc getFinalizedEpoch*(peer: Peer): Epoch =
## Returns head slot for specific peer ``peer``.
peer.state(BeaconSync).statusMsg.finalizedEpoch
proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef,
getBeaconTime: GetBeaconTimeFn) =
var networkState = network.protocolState(BeaconSync)
networkState.dag = dag
networkState.cfg = dag.cfg
networkState.forkDigests = dag.forkDigests
networkState.genesisBlockRoot = dag.genesisBlockRoot
networkState.getBeaconTime = getBeaconTime
proc initBeaconSync*(network: Eth2Node,
cfg: RuntimeConfig,
forkDigests: ref ForkDigests,
genesisBlockRoot: Eth2Digest,
getBeaconTime: GetBeaconTimeFn) =
var networkState = network.protocolState(BeaconSync)
networkState.dag = nil
networkState.cfg = cfg
networkState.forkDigests = forkDigests
networkState.genesisBlockRoot = genesisBlockRoot
networkState.getBeaconTime = getBeaconTime

View File

@ -940,7 +940,7 @@ func mapErrTo*[T, E](r: Result[T, E], v: static KeystoreGenerationErrorKind):
KeystoreGenerationError(kind: v, error: $e)) KeystoreGenerationError(kind: v, error: $e))
proc loadNetKeystore*(keystorePath: string, proc loadNetKeystore*(keystorePath: string,
insecurePwd: Option[string]): Opt[lcrypto.PrivateKey] = insecurePwd: Opt[string]): Opt[lcrypto.PrivateKey] =
if not(checkSensitiveFilePermissions(keystorePath)): if not(checkSensitiveFilePermissions(keystorePath)):
error "Network keystorage file has insecure permissions", error "Network keystorage file has insecure permissions",
@ -984,7 +984,7 @@ proc loadNetKeystore*(keystorePath: string,
return return
proc saveNetKeystore*(rng: var HmacDrbgContext, keystorePath: string, proc saveNetKeystore*(rng: var HmacDrbgContext, keystorePath: string,
netKey: lcrypto.PrivateKey, insecurePwd: Option[string] netKey: lcrypto.PrivateKey, insecurePwd: Opt[string]
): Result[void, KeystoreGenerationError] = ): Result[void, KeystoreGenerationError] =
let password = let password =
if insecurePwd.isSome(): if insecurePwd.isSome():
@ -2092,7 +2092,7 @@ proc loadWallet*(fileName: string): Result[Wallet, string] =
err "Error accessing wallet file \"" & fileName & "\": " & err.msg err "Error accessing wallet file \"" & fileName & "\": " & err.msg
proc findWallet*(config: BeaconNodeConf, proc findWallet*(config: BeaconNodeConf,
name: WalletName): Result[Option[WalletPathPair], string] = name: WalletName): Result[Opt[WalletPathPair], string] =
var walletFiles = newSeq[string]() var walletFiles = newSeq[string]()
try: try:
for kind, walletFile in walkDir(config.walletsDir): for kind, walletFile in walkDir(config.walletsDir):
@ -2100,7 +2100,7 @@ proc findWallet*(config: BeaconNodeConf,
let walletId = splitFile(walletFile).name let walletId = splitFile(walletFile).name
if cmpIgnoreCase(walletId, name.string) == 0: if cmpIgnoreCase(walletId, name.string) == 0:
let wallet = ? loadWallet(walletFile) let wallet = ? loadWallet(walletFile)
return ok some WalletPathPair(wallet: wallet, path: walletFile) return ok Opt.some WalletPathPair(wallet: wallet, path: walletFile)
walletFiles.add walletFile walletFiles.add walletFile
except OSError as err: except OSError as err:
return err("Error accessing the wallets directory \"" & return err("Error accessing the wallets directory \"" &
@ -2110,9 +2110,9 @@ proc findWallet*(config: BeaconNodeConf,
let wallet = ? loadWallet(walletFile) let wallet = ? loadWallet(walletFile)
if cmpIgnoreCase(wallet.name.string, name.string) == 0 or if cmpIgnoreCase(wallet.name.string, name.string) == 0 or
cmpIgnoreCase(wallet.uuid.string, name.string) == 0: cmpIgnoreCase(wallet.uuid.string, name.string) == 0:
return ok some WalletPathPair(wallet: wallet, path: walletFile) return ok Opt.some WalletPathPair(wallet: wallet, path: walletFile)
return ok none(WalletPathPair) return ok Opt.none(WalletPathPair)
type type
# This is not particularly well-standardized yet. # This is not particularly well-standardized yet.

View File

@ -251,3 +251,8 @@ For example, to run the block simulator for 384 slots, with 20,000 validators, a
build/block_sim --slots=384 --validators=20000 --attesterRatio=0.66 build/block_sim --slots=384 --validators=20000 --attesterRatio=0.66
``` ```
## Sync from a specific peer
```sh
build/nimbus_beacon_node --discv5:off --tcp-port=9876 --direct-peer="/ip4/127.0.0.1/tcp/9000/p2p/$(curl -s -X 'GET' 'http://localhost:5052/eth/v1/node/identity' -H 'accept: application/json' | jq -r .data.peer_id)"
```