diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index a9b909192..ab56dc28a 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -90,9 +90,16 @@ type const defaultIncomingReqTimeout = 5000 - defaultOutgoingReqTimeout = 10000 HandshakeTimeout = FaultOrError - RQRP_MAX_SIZE = 2 * 1024 * 1024 + + # Spec constants + # https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains + REQ_RESP_MAX_SIZE* = 1 * 1024 * 1024 # bytes + GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes + TTFB_TIMEOUT* = 5.seconds + RESP_TIMEOUT* = 10.seconds + + readTimeoutErrorMsg = "Exceeded read timeout for a request" template `$`*(peer: Peer): string = $peer.id chronicles.formatIt(Peer): $it @@ -184,7 +191,7 @@ proc readSizePrefix(transp: StreamTransport, case parser.feedByte(nextByte) of Done: let res = parser.getResult - if res > uint64(RQRP_MAX_SIZE): + if res > uint64(REQ_RESP_MAX_SIZE): return -1 else: return int(res) @@ -230,17 +237,11 @@ proc readMsgBytes(stream: P2PStream, return msgBytes -proc readMsgBytesOrClose(stream: P2PStream, - withResponseCode: bool, - deadline: Future[void]): Future[Bytes] {.async.} = - result = await stream.readMsgBytes(withResponseCode, deadline) - if result.len == 0: await stream.close() - proc readChunk(stream: P2PStream, MsgType: type, withResponseCode: bool, deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = - var msgBytes = await stream.readMsgBytesOrClose(withResponseCode, deadline) + var msgBytes = await stream.readMsgBytes(withResponseCode, deadline) try: if msgBytes.len > 0: return some SSZ.decode(msgBytes, MsgType) @@ -258,10 +259,6 @@ proc readResponse( type E = ElemType(MsgType) var results: MsgType while true: - # This loop will keep reading messages until the deadline is over - # or the other side closes the stream or provides an invalid respose. - # The underlying use of `readMsgBytesOrClose` will ensure that the - # stream is closed on our side as well. let nextRes = await readChunk(stream, E, true, deadline) if nextRes.isNone: break results.add nextRes.get @@ -309,18 +306,32 @@ proc writeSizePrefix(transp: StreamTransport, size: uint64) {.async.} = if sent != varintSize: raise newException(TransmissionError, "Failed to deliver size prefix") -proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = - var stream = await peer.network.daemon.openStream(peer.id, @[protocolId]) - # TODO how does openStream fail? Set a timeout here and handle it - await writeSizePrefix(stream.transp, uint64(requestBytes.len)) - let sent = await stream.transp.write(requestBytes) - if sent != requestBytes.len: +proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = + var deadline = sleepAsync RESP_TIMEOUT + var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId]) + await streamFut or deadline + if not streamFut.finished: + # TODO: we are returning here because the deadline passed, but + # the stream can still be opened eventually a bit later. Who is + # going to close it then? + raise newException(TransmissionError, "Failed to open LibP2P stream") + + let stream = streamFut.read + defer: + await close(stream) + + var s = init OutputStream + s.appendVarint requestBytes.len.uint64 + s.append requestBytes + let bytes = s.getOutput + let sent = await stream.transp.write(bytes) + if sent != bytes.len: raise newException(TransmissionError, "Failed to deliver msg bytes") proc sendResponseChunkBytes(stream: P2PStream, payload: Bytes) {.async.} = var s = init OutputStream s.append byte(Success) - s.appendVarint payload.len + s.appendVarint payload.len.uint64 s.append payload let bytes = s.getOutput let sent = await stream.transp.write(bytes) @@ -350,18 +361,27 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} = var deadline = sleepAsync timeout + # Open a new LibP2P stream var streamFut = peer.network.daemon.openStream(peer.id, @[protocolId]) await streamFut or deadline if not streamFut.finished: + # TODO: we are returning here because the deadline passed, but + # the stream can still be opened eventually a bit later. Who is + # going to close it then? return none(ResponseMsg) - # Send the request let stream = streamFut.read + defer: + await close(stream) - await writeSizePrefix(stream.transp, requestBytes.len.uint64) - let sent = await stream.transp.write(requestBytes) - if sent != requestBytes.len: + # Send the request + var s = init OutputStream + s.appendVarint requestBytes.len.uint64 + s.append requestBytes + let bytes = s.getOutput + let sent = await stream.transp.write(bytes) + if sent != bytes.len: await disconnectAndRaise(peer, FaultOrError, "Incomplete send") # Read the response @@ -467,7 +487,7 @@ proc implementSendProcBody(sendProc: SendProc) = makeEth2Request(`peer`, `msgProto`, `bytes`, `ResponseRecord`, `timeout`) else: - quote: sendMsg(`peer`, `msgProto`, `bytes`) + quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`) else: quote: sendResponseChunkBytes(`UntypedResponder`(`peer`).stream, `bytes`) @@ -524,9 +544,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = ## Implemenmt Thunk ## var thunkName = ident(msgName & "_thunk") - let - requestDataTimeout = newCall(milliseconds, newLit(defaultIncomingReqTimeout)) - awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar]) + let awaitUserHandler = msg.genAwaitUserHandler(msgVar, [peerVar, streamVar]) let tracing = when tracingEnabled: quote: logReceivedMsg(`streamVar`.peer, `msgVar`.get) @@ -536,14 +554,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = msg.defineThunk quote do: proc `thunkName`(`daemonVar`: `DaemonAPI`, `streamVar`: `P2PStream`) {.async, gcsafe.} = + defer: + `await` close(`streamVar`) + let - `deadlineVar` = sleepAsync `requestDataTimeout` + `deadlineVar` = sleepAsync RESP_TIMEOUT `msgBytesVar` = `await` readMsgBytes(`streamVar`, false, `deadlineVar`) `peerVar` = peerFromStream(`daemonVar`, `streamVar`) if `msgBytesVar`.len == 0: - `await` sendErrorResponse(`peerVar`, `streamVar`, ServerError, - "Exceeded read timeout for a request") + `await` sendErrorResponse(`peerVar`, `streamVar`, + ServerError, readTimeoutErrorMsg) return var `msgVar`: `msgRecName` diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 9f898a051..958511f5d 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -62,87 +62,34 @@ proc importBlocks(node: BeaconNode, node.onBeaconBlock(node, blk) info "Forward sync imported blocks", len = blocks.len -proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] = - if bodies.len != headers.len: - info "Cannot merge bodies and headers. Length mismatch.", bodies = bodies.len, headers = headers.len - return - - var res: seq[BeaconBlock] - for i in 0 ..< headers.len: - if hash_tree_root(bodies[i]) != headers[i].body_root: - info "Block body is wrong for header" - return - - res.setLen(res.len + 1) - res[^1].fromHeaderAndBody(headers[i], bodies[i]) - some(res) - -proc beaconBlocksByRange*( - peer: Peer, - headBlockRoot: Eth2Digest, - start_slot: Slot, - count: uint64, - step: uint64, - timeout: Duration = milliseconds(10000'i64)): - Future[Option[seq[BeaconBlock]]] {.gcsafe.} - type HelloMsg = object forkVersion*: array[4, byte] - latestFinalizedRoot*: Eth2Digest - latestFinalizedEpoch*: Epoch - bestRoot*: Eth2Digest - bestSlot*: Slot + finalizedRoot*: Eth2Digest + finalizedEpoch*: Epoch + headRoot*: Eth2Digest + headSlot*: Slot + +proc getCurrentHello(node: BeaconNode): HelloMsg = + let + blockPool = node.blockPool + finalizedHead = blockPool.finalizedHead + headBlock = blockPool.head.blck + headRoot = headBlock.root + headSlot = headBlock.slot + finalizedEpoch = finalizedHead.slot.compute_epoch_of_slot() + + HelloMsg( + fork_version: node.forkVersion, + finalizedRoot: finalizedHead.blck.root, + finalizedEpoch: finalizedEpoch, + headRoot: headRoot, + headSlot: headSlot) proc handleInitialHello(peer: Peer, node: BeaconNode, - latestFinalizedEpoch: Epoch, - bestSlot: Slot, - bestRoot: Eth2Digest, - h: HelloMsg) {.async.} = - if h.forkVersion != node.forkVersion: - await peer.disconnect(IrrelevantNetwork) - return - - # TODO: onPeerConnected runs unconditionally for every connected peer, but we - # don't need to sync with everybody. The beacon node should detect a situation - # where it needs to sync and it should execute the sync algorithm with a certain - # number of randomly selected peers. The algorithm itself must be extracted in a proc. - try: - libp2p_peers.set peer.network.peers.len.int64 - debug "Peer connected. Initiating sync", peer, bestSlot, remoteBestSlot = h.bestSlot - - let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (h.latestFinalizedEpoch, h.bestSlot)) - if bestDiff >= 0: - # Nothing to do? - debug "Nothing to sync", peer - else: - # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the - # connection if it's too big. - - var s = bestSlot + 1 - while s <= h.bestSlot: - debug "Waiting for block headers", peer, remoteBestSlot = h.bestSlot - - let numBlocksToRequest = min(uint64(h.bestSlot - s), maxBlocksToRequest) - let blocks = await peer.beaconBlocksByRange(bestRoot, s, - numBlocksToRequest, 1'u64) - if blocks.isSome: - if blocks.get.len == 0: - info "Got 0 blocks while syncing", peer - break - node.importBlocks blocks.get - let lastSlot = blocks.get[^1].slot - if lastSlot <= s: - info "Slot did not advance during sync", peer - break - - s = lastSlot + 1 - else: - break - - except CatchableError: - warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() + ourHello: HelloMsg, + theirHello: HelloMsg) {.async, gcsafe.} p2pProtocol BeaconSync(version = 1, rlpxName = "bcs", @@ -152,24 +99,12 @@ p2pProtocol BeaconSync(version = 1, onPeerConnected do (peer: Peer): if peer.wasDialed: let - protocolVersion = 1 # TODO: Spec doesn't specify this yet node = peer.networkState.node - blockPool = node.blockPool - finalizedHead = blockPool.finalizedHead - headBlock = blockPool.head.blck - bestRoot = headBlock.root - bestSlot = headBlock.slot - latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot() + ourHello = node.getCurrentHello + theirHello = await peer.hello(ourHello) - let h = await peer.hello(HelloMsg( - fork_version: node.forkVersion, - latestFinalizedRoot: finalizedHead.blck.root, - latestFinalizedEpoch: latestFinalizedEpoch, - bestRoot: bestRoot, - bestSlot: bestSlot), timeout = 10.seconds) - - if h.isSome: - await peer.handleInitialHello(node, latestFinalizedEpoch, bestSlot, bestRoot, h.get) + if theirHello.isSome: + await peer.handleInitialHello(node, ourHello, theirHello.get) else: warn "Hello response not received in time" @@ -177,27 +112,16 @@ p2pProtocol BeaconSync(version = 1, libp2p_peers.set peer.network.peers.len.int64 requestResponse: - proc hello(peer: Peer, hhh: HelloMsg) {.libp2pProtocol("hello", 1).} = + proc hello(peer: Peer, theirHello: HelloMsg) {.libp2pProtocol("hello", 1).} = let - protocolVersion = 1 # TODO: Spec doesn't specify this yet node = peer.networkState.node - blockPool = node.blockPool - finalizedHead = blockPool.finalizedHead - headBlock = blockPool.head.blck - bestRoot = headBlock.root - bestSlot = headBlock.slot - latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot() + ourHello = node.getCurrentHello - await response.send(HelloMsg( - fork_version: node.forkVersion, - latestFinalizedRoot: finalizedHead.blck.root, - latestFinalizedEpoch: latestFinalizedEpoch, - bestRoot: bestRoot, - bestSlot: bestSlot)) + await response.send(ourHello) if not peer.state.initialHelloReceived: peer.state.initialHelloReceived = true - await peer.handleInitialHello(node, latestFinalizedEpoch, bestSlot, bestRoot, hhh) + await peer.handleInitialHello(node, ourHello, theirHello) proc helloResp(peer: Peer, msg: HelloMsg) {.libp2pProtocol("hello", 1).} @@ -258,3 +182,70 @@ p2pProtocol BeaconSync(version = 1, peer: Peer, blocks: openarray[BeaconBlock]) +proc handleInitialHello(peer: Peer, + node: BeaconNode, + ourHello: HelloMsg, + theirHello: HelloMsg) {.async, gcsafe.} = + + if theirHello.forkVersion != node.forkVersion: + await peer.disconnect(IrrelevantNetwork) + return + + # TODO: onPeerConnected runs unconditionally for every connected peer, but we + # don't need to sync with everybody. The beacon node should detect a situation + # where it needs to sync and it should execute the sync algorithm with a certain + # number of randomly selected peers. The algorithm itself must be extracted in a proc. + try: + libp2p_peers.set peer.network.peers.len.int64 + + debug "Peer connected. Initiating sync", peer, + headSlot = ourHello.headSlot, + remoteHeadSlot = theirHello.headSlot + + let bestDiff = cmp((ourHello.finalizedEpoch, ourHello.headSlot), + (theirHello.finalizedEpoch, theirHello.headSlot)) + if bestDiff >= 0: + # Nothing to do? + debug "Nothing to sync", peer + else: + # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the + # connection if it's too big. + + var s = ourHello.headSlot + 1 + var theirHello = theirHello + while s <= theirHello.headSlot: + debug "Waiting for block headers", peer, + remoteHeadSlot = theirHello.headSlot + + let numBlocksToRequest = min(uint64(theirHello.headSlot - s), + maxBlocksToRequest) + let blocks = await peer.beaconBlocksByRange(ourHello.headRoot, s, + numBlocksToRequest, 1'u64) + if blocks.isSome: + if blocks.get.len == 0: + info "Got 0 blocks while syncing", peer + break + node.importBlocks blocks.get + let lastSlot = blocks.get[^1].slot + if lastSlot <= s: + info "Slot did not advance during sync", peer + break + + s = lastSlot + 1 + + # TODO: Maybe this shouldn't happen so often. + # The alternative could be watching up a timer here. + let helloResp = await peer.hello(node.getCurrentHello) + if helloResp.isSome: + theirHello = helloResp.get + else: + # We'll ignore this error and we'll try to request + # another range optimistically. If that fails, the + # syncing will be interrupted. + discard + else: + break + + except CatchableError: + warn "Failed to sync with peer", peer, err = getCurrentExceptionMsg() +