From 47eaaa76963f93c379ee65e6f07f1171b08b9368 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Sun, 21 Jun 2020 19:49:48 +0300 Subject: [PATCH] Fix connection workers race. (#1204) --- beacon_chain/eth2_network.nim | 51 ++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 6df69c205..173847282 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -63,6 +63,7 @@ type connQueue: AsyncQueue[PeerInfo] seenTable: Table[PeerID, SeenItem] connWorkers: seq[Future[void]] + connTable: Table[PeerID, PeerInfo] forkId: ENRForkID EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers @@ -735,30 +736,35 @@ proc connectWorker(network: Eth2Node) {.async.} = let pi = await network.connQueue.popFirst() let r1 = network.peerPool.hasPeer(pi.peerId) let r2 = network.isSeen(pi) + let r3 = network.connTable.hasKey(pi.peerId) - if not(r1) and not(r2): - # We trying to connect to peers which are not present in our PeerPool and - # not present in our SeenTable. - var fut = network.dialPeer(pi) - # We discarding here just because we going to check future state, to avoid - # condition where connection happens and timeout reached. - let res = await withTimeout(fut, network.connectTimeout) - # We handling only timeout and errors, because successfull connections - # will be stored in PeerPool. - if fut.finished(): - if fut.failed() and not(fut.cancelled()): - debug "Unable to establish connection with peer", peer = pi.id, - errMsg = fut.readError().msg - inc nbc_failed_dials - network.addSeen(pi, SeenTableTimeDeadPeer) - continue - debug "Connection to remote peer timed out", peer = pi.id - inc nbc_timeout_dials - network.addSeen(pi, SeenTableTimeTimeout) + if not(r1) and not(r2) and not(r3): + network.connTable[pi.peerId] = pi + try: + # We trying to connect to peers which are not in PeerPool, SeenTable and + # ConnTable. + var fut = network.dialPeer(pi) + # We discarding here just because we going to check future state, to avoid + # condition where connection happens and timeout reached. + let res = await withTimeout(fut, network.connectTimeout) + # We handling only timeout and errors, because successfull connections + # will be stored in PeerPool. + if fut.finished(): + if fut.failed() and not(fut.cancelled()): + debug "Unable to establish connection with peer", peer = pi.id, + errMsg = fut.readError().msg + inc nbc_failed_dials + network.addSeen(pi, SeenTableTimeDeadPeer) + continue + debug "Connection to remote peer timed out", peer = pi.id + inc nbc_timeout_dials + network.addSeen(pi, SeenTableTimeTimeout) + finally: + network.connTable.del(pi.peerId) else: - trace "Peer is already connected or already seen", peer = pi.id, - peer_pool_has_peer = $r1, seen_table_has_peer = $r2, - seen_table_size = len(network.seenTable) + trace "Peer is already connected, connecting or already seen", + peer = pi.id, peer_pool_has_peer = $r1, seen_table_has_peer = $r2, + connecting_peer = $r3, seen_table_size = len(network.seenTable) proc runDiscoveryLoop*(node: Eth2Node) {.async.} = debug "Starting discovery loop" @@ -809,6 +815,7 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID, result.connectTimeout = 10.seconds result.seenThreshold = 10.minutes result.seenTable = initTable[PeerID, SeenItem]() + result.connTable = initTable[PeerID, PeerInfo]() result.connQueue = newAsyncQueue[PeerInfo](ConcurrentConnections) result.metadata = getPersistentNetMetadata(conf) result.forkId = enrForkId