diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 5f761fa21..7b14f9364 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -226,8 +226,6 @@ proc safeClose(conn: Connection) {.async.} = if not conn.closed: await close(conn) -proc handleIncomingPeer*(peer: Peer) - include eth/p2p/p2p_backends_helpers include eth/p2p/p2p_tracing @@ -580,8 +578,6 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool, let peer = peerFromStream(network, conn) - handleIncomingPeer(peer) - try: let deadline = sleepAsync RESP_TIMEOUT var msgBytes = await readMsgBytes(conn, false, deadline) @@ -616,7 +612,7 @@ proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool, finally: await safeClose(conn) -proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} = +proc handleOutgoingPeer*(peer: Peer): Future[bool] {.async.} = let network = peer.network proc onPeerClosed(udata: pointer) {.gcsafe.} = @@ -628,20 +624,24 @@ proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} = peer.updateScore(NewPeerScore) debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info peer.getFuture().addCallback(onPeerClosed) + result = true + libp2p_peers.set int64(len(network.peerPool)) -proc handleIncomingPeer*(peer: Peer) = +proc handleIncomingPeer*(peer: Peer): Future[bool] {.async.} = let network = peer.network proc onPeerClosed(udata: pointer) {.gcsafe.} = debug "Peer (incoming) lost", peer = $peer.info libp2p_peers.set int64(len(network.peerPool)) - let res = network.peerPool.addIncomingPeerNoWait(peer) + let res = await network.peerPool.addIncomingPeer(peer) if res: peer.updateScore(NewPeerScore) debug "Peer (incoming) has been added to PeerPool", peer = $peer.info peer.getFuture().addCallback(onPeerClosed) + result = true + libp2p_peers.set int64(len(network.peerPool)) proc toPeerInfo*(r: enr.TypedRecord): PeerInfo = @@ -695,8 +695,6 @@ proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} = inc libp2p_successful_dials debug "Network handshakes completed" - await handleOutgoingPeer(peer) - proc runDiscoveryLoop*(node: Eth2Node) {.async.} = debug "Starting discovery loop" @@ -1101,4 +1099,3 @@ iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer = shuffle peers if peers.len > maxPeers: peers.setLen(maxPeers) for p in peers: yield p - diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index cf4d26ee2..e1bde2b9b 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -1,5 +1,5 @@ import chronicles -import options, deques, heapqueue, tables, strutils +import options, deques, heapqueue, tables, strutils, sequtils import stew/bitseqs, chronos, chronicles import spec/datatypes, spec/digest, peer_pool export datatypes, digest, chronos, chronicles @@ -66,6 +66,32 @@ type SyncManagerError* = object of CatchableError OptionBeaconBlocks* = Option[seq[SignedBeaconBlock]] +proc getShortMap*(req: SyncRequest, + data: openarray[SignedBeaconBlock]): string = + ## Returns all slot numbers in ``data`` as placement map. + var res = newStringOfCap(req.count) + var slider = req.slot + var last = 0 + for i in 0 ..< req.count: + if last < len(data): + for k in last ..< len(data): + if slider == data[k].message.slot: + res.add('x') + last = k + 1 + break + elif slider < data[k].message.slot: + res.add('.') + break + else: + res.add('.') + slider = slider + req.step + result = res + +proc getFullMap*(req: SyncRequest, + data: openarray[SignedBeaconBlock]): string = + # Returns all slot numbers in ``data`` as comma-delimeted string. + result = mapIt(data, $it.message.slot).join(", ") + proc init*(t: typedesc[SyncRequest], slot: Slot, count: uint64): SyncRequest {.inline.} = result = SyncRequest(slot: slot, count: count, step: 1'u64) @@ -344,12 +370,17 @@ proc syncWorker*[A, B](man: SyncManager[A, B], let blocks = await man.getBlocks(peer, req) if blocks.isSome(): let data = blocks.get() - await man.queue.push(req, data) - peer.updateScore(PeerScoreGoodBlocks) + let smap = getShortMap(req, data) debug "Received blocks on request", blocks_count = len(data), - request_slot = req.slot, request_count = req.count, - request_step = req.step, peer = peer, - peer_score = peer.getScore(), topics = "syncman" + blocks_map = smap, request_slot = req.slot, + request_count = req.count, request_step = req.step, + peer = peer, peer_score = peer.getScore(), topics = "syncman" + await man.queue.push(req, data) + debug "Received blocks got accepted", blocks_count = len(data), + blocks_map = smap, request_slot = req.slot, + request_count = req.count, request_step = req.step, + peer = peer, peer_score = peer.getScore(), topics = "syncman" + peer.updateScore(PeerScoreGoodBlocks) else: peer.updateScore(PeerScoreNoBlocks) man.queue.push(req) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index fd25b4b67..8fe580f11 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -75,10 +75,10 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} = headRoot: headBlock.root, headSlot: headBlock.slot) -proc handleInitialStatus(peer: Peer, - state: BeaconSyncNetworkState, - ourStatus: StatusMsg, - theirStatus: StatusMsg): Future[bool] {.async, gcsafe.} +proc handleStatus(peer: Peer, + state: BeaconSyncNetworkState, + ourStatus: StatusMsg, + theirStatus: StatusMsg): Future[void] {.gcsafe.} proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.} @@ -96,26 +96,17 @@ p2pProtocol BeaconSync(version = 1, theirStatus = await peer.status(ourStatus, timeout = 60.seconds) if theirStatus.isSome: - let tstatus = theirStatus.get() - let res = await peer.handleInitialStatus(peer.networkState, - ourStatus, tstatus) - if res: - peer.setStatusMsg(tstatus) + await peer.handleStatus(peer.networkState, + ourStatus, theirStatus.get()) else: - warn "Status response not received in time" + warn "Status response not received in time", peer = peer requestResponse: proc status(peer: Peer, theirStatus: StatusMsg) {.libp2pProtocol("status", 1).} = - let - ourStatus = peer.networkState.getCurrentStatus() - - if not await peer.handleInitialStatus( - peer.networkState, ourStatus, theirStatus): - return - peer.setStatusMsg(theirStatus) - - trace "Sending status msg", ourStatus + let ourStatus = peer.networkState.getCurrentStatus() + trace "Sending status message", peer = peer, status = ourStatus await response.send(ourStatus) + await peer.handleStatus(peer.networkState, ourStatus, theirStatus) proc statusResp(peer: Peer, msg: StatusMsg) @@ -214,16 +205,30 @@ proc getHeadSlot*(peer: Peer): Slot {.inline.} = ## Returns head slot for specific peer ``peer``. result = peer.state(BeaconSync).statusMsg.headSlot -proc handleInitialStatus(peer: Peer, - state: BeaconSyncNetworkState, - ourStatus: StatusMsg, - theirStatus: StatusMsg): Future[bool] {.async, gcsafe.} = +proc handleStatus(peer: Peer, + state: BeaconSyncNetworkState, + ourStatus: StatusMsg, + theirStatus: StatusMsg) {.async, gcsafe.} = if theirStatus.forkDigest != state.forkDigest: notice "Irrelevant peer", peer, theirStatus, ourStatus await peer.disconnect(IrrelevantNetwork) - return false - debug "Peer connected", peer, theirStatus, ourStatus - return true + else: + if not peer.state(BeaconSync).initialStatusReceived: + # Initial/handshake status message handling + peer.state(BeaconSync).initialStatusReceived = true + debug "Peer connected", peer, ourStatus = shortLog(ourStatus), + theirStatus = shortLog(theirStatus) + var res: bool + if peer.wasDialed: + res = await handleOutgoingPeer(peer) + else: + res = await handleIncomingPeer(peer) + + if not res: + debug "Peer is dead or already in pool", peer + await peer.disconnect(ClientShutDown) + + peer.setStatusMsg(theirStatus) proc initBeaconSync*(network: Eth2Node, blockPool: BlockPool,