Implement the latest modification of the spec

This commit is contained in:
Zahary Karadjov 2019-09-11 12:45:22 -04:00
parent 37043f0d91
commit 1555efd9d8
No known key found for this signature in database
GPG Key ID: C8936F8A3073D609
2 changed files with 44 additions and 62 deletions

View File

@ -36,10 +36,9 @@ type
IrrelevantNetwork IrrelevantNetwork
FaultOrError FaultOrError
UntypedResponder = ref object UntypedResponder = object
peer*: Peer peer*: Peer
stream*: P2PStream stream*: P2PStream
totalBytesSent*: int
Responder*[MsgType] = distinct UntypedResponder Responder*[MsgType] = distinct UntypedResponder
@ -233,7 +232,7 @@ proc readMsgBytes(stream: P2PStream,
discard discard
var sizePrefix = await readSizePrefix(stream.transp, deadline) var sizePrefix = await readSizePrefix(stream.transp, deadline)
if sizePrefix < -1: if sizePrefix == -1:
debug "Failed to read an incoming message size prefix", peer = stream.peer debug "Failed to read an incoming message size prefix", peer = stream.peer
return return
@ -357,44 +356,27 @@ proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async
if sent != bytes.len: if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes") raise newException(TransmissionError, "Failed to deliver all bytes")
responder.totalBytesSent += bytes.len
proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} = proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
var s = init OutputStream var s = init OutputStream
s.append byte(Success) s.append byte(Success)
s.appendValue SSZ, sizePrefixed(val) s.appendValue SSZ, sizePrefixed(val)
let bytes = s.getOutput let bytes = s.getOutput
if responder.totalBytesSent + bytes.len > REQ_RESP_MAX_SIZE:
raiseMaxRespSizeError()
let sent = await responder.stream.transp.write(bytes) let sent = await responder.stream.transp.write(bytes)
if sent != bytes.len: if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes") raise newException(TransmissionError, "Failed to deliver all bytes")
responder.totalBytesSent += bytes.len
proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} = proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
var s = init OutputStream var s = init OutputStream
var limitReached = false
for chunk in chunks: for chunk in chunks:
s.append byte(Success) s.append byte(Success)
s.appendValue SSZ, sizePrefixed(chunk) s.appendValue SSZ, sizePrefixed(chunk)
# TODO: This is not quite right, but it will serve as an approximation
# for now. We need a sszSize function to implement it properly.
if s.pos > REQ_RESP_MAX_SIZE:
limitReached = true
break
let bytes = s.getOutput let bytes = s.getOutput
let sent = await responder.stream.transp.write(bytes) let sent = await responder.stream.transp.write(bytes)
if sent != bytes.len: if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver all bytes") raise newException(TransmissionError, "Failed to deliver all bytes")
if limitReached:
raiseMaxRespSizeError()
else:
responder.totalBytesSent += bytes.len
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type, ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} = timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =

View File

