diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index d4d564de5..a1f2b8ee3 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -4,10 +4,10 @@ import chronos, chronicles, confutils, serialization/errors, eth/trie/db, eth/trie/backends/rocksdb_backend, eth/async_utils, spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator], - conf, time, - state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras, - attestation_pool, block_pool, eth2_network, beacon_node_types, - mainchain_monitor, trusted_state_snapshots, version + conf, time, state_transition, fork_choice, ssz, beacon_chain_db, + validator_pool, extras, attestation_pool, block_pool, eth2_network, + beacon_node_types, mainchain_monitor, trusted_state_snapshots, version, + sync_protocol, request_manager const topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks" @@ -18,13 +18,7 @@ const genesisFile = "genesis.json" testnetsBaseUrl = "https://serenity-testnets.status.im" -# ################################################# -# Careful handling of beacon_node <-> sync_protocol -# to avoid recursive dependencies proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.} - # Forward decl for sync_protocol -import sync_protocol, request_manager -# ################################################# func localValidatorsDir(conf: BeaconNodeConf): string = conf.dataDir / "validators" @@ -88,6 +82,7 @@ proc saveValidatorKey(keyName, key: string, conf: BeaconNodeConf) = proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} = new result + result.onBeaconBlock = onBeaconBlock result.config = conf result.networkIdentity = getPersistentNetIdentity(conf) result.nickname = if conf.nodename == "auto": shortForm(result.networkIdentity) @@ -654,7 +649,7 @@ proc onSecond(node: BeaconNode, moment: Moment) {.async.} = if missingBlocks.len > 0: info "Requesting detected missing blocks", missingBlocks node.requestManager.fetchAncestorBlocks(missingBlocks) do (b: BeaconBlock): - node.onBeaconBlock(b) + onBeaconBlock(node ,b) let nextSecond = max(Moment.now(), moment + chronos.seconds(1)) addTimer(nextSecond) do (p: pointer): @@ -662,7 +657,7 @@ proc onSecond(node: BeaconNode, moment: Moment) {.async.} = proc run*(node: BeaconNode) = waitFor node.network.subscribe(topicBeaconBlocks) do (blck: BeaconBlock): - node.onBeaconBlock(blck) + onBeaconBlock(node, blck) waitFor node.network.subscribe(topicAttestations) do (attestation: Attestation): node.onAttestation(attestation) diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index 0e5306d3f..a0a00f534 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -25,6 +25,7 @@ type attestationPool*: AttestationPool mainchainMonitor*: MainchainMonitor beaconClock*: BeaconClock + onBeaconBlock*: proc (node: BeaconNode, blck: BeaconBlock) {.gcsafe.} stateCache*: StateData ##\ ## State cache object that's used as a scratch pad diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 110a0ee5c..5c9bf03b3 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -175,17 +175,22 @@ else: result = waitFor daemon.identity() waitFor daemon.close() + template tcpEndPoint(address, port): auto = + MultiAddress.init(address, IPPROTO_TCP, port) + proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} = var (extIp, extTcpPort, extUdpPort) = setupNat(conf) hostAddress = tcpEndPoint(globalListeningAddr, Port conf.tcpPort) announcedAddresses = if extIp != globalListeningAddr: @[] else: @[tcpEndPoint(extIp, extTcpPort)] + keyFile = conf.ensureNetworkIdFile - daemon = await newDaemonApi({PSGossipSub}, - id = conf.ensureNetworkIdFile, - hostAddresses = @[hostAddress], - announcedAddresses = announcedAddresses) + info "Starting LibP2P deamon", hostAddress, announcedAddresses, keyFile + let daemon = await newDaemonApi({PSGossipSub}, + id = keyFile, + hostAddresses = @[hostAddress], + announcedAddresses = announcedAddresses) return await Eth2Node.init(daemon) @@ -195,7 +200,7 @@ else: result.addresses = @[tcpEndPoint(ip, port)] proc isSameNode*(bootstrapNode: BootstrapAddr, id: Eth2NodeIdentity): bool = - bootstrapNode == id + bootstrapNode.peer == id.peer proc shortForm*(id: Eth2NodeIdentity): string = # TODO: Make this shorter diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 254b0258c..c3d949e7f 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -22,6 +22,7 @@ type connectionState*: ConnectionState awaitedMessages: Table[CompressedMsgId, FutureBase] protocolStates*: seq[RootRef] + maxInactivityAllowed*: Duration ConnectionState* = enum None, @@ -95,6 +96,7 @@ proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} = new result result.daemon = daemon result.daemon.userData = result + result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config init result.peers newSeq result.protocolStates, allProtocols.len @@ -319,7 +321,7 @@ proc implementSendProcBody(sendProc: SendProc) = else: quote: `sendBytes`(`UntypedResponder`(`peer`).stream, `bytes`) - sendProc.useStandardBody(nil, sendCallGenerator) + sendProc.useStandardBody(nil, nil, sendCallGenerator) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var diff --git a/beacon_chain/libp2p_spec_backend.nim b/beacon_chain/libp2p_spec_backend.nim index cb17794e6..3eb4e44a6 100644 --- a/beacon_chain/libp2p_spec_backend.nim +++ b/beacon_chain/libp2p_spec_backend.nim @@ -1,7 +1,7 @@ import tables, deques, options, algorithm, std_shims/[macros_shim, tables_shims], ranges/ptr_arith, chronos, chronicles, serialization, faststreams/input_stream, - eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi, + eth/async_utils, eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi, ssz export @@ -9,10 +9,10 @@ export const # Compression nibble - NoCompression* = uint 0 + NoCompression* = byte 0 # Encoding nibble - SszEncoding* = uint 1 + SszEncoding* = byte 1 beaconChainProtocol = "/eth/serenity/beacon/rpc/1" @@ -27,13 +27,13 @@ type Peer* = ref object network*: Eth2Node id*: PeerID - lastSentMsgId*: uint64 + lastReqId*: uint64 rpcStream*: P2PStream connectionState*: ConnectionState awaitedMessages: Table[CompressedMsgId, FutureBase] - outstandingRequests*: seq[Deque[OutstandingRequest]] + outstandingRequests*: Table[uint64, OutstandingRequest] protocolStates*: seq[RootRef] - maxInactivityAllowed: Duration + maxInactivityAllowed*: Duration ConnectionState* = enum None, @@ -69,6 +69,7 @@ type id*: uint64 future*: FutureBase timeoutAt*: Moment + responseThunk*: ThunkProc ProtocolConnection* = object stream*: P2PStream @@ -96,13 +97,9 @@ type networkStateInitializer*: NetworkStateInitializer handshake*: HandshakeStep disconnectHandler*: DisconnectionHandler - dispatcher: Dispatcher ProtocolInfo* = ptr ProtocolInfoObj - Dispatcher* = object - messages*: seq[MessageInfo] - SpecOuterMsgHeader {.packed.} = object compression {.bitsize: 4.}: uint encoding {.bitsize: 4.}: uint @@ -112,6 +109,10 @@ type reqId: uint64 methodId: uint16 + ErrorResponse {.packed.} = object + outerHeader: SpecOuterMsgHeader + innerHeader: SpecInnerMsgHeader + PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.} @@ -121,6 +122,7 @@ type ThunkProc* = proc(peer: Peer, stream: P2PStream, reqId: uint64, + reqFuture: FutureBase, msgData: ByteStreamVar): Future[void] {.gcsafe.} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} @@ -147,6 +149,14 @@ const BreachOfProtocol* = FaultOrError # TODO: We should lobby for more disconnection reasons. +template isOdd(val: SomeInteger): bool = + type T = type(val) + (val and T(1)) != 0 + +proc init(T: type SpecOuterMsgHeader, + compression, encoding: byte, msgLen: uint64): T = + T(compression: compression, encoding: encoding, msgLen: msgLen) + proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} = await stream.transp.readExactly(addr result, sizeof result) @@ -154,6 +164,10 @@ proc appendPackedObject(stream: OutputStreamVar, value: auto) = let valueAsBytes = cast[ptr byte](unsafeAddr(value)) stream.append makeOpenArray(valueAsBytes, sizeof(value)) +proc getThunk(protocol: ProtocolInfo, methodId: uint16): ThunkProc = + if methodId.int >= protocol.messages.len: return nil + protocol.messages[methodId.int].thunk + include libp2p_backends_common include eth/p2p/p2p_backends_helpers include eth/p2p/p2p_tracing @@ -178,7 +192,8 @@ proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer = result.id = id result.network = network result.awaitedMessages = initTable[CompressedMsgId, FutureBase]() - result.connectionState = Connected + result.maxInactivityAllowed = 15.minutes # TODO: read this from the config + result.connectionState = None newSeq result.protocolStates, allProtocols.len for i in 0 ..< allProtocols.len: let proto = allProtocols[i] @@ -189,52 +204,51 @@ proc init*[MsgName](T: type ResponderWithId[MsgName], peer: Peer, reqId: uint64): T = T(peer: peer, reqId: reqId) -proc performProtocolHandshakes*(peer: Peer) {.async.} = - var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len) - for protocol in allProtocols: - if protocol.handshake != nil: - subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream)) - - await all(subProtocolsHandshakes) - debug "All protocols initialized", peer - -proc initializeConnection*(peer: Peer) {.async.} = - let daemon = peer.network.daemon +proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = try: - peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol]) - await performProtocolHandshakes(peer) + var unsentBytes = data.len + while true: + # TODO: this looks wrong. + # We are always trying to write the same data. + # Find all other places where such code is used. + unsentBytes -= await peer.rpcStream.transp.write(data) + if unsentBytes <= 0: return except CatchableError: - await reraiseAsPeerDisconnected(peer, "Failed to perform handshake") + await peer.disconnect(FaultOrError) + # this is usually a "(32) Broken pipe": + # FIXME: this exception should be caught somewhere in addMsgHandler() and + # sending should be retried a few times + raise -proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} = - let peer = daemon.peerFromStream(stream) - peer.rpcStream = stream - await performProtocolHandshakes(peer) +proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] = + return sendMsg(responder.peer, data) -proc accepts(d: Dispatcher, methodId: uint16): bool = - methodId.int < d.messages.len +proc sendErrorResponse(peer: Peer, reqId: uint64, + responseCode: ResponseCode): Future[void] = + var resp = ErrorResponse( + outerHeader: SpecOuterMsgHeader.init( + compression = NoCompression, + encoding = SszEncoding, + msgLen = uint64 sizeof(SpecInnerMsgHeader)), + innerHeader: SpecInnerMsgHeader( + reqId: reqId, + methodId: uint16(responseCode))) -proc invokeThunk(peer: Peer, - protocol: ProtocolInfo, - stream: P2PStream, - methodId: int, - reqId: uint64, - msgContents: ByteStreamVar): Future[void] = - template raiseInvalidMsgId = - raise newException(InvalidMsgIdError, - "ETH2 message with an invalid id " & $methodId) + # TODO: don't allocate the Bytes sequence here + return peer.sendMsg @(makeOpenArray(cast[ptr byte](addr resp), sizeof resp)) - if methodId >= protocol.dispatcher.messages.len: raiseInvalidMsgId() - var thunk = protocol.dispatcher.messages[methodId].thunk - if thunk == nil: raiseInvalidMsgId() - - return thunk(peer, stream, reqId, msgContents) - -proc recvAndDispatchMsg*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream): - Future[PeerLoopExitReason] {.async.} = +proc recvAndDispatchMsg*(peer: Peer): Future[PeerLoopExitReason] {.async.} = template fail(reason) = return reason + # For now, we won't try to handle the presence of multiple sub-protocols + # since the spec is not defining how they will be mapped to P2P streams. + doAssert allProtocols.len == 1 + + var + stream = peer.rpcStream + protocol = allProtocols[0] + var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader) if outerHeader.compression != NoCompression: @@ -246,44 +260,51 @@ proc recvAndDispatchMsg*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream): if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64: fail ProtocolViolation - var innerHeader = await stream.readPackedObject(SpecInnerMsgHeader) + let + innerHeader = await stream.readPackedObject(SpecInnerMsgHeader) + reqId = innerHeader.reqId var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64) await stream.transp.readExactly(addr msgContent[0], msgContent.len) var msgContentStream = memoryStream(msgContent) - if protocol.dispatcher.accepts(innerHeader.methodId): - try: - await invokeThunk(peer, protocol, stream, - innerHeader.methodId.int, - innerHeader.reqId, - msgContentStream) - except SerializationError: - fail ProtocolViolation - except CatchableError: - warn "" + if reqId.isOdd: + peer.outstandingRequests.withValue(reqId, req): + let thunk = req.responseThunk + let reqFuture = req.future + peer.outstandingRequests.del(reqId) -proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = - try: - var unsentBytes = data.len - while true: - unsentBytes -= await peer.rpcStream.transp.write(data) - if unsentBytes <= 0: return - except: - await peer.disconnect(FaultOrError) - # this is usually a "(32) Broken pipe": - # FIXME: this exception should be caught somewhere in addMsgHandler() and - # sending should be retried a few times - raise + try: + await thunk(peer, stream, reqId, reqFuture, msgContentStream) + except SerializationError: + debug "Error during deserialization", err = getCurrentExceptionMsg() + fail ProtocolViolation + except CatchableError: + # TODO + warn "" + do: + debug "Ignoring late or invalid response ID", peer, id = reqId + # TODO: skip the message + else: + let thunk = protocol.getThunk(innerHeader.methodId) + if thunk != nil: + try: + await thunk(peer, stream, reqId, nil, msgContentStream) + except SerializationError: + debug "Error during deserialization", err = getCurrentExceptionMsg() + fail ProtocolViolation + except CatchableError: + # TODO + warn "" + else: + debug "P2P request method not found", methodId = innerHeader.methodId + await peer.sendErrorResponse(reqId, MethodNotFound) -proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] = - return sendMsg(responder.peer, data) - -proc dispatchMessages*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream): - Future[PeerLoopExitReason] {.async.} = +proc dispatchMessages*(peer: Peer): Future[PeerLoopExitReason] {.async.} = while true: - let dispatchedMsgFut = recvAndDispatchMsg(peer, protocol, stream) + let dispatchedMsgFut = recvAndDispatchMsg(peer) + doAssert peer.maxInactivityAllowed.milliseconds > 0 yield dispatchedMsgFut or sleepAsync(peer.maxInactivityAllowed) if not dispatchedMsgFut.finished: return InactivePeer @@ -295,87 +316,70 @@ proc dispatchMessages*(peer: Peer, protocol: ProtocolInfo, stream: P2PStream): if status == Success: continue return status -proc registerRequest(peer: Peer, - protocol: ProtocolInfo, - timeout: Duration, - responseFuture: FutureBase, - responseMethodId: uint16): uint64 = - inc peer.lastSentMsgId - result = peer.lastSentMsgId +proc performProtocolHandshakes*(peer: Peer) {.async.} = + peer.initProtocolStates allProtocols - let timeoutAt = Moment.fromNow(timeout) - let req = OutstandingRequest(id: result, - future: responseFuture, - timeoutAt: timeoutAt) - peer.outstandingRequests[responseMethodId.int].addLast req + # Please note that the ordering of operations here is important! + # + # We must first start all handshake procedures and give them a + # chance to send any initial packages they might require over + # the network and to yield on their `nextMsg` waits. + # + var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len) + for protocol in allProtocols: + if protocol.handshake != nil: + subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream)) - let requestResolver = protocol.dispatcher.messages[responseMethodId.int].requestResolver - proc timeoutExpired(udata: pointer) = requestResolver(nil, responseFuture) + # The `dispatchMesssages` loop must be started after this. + # Otherwise, we risk that some of the handshake packets sent by + # the other peer may arrrive too early and be processed before + # the handshake code got a change to wait for them. + # + var messageProcessingLoop = peer.dispatchMessages() + messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} = + if messageProcessingLoop.failed: + debug "Ending dispatchMessages loop", peer, + err = messageProcessingLoop.error.msg + else: + debug "Ending dispatchMessages", peer, + exitCode = messageProcessingLoop.read + traceAsyncErrors peer.disconnect(ClientShutdown) - addTimer(timeoutAt, timeoutExpired, nil) + # The handshake may involve multiple async steps, so we wait + # here for all of them to finish. + # + await all(subProtocolsHandshakes) + + peer.connectionState = Connected + debug "Peer connection initialized", peer + +proc initializeConnection*(peer: Peer) {.async.} = + let daemon = peer.network.daemon + try: + peer.connectionState = Connecting + peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol]) + await performProtocolHandshakes(peer) + except CatchableError: + await reraiseAsPeerDisconnected(peer, "Failed to perform handshake") + +proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} = + let peer = daemon.peerFromStream(stream) + peer.rpcStream = stream + peer.connectionState = Connecting + await performProtocolHandshakes(peer) proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo, - methodId: int, msg: pointer, reqId: uint64) = - logScope: - msg = protocol.dispatcher.messages[methodId].name - msgContents = protocol.dispatcher.messages[methodId].printer(msg) - receivedReqId = reqId - remotePeer = peer.id - - template resolve(future) = - (protocol.dispatcher.messages[methodId].requestResolver)(msg, future) - - template outstandingReqs: auto = - peer.outstandingRequests[methodId] + methodId: int, msg: pointer, reqFuture: FutureBase) = let msgId = (protocolIdx: protocol.index, methodId: methodId) + if peer.awaitedMessages[msgId] != nil: - let msgInfo = protocol.dispatcher.messages[methodId] + let msgInfo = protocol.messages[methodId] msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId]) peer.awaitedMessages[msgId] = nil - # TODO: This is not completely sound because we are still using a global - # `reqId` sequence (the problem is that we might get a response ID that - # matches a request ID for a different type of request). To make the code - # correct, we can use a separate sequence per response type, but we have - # to first verify that the other Ethereum clients are supporting this - # correctly (because then, we'll be reusing the same reqIds for different - # types of requests). Alternatively, we can assign a separate interval in - # the `reqId` space for each type of response. - if reqId > peer.lastSentMsgId: - warn "RLPx response without a matching request" - return - - var idx = 0 - while idx < outstandingReqs.len: - template req: auto = outstandingReqs()[idx] - - if req.future.finished: - doAssert req.timeoutAt <= Moment.now() - # Here we'll remove the expired request by swapping - # it with the last one in the deque (if necessary): - if idx != outstandingReqs.len - 1: - req = outstandingReqs.popLast - continue - else: - outstandingReqs.shrink(fromLast = 1) - # This was the last item, so we don't have any - # more work to do: - return - - if req.id == reqId: - resolve req.future - # Here we'll remove the found request by swapping - # it with the last one in the deque (if necessary): - if idx != outstandingReqs.len - 1: - req = outstandingReqs.popLast - else: - outstandingReqs.shrink(fromLast = 1) - return - - inc idx - - debug "late or duplicate reply for a network request" + if reqFuture != nil and not reqFuture.finished: + protocol.messages[methodId].requestResolver(msg, reqFuture) proc initProtocol(name: string, version: int, peerInit: PeerStateInitializer, @@ -423,28 +427,72 @@ proc prepareRequest(peer: Peer, stream: OutputStreamVar, timeout: Duration, responseFuture: FutureBase): DelayedWriteCursor = + assert peer != nil and + protocol != nil and + responseFuture != nil and + responseMethodId.int < protocol.messages.len - let reqId = registerRequest(peer, protocol, timeout, - responseFuture, responseMethodId) + doAssert timeout.milliseconds > 0 result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + inc peer.lastReqId, 2 + let reqId = peer.lastReqId + stream.appendPackedObject SpecInnerMsgHeader( - reqId: reqId, - methodId: requestMethodId) + reqId: reqId, methodId: requestMethodId) + + template responseMsgInfo: auto = + protocol.messages[responseMethodId.int] + + let + requestResolver = responseMsgInfo.requestResolver + timeoutAt = Moment.fromNow(timeout) + + peer.outstandingRequests[reqId + 1] = OutstandingRequest( + id: reqId, + future: responseFuture, + timeoutAt: timeoutAt, + responseThunk: responseMsgInfo.thunk) + + proc timeoutExpired(udata: pointer) = + requestResolver(nil, responseFuture) + peer.outstandingRequests.del(reqId + 1) + + addTimer(timeoutAt, timeoutExpired, nil) proc prepareResponse(responder: ResponderWithId, stream: OutputStreamVar): DelayedWriteCursor = result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + stream.appendPackedObject SpecInnerMsgHeader( + reqId: responder.reqId + 1, + methodId: uint16(Success)) + +proc prepareMsg(peer: Peer, methodId: uint16, + stream: OutputStreamVar): DelayedWriteCursor = + result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) + + inc peer.lastReqId, 2 + stream.appendPackedObject SpecInnerMsgHeader( + reqId: peer.lastReqId, methodId: methodId) + +proc finishOuterHeader(headerCursor: DelayedWriteCursor) = + var outerHeader = SpecOuterMsgHeader.init( + compression = NoCompression, + encoding = SszEncoding, + msgLen = uint64(headerCursor.totalBytesWrittenAfterCursor)) + + headerCursor.endWrite makeOpenArray(cast[ptr byte](addr outerHeader), + sizeof outerHeader) + proc implementSendProcBody(sendProc: SendProc) = let msg = sendProc.msg delayedWriteCursor = ident "delayedWriteCursor" peer = sendProc.peerParam - proc preludeGenerator(stream: NimNode): NimNode = - result = newStmtList() + proc preSerializationStep(stream: NimNode): NimNode = case msg.kind of msgRequest: let @@ -453,15 +501,22 @@ proc implementSendProcBody(sendProc: SendProc) = protocol = sendProc.msg.protocol.protocolInfoVar timeout = sendProc.timeoutParam - result.add quote do: - let `delayedWriteCursor` = `prepareRequest`( + quote do: + var `delayedWriteCursor` = prepareRequest( `peer`, `protocol`, `requestMethodId`, `responseMethodId`, `stream`, `timeout`, `resultIdent`) + of msgResponse: - result.add quote do: - let `delayedWriteCursor` = `prepareResponse`(`peer`, `stream`) - else: - discard + quote do: + var `delayedWriteCursor` = prepareResponse(`peer`, `stream`) + + of msgHandshake, msgNotification: + let methodId = newLit(msg.id) + quote do: + var `delayedWriteCursor` = prepareMsg(`peer`, `methodId`, `stream`) + + proc postSerializationStep(stream: NimNode): NimNode = + newCall(bindSym "finishOuterHeader", delayedWriteCursor) proc sendCallGenerator(peer, bytes: NimNode): NimNode = let @@ -471,7 +526,7 @@ proc implementSendProcBody(sendProc: SendProc) = if msg.kind == msgRequest: # In RLPx requests, the returned future was allocated here and passed - # to `registerRequest`. It's already assigned to the result variable + # to `prepareRequest`. It's already assigned to the result variable # of the proc, so we just wait for the sending operation to complete # and we return in a normal way. (the waiting is done, so we can catch # any possible errors). @@ -481,7 +536,10 @@ proc implementSendProcBody(sendProc: SendProc) = # `sendMsg` call. quote: return `sendCall` - sendProc.useStandardBody(preludeGenerator, sendCallGenerator) + sendProc.useStandardBody( + preSerializationStep, + postSerializationStep, + sendCallGenerator) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = let @@ -492,7 +550,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = Format = ident "SSZ" Response = bindSym "Response" ResponderWithId = bindSym "ResponderWithId" - perProtocolMsgId = ident"perProtocolMsgId" + perProtocolMsgId = ident "perProtocolMsgId" mount = bindSym "mount" @@ -508,6 +566,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = stream = ident "stream" protocol = ident "protocol" response = ident "response" + reqFutureVar = ident "reqFuture" msgContents = ident "msgContents" receivedMsg = ident "receivedMsg" @@ -546,12 +605,12 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = else: newStmtList() - let callResolvePendingFutures = if msg.kind == msgResponse: - newCall(resolvePendingFutures, - peerVar, protocol.protocolInfoVar, - msgIdLit, newCall("addr", receivedMsg), reqIdVar) - else: - newStmtList() + let callResolvePendingFutures = newCall( + resolvePendingFutures, peerVar, + protocol.protocolInfoVar, + msgIdLit, + newCall("addr", receivedMsg), + reqFutureVar) var userHandlerParams = @[peerVar] if msg.kind == msgRequest: @@ -565,6 +624,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = proc `thunkName`(`peerVar`: `Peer`, `stream`: `P2PStream`, `reqIdVar`: uint64, + `reqFutureVar`: FutureBase, `msgContents`: `ByteStreamVar`) {.async, gcsafe.} = var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`) `traceMsg` diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 9ab26f2fb..ecd777790 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -4,10 +4,6 @@ import spec/[datatypes, crypto, digest, helpers], eth/rlp, beacon_node_types, eth2_network, beacon_chain_db, block_pool, time, ssz -from beacon_node import onBeaconBlock - # Careful handling of beacon_node <-> sync_protocol - # to avoid recursive dependencies - type ValidatorChangeLogEntry* = object case kind*: ValidatorSetDeltaFlags @@ -49,7 +45,7 @@ proc fromHeaderAndBody(b: var BeaconBlock, h: BeaconBlockHeader, body: BeaconBlo proc importBlocks(node: BeaconNode, blocks: openarray[BeaconBlock]) = for blk in blocks: - node.onBeaconBlock(blk) + node.onBeaconBlock(node, blk) info "Forward sync imported blocks", len = blocks.len proc mergeBlockHeadersAndBodies(headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]): Option[seq[BeaconBlock]] = @@ -105,7 +101,7 @@ p2pProtocol BeaconSync(version = 1, let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) if bestDiff >= 0: # Nothing to do? - trace "Nothing to sync", peer = peer.remote + debug "Nothing to sync", peer else: # TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the # connection if it's too big. @@ -124,7 +120,7 @@ p2pProtocol BeaconSync(version = 1, if lastSlot <= s: info "Slot did not advance during sync", peer break - + s = lastSlot + 1 else: break