diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 2d8e906ff..70c56af53 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -190,17 +190,15 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async var bootNodes: seq[ENode] var bootEnrs: seq[enr.Record] - for node in conf.bootstrapNodes: addBootstrapNode(node, bootNodes, bootEnrs, ourPubKey) + for node in conf.bootstrapNodes: + addBootstrapNode(node, bootNodes, bootEnrs, ourPubKey) loadBootstrapFile(string conf.bootstrapNodesFile, bootNodes, bootEnrs, ourPubKey) let persistentBootstrapFile = conf.dataDir / "bootstrap_nodes.txt" if fileExists(persistentBootstrapFile): loadBootstrapFile(persistentBootstrapFile, bootNodes, bootEnrs, ourPubKey) - let - network = await createEth2Node(conf, bootNodes) - addressFile = string(conf.dataDir) / "beacon_node.address" - network.saveConnectionAddressFile(addressFile) + let network = await createEth2Node(conf) let rpcServer = if conf.rpcEnabled: RpcServer.init(conf.rpcAddress, conf.rpcPort) @@ -252,13 +250,12 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async return res proc connectToNetwork(node: BeaconNode) {.async.} = - if node.bootstrapNodes.len > 0: - info "Connecting to bootstrap nodes", bootstrapNodes = node.bootstrapNodes + if node.bootstrapEnrs.len > 0: + info "Connecting to bootstrap nodes", bootstrapEnrs = node.bootstrapEnrs else: info "Waiting for connections" - await node.network.connectToNetwork(node.bootstrapNodes, - node.bootstrapEnrs) + await node.network.connectToNetwork(node.bootstrapEnrs) template findIt(s: openarray, predicate: untyped): int = var res = -1 diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 7629c3b84..57655b3c9 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -38,7 +38,6 @@ type PrivateKey* = crypto.PrivateKey Bytes = seq[byte] - P2PStream = Connection # TODO Is this really needed? Eth2Node* = ref object of RootObj @@ -70,7 +69,7 @@ type UntypedResponder = object peer*: Peer - stream*: P2PStream + stream*: Connection Responder*[MsgType] = distinct UntypedResponder @@ -80,8 +79,6 @@ type # Private fields: libp2pCodecName: string protocolMounter*: MounterProc - printer*: MessageContentPrinter - nextMsgResolver*: NextMsgResolver ProtocolInfoObj* = object name*: string @@ -104,12 +101,11 @@ type PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.} NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.} - HandshakeStep* = proc(peer: Peer, stream: P2PStream): Future[void] {.gcsafe.} + HandshakeStep* = proc(peer: Peer, conn: Connection): Future[void] {.gcsafe.} DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.} ThunkProc* = LPProtoHandler MounterProc* = proc(network: Eth2Node) {.gcsafe.} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} - NextMsgResolver* = proc(msgData: SszReader, future: FutureBase) {.gcsafe.} DisconnectionReason* = enum ClientShutDown @@ -148,10 +144,10 @@ declareCounter gossip_messages_received, "Number of gossip messages received by this peer" declarePublicGauge libp2p_successful_dials, - "Number of successfully dialed peers" + "Number of successfully dialed peers" declarePublicGauge libp2p_peers, - "Number of active libp2p peers" + "Number of active libp2p peers" template libp2pProtocol*(name: string, version: int) {.pragma.} @@ -161,21 +157,12 @@ chronicles.formatIt(Peer): $it template remote*(peer: Peer): untyped = peer.info.peerId -# TODO: This exists only as a compatibility layer between the daemon -# APIs and the native LibP2P ones. It won't be necessary once the -# daemon is removed. -# -template writeAllBytes(stream: P2PStream, bytes: seq[byte]): untyped = - write(stream, bytes) - template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped = dial(node.switch, peer.info, protocolId) -proc peer(stream: P2PStream): PeerID = +func peerId(conn: Connection): PeerID = # TODO: Can this be `nil`? - stream.peerInfo.peerId -# -# End of compatibility layer + conn.peerInfo.peerId proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.} @@ -183,11 +170,12 @@ proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} = let peerId = peerInfo.peerId result = node.peerPool.getOrDefault(peerId) if result == nil: + # TODO: We should register this peer in the pool! result = Peer.init(node, peerInfo) -proc peerFromStream(network: Eth2Node, stream: P2PStream): Peer {.gcsafe.} = +proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} = # TODO: Can this be `nil`? - return network.getPeer(stream.peerInfo) + return network.getPeer(conn.peerInfo) proc getKey*(peer: Peer): PeerID {.inline.} = result = peer.info.peerId @@ -208,9 +196,9 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, peer.network.peerPool.release(peer) peer.info.close() -proc safeClose(stream: P2PStream) {.async.} = - if not stream.closed: - await close(stream) +proc safeClose(conn: Connection) {.async.} = + if not conn.closed: + await close(conn) proc handleIncomingPeer*(peer: Peer) @@ -243,18 +231,18 @@ proc disconnectAndRaise(peer: Peer, await peer.disconnect(r) raisePeerDisconnected(msg, r) -proc readChunk(stream: P2PStream, +proc readChunk(conn: Connection, MsgType: type, withResponseCode: bool, deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.} -proc readSizePrefix(stream: P2PStream, +proc readSizePrefix(conn: Connection, deadline: Future[void]): Future[int] {.async.} = trace "about to read msg size prefix" var parser: VarintParser[uint64, ProtoBuf] while true: var nextByte: byte - var readNextByte = stream.readExactly(addr nextByte, 1) + var readNextByte = conn.readExactly(addr nextByte, 1) await readNextByte or deadline if not readNextByte.finished: trace "size prefix byte not received in time" @@ -274,7 +262,7 @@ proc readSizePrefix(stream: P2PStream, of Incomplete: continue -proc readMsgBytes(stream: P2PStream, +proc readMsgBytes(conn: Connection, withResponseCode: bool, deadline: Future[void]): Future[Bytes] {.async.} = trace "about to read message bytes", withResponseCode @@ -283,7 +271,7 @@ proc readMsgBytes(stream: P2PStream, if withResponseCode: var responseCode: byte trace "about to read response code" - var readResponseCode = stream.readExactly(addr responseCode, 1) + var readResponseCode = conn.readExactly(addr responseCode, 1) await readResponseCode or deadline if not readResponseCode.finished: @@ -299,7 +287,7 @@ proc readMsgBytes(stream: P2PStream, case ResponseCode(responseCode) of InvalidRequest, ServerError: - let responseErrMsg = await readChunk(stream, string, false, deadline) + let responseErrMsg = await conn.readChunk(string, false, deadline) debug "P2P request resulted in error", responseErrMsg return @@ -307,20 +295,20 @@ proc readMsgBytes(stream: P2PStream, # The response is OK, the execution continues below discard - var sizePrefix = await readSizePrefix(stream, deadline) + var sizePrefix = await conn.readSizePrefix(deadline) trace "got msg size prefix", sizePrefix if sizePrefix == -1: - debug "Failed to read an incoming message size prefix", peer = stream.peer + debug "Failed to read an incoming message size prefix", peer = conn.peerId return if sizePrefix == 0: - debug "Received SSZ with zero size", peer = stream.peer + debug "Received SSZ with zero size", peer = conn.peerId return trace "about to read msg bytes", len = sizePrefix var msgBytes = newSeq[byte](sizePrefix) - var readBody = stream.readExactly(addr msgBytes[0], sizePrefix) + var readBody = conn.readExactly(addr msgBytes[0], sizePrefix) await readBody or deadline if not readBody.finished: trace "msg bytes not received in time" @@ -332,11 +320,11 @@ proc readMsgBytes(stream: P2PStream, except TransportIncompleteError: return @[] -proc readChunk(stream: P2PStream, +proc readChunk(conn: Connection, MsgType: type, withResponseCode: bool, deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = - var msgBytes = await stream.readMsgBytes(withResponseCode, deadline) + var msgBytes = await conn.readMsgBytes(withResponseCode, deadline) try: if msgBytes.len > 0: return some SSZ.decode(msgBytes, MsgType) @@ -346,7 +334,7 @@ proc readChunk(stream: P2PStream, return proc readResponse( - stream: P2PStream, + conn: Connection, MsgType: type, deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} = @@ -354,13 +342,13 @@ proc readResponse( type E = ElemType(MsgType) var results: MsgType while true: - let nextRes = await readChunk(stream, E, true, deadline) + let nextRes = await conn.readChunk(E, true, deadline) if nextRes.isNone: break results.add nextRes.get if results.len > 0: return some(results) else: - return await readChunk(stream, MsgType, true, deadline) + return await conn.readChunk(MsgType, true, deadline) proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes = var s = init OutputStream @@ -370,7 +358,7 @@ proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes = s.getOutput proc sendErrorResponse(peer: Peer, - stream: P2PStream, + conn: Connection, err: ref SerializationError, msgName: string, msgBytes: Bytes) {.async.} = @@ -378,18 +366,18 @@ proc sendErrorResponse(peer: Peer, peer, msgName, msgBytes, errMsg = err.formatMsg("") let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg")) - await stream.writeAllBytes(responseBytes) - await stream.close() + await conn.write(responseBytes) + await conn.close() proc sendErrorResponse(peer: Peer, - stream: P2PStream, + conn: Connection, responseCode: ResponseCode, errMsg: string) {.async.} = debug "Error processing request", peer, responseCode, errMsg let responseBytes = encodeErrorMsg(ServerError, errMsg) - await stream.writeAllBytes(responseBytes) - await stream.close() + await conn.write(responseBytes) + await conn.close() proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = var deadline = sleepAsync RESP_TIMEOUT @@ -409,7 +397,7 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {. s.appendVarint requestBytes.len.uint64 s.append requestBytes let bytes = s.getOutput - await stream.writeAllBytes(bytes) + await stream.write(bytes) # TODO There is too much duplication in the responder functions, but # I hope to reduce this when I increse the reliance on output streams. @@ -419,14 +407,14 @@ proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async s.appendVarint payload.len.uint64 s.append payload let bytes = s.getOutput - await responder.stream.writeAllBytes(bytes) + await responder.stream.write(bytes) proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} = var s = init OutputStream s.append byte(Success) s.appendValue SSZ, sizePrefixed(val) let bytes = s.getOutput - await responder.stream.writeAllBytes(bytes) + await responder.stream.write(bytes) proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} = var s = init OutputStream @@ -435,7 +423,7 @@ proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async. s.appendValue SSZ, sizePrefixed(chunk) let bytes = s.getOutput - await responder.stream.writeAllBytes(bytes) + await responder.stream.write(bytes) proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, ResponseMsg: type, @@ -460,14 +448,14 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, s.appendVarint requestBytes.len.uint64 s.append requestBytes let bytes = s.getOutput - await stream.writeAllBytes(bytes) + await stream.write(bytes) # Read the response return await stream.readResponse(ResponseMsg, deadline) proc init*[MsgType](T: type Responder[MsgType], - peer: Peer, stream: P2PStream): T = - T(UntypedResponder(peer: peer, stream: stream)) + peer: Peer, conn: Connection): T = + T(UntypedResponder(peer: peer, stream: conn)) template write*[M](r: var Responder[M], val: auto): auto = mixin send @@ -539,7 +527,7 @@ proc implementSendProcBody(sendProc: SendProc) = sendProc.useStandardBody(nil, nil, sendCallGenerator) -proc handleIncomingStream(network: Eth2Node, stream: P2PStream, +proc handleIncomingStream(network: Eth2Node, conn: Connection, MsgType, Format: distinct type) {.async, gcsafe.} = mixin callUserHandler, RecType const msgName = typetraits.name(MsgType) @@ -550,21 +538,21 @@ proc handleIncomingStream(network: Eth2Node, stream: P2PStream, # when chronicles.runtimeFilteringEnabled: # setLogLevel(LogLevel.TRACE) # defer: setLogLevel(LogLevel.DEBUG) - # trace "incoming " & `msgNameLit` & " stream" + # trace "incoming " & `msgNameLit` & " conn" - let peer = peerFromStream(network, stream) + let peer = peerFromStream(network, conn) handleIncomingPeer(peer) defer: - await safeClose(stream) + await safeClose(conn) let deadline = sleepAsync RESP_TIMEOUT - msgBytes = await readMsgBytes(stream, false, deadline) + msgBytes = await readMsgBytes(conn, false, deadline) if msgBytes.len == 0: - await sendErrorResponse(peer, stream, ServerError, readTimeoutErrorMsg) + await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg) return type MsgRec = RecType(MsgType) @@ -572,21 +560,21 @@ proc handleIncomingStream(network: Eth2Node, stream: P2PStream, try: msg = decode(Format, msgBytes, MsgRec) except SerializationError as err: - await sendErrorResponse(peer, stream, err, msgName, msgBytes) + await sendErrorResponse(peer, conn, err, msgName, msgBytes) return except Exception as err: # TODO. This is temporary code that should be removed after interop. # It can be enabled only in certain diagnostic builds where it should # re-raise the exception. debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName - await sendErrorResponse(peer, stream, ServerError, err.msg) + await sendErrorResponse(peer, conn, ServerError, err.msg) raise err try: logReceivedMsg(peer, MsgType(msg)) - await callUserHandler(peer, stream, msg) + await callUserHandler(peer, conn, msg) except CatchableError as err: - await sendErrorResponse(peer, stream, ServerError, err.msg) + await sendErrorResponse(peer, conn, ServerError, err.msg) proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} = let network = peer.network @@ -727,21 +715,18 @@ proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer = proc registerMsg(protocol: ProtocolInfo, name: string, mounter: MounterProc, - libp2pCodecName: string, - printer: MessageContentPrinter) = + libp2pCodecName: string) = protocol.messages.add MessageInfo(name: name, protocolMounter: mounter, - libp2pCodecName: libp2pCodecName, - printer: printer) + libp2pCodecName: libp2pCodecName) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var Format = ident "SSZ" Responder = bindSym "Responder" - P2PStream = bindSym "P2PStream" + Connection = bindSym "Connection" Peer = bindSym "Peer" Eth2Node = bindSym "Eth2Node" - messagePrinter = bindSym "messagePrinter" registerMsg = bindSym "registerMsg" initProtocol = bindSym "initProtocol" msgVar = ident "msg" @@ -761,7 +746,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = result.ResponderType = Responder result.afterProtocolInit = proc (p: P2PProtocol) = - p.onPeerConnected.params.add newIdentDefs(streamVar, P2PStream) + p.onPeerConnected.params.add newIdentDefs(streamVar, Connection) result.implementMsg = proc (msg: Message) = let @@ -775,13 +760,13 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest: # Request procs need an extra param - the stream where the response # should be written: - msg.userHandler.params.insert(2, newIdentDefs(streamVar, P2PStream)) + msg.userHandler.params.insert(2, newIdentDefs(streamVar, Connection)) msg.initResponderCall.add streamVar ## ## Implement the Thunk: ## - ## The protocol handlers in nim-libp2p receive only a `P2PStream` + ## The protocol handlers in nim-libp2p receive only a `Connection` ## parameter and there is no way to access the wider context (such ## as the current `Switch`). In our handlers, we may need to list all ## peers in the current network, so we must keep a reference to the @@ -799,12 +784,12 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = if msg.userHandler != nil: protocol.outRecvProcs.add quote do: template `callUserHandler`(`peerVar`: `Peer`, - `streamVar`: `P2PStream`, + `streamVar`: `Connection`, `msgVar`: `MsgRecName`): untyped = `userHandlerCall` proc `protocolMounterName`(`networkVar`: `Eth2Node`) = - proc thunk(`streamVar`: `P2PStream`, + proc thunk(`streamVar`: `Connection`, proto: string): Future[void] {.gcsafe.} = return handleIncomingStream(`networkVar`, `streamVar`, `MsgStrongRecName`, `Format`) @@ -830,8 +815,7 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = protocol.protocolInfoVar, msgNameLit, mounter, - codecNameLit, - newTree(nnkBracketExpr, messagePrinter, MsgRecName))) + codecNameLit)) result.implementProtocolInit = proc (p: P2PProtocol): NimNode = return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit) @@ -903,8 +887,7 @@ proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair = KeyPair(seckey: privKey, pubkey: privKey.getKey()) -proc createEth2Node*(conf: BeaconNodeConf, - bootstrapNodes: seq[ENode]): Future[Eth2Node] {.async.} = +proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async.} = var (extIp, extTcpPort, _) = setupNat(conf) hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort) @@ -912,8 +895,7 @@ proc createEth2Node*(conf: BeaconNodeConf, else: @[tcpEndPoint(extIp, extTcpPort)] info "Initializing networking", hostAddress, - announcedAddresses, - bootstrapNodes + announcedAddresses let keys = conf.getPersistentNetKeys # TODO nim-libp2p still doesn't have support for announcing addresses @@ -924,9 +906,14 @@ proc createEth2Node*(conf: BeaconNodeConf, result = Eth2Node.init(conf, switch, extIp, keys.seckey.asEthKey) proc getPersistenBootstrapAddr*(conf: BeaconNodeConf, - ip: IpAddress, port: Port): ENode = - let pair = getPersistentNetKeys(conf) - initENode(pair.pubkey.skkey, Address(ip: ip, udpPort: port)) + ip: IpAddress, port: Port): enr.Record = + let + pair = getPersistentNetKeys(conf) + enode = initENode(pair.pubkey.skkey, Address(ip: ip, udpPort: port)) + + return enr.Record.init(1'u64, # sequence number + pair.seckey.asEthKey, + enode.address) proc shortForm*(id: KeyPair): string = $PeerID.init(id.pubkey) @@ -938,7 +925,6 @@ proc toPeerInfo(enode: ENode): PeerInfo = return PeerInfo.init(peerId, addresses) proc connectToNetwork*(node: Eth2Node, - bootstrapNodes: seq[ENode], bootstrapEnrs: seq[enr.Record]) {.async.} = for bootstrapNode in bootstrapEnrs: debug "Adding known peer", peer = bootstrapNode @@ -954,10 +940,6 @@ proc connectToNetwork*(node: Eth2Node, traceAsyncErrors checkIfConnectedToBootstrapNode() -proc saveConnectionAddressFile*(node: Eth2Node, filename: string) = - writeFile(filename, $node.switch.peerInfo.addrs[0] & "/p2p/" & - node.switch.peerInfo.id) - func peersCount*(node: Eth2Node): int = len(node.peerPool) diff --git a/tests/simulation/start.sh b/tests/simulation/start.sh index 2bdd61ec7..314ba3192 100755 --- a/tests/simulation/start.sh +++ b/tests/simulation/start.sh @@ -76,8 +76,8 @@ fi rm -f beacon_node.log # Delete any leftover address files from a previous session -if [ -f "${MASTER_NODE_ADDRESS_FILE}" ]; then - rm "${MASTER_NODE_ADDRESS_FILE}" +if [ -f "${MASTER_NODE_PID_FILE}" ]; then + rm "${MASTER_NODE_PID_FILE}" fi # to allow overriding the program names @@ -137,7 +137,7 @@ fi for i in $(seq $MASTER_NODE -1 $TOTAL_USER_NODES); do if [[ "$i" != "$MASTER_NODE" && "$USE_MULTITAIL" == "no" ]]; then # Wait for the master node to write out its address file - while [ ! -f "${MASTER_NODE_ADDRESS_FILE}" ]; do + while [ ! -f "${MASTER_NODE_PID_FILE}" ]; do sleep 0.1 done fi diff --git a/tests/simulation/vars.sh b/tests/simulation/vars.sh index 977878408..f3837a495 100644 --- a/tests/simulation/vars.sh +++ b/tests/simulation/vars.sh @@ -33,7 +33,7 @@ NETWORK_BOOTSTRAP_FILE="${SIMULATION_DIR}/bootstrap_nodes.txt" BEACON_NODE_BIN="${SIMULATION_DIR}/beacon_node" BOOTSTRAP_NODE_BIN="${SIMULATION_DIR}/bootstrap_node" DEPLOY_DEPOSIT_CONTRACT_BIN="${SIMULATION_DIR}/deploy_deposit_contract" -MASTER_NODE_ADDRESS_FILE="${SIMULATION_DIR}/node-${MASTER_NODE}/beacon_node.address" +MASTER_NODE_PID_FILE="${SIMULATION_DIR}/node-${MASTER_NODE}/beacon_node.pid" BASE_P2P_PORT=30000 BASE_RPC_PORT=7000 diff --git a/tests/simulation/wait_master_node.sh b/tests/simulation/wait_master_node.sh index 6a9c092cd..dc5d08197 100755 --- a/tests/simulation/wait_master_node.sh +++ b/tests/simulation/wait_master_node.sh @@ -1,8 +1,8 @@ #!/bin/bash -if [ ! -f "${MASTER_NODE_ADDRESS_FILE}" ]; then +if [ ! -f "${MASTER_NODE_PID_FILE}" ]; then echo Waiting for master node... - while [ ! -f "${MASTER_NODE_ADDRESS_FILE}" ]; do + while [ ! -f "${MASTER_NODE_PID_FILE}" ]; do sleep 0.1 done fi diff --git a/tests/test_peer_connection.nim b/tests/test_peer_connection.nim index 32ab23858..91e919cc3 100644 --- a/tests/test_peer_connection.nim +++ b/tests/test_peer_connection.nim @@ -35,5 +35,5 @@ asyncTest "connect two nodes": c2.nat = "none" var n2 = await createEth2Node(c2) - await n2.connectToNetwork(bootstrapNodes = @[n1PersistentAddress]) + await n2.connectToNetwork(@[n1PersistentAddress])