Working BeaconSync

Changes:

* Do not send separate network packets for response codes and msg
  len prefixes

* Close streams according to the spec

* Implement more timeouts according to the spec

* Make hello requests during syncing to update our knowledge of
  the head block of the other peer.
This commit is contained in:
Zahary Karadjov 2019-09-09 19:55:01 -04:00 committed by zah
parent 4a54fb4103
commit a83aa83644
2 changed files with 150 additions and 138 deletions

View File

@ -90,9 +90,16 @@ type
const
defaultIncomingReqTimeout = 5000
defaultOutgoingReqTimeout = 10000
HandshakeTimeout = FaultOrError
RQRP_MAX_SIZE = 2 * 1024 * 1024
# Spec constants
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains
REQ_RESP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
TTFB_TIMEOUT* = 5.seconds
RESP_TIMEOUT* = 10.seconds
readTimeoutErrorMsg = "Exceeded read timeout for a request"
template `$`*(peer: Peer): string = $peer.id
chronicles.formatIt(Peer): $it
@ -184,7 +191,7 @@ proc readSizePrefix(transp: StreamTransport,
case parser.feedByte(nextByte)
of Done:
let res = parser.getResult
if res > uint64(RQRP_MAX_SIZE):
if res > uint64(REQ_RESP_MAX_SIZE):
return -1
else:
return int(res)
@ -230,17 +237,11 @@ proc readMsgBytes(stream: P2PStream,
return msgBytes
proc readMsgBytesOrClose(stream: P2PStream,
withResponseCode: bool,
deadline: Future[void]): Future[Bytes] {.async.} =
result = await stream.readMsgBytes(withResponseCode, deadline)
if result.len == 0: await stream.close()
proc readChunk(stream: P2PStream,
MsgType: type,
withResponseCode: bool,
deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
var msgBytes = await stream.readMsgBytesOrClose(withResponseCode, deadline)
var msgBytes = await stream.readMsgBytes(withResponseCode, deadline)
try:
if msgBytes.len > 0:
return some SSZ.decode(msgBytes, MsgType)
@ -258,10 +259,6 @@ proc readResponse(
type E = ElemType(MsgType)
var results: MsgType
while true:
# This loop will keep reading messages until the deadline is over
# or the other side closes the stream or provides an invalid response.
# The underlying use of `readMsgBytesOrClose` will ensure that the
# stream is closed on our side as well.
let nextRes = await readChunk(stream, E, true, deadline)
if nextRes.isNone: break
results.add nextRes.get
@ -309,18 +306,32 @@ proc writeSizePrefix(transp: StreamTransport, size: uint64) {.async.} =
if sent != varintSize:
raise newException(TransmissionError, "Failed to deliver size prefix")
proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var stream = await peer.network.daemon.openStream(peer.id, @[protocolId])
# TODO how does openStream fail? Set a timeout here and handle it
await writeSizePrefix(stream.transp, uint64(requestBytes.len))
let sent = await stream.transp.write(requestBytes)
if sent != requestBytes.len:
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT
var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId])
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
defer:
await close(stream)
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
let sent = await stream.transp.write(bytes)
if sent != bytes.len:
raise newException(TransmissionError, "Failed to deliver msg bytes")
proc sendResponseChunkBytes(stream: P2PStream, payload: Bytes) {.async.} =
var s = init OutputStream
s.append byte(Success)
s.appendVarint payload.len
s.appendVarint payload.len.uint64
s.append payload
let bytes = s.getOutput
let sent = await stream.transp.write(bytes)
@ -350,18 +361,27 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout
# Open a new LibP2P stream
var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId])
await streamFut or deadline
if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
return none(ResponseMsg)
# Send the request
let stream = streamFut.read
defer:
await close(stream)
await writeSizePrefix(stream.transp, requestBytes.len.uint64)
let sent = await stream.transp.write(requestBytes)
if sent != requestBytes.len:
# Send the request
var s = init OutputStream
s.appendVarint requestBytes.len.uint64
s.append requestBytes
let bytes = s.getOutput
let sent = await stream.transp.write(bytes)
if sent != bytes.len:
await disconnectAndRaise(peer, FaultOrError, "Incomplete send")
# Read the response
@ -467,7 +487,7 @@ proc implementSendProcBody(sendProc: SendProc) =
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeout`)
else:
quote: sendMsg(`peer`, `msgProto`, `bytes`)
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
else:
quote: sendResponseChunkBytes(`UntypedResponder`(`peer`).stream, `bytes`)
@ -524,9 +544,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
## Implement Thunk
##
var thunkName = ident(msgName & "_thunk")
let
requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout))
awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar])
let awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar])
let tracing = when tracingEnabled:
quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get)
@ -536,14 +554,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
msg.defineThunk quote do:
proc `thunkName`(`daemonVar`: `DaemonAPI`,
`streamVar`: `P2PStream`) {.async, gcsafe.} =
defer:
`await` close(`streamVar`)
let
`deadlineVar` = sleepAsync `requestDataTimeout`
`deadlineVar` = sleepAsync RESP_TIMEOUT
`msgBytesVar` = `await` readMsgBytes(`streamVar`, false, `deadlineVar`)
`peerVar` = peerFromStream(`daemonVar`, `streamVar`)
if `msgBytesVar`.len == 0:
`await` sendErrorResponse(`peerVar`, `streamVar`, ServerError,
"Exceeded read timeout for a request")
`await` sendErrorResponse(`peerVar`, `streamVar`,
ServerError, readTimeoutErrorMsg)
return
var `msgVar`: `msgRecName`

View File

@ -62,87 +62,34 @@ proc importBlocks(node: BeaconNode,
node.onBeaconBlock(node, blk)
info "Forward sync imported blocks", len = blocks.len
# Re-assembles full blocks from separately downloaded headers and bodies.
# Each body is validated against its header by checking that
# hash_tree_root(body) equals the header's `body_root`; on any length
# mismatch or body/header mismatch the proc bails out, returning the
# default zero value of `Option[seq[BeaconBlock]]` (i.e. none) via the
# bare `return`.
proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] =
# A body must exist for every header, in the same order.
if bodies.len != headers.len:
info "Cannot merge bodies and headers. Length mismatch.", bodies = bodies.len, headers = headers.len
return
var res: seq[BeaconBlock]
for i in 0 ..< headers.len:
# Reject the whole batch on the first body whose root does not match
# its header — a partial result would hide the corruption.
if hash_tree_root(bodies[i]) != headers[i].body_root:
info "Block body is wrong for header"
return
# Grow in place and fill the new slot from header + body.
res.setLen(res.len + 1)
res[^1].fromHeaderAndBody(headers[i], bodies[i])
some(res)
proc beaconBlocksByRange*(
peer: Peer,
headBlockRoot: Eth2Digest,
start_slot: Slot,
count: uint64,
step: uint64,
timeout: Duration = milliseconds(10000'i64)):
Future[Option[seq[BeaconBlock]]] {.gcsafe.}
type
HelloMsg = object
forkVersion*: array[4, byte]
latestFinalizedRoot*: Eth2Digest
latestFinalizedEpoch*: Epoch
bestRoot*: Eth2Digest
bestSlot*: Slot
finalizedRoot*: Eth2Digest
finalizedEpoch*: Epoch
headRoot*: Eth2Digest
headSlot*: Slot
# Builds a HelloMsg snapshot of this node's current chain view (fork
# version, finalized checkpoint, head root/slot) from its block pool.
# Used both for the initial handshake and for refreshing our knowledge
# of a peer during sync.
proc getCurrentHello(node: BeaconNode): HelloMsg =
let
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
headRoot = headBlock.root
headSlot = headBlock.slot
# The finalized epoch is derived from the finalized head's slot.
finalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
HelloMsg(
fork_version: node.forkVersion,
finalizedRoot: finalizedHead.blck.root,
finalizedEpoch: finalizedEpoch,
headRoot: headRoot,
headSlot: headSlot)
# (Pre-refactor version.) Reacts to the first hello exchanged with a
# peer: disconnects on fork-version mismatch, otherwise compares our
# (finalized epoch, best slot) against theirs and, if we are behind,
# requests block ranges from the peer until we catch up or progress
# stalls. All sync errors are caught and logged, never propagated.
proc handleInitialHello(peer: Peer,
node: BeaconNode,
latestFinalizedEpoch: Epoch,
bestSlot: Slot,
bestRoot: Eth2Digest,
h: HelloMsg) {.async.} =
# Peers on a different fork are of no use to us.
if h.forkVersion != node.forkVersion:
await peer.disconnect(IrrelevantNetwork)
return
# TODO: onPeerConnected runs unconditionally for every connected peer, but we
# don't need to sync with everybody. The beacon node should detect a situation
# where it needs to sync and it should execute the sync algorithm with a certain
# number of randomly selected peers. The algorithm itself must be extracted in a proc.
try:
libp2p_peers.set peer.network.peers.len.int64
debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = h.bestSlot
# Lexicographic comparison: finalized epoch first, then best slot.
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (h.latestFinalizedEpoch, h.bestSlot))
if bestDiff >= 0:
# Nothing to do?
debug "Nothing to sync", peer
else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big.
# Pull blocks in ranges, starting just past our best slot.
var s = bestSlot + 1
while s <= h.bestSlot:
debug "Waiting for block headers", peer, remoteBestSlot = h.bestSlot
let numBlocksToRequest = min(uint64(h.bestSlot - s), maxBlocksToRequest)
let blocks = await peer.beaconBlocksByRange(bestRoot, s,
numBlocksToRequest, 1'u64)
if blocks.isSome:
if blocks.get.len == 0:
info "Got 0 blocks while syncing", peer
break
node.importBlocks blocks.get
let lastSlot = blocks.get[^1].slot
# Guard against a non-advancing peer to avoid an infinite loop.
if lastSlot <= s:
info "Slot did not advance during sync", peer
break
s = lastSlot + 1
else:
# No response (timeout or bad data) — give up on this peer.
break
except CatchableError:
warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg()
ourHello: HelloMsg,
theirHello: HelloMsg) {.async, gcsafe.}
p2pProtocol BeaconSync(version = 1,
rlpxName = "bcs",
@ -152,24 +99,12 @@ p2pProtocol BeaconSync(version = 1,
onPeerConnected do (peer: Peer):
if peer.wasDialed:
let
protocolVersion = 1 # TODO: Spec doesn't specify this yet
node = peer.networkState.node
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
bestRoot = headBlock.root
bestSlot = headBlock.slot
latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
ourHello = node.getCurrentHello
theirHello = await peer.hello(ourHello)
let h = await peer.hello(HelloMsg(
fork_version: node.forkVersion,
latestFinalizedRoot: finalizedHead.blck.root,
latestFinalizedEpoch: latestFinalizedEpoch,
bestRoot: bestRoot,
bestSlot: bestSlot), timeout = 10.seconds)
if h.isSome:
await peer.handleInitialHello(node, latestFinalizedEpoch, bestSlot, bestRoot, h.get)
if theirHello.isSome:
await peer.handleInitialHello(node, ourHello, theirHello.get)
else:
warn "Hello response not received in time"
@ -177,27 +112,16 @@ p2pProtocol BeaconSync(version = 1,
libp2p_peers.set peer.network.peers.len.int64
requestResponse:
proc hello(peer: Peer, hhh: HelloMsg) {.libp2pProtocol("hello", 1).} =
proc hello(peer: Peer, theirHello: HelloMsg) {.libp2pProtocol("hello", 1).} =
let
protocolVersion = 1 # TODO: Spec doesn't specify this yet
node = peer.networkState.node
blockPool = node.blockPool
finalizedHead = blockPool.finalizedHead
headBlock = blockPool.head.blck
bestRoot = headBlock.root
bestSlot = headBlock.slot
latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot()
ourHello = node.getCurrentHello
await response.send(HelloMsg(
fork_version: node.forkVersion,
latestFinalizedRoot: finalizedHead.blck.root,
latestFinalizedEpoch: latestFinalizedEpoch,
bestRoot: bestRoot,
bestSlot: bestSlot))
await response.send(ourHello)
if not peer.state.initialHelloReceived:
peer.state.initialHelloReceived = true
await peer.handleInitialHello(node, latestFinalizedEpoch, bestSlot, bestRoot, hhh)
await peer.handleInitialHello(node, ourHello, theirHello)
proc helloResp(peer: Peer, msg: HelloMsg) {.libp2pProtocol("hello", 1).}
@ -258,3 +182,70 @@ p2pProtocol BeaconSync(version = 1,
peer: Peer,
blocks: openarray[BeaconBlock])
# Reacts to the first hello exchanged with a peer: disconnects on
# fork-version mismatch, otherwise compares our (finalizedEpoch,
# headSlot) against theirs and, if we are behind, requests block
# ranges until we catch up or progress stalls. After each imported
# range a fresh hello is requested so our view of the peer's head
# stays current during a long sync. All sync errors are caught and
# logged, never propagated.
proc handleInitialHello(peer: Peer,
node: BeaconNode,
ourHello: HelloMsg,
theirHello: HelloMsg) {.async, gcsafe.} =
# Peers on a different fork are of no use to us.
if theirHello.forkVersion != node.forkVersion:
await peer.disconnect(IrrelevantNetwork)
return
# TODO: onPeerConnected runs unconditionally for every connected peer, but we
# don't need to sync with everybody. The beacon node should detect a situation
# where it needs to sync and it should execute the sync algorithm with a certain
# number of randomly selected peers. The algorithm itself must be extracted in a proc.
try:
libp2p_peers.set peer.network.peers.len.int64
debug "Peer connected. Initiating sync", peer,
headSlot = ourHello.headSlot,
remoteHeadSlot = theirHello.headSlot
# Lexicographic comparison: finalized epoch first, then head slot.
let bestDiff = cmp((ourHello.finalizedEpoch, ourHello.headSlot),
(theirHello.finalizedEpoch, theirHello.headSlot))
if bestDiff >= 0:
# Nothing to do?
debug "Nothing to sync", peer
else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big.
var s = ourHello.headSlot + 1
# Shadow the parameter with a mutable copy so it can be refreshed
# from the periodic hello requests below.
var theirHello = theirHello
while s <= theirHello.headSlot:
debug "Waiting for block headers", peer,
remoteHeadSlot = theirHello.headSlot
let numBlocksToRequest = min(uint64(theirHello.headSlot - s),
maxBlocksToRequest)
let blocks = await peer.beaconBlocksByRange(ourHello.headRoot, s,
numBlocksToRequest, 1'u64)
if blocks.isSome:
if blocks.get.len == 0:
info "Got 0 blocks while syncing", peer
break
node.importBlocks blocks.get
let lastSlot = blocks.get[^1].slot
# Guard against a non-advancing peer to avoid an infinite loop.
if lastSlot <= s:
info "Slot did not advance during sync", peer
break
s = lastSlot + 1
# TODO: Maybe this shouldn't happen so often.
# An alternative would be to set up a timer here.
let helloResp = await peer.hello(node.getCurrentHello)
if helloResp.isSome:
theirHello = helloResp.get
else:
# We'll ignore this error and we'll try to request
# another range optimistically. If that fails, the
# syncing will be interrupted.
discard
else:
# No response (timeout or bad data) — give up on this peer.
break
except CatchableError:
warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg()