From d16e127daff5644e9ef291b3f7b08aed7bcbd255 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 26 Nov 2020 20:23:45 +0100 Subject: [PATCH] Peer stuff (#2084) * Revert "Revert "Full "node" RPC calls implementation and fixes to peer lifetime states. (#2065)" (#2082)" This reverts commit 7cc3dc8027a46cb9dee1ae56534880010151481e. * fix nil disconnectedFut crash * fixes don't resetPeer, it causes peer miscounts * disconnect disconnecting peers ...when there's a race. * avoid connection spamming * never decrease SeenTable timeout * only recover ENR for known peers * seen only when really disconnected --- beacon_chain/eth2_network.nim | 265 +++++++++++++----- beacon_chain/rpc/node_api.nim | 240 ++++++++++++++-- .../spec/eth2_apis/callsigs_types.nim | 27 +- beacon_chain/sync_manager.nim | 10 + beacon_chain/sync_protocol.nim | 4 + vendor/nim-chronos | 2 +- 6 files changed, 454 insertions(+), 94 deletions(-) diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index f199faabb..9cec8a770 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -72,7 +72,7 @@ type connTable: HashSet[PeerID] forkId: ENRForkID rng*: ref BrHmacDrbgContext - peers: Table[PeerID, Peer] + peers*: Table[PeerID, Peer] EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers @@ -95,12 +95,13 @@ type discoveryId*: Eth2DiscoveryId connectionState*: ConnectionState protocolStates*: seq[RootRef] - maxInactivityAllowed*: Duration netThroughput: AverageThroughput score*: int requestQuota*: float lastReqTime*: Moment connections*: int + enr*: Option[enr.Record] + direction*: PeerType disconnectedFut: Future[void] PeerAddr* = object @@ -240,6 +241,11 @@ const ## Period of time for `FaultOnError` error reason. SeenTablePenaltyError* = 60.minutes ## Period of time for peers which score below or equal to zero. + SeenTableTimeReconnect* = 1.minutes + ## Minimal time between disconnection and reconnection attempt + + ResolvePeerTimeout* = 1.minutes + ## Maximum time allowed for peer resolve process. template neterr(kindParam: Eth2NetworkingErrorKind): auto = err(type(result), Eth2NetworkingError(kind: kindParam)) @@ -263,6 +269,18 @@ declarePublicCounter nbc_timeout_dials, declarePublicGauge nbc_peers, "Number of active libp2p peers" +declarePublicCounter nbc_successful_discoveries, + "Number of successfull discoveries" + +declarePublicCounter nbc_failed_discoveries, + "Number of failed discoveries" + +const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0] + +declareHistogram nbc_resolve_time, + "Time(s) used while resolving peer information", + buckets = delayBuckets + const snappy_implementation {.strdefine.} = "libp2p" @@ -326,8 +344,8 @@ proc getKey*(peer: Peer): PeerID {.inline.} = peer.info.peerId proc getFuture*(peer: Peer): Future[void] {.inline.} = - if peer.disconnectedFut.isNil: - peer.disconnectedFut = newFuture[void]() + if isNil(peer.disconnectedFut): + peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") peer.disconnectedFut proc getScore*(a: Peer): int = @@ -419,7 +437,11 @@ proc addSeen*(network: ETh2Node, peerId: PeerID, period: chronos.Duration) = ## Adds peer with PeerID ``peerId`` to SeenTable and timeout ``period``. let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period) - network.seenTable[peerId] = item + withValue(network.seenTable, peerId, entry) do: + if entry.stamp < item.stamp: + entry.stamp = item.stamp + do: + network.seenTable[peerId] = item proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = @@ -439,7 +461,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason, SeenTablePenaltyError peer.network.addSeen(peer.info.peerId, seenTime) await peer.network.switch.disconnect(peer.info.peerId) - peer.connectionState = Disconnected except CatchableError: # We do not care about exceptions in disconnection procedure. trace "Exception while disconnecting peer", peer = peer.info.peerId, @@ -639,6 +660,22 @@ proc handleIncomingStream(network: Eth2Node, let peer = peerFromStream(network, conn) try: + case peer.connectionState + of Disconnecting, Disconnected, None: + # We got incoming stream request while disconnected or disconnecting. + warn "Got incoming request from disconnected peer", peer = peer, + message = msgName + await conn.closeWithEOF() + return + of Connecting: + # We got incoming stream request while handshake is not yet finished, + # TODO: We could check it here. + debug "Got incoming request from peer while in handshake", peer = peer, + msgName + of Connected: + # We got incoming stream from peer with proper connection state. + debug "Got incoming request from peer", peer = peer, msgName + template returnInvalidRequest(msg: ErrorMsg) = peer.updateScore(PeerScoreInvalidRequest) await sendErrorResponse(peer, conn, InvalidRequest, msg) @@ -721,8 +758,10 @@ proc handleIncomingStream(network: Eth2Node, await conn.closeWithEOF() discard network.peerPool.checkPeerScore(peer) -proc toPeerAddr*(r: enr.TypedRecord): - Result[PeerAddr, cstring] {.raises: [Defect].} = +proc toPeerAddr*(r: enr.TypedRecord, + proto: IpTransportProtocol): Result[PeerAddr, cstring] {. + raises: [Defect].} = + if not r.secp256k1.isSome: return err("enr: no secp256k1 key in record") @@ -733,18 +772,34 @@ proc toPeerAddr*(r: enr.TypedRecord): var addrs = newSeq[MultiAddress]() - if r.ip.isSome and r.tcp.isSome: - let ip = ipv4(r.ip.get) - addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) - - if r.ip6.isSome: - let ip = ipv6(r.ip6.get) - if r.tcp6.isSome: - addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get) - elif r.tcp.isSome: + case proto + of tcpProtocol: + if r.ip.isSome and r.tcp.isSome: + let ip = ipv4(r.ip.get) addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) - else: - discard + + if r.ip6.isSome: + let ip = ipv6(r.ip6.get) + if r.tcp6.isSome: + addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get) + elif r.tcp.isSome: + addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) + else: + discard + + of udpProtocol: + if r.ip.isSome and r.udp.isSome: + let ip = ipv4(r.ip.get) + addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get) + + if r.ip6.isSome: + let ip = ipv6(r.ip6.get) + if r.udp6.isSome: + addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp6.get) + elif r.udp.isSome: + addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get) + else: + discard if addrs.len == 0: return err("enr: no addresses in record") @@ -808,7 +863,7 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} = proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} = let nodeRecord = ? node.record.toTypedRecord() - let peerAddr = ? nodeRecord.toPeerAddr() + let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol) ok(peerAddr) proc runDiscoveryLoop*(node: Eth2Node) {.async.} = @@ -872,13 +927,71 @@ proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata = else: result = Json.loadFile(metadataPath, Eth2Metadata) +proc resolvePeer(peer: Peer) = + # Resolve task which performs searching of peer's public key and recovery of + # ENR using discovery5. We only resolve ENR for peers we know about to avoid + # querying the network - as of now, the ENR is not needed, except for + # debuggging + logScope: peer = peer.info.peerId + let startTime = now(chronos.Moment) + let nodeId = + block: + var key: PublicKey + # `secp256k1` keys are always stored inside PeerID. + discard peer.info.peerId.extractPublicKey(key) + keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() + + debug "Peer's ENR recovery task started", node_id = $nodeId + + # This is "fast-path" for peers which was dialed. In this case discovery + # already has most recent ENR information about this peer. + let gnode = peer.network.discovery.getNode(nodeId) + if gnode.isSome(): + peer.enr = some(gnode.get().record) + inc(nbc_successful_discoveries) + let delay = now(chronos.Moment) - startTime + nbc_resolve_time.observe(delay.toFloatSeconds()) + debug "Peer's ENR recovered", delay = $delay + +proc handlePeer*(peer: Peer) {.async.} = + let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) + case res: + of PeerStatus.LowScoreError, PeerStatus.NoSpaceError: + # Peer has low score or we do not have enough space in PeerPool, + # we are going to disconnect it gracefully. + # Peer' state will be updated in connection event. + debug "Peer has low score or there no space in PeerPool", + peer = peer, reason = res + await peer.disconnect(FaultOrError) + of PeerStatus.DeadPeerError: + # Peer's lifetime future is finished, so its already dead, + # we do not need to perform gracefull disconect. + # Peer's state will be updated in connection event. + discard + of PeerStatus.DuplicateError: + # Peer is already present in PeerPool, we can't perform disconnect, + # because in such case we could kill both connections (connection + # which is present in PeerPool and new one). + # This is possible bug, because we could enter here only if number + # of `peer.connections == 1`, it means that Peer's lifetime is not + # tracked properly and we still not received `Disconnected` event. + debug "Peer is already present in PeerPool", peer = peer + of PeerStatus.Success: + # Peer was added to PeerPool. + peer.score = NewPeerScore + peer.connectionState = Connected + # We spawn task which will obtain ENR for this peer. + resolvePeer(peer) + debug "Peer successfully connected", peer = peer, + connections = peer.connections + proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} = let peer = node.getPeer(peerId) case event.kind of ConnEventKind.Connected: inc peer.connections - debug "Peer upgraded", peer = $peerId, connections = peer.connections - + debug "Peer connection upgraded", peer = $peerId, + connections = peer.connections if peer.connections == 1: # Libp2p may connect multiple times to the same peer - using different # transports for both incoming and outgoing. For now, we'll count our @@ -889,45 +1002,62 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} = # * For peer limits, we might miscount the incoming vs outgoing quota # * Protocol handshakes are wonky: we'll not necessarily use the newly # connected transport - instead we'll just pick a random one! + case peer.connectionState + of Disconnecting: + # We got connection with peer which we currently disconnecting. + # Normally this does not happen, but if a peer is being disconnected + # while a concurrent (incoming for example) connection attempt happens, + # we might end up here + debug "Got connection attempt from peer that we are disconnecting", + peer = peerId + await node.switch.disconnect(peerId) + return + of None: + # We have established a connection with the new peer. + peer.connectionState = Connecting + of Disconnected: + # We have established a connection with the peer that we have seen + # before - reusing the existing peer object is fine + peer.connectionState = Connecting + peer.score = 0 # Will be set to NewPeerScore after handshake + of Connecting, Connected: + # This means that we got notification event from peer which we already + # connected or connecting right now. If this situation will happened, + # it means bug on `nim-libp2p` side. + warn "Got connection attempt from peer which we already connected", + peer = peerId + await peer.disconnect(FaultOrError) + return + + # Store connection direction inside Peer object. + if event.incoming: + peer.direction = PeerType.Incoming + else: + peer.direction = PeerType.Outgoing await performProtocolHandshakes(peer, event.incoming) - # While performing the handshake, the peer might have been disconnected - - # there's still a slim chance of a race condition here if a reconnect - # happens quickly - if peer.connections == 1: - let res = - if event.incoming: - node.peerPool.addPeerNoWait(peer, PeerType.Incoming) - else: - node.peerPool.addPeerNoWait(peer, PeerType.Outgoing) - - case res: - of PeerStatus.LowScoreError, PeerStatus.NoSpaceError: - # Peer has low score or we do not have enough space in PeerPool, - # we are going to disconnect it gracefully. - await peer.disconnect(FaultOrError) - of PeerStatus.DeadPeerError: - # Peer's lifetime future is finished, so its already dead, - # we do not need to perform gracefull disconect. - discard - of PeerStatus.DuplicateError: - # Peer is already present in PeerPool, we can't perform disconnect, - # because in such case we could kill both connections (connection - # which is present in PeerPool and new one). - discard - of PeerStatus.Success: - # Peer was added to PeerPool. - discard - of ConnEventKind.Disconnected: dec peer.connections - debug "Peer disconnected", peer = $peerId, connections = peer.connections + debug "Lost connection to peer", peer = peerId, + connections = peer.connections + if peer.connections == 0: + debug "Peer disconnected", peer = $peerId, connections = peer.connections + + # Whatever caused disconnection, avoid connection spamming + node.addSeen(peerId, SeenTableTimeReconnect) + let fut = peer.disconnectedFut - if fut != nil: - peer.disconnectedFut = nil + if not(isNil(fut)): fut.complete() + peer.disconnectedFut = nil + else: + # TODO (cheatfate): This could be removed when bug will be fixed inside + # `nim-libp2p`. + debug "Got new event while peer is already disconnected", + peer = peerId, peer_state = peer.connectionState + peer.connectionState = Disconnected proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID, switch: Switch, pubsub: PubSub, ip: Option[ValidIpAddress], @@ -1017,7 +1147,7 @@ proc start*(node: Eth2Node) {.async.} = for enr in node.discovery.bootstrapRecords: let tr = enr.toTypedRecord() if tr.isOk(): - let pa = tr.get().toPeerAddr() + let pa = tr.get().toPeerAddr(tcpProtocol) if pa.isOk(): await node.connQueue.addLast(pa.get()) @@ -1036,17 +1166,18 @@ proc stop*(node: Eth2Node) {.async.} = futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer = - new result - result.info = info - result.network = network - result.connectionState = Connected - result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config - result.lastReqTime = now(chronos.Moment) - newSeq result.protocolStates, allProtocols.len - for i in 0 ..< allProtocols.len: + let res = Peer( + info: info, + network: network, + connectionState: ConnectionState.None, + lastReqTime: now(chronos.Moment), + protocolStates: newSeq[RootRef](len(allProtocols)) + ) + for i in 0 ..< len(allProtocols): let proto = allProtocols[i] - if proto.peerStateInitializer != nil: - result.protocolStates[i] = proto.peerStateInitializer(result) + if not(isNil(proto.peerStateInitializer)): + res.protocolStates[i] = proto.peerStateInitializer(res) + res proc registerMsg(protocol: ProtocolInfo, name: string, @@ -1393,7 +1524,7 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, rng = rng) let - params = + params = block: var p = GossipSubParams.init() # https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub @@ -1411,9 +1542,9 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, pubsub = GossipSub.init( switch = switch, msgIdProvider = msgIdProvider, - triggerSelf = true, + triggerSelf = true, sign = false, - verifySignature = false, + verifySignature = false, anonymize = true, parameters = params).PubSub diff --git a/beacon_chain/rpc/node_api.nim b/beacon_chain/rpc/node_api.nim index 1268fa3a6..6276d964d 100644 --- a/beacon_chain/rpc/node_api.nim +++ b/beacon_chain/rpc/node_api.nim @@ -1,8 +1,10 @@ -import +import std/options, chronicles, json_rpc/[rpcserver, jsonmarshal], - - ../beacon_node_common, ../eth2_network, + eth/p2p/discoveryv5/enr, + libp2p/[multiaddress, multicodec], + nimcrypto/utils as ncrutils, + ../beacon_node_common, ../eth2_network, ../sync_manager, ../peer_pool, ../version, ../spec/[datatypes, digest, presets], ../spec/eth2_apis/callsigs_types @@ -15,30 +17,230 @@ type template unimplemented() = raise (ref CatchableError)(msg: "Unimplemented") +proc validateState(state: Option[seq[string]]): Option[set[ConnectionState]] = + var res: set[ConnectionState] + if state.isSome(): + let states = state.get() + for item in states: + case item + of "disconnected": + if ConnectionState.Disconnected notin res: + res.incl(ConnectionState.Disconnected) + else: + # `state` values should be unique + return none(set[ConnectionState]) + of "connecting": + if ConnectionState.Disconnected notin res: + res.incl(ConnectionState.Connecting) + else: + # `state` values should be unique + return none(set[ConnectionState]) + of "connected": + if ConnectionState.Connected notin res: + res.incl(ConnectionState.Connected) + else: + # `state` values should be unique + return none(set[ConnectionState]) + of "disconnecting": + if ConnectionState.Disconnecting notin res: + res.incl(ConnectionState.Disconnecting) + else: + # `state` values should be unique + return none(set[ConnectionState]) + else: + # Found incorrect `state` string value + return none(set[ConnectionState]) + + if res == {}: + res = {ConnectionState.Connecting, ConnectionState.Connected, + ConnectionState.Disconnecting, ConnectionState.Disconnected} + some(res) + +proc validateDirection(direction: Option[seq[string]]): Option[set[PeerType]] = + var res: set[PeerType] + if direction.isSome(): + let directions = direction.get() + for item in directions: + case item + of "inbound": + if PeerType.Incoming notin res: + res.incl(PeerType.Incoming) + else: + # `direction` values should be unique + return none(set[PeerType]) + of "outbound": + if PeerType.Outgoing notin res: + res.incl(PeerType.Outgoing) + else: + # `direction` values should be unique + return none(set[PeerType]) + else: + # Found incorrect `direction` string value + return none(set[PeerType]) + + if res == {}: + res = {PeerType.Incoming, PeerType.Outgoing} + some(res) + +proc toString(state: ConnectionState): string = + case state + of ConnectionState.Disconnected: + "disconnected" + of ConnectionState.Connecting: + "connecting" + of ConnectionState.Connected: + "connected" + of ConnectionState.Disconnecting: + "disconnecting" + else: + "" + +proc toString(direction: PeerType): string = + case direction: + of PeerType.Incoming: + "inbound" + of PeerType.Outgoing: + "outbound" + +proc getLastSeenAddress(info: PeerInfo): string = + # TODO (cheatfate): We need to provide filter here, which will be able to + # filter such multiaddresses like `/ip4/0.0.0.0` or local addresses or + # addresses with peer ids. + if len(info.addrs) > 0: + $info.addrs[len(info.addrs) - 1] + else: + "" + +proc getDiscoveryAddresses(node: BeaconNode): Option[seq[string]] = + let restr = node.network.enrRecord().toTypedRecord() + if restr.isErr(): + return none[seq[string]]() + let respa = restr.get().toPeerAddr(udpProtocol) + if respa.isErr(): + return none[seq[string]]() + let pa = respa.get() + let mpa = MultiAddress.init(multicodec("p2p"), pa.peerId) + if mpa.isErr(): + return none[seq[string]]() + var addresses = newSeqOfCap[string](len(pa.addrs)) + for item in pa.addrs: + let resa = concat(item, mpa.get()) + if resa.isOk(): + addresses.add($(resa.get())) + return some(addresses) + +proc getP2PAddresses(node: BeaconNode): Option[seq[string]] = + let pinfo = node.network.switch.peerInfo + let mpa = MultiAddress.init(multicodec("p2p"), pinfo.peerId) + if mpa.isErr(): + return none[seq[string]]() + var addresses = newSeqOfCap[string](len(pinfo.addrs)) + for item in pinfo.addrs: + let resa = concat(item, mpa.get()) + if resa.isOk(): + addresses.add($(resa.get())) + return some(addresses) + proc installNodeApiHandlers*(rpcServer: RpcServer, node: BeaconNode) = rpcServer.rpc("get_v1_node_identity") do () -> NodeIdentityTuple: + let discoveryAddresses = + block: + let res = node.getDiscoveryAddresses() + if res.isSome(): + res.get() + else: + newSeq[string](0) + + let p2pAddresses = + block: + let res = node.getP2PAddresses() + if res.isSome(): + res.get() + else: + newSeq[string]() + return ( - peer_id: node.network.peerId(), - enr: node.network.enrRecord(), - # TODO rest of fields - p2p_addresses: newSeq[MultiAddress](0), - discovery_addresses: newSeq[MultiAddress](0), - metadata: (0'u64, "") + peer_id: $node.network.peerId(), + enr: node.network.enrRecord().toUri(), + p2p_addresses: p2pAddresses, + discovery_addresses: discoveryAddresses, + metadata: (node.network.metadata.seq_number, + "0x" & ncrutils.toHex(node.network.metadata.attnets.bytes)) ) - rpcServer.rpc("get_v1_node_peers") do () -> JsonNode: - unimplemented() + rpcServer.rpc("get_v1_node_peers") do (state: Option[seq[string]], + direction: Option[seq[string]]) -> seq[NodePeerTuple]: + var res = newSeq[NodePeerTuple]() + let rstates = validateState(state) + if rstates.isNone(): + raise newException(CatchableError, "Incorrect state parameter") + let rdirs = validateDirection(direction) + if rdirs.isNone(): + raise newException(CatchableError, "Incorrect direction parameter") + let states = rstates.get() + let dirs = rdirs.get() + for item in node.network.peers.values(): + if (item.connectionState in states) and (item.direction in dirs): + let peer = ( + peer_id: $item.info.peerId, + enr: if item.enr.isSome(): item.enr.get().toUri() else: "", + last_seen_p2p_address: item.info.getLastSeenAddress(), + state: item.connectionState.toString(), + direction: item.direction.toString(), + agent: item.info.agentVersion, # Fields `agent` and `proto` are not + proto: item.info.protoVersion # part of specification. + ) + res.add(peer) + return res - rpcServer.rpc("get_v1_node_peers_peerId") do () -> JsonNode: - unimplemented() + rpcServer.rpc("get_v1_node_peer_count") do () -> NodePeerCountTuple: + var res: NodePeerCountTuple + for item in node.network.peers.values(): + case item.connectionState + of Connecting: + inc(res.connecting) + of Connected: + inc(res.connected) + of Disconnecting: + inc(res.disconnecting) + of Disconnected: + inc(res.disconnected) + of ConnectionState.None: + discard + return res + + rpcServer.rpc("get_v1_node_peers_peerId") do ( + peer_id: string) -> NodePeerTuple: + let pres = PeerID.init(peer_id) + if pres.isErr(): + raise newException(CatchableError, + "The peer ID supplied could not be parsed") + let pid = pres.get() + let peer = node.network.peers.getOrDefault(pid) + if isNil(peer): + raise newException(CatchableError, "Peer not found") + + return ( + peer_id: $peer.info.peerId, + enr: if peer.enr.isSome(): peer.enr.get().toUri() else: "", + last_seen_p2p_address: peer.info.getLastSeenAddress(), + state: peer.connectionState.toString(), + direction: peer.direction.toString(), + agent: peer.info.agentVersion, # Fields `agent` and `proto` are not part + proto: peer.info.protoVersion # of specification + ) rpcServer.rpc("get_v1_node_version") do () -> JsonNode: - return %{ - "version": "Nimbus/" & fullVersionStr - } + return %*{"version": "Nimbus/" & fullVersionStr} - rpcServer.rpc("get_v1_node_syncing") do () -> JsonNode: - unimplemented() + rpcServer.rpc("get_v1_node_syncing") do () -> SyncInfo: + return node.syncManager.getInfo() rpcServer.rpc("get_v1_node_health") do () -> JsonNode: - unimplemented() + # TODO: There currently no way to situation when we node has issues, so + # its impossible to return HTTP ERROR 503 according to specification. + if node.syncManager.inProgress: + # We need to return HTTP ERROR 206 according to specification + return %*{"health": 206} + else: + return %*{"health": 200} diff --git a/beacon_chain/spec/eth2_apis/callsigs_types.nim b/beacon_chain/spec/eth2_apis/callsigs_types.nim index 37e5cf6c7..323bb6f96 100644 --- a/beacon_chain/spec/eth2_apis/callsigs_types.nim +++ b/beacon_chain/spec/eth2_apis/callsigs_types.nim @@ -5,9 +5,7 @@ import # TODO for some reason "../[datatypes, digest, crypto]" results in "Error: cannot open file" ../datatypes, ../digest, - ../crypto, - libp2p/[peerid, multiaddress], - eth/p2p/discoveryv5/enr + ../crypto type AttesterDuties* = tuple @@ -46,8 +44,23 @@ type header: SignedBeaconBlockHeader NodeIdentityTuple* = tuple - peer_id: PeerID - enr: Record - p2p_addresses: seq[MultiAddress] - discovery_addresses: seq[MultiAddress] + peer_id: string + enr: string + p2p_addresses: seq[string] + discovery_addresses: seq[string] metadata: tuple[seq_number: uint64, attnets: string] + + NodePeerTuple* = tuple + peer_id: string + enr: string + last_seen_p2p_address: string + state: string + direction: string + agent: string # This is not part of specification + proto: string # This is not part of specification + + NodePeerCountTuple* = tuple + disconnected: int + connecting: int + connected: int + disconnecting: int diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index 25381b5b4..2e630d9a6 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -129,6 +129,10 @@ type peer*: T stamp*: chronos.Moment + SyncInfo* = object + head_slot*: Slot + sync_distance*: int64 + SyncManagerError* = object of CatchableError BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]] @@ -1183,3 +1187,9 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = proc start*[A, B](man: SyncManager[A, B]) = ## Starts SyncManager's main loop. man.syncFut = man.syncLoop() + +proc getInfo*[A, B](man: SyncManager[A, B]): SyncInfo = + ## Returns current synchronization information for RPC call. + let wallSlot = man.getLocalWallSlot() + let headSlot = man.getLocalHeadSlot() + SyncInfo(head_slot: headSlot, sync_distance: int64(wallSlot - headSlot)) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index 1c9ee06aa..ca5734fe5 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -239,6 +239,10 @@ proc handleStatus(peer: Peer, await peer.disconnect(IrrelevantNetwork) else: peer.setStatusMsg(theirStatus) + if peer.connectionState == Connecting: + # As soon as we get here it means that we passed handshake succesfully. So + # we can add this peer to PeerPool. + await peer.handlePeer() proc initBeaconSync*(network: Eth2Node, chainDag: ChainDAGRef, forkDigest: ForkDigest) = diff --git a/vendor/nim-chronos b/vendor/nim-chronos index ac9b3e304..8709ef9ed 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit ac9b3e304f630a450efc996f47dc9e6133246a87 +Subproject commit 8709ef9ed5e31e71a1c960237d2eb2c23a2adcd2