Fix connection workers race. (#1204)
This commit is contained in:
parent
a661ecbae1
commit
47eaaa7696
|
@ -63,6 +63,7 @@ type
|
|||
connQueue: AsyncQueue[PeerInfo]
|
||||
seenTable: Table[PeerID, SeenItem]
|
||||
connWorkers: seq[Future[void]]
|
||||
connTable: Table[PeerID, PeerInfo]
|
||||
forkId: ENRForkID
|
||||
|
||||
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
|
||||
|
@ -735,30 +736,35 @@ proc connectWorker(network: Eth2Node) {.async.} =
|
|||
let pi = await network.connQueue.popFirst()
|
||||
let r1 = network.peerPool.hasPeer(pi.peerId)
|
||||
let r2 = network.isSeen(pi)
|
||||
let r3 = network.connTable.hasKey(pi.peerId)
|
||||
|
||||
if not(r1) and not(r2):
|
||||
# We trying to connect to peers which are not present in our PeerPool and
|
||||
# not present in our SeenTable.
|
||||
var fut = network.dialPeer(pi)
|
||||
# We discarding here just because we going to check future state, to avoid
|
||||
# condition where connection happens and timeout reached.
|
||||
let res = await withTimeout(fut, network.connectTimeout)
|
||||
# We handling only timeout and errors, because successfull connections
|
||||
# will be stored in PeerPool.
|
||||
if fut.finished():
|
||||
if fut.failed() and not(fut.cancelled()):
|
||||
debug "Unable to establish connection with peer", peer = pi.id,
|
||||
errMsg = fut.readError().msg
|
||||
inc nbc_failed_dials
|
||||
network.addSeen(pi, SeenTableTimeDeadPeer)
|
||||
continue
|
||||
debug "Connection to remote peer timed out", peer = pi.id
|
||||
inc nbc_timeout_dials
|
||||
network.addSeen(pi, SeenTableTimeTimeout)
|
||||
if not(r1) and not(r2) and not(r3):
|
||||
network.connTable[pi.peerId] = pi
|
||||
try:
|
||||
# We trying to connect to peers which are not in PeerPool, SeenTable and
|
||||
# ConnTable.
|
||||
var fut = network.dialPeer(pi)
|
||||
# We discarding here just because we going to check future state, to avoid
|
||||
# condition where connection happens and timeout reached.
|
||||
let res = await withTimeout(fut, network.connectTimeout)
|
||||
# We handling only timeout and errors, because successfull connections
|
||||
# will be stored in PeerPool.
|
||||
if fut.finished():
|
||||
if fut.failed() and not(fut.cancelled()):
|
||||
debug "Unable to establish connection with peer", peer = pi.id,
|
||||
errMsg = fut.readError().msg
|
||||
inc nbc_failed_dials
|
||||
network.addSeen(pi, SeenTableTimeDeadPeer)
|
||||
continue
|
||||
debug "Connection to remote peer timed out", peer = pi.id
|
||||
inc nbc_timeout_dials
|
||||
network.addSeen(pi, SeenTableTimeTimeout)
|
||||
finally:
|
||||
network.connTable.del(pi.peerId)
|
||||
else:
|
||||
trace "Peer is already connected or already seen", peer = pi.id,
|
||||
peer_pool_has_peer = $r1, seen_table_has_peer = $r2,
|
||||
seen_table_size = len(network.seenTable)
|
||||
trace "Peer is already connected, connecting or already seen",
|
||||
peer = pi.id, peer_pool_has_peer = $r1, seen_table_has_peer = $r2,
|
||||
connecting_peer = $r3, seen_table_size = len(network.seenTable)
|
||||
|
||||
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
|
||||
debug "Starting discovery loop"
|
||||
|
@ -809,6 +815,7 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
|
|||
result.connectTimeout = 10.seconds
|
||||
result.seenThreshold = 10.minutes
|
||||
result.seenTable = initTable[PeerID, SeenItem]()
|
||||
result.connTable = initTable[PeerID, PeerInfo]()
|
||||
result.connQueue = newAsyncQueue[PeerInfo](ConcurrentConnections)
|
||||
result.metadata = getPersistentNetMetadata(conf)
|
||||
result.forkId = enrForkId
|
||||
|
|
Loading…
Reference in New Issue