Fix PeerPool to use getFuture() only once when adding peer.

Add isAlive(peer) procedure.
Add join(peer) procedure.
Remove useless metric.
Add notifyAndWait() procedure which will help to avoid race while disconnecting.
This commit is contained in:
cheatfate 2020-11-26 21:48:07 +02:00
parent f19a497eec
commit 830f1255c4
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
2 changed files with 48 additions and 11 deletions

View File

@ -356,6 +356,9 @@ proc getFuture*(peer: Peer): Future[void] {.inline.} =
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut")
peer.disconnectedFut peer.disconnectedFut
proc isAlive*(peer: Peer): bool =
peer.connectionState notin {Disconnecting, Disconnected}
proc getScore*(a: Peer): int = proc getScore*(a: Peer): int =
## Returns current score value for peer ``peer``. ## Returns current score value for peer ``peer``.
a.score a.score
@ -366,6 +369,41 @@ proc updateScore*(peer: Peer, score: int) {.inline.} =
if peer.score > PeerScoreHighLimit: if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit peer.score = PeerScoreHighLimit
proc join*(peer: Peer): Future[void] =
var retFuture = newFuture[void]("peer.lifetime.join")
let peerFut = peer.getFuture()
let alreadyFinished = peerFut.finished()
proc continuation(udata: pointer) {.gcsafe.} =
if not(retFuture.finished()):
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe.} =
if not(alreadyFinished):
peerFut.removeCallback(continuation)
if alreadyFinished:
# All the `peer.disconnectedFut` callbacks are already scheduled in current
# `poll()` call, to avoid race we going to finish only in next `poll()`
# call.
callSoon(continuation, cast[pointer](retFuture))
else:
# `peer.disconnectedFut` is not yet finished, but we want to be scheduled
# after all callbacks.
peerFut.addCallback(continuation)
return retFuture
proc notifyAndWait*(peer: Peer): Future[void] =
## Notify all the waiters that peer life is finished and wait until all
## callbacks will be processed.
let joinFut = peer.join()
let fut = peer.disconnectedFut
peer.connectionState = Disconnecting
fut.complete()
peer.disconnectedFut = nil
joinFut
proc calcThroughput(dur: Duration, value: uint64): float = proc calcThroughput(dur: Duration, value: uint64): float =
let secs = float(chronos.seconds(1).nanoseconds) let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur): if isZero(dur):
@ -1183,8 +1221,6 @@ proc resolvePeer(peer: Peer) =
discard peer.peerId.extractPublicKey(key) discard peer.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
debug "Peer's ENR recovery task started", node_id = $nodeId
# This is "fast-path" for peers which was dialed. In this case discovery # This is "fast-path" for peers which was dialed. In this case discovery
# already has most recent ENR information about this peer. # already has most recent ENR information about this peer.
let gnode = peer.network.discovery.getNode(nodeId) let gnode = peer.network.discovery.getNode(nodeId)
@ -1194,6 +1230,9 @@ proc resolvePeer(peer: Peer) =
let delay = now(chronos.Moment) - startTime let delay = now(chronos.Moment) - startTime
nbc_resolve_time.observe(delay.toFloatSeconds()) nbc_resolve_time.observe(delay.toFloatSeconds())
debug "Peer's ENR recovered", delay debug "Peer's ENR recovered", delay
else:
inc(nbc_failed_discoveries)
debug "Peer's ENR could not be recovered"
proc handlePeer*(peer: Peer) {.async.} = proc handlePeer*(peer: Peer) {.async.} =
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
@ -1222,8 +1261,7 @@ proc handlePeer*(peer: Peer) {.async.} =
# Peer was added to PeerPool. # Peer was added to PeerPool.
peer.score = NewPeerScore peer.score = NewPeerScore
peer.connectionState = Connected peer.connectionState = Connected
# We spawn task which will obtain ENR for this peer. peer.resolvePeer()
resolvePeer(peer)
debug "Peer successfully connected", peer = peer, debug "Peer successfully connected", peer = peer,
connections = peer.connections connections = peer.connections
@ -1290,10 +1328,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
# Whatever caused disconnection, avoid connection spamming # Whatever caused disconnection, avoid connection spamming
node.addSeen(peerId, SeenTableTimeReconnect) node.addSeen(peerId, SeenTableTimeReconnect)
let fut = peer.disconnectedFut if not(isNil(peer.disconnectedFut)):
if not(isNil(fut)): await peer.notifyAndWait()
fut.complete()
peer.disconnectedFut = nil
else: else:
# TODO (cheatfate): This could be removed when bug will be fixed inside # TODO (cheatfate): This could be removed when bug will be fixed inside
# `nim-libp2p`. # `nim-libp2p`.

View File

@ -349,6 +349,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
peerType: PeerType) = peerType: PeerType) =
mixin getFuture
proc onPeerClosed(udata: pointer) {.gcsafe, raises: [Defect].} = proc onPeerClosed(udata: pointer) {.gcsafe, raises: [Defect].} =
discard pool.deletePeer(peer) discard pool.deletePeer(peer)
@ -377,13 +378,13 @@ proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} =
## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError) ## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError)
## ##
## If peer could be added to PeerPool procedure returns (PeerStatus.Success) ## If peer could be added to PeerPool procedure returns (PeerStatus.Success)
mixin getKey, getFuture mixin getKey, isAlive
if not(pool.checkPeerScore(peer)): if not(pool.checkPeerScore(peer)):
PeerStatus.LowScoreError PeerStatus.LowScoreError
else: else:
let peerKey = getKey(peer) let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)): if not(pool.registry.hasKey(peerKey)):
if not(peer.getFuture().finished): if peer.isAlive():
PeerStatus.Success PeerStatus.Success
else: else:
PeerStatus.DeadPeerError PeerStatus.DeadPeerError
@ -403,7 +404,7 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
## (PeerStatus.NoSpaceError) ## (PeerStatus.NoSpaceError)
## ##
## Procedure returns (PeerStatus.Success) on success. ## Procedure returns (PeerStatus.Success) on success.
mixin getKey, getFuture mixin getKey
let res = pool.checkPeer(peer) let res = pool.checkPeer(peer)
if res != PeerStatus.Success: if res != PeerStatus.Success:
res res