From 830f1255c4487fbb59943f076bc0bc987a916f38 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Thu, 26 Nov 2020 21:48:07 +0200 Subject: [PATCH] Fix PeerPool to use getFuture() only once when adding peer. Add isAlive(peer) procedure. Add join(peer) procedure. Remove useless metric. Add notifyAndWait() procedure which will help to avoid race while disconnecting. --- beacon_chain/networking/eth2_network.nim | 52 ++++++++++++++++++++---- beacon_chain/networking/peer_pool.nim | 7 ++-- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index e70404f7c..3a339261b 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -356,6 +356,9 @@ proc getFuture*(peer: Peer): Future[void] {.inline.} = peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") peer.disconnectedFut +proc isAlive*(peer: Peer): bool = + peer.connectionState notin {Disconnecting, Disconnected} + proc getScore*(a: Peer): int = ## Returns current score value for peer ``peer``. a.score @@ -366,6 +369,41 @@ proc updateScore*(peer: Peer, score: int) {.inline.} = if peer.score > PeerScoreHighLimit: peer.score = PeerScoreHighLimit +proc join*(peer: Peer): Future[void] = + var retFuture = newFuture[void]("peer.lifetime.join") + let peerFut = peer.getFuture() + let alreadyFinished = peerFut.finished() + + proc continuation(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe.} = + if not(alreadyFinished): + peerFut.removeCallback(continuation) + + if alreadyFinished: + # All the `peer.disconnectedFut` callbacks are already scheduled in current + # `poll()` call, to avoid race we going to finish only in next `poll()` + # call. + callSoon(continuation, cast[pointer](retFuture)) + else: + # `peer.disconnectedFut` is not yet finished, but we want to be scheduled + # after all callbacks. + peerFut.addCallback(continuation) + + return retFuture + +proc notifyAndWait*(peer: Peer): Future[void] = + ## Notify all the waiters that peer life is finished and wait until all + ## callbacks will be processed. + let joinFut = peer.join() + let fut = peer.disconnectedFut + peer.connectionState = Disconnecting + fut.complete() + peer.disconnectedFut = nil + joinFut + proc calcThroughput(dur: Duration, value: uint64): float = let secs = float(chronos.seconds(1).nanoseconds) if isZero(dur): @@ -1183,8 +1221,6 @@ proc resolvePeer(peer: Peer) = discard peer.peerId.extractPublicKey(key) keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() - debug "Peer's ENR recovery task started", node_id = $nodeId - # This is "fast-path" for peers which was dialed. In this case discovery # already has most recent ENR information about this peer. let gnode = peer.network.discovery.getNode(nodeId) @@ -1194,6 +1230,9 @@ proc resolvePeer(peer: Peer) = let delay = now(chronos.Moment) - startTime nbc_resolve_time.observe(delay.toFloatSeconds()) debug "Peer's ENR recovered", delay + else: + inc(nbc_failed_discoveries) + debug "Peer's ENR could not be recovered" proc handlePeer*(peer: Peer) {.async.} = let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) @@ -1222,8 +1261,7 @@ proc handlePeer*(peer: Peer) {.async.} = # Peer was added to PeerPool. peer.score = NewPeerScore peer.connectionState = Connected - # We spawn task which will obtain ENR for this peer. - resolvePeer(peer) + peer.resolvePeer() debug "Peer successfully connected", peer = peer, connections = peer.connections @@ -1290,10 +1328,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} = # Whatever caused disconnection, avoid connection spamming node.addSeen(peerId, SeenTableTimeReconnect) - let fut = peer.disconnectedFut - if not(isNil(fut)): - fut.complete() - peer.disconnectedFut = nil + if not(isNil(peer.disconnectedFut)): + await peer.notifyAndWait() else: # TODO (cheatfate): This could be removed when bug will be fixed inside # `nim-libp2p`. diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index bb6097cd6..eef85ca30 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -349,6 +349,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, peerType: PeerType) = + mixin getFuture proc onPeerClosed(udata: pointer) {.gcsafe, raises: [Defect].} = discard pool.deletePeer(peer) @@ -377,13 +378,13 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} = ## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError) ## ## If peer could be added to PeerPool procedure returns (PeerStatus.Success) - mixin getKey, getFuture + mixin getKey, isAlive if not(pool.checkPeerScore(peer)): PeerStatus.LowScoreError else: let peerKey = getKey(peer) if not(pool.registry.hasKey(peerKey)): - if not(peer.getFuture().finished): + if peer.isAlive(): PeerStatus.Success else: PeerStatus.DeadPeerError @@ -403,7 +404,7 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B], ## (PeerStatus.NoSpaceError) ## ## Procedure returns (PeerStatus.Success) on success. - mixin getKey, getFuture + mixin getKey let res = pool.checkPeer(peer) if res != PeerStatus.Success: res