Full "node" RPC calls implementation and fixes to peer lifetime states. (#2065)
* Initial commit. * Fix log lines and compilation error. * Add get_v1_node_peers() implementation. * Fix peer's lifetime states. * Use the most recent multiaddress. * Fix assign NewPeerScore again. Fix compilation error with last seen address. Fix Peer upgraded log line place. * syncing, health, peer_count, peer_id and fixes for identity. * Fix compilation problems. * Move object declaration to callsigs. Fix identity addresses fields. * Finish node RPC calls. * Avoid leak of lifetime future. * Bump chronos. * Fix json generator problem.
This commit is contained in:
parent
7fdafa65ee
commit
d041287a4f
|
@ -72,7 +72,7 @@ type
|
|||
connTable: HashSet[PeerID]
|
||||
forkId: ENRForkID
|
||||
rng*: ref BrHmacDrbgContext
|
||||
peers: Table[PeerID, Peer]
|
||||
peers*: Table[PeerID, Peer]
|
||||
|
||||
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
|
||||
|
||||
|
@ -101,6 +101,8 @@ type
|
|||
requestQuota*: float
|
||||
lastReqTime*: Moment
|
||||
connections*: int
|
||||
enr*: Option[enr.Record]
|
||||
direction*: PeerType
|
||||
disconnectedFut: Future[void]
|
||||
|
||||
PeerAddr* = object
|
||||
|
@ -241,6 +243,9 @@ const
|
|||
SeenTablePenaltyError* = 60.minutes
|
||||
## Period of time for peers which score below or equal to zero.
|
||||
|
||||
ResolvePeerTimeout* = 1.minutes
|
||||
## Maximum time allowed for peer resolve process.
|
||||
|
||||
template neterr(kindParam: Eth2NetworkingErrorKind): auto =
|
||||
err(type(result), Eth2NetworkingError(kind: kindParam))
|
||||
|
||||
|
@ -263,6 +268,18 @@ declarePublicCounter nbc_timeout_dials,
|
|||
declarePublicGauge nbc_peers,
|
||||
"Number of active libp2p peers"
|
||||
|
||||
declarePublicCounter nbc_successful_discoveries,
|
||||
"Number of successfull discoveries"
|
||||
|
||||
declarePublicCounter nbc_failed_discoveries,
|
||||
"Number of failed discoveries"
|
||||
|
||||
const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
|
||||
|
||||
declareHistogram nbc_resolve_time,
|
||||
"Time(s) used while resolving peer information",
|
||||
buckets = delayBuckets
|
||||
|
||||
const
|
||||
snappy_implementation {.strdefine.} = "libp2p"
|
||||
|
||||
|
@ -318,6 +335,10 @@ proc getPeer*(node: Eth2Node, peerId: PeerID): Peer =
|
|||
let peer = Peer.init(node, PeerInfo.init(peerId))
|
||||
return node.peers.mGetOrPut(peerId, peer)
|
||||
|
||||
proc resetPeer*(node: Eth2Node, peerId: PeerID) =
|
||||
let peer = Peer.init(node, PeerInfo.init(peerId))
|
||||
node.peers[peerId] = peer
|
||||
|
||||
proc peerFromStream(network: Eth2Node, conn: Connection): Peer =
|
||||
result = network.getPeer(conn.peerInfo.peerId)
|
||||
result.info = conn.peerInfo
|
||||
|
@ -326,8 +347,6 @@ proc getKey*(peer: Peer): PeerID {.inline.} =
|
|||
peer.info.peerId
|
||||
|
||||
proc getFuture*(peer: Peer): Future[void] {.inline.} =
|
||||
if peer.disconnectedFut.isNil:
|
||||
peer.disconnectedFut = newFuture[void]()
|
||||
peer.disconnectedFut
|
||||
|
||||
proc getScore*(a: Peer): int =
|
||||
|
@ -439,7 +458,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
|||
SeenTablePenaltyError
|
||||
peer.network.addSeen(peer.info.peerId, seenTime)
|
||||
await peer.network.switch.disconnect(peer.info.peerId)
|
||||
peer.connectionState = Disconnected
|
||||
except CatchableError:
|
||||
# We do not care about exceptions in disconnection procedure.
|
||||
trace "Exception while disconnecting peer", peer = peer.info.peerId,
|
||||
|
@ -639,6 +657,22 @@ proc handleIncomingStream(network: Eth2Node,
|
|||
|
||||
let peer = peerFromStream(network, conn)
|
||||
try:
|
||||
case peer.connectionState
|
||||
of Disconnecting, Disconnected, None:
|
||||
# We got incoming stream request while disconnected or disconnecting.
|
||||
warn "Got incoming request from disconnected peer", peer = peer,
|
||||
message = msgName
|
||||
await conn.closeWithEOF()
|
||||
return
|
||||
of Connecting:
|
||||
# We got incoming stream request while handshake is not yet finished,
|
||||
# TODO: We could check it here.
|
||||
debug "Got incoming request from peer while in handshake", peer = peer,
|
||||
msgName
|
||||
of Connected:
|
||||
# We got incoming stream from peer with proper connection state.
|
||||
debug "Got incoming request from peer", peer = peer, msgName
|
||||
|
||||
template returnInvalidRequest(msg: ErrorMsg) =
|
||||
peer.updateScore(PeerScoreInvalidRequest)
|
||||
await sendErrorResponse(peer, conn, InvalidRequest, msg)
|
||||
|
@ -721,8 +755,10 @@ proc handleIncomingStream(network: Eth2Node,
|
|||
await conn.closeWithEOF()
|
||||
discard network.peerPool.checkPeerScore(peer)
|
||||
|
||||
proc toPeerAddr*(r: enr.TypedRecord):
|
||||
Result[PeerAddr, cstring] {.raises: [Defect].} =
|
||||
proc toPeerAddr*(r: enr.TypedRecord,
|
||||
proto: IpTransportProtocol): Result[PeerAddr, cstring] {.
|
||||
raises: [Defect].} =
|
||||
|
||||
if not r.secp256k1.isSome:
|
||||
return err("enr: no secp256k1 key in record")
|
||||
|
||||
|
@ -733,18 +769,34 @@ proc toPeerAddr*(r: enr.TypedRecord):
|
|||
|
||||
var addrs = newSeq[MultiAddress]()
|
||||
|
||||
if r.ip.isSome and r.tcp.isSome:
|
||||
let ip = ipv4(r.ip.get)
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
|
||||
|
||||
if r.ip6.isSome:
|
||||
let ip = ipv6(r.ip6.get)
|
||||
if r.tcp6.isSome:
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
|
||||
elif r.tcp.isSome:
|
||||
case proto
|
||||
of tcpProtocol:
|
||||
if r.ip.isSome and r.tcp.isSome:
|
||||
let ip = ipv4(r.ip.get)
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
if r.ip6.isSome:
|
||||
let ip = ipv6(r.ip6.get)
|
||||
if r.tcp6.isSome:
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
|
||||
elif r.tcp.isSome:
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
of udpProtocol:
|
||||
if r.ip.isSome and r.udp.isSome:
|
||||
let ip = ipv4(r.ip.get)
|
||||
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get)
|
||||
|
||||
if r.ip6.isSome:
|
||||
let ip = ipv6(r.ip6.get)
|
||||
if r.udp6.isSome:
|
||||
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp6.get)
|
||||
elif r.udp.isSome:
|
||||
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
if addrs.len == 0:
|
||||
return err("enr: no addresses in record")
|
||||
|
@ -808,7 +860,7 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
|
|||
|
||||
proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} =
|
||||
let nodeRecord = ? node.record.toTypedRecord()
|
||||
let peerAddr = ? nodeRecord.toPeerAddr()
|
||||
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
|
||||
ok(peerAddr)
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
|
@ -872,13 +924,120 @@ proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata =
|
|||
else:
|
||||
result = Json.loadFile(metadataPath, Eth2Metadata)
|
||||
|
||||
proc resolvePeer(peer: Peer) {.async.} =
|
||||
# Resolve task which performs searching of peer's public key and recovery of
|
||||
# ENR using discovery5.
|
||||
# This process could be stopped if one of the conditions is met:
|
||||
# 1. ENR was successfully recovered.
|
||||
# 2. Discovery5 failed to recover ENR.
|
||||
# 3. Timeout ``ResolvePeerTimeout`` has exceeded .
|
||||
# 4. Connection with the peer is lost.
|
||||
logScope: peer = peer.info.peerId
|
||||
let startTime = now(chronos.Moment)
|
||||
let nodeId =
|
||||
block:
|
||||
var key: PublicKey
|
||||
# `secp256k1` keys are always stored inside PeerID.
|
||||
discard peer.info.peerId.extractPublicKey(key)
|
||||
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
|
||||
|
||||
debug "Peer's ENR recovery task started", node_id = $nodeId
|
||||
|
||||
# This is "fast-path" for peers which was dialed. In this case discovery
|
||||
# already has most recent ENR information about this peer.
|
||||
let gnode = peer.network.discovery.getNode(nodeId)
|
||||
if gnode.isSome():
|
||||
peer.enr = some(gnode.get().record)
|
||||
inc(nbc_successful_discoveries)
|
||||
let delay = now(chronos.Moment) - startTime
|
||||
nbc_resolve_time.observe(delay.toFloatSeconds())
|
||||
debug "Peer's ENR recovered", delay = $delay
|
||||
else:
|
||||
let resolveFut = peer.network.discovery.resolve(nodeId)
|
||||
let timeFut = sleepAsync(ResolvePeerTimeout)
|
||||
|
||||
discard await race(peer.disconnectedFut, resolveFut, timeFut)
|
||||
|
||||
if peer.disconnectedFut.finished():
|
||||
# Peer is already disconnected
|
||||
if not(timeFut.finished()):
|
||||
timeFut.cancel()
|
||||
if not(resolveFut.finished()):
|
||||
await resolveFut.cancelAndWait()
|
||||
# allFutures() will perform check for futures which are already finished,
|
||||
# and we do not care about results anymore.
|
||||
await allFutures(timeFut)
|
||||
return
|
||||
|
||||
if resolveFut.finished():
|
||||
if resolveFut.done():
|
||||
let rnode = resolveFut.read()
|
||||
if rnode.isSome():
|
||||
peer.enr = some(rnode.get().record)
|
||||
inc(nbc_successful_discoveries)
|
||||
let delay = now(chronos.Moment) - startTime
|
||||
nbc_resolve_time.observe(delay.toFloatSeconds())
|
||||
debug "Peer's ENR recovered", delay = $delay
|
||||
else:
|
||||
inc(nbc_failed_discoveries)
|
||||
debug "Discovery operation returns empty answer"
|
||||
return
|
||||
else:
|
||||
inc(nbc_failed_discoveries)
|
||||
debug "Discovery operation failed with an error",
|
||||
error_name = resolveFut.error.name,
|
||||
error_msg = resolveFut.error.msg
|
||||
if not(timeFut.finished()):
|
||||
await timeFut.cancelAndWait()
|
||||
return
|
||||
|
||||
if timeFut.finished():
|
||||
inc(nbc_failed_discoveries)
|
||||
debug "Discovery operation exceeds timeout",
|
||||
timeout = $ResolvePeerTimeout
|
||||
if not(resolveFut.finished()):
|
||||
await resolveFut.cancelAndWait()
|
||||
return
|
||||
|
||||
proc handlePeer*(peer: Peer) {.async.} =
|
||||
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
|
||||
case res:
|
||||
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
|
||||
# Peer has low score or we do not have enough space in PeerPool,
|
||||
# we are going to disconnect it gracefully.
|
||||
# Peer' state will be updated in connection event.
|
||||
debug "Peer has low score or there no space in PeerPool",
|
||||
peer = peer, reason = res
|
||||
await peer.disconnect(FaultOrError)
|
||||
of PeerStatus.DeadPeerError:
|
||||
# Peer's lifetime future is finished, so its already dead,
|
||||
# we do not need to perform gracefull disconect.
|
||||
# Peer's state will be updated in connection event.
|
||||
discard
|
||||
of PeerStatus.DuplicateError:
|
||||
# Peer is already present in PeerPool, we can't perform disconnect,
|
||||
# because in such case we could kill both connections (connection
|
||||
# which is present in PeerPool and new one).
|
||||
# This is possible bug, because we could enter here only if number
|
||||
# of `peer.connections == 1`, it means that Peer's lifetime is not
|
||||
# tracked properly and we still not received `Disconnected` event.
|
||||
warn "Peer is already present in PeerPool", peer = peer
|
||||
of PeerStatus.Success:
|
||||
# Peer was added to PeerPool.
|
||||
peer.score = NewPeerScore
|
||||
peer.connectionState = Connected
|
||||
# We spawn task which will obtain ENR for this peer.
|
||||
asyncSpawn resolvePeer(peer)
|
||||
debug "Peer successfully connected", peer = peer,
|
||||
connections = peer.connections
|
||||
|
||||
proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
|
||||
let peer = node.getPeer(peerId)
|
||||
case event.kind
|
||||
of ConnEventKind.Connected:
|
||||
inc peer.connections
|
||||
debug "Peer upgraded", peer = $peerId, connections = peer.connections
|
||||
|
||||
debug "Peer connection upgraded", peer = $peerId,
|
||||
connections = peer.connections
|
||||
if peer.connections == 1:
|
||||
# Libp2p may connect multiple times to the same peer - using different
|
||||
# transports for both incoming and outgoing. For now, we'll count our
|
||||
|
@ -889,45 +1048,58 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
|
|||
# * For peer limits, we might miscount the incoming vs outgoing quota
|
||||
# * Protocol handshakes are wonky: we'll not necessarily use the newly
|
||||
# connected transport - instead we'll just pick a random one!
|
||||
case peer.connectionState
|
||||
of Disconnecting:
|
||||
# We got connection with peer which we currently disconnecting. This
|
||||
# situation should not be happened, because when we disconnecting
|
||||
# we adding peer to `SeenTable`.
|
||||
warn "Got connection attempt from peer that we are disconnecting",
|
||||
peer = peerId
|
||||
return
|
||||
of None:
|
||||
# We have established a connection with the new peer.
|
||||
peer.connectionState = Connecting
|
||||
of Disconnected:
|
||||
# We have established a connection with the peer that we have seen
|
||||
# before, so we perform reset peer in node.peers table.
|
||||
node.resetPeer(peerId)
|
||||
peer.connectionState = Connecting
|
||||
of Connecting, Connected:
|
||||
# This means that we got notification event from peer which we already
|
||||
# connected or connecting right now. If this situation will happened,
|
||||
# it means bug on `nim-libp2p` side.
|
||||
warn "Got connection attempt from peer which we already connected",
|
||||
peer = peerId
|
||||
await peer.disconnect(FaultOrError)
|
||||
return
|
||||
|
||||
await performProtocolHandshakes(peer, event.incoming)
|
||||
# Store connection direction inside Peer object.
|
||||
if event.incoming:
|
||||
peer.direction = PeerType.Incoming
|
||||
else:
|
||||
peer.direction = PeerType.Outgoing
|
||||
|
||||
# While performing the handshake, the peer might have been disconnected -
|
||||
# there's still a slim chance of a race condition here if a reconnect
|
||||
# happens quickly
|
||||
if peer.connections == 1:
|
||||
let res =
|
||||
if event.incoming:
|
||||
node.peerPool.addPeerNoWait(peer, PeerType.Incoming)
|
||||
else:
|
||||
node.peerPool.addPeerNoWait(peer, PeerType.Outgoing)
|
||||
|
||||
case res:
|
||||
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
|
||||
# Peer has low score or we do not have enough space in PeerPool,
|
||||
# we are going to disconnect it gracefully.
|
||||
await peer.disconnect(FaultOrError)
|
||||
of PeerStatus.DeadPeerError:
|
||||
# Peer's lifetime future is finished, so its already dead,
|
||||
# we do not need to perform gracefull disconect.
|
||||
discard
|
||||
of PeerStatus.DuplicateError:
|
||||
# Peer is already present in PeerPool, we can't perform disconnect,
|
||||
# because in such case we could kill both connections (connection
|
||||
# which is present in PeerPool and new one).
|
||||
discard
|
||||
of PeerStatus.Success:
|
||||
# Peer was added to PeerPool.
|
||||
discard
|
||||
if peer.direction == PeerType.Outgoing:
|
||||
# We only perform handshake with outgoing peers, incoming peers should
|
||||
# start handshake first, so it will be handled in handleIncomingStream.
|
||||
await performProtocolHandshakes(peer, event.incoming)
|
||||
|
||||
of ConnEventKind.Disconnected:
|
||||
dec peer.connections
|
||||
debug "Peer disconnected", peer = $peerId, connections = peer.connections
|
||||
debug "Lost connection to peer", peer = peerId,
|
||||
connections = peer.connections
|
||||
if peer.connections == 0:
|
||||
debug "Peer disconnected", peer = $peerId, connections = peer.connections
|
||||
let fut = peer.disconnectedFut
|
||||
if fut != nil:
|
||||
peer.disconnectedFut = nil
|
||||
if not(isNil(fut)):
|
||||
fut.complete()
|
||||
peer.disconnectedFut = nil
|
||||
else:
|
||||
# TODO (cheatfate): This could be removed when bug will be fixed inside
|
||||
# `nim-libp2p`.
|
||||
debug "Got new event while peer is already disconnected",
|
||||
peer = peerId, peer_state = peer.connectionState
|
||||
peer.connectionState = Disconnected
|
||||
|
||||
proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
|
||||
switch: Switch, pubsub: PubSub, ip: Option[ValidIpAddress],
|
||||
|
@ -1017,7 +1189,7 @@ proc start*(node: Eth2Node) {.async.} =
|
|||
for enr in node.discovery.bootstrapRecords:
|
||||
let tr = enr.toTypedRecord()
|
||||
if tr.isOk():
|
||||
let pa = tr.get().toPeerAddr()
|
||||
let pa = tr.get().toPeerAddr(tcpProtocol)
|
||||
if pa.isOk():
|
||||
await node.connQueue.addLast(pa.get())
|
||||
|
||||
|
@ -1036,17 +1208,20 @@ proc stop*(node: Eth2Node) {.async.} =
|
|||
futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)
|
||||
|
||||
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
|
||||
new result
|
||||
result.info = info
|
||||
result.network = network
|
||||
result.connectionState = Connected
|
||||
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
|
||||
result.lastReqTime = now(chronos.Moment)
|
||||
newSeq result.protocolStates, allProtocols.len
|
||||
for i in 0 ..< allProtocols.len:
|
||||
let res = Peer(
|
||||
info: info,
|
||||
network: network,
|
||||
connectionState: ConnectionState.None,
|
||||
maxInactivityAllowed: 15.minutes, # TODO: Read this from the config
|
||||
lastReqTime: now(chronos.Moment),
|
||||
disconnectedFut: newFuture[void]("peer.lifetime"),
|
||||
protocolStates: newSeq[RootRef](len(allProtocols))
|
||||
)
|
||||
for i in 0 ..< len(allProtocols):
|
||||
let proto = allProtocols[i]
|
||||
if proto.peerStateInitializer != nil:
|
||||
result.protocolStates[i] = proto.peerStateInitializer(result)
|
||||
if not(isNil(proto.peerStateInitializer)):
|
||||
res.protocolStates[i] = proto.peerStateInitializer(res)
|
||||
res
|
||||
|
||||
proc registerMsg(protocol: ProtocolInfo,
|
||||
name: string,
|
||||
|
@ -1393,7 +1568,7 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
|
|||
rng = rng)
|
||||
|
||||
let
|
||||
params =
|
||||
params =
|
||||
block:
|
||||
var p = GossipSubParams.init()
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
|
||||
|
@ -1410,9 +1585,9 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
|
|||
pubsub = GossipSub.init(
|
||||
switch = switch,
|
||||
msgIdProvider = msgIdProvider,
|
||||
triggerSelf = true,
|
||||
triggerSelf = true,
|
||||
sign = false,
|
||||
verifySignature = false,
|
||||
verifySignature = false,
|
||||
anonymize = true,
|
||||
parameters = params).PubSub
|
||||
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
import
|
||||
import std/options,
|
||||
chronicles,
|
||||
json_rpc/[rpcserver, jsonmarshal],
|
||||
|
||||
../beacon_node_common, ../eth2_network,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/[multiaddress, multicodec],
|
||||
nimcrypto/utils as ncrutils,
|
||||
../beacon_node_common, ../eth2_network, ../sync_manager,
|
||||
../peer_pool, ../version,
|
||||
../spec/[datatypes, digest, presets],
|
||||
../spec/eth2_apis/callsigs_types
|
||||
|
@ -15,30 +17,230 @@ type
|
|||
template unimplemented() =
|
||||
raise (ref CatchableError)(msg: "Unimplemented")
|
||||
|
||||
proc validateState(state: Option[seq[string]]): Option[set[ConnectionState]] =
|
||||
var res: set[ConnectionState]
|
||||
if state.isSome():
|
||||
let states = state.get()
|
||||
for item in states:
|
||||
case item
|
||||
of "disconnected":
|
||||
if ConnectionState.Disconnected notin res:
|
||||
res.incl(ConnectionState.Disconnected)
|
||||
else:
|
||||
# `state` values should be unique
|
||||
return none(set[ConnectionState])
|
||||
of "connecting":
|
||||
if ConnectionState.Disconnected notin res:
|
||||
res.incl(ConnectionState.Connecting)
|
||||
else:
|
||||
# `state` values should be unique
|
||||
return none(set[ConnectionState])
|
||||
of "connected":
|
||||
if ConnectionState.Connected notin res:
|
||||
res.incl(ConnectionState.Connected)
|
||||
else:
|
||||
# `state` values should be unique
|
||||
return none(set[ConnectionState])
|
||||
of "disconnecting":
|
||||
if ConnectionState.Disconnecting notin res:
|
||||
res.incl(ConnectionState.Disconnecting)
|
||||
else:
|
||||
# `state` values should be unique
|
||||
return none(set[ConnectionState])
|
||||
else:
|
||||
# Found incorrect `state` string value
|
||||
return none(set[ConnectionState])
|
||||
|
||||
if res == {}:
|
||||
res = {ConnectionState.Connecting, ConnectionState.Connected,
|
||||
ConnectionState.Disconnecting, ConnectionState.Disconnected}
|
||||
some(res)
|
||||
|
||||
proc validateDirection(direction: Option[seq[string]]): Option[set[PeerType]] =
|
||||
var res: set[PeerType]
|
||||
if direction.isSome():
|
||||
let directions = direction.get()
|
||||
for item in directions:
|
||||
case item
|
||||
of "inbound":
|
||||
if PeerType.Incoming notin res:
|
||||
res.incl(PeerType.Incoming)
|
||||
else:
|
||||
# `direction` values should be unique
|
||||
return none(set[PeerType])
|
||||
of "outbound":
|
||||
if PeerType.Outgoing notin res:
|
||||
res.incl(PeerType.Outgoing)
|
||||
else:
|
||||
# `direction` values should be unique
|
||||
return none(set[PeerType])
|
||||
else:
|
||||
# Found incorrect `direction` string value
|
||||
return none(set[PeerType])
|
||||
|
||||
if res == {}:
|
||||
res = {PeerType.Incoming, PeerType.Outgoing}
|
||||
some(res)
|
||||
|
||||
proc toString(state: ConnectionState): string =
|
||||
case state
|
||||
of ConnectionState.Disconnected:
|
||||
"disconnected"
|
||||
of ConnectionState.Connecting:
|
||||
"connecting"
|
||||
of ConnectionState.Connected:
|
||||
"connected"
|
||||
of ConnectionState.Disconnecting:
|
||||
"disconnecting"
|
||||
else:
|
||||
""
|
||||
|
||||
proc toString(direction: PeerType): string =
|
||||
case direction:
|
||||
of PeerType.Incoming:
|
||||
"inbound"
|
||||
of PeerType.Outgoing:
|
||||
"outbound"
|
||||
|
||||
proc getLastSeenAddress(info: PeerInfo): string =
|
||||
# TODO (cheatfate): We need to provide filter here, which will be able to
|
||||
# filter such multiaddresses like `/ip4/0.0.0.0` or local addresses or
|
||||
# addresses with peer ids.
|
||||
if len(info.addrs) > 0:
|
||||
$info.addrs[len(info.addrs) - 1]
|
||||
else:
|
||||
""
|
||||
|
||||
proc getDiscoveryAddresses(node: BeaconNode): Option[seq[string]] =
|
||||
let restr = node.network.enrRecord().toTypedRecord()
|
||||
if restr.isErr():
|
||||
return none[seq[string]]()
|
||||
let respa = restr.get().toPeerAddr(udpProtocol)
|
||||
if respa.isErr():
|
||||
return none[seq[string]]()
|
||||
let pa = respa.get()
|
||||
let mpa = MultiAddress.init(multicodec("p2p"), pa.peerId)
|
||||
if mpa.isErr():
|
||||
return none[seq[string]]()
|
||||
var addresses = newSeqOfCap[string](len(pa.addrs))
|
||||
for item in pa.addrs:
|
||||
let resa = concat(item, mpa.get())
|
||||
if resa.isOk():
|
||||
addresses.add($(resa.get()))
|
||||
return some(addresses)
|
||||
|
||||
proc getP2PAddresses(node: BeaconNode): Option[seq[string]] =
|
||||
let pinfo = node.network.switch.peerInfo
|
||||
let mpa = MultiAddress.init(multicodec("p2p"), pinfo.peerId)
|
||||
if mpa.isErr():
|
||||
return none[seq[string]]()
|
||||
var addresses = newSeqOfCap[string](len(pinfo.addrs))
|
||||
for item in pinfo.addrs:
|
||||
let resa = concat(item, mpa.get())
|
||||
if resa.isOk():
|
||||
addresses.add($(resa.get()))
|
||||
return some(addresses)
|
||||
|
||||
proc installNodeApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
|
||||
rpcServer.rpc("get_v1_node_identity") do () -> NodeIdentityTuple:
|
||||
let discoveryAddresses =
|
||||
block:
|
||||
let res = node.getDiscoveryAddresses()
|
||||
if res.isSome():
|
||||
res.get()
|
||||
else:
|
||||
newSeq[string](0)
|
||||
|
||||
let p2pAddresses =
|
||||
block:
|
||||
let res = node.getP2PAddresses()
|
||||
if res.isSome():
|
||||
res.get()
|
||||
else:
|
||||
newSeq[string]()
|
||||
|
||||
return (
|
||||
peer_id: node.network.peerId(),
|
||||
enr: node.network.enrRecord(),
|
||||
# TODO rest of fields
|
||||
p2p_addresses: newSeq[MultiAddress](0),
|
||||
discovery_addresses: newSeq[MultiAddress](0),
|
||||
metadata: (0'u64, "")
|
||||
peer_id: $node.network.peerId(),
|
||||
enr: node.network.enrRecord().toUri(),
|
||||
p2p_addresses: p2pAddresses,
|
||||
discovery_addresses: discoveryAddresses,
|
||||
metadata: (node.network.metadata.seq_number,
|
||||
"0x" & ncrutils.toHex(node.network.metadata.attnets.bytes))
|
||||
)
|
||||
|
||||
rpcServer.rpc("get_v1_node_peers") do () -> JsonNode:
|
||||
unimplemented()
|
||||
rpcServer.rpc("get_v1_node_peers") do (state: Option[seq[string]],
|
||||
direction: Option[seq[string]]) -> seq[NodePeerTuple]:
|
||||
var res = newSeq[NodePeerTuple]()
|
||||
let rstates = validateState(state)
|
||||
if rstates.isNone():
|
||||
raise newException(CatchableError, "Incorrect state parameter")
|
||||
let rdirs = validateDirection(direction)
|
||||
if rdirs.isNone():
|
||||
raise newException(CatchableError, "Incorrect direction parameter")
|
||||
let states = rstates.get()
|
||||
let dirs = rdirs.get()
|
||||
for item in node.network.peers.values():
|
||||
if (item.connectionState in states) and (item.direction in dirs):
|
||||
let peer = (
|
||||
peer_id: $item.info.peerId,
|
||||
enr: if item.enr.isSome(): item.enr.get().toUri() else: "",
|
||||
last_seen_p2p_address: item.info.getLastSeenAddress(),
|
||||
state: item.connectionState.toString(),
|
||||
direction: item.direction.toString(),
|
||||
agent: item.info.agentVersion, # Fields `agent` and `proto` are not
|
||||
proto: item.info.protoVersion # part of specification.
|
||||
)
|
||||
res.add(peer)
|
||||
return res
|
||||
|
||||
rpcServer.rpc("get_v1_node_peers_peerId") do () -> JsonNode:
|
||||
unimplemented()
|
||||
rpcServer.rpc("get_v1_node_peer_count") do () -> NodePeerCountTuple:
|
||||
var res: NodePeerCountTuple
|
||||
for item in node.network.peers.values():
|
||||
case item.connectionState
|
||||
of Connecting:
|
||||
inc(res.connecting)
|
||||
of Connected:
|
||||
inc(res.connected)
|
||||
of Disconnecting:
|
||||
inc(res.disconnecting)
|
||||
of Disconnected:
|
||||
inc(res.disconnected)
|
||||
of ConnectionState.None:
|
||||
discard
|
||||
return res
|
||||
|
||||
rpcServer.rpc("get_v1_node_peers_peerId") do (
|
||||
peer_id: string) -> NodePeerTuple:
|
||||
let pres = PeerID.init(peer_id)
|
||||
if pres.isErr():
|
||||
raise newException(CatchableError,
|
||||
"The peer ID supplied could not be parsed")
|
||||
let pid = pres.get()
|
||||
let peer = node.network.peers.getOrDefault(pid)
|
||||
if isNil(peer):
|
||||
raise newException(CatchableError, "Peer not found")
|
||||
|
||||
return (
|
||||
peer_id: $peer.info.peerId,
|
||||
enr: if peer.enr.isSome(): peer.enr.get().toUri() else: "",
|
||||
last_seen_p2p_address: peer.info.getLastSeenAddress(),
|
||||
state: peer.connectionState.toString(),
|
||||
direction: peer.direction.toString(),
|
||||
agent: peer.info.agentVersion, # Fields `agent` and `proto` are not part
|
||||
proto: peer.info.protoVersion # of specification
|
||||
)
|
||||
|
||||
rpcServer.rpc("get_v1_node_version") do () -> JsonNode:
|
||||
return %{
|
||||
"version": "Nimbus/" & fullVersionStr
|
||||
}
|
||||
return %*{"version": "Nimbus/" & fullVersionStr}
|
||||
|
||||
rpcServer.rpc("get_v1_node_syncing") do () -> JsonNode:
|
||||
unimplemented()
|
||||
rpcServer.rpc("get_v1_node_syncing") do () -> SyncInfo:
|
||||
return node.syncManager.getInfo()
|
||||
|
||||
rpcServer.rpc("get_v1_node_health") do () -> JsonNode:
|
||||
unimplemented()
|
||||
# TODO: There currently no way to situation when we node has issues, so
|
||||
# its impossible to return HTTP ERROR 503 according to specification.
|
||||
if node.syncManager.inProgress:
|
||||
# We need to return HTTP ERROR 206 according to specification
|
||||
return %*{"health": 206}
|
||||
else:
|
||||
return %*{"health": 200}
|
||||
|
|
|
@ -5,9 +5,7 @@ import
|
|||
# TODO for some reason "../[datatypes, digest, crypto]" results in "Error: cannot open file"
|
||||
../datatypes,
|
||||
../digest,
|
||||
../crypto,
|
||||
libp2p/[peerid, multiaddress],
|
||||
eth/p2p/discoveryv5/enr
|
||||
../crypto
|
||||
|
||||
type
|
||||
AttesterDuties* = tuple
|
||||
|
@ -46,8 +44,23 @@ type
|
|||
header: SignedBeaconBlockHeader
|
||||
|
||||
NodeIdentityTuple* = tuple
|
||||
peer_id: PeerID
|
||||
enr: Record
|
||||
p2p_addresses: seq[MultiAddress]
|
||||
discovery_addresses: seq[MultiAddress]
|
||||
peer_id: string
|
||||
enr: string
|
||||
p2p_addresses: seq[string]
|
||||
discovery_addresses: seq[string]
|
||||
metadata: tuple[seq_number: uint64, attnets: string]
|
||||
|
||||
NodePeerTuple* = tuple
|
||||
peer_id: string
|
||||
enr: string
|
||||
last_seen_p2p_address: string
|
||||
state: string
|
||||
direction: string
|
||||
agent: string # This is not part of specification
|
||||
proto: string # This is not part of specification
|
||||
|
||||
NodePeerCountTuple* = tuple
|
||||
disconnected: int
|
||||
connecting: int
|
||||
connected: int
|
||||
disconnecting: int
|
||||
|
|
|
@ -129,6 +129,10 @@ type
|
|||
peer*: T
|
||||
stamp*: chronos.Moment
|
||||
|
||||
SyncInfo* = object
|
||||
head_slot*: Slot
|
||||
sync_distance*: int64
|
||||
|
||||
SyncManagerError* = object of CatchableError
|
||||
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
|
||||
|
||||
|
@ -1177,3 +1181,9 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
|
|||
proc start*[A, B](man: SyncManager[A, B]) =
|
||||
## Starts SyncManager's main loop.
|
||||
man.syncFut = man.syncLoop()
|
||||
|
||||
proc getInfo*[A, B](man: SyncManager[A, B]): SyncInfo =
|
||||
## Returns current synchronization information for RPC call.
|
||||
let wallSlot = man.getLocalWallSlot()
|
||||
let headSlot = man.getLocalHeadSlot()
|
||||
SyncInfo(head_slot: headSlot, sync_distance: int64(wallSlot - headSlot))
|
||||
|
|
|
@ -239,6 +239,10 @@ proc handleStatus(peer: Peer,
|
|||
await peer.disconnect(IrrelevantNetwork)
|
||||
else:
|
||||
peer.setStatusMsg(theirStatus)
|
||||
if peer.connectionState == Connecting:
|
||||
# As soon as we get here it means that we passed handshake succesfully. So
|
||||
# we can add this peer to PeerPool.
|
||||
await peer.handlePeer()
|
||||
|
||||
proc initBeaconSync*(network: Eth2Node, chainDag: ChainDAGRef,
|
||||
forkDigest: ForkDigest) =
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit ac9b3e304f630a450efc996f47dc9e6133246a87
|
||||
Subproject commit 8709ef9ed5e31e71a1c960237d2eb2c23a2adcd2
|
Loading…
Reference in New Issue