@ -29,14 +29,14 @@ type
db*: BeaconChainDB db*: BeaconChainDB
BeaconSyncPeerState* = ref object BeaconSyncPeerState* = ref object
initialHelloReceived: bool initialStatusReceived: bool
BlockRootSlot* = object BlockRootSlot* = object
blockRoot: Eth2Digest blockRoot: Eth2Digest
slot: Slot slot: Slot
const const
maxBlocksToRequest = 64'u64 MAX_REQUESTED_BLOCKS = 20'u64
MaxAncestorBlocksResponse = 256 MaxAncestorBlocksResponse = 256
func toHeader(b: BeaconBlock): BeaconBlockHeader = func toHeader(b: BeaconBlock): BeaconBlockHeader =
@ -63,14 +63,14 @@ proc importBlocks(node: BeaconNode,
info "Forward sync imported blocks", len = blocks.len info "Forward sync imported blocks", len = blocks.len
type type
HelloMsg = object StatusMsg = object
forkVersion*: array[4, byte] forkVersion*: array[4, byte]
finalizedRoot*: Eth2Digest finalizedRoot*: Eth2Digest
finalizedEpoch*: Epoch finalizedEpoch*: Epoch
headRoot*: Eth2Digest headRoot*: Eth2Digest
headSlot*: Slot headSlot*: Slot
proc getCurrentHello(node: BeaconNode): HelloMsg = proc getCurrentStatus(node: BeaconNode): StatusMsg =
let let
blockPool = node.blockPool blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead finalizedHead = blockPool.finalizedHead
@ -79,17 +79,17 @@ proc getCurrentHello(node: BeaconNode): HelloMsg =
headSlot = headBlock.slot headSlot = headBlock.slot
finalizedEpoch = finalizedHead.slot.compute_epoch_of_slot() finalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
HelloMsg( StatusMsg(
fork_version: node.forkVersion, fork_version: node.forkVersion,
finalizedRoot: finalizedHead.blck.root, finalizedRoot: finalizedHead.blck.root,
finalizedEpoch: finalizedEpoch, finalizedEpoch: finalizedEpoch,
headRoot: headRoot, headRoot: headRoot,
headSlot: headSlot) headSlot: headSlot)
proc handleInitialHello(peer: Peer, proc handleInitialStatus(peer: Peer,
node: BeaconNode, node: BeaconNode,
ourHello: HelloMsg, ourStatus: StatusMsg,
theirHello: HelloMsg) {.async, gcsafe.} theirStatus: StatusMsg) {.async, gcsafe.}
p2pProtocol BeaconSync(version = 1, p2pProtocol BeaconSync(version = 1,
rlpxName = "bcs", rlpxName = "bcs",
@ -100,30 +100,30 @@ p2pProtocol BeaconSync(version = 1,
if peer.wasDialed: if peer.wasDialed:
let let
node = peer.networkState.node node = peer.networkState.node
ourHello = node.getCurrentHello ourStatus = node.getCurrentStatus
theirHello = await peer.hello(ourHello) theirStatus = await peer.status(ourStatus)
if theirHello.isSome: if theirStatus.isSome:
await peer.handleInitialHello(node, ourHello, theirHello.get) await peer.handleInitialStatus(node, ourStatus, theirStatus.get)
else: else:
warn "Hello response not received in time" warn "Status response not received in time"
onPeerDisconnected do (peer: Peer): onPeerDisconnected do (peer: Peer):
libp2p_peers.set peer.network.peers.len.int64 libp2p_peers.set peer.network.peers.len.int64
requestResponse: requestResponse:
proc hello(peer: Peer, theirHello: HelloMsg) {.libp2pProtocol("hello", 1).} = proc status(peer: Peer, theirStatus: StatusMsg) {.libp2pProtocol("status", 1).} =
let let
node = peer.networkState.node node = peer.networkState.node
ourHello = node.getCurrentHello ourStatus = node.getCurrentStatus
await response.send(ourHello) await response.send(ourStatus)
if not peer.state.initialHelloReceived: if not peer.state.initialStatusReceived:
peer.state.initialHelloReceived = true peer.state.initialStatusReceived = true
await peer.handleInitialHello(node, ourHello, theirHello) await peer.handleInitialStatus(node, ourStatus, theirStatus)
proc helloResp(peer: Peer, msg: HelloMsg) {.libp2pProtocol("hello", 1).} proc statusResp(peer: Peer, msg: StatusMsg)
proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).} proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).}
@ -137,9 +137,9 @@ p2pProtocol BeaconSync(version = 1,
libp2pProtocol("beacon_blocks_by_range", 1).} = libp2pProtocol("beacon_blocks_by_range", 1).} =
if count > 0'u64: if count > 0'u64:
let count = if step != 0: min(count, maxBlocksToRequest.uint64) else: 1 let count = if step != 0: min(count, MAX_REQUESTED_BLOCKS.uint64) else: 1
let pool = peer.networkState.node.blockPool let pool = peer.networkState.node.blockPool
var results: array[maxBlocksToRequest, BlockRef] var results: array[MAX_REQUESTED_BLOCKS, BlockRef]
let let
lastPos = min(count.int, results.len) - 1 lastPos = min(count.int, results.len) - 1
firstPos = pool.getBlockRange(headBlockRoot, startSlot, step, firstPos = pool.getBlockRange(headBlockRoot, startSlot, step,
@ -164,12 +164,12 @@ p2pProtocol BeaconSync(version = 1,
peer: Peer, peer: Peer,
blocks: openarray[BeaconBlock]) blocks: openarray[BeaconBlock])
proc handleInitialHello(peer: Peer, proc handleInitialStatus(peer: Peer,
node: BeaconNode, node: BeaconNode,
ourHello: HelloMsg, ourStatus: StatusMsg,
theirHello: HelloMsg) {.async, gcsafe.} = theirStatus: StatusMsg) {.async, gcsafe.} =
if theirHello.forkVersion != node.forkVersion: if theirStatus.forkVersion != node.forkVersion:
await peer.disconnect(IrrelevantNetwork) await peer.disconnect(IrrelevantNetwork)
return return
@ -181,28 +181,28 @@ proc handleInitialHello(peer: Peer,
libp2p_peers.set peer.network.peers.len.int64 libp2p_peers.set peer.network.peers.len.int64
debug "Peer connected. Initiating sync", peer, debug "Peer connected. Initiating sync", peer,
headSlot = ourHello.headSlot, headSlot = ourStatus.headSlot,
remoteHeadSlot = theirHello.headSlot remoteHeadSlot = theirStatus.headSlot
let bestDiff = cmp((ourHello.finalizedEpoch, ourHello.headSlot), let bestDiff = cmp((ourStatus.finalizedEpoch, ourStatus.headSlot),
(theirHello.finalizedEpoch, theirHello.headSlot)) (theirStatus.finalizedEpoch, theirStatus.headSlot))
if bestDiff >= 0: if bestDiff >= 0:
# Nothing to do? # Nothing to do?
debug "Nothing to sync", peer debug "Nothing to sync", peer
else: else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big. # connection if it's too big.
var s = ourHello.headSlot + 1 var s = ourStatus.headSlot + 1
var theirHello = theirHello var theirStatus = theirStatus
while s <= theirHello.headSlot: while s <= theirStatus.headSlot:
let numBlocksToRequest = min(uint64(theirHello.headSlot - s), let numBlocksToRequest = min(uint64(theirStatus.headSlot - s),
maxBlocksToRequest) MAX_REQUESTED_BLOCKS)
debug "Requesting blocks", peer, remoteHeadSlot = theirHello.headSlot, debug "Requesting blocks", peer, remoteHeadSlot = theirStatus.headSlot,
ourHeadSlot = s, ourHeadSlot = s,
numBlocksToRequest numBlocksToRequest
let blocks = await peer.beaconBlocksByRange(theirHello.headRoot, s, let blocks = await peer.beaconBlocksByRange(theirStatus.headRoot, s,
numBlocksToRequest, 1'u64) numBlocksToRequest, 1'u64)
if blocks.isSome: if blocks.isSome:
info "got blocks", total = blocks.get.len info "got blocks", total = blocks.get.len
@ -220,9 +220,9 @@ proc handleInitialHello(peer: Peer,
# TODO: Maybe this shouldn't happen so often. # TODO: Maybe this shouldn't happen so often.
# The alternative could be watching up a timer here. # The alternative could be watching up a timer here.
let helloResp = await peer.hello(node.getCurrentHello) let statusResp = await peer.status(node.getCurrentStatus)
if helloResp.isSome: if statusResp.isSome:
theirHello = helloResp.get theirStatus = statusResp.get
else: else:
# We'll ignore this error and we'll try to request # We'll ignore this error and we'll try to request
# another range optimistically. If that fails, the # another range optimistically. If that fails, the