Add checks for penalties which are not applied immediately. (#1139)

Change default maxHeadAge value to 1 epoch.
Set zero-point at the SyncQueue's initialization.
Remove annoying logs in runDiscoveryLoop.
This commit is contained in:
Eugene Kabanov 2020-06-07 18:36:24 +03:00 committed by GitHub
parent 53e34c9699
commit 3ce98d5bca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 8 deletions

View File

@ -694,7 +694,7 @@ proc addSeen*(network: ETh2Node, pinfo: PeerInfo) =
proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
logScope: peer = $peerInfo
debug "Connecting to peer"
debug "Connecting to discovered peer"
await node.switch.connect(peerInfo)
var peer = node.getPeer(peerInfo)
peer.wasDialed = true
@ -737,7 +737,7 @@ proc connectWorker(network: Eth2Node) {.async.} =
inc libp2p_timeout_dials
network.addSeen(pi)
else:
debug "Peer is already connected or already seen", peer = $pi,
trace "Peer is already connected or already seen", peer = $pi,
peer_pool_has_peer = $r1, seen_table_has_peer = $r2,
seen_table_size = len(network.seenTable)
@ -757,7 +757,6 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
let peerInfo = peerRecord.value.toPeerInfo
if peerInfo != nil:
if peerInfo.id notin node.switch.connections:
debug "Discovered new peer", peer = $peer
await node.connQueue.addLast(peerInfo)
else:
peerInfo.close()

View File

@ -300,6 +300,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
counter: 1'u64,
pending: initTable[uint64, SyncRequest[T]](),
debtsQueue: initHeapQueue[SyncRequest[T]](),
zeroPoint: some[Slot](start),
inpSlot: start,
outSlot: start
)
@ -669,7 +670,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalWallSlotCb: GetSlotCallback,
updateLocalBlocksCb: UpdateLocalBlocksCallback,
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
sleepTime = (int(SLOTS_PER_EPOCH) *
int(SECONDS_PER_SLOT)).seconds,
chunkSize = uint64(SLOTS_PER_EPOCH),
@ -727,6 +728,17 @@ template headAge(): uint64 =
template peerAge(): uint64 =
if peerSlot > wallSlot: 0'u64 else: wallSlot - peerSlot
template checkPeerScore(peer, body: untyped): untyped =
mixin getScore
let currentScore = peer.getScore()
body
let newScore = peer.getScore()
if currentScore > newScore:
debug "Overdue penalty for peer's score received, exiting", peer = peer,
penalty = newScore - currentScore, peer_score = newScore,
topics = "syncman"
break
proc syncWorker*[A, B](man: SyncManager[A, B],
peer: A): Future[A] {.async.} =
# Sync worker is the lowest level loop which performs syncing with single
@ -775,7 +787,10 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
debug "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
let res = await peer.updateStatus()
checkPeerScore peer:
let res = await peer.updateStatus()
if not(res):
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
@ -822,7 +837,10 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
# pending, this can fall into endless cycle, when low number of peers
# are available in PeerPool. We going to wait for RESP_TIMEOUT time,
# so all pending requests should be finished at this moment.
await sleepAsync(RESP_TIMEOUT)
checkPeerScore peer:
await sleepAsync(RESP_TIMEOUT)
let failure = SyncFailure.init(SyncFailureKind.EmptyProblem, peer)
man.failures.add(failure)
break
@ -833,7 +851,9 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
request_step = req.step, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
let blocks = await man.getBlocks(peer, req)
checkPeerScore peer:
let blocks = await man.getBlocks(peer, req)
if blocks.isOk:
let data = blocks.get()
let smap = getShortMap(req, data)
@ -854,7 +874,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
break
# Scoring will happen in `syncUpdate`.
await man.queue.push(req, data)
checkPeerScore peer,
await man.queue.push(req, data)
# Cleaning up failures.
man.failures.setLen(0)
else: