diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 3a339261b..f4ceab67c 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -372,17 +372,16 @@ proc updateScore*(peer: Peer, score: int) {.inline.} = proc join*(peer: Peer): Future[void] = var retFuture = newFuture[void]("peer.lifetime.join") let peerFut = peer.getFuture() - let alreadyFinished = peerFut.finished() proc continuation(udata: pointer) {.gcsafe.} = if not(retFuture.finished()): retFuture.complete() proc cancellation(udata: pointer) {.gcsafe.} = - if not(alreadyFinished): + if not(isNil(peerFut)): peerFut.removeCallback(continuation) - if alreadyFinished: + if peerFut.finished(): # All the `peer.disconnectedFut` callbacks are already scheduled in current # `poll()` call, to avoid race we going to finish only in next `poll()` # call. @@ -391,18 +390,21 @@ proc join*(peer: Peer): Future[void] = # `peer.disconnectedFut` is not yet finished, but we want to be scheduled # after all callbacks. peerFut.addCallback(continuation) + retFuture.cancelCallback = cancellation - return retFuture + retFuture -proc notifyAndWait*(peer: Peer): Future[void] = +proc notifyAndWait*(network: ETh2Node, peer: Peer): Future[void] = ## Notify all the waiters that peer life is finished and wait until all ## callbacks will be processed. - let joinFut = peer.join() - let fut = peer.disconnectedFut + let + joinFut = peer.join() + poolFut = network.peerPool.joinPeer(peer) + discFut = peer.disconnectedFut peer.connectionState = Disconnecting - fut.complete() + discFut.complete() peer.disconnectedFut = nil - joinFut + allFutures(joinFut, poolFut) proc calcThroughput(dur: Duration, value: uint64): float = let secs = float(chronos.seconds(1).nanoseconds) @@ -1329,7 +1331,7 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} = node.addSeen(peerId, SeenTableTimeReconnect) if not(isNil(peer.disconnectedFut)): - await peer.notifyAndWait() + await node.notifyAndWait(peer) else: # TODO (cheatfate): This could be removed when bug will be fixed inside # `nim-libp2p`. diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index eef85ca30..c31c78f94 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -34,6 +34,7 @@ type peerType: PeerType flags: set[PeerFlags] index: int + future: Future[void] PeerIndex = object data: int @@ -311,6 +312,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = dec(pool.curOutPeersCount) dec(pool.acqOutPeersCount) + let fut = item[].future # Indicate that we have an empty space pool.fireNotFullEvent(item[]) # Cleanup storage with default item, and removing key from hashtable. @@ -318,6 +320,8 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = pool.registry.del(key) pool.peerDeleted(peer) pool.peerCountChanged() + # Indicate that peer was deleted + fut.complete() else: if item[].peerType == PeerType.Incoming: # If peer is available, then its copy present in heapqueue, so we need @@ -336,6 +340,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = break dec(pool.curOutPeersCount) + let fut = item[].future # Indicate that we have an empty space pool.fireNotFullEvent(item[]) # Cleanup storage with default item, and removing key from hashtable. @@ -343,10 +348,44 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = pool.registry.del(key) pool.peerDeleted(peer) pool.peerCountChanged() + # Indicate that peer was deleted + fut.complete() true else: false +proc joinPeer*[A, B](pool: PeerPool[A, B], peer: A): Future[void] = + ## This procedure will only when peer ``peer`` finally leaves PeerPool + ## ``pool``. + mixin getKey + var retFuture = newFuture[void]("peerpool.joinPeer") + var future: Future[void] + + proc continuation(udata: pointer) {.gcsafe.} = + if not(retFuture.finished()): + retFuture.complete() + + proc cancellation(udata: pointer) {.gcsafe.} = + if not(isNil(future)): + future.removeCallback(continuation) + + let key = getKey(peer) + if pool.registry.hasKey(key): + let pindex = pool.registry[key].data + var item = addr(pool.storage[pindex]) + future = item[].future + # If peer is still in PeerPool, then item[].future should not be finished. + doAssert(not(future.finished())) + future.addCallback(continuation) + retFuture.cancelCallback = cancellation + else: + # If there no such peer in PeerPool anymore, its possible that + # PeerItem.future's callbacks is not yet processed, so we going to complete + # retFuture only in next `poll()` call. + callSoon(continuation, cast[pointer](retFuture)) + + retFuture + proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, peerType: PeerType) = mixin getFuture @@ -354,7 +393,8 @@ proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, discard pool.deletePeer(peer) let item = PeerItem[A](data: peer, peerType: peerType, - index: len(pool.storage)) + index: len(pool.storage), + future: newFuture[void]("peerpool.peer")) pool.storage.add(item) var pitem = addr(pool.storage[^1]) let pindex = PeerIndex(data: item.index, cmp: pool.cmp)