From 5ce50b3aca789e38d49b6c10da3cf6484b70ca27 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Mon, 5 Aug 2019 03:00:49 +0300 Subject: [PATCH] Implement the latest networking spec https://github.com/ethereum/eth2.0-specs/pull/1328 --- beacon_chain/attestation_pool.nim | 6 +- beacon_chain/beacon_chain_db.nim | 4 + beacon_chain/beacon_node.nim | 48 +- beacon_chain/beacon_node_types.nim | 2 + beacon_chain/eth2_network.nim | 53 +- beacon_chain/genesis.nim | 6 +- beacon_chain/libp2p_backend.nim | 200 ++++++-- beacon_chain/libp2p_backends_common.nim | 74 --- beacon_chain/libp2p_spec_backend.nim | 645 ------------------------ beacon_chain/spec/crypto.nim | 7 +- beacon_chain/spec/datatypes.nim | 11 +- beacon_chain/ssz.nim | 30 +- beacon_chain/ssz/bytes_reader.nim | 22 +- beacon_chain/ssz/types.nim | 6 +- beacon_chain/sync_protocol.nim | 117 ++++- beacon_chain/version.nim | 14 +- scripts/testnet1.env | 2 +- 17 files changed, 386 insertions(+), 861 deletions(-) delete mode 100644 beacon_chain/libp2p_backends_common.nim delete mode 100644 beacon_chain/libp2p_spec_backend.nim diff --git a/beacon_chain/attestation_pool.nim b/beacon_chain/attestation_pool.nim index 53876c54d..781c79f4b 100644 --- a/beacon_chain/attestation_pool.nim +++ b/beacon_chain/attestation_pool.nim @@ -28,6 +28,8 @@ proc combine*(tgt: var Attestation, src: Attestation, flags: UpdateFlags) = if skipValidation notin flags: tgt.signature.combine(src.signature) + else: + debug "Ignoring overlapping attestations" proc validate( state: BeaconState, attestation: Attestation, flags: UpdateFlags): bool = @@ -61,11 +63,11 @@ proc validate( finalizedEpoch = humaneEpochNum(state.finalized_checkpoint.epoch) return - if not allIt(attestation.custody_bits.bytes, it == 0): + if not attestation.custody_bits.BitSeq.isZeros: notice "Invalid custody bitfield for phase 0" return false - if not anyIt(attestation.aggregation_bits.bytes, it != 0): + if attestation.aggregation_bits.BitSeq.isZeros: notice "Empty aggregation bitfield" return false diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim index 3947d7ebd..39bbceceb 100644 --- a/beacon_chain/beacon_chain_db.nim +++ b/beacon_chain/beacon_chain_db.nim @@ -106,6 +106,10 @@ proc get(db: BeaconChainDB, key: auto, T: typedesc): Option[T] = proc getBlock*(db: BeaconChainDB, key: Eth2Digest): Option[BeaconBlock] = db.get(subkey(BeaconBlock, key), BeaconBlock) +proc getBlock*(db: BeaconChainDB, slot: Slot): Option[BeaconBlock] = + # TODO implement this + discard + proc getState*(db: BeaconChainDB, key: Eth2Digest): Option[BeaconState] = db.get(subkey(BeaconState, key), BeaconState) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index bf85d132e..9636a4be2 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -10,8 +10,11 @@ import sync_protocol, request_manager, genesis const - topicBeaconBlocks = "ethereum/2.1/beacon_chain/blocks" - topicAttestations = "ethereum/2.1/beacon_chain/attestations" + topicBeaconBlocks = "/eth2/beacon_block/ssz" + topicAttestations = "/eth2/beacon_attestation/ssz" + topicVoluntaryExits = "/eth2/voluntary_exit/ssz" + topicProposerSlashings = "/eth2/proposer_slashing/ssz" + topicAttesterSlashings = "/eth2/attester_slashing/ssz" dataDirValidators = "validators" networkMetadataFile = "network.json" @@ -101,6 +104,7 @@ proc initGenesis(node: BeaconNode) {.async.} = quit 1 info "Got genesis state", hash = hash_tree_root(tailState) + node.forkVersion = tailState.fork.current_version try: let tailBlock = get_initial_beacon_block(tailState) @@ -159,6 +163,20 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async if metadataErrorMsg.len > 0: fail "To connect to the ", conf.network, " network, please compile with", metadataErrorMsg + for bootNode in result.networkMetadata.bootstrapNodes: + if bootNode.isSameNode(result.networkIdentity): + result.isBootstrapNode = true + else: + result.bootstrapNodes.add bootNode + + for bootNode in conf.bootstrapNodes: + result.bootstrapNodes.add BootstrapAddr.init(bootNode) + + let bootstrapFile = string conf.bootstrapNodesFile + if bootstrapFile.len > 0: + for ln in lines(bootstrapFile): + result.bootstrapNodes.add BootstrapAddr.init(string ln) + result.attachedValidators = ValidatorPool.init init result.mainchainMonitor, "", Port(0) # TODO: specify geth address and port @@ -176,14 +194,12 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async result.blockPool = BlockPool.init(result.db) result.attestationPool = AttestationPool.init(result.blockPool) - result.network = await createEth2Node(conf) + result.network = await createEth2Node(conf, result.bootstrapNodes) result.requestManager.init result.network # TODO sync is called when a remote peer is connected - is that the right # time to do so? let sync = result.network.protocolState(BeaconSync) - sync.chainId = 0 # TODO specify chainId - sync.networkId = result.networkMetadata.networkId sync.node = result sync.db = result.db @@ -211,28 +227,12 @@ template withState( body proc connectToNetwork(node: BeaconNode) {.async.} = - var bootstrapNodes = newSeq[BootstrapAddr]() - - for bootNode in node.networkMetadata.bootstrapNodes: - if bootNode.isSameNode(node.networkIdentity): - node.isBootstrapNode = true - else: - bootstrapNodes.add bootNode - - for bootNode in node.config.bootstrapNodes: - bootstrapNodes.add BootstrapAddr.init(bootNode) - - let bootstrapFile = string node.config.bootstrapNodesFile - if bootstrapFile.len > 0: - for ln in lines(bootstrapFile): - bootstrapNodes.add BootstrapAddr.init(string ln) - - if bootstrapNodes.len > 0: - info "Connecting to bootstrap nodes", bootstrapNodes + if node.bootstrapNodes.len > 0: + info "Connecting to bootstrap nodes", bootstrapNodes = node.bootstrapNodes else: info "Waiting for connections" - await node.network.connectToNetwork(bootstrapNodes) + await node.network.connectToNetwork(node.bootstrapNodes) template findIt(s: openarray, predicate: untyped): int = var res = -1 diff --git a/beacon_chain/beacon_node_types.nim b/beacon_chain/beacon_node_types.nim index 641d6200c..f57cca4d7 100644 --- a/beacon_chain/beacon_node_types.nim +++ b/beacon_chain/beacon_node_types.nim @@ -14,10 +14,12 @@ type BeaconNode* = ref object nickname*: string network*: Eth2Node + forkVersion*: array[4, byte] networkIdentity*: Eth2NodeIdentity networkMetadata*: NetworkMetadata requestManager*: RequestManager isBootstrapNode*: bool + bootstrapNodes*: seq[BootstrapAddr] db*: BeaconChainDB config*: BeaconNodeConf attachedValidators*: ValidatorPool diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 9d7cedb79..f3028fb50 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -99,7 +99,8 @@ when networkBackend == rlpxBackend: proc readValue*(reader: var JsonReader, value: var BootstrapAddr) {.inline.} = value = initENode reader.readValue(string) - proc createEth2Node*(conf: BeaconNodeConf): Future[EthereumNode] {.async.} = + proc createEth2Node*(conf: BeaconNodeConf, + bootstrapNodes: seq[BootstrapAddr]): Future[EthereumNode] {.async.} = let keys = getPersistentNetIdentity(conf) (ip, tcpPort, udpPort) = setupNat(conf) @@ -125,25 +126,19 @@ else: import os, random, stew/io, libp2p/crypto/crypto, libp2p/daemon/daemonapi, eth/async_utils, - ssz + ssz, libp2p_backend - when networkBackend == libp2pSpecBackend: - import libp2p_spec_backend - export libp2p_spec_backend - const netBackendName* = "libp2p_spec" + export + libp2p_backend - else: - import libp2p_backend - export libp2p_backend - const netBackendName* = "libp2p_native" + const + netBackendName* = "libp2p" + networkKeyFilename = "privkey.protobuf" type BootstrapAddr* = PeerInfo Eth2NodeIdentity* = PeerInfo - const - networkKeyFilename = "privkey.protobuf" - proc init*(T: type BootstrapAddr, str: string): T = Json.decode(str, PeerInfo) @@ -168,7 +163,13 @@ else: var mainDaemon: DaemonAPI - proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} = + proc allMultiAddresses(nodes: seq[BootstrapAddr]): seq[string] = + for node in nodes: + for a in node.addresses: + result.add $a & "/ipfs/" & node.peer.pretty() + + proc createEth2Node*(conf: BeaconNodeConf, + bootstrapNodes: seq[BootstrapAddr]): Future[Eth2Node] {.async.} = var (extIp, extTcpPort, extUdpPort) = setupNat(conf) hostAddress = tcpEndPoint(globalListeningAddr, Port conf.tcpPort) @@ -176,11 +177,25 @@ else: else: @[tcpEndPoint(extIp, extTcpPort)] keyFile = conf.ensureNetworkIdFile - info "Starting the LibP2P daemon", hostAddress, announcedAddresses, keyFile - mainDaemon = await newDaemonApi({PSGossipSub}, - id = keyFile, - hostAddresses = @[hostAddress], - announcedAddresses = announcedAddresses) + info "Starting the LibP2P daemon", hostAddress, announcedAddresses, + keyFile, bootstrapNodes + + var daemonFut = if bootstrapNodes.len == 0: + newDaemonApi({DHTFull, PSGossipSub}, + id = keyFile, + hostAddresses = @[hostAddress], + announcedAddresses = announcedAddresses) + else: + newDaemonApi({DHTFull, PSGossipSub, WaitBootstrap}, + id = keyFile, + hostAddresses = @[hostAddress], + announcedAddresses = announcedAddresses, + bootstrapNodes = allMultiAddresses(bootstrapNodes), + peersRequired = 1) + + info "Deamon started" + + mainDaemon = await daemonFut proc closeDaemon() {.noconv.} = info "Shutting down the LibP2P daemon" diff --git a/beacon_chain/genesis.nim b/beacon_chain/genesis.nim index 69d381b70..b8e4f56a7 100644 --- a/beacon_chain/genesis.nim +++ b/beacon_chain/genesis.nim @@ -1,5 +1,7 @@ -import conf, chronos, web3, json, - spec/[bitfield, datatypes, digest, crypto, beaconstate, helpers, validator], extras +import + chronos, web3, json, + spec/[datatypes, digest, crypto, beaconstate, helpers, validator], + conf, extras contract(DepositContract): proc deposit(pubkey: Bytes48, withdrawalCredentials: Bytes32, signature: Bytes96) diff --git a/beacon_chain/libp2p_backend.nim b/beacon_chain/libp2p_backend.nim index 8d87cd7de..8d9aa775a 100644 --- a/beacon_chain/libp2p_backend.nim +++ b/beacon_chain/libp2p_backend.nim @@ -1,6 +1,6 @@ import algorithm, - stew/shims/[macros, tables], chronos, chronicles, + stew/varints, stew/shims/[macros, tables], chronos, chronicles, libp2p/daemon/daemonapi, faststreams/output_stream, serialization, json_serialization/std/options, eth/p2p/p2p_protocol_dsl, libp2p_json_serialization, ssz @@ -32,8 +32,8 @@ type Disconnected DisconnectionReason* = enum - UselessPeer - BreachOfProtocol + ClientShutDown + IrrelevantNetwork FaultOrError UntypedResponder = object @@ -70,7 +70,6 @@ type ResponseCode* = enum Success - EncodingError InvalidRequest ServerError @@ -92,13 +91,84 @@ type const defaultIncomingReqTimeout = 5000 defaultOutgoingReqTimeout = 10000 - HandshakeTimeout = BreachOfProtocol + HandshakeTimeout = FaultOrError + RQRP_MAX_SIZE = 2 * 1024 * 1024 - IrrelevantNetwork* = UselessPeer +template `$`*(peer: Peer): string = $peer.id +chronicles.formatIt(Peer): $it + +template libp2pProtocol*(name: string, version: int) {.pragma.} include eth/p2p/p2p_backends_helpers include eth/p2p/p2p_tracing -include libp2p_backends_common + +proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.} + +proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} = + result = node.peers.getOrDefault(peerId) + if result == nil: + result = Peer.init(node, peerId) + node.peers[peerId] = result + +proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} = + Eth2Node(daemon.userData).getPeer(stream.peer) + +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = + # TODO: How should we notify the other peer? + if peer.connectionState notin {Disconnecting, Disconnected}: + peer.connectionState = Disconnecting + await peer.network.daemon.disconnect(peer.id) + peer.connectionState = Disconnected + peer.network.peers.del(peer.id) + +template raisePeerDisconnected(msg: string, r: DisconnectionReason) = + var e = newException(PeerDisconnected, msg) + e.reason = r + raise e + +proc disconnectAndRaise(peer: Peer, + reason: DisconnectionReason, + msg: string) {.async.} = + let r = reason + await peer.disconnect(r) + raisePeerDisconnected(msg, r) + +template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string, + reason = FaultOrError): auto = + const errMsg = errMsgExpr + debug errMsg, err = getCurrentExceptionMsg() + disconnectAndRaise(peer, reason, errMsg) + +proc getCompressedMsgId*(MsgType: type): CompressedMsgId = + mixin msgId, msgProtocol, protocolInfo + (protocolIdx: MsgType.msgProtocol.protocolInfo.index, + methodId: MsgType.msgId) + +proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = + ## This procs awaits a specific P2P message. + ## Any messages received while waiting will be dispatched to their + ## respective handlers. The designated message handler will also run + ## to completion before the future returned by `nextMsg` is resolved. + let awaitedMsgId = getCompressedMsgId(MsgType) + let f = getOrDefault(peer.awaitedMessages, awaitedMsgId) + if not f.isNil: + return Future[MsgType](f) + + initFuture result + peer.awaitedMessages[awaitedMsgId] = result + +proc registerProtocol(protocol: ProtocolInfo) = + # TODO: This can be done at compile-time in the future + let pos = lowerBound(gProtocols, protocol) + gProtocols.insert(protocol, pos) + for i in 0 ..< gProtocols.len: + gProtocols[i].index = i + +proc setEventHandlers(p: ProtocolInfo, + handshake: HandshakeStep, + disconnectHandler: DisconnectionHandler) = + p.handshake = handshake + p.disconnectHandler = disconnectHandler proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[T] {.async.} = new result @@ -120,6 +190,27 @@ proc readMsg(stream: P2PStream, withResponseCode: bool, deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.} +proc readSizePrefix(transp: StreamTransport, + deadline: Future[void]): Future[int] {.async.} = + var parser: VarintParser[uint64, ProtoBuf] + while true: + var nextByte: byte + var readNextByte = transp.readExactly(addr nextByte, 1) + await readNextByte or deadline + if not readNextByte.finished: + return -1 + case parser.feedByte(nextByte) + of Done: + let res = parser.getResult + if res > uint64(RQRP_MAX_SIZE): + return -1 + else: + return int(res) + of Overflow: + return -1 + of Incomplete: + continue + proc readMsgBytes(stream: P2PStream, withResponseCode: bool, deadline: Future[void]): Future[Bytes] {.async.} = @@ -127,15 +218,13 @@ proc readMsgBytes(stream: P2PStream, var responseCode: byte var readResponseCode = stream.transp.readExactly(addr responseCode, 1) await readResponseCode or deadline - if not readResponseCode.finished: return + if not readResponseCode.finished: + return if responseCode > ResponseCode.high.byte: return logScope: responseCode = ResponseCode(responseCode) case ResponseCode(responseCode) - of InvalidRequest: - debug "P2P request was classified as invalid" - return - of EncodingError, ServerError: + of InvalidRequest, ServerError: let responseErrMsg = await readMsg(stream, string, false, deadline) debug "P2P request resulted in error", responseErrMsg return @@ -143,18 +232,17 @@ proc readMsgBytes(stream: P2PStream, # The response is OK, the execution continues below discard - var sizePrefix: uint32 - var readSizePrefix = stream.transp.readExactly(addr sizePrefix, sizeof(sizePrefix)) - await readSizePrefix or deadline - if not readSizePrefix.finished: return + var sizePrefix = await readSizePrefix(stream.transp, deadline) + if sizePrefix < -1: + debug "Failed to read an incoming message size prefix", peer = stream.peer + return if sizePrefix == 0: debug "Received SSZ with zero size", peer = stream.peer return - var msgBytes = newSeq[byte](sizePrefix.int + sizeof(sizePrefix)) - copyMem(addr msgBytes[0], addr sizePrefix, sizeof(sizePrefix)) - var readBody = stream.transp.readExactly(addr msgBytes[sizeof(sizePrefix)], sizePrefix.int) + var msgBytes = newSeq[byte](sizePrefix) + var readBody = stream.transp.readExactly(addr msgBytes[0], sizePrefix) await readBody or deadline if not readBody.finished: return @@ -178,6 +266,13 @@ proc readMsg(stream: P2PStream, msgBytes, errMsg = err.formatMsg("") return +proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes = + var s = init OutputStream + s.append byte(responseCode) + s.appendVarint errMsg.len + s.appendValue SSZ, errMsg + s.getOutput + proc sendErrorResponse(peer: Peer, stream: P2PStream, err: ref SerializationError, @@ -186,33 +281,44 @@ proc sendErrorResponse(peer: Peer, debug "Received an invalid request", peer, msgName, msgBytes, errMsg = err.formatMsg("") - var responseCode = byte(EncodingError) - discard await stream.transp.write(addr responseCode, 1) + let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg")) + discard await stream.transp.write(responseBytes) await stream.close() proc sendErrorResponse(peer: Peer, stream: P2PStream, responseCode: ResponseCode, errMsg: string) {.async.} = - debug "Error processing request", - peer, responseCode, errMsg + debug "Error processing request", peer, responseCode, errMsg - var outputStream = init OutputStream - outputStream.append byte(responseCode) - outputStream.appendValue SSZ, errMsg - - discard await stream.transp.write(outputStream.getOutput) + let responseBytes = encodeErrorMsg(ServerError, errMsg) + discard await stream.transp.write(responseBytes) await stream.close() +proc writeSizePrefix(transp: StreamTransport, size: uint64) {.async.} = + var + varintBuf: array[10, byte] + varintSize = vsizeof(size) + cursor = createWriteCursor(varintBuf) + cursor.appendVarint size + var sent = await transp.write(varintBuf[0 ..< varintSize]) + if sent != varintSize: + raise newException(TransmissionError, "Failed to deliver size prefix") + proc sendMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = var stream = await peer.network.daemon.openStream(peer.id, @[protocolId]) # TODO how does openStream fail? Set a timeout here and handle it + await writeSizePrefix(stream.transp, uint64(requestBytes.len)) let sent = await stream.transp.write(requestBytes) if sent != requestBytes.len: - raise newException(TransmissionError, "Failed to deliver all bytes") + raise newException(TransmissionError, "Failed to deliver msg bytes") -proc sendBytes(stream: P2PStream, bytes: Bytes) {.async.} = - let sent = await stream.transp.write(bytes) +proc sendResponseBytes(stream: P2PStream, bytes: Bytes) {.async.} = + var sent = await stream.transp.write(@[byte Success]) + if sent != 1: + raise newException(TransmissionError, "Failed to deliver response code") + await writeSizePrefix(stream.transp, uint64(bytes.len)) + sent = await stream.transp.write(bytes) if sent != bytes.len: raise newException(TransmissionError, "Failed to deliver all bytes") @@ -228,6 +334,8 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, # Send the request let stream = streamFut.read + + await writeSizePrefix(stream.transp, requestBytes.len.uint64) let sent = await stream.transp.write(requestBytes) if sent != requestBytes.len: await disconnectAndRaise(peer, FaultOrError, "Incomplete send") @@ -241,7 +349,7 @@ proc exchangeHandshake(peer: Peer, protocolId: string, requestBytes: Bytes, var response = await makeEth2Request(peer, protocolId, requestBytes, ResponseMsg, timeout) if not response.isSome: - await peer.disconnectAndRaise(BreachOfProtocol, "Failed to complete a handshake") + await peer.disconnectAndRaise(FaultOrError, "Failed to complete a handshake") return response.get @@ -269,12 +377,11 @@ template handshakeImpl(outputStreamVar, handshakeSerializationCall: untyped, var responseFut = nextMsg(peer, HandshakeType) await lowLevelThunk(peer.network.daemon, stream) or deadline if not responseFut.finished: - await disconnectAndRaise(peer, BreachOfProtocol, "Failed to complete a handshake") + await disconnectAndRaise(peer, FaultOrError, "Failed to complete a handshake") var outputStreamVar = init OutputStream - append(outputStreamVar, byte(Success)) handshakeSerializationCall - await sendBytes(stream, getOutput(outputStreamVar)) + await sendResponseBytes(stream, getOutput(outputStreamVar)) return responseFut.read @@ -330,7 +437,16 @@ proc registerMsg(protocol: ProtocolInfo, printer: printer) proc getRequestProtoName(fn: NimNode): NimNode = - return newLit("/ETH/BeaconChain/" & $fn.name & "/1/SSZ") + # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes + # (TODO: file as an issue) + + let pragmas = fn.pragma + if pragmas.kind == nnkPragma and pragmas.len > 0: + for pragma in pragmas: + if pragma.len > 0 and $pragma[0] == "libp2pProtocol": + return pragma[1] + + return newLit("") proc init*[MsgType](T: type Responder[MsgType], peer: Peer, stream: P2PStream): T = @@ -363,17 +479,9 @@ proc implementSendProcBody(sendProc: SendProc) = else: quote: sendMsg(`peer`, `msgProto`, `bytes`) else: - quote: sendBytes(`UntypedResponder`(`peer`).stream, `bytes`) + quote: sendResponseBytes(`UntypedResponder`(`peer`).stream, `bytes`) - proc prependResponseCode(stream: NimNode): NimNode = - quote: append(`stream`, byte(Success)) - - let preSerializationStep = if msg.kind == msgResponse: - prependResponseCode - else: - nil - - sendProc.useStandardBody(preSerializationStep, nil, sendCallGenerator) + sendProc.useStandardBody(nil, nil, sendCallGenerator) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var diff --git a/beacon_chain/libp2p_backends_common.nim b/beacon_chain/libp2p_backends_common.nim deleted file mode 100644 index d88b874f8..000000000 --- a/beacon_chain/libp2p_backends_common.nim +++ /dev/null @@ -1,74 +0,0 @@ -# included from libp2p_backend - -template `$`*(peer: Peer): string = $peer.id - -chronicles.formatIt(Peer): $it - -proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer {.gcsafe.} - -proc getPeer*(node: Eth2Node, peerId: PeerID): Peer {.gcsafe.} = - result = node.peers.getOrDefault(peerId) - if result == nil: - result = Peer.init(node, peerId) - node.peers[peerId] = result - -proc peerFromStream(daemon: DaemonAPI, stream: P2PStream): Peer {.gcsafe.} = - Eth2Node(daemon.userData).getPeer(stream.peer) - -proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = - # TODO: How should we notify the other peer? - if peer.connectionState notin {Disconnecting, Disconnected}: - peer.connectionState = Disconnecting - await peer.network.daemon.disconnect(peer.id) - peer.connectionState = Disconnected - peer.network.peers.del(peer.id) - -template raisePeerDisconnected(msg: string, r: DisconnectionReason) = - var e = newException(PeerDisconnected, msg) - e.reason = r - raise e - -proc disconnectAndRaise(peer: Peer, - reason: DisconnectionReason, - msg: string) {.async.} = - let r = reason - await peer.disconnect(r) - raisePeerDisconnected(msg, r) - -template reraiseAsPeerDisconnected(peer: Peer, errMsgExpr: static string, - reason = FaultOrError): auto = - const errMsg = errMsgExpr - debug errMsg, err = getCurrentExceptionMsg() - disconnectAndRaise(peer, reason, errMsg) - -proc getCompressedMsgId*(MsgType: type): CompressedMsgId = - mixin msgId, msgProtocol, protocolInfo - (protocolIdx: MsgType.msgProtocol.protocolInfo.index, - methodId: MsgType.msgId) - -proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = - ## This procs awaits a specific P2P message. - ## Any messages received while waiting will be dispatched to their - ## respective handlers. The designated message handler will also run - ## to completion before the future returned by `nextMsg` is resolved. - let awaitedMsgId = getCompressedMsgId(MsgType) - let f = getOrDefault(peer.awaitedMessages, awaitedMsgId) - if not f.isNil: - return Future[MsgType](f) - - initFuture result - peer.awaitedMessages[awaitedMsgId] = result - -proc registerProtocol(protocol: ProtocolInfo) = - # TODO: This can be done at compile-time in the future - let pos = lowerBound(gProtocols, protocol) - gProtocols.insert(protocol, pos) - for i in 0 ..< gProtocols.len: - gProtocols[i].index = i - -proc setEventHandlers(p: ProtocolInfo, - handshake: HandshakeStep, - disconnectHandler: DisconnectionHandler) = - p.handshake = handshake - p.disconnectHandler = disconnectHandler - diff --git a/beacon_chain/libp2p_spec_backend.nim b/beacon_chain/libp2p_spec_backend.nim deleted file mode 100644 index 6678dbda2..000000000 --- a/beacon_chain/libp2p_spec_backend.nim +++ /dev/null @@ -1,645 +0,0 @@ -import - tables, deques, options, algorithm, stew/shims/[macros, tables], - stew/ranges/ptr_arith, chronos, chronicles, serialization, faststreams/input_stream, - eth/async_utils, eth/p2p/p2p_protocol_dsl, libp2p/daemon/daemonapi, - libp2p_json_serialization, ssz - -export - daemonapi, p2pProtocol, serialization, ssz, libp2p_json_serialization - -const - # Compression nibble - NoCompression* = byte 0 - - # Encoding nibble - SszEncoding* = byte 1 - - beaconChainProtocol = "/eth/serenity/beacon/rpc/1" - -type - Eth2Node* = ref object of RootObj - daemon*: DaemonAPI - peers*: Table[PeerID, Peer] - protocolStates*: seq[RootRef] - - EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers - - Peer* = ref object - network*: Eth2Node - id*: PeerID - lastReqId*: uint64 - rpcStream*: P2PStream - connectionState*: ConnectionState - awaitedMessages: Table[CompressedMsgId, FutureBase] - outstandingRequests*: Table[uint64, OutstandingRequest] - protocolStates*: seq[RootRef] - maxInactivityAllowed*: Duration - - ConnectionState* = enum - None, - Connecting, - Connected, - Disconnecting, - Disconnected - - DisconnectionReason* = enum - ClientShutdown = 1 - IrrelevantNetwork - FaultOrError - - CompressedMsgId = tuple - protocolIdx, methodId: int - - ResponderWithId*[MsgType] = object - peer*: Peer - reqId*: uint64 - - Response*[MsgType] = distinct Peer - - # ----------------------------------------- - - ResponseCode* = enum - NoError - ParseError = 10 - InvalidRequest = 20 - MethodNotFound = 30 - ServerError = 40 - - OutstandingRequest* = object - id*: uint64 - future*: FutureBase - timeoutAt*: Moment - responseThunk*: ThunkProc - - ProtocolConnection* = object - stream*: P2PStream - protocolInfo*: ProtocolInfo - - MessageInfo* = object - id*: int - name*: string - - # Private fields: - thunk*: ThunkProc - printer*: MessageContentPrinter - nextMsgResolver*: NextMsgResolver - requestResolver*: RequestResolver - - ProtocolInfoObj* = object - name*: string - version*: int - messages*: seq[MessageInfo] - index*: int # the position of the protocol in the - # ordered list of supported protocols - - # Private fields: - peerStateInitializer*: PeerStateInitializer - networkStateInitializer*: NetworkStateInitializer - handshake*: HandshakeStep - disconnectHandler*: DisconnectionHandler - - ProtocolInfo* = ptr ProtocolInfoObj - - SpecOuterMsgHeader {.packed.} = object - compression {.bitsize: 4.}: uint - encoding {.bitsize: 4.}: uint - msgLen: uint64 - - SpecInnerMsgHeader {.packed.} = object - reqId: uint64 - methodId: uint16 - - ErrorResponse {.packed.} = object - outerHeader: SpecOuterMsgHeader - innerHeader: SpecInnerMsgHeader - - PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} - NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe.} - - HandshakeStep* = proc(peer: Peer, handshakeStream: P2PStream): Future[void] {.gcsafe.} - DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.} - - ThunkProc* = proc(peer: Peer, - stream: P2PStream, - reqId: uint64, - reqFuture: FutureBase, - msgData: ByteStreamVar): Future[void] {.gcsafe.} - - MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} - NextMsgResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.} - RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.} - - Bytes = seq[byte] - - InvalidMsgIdError = object of InvalidMsgError - - PeerDisconnected* = object of P2PBackendError - reason*: DisconnectionReason - - PeerLoopExitReason = enum - Success - UnsupportedCompression - UnsupportedEncoding - ProtocolViolation - InactivePeer - InternalError - -const - HandshakeTimeout = FaultOrError - BreachOfProtocol* = FaultOrError - # TODO: We should lobby for more disconnection reasons. - -template isOdd(val: SomeInteger): bool = - type T = type(val) - (val and T(1)) != 0 - -proc init(T: type SpecOuterMsgHeader, - compression, encoding: byte, msgLen: uint64): T = - T(compression: compression, encoding: encoding, msgLen: msgLen) - -proc readPackedObject(stream: P2PStream, T: type): Future[T] {.async.} = - await stream.transp.readExactly(addr result, sizeof result) - -proc appendPackedObject(stream: OutputStreamVar, value: auto) = - let valueAsBytes = cast[ptr byte](unsafeAddr(value)) - stream.append makeOpenArray(valueAsBytes, sizeof(value)) - -proc getThunk(protocol: ProtocolInfo, methodId: uint16): ThunkProc = - if methodId.int >= protocol.messages.len: return nil - protocol.messages[methodId.int].thunk - -include eth/p2p/p2p_backends_helpers -include eth/p2p/p2p_tracing -include libp2p_backends_common - -proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} - -proc init*(T: type Eth2Node, daemon: DaemonAPI): Future[Eth2Node] {.async.} = - new result - result.daemon = daemon - result.daemon.userData = result - result.peers = initTable[PeerID, Peer]() - - newSeq result.protocolStates, allProtocols.len - for proto in allProtocols: - if proto.networkStateInitializer != nil: - result.protocolStates[proto.index] = proto.networkStateInitializer(result) - - await daemon.addHandler(@[beaconChainProtocol], handleConnectingBeaconChainPeer) - -proc init*(T: type Peer, network: Eth2Node, id: PeerID): Peer = - new result - result.id = id - result.network = network - result.awaitedMessages = initTable[CompressedMsgId, FutureBase]() - result.maxInactivityAllowed = 15.minutes # TODO: read this from the config - result.connectionState = None - newSeq result.protocolStates, allProtocols.len - for i in 0 ..< allProtocols.len: - let proto = allProtocols[i] - if proto.peerStateInitializer != nil: - result.protocolStates[i] = proto.peerStateInitializer(result) - -proc init*[MsgName](T: type ResponderWithId[MsgName], - peer: Peer, reqId: uint64): T = - T(peer: peer, reqId: reqId) - -proc sendMsg*(peer: Peer, data: Bytes) {.gcsafe, async.} = - try: - var unsentBytes = data.len - while true: - # TODO: this looks wrong. - # We are always trying to write the same data. - # Find all other places where such code is used. - unsentBytes -= await peer.rpcStream.transp.write(data) - if unsentBytes <= 0: return - except CatchableError: - await peer.disconnect(FaultOrError) - # this is usually a "(32) Broken pipe": - # FIXME: this exception should be caught somewhere in addMsgHandler() and - # sending should be retried a few times - raise - -proc sendMsg*[T](responder: ResponderWithId[T], data: Bytes): Future[void] = - return sendMsg(responder.peer, data) - -proc sendErrorResponse(peer: Peer, reqId: uint64, - responseCode: ResponseCode): Future[void] = - var resp = ErrorResponse( - outerHeader: SpecOuterMsgHeader.init( - compression = NoCompression, - encoding = SszEncoding, - msgLen = uint64 sizeof(SpecInnerMsgHeader)), - innerHeader: SpecInnerMsgHeader( - reqId: reqId, - methodId: uint16(responseCode))) - - # TODO: don't allocate the Bytes sequence here - return peer.sendMsg @(makeOpenArray(cast[ptr byte](addr resp), sizeof resp)) - -proc recvAndDispatchMsg*(peer: Peer): Future[PeerLoopExitReason] {.async.} = - template fail(reason) = - return reason - - # For now, we won't try to handle the presence of multiple sub-protocols - # since the spec is not defining how they will be mapped to P2P streams. - doAssert allProtocols.len == 1 - - var - stream = peer.rpcStream - protocol = allProtocols[0] - - var outerHeader = await stream.readPackedObject(SpecOuterMsgHeader) - - if outerHeader.compression != NoCompression: - fail UnsupportedCompression - - if outerHeader.encoding != SszEncoding: - fail UnsupportedEncoding - - if outerHeader.msgLen <= SpecInnerMsgHeader.sizeof.uint64: - fail ProtocolViolation - - let - innerHeader = await stream.readPackedObject(SpecInnerMsgHeader) - reqId = innerHeader.reqId - - var msgContent = newSeq[byte](outerHeader.msgLen - SpecInnerMsgHeader.sizeof.uint64) - await stream.transp.readExactly(addr msgContent[0], msgContent.len) - - var msgContentStream = memoryStream(msgContent) - - if reqId.isOdd: - peer.outstandingRequests.withValue(reqId, req): - let thunk = req.responseThunk - let reqFuture = req.future - peer.outstandingRequests.del(reqId) - - try: - await thunk(peer, stream, reqId, reqFuture, msgContentStream) - except SerializationError: - debug "Error during deserialization", err = getCurrentExceptionMsg() - fail ProtocolViolation - except CatchableError: - # TODO - warn "" - do: - debug "Ignoring late or invalid response ID", peer, id = reqId - # TODO: skip the message - else: - let thunk = protocol.getThunk(innerHeader.methodId) - if thunk != nil: - try: - await thunk(peer, stream, reqId, nil, msgContentStream) - except SerializationError: - debug "Error during deserialization", err = getCurrentExceptionMsg() - fail ProtocolViolation - except CatchableError: - # TODO - warn "" - else: - debug "P2P request method not found", methodId = innerHeader.methodId - await peer.sendErrorResponse(reqId, MethodNotFound) - -proc dispatchMessages*(peer: Peer): Future[PeerLoopExitReason] {.async.} = - while true: - let dispatchedMsgFut = recvAndDispatchMsg(peer) - doAssert peer.maxInactivityAllowed.milliseconds > 0 - yield dispatchedMsgFut or sleepAsync(peer.maxInactivityAllowed) - if not dispatchedMsgFut.finished: - return InactivePeer - elif dispatchedMsgFut.failed: - error "Error in peer loop" - return InternalError - else: - let status = dispatchedMsgFut.read - if status == Success: continue - return status - -proc performProtocolHandshakes*(peer: Peer) {.async.} = - peer.initProtocolStates allProtocols - - # Please note that the ordering of operations here is important! - # - # We must first start all handshake procedures and give them a - # chance to send any initial packages they might require over - # the network and to yield on their `nextMsg` waits. - # - var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len) - for protocol in allProtocols: - if protocol.handshake != nil: - subProtocolsHandshakes.add((protocol.handshake)(peer, peer.rpcStream)) - - # The `dispatchMesssages` loop must be started after this. - # Otherwise, we risk that some of the handshake packets sent by - # the other peer may arrrive too early and be processed before - # the handshake code got a change to wait for them. - # - var messageProcessingLoop = peer.dispatchMessages() - messageProcessingLoop.callback = proc(p: pointer) {.gcsafe.} = - if messageProcessingLoop.failed: - debug "Ending dispatchMessages loop", peer, - err = messageProcessingLoop.error.msg - else: - debug "Ending dispatchMessages", peer, - exitCode = messageProcessingLoop.read - traceAsyncErrors peer.disconnect(ClientShutdown) - - # The handshake may involve multiple async steps, so we wait - # here for all of them to finish. - # - await all(subProtocolsHandshakes) - - peer.connectionState = Connected - debug "Peer connection initialized", peer - -proc initializeConnection*(peer: Peer) {.async.} = - let daemon = peer.network.daemon - try: - peer.connectionState = Connecting - peer.rpcStream = await daemon.openStream(peer.id, @[beaconChainProtocol]) - await performProtocolHandshakes(peer) - except CatchableError: - await reraiseAsPeerDisconnected(peer, "Failed to perform handshake") - -proc handleConnectingBeaconChainPeer(daemon: DaemonAPI, stream: P2PStream) {.async, gcsafe.} = - let peer = daemon.peerFromStream(stream) - peer.rpcStream = stream - peer.connectionState = Connecting - await performProtocolHandshakes(peer) - -proc resolvePendingFutures(peer: Peer, protocol: ProtocolInfo, - methodId: int, msg: pointer, reqFuture: FutureBase) = - - let msgId = (protocolIdx: protocol.index, methodId: methodId) - - if peer.awaitedMessages[msgId] != nil: - let msgInfo = protocol.messages[methodId] - msgInfo.nextMsgResolver(msg, peer.awaitedMessages[msgId]) - peer.awaitedMessages[msgId] = nil - - if reqFuture != nil and not reqFuture.finished: - protocol.messages[methodId].requestResolver(msg, reqFuture) - -proc initProtocol(name: string, version: int, - peerInit: PeerStateInitializer, - networkInit: NetworkStateInitializer): ProtocolInfoObj = - result.name = name - result.version = version - result.messages = @[] - result.peerStateInitializer = peerInit - result.networkStateInitializer = networkInit - -proc registerMsg(protocol: ProtocolInfo, - id: int, name: string, - thunk: ThunkProc, - printer: MessageContentPrinter, - requestResolver: RequestResolver, - nextMsgResolver: NextMsgResolver) = - if protocol.messages.len <= id: - protocol.messages.setLen(id + 1) - protocol.messages[id] = MessageInfo(id: id, - name: name, - thunk: thunk, - printer: printer, - requestResolver: requestResolver, - nextMsgResolver: nextMsgResolver) - -template applyDecorator(p: NimNode, decorator: NimNode) = - if decorator.kind != nnkNilLit: p.addPragma decorator - -proc prepareRequest(peer: Peer, - protocol: ProtocolInfo, - requestMethodId, responseMethodId: uint16, - stream: OutputStreamVar, - timeout: Duration, - responseFuture: FutureBase): DelayedWriteCursor = - assert peer != nil and - protocol != nil and - responseFuture != nil and - responseMethodId.int < protocol.messages.len - - doAssert timeout.milliseconds > 0 - - result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) - - inc peer.lastReqId, 2 - let reqId = peer.lastReqId - - stream.appendPackedObject SpecInnerMsgHeader( - reqId: reqId, methodId: requestMethodId) - - template responseMsgInfo: auto = - protocol.messages[responseMethodId.int] - - let - requestResolver = responseMsgInfo.requestResolver - timeoutAt = Moment.fromNow(timeout) - - peer.outstandingRequests[reqId + 1] = OutstandingRequest( - id: reqId, - future: responseFuture, - timeoutAt: timeoutAt, - responseThunk: responseMsgInfo.thunk) - - proc timeoutExpired(udata: pointer) = - requestResolver(nil, responseFuture) - peer.outstandingRequests.del(reqId + 1) - - addTimer(timeoutAt, timeoutExpired, nil) - -proc prepareResponse(responder: ResponderWithId, - stream: OutputStreamVar): DelayedWriteCursor = - result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) - - stream.appendPackedObject SpecInnerMsgHeader( - reqId: responder.reqId + 1, - methodId: uint16(Success)) - -proc prepareMsg(peer: Peer, methodId: uint16, - stream: OutputStreamVar): DelayedWriteCursor = - result = stream.delayFixedSizeWrite sizeof(SpecOuterMsgHeader) - - inc peer.lastReqId, 2 - stream.appendPackedObject SpecInnerMsgHeader( - reqId: peer.lastReqId, methodId: methodId) - -proc finishOuterHeader(headerCursor: DelayedWriteCursor) = - var outerHeader = SpecOuterMsgHeader.init( - compression = NoCompression, - encoding = SszEncoding, - msgLen = uint64(headerCursor.totalBytesWrittenAfterCursor)) - - headerCursor.endWrite makeOpenArray(cast[ptr byte](addr outerHeader), - sizeof outerHeader) - -proc implementSendProcBody(sendProc: SendProc) = - let - msg = sendProc.msg - delayedWriteCursor = ident "delayedWriteCursor" - peer = sendProc.peerParam - - proc preSerializationStep(stream: NimNode): NimNode = - case msg.kind - of msgRequest: - let - requestMethodId = newLit(msg.id) - responseMethodId = newLit(msg.response.id) - protocol = sendProc.msg.protocol.protocolInfoVar - timeout = sendProc.timeoutParam - - quote do: - var `delayedWriteCursor` = prepareRequest( - `peer`, `protocol`, `requestMethodId`, `responseMethodId`, - `stream`, `timeout`, `resultIdent`) - - of msgResponse: - quote do: - var `delayedWriteCursor` = prepareResponse(`peer`, `stream`) - - of msgHandshake, msgNotification: - let methodId = newLit(msg.id) - quote do: - var `delayedWriteCursor` = prepareMsg(`peer`, `methodId`, `stream`) - - proc postSerializationStep(stream: NimNode): NimNode = - newCall(bindSym "finishOuterHeader", delayedWriteCursor) - - proc sendCallGenerator(peer, bytes: NimNode): NimNode = - let - linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture" - sendMsg = bindSym "sendMsg" - sendCall = newCall(sendMsg, peer, bytes) - - if msg.kind == msgRequest: - # In RLPx requests, the returned future was allocated here and passed - # to `prepareRequest`. It's already assigned to the result variable - # of the proc, so we just wait for the sending operation to complete - # and we return in a normal way. (the waiting is done, so we can catch - # any possible errors). - quote: `linkSendFailureToReqFuture`(`sendCall`, `resultIdent`) - else: - # In normal RLPx messages, we are returning the future returned by the - # `sendMsg` call. - quote: return `sendCall` - - sendProc.useStandardBody( - preSerializationStep, - postSerializationStep, - sendCallGenerator) - -proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = - let - Option = bindSym "Option" - Peer = bindSym "Peer" - EthereumNode = bindSym "EthereumNode" - - Format = ident "SSZ" - Response = bindSym "Response" - ResponderWithId = bindSym "ResponderWithId" - perProtocolMsgId = ident "perProtocolMsgId" - - mount = bindSym "mount" - - messagePrinter = bindSym "messagePrinter" - resolveFuture = bindSym "resolveFuture" - requestResolver = bindSym "requestResolver" - resolvePendingFutures = bindSym "resolvePendingFutures" - nextMsg = bindSym "nextMsg" - initProtocol = bindSym "initProtocol" - registerMsg = bindSym "registerMsg" - handshakeImpl = bindSym "handshakeImpl" - - stream = ident "stream" - protocol = ident "protocol" - response = ident "response" - reqFutureVar = ident "reqFuture" - msgContents = ident "msgContents" - receivedMsg = ident "receivedMsg" - - ProtocolInfo = bindSym "ProtocolInfo" - P2PStream = bindSym "P2PStream" - ByteStreamVar = bindSym "ByteStreamVar" - - new result - - result.registerProtocol = bindSym "registerProtocol" - result.setEventHandlers = bindSym "setEventHandlers" - result.PeerType = Peer - result.NetworkType = EthereumNode - result.SerializationFormat = Format - - p.useRequestIds = true - result.ReqIdType = ident "uint64" - result.ResponderType = ResponderWithId - - result.afterProtocolInit = proc (p: P2PProtocol) = - p.onPeerConnected.params.add newIdentDefs(ident"handshakeStream", P2PStream) - - result.implementMsg = proc (msg: Message) = - var - msgIdLit = newLit(msg.id) - msgRecName = msg.recIdent - msgIdent = msg.ident - msgName = $msgIdent - protocol = msg.protocol - - ## - ## Implemenmt Thunk - ## - let traceMsg = when tracingEnabled: - newCall(bindSym"logReceivedMsg", peer, receivedMsg) - else: - newStmtList() - - let callResolvePendingFutures = newCall( - resolvePendingFutures, peerVar, - protocol.protocolInfoVar, - msgIdLit, - newCall("addr", receivedMsg), - reqFutureVar) - - var userHandlerParams = @[peerVar] - if msg.kind == msgRequest: - userHandlerParams.add reqIdVar - - let - thunkName = ident(msgName & "_thunk") - awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, userHandlerParams) - - msg.defineThunk quote do: - proc `thunkName`(`peerVar`: `Peer`, - `stream`: `P2PStream`, - `reqIdVar`: uint64, - `reqFutureVar`: FutureBase, - `msgContents`: `ByteStreamVar`) {.async, gcsafe.} = - var `receivedMsg` = `mount`(`Format`, `msgContents`, `msgRecName`) - `traceMsg` - `awaitUserHandler` - `callResolvePendingFutures` - - ## - ## Implement Senders and Handshake - ## - var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake)) - implementSendProcBody sendProc - - if msg.kind == msgHandshake: - discard msg.createHandshakeTemplate(sendProc.def.name, handshakeImpl, nextMsg) - - protocol.outProcRegistrations.add( - newCall(registerMsg, - protocol.protocolInfoVar, - msgIdLit, - newLit(msgName), - thunkName, - newTree(nnkBracketExpr, messagePrinter, msgRecName), - newTree(nnkBracketExpr, requestResolver, msgRecName), - newTree(nnkBracketExpr, resolveFuture, msgRecName))) - - result.implementProtocolInit = proc (protocol: P2PProtocol): NimNode = - return newCall(initProtocol, - newLit(protocol.shortName), - newLit(protocol.version), - protocol.peerInit, protocol.netInit) - diff --git a/beacon_chain/spec/crypto.nim b/beacon_chain/spec/crypto.nim index 81efbf32c..aac0c4f62 100644 --- a/beacon_chain/spec/crypto.nim +++ b/beacon_chain/spec/crypto.nim @@ -96,6 +96,8 @@ func `$`*(x: BlsValue): string = if x.kind == Real: $x.blsValue else: + # r: is short for random. The prefix must be short + # due to the mechanics of the `shortLog` function. "r:" & toHex(x.blob) func `==`*(a, b: BlsValue): bool = @@ -178,13 +180,16 @@ func bls_verify_multiple*( sig: ValidatorSig, domain: uint64): bool = let L = len(pubkeys) doAssert L == len(message_hashes) + doAssert sig.kind == Real # TODO optimize using multiPairing for pubkey_message_hash in zip(pubkeys, message_hashes): let (pubkey, message_hash) = pubkey_message_hash + doAssert pubkey.kind == Real # TODO spec doesn't say to handle this specially, but it's silly to # validate without any actual public keys. - if not pubkey.bls_verify(message_hash.data, sig, domain): + if pubkey.blsValue != VerKey() and + not sig.blsValue.verify(message_hash.data, domain, pubkey.blsValue): return false true diff --git a/beacon_chain/spec/datatypes.nim b/beacon_chain/spec/datatypes.nim index 0f38109d5..a676f60db 100644 --- a/beacon_chain/spec/datatypes.nim +++ b/beacon_chain/spec/datatypes.nim @@ -518,7 +518,7 @@ Json.useCustomSerialization(BitSeq): BitSeq reader.readValue(string).hexToSeqByte write: - writer.writeValue "0x" & value.bytes.toHex + writer.writeValue "0x" & Bytes(value).toHex template readValue*(reader: var JsonReader, value: var BitList) = type T = type(value) @@ -531,13 +531,14 @@ template init*(T: type BitList, len: int): auto = T init(BitSeq, len) template len*(x: BitList): auto = len(BitSeq(x)) template bytes*(x: BitList): auto = bytes(BitSeq(x)) template `[]`*(x: BitList, idx: auto): auto = BitSeq(x)[idx] -template `[]=`*(x: BitList, idx: auto, val: bool) = BitSeq(x)[idx] = val +template `[]=`*(x: var BitList, idx: auto, val: bool) = BitSeq(x)[idx] = val template `==`*(a, b: BitList): bool = BitSeq(a) == BitSeq(b) -template raiseBit*(x: BitList, idx: int) = raiseBit(BitSeq(x), idx) -template lowerBit*(x: BitList, idx: int) = lowerBit(BitSeq(x), idx) +template raiseBit*(x: var BitList, idx: int) = raiseBit(BitSeq(x), idx) +template lowerBit*(x: var BitList, idx: int) = lowerBit(BitSeq(x), idx) template overlaps*(a, b: BitList): bool = overlaps(BitSeq(a), BitSeq(b)) -template combine*(a, b: BitList) = combine(BitSeq(a), BitSeq(b)) +template combine*(a: var BitList, b: BitList) = combine(BitSeq(a), BitSeq(b)) template isSubsetOf*(a, b: BitList): bool = isSubsetOf(BitSeq(a), BitSeq(b)) +template `$`*(a: BitList): string = $(BitSeq(a)) when useListType: template len*[T; N](x: List[T, N]): auto = len(seq[T](x)) diff --git a/beacon_chain/ssz.nim b/beacon_chain/ssz.nim index 363318371..1c865e6e4 100644 --- a/beacon_chain/ssz.nim +++ b/beacon_chain/ssz.nim @@ -9,7 +9,7 @@ # See https://github.com/ethereum/eth2.0-specs/blob/master/specs/simple-serialize.md import - endians, stew/shims/macros, options, algorithm, math, + endians, stew/shims/macros, options, algorithm, math, options, stew/[bitops2, bitseqs, objects, varints], stew/ranges/ptr_arith, stint, faststreams/input_stream, serialization, serialization/testing/tracing, nimcrypto/sha2, blscurve, eth/common, @@ -59,6 +59,8 @@ type FixedSizedWriterCtx = object + Bytes = seq[byte] + serializationFormat SSZ, Reader = SszReader, Writer = SszWriter, @@ -95,7 +97,9 @@ template toSszType*(x: auto): auto = when x is Slot|Epoch|ValidatorIndex|enum: uint64(x) elif x is Eth2Digest: x.data elif x is BlsValue|BlsCurveType: getBytes(x) - elif x is BitSeq|BitList: bytes(x) + elif x is BitSeq|BitList: Bytes(x) + elif x is ref|ptr: toSszType x[] + elif x is Option: toSszType x.get elif x is TypeWithMaxLen: toSszType valueOf(x) elif useListType and x is List: seq[x.T](x) else: x @@ -173,7 +177,7 @@ template writeField*(w: var SszWriter, field: auto) = mixin toSszType when ctx is FixedSizedWriterCtx: - writeFixedSized(w, toSszType(field)) + writeFixedSized(w.stream, toSszType(field)) else: type FieldType = type toSszType(field) @@ -185,7 +189,7 @@ template writeField*(w: var SszWriter, let initPos = w.stream.pos trs "WRITING VAR SIZE VALUE OF TYPE ", name(FieldType) when FieldType is BitSeq: - trs "BIT SEQ ", field.bytes + trs "BIT SEQ ", Bytes(field) writeVarSizeType(w, toSszType(field)) ctx.offset += w.stream.pos - initPos @@ -200,7 +204,9 @@ func writeVarSizeType(w: var SszWriter, value: auto) = when T is seq|string|openarray: type E = ElemType(T) - when isFixedSize(E): + const isFixed = when E is Option: false + else: isFixedSize(E) + when isFixed: trs "WRITING LIST WITH FIXED SIZE ELEMENTS" for elem in value: w.stream.writeFixedSized toSszType(elem) @@ -211,6 +217,10 @@ func writeVarSizeType(w: var SszWriter, value: auto) = var cursor = w.stream.delayFixedSizeWrite offset for elem in value: cursor.writeFixedSized uint32(offset) + when elem is Option: + if not isSome(elem): continue + elif elem is ptr|ref: + if isNil(elem): continue let initPos = w.stream.pos w.writeVarSizeType toSszType(elem) offset += w.stream.pos - initPos @@ -448,8 +458,8 @@ func bitlistHashTreeRoot(merkelizer: SszChunksMerkelizer, x: BitSeq): Eth2Digest trs "CHUNKIFYING BIT SEQ WITH LIMIT ", merkelizer.limit var - totalBytes = x.bytes.len - lastCorrectedByte = x.bytes[^1] + totalBytes = Bytes(x).len + lastCorrectedByte = Bytes(x)[^1] if lastCorrectedByte == byte(1): if totalBytes == 1: @@ -461,7 +471,7 @@ func bitlistHashTreeRoot(merkelizer: SszChunksMerkelizer, x: BitSeq): Eth2Digest getZeroHashWithoutSideEffect(0)) # this is the mixed length totalBytes -= 1 - lastCorrectedByte = x.bytes[^2] + lastCorrectedByte = Bytes(x)[^2] else: let markerPos = log2trunc(lastCorrectedByte) lastCorrectedByte.lowerBit(markerPos) @@ -480,14 +490,14 @@ func bitlistHashTreeRoot(merkelizer: SszChunksMerkelizer, x: BitSeq): Eth2Digest chunkStartPos = i * bytesPerChunk chunkEndPos = chunkStartPos + bytesPerChunk - 1 - merkelizer.addChunk x.bytes.toOpenArray(chunkEndPos, chunkEndPos) + merkelizer.addChunk Bytes(x).toOpenArray(chunkEndPos, chunkEndPos) var lastChunk: array[bytesPerChunk, byte] chunkStartPos = fullChunks * bytesPerChunk for i in 0 .. bytesInLastChunk - 2: - lastChunk[i] = x.bytes[chunkStartPos + i] + lastChunk[i] = Bytes(x)[chunkStartPos + i] lastChunk[bytesInLastChunk - 1] = lastCorrectedByte diff --git a/beacon_chain/ssz/bytes_reader.nim b/beacon_chain/ssz/bytes_reader.nim index d9685f94d..b50339818 100644 --- a/beacon_chain/ssz/bytes_reader.nim +++ b/beacon_chain/ssz/bytes_reader.nim @@ -1,5 +1,5 @@ import - endians, typetraits, + endians, typetraits, options, stew/[objects, bitseqs], serialization/testing/tracing, ../spec/[digest, datatypes], ./types @@ -7,6 +7,14 @@ template setLen[R, T](a: var array[R, T], length: int) = if length != a.len: raise newException(MalformedSszError, "SSZ input of insufficient size") +template assignNullValue(loc: untyped, T: type): auto = + when T is ref|ptr: + loc = nil + elif T is Option: + loc = T() + else: + raise newException(MalformedSszError, "SSZ list element of zero size") + # fromSszBytes copies the wire representation to a Nim variable, # assuming there's enough data in the buffer func fromSszBytes*(T: type SomeInteger, data: openarray[byte]): T = @@ -61,6 +69,13 @@ proc readSszValue*(input: openarray[byte], T: type): T = when useListType and result is List: type ElemType = type result[0] result = T readSszValue(input, seq[ElemType]) + elif result is ptr|ref: + if input.len > 0: + new result + result[] = readSszValue(input, type(result[])) + elif result is Option: + if input.len > 0: + result = some readSszValue(input, result.T) elif result is string|seq|openarray|array: type ElemType = type result[0] when ElemType is byte|char: @@ -96,7 +111,10 @@ proc readSszValue*(input: openarray[byte], T: type): T = result.setLen resultLen for i in 1 ..< resultLen: let nextOffset = readOffset(i * offsetSize) - result[i - 1] = readSszValue(input[offset ..< nextOffset], ElemType) + if nextOffset == offset: + assignNullValue result[i - 1], ElemType + else: + result[i - 1] = readSszValue(input[offset ..< nextOffset], ElemType) offset = nextOffset result[resultLen - 1] = readSszValue(input[offset ..< input.len], ElemType) diff --git a/beacon_chain/ssz/types.nim b/beacon_chain/ssz/types.nim index d50bd92e7..6e4a541bf 100644 --- a/beacon_chain/ssz/types.nim +++ b/beacon_chain/ssz/types.nim @@ -1,5 +1,5 @@ import - tables, + tables, options, stew/shims/macros, stew/[objects, bitseqs], serialization/[object_serialization, errors] @@ -85,7 +85,7 @@ template ElemType*(T: type[seq|string|List]): untyped = func isFixedSize*(T0: type): bool {.compileTime.} = mixin toSszType, enumAllSerializedFields - when T0 is openarray: + when T0 is openarray|Option|ref|ptr: return false else: type T = type toSszType(default T0) @@ -110,7 +110,7 @@ func fixedPortionSize*(T0: type): int {.compileTime.} = type E = ElemType(T) when isFixedSize(E): elementCount * fixedPortionSize(E) else: elementCount * offsetSize - elif T is seq|string|openarray: offsetSize + elif T is seq|string|openarray|ref|ptr|Option: offsetSize elif T is object|tuple: var res = 0 enumAllSerializedFields(T): diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index b8b51273d..73ab214bd 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -19,8 +19,6 @@ type ValidatorSet = seq[Validator] BeaconSyncState* = ref object - networkId*: uint8 - chainId*: uint64 node*: BeaconNode db*: BeaconChainDB @@ -78,15 +76,13 @@ proc getBeaconBlocks*(peer: Peer, backward: bool): Future[Option[seq[BeaconBlock]]] {.gcsafe, async.} p2pProtocol BeaconSync(version = 1, - shortName = "bcs", + rlpxName = "bcs", networkState = BeaconSyncState): onPeerConnected do (peer: Peer): let protocolVersion = 1 # TODO: Spec doesn't specify this yet node = peer.networkState.node - networkId = peer.networkState.networkId - chainId = peer.networkState.networkId blockPool = node.blockPool finalizedHead = blockPool.finalizedHead headBlock = blockPool.head.blck @@ -94,11 +90,11 @@ p2pProtocol BeaconSync(version = 1, bestSlot = headBlock.slot latestFinalizedEpoch = finalizedHead.slot.compute_epoch_of_slot() - let m = await peer.hello(networkId, chainId, finalizedHead.blck.root, - latestFinalizedEpoch, bestRoot, bestSlot, - timeout = 10.seconds) + let m = await peer.hello(node.forkVersion, + finalizedHead.blck.root, latestFinalizedEpoch, + bestRoot, bestSlot, timeout = 10.seconds) - if m.networkId != networkId: + if m.forkVersion != node.forkVersion: await peer.disconnect(IrrelevantNetwork) return @@ -142,22 +138,59 @@ p2pProtocol BeaconSync(version = 1, handshake: proc hello( peer: Peer, - networkId: uint8, - chainId: uint64, + fork_version: array[4, byte], latestFinalizedRoot: Eth2Digest, latestFinalizedEpoch: Epoch, bestRoot: Eth2Digest, - bestSlot: Slot) + bestSlot: Slot) {. + libp2pProtocol("/eth2/beacon_chain/req/hello", 1).} - proc goodbye(peer: Peer, reason: DisconnectionReason) + proc goodbye( + peer: Peer, + reason: DisconnectionReason) {. + libp2pProtocol("/eth2/beacon_chain/req/goodbye", 1).} - nextId 10 + requestResponse: + proc getBeaconBlocks( + peer: Peer, + headBlockRoot: Eth2Digest, + count: uint64, + step: uint64) {. + libp2pProtocol("/eth2/beacon_chain/req/beacon_blocks", 1).} = + + var blocks = newSeq[Option[BeaconBlock]](int count) + let db = peer.networkState.db + + blocks[0] = db.getBlock(headBlockRoot) + if isSome(blocks[0]): + for i in uint64(1) ..< count: + blocks[i.int] = db.getBlock(Slot(blocks[0].get.slot.uint64 + i * step)) + + await response.send(blocks) + + proc getRecentBeaconBlocks( + peer: Peer, + blockRoots: openarray[Eth2Digest]) {. + libp2pProtocol("/eth2/beacon_chain/req/recent_beacon_blocks", 1).} = + + var blocks = newSeqOfCap[Option[BeaconBlock]](blockRoots.len) + let db = peer.networkState.db + + for root in blockRoots: + blocks.add db.getBlock(root) + + await response.send(blocks) + + proc beaconBlocks( + peer: Peer, + blocks: openarray[Option[BeaconBlock]]) requestResponse: proc getBeaconBlockRoots( peer: Peer, fromSlot: Slot, - maxRoots: uint64) = + maxRoots: uint64) {. + libp2pProtocol("/eth2/beacon_chain/req/beacon_block_roots", 1).} = let maxRoots = min(MaxRootsToRequest, maxRoots) var s = fromSlot var roots = newSeqOfCap[BlockRootSlot](maxRoots) @@ -170,7 +203,9 @@ p2pProtocol BeaconSync(version = 1, s += 1 await response.send(roots) - proc beaconBlockRoots(peer: Peer, roots: openarray[BlockRootSlot]) + proc beaconBlockRoots( + peer: Peer, + roots: openarray[BlockRootSlot]) requestResponse: proc getBeaconBlockHeaders( @@ -179,7 +214,8 @@ p2pProtocol BeaconSync(version = 1, slot: Slot, maxHeaders: uint64, skipSlots: uint64, - backward: bool) = + backward: bool) {. + libp2pProtocol("/eth2/beacon_chain/req/beacon_block_headers", 1).} = let maxHeaders = min(MaxHeadersToRequest, maxHeaders) var headers: seq[BeaconBlockHeader] let db = peer.networkState.db @@ -222,12 +258,55 @@ p2pProtocol BeaconSync(version = 1, await response.send(headers) - proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader]) + proc beaconBlockHeaders( + peer: Peer, + blockHeaders: openarray[BeaconBlockHeader]) + + # TODO move this at the bottom, because it's not in the spec yet, but it will + # consume a `method_id` + requestResponse: + proc getAncestorBlocks( + peer: Peer, + needed: openarray[FetchRecord]) {. + libp2pProtocol("/eth2/beacon_chain/req/ancestor_blocks", 1).} = + var resp = newSeqOfCap[BeaconBlock](needed.len) + let db = peer.networkState.db + var neededRoots = initSet[Eth2Digest]() + for rec in needed: neededRoots.incl(rec.root) + + for rec in needed: + if (var blck = db.getBlock(rec.root); blck.isSome()): + # TODO validate historySlots + let firstSlot = blck.get().slot - rec.historySlots + + for i in 0..= MaxAncestorBlocksResponse: + break + + if blck.get().parent_root in neededRoots: + # Don't send duplicate blocks, if neededRoots has roots that are + # in the same chain + break + + if (blck = db.getBlock(blck.get().parent_root); + blck.isNone() or blck.get().slot < firstSlot): + break + + if resp.len >= MaxAncestorBlocksResponse: + break + + await response.send(resp) + + proc ancestorBlocks( + peer: Peer, + blocks: openarray[BeaconBlock]) requestResponse: proc getBeaconBlockBodies( peer: Peer, - blockRoots: openarray[Eth2Digest]) = + blockRoots: openarray[Eth2Digest]) {. + libp2pProtocol("/eth2/beacon_chain/req/beacon_block_bodies", 1).} = # TODO: Validate blockRoots.len var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len) let db = peer.networkState.db diff --git a/beacon_chain/version.nim b/beacon_chain/version.nim index dc97dc673..d80ec6171 100644 --- a/beacon_chain/version.nim +++ b/beacon_chain/version.nim @@ -1,23 +1,21 @@ type NetworkBackendType* = enum - libp2pSpecBackend - libp2pNativeBackend + libp2pBackend rlpxBackend const - network_type {.strdefine.} = "libp2p_native" + network_type {.strdefine.} = "libp2p" networkBackend* = when network_type == "rlpx": rlpxBackend - elif network_type == "libp2p_spec": libp2pSpecBackend - elif network_type == "libp2p_native": libp2pNativeBackend - else: {.fatal: "The 'network_type' should be one of 'libp2p_spec', 'libp2p_native' or 'rlpx'" .} + elif network_type == "libp2p": libp2pBackend + else: {.fatal: "The 'network_type' should be either 'libp2p' or 'rlpx'" .} const versionMajor* = 0 - versionMinor* = 2 + versionMinor* = 3 versionBuild* = 0 - semanticVersion* = 1 + semanticVersion* = 2 # Bump this up every time a breaking change is introduced # Clients having different semantic versions won't be able # to join the same testnets. diff --git a/scripts/testnet1.env b/scripts/testnet1.env index f391041c6..89bf350f5 100644 --- a/scripts/testnet1.env +++ b/scripts/testnet1.env @@ -1,5 +1,5 @@ NETWORK_ID=20 -NETWORK_TYPE=libp2p_native +NETWORK_TYPE=libp2p SHARD_COUNT=16 SLOTS_PER_EPOCH=16 SECONDS_PER_SLOT=30