Changes related to the new ETH interop spec

* Hello is no longer a handshake message
  (all handshakes related code was deleted for clarity)

* Deal with the single-parameter inlining defined in the new spec
This commit is contained in:
Zahary Karadjov 2019-09-08 18:03:41 -04:00 committed by zah
parent f5b0474aa2
commit 2bbfa8c877
5 changed files with 136 additions and 242 deletions

View File

@ -222,7 +222,8 @@ else:
for bootstrapNode in bootstrapNodes:
try:
await node.daemon.connect(bootstrapNode.peer, bootstrapNode.addresses)
let peer = node.getPeer(bootstrapNode.peer)
var peer = node.getPeer(bootstrapNode.peer)
peer.wasDialed = true
await initializeConnection(peer)
connected = true
except PeerDisconnected:

View File

@ -19,8 +19,8 @@ type
Peer* = ref object
network*: Eth2Node
id*: PeerID
wasDialed*: bool
connectionState*: ConnectionState
awaitedMessages: Table[CompressedMsgId, FutureBase]
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
@ -139,24 +139,6 @@ template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string,
debug errMsg, err = getCurrentExceptionMsg()
disconnectAndRaise(peer, reason, errMsg)
proc getCompressedMsgId*(MsgType: type): CompressedMsgId =
mixin msgId, msgProtocol, protocolInfo
(protocolIdx: MsgType.msgProtocol.protocolInfo.index,
methodId: MsgType.msgId)
proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] =
## This procs awaits a specific P2P message.
## Any messages received while waiting will be dispatched to their
## respective handlers. The designated message handler will also run
## to completion before the future returned by `nextMsg` is resolved.
let awaitedMsgId = getCompressedMsgId(MsgType)
let f = getOrDefault(peer.awaitedMessages, awaitedMsgId)
if not f.isNil:
return Future[MsgType](f)
initFuture result
peer.awaitedMessages[awaitedMsgId] = result
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
@ -343,62 +325,14 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# Read the response
return await stream.readMsg(ResponseMsg, true, deadline)
proc exchangeHandshake(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[ResponseMsg] {.gcsafe, async.} =
var response = await makeEth2Request(peer, protocolId, requestBytes,
ResponseMsg, timeout)
if not response.isSome:
await peer.disconnectAndRaise(FaultOrError, "Failed to complete a handshake")
return response.get
proc p2pStreamName(MsgType: type): string =
mixin msgProtocol, protocolInfo, msgId
MsgType.msgProtocol.protocolInfo.messages[MsgType.msgId].libp2pProtocol
template handshakeImpl(outputStreamVar, handshakeSerializationCall: untyped,
lowLevelThunk: untyped,
HandshakeType: untyped,
# TODO: we cannot use a type parameter above
# because of the following Nim issue:
#
peer: Peer,
stream: P2PStream,
timeout: Duration): auto =
if stream == nil:
var outputStreamVar = init OutputStream
handshakeSerializationCall
exchangeHandshake(peer, p2pStreamName(HandshakeType),
getOutput(outputStreamVar), HandshakeType, timeout)
else:
proc asyncStep: Future[HandshakeType] {.async.} =
let deadline = sleepAsync timeout
var responseFut = nextMsg(peer, HandshakeType)
await lowLevelThunk(peer.network.daemon, stream) or deadline
if not responseFut.finished:
await disconnectAndRaise(peer, FaultOrError, "Failed to complete a handshake")
var outputStreamVar = init OutputStream
handshakeSerializationCall
await sendResponseBytes(stream, getOutput(outputStreamVar))
return responseFut.read
asyncStep()
proc resolveNextMsgFutures(peer: Peer, msg: auto) =
type MsgType = type(msg)
let msgId = getCompressedMsgId(MsgType)
let future = peer.awaitedMessages.getOrDefault(msgId)
if future != nil:
Future[MsgType](future).complete msg
proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer =
new result
result.id = id
result.network = network
result.awaitedMessages = initTable[CompressedMsgId, FutureBase]()
result.connectionState = Connected
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
newSeq result.protocolStates, allProtocols.len
@ -444,7 +378,9 @@ proc getRequestProtoName(fn: NimNode): NimNode =
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
return pragma[1]
let protoName = $(pragma[1])
let protoVer = $(pragma[2].intVal)
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/ssz")
return newLit("")
@ -465,17 +401,10 @@ proc implementSendProcBody(sendProc: SendProc) =
of msgRequest:
let
timeout = msg.timeoutParam[0]
ResponseRecord = msg.response.recIdent
ResponseRecord = msg.response.recName
quote:
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
of msgHandshake:
let
timeout = msg.timeoutParam[0]
HandshakeRecord = msg.recIdent
quote:
exchangeHandshake(`peer`, `msgProto`, `bytes`,
`HandshakeRecord`, `timeout`)
else:
quote: sendMsg(`peer`, `msgProto`, `bytes`)
else:
@ -522,7 +451,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
protocol = msg.protocol
msgName = $msg.ident
msgNameLit = newLit msgName
msgRecName = msg.recIdent
msgRecName = msg.recName
if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
# Request procs need an extra param - the stream where the response
@ -567,7 +496,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
try:
`tracing`
`awaitUserHandler`
resolveNextMsgFutures(`peerVar`, `msgVar`)
except CatchableError as `errVar`:
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, `errVar`.msg)
@ -575,84 +503,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
## Implement Senders and Handshake
##
if msg.kind == msgHandshake:
# In LibP2P protocols, the handshake thunk is special. Instead of directly
# deserializing the incoming message and calling the user-supplied handler,
# we execute the `onPeerConnected` handler instead.
#
# The `onPeerConnected` handler is executed symmetrically for both peers
# and it's expected that one of its very first steps would be to send the
# handshake and then await the same from the other side. We call this step
# "handshakeExchanger".
#
# For the initiating peer, the handshakeExchanger opens a stream and sends
# a regular request through it, but on the receiving side, it just setups
# a future and call the lower-level thunk that will complete it.
#
let
handshake = msg.protocol.onPeerConnected
lowLevelThunkName = $thunkName
if handshake.isNil:
macros.error "A LibP2P protocol with a handshake must also include an " &
"`onPeerConnected` handler.", msg.procDef
# We must generate a forward declaration for the `onPeerConnected` handler,
# so we can call it from the thunk proc:
let handshakeProcName = handshake.name
msg.protocol.outRecvProcs.add quote do:
proc `handshakeProcName`(`peerVar`: `Peer`,
`streamVar`: `P2PStream`) {.async, gcsafe.}
# Here we replace the 'thunkProc' that will be registered as a handler
# for incoming messages:
thunkName = ident(msgName & "_handleConnection")
msg.protocol.outRecvProcs.add quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`) {.async, gcsafe.} =
let `peerVar` = peerFromStream(`daemonVar`, `streamVar`)
try:
`await` `handshakeProcName`(`peerVar`, `streamVar`)
except SerializationError as err:
debug "Failed to decode message",
err = err.formatMsg("<msg>"),
msg = `msgNameLit`,
peer = $(`streamVar`.peer)
`await` disconnect(`peerVar`, FaultOrError)
except CatchableError as err:
debug "Failed to complete handshake", err = err.msg
`await` disconnect(`peerVar`, FaultOrError)
var
handshakeSerializer = msg.createSerializer()
handshakeSerializerName = newLit($handshakeSerializer.name)
handshakeExchanger = msg.createSendProc(nnkMacroDef)
paramsArray = newTree(nnkBracket).appendAllParams(handshakeExchanger.def)
handshakeTypeName = newLit($msg.recIdent)
getAst = ident "getAst"
res = ident "result"
handshakeExchanger.setBody quote do:
let
stream = ident "stream"
outputStreamVar = ident "outputStream"
lowLevelThunk = ident `lowLevelThunkName`
HandshakeType = ident `handshakeTypeName`
params = `paramsArray`
peer = params[0]
timeout = params[^1]
handshakeSerializationCall = newCall(`bindSymOp` `handshakeSerializerName`, params)
handshakeSerializationCall[1] = outputStreamVar
handshakeSerializationCall.del(handshakeSerializationCall.len - 1)
`res` = `getAst`(handshakeImpl(outputStreamVar, handshakeSerializationCall,
lowLevelThunk, HandshakeType,
peer, stream, timeout))
when defined(debugMacros) or defined(debugHandshake):
echo "---- Handshake implementation ----"
echo repr(`res`)
macros.error "Handshake messages are not supported in LibP2P protocols"
else:
var sendProc = msg.createSendProc()
implementSendProcBody sendProc

View File

@ -150,7 +150,7 @@ proc readSszValue*(input: openarray[byte], T: type): T =
trs "READING FOREIGN ", fieldName, ": ", name(SszType)
field = fromSszBytes(FieldType, input[startOffset ..< endOffset])
elif result is SomeInteger|bool:
elif result is SomeInteger|bool|enum:
trs "READING BASIC TYPE ", type(result).name, " input=", input.len
result = fromSszBytes(type(result), input)
trs "RESULT WAS ", repr(result)

View File

@ -24,10 +24,13 @@ type
ValidatorSet = seq[Validator]
BeaconSyncState* = ref object
BeaconSyncNetworkState* = ref object
node*: BeaconNode
db*: BeaconChainDB
BeaconSyncPeerState* = ref object
initialHelloReceived: bool
BlockRootSlot* = object
blockRoot: Eth2Digest
slot: Slot
@ -85,27 +88,21 @@ proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest,
slot: Slot, maxBlocks, skipSlots: uint64,
backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.}
p2pProtocol BeaconSync(version = 1,
rlpxName = "bcs",
networkState = BeaconSyncState):
type
HelloMsg = object
forkVersion*: array[4, byte]
latestFinalizedRoot*: Eth2Digest
latestFinalizedEpoch*: Epoch
bestRoot*: Eth2Digest
bestSlot*: Slot
onPeerConnected do (peer: Peer):
let
protocolVersion = 1 # TODO: Spec doesn't specify this yet
node = peer.networkState.node
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
bestRoot = headBlock.root
bestSlot = headBlock.slot
latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
let handshakeFut = peer.hello(node.forkVersion,
finalizedHead.blck.root, latestFinalizedEpoch,
bestRoot, bestSlot, timeout = 10.seconds)
let m = await handshakeFut
if m.forkVersion != node.forkVersion:
proc handleInitialHello(peer: Peer,
node: BeaconNode,
latestFinalizedEpoch: Epoch,
bestSlot: Slot,
bestRoot: Eth2Digest,
h: HelloMsg) {.async.} =
if h.forkVersion != node.forkVersion:
await peer.disconnect(IrrelevantNetwork)
return
@ -115,9 +112,9 @@ p2pProtocol BeaconSync(version = 1,
# number of randomly selected peers. The algorithm itself must be extracted in a proc.
try:
libp2p_peers.set peer.network.peers.len.int64
debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = m.bestSlot
debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = h.bestSlot
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (h.latestFinalizedEpoch, h.bestSlot))
if bestDiff >= 0:
# Nothing to do?
debug "Nothing to sync", peer
@ -126,9 +123,9 @@ p2pProtocol BeaconSync(version = 1,
# connection if it's too big.
var s = bestSlot + 1
while s <= m.bestSlot:
debug "Waiting for block headers", fromPeer = peer, remoteBestSlot = m.bestSlot, peer
let headersLeft = uint64(m.bestSlot - s)
while s <= h.bestSlot:
debug "Waiting for block headers", fromPeer = peer, remoteBestSlot = h.bestSlot, peer
let headersLeft = uint64(h.bestSlot - s)
let blocks = await peer.getBeaconBlocksSpec(bestRoot, s, min(headersLeft, MaxHeadersToRequest), 0, false)
if blocks.isSome:
if blocks.get.len == 0:
@ -147,23 +144,67 @@ p2pProtocol BeaconSync(version = 1,
except CatchableError:
warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg()
p2pProtocol BeaconSync(version = 1,
rlpxName = "bcs",
networkState = BeaconSyncNetworkState,
peerState = BeaconSyncPeerState):
onPeerConnected do (peer: Peer):
if peer.wasDialed:
let
protocolVersion = 1 # TODO: Spec doesn't specify this yet
node = peer.networkState.node
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
bestRoot = headBlock.root
bestSlot = headBlock.slot
latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
let h = await peer.hello(HelloMsg(
fork_version: node.forkVersion,
latestFinalizedRoot: finalizedHead.blck.root,
latestFinalizedEpoch: latestFinalizedEpoch,
bestRoot: bestRoot,
bestSlot: bestSlot), timeout = 10.seconds)
if h.isSome:
await peer.handleInitialHello(node, latestFinalizedEpoch, bestSlot, bestRoot, h.get)
else:
warn "Hello response not received in time"
onPeerDisconnected do (peer: Peer):
libp2p_peers.set peer.network.peers.len.int64
handshake:
proc hello(
peer: Peer,
fork_version: array[4, byte],
latestFinalizedRoot: Eth2Digest,
latestFinalizedEpoch: Epoch,
bestRoot: Eth2Digest,
bestSlot: Slot) {.
libp2pProtocol("/eth2/beacon_chain/req/hello/1/ssz", 1).}
requestResponse:
proc hello(peer: Peer, hhh: HelloMsg) {.libp2pProtocol("hello", 1).} =
let
protocolVersion = 1 # TODO: Spec doesn't specify this yet
node = peer.networkState.node
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
bestRoot = headBlock.root
bestSlot = headBlock.slot
latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
await response.send(HelloMsg(
fork_version: node.forkVersion,
latestFinalizedRoot: finalizedHead.blck.root,
latestFinalizedEpoch: latestFinalizedEpoch,
bestRoot: bestRoot,
bestSlot: bestSlot))
if not peer.state.initialHelloReceived:
peer.state.initialHelloReceived = true
await peer.handleInitialHello(node, latestFinalizedEpoch, bestSlot, bestRoot, hhh)
proc helloResp(peer: Peer, msg: HelloMsg) {.libp2pProtocol("hello", 1).}
proc goodbye(
peer: Peer,
reason: DisconnectionReason) {.
libp2pProtocol("/eth2/beacon_chain/req/goodbye/1/ssz", 1).}
libp2pProtocol("goodbye", 1).}
requestResponse:
proc beaconBlocksByRange(
@ -172,7 +213,7 @@ p2pProtocol BeaconSync(version = 1,
start_slot: uint64,
count: uint64,
step: uint64) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz", 1).} =
libp2pProtocol("beacon_blocks_by_range", 1).} =
var blocks: seq[BeaconBlock]
# `step == 0` has no sense, so we will return empty array of blocks.
# `count == 0` means that empty array of blocks requested.
@ -203,16 +244,15 @@ p2pProtocol BeaconSync(version = 1,
proc beaconBlocksByRoot(
peer: Peer,
blockRoots: openarray[Eth2Digest]) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz", 1).} =
libp2pProtocol("beacon_blocks_by_root", 1).} =
let pool = peer.networkState.node.blockPool
var blocks = newSeq[BeaconBlock](len(blockRoots))
let db = peer.networkState.db
var blocks = newSeqOfCap[BeaconBlock](blockRoots.len)
var index = 0
for root in blockRoots:
let blockRef = pool.getRef(root)
if not(isNil(blockRef)):
blocks[index] = pool.get(blockRef).data
inc(index)
blocks.add pool.get(blockRef).data
await response.send(blocks)
@ -225,7 +265,7 @@ p2pProtocol BeaconSync(version = 1,
peer: Peer,
fromSlot: Slot,
maxRoots: uint64) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_block_roots/1/ssz", 1).} =
libp2pProtocol("beacon_block_roots", 1).} =
let maxRoots = min(MaxRootsToRequest, maxRoots)
var s = fromSlot
var roots = newSeqOfCap[BlockRootSlot](maxRoots)
@ -250,7 +290,7 @@ p2pProtocol BeaconSync(version = 1,
maxHeaders: uint64,
skipSlots: uint64,
backward: bool) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_block_headers/1/ssz", 1).} =
libp2pProtocol("beacon_block_headers", 1).} =
let maxHeaders = min(MaxHeadersToRequest, maxHeaders)
var headers: seq[BeaconBlockHeader]
let db = peer.networkState.db
@ -303,7 +343,7 @@ p2pProtocol BeaconSync(version = 1,
proc getAncestorBlocks(
peer: Peer,
needed: openarray[FetchRecord]) {.
libp2pProtocol("/eth2/beacon_chain/req/ancestor_blocks/1/ssz", 1).} =
libp2pProtocol("ancestor_blocks", 1).} =
var resp = newSeqOfCap[BeaconBlock](needed.len)
let db = peer.networkState.db
var neededRoots = initSet[Eth2Digest]()
@ -341,7 +381,7 @@ p2pProtocol BeaconSync(version = 1,
proc getBeaconBlockBodies(
peer: Peer,
blockRoots: openarray[Eth2Digest]) {.
libp2pProtocol("/eth2/beacon_chain/req/beacon_block_bodies/1/ssz", 1).} =
libp2pProtocol("beacon_block_bodies", 1).} =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db
@ -367,7 +407,7 @@ proc getBeaconBlocks*(peer: Peer,
let headersResp = await peer.getBeaconBlockHeaders(blockRoot, slot, maxBlocks, skipSlots, backward)
if headersResp.isNone: return
let headers = headersResp.get.blockHeaders
let headers = headersResp.get
if headers.len == 0:
info "Peer has no headers", peer
var res: seq[BeaconBlock]
@ -381,7 +421,7 @@ proc getBeaconBlocks*(peer: Peer,
info "Did not receive bodies", peer
return
result = mergeBlockHeadersAndBodies(headers, bodiesResp.get.blockBodies)
result = mergeBlockHeadersAndBodies(headers, bodiesResp.get)
# If result.isNone: disconnect with BreachOfProtocol?
proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest, slot: Slot,
@ -393,6 +433,8 @@ proc getBeaconBlocksSpec*(peer: Peer, blockRoot: Eth2Digest, slot: Slot,
var startSlot = uint64(slot) + skipSlots
var blocksResp = await peer.beaconBlocksByRange(blockRoot, startSlot,
maxBlocks, 1'u64)
let blocks = blocksResp.get.blocks
if blocksResp.isSome:
let blocks = blocksResp.get
info "Peer returned blocks", peer, count = len(blocks)
result = some(blocks)
return some(blocks)

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 44adb2a70a7a6d5720652029f722960eeec400de
Subproject commit b4c0629bf35edd346a54fa6fcf5805395e893a72