Fix status handling. (#1008)
* Fix status handling. Add log map of received blocks. * Fix review comments. Fix UnusedImport in sync_protocol.nim
This commit is contained in:
parent
010a00963f
commit
da0b1a4993
|
@ -226,8 +226,6 @@ proc safeClose(conn: Connection) {.async.} =
|
|||
if not conn.closed:
|
||||
await close(conn)
|
||||
|
||||
proc handleIncomingPeer*(peer: Peer)
|
||||
|
||||
include eth/p2p/p2p_backends_helpers
|
||||
include eth/p2p/p2p_tracing
|
||||
|
||||
|
@ -580,8 +578,6 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
|
|||
|
||||
let peer = peerFromStream(network, conn)
|
||||
|
||||
handleIncomingPeer(peer)
|
||||
|
||||
try:
|
||||
let deadline = sleepAsync RESP_TIMEOUT
|
||||
var msgBytes = await readMsgBytes(conn, false, deadline)
|
||||
|
@ -616,7 +612,7 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
|
|||
finally:
|
||||
await safeClose(conn)
|
||||
|
||||
proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
|
||||
proc handleOutgoingPeer*(peer: Peer): Future[bool] {.async.} =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
|
@ -628,20 +624,24 @@ proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
|
|||
peer.updateScore(NewPeerScore)
|
||||
debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
result = true
|
||||
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc handleIncomingPeer*(peer: Peer) =
|
||||
proc handleIncomingPeer*(peer: Peer): Future[bool] {.async.} =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
debug "Peer (incoming) lost", peer = $peer.info
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = network.peerPool.addIncomingPeerNoWait(peer)
|
||||
let res = await network.peerPool.addIncomingPeer(peer)
|
||||
if res:
|
||||
peer.updateScore(NewPeerScore)
|
||||
debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
result = true
|
||||
|
||||
libp2p_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
|
||||
|
@ -695,8 +695,6 @@ proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
|
|||
inc libp2p_successful_dials
|
||||
debug "Network handshakes completed"
|
||||
|
||||
await handleOutgoingPeer(peer)
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
debug "Starting discovery loop"
|
||||
|
||||
|
@ -1101,4 +1099,3 @@ iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer =
|
|||
shuffle peers
|
||||
if peers.len > maxPeers: peers.setLen(maxPeers)
|
||||
for p in peers: yield p
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import chronicles
|
||||
import options, deques, heapqueue, tables, strutils
|
||||
import options, deques, heapqueue, tables, strutils, sequtils
|
||||
import stew/bitseqs, chronos, chronicles
|
||||
import spec/datatypes, spec/digest, peer_pool
|
||||
export datatypes, digest, chronos, chronicles
|
||||
|
@ -66,6 +66,32 @@ type
|
|||
SyncManagerError* = object of CatchableError
|
||||
OptionBeaconBlocks* = Option[seq[SignedBeaconBlock]]
|
||||
|
||||
proc getShortMap*(req: SyncRequest,
|
||||
data: openarray[SignedBeaconBlock]): string =
|
||||
## Returns all slot numbers in ``data`` as placement map.
|
||||
var res = newStringOfCap(req.count)
|
||||
var slider = req.slot
|
||||
var last = 0
|
||||
for i in 0 ..< req.count:
|
||||
if last < len(data):
|
||||
for k in last ..< len(data):
|
||||
if slider == data[k].message.slot:
|
||||
res.add('x')
|
||||
last = k + 1
|
||||
break
|
||||
elif slider < data[k].message.slot:
|
||||
res.add('.')
|
||||
break
|
||||
else:
|
||||
res.add('.')
|
||||
slider = slider + req.step
|
||||
result = res
|
||||
|
||||
proc getFullMap*(req: SyncRequest,
|
||||
data: openarray[SignedBeaconBlock]): string =
|
||||
# Returns all slot numbers in ``data`` as comma-delimeted string.
|
||||
result = mapIt(data, $it.message.slot).join(", ")
|
||||
|
||||
proc init*(t: typedesc[SyncRequest], slot: Slot,
|
||||
count: uint64): SyncRequest {.inline.} =
|
||||
result = SyncRequest(slot: slot, count: count, step: 1'u64)
|
||||
|
@ -344,12 +370,17 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
|
|||
let blocks = await man.getBlocks(peer, req)
|
||||
if blocks.isSome():
|
||||
let data = blocks.get()
|
||||
await man.queue.push(req, data)
|
||||
peer.updateScore(PeerScoreGoodBlocks)
|
||||
let smap = getShortMap(req, data)
|
||||
debug "Received blocks on request", blocks_count = len(data),
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, peer = peer,
|
||||
peer_score = peer.getScore(), topics = "syncman"
|
||||
blocks_map = smap, request_slot = req.slot,
|
||||
request_count = req.count, request_step = req.step,
|
||||
peer = peer, peer_score = peer.getScore(), topics = "syncman"
|
||||
await man.queue.push(req, data)
|
||||
debug "Received blocks got accepted", blocks_count = len(data),
|
||||
blocks_map = smap, request_slot = req.slot,
|
||||
request_count = req.count, request_step = req.step,
|
||||
peer = peer, peer_score = peer.getScore(), topics = "syncman"
|
||||
peer.updateScore(PeerScoreGoodBlocks)
|
||||
else:
|
||||
peer.updateScore(PeerScoreNoBlocks)
|
||||
man.queue.push(req)
|
||||
|
|
|
@ -75,10 +75,10 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} =
|
|||
headRoot: headBlock.root,
|
||||
headSlot: headBlock.slot)
|
||||
|
||||
proc handleInitialStatus(peer: Peer,
|
||||
state: BeaconSyncNetworkState,
|
||||
ourStatus: StatusMsg,
|
||||
theirStatus: StatusMsg): Future[bool] {.async, gcsafe.}
|
||||
proc handleStatus(peer: Peer,
|
||||
state: BeaconSyncNetworkState,
|
||||
ourStatus: StatusMsg,
|
||||
theirStatus: StatusMsg): Future[void] {.gcsafe.}
|
||||
|
||||
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.}
|
||||
|
||||
|
@ -96,26 +96,17 @@ p2pProtocol BeaconSync(version = 1,
|
|||
theirStatus = await peer.status(ourStatus, timeout = 60.seconds)
|
||||
|
||||
if theirStatus.isSome:
|
||||
let tstatus = theirStatus.get()
|
||||
let res = await peer.handleInitialStatus(peer.networkState,
|
||||
ourStatus, tstatus)
|
||||
if res:
|
||||
peer.setStatusMsg(tstatus)
|
||||
await peer.handleStatus(peer.networkState,
|
||||
ourStatus, theirStatus.get())
|
||||
else:
|
||||
warn "Status response not received in time"
|
||||
warn "Status response not received in time", peer = peer
|
||||
|
||||
requestResponse:
|
||||
proc status(peer: Peer, theirStatus: StatusMsg) {.libp2pProtocol("status", 1).} =
|
||||
let
|
||||
ourStatus = peer.networkState.getCurrentStatus()
|
||||
|
||||
if not await peer.handleInitialStatus(
|
||||
peer.networkState, ourStatus, theirStatus):
|
||||
return
|
||||
peer.setStatusMsg(theirStatus)
|
||||
|
||||
trace "Sending status msg", ourStatus
|
||||
let ourStatus = peer.networkState.getCurrentStatus()
|
||||
trace "Sending status message", peer = peer, status = ourStatus
|
||||
await response.send(ourStatus)
|
||||
await peer.handleStatus(peer.networkState, ourStatus, theirStatus)
|
||||
|
||||
proc statusResp(peer: Peer, msg: StatusMsg)
|
||||
|
||||
|
@ -214,16 +205,30 @@ proc getHeadSlot*(peer: Peer): Slot {.inline.} =
|
|||
## Returns head slot for specific peer ``peer``.
|
||||
result = peer.state(BeaconSync).statusMsg.headSlot
|
||||
|
||||
proc handleInitialStatus(peer: Peer,
|
||||
state: BeaconSyncNetworkState,
|
||||
ourStatus: StatusMsg,
|
||||
theirStatus: StatusMsg): Future[bool] {.async, gcsafe.} =
|
||||
proc handleStatus(peer: Peer,
|
||||
state: BeaconSyncNetworkState,
|
||||
ourStatus: StatusMsg,
|
||||
theirStatus: StatusMsg) {.async, gcsafe.} =
|
||||
if theirStatus.forkDigest != state.forkDigest:
|
||||
notice "Irrelevant peer", peer, theirStatus, ourStatus
|
||||
await peer.disconnect(IrrelevantNetwork)
|
||||
return false
|
||||
debug "Peer connected", peer, theirStatus, ourStatus
|
||||
return true
|
||||
else:
|
||||
if not peer.state(BeaconSync).initialStatusReceived:
|
||||
# Initial/handshake status message handling
|
||||
peer.state(BeaconSync).initialStatusReceived = true
|
||||
debug "Peer connected", peer, ourStatus = shortLog(ourStatus),
|
||||
theirStatus = shortLog(theirStatus)
|
||||
var res: bool
|
||||
if peer.wasDialed:
|
||||
res = await handleOutgoingPeer(peer)
|
||||
else:
|
||||
res = await handleIncomingPeer(peer)
|
||||
|
||||
if not res:
|
||||
debug "Peer is dead or already in pool", peer
|
||||
await peer.disconnect(ClientShutDown)
|
||||
|
||||
peer.setStatusMsg(theirStatus)
|
||||
|
||||
proc initBeaconSync*(network: Eth2Node,
|
||||
blockPool: BlockPool,
|
||||
|
|
Loading…
Reference in New Issue