Peer stuff (#2084)

* Revert "Revert "Full "node" RPC calls implementation and fixes to peer lifetime states. (#2065)" (#2082)"

This reverts commit 7cc3dc8027.

* fix nil disconnectedFut crash

* fixes

don't resetPeer, it causes peer miscounts

* disconnect disconnecting peers

...when there's a race.

* avoid connection spamming

* never decrease SeenTable timeout
* only recover ENR for known peers

* seen only when really disconnected
This commit is contained in:
Jacek Sieka 2020-11-26 20:23:45 +01:00 committed by GitHub
parent cb682c8b9a
commit d16e127daf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 454 additions and 94 deletions

View File

@ -72,7 +72,7 @@ type
connTable: HashSet[PeerID]
forkId: ENRForkID
rng*: ref BrHmacDrbgContext
peers: Table[PeerID, Peer]
peers*: Table[PeerID, Peer]
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
@ -95,12 +95,13 @@ type
discoveryId*: Eth2DiscoveryId
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration
netThroughput: AverageThroughput
score*: int
requestQuota*: float
lastReqTime*: Moment
connections*: int
enr*: Option[enr.Record]
direction*: PeerType
disconnectedFut: Future[void]
PeerAddr* = object
@ -240,6 +241,11 @@ const
## Period of time for `FaultOnError` error reason.
SeenTablePenaltyError* = 60.minutes
## Period of time for peers which score below or equal to zero.
SeenTableTimeReconnect* = 1.minutes
## Minimal time between disconnection and reconnection attempt
ResolvePeerTimeout* = 1.minutes
## Maximum time allowed for peer resolve process.
template neterr(kindParam: Eth2NetworkingErrorKind): auto =
err(type(result), Eth2NetworkingError(kind: kindParam))
@ -263,6 +269,18 @@ declarePublicCounter nbc_timeout_dials,
declarePublicGauge nbc_peers,
"Number of active libp2p peers"
declarePublicCounter nbc_successful_discoveries,
"Number of successfull discoveries"
declarePublicCounter nbc_failed_discoveries,
"Number of failed discoveries"
const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
declareHistogram nbc_resolve_time,
"Time(s) used while resolving peer information",
buckets = delayBuckets
const
snappy_implementation {.strdefine.} = "libp2p"
@ -326,8 +344,8 @@ proc getKey*(peer: Peer): PeerID {.inline.} =
peer.info.peerId
proc getFuture*(peer: Peer): Future[void] {.inline.} =
if peer.disconnectedFut.isNil:
peer.disconnectedFut = newFuture[void]()
if isNil(peer.disconnectedFut):
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut")
peer.disconnectedFut
proc getScore*(a: Peer): int =
@ -419,7 +437,11 @@ proc addSeen*(network: ETh2Node, peerId: PeerID,
period: chronos.Duration) =
## Adds peer with PeerID ``peerId`` to SeenTable and timeout ``period``.
let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period)
network.seenTable[peerId] = item
withValue(network.seenTable, peerId, entry) do:
if entry.stamp < item.stamp:
entry.stamp = item.stamp
do:
network.seenTable[peerId] = item
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
@ -439,7 +461,6 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
SeenTablePenaltyError
peer.network.addSeen(peer.info.peerId, seenTime)
await peer.network.switch.disconnect(peer.info.peerId)
peer.connectionState = Disconnected
except CatchableError:
# We do not care about exceptions in disconnection procedure.
trace "Exception while disconnecting peer", peer = peer.info.peerId,
@ -639,6 +660,22 @@ proc handleIncomingStream(network: Eth2Node,
let peer = peerFromStream(network, conn)
try:
case peer.connectionState
of Disconnecting, Disconnected, None:
# We got incoming stream request while disconnected or disconnecting.
warn "Got incoming request from disconnected peer", peer = peer,
message = msgName
await conn.closeWithEOF()
return
of Connecting:
# We got incoming stream request while handshake is not yet finished,
# TODO: We could check it here.
debug "Got incoming request from peer while in handshake", peer = peer,
msgName
of Connected:
# We got incoming stream from peer with proper connection state.
debug "Got incoming request from peer", peer = peer, msgName
template returnInvalidRequest(msg: ErrorMsg) =
peer.updateScore(PeerScoreInvalidRequest)
await sendErrorResponse(peer, conn, InvalidRequest, msg)
@ -721,8 +758,10 @@ proc handleIncomingStream(network: Eth2Node,
await conn.closeWithEOF()
discard network.peerPool.checkPeerScore(peer)
proc toPeerAddr*(r: enr.TypedRecord):
Result[PeerAddr, cstring] {.raises: [Defect].} =
proc toPeerAddr*(r: enr.TypedRecord,
proto: IpTransportProtocol): Result[PeerAddr, cstring] {.
raises: [Defect].} =
if not r.secp256k1.isSome:
return err("enr: no secp256k1 key in record")
@ -733,18 +772,34 @@ proc toPeerAddr*(r: enr.TypedRecord):
var addrs = newSeq[MultiAddress]()
if r.ip.isSome and r.tcp.isSome:
let ip = ipv4(r.ip.get)
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.tcp6.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
elif r.tcp.isSome:
case proto
of tcpProtocol:
if r.ip.isSome and r.tcp.isSome:
let ip = ipv4(r.ip.get)
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
else:
discard
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.tcp6.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
elif r.tcp.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
else:
discard
of udpProtocol:
if r.ip.isSome and r.udp.isSome:
let ip = ipv4(r.ip.get)
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get)
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.udp6.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp6.get)
elif r.udp.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get)
else:
discard
if addrs.len == 0:
return err("enr: no addresses in record")
@ -808,7 +863,7 @@ proc connectWorker(node: Eth2Node, index: int) {.async.} =
proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} =
let nodeRecord = ? node.record.toTypedRecord()
let peerAddr = ? nodeRecord.toPeerAddr()
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr)
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
@ -872,13 +927,71 @@ proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata =
else:
result = Json.loadFile(metadataPath, Eth2Metadata)
proc resolvePeer(peer: Peer) =
# Resolve task which performs searching of peer's public key and recovery of
# ENR using discovery5. We only resolve ENR for peers we know about to avoid
# querying the network - as of now, the ENR is not needed, except for
# debuggging
logScope: peer = peer.info.peerId
let startTime = now(chronos.Moment)
let nodeId =
block:
var key: PublicKey
# `secp256k1` keys are always stored inside PeerID.
discard peer.info.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
debug "Peer's ENR recovery task started", node_id = $nodeId
# This is "fast-path" for peers which was dialed. In this case discovery
# already has most recent ENR information about this peer.
let gnode = peer.network.discovery.getNode(nodeId)
if gnode.isSome():
peer.enr = some(gnode.get().record)
inc(nbc_successful_discoveries)
let delay = now(chronos.Moment) - startTime
nbc_resolve_time.observe(delay.toFloatSeconds())
debug "Peer's ENR recovered", delay = $delay
proc handlePeer*(peer: Peer) {.async.} =
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
case res:
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
# Peer has low score or we do not have enough space in PeerPool,
# we are going to disconnect it gracefully.
# Peer' state will be updated in connection event.
debug "Peer has low score or there no space in PeerPool",
peer = peer, reason = res
await peer.disconnect(FaultOrError)
of PeerStatus.DeadPeerError:
# Peer's lifetime future is finished, so its already dead,
# we do not need to perform gracefull disconect.
# Peer's state will be updated in connection event.
discard
of PeerStatus.DuplicateError:
# Peer is already present in PeerPool, we can't perform disconnect,
# because in such case we could kill both connections (connection
# which is present in PeerPool and new one).
# This is possible bug, because we could enter here only if number
# of `peer.connections == 1`, it means that Peer's lifetime is not
# tracked properly and we still not received `Disconnected` event.
debug "Peer is already present in PeerPool", peer = peer
of PeerStatus.Success:
# Peer was added to PeerPool.
peer.score = NewPeerScore
peer.connectionState = Connected
# We spawn task which will obtain ENR for this peer.
resolvePeer(peer)
debug "Peer successfully connected", peer = peer,
connections = peer.connections
proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
let peer = node.getPeer(peerId)
case event.kind
of ConnEventKind.Connected:
inc peer.connections
debug "Peer upgraded", peer = $peerId, connections = peer.connections
debug "Peer connection upgraded", peer = $peerId,
connections = peer.connections
if peer.connections == 1:
# Libp2p may connect multiple times to the same peer - using different
# transports for both incoming and outgoing. For now, we'll count our
@ -889,45 +1002,62 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
# * For peer limits, we might miscount the incoming vs outgoing quota
# * Protocol handshakes are wonky: we'll not necessarily use the newly
# connected transport - instead we'll just pick a random one!
case peer.connectionState
of Disconnecting:
# We got connection with peer which we currently disconnecting.
# Normally this does not happen, but if a peer is being disconnected
# while a concurrent (incoming for example) connection attempt happens,
# we might end up here
debug "Got connection attempt from peer that we are disconnecting",
peer = peerId
await node.switch.disconnect(peerId)
return
of None:
# We have established a connection with the new peer.
peer.connectionState = Connecting
of Disconnected:
# We have established a connection with the peer that we have seen
# before - reusing the existing peer object is fine
peer.connectionState = Connecting
peer.score = 0 # Will be set to NewPeerScore after handshake
of Connecting, Connected:
# This means that we got notification event from peer which we already
# connected or connecting right now. If this situation will happened,
# it means bug on `nim-libp2p` side.
warn "Got connection attempt from peer which we already connected",
peer = peerId
await peer.disconnect(FaultOrError)
return
# Store connection direction inside Peer object.
if event.incoming:
peer.direction = PeerType.Incoming
else:
peer.direction = PeerType.Outgoing
await performProtocolHandshakes(peer, event.incoming)
# While performing the handshake, the peer might have been disconnected -
# there's still a slim chance of a race condition here if a reconnect
# happens quickly
if peer.connections == 1:
let res =
if event.incoming:
node.peerPool.addPeerNoWait(peer, PeerType.Incoming)
else:
node.peerPool.addPeerNoWait(peer, PeerType.Outgoing)
case res:
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
# Peer has low score or we do not have enough space in PeerPool,
# we are going to disconnect it gracefully.
await peer.disconnect(FaultOrError)
of PeerStatus.DeadPeerError:
# Peer's lifetime future is finished, so its already dead,
# we do not need to perform gracefull disconect.
discard
of PeerStatus.DuplicateError:
# Peer is already present in PeerPool, we can't perform disconnect,
# because in such case we could kill both connections (connection
# which is present in PeerPool and new one).
discard
of PeerStatus.Success:
# Peer was added to PeerPool.
discard
of ConnEventKind.Disconnected:
dec peer.connections
debug "Peer disconnected", peer = $peerId, connections = peer.connections
debug "Lost connection to peer", peer = peerId,
connections = peer.connections
if peer.connections == 0:
debug "Peer disconnected", peer = $peerId, connections = peer.connections
# Whatever caused disconnection, avoid connection spamming
node.addSeen(peerId, SeenTableTimeReconnect)
let fut = peer.disconnectedFut
if fut != nil:
peer.disconnectedFut = nil
if not(isNil(fut)):
fut.complete()
peer.disconnectedFut = nil
else:
# TODO (cheatfate): This could be removed when bug will be fixed inside
# `nim-libp2p`.
debug "Got new event while peer is already disconnected",
peer = peerId, peer_state = peer.connectionState
peer.connectionState = Disconnected
proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
switch: Switch, pubsub: PubSub, ip: Option[ValidIpAddress],
@ -1017,7 +1147,7 @@ proc start*(node: Eth2Node) {.async.} =
for enr in node.discovery.bootstrapRecords:
let tr = enr.toTypedRecord()
if tr.isOk():
let pa = tr.get().toPeerAddr()
let pa = tr.get().toPeerAddr(tcpProtocol)
if pa.isOk():
await node.connQueue.addLast(pa.get())
@ -1036,17 +1166,18 @@ proc stop*(node: Eth2Node) {.async.} =
futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
new result
result.info = info
result.network = network
result.connectionState = Connected
result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
result.lastReqTime = now(chronos.Moment)
newSeq result.protocolStates, allProtocols.len
for i in 0 ..< allProtocols.len:
let res = Peer(
info: info,
network: network,
connectionState: ConnectionState.None,
lastReqTime: now(chronos.Moment),
protocolStates: newSeq[RootRef](len(allProtocols))
)
for i in 0 ..< len(allProtocols):
let proto = allProtocols[i]
if proto.peerStateInitializer != nil:
result.protocolStates[i] = proto.peerStateInitializer(result)
if not(isNil(proto.peerStateInitializer)):
res.protocolStates[i] = proto.peerStateInitializer(res)
res
proc registerMsg(protocol: ProtocolInfo,
name: string,
@ -1393,7 +1524,7 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
rng = rng)
let
params =
params =
block:
var p = GossipSubParams.init()
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.0/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
@ -1411,9 +1542,9 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
pubsub = GossipSub.init(
switch = switch,
msgIdProvider = msgIdProvider,
triggerSelf = true,
triggerSelf = true,
sign = false,
verifySignature = false,
verifySignature = false,
anonymize = true,
parameters = params).PubSub

