Implement the latest networking spec
This commit is contained in:
@ -28,6 +28,8 @@ proc combine*(tgt: var Attestation, src: Attestation, flags: UpdateFlags) =
if skipValidation notin flags:
debug "Ignoring overlapping attestations"
proc validate(
state: BeaconState, attestation: Attestation, flags: UpdateFlags): bool =
@ -61,11 +63,11 @@ proc validate(
finalizedEpoch = humaneEpochNum(state.finalized_checkpoint.epoch)
if not allIt(attestation.custody_bits.bytes, it == 0):
if not attestation.custody_bits.BitSeq.isZeros:
notice "Invalid custody bitfield for phase 0"
return false
if not anyIt(attestation.aggregation_bits.bytes, it != 0):
if attestation.aggregation_bits.BitSeq.isZeros:
notice "Empty aggregation bitfield"
return false
@ -106,6 +106,10 @@ proc get(db: BeaconChainDB, key: auto, T: typedesc): Option[T] =
proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Option[BeaconBlock] =
db.get(subkey(BeaconBlock, key), BeaconBlock)
proc getBlock*(db: BeaconChainDB, slot: Slot): Option[BeaconBlock] =
# TODO implement this
proc getState*(db: BeaconChainDB, key: Eth2Digest): Option[BeaconState] =
db.get(subkey(BeaconState, key), BeaconState)
@ -10,8 +10,11 @@ import
sync_protocol, request_manager, genesis
topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks"
topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicBeaconBlocks = "/eth2/beacon_block/ssz"
topicAttestations = "/eth2/beacon_attestation/ssz"
topicVoluntaryExits = "/eth2/voluntary_exit/ssz"
topicProposerSlashings = "/eth2/proposer_slashing/ssz"
topicAttesterSlashings = "/eth2/attester_slashing/ssz"
dataDirValidators = "validators"
networkMetadataFile = "network.json"
@ -101,6 +104,7 @@ proc initGenesis(node: BeaconNode) {.async.} =
quit 1
info "Got genesis state", hash = hash_tree_root(tailState)
node.forkVersion = tailState.fork.current_version
let tailBlock = get_initial_beacon_block(tailState)
@ -159,6 +163,20 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
if metadataErrorMsg.len > 0:
fail "To connect to the ",, " network, please compile with", metadataErrorMsg
for bootNode in result.networkMetadata.bootstrapNodes:
if bootNode.isSameNode(result.networkIdentity):
result.isBootstrapNode = true
result.bootstrapNodes.add bootNode
for bootNode in conf.bootstrapNodes:
result.bootstrapNodes.add BootstrapAddr.init(bootNode)
let bootstrapFile = string conf.bootstrapNodesFile
if bootstrapFile.len > 0:
for ln in lines(bootstrapFile):
result.bootstrapNodes.add BootstrapAddr.init(string ln)
result.attachedValidators = ValidatorPool.init
init result.mainchainMonitor, "", Port(0) # TODO: specify geth address and port
@ -176,14 +194,12 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
result.blockPool = BlockPool.init(result.db)
result.attestationPool = AttestationPool.init(result.blockPool)
|||| = await createEth2Node(conf)
|||| = await createEth2Node(conf, result.bootstrapNodes)
# TODO sync is called when a remote peer is connected - is that the right
# time to do so?
let sync =
sync.chainId = 0 # TODO specify chainId
sync.networkId = result.networkMetadata.networkId
sync.node = result
sync.db = result.db
@ -211,28 +227,12 @@ template withState(
proc connectToNetwork(node: BeaconNode) {.async.} =
var bootstrapNodes = newSeq[BootstrapAddr]()
for bootNode in node.networkMetadata.bootstrapNodes:
if bootNode.isSameNode(node.networkIdentity):
node.isBootstrapNode = true
bootstrapNodes.add bootNode
for bootNode in node.config.bootstrapNodes:
bootstrapNodes.add BootstrapAddr.init(bootNode)
let bootstrapFile = string node.config.bootstrapNodesFile
if bootstrapFile.len > 0:
for ln in lines(bootstrapFile):
bootstrapNodes.add BootstrapAddr.init(string ln)
if bootstrapNodes.len > 0:
info "Connecting to bootstrap nodes", bootstrapNodes
if node.bootstrapNodes.len > 0:
info "Connecting to bootstrap nodes", bootstrapNodes = node.bootstrapNodes
info "Waiting for connections"
template findIt(s: openarray, predicate: untyped): int =
var res = -1
@ -14,10 +14,12 @@ type
BeaconNode* = ref object
nickname*: string
network*: Eth2Node
forkVersion*: array[4, byte]
networkIdentity*: Eth2NodeIdentity
networkMetadata*: NetworkMetadata
requestManager*: RequestManager
isBootstrapNode*: bool
bootstrapNodes*: seq[BootstrapAddr]
db*: BeaconChainDB
config*: BeaconNodeConf
attachedValidators*: ValidatorPool
@ -99,7 +99,8 @@ when networkBackend == rlpxBackend:
proc readValue*(reader: var JsonReader, value: var BootstrapAddr) {.inline.} =
value = initENode reader.readValue(string)
proc createEth2Node*(conf: BeaconNodeConf): Future[EthereumNode] {.async.} =
proc createEth2Node*(conf: BeaconNodeConf,
bootstrapNodes: seq[BootstrapAddr]): Future[EthereumNode] {.async.} =
keys = getPersistentNetIdentity(conf)
(ip, tcpPort, udpPort) = setupNat(conf)
@ -125,25 +126,19 @@ else:
os, random, stew/io,
libp2p/crypto/crypto, libp2p/daemon/daemonapi, eth/async_utils,
ssz, libp2p_backend
when networkBackend == libp2pSpecBackend:
import libp2p_spec_backend
export libp2p_spec_backend
const netBackendName* = "libp2p_spec"
import libp2p_backend
export libp2p_backend
const netBackendName* = "libp2p_native"
netBackendName* = "libp2p"
networkKeyFilename = "privkey.protobuf"
BootstrapAddr* = PeerInfo
Eth2NodeIdentity* = PeerInfo
networkKeyFilename = "privkey.protobuf"
proc init*(T: type BootstrapAddr, str: string): T =
Json.decode(str, PeerInfo)
@ -168,7 +163,13 @@ else:
var mainDaemon: DaemonAPI
proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} =
proc allMultiAddresses(nodes: seq[BootstrapAddr]): seq[string] =
for node in nodes:
for a in node.addresses:
result.add $a & "/ipfs/" & node.peer.pretty()
proc createEth2Node*(conf: BeaconNodeConf,
bootstrapNodes: seq[BootstrapAddr]): Future[Eth2Node] {.async.} =
(extIp, extTcpPort, extUdpPort) = setupNat(conf)
hostAddress = tcpEndPoint(globalListeningAddr, Port conf.tcpPort)
@ -176,11 +177,25 @@ else:
else: @[tcpEndPoint(extIp, extTcpPort)]
keyFile = conf.ensureNetworkIdFile
info "Starting the LibP2P daemon", hostAddress, announcedAddresses, keyFile
mainDaemon = await newDaemonApi({PSGossipSub},
info "Starting the LibP2P daemon", hostAddress, announcedAddresses,
keyFile, bootstrapNodes
var daemonFut = if bootstrapNodes.len == 0:
newDaemonApi({DHTFull, PSGossipSub},
id = keyFile,
hostAddresses = @[hostAddress],
announcedAddresses = announcedAddresses)
newDaemonApi({DHTFull, PSGossipSub, WaitBootstrap},
id = keyFile,
hostAddresses = @[hostAddress],
announcedAddresses = announcedAddresses,
bootstrapNodes = allMultiAddresses(bootstrapNodes),
peersRequired = 1)
info "Deamon started"
mainDaemon = await daemonFut
proc closeDaemon() {.noconv.} =
info "Shutting down the LibP2P daemon"
@ -1,5 +1,7 @@
import conf, chronos, web3, json,
spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator], extras
chronos, web3, json,
spec/[datatypes, digest, crypto, beaconstate, helpers, validator],
conf, extras
proc deposit(pubkey: Bytes48, withdrawalCredentials: Bytes32, signature: Bytes96)
@ -1,6 +1,6 @@
stew/shims/[macros, tables], chronos, chronicles,
stew/varints, stew/shims/[macros, tables], chronos, chronicles,
libp2p/daemon/daemonapi, faststreams/output_stream, serialization,
json_serialization/std/options, eth/p2p/p2p_protocol_dsl,
libp2p_json_serialization, ssz
@ -32,8 +32,8 @@ type
DisconnectionReason* = enum
UntypedResponder = object
@ -70,7 +70,6 @@ type
ResponseCode* = enum
@ -92,13 +91,84 @@ type
defaultIncomingReqTimeout = 5000
defaultOutgoingReqTimeout = 10000
HandshakeTimeout = BreachOfProtocol
HandshakeTimeout = FaultOrError
RQRP_MAX_SIZE = 2 * 1024 * 1024
IrrelevantNetwork* = UselessPeer
template `$`*(peer: Peer): string = $
chronicles.formatIt(Peer): $it
template libp2pProtocol*(name: string, version: int) {.pragma.}
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peers.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerId)
node.peers[peerId] = result
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
peer.connectionState = Disconnected
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string,
reason = FaultOrError): auto =
const errMsg = errMsgExpr
debug errMsg, err = getCurrentExceptionMsg()
disconnectAndRaise(peer, reason, errMsg)
proc getCompressedMsgId*(MsgType: type): CompressedMsgId =
mixin msgId, msgProtocol, protocolInfo
(protocolIdx: MsgType.msgProtocol.protocolInfo.index,
methodId: MsgType.msgId)
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific P2P message.
## Any messages received while waiting will be dispatched to their
## respective handlers. The designated message handler will also run
## to completion before the future returned by `nextMsg` is resolved.
let awaitedMsgId = getCompressedMsgId(MsgType)
let f = getOrDefault(peer.awaitedMessages, awaitedMsgId)
if not f.isNil:
return Future[MsgType](f)
initFuture result
peer.awaitedMessages[awaitedMsgId] = result
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} =
new result
@ -120,6 +190,27 @@ proc readMsg(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}
proc readSizePrefix(transp: StreamTransport,
deadline: Future[void]): Future[int] {.async.} =
var parser: VarintParser[uint64, ProtoBuf]
while true:
var nextByte: byte
var readNextByte = transp.readExactly(addr nextByte, 1)
await readNextByte or deadline
if not readNextByte.finished:
return -1
case parser.feedByte(nextByte)
of Done:
let res = parser.getResult
if res > uint64(RQRP_MAX_SIZE):
return -1
return int(res)
of Overflow:
return -1
of Incomplete:
proc readMsgBytes(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
@ -127,15 +218,13 @@ proc readMsgBytes(stream: P2PStream,
var responseCode: byte
var readResponseCode = stream.transp.readExactly(addr responseCode, 1)
await readResponseCode or deadline
if not readResponseCode.finished: return
if not readResponseCode.finished:
if responseCode > ResponseCode.high.byte: return
logScope: responseCode = ResponseCode(responseCode)
case ResponseCode(responseCode)
of InvalidRequest:
debug "P2P request was classified as invalid"
of EncodingError, ServerError:
of InvalidRequest, ServerError:
let responseErrMsg = await readMsg(stream, string, false, deadline)
debug "P2P request resulted in error", responseErrMsg
@ -143,18 +232,17 @@ proc readMsgBytes(stream: P2PStream,
# The response is OK, the execution continues below
var sizePrefix: uint32
var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix))
await readSizePrefix or deadline
if not readSizePrefix.finished: return
var sizePrefix = await readSizePrefix(stream.transp, deadline)
if sizePrefix < -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer
if sizePrefix == 0:
debug "Received SSZ with zero size", peer = stream.peer
var msgBytes = newSeq[byte]( + sizeof(sizePrefix))
copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix))
var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)],
var msgBytes = newSeq[byte](sizePrefix)
var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix)
await readBody or deadline
if not readBody.finished: return
@ -178,6 +266,13 @@ proc readMsg(stream: P2PStream,
msgBytes, errMsg = err.formatMsg("<msg>")
proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
var s = init OutputStream
s.append byte(responseCode)
s.appendVarint errMsg.len
s.appendValue SSZ, errMsg
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
err: ref SerializationError,
@ -186,33 +281,44 @@ proc sendErrorResponse(peer: Peer,
debug "Received an invalid request",
peer, msgName, msgBytes, errMsg = err.formatMsg("<msg>")
var responseCode = byte(EncodingError)
discard await stream.transp.write(addr responseCode, 1)
let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
discard await stream.transp.write(responseBytes)
await stream.close()
proc sendErrorResponse(peer: Peer,
stream: P2PStream,
responseCode: ResponseCode,
errMsg: string) {.async.} =
debug "Error processing request",
peer, responseCode, errMsg
debug "Error processing request", peer, responseCode, errMsg
var outputStream = init OutputStream
outputStream.append byte(responseCode)
outputStream.appendValue SSZ, errMsg
discard await stream.transp.write(outputStream.getOutput)
let responseBytes = encodeErrorMsg(ServerError, errMsg)
discard await stream.transp.write(responseBytes)
await stream.close()
proc writeSizePrefix(transp: StreamTransport, size: uint64) {.async.} =
varintBuf: array[10, byte]
varintSize = vsizeof(size)
cursor = createWriteCursor(varintBuf)
cursor.appendVarint size
var sent = await transp.write(varintBuf[0 ..< varintSize])
if sent != varintSize:
raise newException(TransmissionError, "Failed to deliver size prefix")
proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var stream = await, @[protocolId])
# TODO how does openStream fail? Set a timeout here and handle it
await writeSizePrefix(stream.transp, uint64(requestBytes.len))
let sent = await stream.transp.write(requestBytes)
if sent != requestBytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
raise newException(TransmissionError, "Failed to deliver msg bytes")
proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} =
let sent = await stream.transp.write(bytes)
proc sendResponseBytes(stream: P2PStream, bytes: Bytes) {.async.} =
var sent = await stream.transp.write(@[byte Success])
if sent != 1:
raise newException(TransmissionError, "Failed to deliver response code")
await writeSizePrefix(stream.transp, uint64(bytes.len))
sent = await stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes")
@ -228,6 +334,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# Send the request
let stream =
await writeSizePrefix(stream.transp, requestBytes.len.uint64)
let sent = await stream.transp.write(requestBytes)
if sent != requestBytes.len:
await disconnectAndRaise(peer, FaultOrError, "Incomplete send")
@ -241,7 +349,7 @@ proc exchangeHandshake(peer: Peer, protocolId: string, requestBytes: Bytes,
var response = await makeEth2Request(peer, protocolId, requestBytes,
ResponseMsg, timeout)
if not response.isSome:
await peer.disconnectAndRaise(BreachOfProtocol, "Failed to complete a handshake")
await peer.disconnectAndRaise(FaultOrError, "Failed to complete a handshake")
return response.get
@ -269,12 +377,11 @@ template handshakeImpl(outputStreamVar, handshakeSerializationCall: untyped,
var responseFut = nextMsg(peer, HandshakeType)
await lowLevelThunk(, stream) or deadline
if not responseFut.finished:
await disconnectAndRaise(peer, BreachOfProtocol, "Failed to complete a handshake")
await disconnectAndRaise(peer, FaultOrError, "Failed to complete a handshake")
var outputStreamVar = init OutputStream
append(outputStreamVar, byte(Success))
await sendBytes(stream, getOutput(outputStreamVar))
await sendResponseBytes(stream, getOutput(outputStreamVar))
@ -330,7 +437,16 @@ proc registerMsg(protocol: ProtocolInfo,
printer: printer)
proc getRequestProtoName(fn: NimNode): NimNode =
return newLit("/ETH/BeaconChain/" & $ & "/1/SSZ")
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
return pragma[1]
return newLit("")
proc init*[MsgType](T: type Responder[MsgType],
peer: Peer, stream: P2PStream): T =
@ -363,17 +479,9 @@ proc implementSendProcBody(sendProc: SendProc) =
quote: sendMsg(`peer`, `msgProto`, `bytes`)
quote: sendBytes(`UntypedResponder`(`peer`).stream, `bytes`)
quote: sendResponseBytes(`UntypedResponder`(`peer`).stream, `bytes`)
proc prependResponseCode(stream: NimNode): NimNode =
quote: append(`stream`, byte(Success))
let preSerializationStep = if msg.kind == msgResponse:
sendProc.useStandardBody(preSerializationStep, nil, sendCallGenerator)
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
@ -1,74 +0,0 @@
# included from libp2p_backend
template `$`*(peer: Peer): string = $
chronicles.formatIt(Peer): $it
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.}
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} =
result = node.peers.getOrDefault(peerId)
if result == nil:
result = Peer.init(node, peerId)
node.peers[peerId] = result
proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} =
proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
peer.connectionState = Disconnected
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
e.reason = r
raise e
proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)
template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string,
reason = FaultOrError): auto =
const errMsg = errMsgExpr
debug errMsg, err = getCurrentExceptionMsg()
disconnectAndRaise(peer, reason, errMsg)
proc getCompressedMsgId*(MsgType: type): CompressedMsgId =
mixin msgId, msgProtocol, protocolInfo
(protocolIdx: MsgType.msgProtocol.protocolInfo.index,
methodId: MsgType.msgId)
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific P2P message.
## Any messages received while waiting will be dispatched to their
## respective handlers. The designated message handler will also run
## to completion before the future returned by `nextMsg` is resolved.
let awaitedMsgId = getCompressedMsgId(MsgType)
let f = getOrDefault(peer.awaitedMessages, awaitedMsgId)
if not f.isNil:
return Future[MsgType](f)
initFuture result
peer.awaitedMessages[awaitedMsgId] = result
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
handshake: HandshakeStep,
disconnectHandler: DisconnectionHandler) =
p.handshake = handshake
p.disconnectHandler = disconnectHandler
@ -1,645 +0,0 @@
tables, deques, options, algorithm, stew/shims/[macros, tables],
stew/ranges/ptr_arith, chronos, chronicles, serialization, faststreams/input_stream,
eth/async_utils, eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi,
libp2p_json_serialization, ssz
daemonapi, p2pProtocol, serialization, ssz, libp2p_json_serialization
# Compression nibble
NoCompression* = byte 0
# Encoding nibble
SszEncoding* = byte 1
beaconChainProtocol = "/eth/serenity/beacon/rpc/1"
Eth2Node* = ref object of RootObj
daemon*: DaemonAPI
peers*: Table[PeerID, Peer]
protocolStates*: seq[RootRef]
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
Peer* = ref object
network*: Eth2Node
id*: PeerID
lastReqId*: uint64
rpcStream*: P2PStream
connectionState*: ConnectionState
awaitedMessages: Table[CompressedMsgId, FutureBase]
outstandingRequests*: Table[uint64, OutstandingRequest]
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
ConnectionState* = enum
DisconnectionReason* = enum
ClientShutdown = 1
CompressedMsgId = tuple
protocolIdx, methodId: int
ResponderWithId*[MsgType] = object
peer*: Peer
reqId*: uint64
Response*[MsgType] = distinct Peer
# -----------------------------------------
ResponseCode* = enum
ParseError = 10
InvalidRequest = 20
MethodNotFound = 30
ServerError = 40
OutstandingRequest* = object
id*: uint64
future*: FutureBase
timeoutAt*: Moment
responseThunk*: ThunkProc
ProtocolConnection* = object
stream*: P2PStream
protocolInfo*: ProtocolInfo
MessageInfo* = object
id*: int
name*: string
# Private fields:
thunk*: ThunkProc
printer*: MessageContentPrinter
nextMsgResolver*: NextMsgResolver
requestResolver*: RequestResolver
ProtocolInfoObj* = object
name*: string
version*: int
messages*: seq[MessageInfo]
index*: int # the position of the protocol in the
# ordered list of supported protocols
# Private fields:
peerStateInitializer*: PeerStateInitializer
networkStateInitializer*: NetworkStateInitializer
handshake*: HandshakeStep
disconnectHandler*: DisconnectionHandler
ProtocolInfo* = ptr ProtocolInfoObj
SpecOuterMsgHeader {.packed.} = object
compression {.bitsize: 4.}: uint
encoding {.bitsize: 4.}: uint
msgLen: uint64
SpecInnerMsgHeader {.packed.} = object
reqId: uint64
methodId: uint16
ErrorResponse {.packed.} = object
outerHeader: SpecOuterMsgHeader
innerHeader: SpecInnerMsgHeader
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.}
HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.}
DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
ThunkProc* = proc(peer: Peer,
stream: P2PStream,
reqId: uint64,
reqFuture: FutureBase,
msgData: ByteStreamVar): Future[void] {.gcsafe.}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}
NextMsgResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.}
RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.}
Bytes = seq[byte]
InvalidMsgIdError = object of InvalidMsgError
PeerDisconnected* = object of P2PBackendError
reason*: DisconnectionReason
PeerLoopExitReason = enum
HandshakeTimeout = FaultOrError
BreachOfProtocol* = FaultOrError
# TODO: We should lobby for more disconnection reasons.
template isOdd(val: SomeInteger): bool =
type T = type(val)
(val and T(1)) != 0
proc init(T: type SpecOuterMsgHeader,
compression, encoding: byte, msgLen: uint64): T =
T(compression: compression, encoding: encoding, msgLen: msgLen)
proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} =
await stream.transp.readExactly(addr result, sizeof result)
proc appendPackedObject(stream: OutputStreamVar, value: auto) =
let valueAsBytes = cast[ptr byte](unsafeAddr(value))
stream.append makeOpenArray(valueAsBytes, sizeof(value))
proc getThunk(protocol: ProtocolInfo, methodId: uint16): ThunkProc =
if >= protocol.messages.len: return nil
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
include libp2p_backends_common
proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.}
proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[Eth2Node] {.async.} =
new result
result.daemon = daemon
result.daemon.userData = result
result.peers = initTable[PeerID, Peer]()
newSeq result.protocolStates, allProtocols.len
for proto in allProtocols:
if proto.networkStateInitializer != nil:
result.protocolStates[proto.index] = proto.networkStateInitializer(result)
await daemon.addHandler(@[beaconChainProtocol], handleConnectingBeaconChainPeer)
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
new result
|||| = id
|||| = network
result.awaitedMessages = initTable[CompressedMsgId, FutureBase]()
result.maxInactivityAllowed = 15.minutes # TODO: read this from the config
result.connectionState = None
newSeq result.protocolStates, allProtocols.len
for i in 0 ..< allProtocols.len:
let proto = allProtocols[i]
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
proc init*[MsgName](T: type ResponderWithId[MsgName],
peer: Peer, reqId: uint64): T =
T(peer: peer, reqId: reqId)
proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} =
var unsentBytes = data.len
while true:
# TODO: this looks wrong.
# We are always trying to write the same data.
# Find all other places where such code is used.
unsentBytes -= await peer.rpcStream.transp.write(data)
if unsentBytes <= 0: return
except CatchableError:
await peer.disconnect(FaultOrError)
# this is usually a "(32) Broken pipe":
# FIXME: this exception should be caught somewhere in addMsgHandler() and
# sending should be retried a few times
proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] =
return sendMsg(responder.peer, data)
proc sendErrorResponse(peer: Peer, reqId: uint64,
responseCode: ResponseCode): Future[void] =
var resp = ErrorResponse(
outerHeader: SpecOuterMsgHeader.init(
compression = NoCompression,
encoding = SszEncoding,
msgLen = uint64 sizeof(SpecInnerMsgHeader)),
innerHeader: SpecInnerMsgHeader(
reqId: reqId,
methodId: uint16(responseCode)))
# TODO: don't allocate the Bytes sequence here
return peer.sendMsg @(makeOpenArray(cast[ptr byte](addr resp), sizeof resp))
proc recvAndDispatchMsg*(peer: Peer): Future[PeerLoopExitReason] {.async.} =
template fail(reason) =
return reason
# For now, we won't try to handle the presence of multiple sub-protocols
# since the spec is not defining how they will be mapped to P2P streams.
doAssert allProtocols.len == 1
stream = peer.rpcStream
protocol = allProtocols[0]
var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader)
if outerHeader.compression != NoCompression:
fail UnsupportedCompression
if outerHeader.encoding != SszEncoding:
fail UnsupportedEncoding
if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64:
fail ProtocolViolation
innerHeader = await stream.readPackedObject(SpecInnerMsgHeader)
reqId = innerHeader.reqId
var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64)
await stream.transp.readExactly(addr msgContent[0], msgContent.len)
var msgContentStream = memoryStream(msgContent)
if reqId.isOdd:
peer.outstandingRequests.withValue(reqId, req):
let thunk = req.responseThunk
let reqFuture = req.future
await thunk(peer, stream, reqId, reqFuture, msgContentStream)
except SerializationError:
debug "Error during deserialization", err = getCurrentExceptionMsg()
fail ProtocolViolation
except CatchableError:
warn ""
debug "Ignoring late or invalid response ID", peer, id = reqId
# TODO: skip the message
let thunk = protocol.getThunk(innerHeader.methodId)
if thunk != nil:
await thunk(peer, stream, reqId, nil, msgContentStream)
except SerializationError:
debug "Error during deserialization", err = getCurrentExceptionMsg()
fail ProtocolViolation
except CatchableError:
warn ""
debug "P2P request method not found", methodId = innerHeader.methodId
await peer.sendErrorResponse(reqId, MethodNotFound)
proc dispatchMessages*(peer: Peer): Future[PeerLoopExitReason] {.async.} =
while true:
let dispatchedMsgFut = recvAndDispatchMsg(peer)
doAssert peer.maxInactivityAllowed.milliseconds > 0
yield dispatchedMsgFut or sleepAsync(peer.maxInactivityAllowed)
if not dispatchedMsgFut.finished:
return InactivePeer
elif dispatchedMsgFut.failed:
error "Error in peer loop"
return InternalError
let status =
if status == Success: continue
return status
proc performProtocolHandshakes*(peer: Peer) {.async.} =
peer.initProtocolStates allProtocols
# Please note that the ordering of operations here is important!
# We must first start all handshake procedures and give them a
# chance to send any initial packages they might require over
# the network and to yield on their `nextMsg` waits.
var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
for protocol in allProtocols:
if protocol.handshake != nil:
subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream))
# The `dispatchMesssages` loop must be started after this.
# Otherwise, we risk that some of the handshake packets sent by
# the other peer may arrrive too early and be processed before
# the handshake code got a change to wait for them.
var messageProcessingLoop = peer.dispatchMessages()
messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} =
if messageProcessingLoop.failed:
debug "Ending dispatchMessages loop", peer,
err = messageProcessingLoop.error.msg
debug "Ending dispatchMessages", peer,
exitCode =
traceAsyncErrors peer.disconnect(ClientShutdown)
# The handshake may involve multiple async steps, so we wait
# here for all of them to finish.
await all(subProtocolsHandshakes)
peer.connectionState = Connected
debug "Peer connection initialized", peer
proc initializeConnection*(peer: Peer) {.async.} =
let daemon =
peer.connectionState = Connecting
peer.rpcStream = await daemon.openStream(, @[beaconChainProtocol])
await performProtocolHandshakes(peer)
except CatchableError:
await reraiseAsPeerDisconnected(peer, "Failed to perform handshake")
proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} =
let peer = daemon.peerFromStream(stream)
peer.rpcStream = stream
peer.connectionState = Connecting
await performProtocolHandshakes(peer)
proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo,
methodId: int, msg: pointer, reqFuture: FutureBase) =
let msgId = (protocolIdx: protocol.index, methodId: methodId)
if peer.awaitedMessages[msgId] != nil:
let msgInfo = protocol.messages[methodId]
msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId])
peer.awaitedMessages[msgId] = nil
if reqFuture != nil and not reqFuture.finished:
protocol.messages[methodId].requestResolver(msg, reqFuture)
proc initProtocol(name: string, version: int,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
|||| = name
result.version = version
result.messages = @[]
result.peerStateInitializer = peerInit
result.networkStateInitializer = networkInit
proc registerMsg(protocol: ProtocolInfo,
id: int, name: string,
thunk: ThunkProc,
printer: MessageContentPrinter,
requestResolver: RequestResolver,
nextMsgResolver: NextMsgResolver) =
if protocol.messages.len <= id:
protocol.messages.setLen(id + 1)
protocol.messages[id] = MessageInfo(id: id,
name: name,
thunk: thunk,
printer: printer,
requestResolver: requestResolver,
nextMsgResolver: nextMsgResolver)
template applyDecorator(p: NimNode, decorator: NimNode) =
if decorator.kind != nnkNilLit: p.addPragma decorator
proc prepareRequest(peer: Peer,
protocol: ProtocolInfo,
requestMethodId, responseMethodId: uint16,
stream: OutputStreamVar,
timeout: Duration,
responseFuture: FutureBase): DelayedWriteCursor =
assert peer != nil and
protocol != nil and
responseFuture != nil and
|||| < protocol.messages.len
doAssert timeout.milliseconds > 0
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
inc peer.lastReqId, 2
let reqId = peer.lastReqId
stream.appendPackedObject SpecInnerMsgHeader(
reqId: reqId, methodId: requestMethodId)
template responseMsgInfo: auto =
requestResolver = responseMsgInfo.requestResolver
timeoutAt = Moment.fromNow(timeout)
peer.outstandingRequests[reqId + 1] = OutstandingRequest(
id: reqId,
future: responseFuture,
timeoutAt: timeoutAt,
responseThunk: responseMsgInfo.thunk)
proc timeoutExpired(udata: pointer) =
requestResolver(nil, responseFuture)
peer.outstandingRequests.del(reqId + 1)
addTimer(timeoutAt, timeoutExpired, nil)
proc prepareResponse(responder: ResponderWithId,
stream: OutputStreamVar): DelayedWriteCursor =
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
stream.appendPackedObject SpecInnerMsgHeader(
reqId: responder.reqId + 1,
methodId: uint16(Success))
proc prepareMsg(peer: Peer, methodId: uint16,
stream: OutputStreamVar): DelayedWriteCursor =
result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader)
inc peer.lastReqId, 2
stream.appendPackedObject SpecInnerMsgHeader(
reqId: peer.lastReqId, methodId: methodId)
proc finishOuterHeader(headerCursor: DelayedWriteCursor) =
var outerHeader = SpecOuterMsgHeader.init(
compression = NoCompression,
encoding = SszEncoding,
msgLen = uint64(headerCursor.totalBytesWrittenAfterCursor))
headerCursor.endWrite makeOpenArray(cast[ptr byte](addr outerHeader),
sizeof outerHeader)
proc implementSendProcBody(sendProc: SendProc) =
msg = sendProc.msg
delayedWriteCursor = ident "delayedWriteCursor"
peer = sendProc.peerParam
proc preSerializationStep(stream: NimNode): NimNode =
case msg.kind
of msgRequest:
requestMethodId = newLit(
responseMethodId = newLit(
protocol = sendProc.msg.protocol.protocolInfoVar
timeout = sendProc.timeoutParam
quote do:
var `delayedWriteCursor` = prepareRequest(
`peer`, `protocol`, `requestMethodId`, `responseMethodId`,
`stream`, `timeout`, `resultIdent`)
of msgResponse:
quote do:
var `delayedWriteCursor` = prepareResponse(`peer`, `stream`)
of msgHandshake, msgNotification:
let methodId = newLit(
quote do:
var `delayedWriteCursor` = prepareMsg(`peer`, `methodId`, `stream`)
proc postSerializationStep(stream: NimNode): NimNode =
newCall(bindSym "finishOuterHeader", delayedWriteCursor)
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"
sendMsg = bindSym "sendMsg"
sendCall = newCall(sendMsg, peer, bytes)
if msg.kind == msgRequest:
# In RLPx requests, the returned future was allocated here and passed
# to `prepareRequest`. It's already assigned to the result variable
# of the proc, so we just wait for the sending operation to complete
# and we return in a normal way. (the waiting is done, so we can catch
# any possible errors).
quote: `linkSendFailureToReqFuture`(`sendCall`, `resultIdent`)
# In normal RLPx messages, we are returning the future returned by the
# `sendMsg` call.
quote: return `sendCall`
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
Option = bindSym "Option"
Peer = bindSym "Peer"
EthereumNode = bindSym "EthereumNode"
Format = ident "SSZ"
Response = bindSym "Response"
ResponderWithId = bindSym "ResponderWithId"
perProtocolMsgId = ident "perProtocolMsgId"
mount = bindSym "mount"
messagePrinter = bindSym "messagePrinter"
resolveFuture = bindSym "resolveFuture"
requestResolver = bindSym "requestResolver"
resolvePendingFutures = bindSym "resolvePendingFutures"
nextMsg = bindSym "nextMsg"
initProtocol = bindSym "initProtocol"
registerMsg = bindSym "registerMsg"
handshakeImpl = bindSym "handshakeImpl"
stream = ident "stream"
protocol = ident "protocol"
response = ident "response"
reqFutureVar = ident "reqFuture"
msgContents = ident "msgContents"
receivedMsg = ident "receivedMsg"
ProtocolInfo = bindSym "ProtocolInfo"
P2PStream = bindSym "P2PStream"
ByteStreamVar = bindSym "ByteStreamVar"
new result
result.registerProtocol = bindSym "registerProtocol"
result.setEventHandlers = bindSym "setEventHandlers"
result.PeerType = Peer
result.NetworkType = EthereumNode
result.SerializationFormat = Format
p.useRequestIds = true
result.ReqIdType = ident "uint64"
result.ResponderType = ResponderWithId
result.afterProtocolInit = proc (p: P2PProtocol) =
p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream)
result.implementMsg = proc (msg: Message) =
msgIdLit = newLit(
msgRecName = msg.recIdent
msgIdent = msg.ident
msgName = $msgIdent
protocol = msg.protocol
## Implemenmt Thunk
let traceMsg = when tracingEnabled:
newCall(bindSym"logReceivedMsg", peer, receivedMsg)
let callResolvePendingFutures = newCall(
resolvePendingFutures, peerVar,
newCall("addr", receivedMsg),
var userHandlerParams = @[peerVar]
if msg.kind == msgRequest:
userHandlerParams.add reqIdVar
thunkName = ident(msgName & "_thunk")
awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, userHandlerParams)
msg.defineThunk quote do:
proc `thunkName`(`peerVar`: `Peer`,
`stream`: `P2PStream`,
`reqIdVar`: uint64,
`reqFutureVar`: FutureBase,
`msgContents`: `ByteStreamVar`) {.async, gcsafe.} =
var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`)
## Implement Senders and Handshake
var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake))
implementSendProcBody sendProc
if msg.kind == msgHandshake:
discard msg.createHandshakeTemplate(, handshakeImpl, nextMsg)
newTree(nnkBracketExpr, messagePrinter, msgRecName),
newTree(nnkBracketExpr, requestResolver, msgRecName),
newTree(nnkBracketExpr, resolveFuture, msgRecName)))
result.implementProtocolInit = proc (protocol: P2PProtocol): NimNode =
return newCall(initProtocol,
protocol.peerInit, protocol.netInit)
@ -96,6 +96,8 @@ func `$`*(x: BlsValue): string =
if x.kind == Real:
# r: is short for random. The prefix must be short
# due to the mechanics of the `shortLog` function.
"r:" & toHex(x.blob)
func `==`*(a, b: BlsValue): bool =
@ -178,13 +180,16 @@ func bls_verify_multiple*(
sig: ValidatorSig, domain: uint64): bool =
let L = len(pubkeys)
doAssert L == len(message_hashes)
doAssert sig.kind == Real
# TODO optimize using multiPairing
for pubkey_message_hash in zip(pubkeys, message_hashes):
let (pubkey, message_hash) = pubkey_message_hash
doAssert pubkey.kind == Real
# TODO spec doesn't say to handle this specially, but it's silly to
# validate without any actual public keys.
if not pubkey.bls_verify(, sig, domain):
if pubkey.blsValue != VerKey() and
not sig.blsValue.verify(, domain, pubkey.blsValue):
return false
@ -518,7 +518,7 @@ Json.useCustomSerialization(BitSeq):
BitSeq reader.readValue(string).hexToSeqByte
writer.writeValue "0x" & value.bytes.toHex
writer.writeValue "0x" & Bytes(value).toHex
template readValue*(reader: var JsonReader, value: var BitList) =
type T = type(value)
@ -531,13 +531,14 @@ template init*(T: type BitList, len: int): auto = T init(BitSeq, len)
template len*(x: BitList): auto = len(BitSeq(x))
template bytes*(x: BitList): auto = bytes(BitSeq(x))
template `[]`*(x: BitList, idx: auto): auto = BitSeq(x)[idx]
template `[]=`*(x: BitList, idx: auto, val: bool) = BitSeq(x)[idx] = val
template `[]=`*(x: var BitList, idx: auto, val: bool) = BitSeq(x)[idx] = val
template `==`*(a, b: BitList): bool = BitSeq(a) == BitSeq(b)
template raiseBit*(x: BitList, idx: int) = raiseBit(BitSeq(x), idx)
template lowerBit*(x: BitList, idx: int) = lowerBit(BitSeq(x), idx)
template raiseBit*(x: var BitList, idx: int) = raiseBit(BitSeq(x), idx)
template lowerBit*(x: var BitList, idx: int) = lowerBit(BitSeq(x), idx)
template overlaps*(a, b: BitList): bool = overlaps(BitSeq(a), BitSeq(b))
template combine*(a, b: BitList) = combine(BitSeq(a), BitSeq(b))
template combine*(a: var BitList, b: BitList) = combine(BitSeq(a), BitSeq(b))
template isSubsetOf*(a, b: BitList): bool = isSubsetOf(BitSeq(a), BitSeq(b))
template `$`*(a: BitList): string = $(BitSeq(a))
when useListType:
template len*[T; N](x: List[T, N]): auto = len(seq[T](x))
@ -9,7 +9,7 @@
# See
endians, stew/shims/macros, options, algorithm, math,
endians, stew/shims/macros, options, algorithm, math, options,
stew/[bitops2, bitseqs, objects, varints], stew/ranges/ptr_arith, stint,
faststreams/input_stream, serialization, serialization/testing/tracing,
nimcrypto/sha2, blscurve, eth/common,
@ -59,6 +59,8 @@ type
FixedSizedWriterCtx = object
Bytes = seq[byte]
serializationFormat SSZ,
Reader = SszReader,
Writer = SszWriter,
@ -95,7 +97,9 @@ template toSszType*(x: auto): auto =
when x is Slot|Epoch|ValidatorIndex|enum: uint64(x)
elif x is Eth2Digest:
elif x is BlsValue|BlsCurveType: getBytes(x)
elif x is BitSeq|BitList: bytes(x)
elif x is BitSeq|BitList: Bytes(x)
elif x is ref|ptr: toSszType x[]
elif x is Option: toSszType x.get
elif x is TypeWithMaxLen: toSszType valueOf(x)
elif useListType and x is List: seq[x.T](x)
else: x
@ -173,7 +177,7 @@ template writeField*(w: var SszWriter,
field: auto) =
mixin toSszType
when ctx is FixedSizedWriterCtx:
writeFixedSized(w, toSszType(field))
writeFixedSized(, toSszType(field))
type FieldType = type toSszType(field)
@ -185,7 +189,7 @@ template writeField*(w: var SszWriter,
let initPos =
trs "WRITING VAR SIZE VALUE OF TYPE ", name(FieldType)
when FieldType is BitSeq:
trs "BIT SEQ ", field.bytes
trs "BIT SEQ ", Bytes(field)
writeVarSizeType(w, toSszType(field))
ctx.offset += - initPos
@ -200,7 +204,9 @@ func writeVarSizeType(w: var SszWriter, value: auto) =
when T is seq|string|openarray:
type E = ElemType(T)
when isFixedSize(E):
const isFixed = when E is Option: false
else: isFixedSize(E)
when isFixed:
for elem in value:
|||| toSszType(elem)
@ -211,6 +217,10 @@ func writeVarSizeType(w: var SszWriter, value: auto) =
var cursor = offset
for elem in value:
cursor.writeFixedSized uint32(offset)
when elem is Option:
if not isSome(elem): continue
elif elem is ptr|ref:
if isNil(elem): continue
let initPos =
w.writeVarSizeType toSszType(elem)
offset += - initPos
@ -448,8 +458,8 @@ func bitlistHashTreeRoot(merkelizer: SszChunksMerkelizer, x: BitSeq): Eth2Digest
trs "CHUNKIFYING BIT SEQ WITH LIMIT ", merkelizer.limit
totalBytes = x.bytes.len
lastCorrectedByte = x.bytes[^1]
totalBytes = Bytes(x).len
lastCorrectedByte = Bytes(x)[^1]
if lastCorrectedByte == byte(1):
if totalBytes == 1:
@ -461,7 +471,7 @@ func bitlistHashTreeRoot(merkelizer: SszChunksMerkelizer, x: BitSeq): Eth2Digest
getZeroHashWithoutSideEffect(0)) # this is the mixed length
totalBytes -= 1
lastCorrectedByte = x.bytes[^2]
lastCorrectedByte = Bytes(x)[^2]
let markerPos = log2trunc(lastCorrectedByte)
@ -480,14 +490,14 @@ func bitlistHashTreeRoot(merkelizer: SszChunksMerkelizer, x: BitSeq): Eth2Digest
chunkStartPos = i * bytesPerChunk
chunkEndPos = chunkStartPos + bytesPerChunk - 1
merkelizer.addChunk x.bytes.toOpenArray(chunkEndPos, chunkEndPos)
merkelizer.addChunk Bytes(x).toOpenArray(chunkEndPos, chunkEndPos)
lastChunk: array[bytesPerChunk, byte]
chunkStartPos = fullChunks * bytesPerChunk
for i in 0 .. bytesInLastChunk - 2:
lastChunk[i] = x.bytes[chunkStartPos + i]
lastChunk[i] = Bytes(x)[chunkStartPos + i]
lastChunk[bytesInLastChunk - 1] = lastCorrectedByte
@ -1,5 +1,5 @@
endians, typetraits,
endians, typetraits, options,
stew/[objects, bitseqs], serialization/testing/tracing,
../spec/[digest, datatypes], ./types
@ -7,6 +7,14 @@ template setLen[R, T](a: var array[R, T], length: int) =
if length != a.len:
raise newException(MalformedSszError, "SSZ input of insufficient size")
template assignNullValue(loc: untyped, T: type): auto =
when T is ref|ptr:
loc = nil
elif T is Option:
loc = T()
raise newException(MalformedSszError, "SSZ list element of zero size")
# fromSszBytes copies the wire representation to a Nim variable,
# assuming there's enough data in the buffer
func fromSszBytes*(T: type SomeInteger, data: openarray[byte]): T =
@ -61,6 +69,13 @@ proc readSszValue*(input: openarray[byte], T: type): T =
when useListType and result is List:
type ElemType = type result[0]
result = T readSszValue(input, seq[ElemType])
elif result is ptr|ref:
if input.len > 0:
new result
result[] = readSszValue(input, type(result[]))
elif result is Option:
if input.len > 0:
result = some readSszValue(input, result.T)
elif result is string|seq|openarray|array:
type ElemType = type result[0]
when ElemType is byte|char:
@ -96,6 +111,9 @@ proc readSszValue*(input: openarray[byte], T: type): T =
result.setLen resultLen
for i in 1 ..< resultLen:
let nextOffset = readOffset(i * offsetSize)
if nextOffset == offset:
assignNullValue result[i - 1], ElemType
result[i - 1] = readSszValue(input[offset ..< nextOffset], ElemType)
offset = nextOffset
@ -1,5 +1,5 @@
tables, options,
stew/shims/macros, stew/[objects, bitseqs],
serialization/[object_serialization, errors]
@ -85,7 +85,7 @@ template ElemType*(T: type[seq|string|List]): untyped =
func isFixedSize*(T0: type): bool {.compileTime.} =
mixin toSszType, enumAllSerializedFields
when T0 is openarray:
when T0 is openarray|Option|ref|ptr:
return false
type T = type toSszType(default T0)
@ -110,7 +110,7 @@ func fixedPortionSize*(T0: type): int {.compileTime.} =
type E = ElemType(T)
when isFixedSize(E): elementCount * fixedPortionSize(E)
else: elementCount * offsetSize
elif T is seq|string|openarray: offsetSize
elif T is seq|string|openarray|ref|ptr|Option: offsetSize
elif T is object|tuple:
var res = 0
@ -19,8 +19,6 @@ type
ValidatorSet = seq[Validator]
BeaconSyncState* = ref object
networkId*: uint8
chainId*: uint64
node*: BeaconNode
db*: BeaconChainDB
@ -78,15 +76,13 @@ proc getBeaconBlocks*(peer: Peer,
backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.}
p2pProtocol BeaconSync(version = 1,
shortName = "bcs",
rlpxName = "bcs",
networkState = BeaconSyncState):
onPeerConnected do (peer: Peer):
protocolVersion = 1 # TODO: Spec doesn't specify this yet
node = peer.networkState.node
networkId = peer.networkState.networkId
chainId = peer.networkState.networkId
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
@ -94,11 +90,11 @@ p2pProtocol BeaconSync(version = 1,
bestSlot = headBlock.slot
latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
let m = await peer.hello(networkId, chainId, finalizedHead.blck.root,
latestFinalizedEpoch, bestRoot, bestSlot,
timeout = 10.seconds)
let m = await peer.hello(node.forkVersion,
finalizedHead.blck.root, latestFinalizedEpoch,
bestRoot, bestSlot, timeout = 10.seconds)
if m.networkId != networkId:
if m.forkVersion != node.forkVersion:
await peer.disconnect(IrrelevantNetwork)
@ -142,22 +138,59 @@ p2pProtocol BeaconSync(version = 1,
proc hello(
peer: Peer,
networkId: uint8,
chainId: uint64,
fork_version: array[4, byte],
latestFinalizedRoot: Eth2Digest,
latestFinalizedEpoch: Epoch,
bestRoot: Eth2Digest,
bestSlot: Slot)
bestSlot: Slot) {.
libp2pProtocol("/eth2/beacon_chain/req/hello", 1).}
proc goodbye(peer: Peer, reason: DisconnectionReason)
proc goodbye(
peer: Peer,
reason: DisconnectionReason) {.
libp2pProtocol("/eth2/beacon_chain/req/goodbye", 1).}
nextId 10
proc getBeaconBlocks(
peer: Peer,
headBlockRoot: Eth2Digest,
count: uint64,
step: uint64) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_blocks", 1).} =
var blocks = newSeq[Option[BeaconBlock]](int count)
let db = peer.networkState.db
blocks[0] = db.getBlock(headBlockRoot)
if isSome(blocks[0]):
for i in uint64(1) ..< count:
blocks[] = db.getBlock(Slot(blocks[0].get.slot.uint64 + i * step))
await response.send(blocks)
proc getRecentBeaconBlocks(
peer: Peer,
blockRoots: openarray[Eth2Digest]) {.
libp2pProtocol("/eth2/beacon_chain/req/recent_beacon_blocks", 1).} =
var blocks = newSeqOfCap[Option[BeaconBlock]](blockRoots.len)
let db = peer.networkState.db
for root in blockRoots:
blocks.add db.getBlock(root)
await response.send(blocks)
proc beaconBlocks(
peer: Peer,
blocks: openarray[Option[BeaconBlock]])
proc getBeaconBlockRoots(
peer: Peer,
fromSlot: Slot,
maxRoots: uint64) =
maxRoots: uint64) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_block_roots", 1).} =
let maxRoots = min(MaxRootsToRequest, maxRoots)
var s = fromSlot
var roots = newSeqOfCap[BlockRootSlot](maxRoots)
@ -170,7 +203,9 @@ p2pProtocol BeaconSync(version = 1,
s += 1
await response.send(roots)
proc beaconBlockRoots(peer: Peer, roots: openarray[BlockRootSlot])
proc beaconBlockRoots(
peer: Peer,
roots: openarray[BlockRootSlot])
proc getBeaconBlockHeaders(
@ -179,7 +214,8 @@ p2pProtocol BeaconSync(version = 1,
slot: Slot,
maxHeaders: uint64,
skipSlots: uint64,
backward: bool) =
backward: bool) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_block_headers", 1).} =
let maxHeaders = min(MaxHeadersToRequest, maxHeaders)
var headers: seq[BeaconBlockHeader]
let db = peer.networkState.db
@ -222,12 +258,55 @@ p2pProtocol BeaconSync(version = 1,
await response.send(headers)
proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader])
proc beaconBlockHeaders(
peer: Peer,
blockHeaders: openarray[BeaconBlockHeader])
# TODO move this at the bottom, because it's not in the spec yet, but it will
# consume a `method_id`
proc getAncestorBlocks(
peer: Peer,
needed: openarray[FetchRecord]) {.
libp2pProtocol("/eth2/beacon_chain/req/ancestor_blocks", 1).} =
var resp = newSeqOfCap[BeaconBlock](needed.len)
let db = peer.networkState.db
var neededRoots = initSet[Eth2Digest]()
for rec in needed: neededRoots.incl(rec.root)
for rec in needed:
if (var blck = db.getBlock(rec.root); blck.isSome()):
# TODO validate historySlots
let firstSlot = blck.get().slot - rec.historySlots
for i in 0..<
if resp.len >= MaxAncestorBlocksResponse:
if blck.get().parent_root in neededRoots:
# Don't send duplicate blocks, if neededRoots has roots that are
# in the same chain
if (blck = db.getBlock(blck.get().parent_root);
blck.isNone() or blck.get().slot < firstSlot):
if resp.len >= MaxAncestorBlocksResponse:
await response.send(resp)
proc ancestorBlocks(
peer: Peer,
blocks: openarray[BeaconBlock])
proc getBeaconBlockBodies(
peer: Peer,
blockRoots: openarray[Eth2Digest]) =
blockRoots: openarray[Eth2Digest]) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_block_bodies", 1).} =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db
@ -1,23 +1,21 @@
NetworkBackendType* = enum
network_type {.strdefine.} = "libp2p_native"
network_type {.strdefine.} = "libp2p"
networkBackend* = when network_type == "rlpx": rlpxBackend
elif network_type == "libp2p_spec": libp2pSpecBackend
elif network_type == "libp2p_native": libp2pNativeBackend
else: {.fatal: "The 'network_type' should be one of 'libp2p_spec', 'libp2p_native' or 'rlpx'" .}
elif network_type == "libp2p": libp2pBackend
else: {.fatal: "The 'network_type' should be either 'libp2p' or 'rlpx'" .}
versionMajor* = 0
versionMinor* = 2
versionMinor* = 3
versionBuild* = 0
semanticVersion* = 1
semanticVersion* = 2
# Bump this up every time a breaking change is introduced
# Clients having different semantic versions won't be able
# to join the same testnets.
@ -1,5 +1,5 @@
Reference in New Issue