View File

@ -1,8 +1,10 @@
import
import std/options,
chronicles,
json_rpc/[rpcserver, jsonmarshal],
../beacon_node_common, ../eth2_network,
eth/p2p/discoveryv5/enr,
libp2p/[multiaddress, multicodec],
nimcrypto/utils as ncrutils,
../beacon_node_common, ../eth2_network, ../sync_manager,
../peer_pool, ../version,
../spec/[datatypes, digest, presets],
../spec/eth2_apis/callsigs_types
@ -15,30 +17,230 @@ type
template unimplemented() =
raise (ref CatchableError)(msg: "Unimplemented")
proc validateState(state: Option[seq[string]]): Option[set[ConnectionState]] =
var res: set[ConnectionState]
if state.isSome():
let states = state.get()
for item in states:
case item
of "disconnected":
if ConnectionState.Disconnected notin res:
res.incl(ConnectionState.Disconnected)
else:
# `state` values should be unique
return none(set[ConnectionState])
of "connecting":
if ConnectionState.Disconnected notin res:
res.incl(ConnectionState.Connecting)
else:
# `state` values should be unique
return none(set[ConnectionState])
of "connected":
if ConnectionState.Connected notin res:
res.incl(ConnectionState.Connected)
else:
# `state` values should be unique
return none(set[ConnectionState])
of "disconnecting":
if ConnectionState.Disconnecting notin res:
res.incl(ConnectionState.Disconnecting)
else:
# `state` values should be unique
return none(set[ConnectionState])
else:
# Found incorrect `state` string value
return none(set[ConnectionState])
if res == {}:
res = {ConnectionState.Connecting, ConnectionState.Connected,
ConnectionState.Disconnecting, ConnectionState.Disconnected}
some(res)
proc validateDirection(direction: Option[seq[string]]): Option[set[PeerType]] =
var res: set[PeerType]
if direction.isSome():
let directions = direction.get()
for item in directions:
case item
of "inbound":
if PeerType.Incoming notin res:
res.incl(PeerType.Incoming)
else:
# `direction` values should be unique
return none(set[PeerType])
of "outbound":
if PeerType.Outgoing notin res:
res.incl(PeerType.Outgoing)
else:
# `direction` values should be unique
return none(set[PeerType])
else:
# Found incorrect `direction` string value
return none(set[PeerType])
if res == {}:
res = {PeerType.Incoming, PeerType.Outgoing}
some(res)
proc toString(state: ConnectionState): string =
case state
of ConnectionState.Disconnected:
"disconnected"
of ConnectionState.Connecting:
"connecting"
of ConnectionState.Connected:
"connected"
of ConnectionState.Disconnecting:
"disconnecting"
else:
""
proc toString(direction: PeerType): string =
case direction:
of PeerType.Incoming:
"inbound"
of PeerType.Outgoing:
"outbound"
proc getLastSeenAddress(info: PeerInfo): string =
# TODO (cheatfate): We need to provide filter here, which will be able to
# filter such multiaddresses like `/ip4/0.0.0.0` or local addresses or
# addresses with peer ids.
if len(info.addrs) > 0:
$info.addrs[len(info.addrs) - 1]
else:
""
proc getDiscoveryAddresses(node: BeaconNode): Option[seq[string]] =
let restr = node.network.enrRecord().toTypedRecord()
if restr.isErr():
return none[seq[string]]()
let respa = restr.get().toPeerAddr(udpProtocol)
if respa.isErr():
return none[seq[string]]()
let pa = respa.get()
let mpa = MultiAddress.init(multicodec("p2p"), pa.peerId)
if mpa.isErr():
return none[seq[string]]()
var addresses = newSeqOfCap[string](len(pa.addrs))
for item in pa.addrs:
let resa = concat(item, mpa.get())
if resa.isOk():
addresses.add($(resa.get()))
return some(addresses)
proc getP2PAddresses(node: BeaconNode): Option[seq[string]] =
let pinfo = node.network.switch.peerInfo
let mpa = MultiAddress.init(multicodec("p2p"), pinfo.peerId)
if mpa.isErr():
return none[seq[string]]()
var addresses = newSeqOfCap[string](len(pinfo.addrs))
for item in pinfo.addrs:
let resa = concat(item, mpa.get())
if resa.isOk():
addresses.add($(resa.get()))
return some(addresses)
proc installNodeApiHandlers*(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.rpc("get_v1_node_identity") do () -> NodeIdentityTuple:
let discoveryAddresses =
block:
let res = node.getDiscoveryAddresses()
if res.isSome():
res.get()
else:
newSeq[string](0)
let p2pAddresses =
block:
let res = node.getP2PAddresses()
if res.isSome():
res.get()
else:
newSeq[string]()
return (
peer_id: node.network.peerId(),
enr: node.network.enrRecord(),
# TODO rest of fields
p2p_addresses: newSeq[MultiAddress](0),
discovery_addresses: newSeq[MultiAddress](0),
metadata: (0'u64, "")
peer_id: $node.network.peerId(),
enr: node.network.enrRecord().toUri(),
p2p_addresses: p2pAddresses,
discovery_addresses: discoveryAddresses,
metadata: (node.network.metadata.seq_number,
"0x" & ncrutils.toHex(node.network.metadata.attnets.bytes))
)
rpcServer.rpc("get_v1_node_peers") do () -> JsonNode:
unimplemented()
rpcServer.rpc("get_v1_node_peers") do (state: Option[seq[string]],
direction: Option[seq[string]]) -> seq[NodePeerTuple]:
var res = newSeq[NodePeerTuple]()
let rstates = validateState(state)
if rstates.isNone():
raise newException(CatchableError, "Incorrect state parameter")
let rdirs = validateDirection(direction)
if rdirs.isNone():
raise newException(CatchableError, "Incorrect direction parameter")
let states = rstates.get()
let dirs = rdirs.get()
for item in node.network.peers.values():
if (item.connectionState in states) and (item.direction in dirs):
let peer = (
peer_id: $item.info.peerId,
enr: if item.enr.isSome(): item.enr.get().toUri() else: "",
last_seen_p2p_address: item.info.getLastSeenAddress(),
state: item.connectionState.toString(),
direction: item.direction.toString(),
agent: item.info.agentVersion, # Fields `agent` and `proto` are not
proto: item.info.protoVersion # part of specification.
)
res.add(peer)
return res
rpcServer.rpc("get_v1_node_peers_peerId") do () -> JsonNode:
unimplemented()
rpcServer.rpc("get_v1_node_peer_count") do () -> NodePeerCountTuple:
var res: NodePeerCountTuple
for item in node.network.peers.values():
case item.connectionState
of Connecting:
inc(res.connecting)
of Connected:
inc(res.connected)
of Disconnecting:
inc(res.disconnecting)
of Disconnected:
inc(res.disconnected)
of ConnectionState.None:
discard
return res
rpcServer.rpc("get_v1_node_peers_peerId") do (
peer_id: string) -> NodePeerTuple:
let pres = PeerID.init(peer_id)
if pres.isErr():
raise newException(CatchableError,
"The peer ID supplied could not be parsed")
let pid = pres.get()
let peer = node.network.peers.getOrDefault(pid)
if isNil(peer):
raise newException(CatchableError, "Peer not found")
return (
peer_id: $peer.info.peerId,
enr: if peer.enr.isSome(): peer.enr.get().toUri() else: "",
last_seen_p2p_address: peer.info.getLastSeenAddress(),
state: peer.connectionState.toString(),
direction: peer.direction.toString(),
agent: peer.info.agentVersion, # Fields `agent` and `proto` are not part
proto: peer.info.protoVersion # of specification
)
rpcServer.rpc("get_v1_node_version") do () -> JsonNode:
return %{
"version": "Nimbus/" & fullVersionStr
}
return %*{"version": "Nimbus/" & fullVersionStr}
rpcServer.rpc("get_v1_node_syncing") do () -> JsonNode:
unimplemented()
rpcServer.rpc("get_v1_node_syncing") do () -> SyncInfo:
return node.syncManager.getInfo()
rpcServer.rpc("get_v1_node_health") do () -> JsonNode:
unimplemented()
# TODO: There currently no way to situation when we node has issues, so
# its impossible to return HTTP ERROR 503 according to specification.
if node.syncManager.inProgress:
# We need to return HTTP ERROR 206 according to specification
return %*{"health": 206}
else:
return %*{"health": 200}

View File

@ -5,9 +5,7 @@ import
# TODO for some reason "../[datatypes, digest, crypto]" results in "Error: cannot open file"
../datatypes,
../digest,
../crypto,
libp2p/[peerid, multiaddress],
eth/p2p/discoveryv5/enr
../crypto
type
AttesterDuties* = tuple
@ -46,8 +44,23 @@ type
header: SignedBeaconBlockHeader
NodeIdentityTuple* = tuple
peer_id: PeerID
enr: Record
p2p_addresses: seq[MultiAddress]
discovery_addresses: seq[MultiAddress]
peer_id: string
enr: string
p2p_addresses: seq[string]
discovery_addresses: seq[string]
metadata: tuple[seq_number: uint64, attnets: string]
NodePeerTuple* = tuple
peer_id: string
enr: string
last_seen_p2p_address: string
state: string
direction: string
agent: string # This is not part of specification
proto: string # This is not part of specification
NodePeerCountTuple* = tuple
disconnected: int
connecting: int
connected: int
disconnecting: int

View File

@ -129,6 +129,10 @@ type
peer*: T
stamp*: chronos.Moment
SyncInfo* = object
head_slot*: Slot
sync_distance*: int64
SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
@ -1183,3 +1187,9 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
proc start*[A, B](man: SyncManager[A, B]) =
## Starts SyncManager's main loop.
man.syncFut = man.syncLoop()
proc getInfo*[A, B](man: SyncManager[A, B]): SyncInfo =
## Returns current synchronization information for RPC call.
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
SyncInfo(head_slot: headSlot, sync_distance: int64(wallSlot - headSlot))

View File

@ -239,6 +239,10 @@ proc handleStatus(peer: Peer,
await peer.disconnect(IrrelevantNetwork)
else:
peer.setStatusMsg(theirStatus)
if peer.connectionState == Connecting:
# As soon as we get here it means that we passed handshake succesfully. So
# we can add this peer to PeerPool.
await peer.handlePeer()
proc initBeaconSync*(network: Eth2Node, chainDag: ChainDAGRef,
forkDigest: ForkDigest) =

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit ac9b3e304f630a450efc996f47dc9e6133246a87
Subproject commit 8709ef9ed5e31e71a1c960237d2eb2c23a2adcd2