Fix sync issues. (#1035)

* Fix sync issues.

* Add documentation about zero-point.
Add more comments about syncing loops.
Change to 4 blocks per request.
This commit is contained in:
Eugene Kabanov 2020-05-19 15:08:50 +03:00 committed by GitHub
parent 4359147efc
commit ea95021073
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 505 additions and 115 deletions

View File

@ -7,7 +7,7 @@
import
# Standard library
os, tables, random, strutils, times,
os, tables, random, strutils, times, math,
# Nimble packages
stew/[objects, bitseqs, byteutils], stew/shims/macros,
@ -516,12 +516,22 @@ proc runSyncLoop(node: BeaconNode) {.async.} =
proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): bool =
debug "Forward sync imported blocks", count = len(list),
local_head_slot = getLocalHeadSlot()
let sm = now(chronos.Moment)
for blk in list:
if not(node.storeBlock(blk)):
return false
discard node.updateHead()
let dur = now(chronos.Moment) - sm
let secs = float(chronos.seconds(1).nanoseconds)
var storeSpeed = 0.0
if not(dur.isZero()):
let v = float(len(list)) * (secs / float(dur.nanoseconds))
# We doing round manually because stdlib.round is deprecated
storeSpeed = round(v * 10000) / 10000
info "Forward sync blocks got imported sucessfully", count = len(list),
local_head_slot = getLocalHeadSlot()
local_head_slot = getLocalHeadSlot(), store_speed = storeSpeed
result = true
proc scoreCheck(peer: Peer): bool =
@ -540,8 +550,12 @@ proc runSyncLoop(node: BeaconNode) {.async.} =
var syncman = newSyncManager[Peer, PeerID](
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
updateLocalBlocks,
# TODO increase when block processing perf improves
chunkSize = 16
# 4 blocks per chunk is the optimal value right now, because our current
# syncing speed is around 4 blocks per second. So there no need to request
# more then 4 blocks right now. As soon as `store_speed` value become
# significantly more then 4 blocks per second you can increase this
# value appropriately.
chunkSize = 4
)
await syncman.sync()

View File

@ -1,7 +1,8 @@
import chronicles
import options, deques, heapqueue, tables, strutils, sequtils
import options, deques, heapqueue, tables, strutils, sequtils, math
import stew/bitseqs, chronos, chronicles
import spec/datatypes, spec/digest, peer_pool, eth2_network
import eth/async_utils
export datatypes, digest, chronos, chronicles
logScope:
@ -18,6 +19,10 @@ const
## Peer did not respond in time on `blocksByRange` request.
PeerScoreGoodBlocks* = 100
## Peer' `blocksByRange` answer is fine.
PeerScoreBadBlocks* = -1000
## Peer response contains incorrect blocks
PeerScoreJokeBlocks* = -200
## Peer response contains too many empty blocks
type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
@ -25,16 +30,21 @@ type
UpdateLocalBlocksCallback* =
proc(list: openarray[SignedBeaconBlock]): bool {.gcsafe.}
SyncRequest* = object
SyncUpdateCallback*[T] =
proc(req: SyncRequest[T],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.}
SyncRequest*[T] = object
slot*: Slot
count*: uint64
step*: uint64
item*: T
SyncResult* = object
request*: SyncRequest
SyncResult*[T] = object
request*: SyncRequest[T]
data*: seq[SignedBeaconBlock]
SyncQueue* = ref object
SyncQueue*[T] = ref object
inpSlot*: Slot
outSlot*: Slot
@ -44,11 +54,14 @@ type
queueSize*: int
notFullEvent*: AsyncEvent
syncUpdate*: UpdateLocalBlocksCallback
syncUpdate*: SyncUpdateCallback[T]
debtsQueue: HeapQueue[SyncRequest]
debtsQueue: HeapQueue[SyncRequest[T]]
debtsCount: uint64
readyQueue: HeapQueue[SyncResult]
readyQueue: HeapQueue[SyncResult[T]]
zeroPoint: Option[Slot]
suspects: seq[SyncResult[T]]
SyncManager*[A, B] = ref object
pool: PeerPool[A, B]
@ -59,15 +72,19 @@ type
toleranceValue: uint64
getLocalHeadSlot: GetSlotCallback
getLocalWallSlot: GetSlotCallback
updateLocalBlocks: UpdateLocalBlocksCallback
syncUpdate: SyncUpdateCallback[A]
chunkSize: uint64
queue: SyncQueue
queue: SyncQueue[A]
SyncMoment* = object
stamp*: chronos.Moment
slot*: Slot
SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
proc getShortMap*(req: SyncRequest,
data: openarray[SignedBeaconBlock]): string =
proc getShortMap*[T](req: SyncRequest[T],
data: openarray[SignedBeaconBlock]): string =
## Returns all slot numbers in ``data`` as placement map.
var res = newStringOfCap(req.count)
var slider = req.slot
@ -87,29 +104,43 @@ proc getShortMap*(req: SyncRequest,
slider = slider + req.step
result = res
proc getFullMap*(req: SyncRequest,
data: openarray[SignedBeaconBlock]): string =
proc getFullMap*[T](req: SyncRequest[T],
data: openarray[SignedBeaconBlock]): string =
# Returns all slot numbers in ``data`` as comma-delimeted string.
result = mapIt(data, $it.message.slot).join(", ")
proc init*(t: typedesc[SyncRequest], slot: Slot,
count: uint64): SyncRequest {.inline.} =
result = SyncRequest(slot: slot, count: count, step: 1'u64)
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], slot: Slot,
count: uint64): SyncRequest[T] {.inline.} =
result = SyncRequest[T](slot: slot, count: count, step: 1'u64)
proc init*(t: typedesc[SyncRequest], start: Slot,
finish: Slot): SyncRequest {.inline.} =
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot,
finish: Slot): SyncRequest[T] {.inline.} =
let count = finish - start + 1'u64
result = SyncRequest(slot: start, count: count, step: 1'u64)
result = SyncRequest[T](slot: start, count: count, step: 1'u64)
proc empty*(t: typedesc[SyncRequest]): SyncRequest {.inline.} =
result = SyncRequest(step: 0'u64, count: 0'u64)
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], slot: Slot,
count: uint64, item: T): SyncRequest[T] {.inline.} =
result = SyncRequest[T](slot: slot, count: count, item: item, step: 1'u64)
proc isEmpty*(sr: SyncRequest): bool {.inline.} =
proc init*[T](t1: typedesc[SyncRequest], t2: typedesc[T], start: Slot,
finish: Slot, item: T): SyncRequest[T] {.inline.} =
let count = finish - start + 1'u64
result = SyncRequest[T](slot: start, count: count, step: 1'u64, item: item)
proc empty*[T](t: typedesc[SyncRequest],
t2: typedesc[T]): SyncRequest[T] {.inline.} =
result = SyncRequest[T](step: 0'u64, count: 0'u64)
proc setItem*[T](sr: var SyncRequest[T], item: T) =
sr.item = item
proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
result = (sr.step == 0'u64) and (sr.count == 0'u64)
proc init*(t: typedesc[SyncQueue], start, last: Slot, chunkSize: uint64,
updateCb: UpdateLocalBlocksCallback,
queueSize: int = -1): SyncQueue =
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
start, last: Slot, chunkSize: uint64,
updateCb: SyncUpdateCallback[T],
queueSize: int = -1): SyncQueue[T] =
## Create new synchronization queue with parameters
##
## ``start`` and ``last`` are starting and finishing Slots.
@ -123,50 +154,138 @@ proc init*(t: typedesc[SyncQueue], start, last: Slot, chunkSize: uint64,
## ``updateCb`` procedure which will be used to send downloaded blocks to
## consumer. Procedure should return ``false`` only when it receives
## incorrect blocks, and ``true`` if sequence of blocks is correct.
# SyncQueue is the core of sync manager, this data structure distributes
# requests to peers and manages responses from peers.
#
# Because SyncQueue is async data structure it manages backpressure and
# order of incoming responses and it also resolves "joker's" problem.
#
# Joker's problem
#
# According to current Ethereum2 network specification
# > Clients MUST respond with at least one block, if they have it and it
# > exists in the range. Clients MAY limit the number of blocks in the
# > response.
#
# Such rule can lead to very uncertain responses, for example let slots from
# 10 to 12 will be not empty. Client which follows specification can answer
# with any response from this list (X - block, `-` empty space):
#
# 1. X X X
# 2. - - X
# 3. - X -
# 4. - X X
# 5. X - -
# 6. X - X
# 7. X X -
#
# If peer answers with `1` everything will be fine and `block_pool` will be
# able to process all 3 blocks. In case of `2`, `3`, `4`, `6` - `block_pool`
# will fail immediately with chunk and report "parent is missing" error.
# But in case of `5` and `7` blocks will be processed by `block_pool` without
# any problems, however it will start producing problems right from this
# uncertain last slot. SyncQueue will start producing requests for next
# blocks, but all the responses from this point will fail with "parent is
# missing" error. Lets call such peers "jokers", because they are joking
# with responses.
#
# To fix "joker" problem i'm going to introduce "zero-point" which will
# represent first non-empty slot in gap at the end of requested chunk.
# If SyncQueue receives chunk of blocks with gap at the end and this chunk
# will be successfully processed by `block_pool` it will set `zero_point` to
# the first uncertain (empty) slot. For example:
#
# Case 1
# X X X X X -
# 3 4 5 6 7 8
#
# Case2
# X X - - - -
# 3 4 5 6 7 8
#
# In Case 1 `zero-point` will be equal to 8, in Case 2 `zero-point` will be
# set to 5.
#
# When `zero-point` is set and the next received chunk of blocks will be
# empty, then peer produced this chunk of blocks will be added to suspect
# list.
#
# If the next chunk of blocks has at least one non-empty block and this chunk
# will be successfully processed by `block_pool`, then `zero-point` will be
# reset and suspect list will be cleared.
#
# If the `block_pool` failed to process next chunk of blocks, SyncQueue will
# perform rollback to `zero-point` and penalize all the peers in suspect list.
doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
result = SyncQueue(
result = SyncQueue[T](
startSlot: start,
lastSlot: last,
chunkSize: chunkSize,
queueSize: queueSize,
syncUpdate: updateCb,
notFullEvent: newAsyncEvent(),
debtsQueue: initHeapQueue[SyncRequest](),
debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start,
outSlot: start
)
proc `<`*(a, b: SyncRequest): bool {.inline.} =
proc `<`*[T](a, b: SyncRequest[T]): bool {.inline.} =
result = (a.slot < b.slot)
proc `<`*(a, b: SyncResult): bool {.inline.} =
proc `<`*[T](a, b: SyncResult[T]): bool {.inline.} =
result = (a.request.slot < b.request.slot)
proc `==`*(a, b: SyncRequest): bool {.inline.} =
proc `==`*[T](a, b: SyncRequest[T]): bool {.inline.} =
result = ((a.slot == b.slot) and (a.count == b.count) and
(a.step == b.step))
proc lastSlot*(req: SyncRequest): Slot {.inline.} =
proc lastSlot*[T](req: SyncRequest[T]): Slot {.inline.} =
## Returns last slot for request ``req``.
result = req.slot + req.count - 1'u64
proc updateLastSlot*(sq: SyncQueue, last: Slot) {.inline.} =
proc updateLastSlot*[T](sq: SyncQueue[T], last: Slot) {.inline.} =
## Update last slot stored in queue ``sq`` with value ``last``.
doAssert(sq.lastSlot <= last,
"Last slot could not be lower then stored one " &
$sq.lastSlot & " <= " & $last)
sq.lastSlot = last
proc push*(sq: SyncQueue, sr: SyncRequest,
data: seq[SignedBeaconBlock]) {.async, gcsafe.} =
proc isEmpty*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response has only empty slots.
len(sr.data) == 0
proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
## Returns ``true`` if response has gap at the end.
let lastslot = sr.request.slot + sr.request.count - 1'u64
if len(sr.data) == 0:
return true
if sr.data[^1].message.slot != lastslot:
return true
return false
proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
## Returns last non-empty slot from result ``sr``. If response has only
## empty slots, original request slot will be returned.
if len(sr.data) == 0:
# If response has only empty slots we going to use original request slot
sr.request.slot
else:
sr.data[^1].message.slot
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[SignedBeaconBlock]) {.async, gcsafe.} =
## Push successfull result to queue ``sq``.
mixin updateScore
while true:
if (sq.queueSize > 0) and
(sr.slot >= sq.outSlot + uint64(sq.queueSize) * sq.chunkSize):
await sq.notFullEvent.wait()
sq.notFullEvent.clear()
continue
let res = SyncResult(request: sr, data: data)
let res = SyncResult[T](request: sr, data: data)
sq.readyQueue.push(res)
break
@ -175,62 +294,180 @@ proc push*(sq: SyncQueue, sr: SyncRequest,
if sq.outSlot != minSlot:
break
let item = sq.readyQueue.pop()
if not(sq.syncUpdate(item.data)):
let res = sq.syncUpdate(item.request, item.data)
if res:
if sq.zeroPoint.isSome():
if item.isEmpty():
# If the `zeropoint` is set and response is empty, we will add this
# request to suspect list.
debug "Adding peer to suspect list", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
response_count = len(item.data), topics = "syncman"
sq.suspects.add(item)
else:
# If the `zeropoint` is set and response is not empty, we will clean
# suspect list and reset `zeropoint`.
sq.suspects.setLen(0)
sq.zeroPoint = none[Slot]()
# At this point `zeropoint` is unset, but received response can have
# gap at the end.
if item.hasEndGap():
debug "Zero-point reset and new zero-point found",
peer = item.request.item, request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
response_count = len(item.data),
blocks_map = getShortMap(item.request, item.data),
topics = "syncman"
sq.suspects.add(item)
sq.zeroPoint = some(item.getLastNonEmptySlot())
else:
debug "Zero-point reset", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
response_count = len(item.data),
blocks_map = getShortMap(item.request, item.data),
topics = "syncman"
else:
# If the `zeropoint` is not set and response has gap at the end, we
# will add first suspect to the suspect list and set `zeropoint`.
if item.hasEndGap():
debug "New zero-point found", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
response_count = len(item.data),
blocks_map = getShortMap(item.request, item.data),
topics = "syncman"
sq.suspects.add(item)
sq.zeroPoint = some(item.getLastNonEmptySlot())
sq.outSlot = sq.outSlot + item.request.count
sq.notFullEvent.fire()
else:
# TODO: At this point we can handle different `block_pool` errors,
# because `Parent missing` errors are not so harsh like `Incorrect block`
# errors.
debug "Block pool rejected peer's response", peer = item.request.item,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data)
if sq.zeroPoint.isSome():
# If the `zeropoint` is set and we are unable to store response in
# `block_pool` we are going to revert suspicious responses list.
# If `zeropoint` is set, suspicious list should not be empty.
var req: SyncRequest[T]
if isEmpty(sq.suspects[0]):
# If initial suspicious response is an empty list, then previous chunk
# of blocks did not have a gap at the end. So we are going to request
# suspicious response one more time without any changes.
req = sq.suspects[0].request
else:
# If initial suspicious response is not an empty list, we are going to
# request only gap at the end of the suspicious response.
let startSlot = sq.suspects[0].getLastNonEmptySlot() + 1'u64
let lastSlot = sq.suspects[0].request.lastSlot()
req = SyncRequest.init(T, startSlot, lastSlot)
debug "Resolve joker's problem", request_slot = req.slot,
request_count = req.count,
request_step = req.step,
suspects_count = (len(sq.suspects) - 1)
sq.suspects[0].request.item.updateScore(PeerScoreJokeBlocks)
sq.debtsQueue.push(req)
sq.debtsCount = sq.debtsCount + req.count
# We move all left suspicious responses to the debts queue.
if len(sq.suspects) > 1:
for i in 1 ..< len(sq.suspects):
sq.debtsQueue.push(sq.suspects[i].request)
sq.debtsCount = sq.debtsCount + sq.suspects[i].request.count
sq.suspects[i].request.item.updateScore(PeerScoreJokeBlocks)
# Reset state to the `zeropoint`.
sq.suspects.setLen(0)
sq.outSlot = sq.zeroPoint.get()
sq.zeroPoint = none[Slot]()
# We need to move failed response to the debts queue.
sq.debtsQueue.push(item.request)
sq.debtsCount = sq.debtsCount + item.request.count
break
sq.outSlot = sq.outSlot + item.request.count
sq.notFullEvent.fire()
proc push*(sq: SyncQueue, sr: SyncRequest) =
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
## Push failed request back to queue.
sq.debtsQueue.push(sr)
sq.debtsCount = sq.debtsCount + sr.count
proc pop*(sq: SyncQueue, maxslot: Slot): SyncRequest =
proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
if len(sq.debtsQueue) > 0:
if maxSlot < sq.debtsQueue[0].slot:
return SyncRequest.empty()
return SyncRequest.empty(T)
let sr = sq.debtsQueue.pop()
var sr = sq.debtsQueue.pop()
if sr.lastSlot() <= maxSlot:
sq.debtsCount = sq.debtsCount - sr.count
sr.setItem(item)
return sr
let sr1 = SyncRequest.init(sr.slot, maxslot)
let sr2 = SyncRequest.init(maxslot + 1'u64, sr.lastSlot())
let sr1 = SyncRequest.init(T, sr.slot, maxslot, item)
let sr2 = SyncRequest.init(T, maxslot + 1'u64, sr.lastSlot())
sq.debtsQueue.push(sr2)
sq.debtsCount = sq.debtsCount - sr1.count
return sr1
else:
if maxSlot < sq.inpSlot:
return SyncRequest.empty()
return SyncRequest.empty(T)
if sq.inpSlot > sq.lastSlot:
return SyncRequest.empty()
return SyncRequest.empty(T)
let lastSlot = min(maxslot, sq.lastSlot)
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
let sr = SyncRequest.init(sq.inpSlot, count)
let sr = SyncRequest.init(T, sq.inpSlot, count, item)
sq.inpSlot = sq.inpSlot + count
return sr
proc len*(sq: SyncQueue): uint64 {.inline.} =
proc len*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns number of slots left in queue ``sq``.
if sq.inpSlot > sq.lastSlot:
result = sq.debtsCount
else:
result = sq.lastSlot - sq.inpSlot + 1'u64 + sq.debtsCount
proc total*(sq: SyncQueue): uint64 {.inline.} =
proc total*[T](sq: SyncQueue[T]): uint64 {.inline.} =
## Returns total number of slots in queue ``sq``.
result = sq.lastSlot - sq.startSlot + 1'u64
proc progress*(sq: SyncQueue): uint64 =
proc progress*[T](sq: SyncQueue[T]): uint64 =
## Returns queue's ``sq`` progress string.
let curSlot = sq.outSlot - sq.startSlot
result = (curSlot * 100'u64) div sq.total()
proc now*(sm: typedesc[SyncMoment], slot: Slot): SyncMoment {.inline.} =
result = SyncMoment(stamp: now(chronos.Moment), slot: slot)
proc speed*(start, finish: SyncMoment): float {.inline.} =
## Returns number of slots per second.
let slots = finish.slot - start.slot
let dur = finish.stamp - start.stamp
let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur):
result = 0.0
else:
let v = float(slots) * (secs / float(dur.nanoseconds))
# We doing round manually because stdlib.round is deprecated
result = round(v * 10000) / 10000
proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback,
@ -242,13 +479,25 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1)
): SyncManager[A, B] =
let queue = SyncQueue.init(getLocalHeadSlotCb(), getLocalWallSlotCb(),
chunkSize, updateLocalBlocksCb, 2)
proc syncUpdate(req: SyncRequest[A],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
let peer = req.item
if updateLocalBlocksCb(list):
peer.updateScore(PeerScoreGoodBlocks)
result = true
else:
# Scoring will be done inside of SyncQueue.
result = false
let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
chunkSize, syncUpdate, 2)
result = SyncManager[A, B](
pool: pool,
maxStatusAge: maxStatusAge,
getLocalHeadSlot: getLocalHeadSlotCb,
updateLocalBlocks: updateLocalBlocksCb,
syncUpdate: syncUpdate,
getLocalWallSlot: getLocalWallSlotCb,
maxHeadAge: maxHeadAge,
sleepTime: sleepTime,
@ -284,6 +533,19 @@ template peerAge(): uint64 =
proc syncWorker*[A, B](man: SyncManager[A, B],
peer: A): Future[A] {.async.} =
# Sync worker is the lowest level loop which performs syncing with single
# peer.
#
# Logic here is pretty simple:
# 1. Obtain request from SyncQueue.
# 2. Send this request to a peer and obtain response.
# 3. Push response to the SyncQueue, (doesn't matter if it success or failure)
# 4. Update main SyncQueue last slot with wall time slot number.
# 5. From time to time we also requesting peer's status information.
# 6. If our current head slot is near equal to peer's head slot we are
# exiting this loop and finishing that sync-worker task.
# 7. Repeat
mixin getKey, getScore, getHeadSlot
debug "Starting syncing with peer", peer = peer,
@ -346,7 +608,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
peer = peer, peer_score = peer.getScore(), topics = "syncman"
break
let req = man.queue.pop(peerSlot)
let req = man.queue.pop(peerSlot, peer)
if req.isEmpty():
debug "Empty request received from queue, exiting", peer = peer,
local_head_slot = headSlot, remote_head_slot = peerSlot,
@ -375,12 +637,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
blocks_map = smap, request_slot = req.slot,
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
# Scoring will happen in `syncUpdate`.
await man.queue.push(req, data)
debug "Received blocks got accepted", blocks_count = len(data),
blocks_map = smap, request_slot = req.slot,
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
peer.updateScore(PeerScoreGoodBlocks)
else:
peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req)
@ -395,16 +653,36 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
man.pool.release(peer)
proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
# This procedure manages main loop of SyncManager and in this loop it
# performs
# 1. It checks for current sync status, "are we synced?".
# 2. If we are in active syncing, it tries to acquire peers from PeerPool and
# spawns new sync-workers.
# 3. It stops spawning sync-workers when we are "in sync".
# 4. It calculates syncing performance.
mixin getKey, getScore
var pending = newSeq[Future[A]]()
var acquireFut: Future[A]
var wallSlot, headSlot: Slot
var syncSpeed: float = 0.0
template workersCount(): int =
if isNil(acquireFut): len(pending) else: (len(pending) - 1)
proc speedometerTask() {.async.} =
while true:
let lsm1 = SyncMoment.now(man.getLocalHeadSlot())
await sleepAsync(chronos.seconds(int(SECONDS_PER_SLOT)))
let lsm2 = SyncMoment.now(man.getLocalHeadSlot())
if workersCount() == 0:
syncSpeed = 0.0
else:
syncSpeed = speed(lsm1, lsm2)
debug "Synchronization loop started", topics = "syncman"
traceAsyncErrors speedometerTask()
while true:
wallSlot = man.getLocalWallSlot()
headSlot = man.getLocalHeadSlot()
@ -468,16 +746,10 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
wall_head_slot = wallSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), topics = "syncman"
man.pool.release(peer)
elif not peer.hasInitialStatus:
# TODO Don't even consider these peers!
debug "Peer not ready", peer
man.pool.release(peer)
# TODO goes into tight loop without this
await sleepAsync(RESP_TIMEOUT)
else:
if headSlot > man.queue.lastSlot:
man.queue = SyncQueue.init(headSlot, wallSlot, man.chunkSize,
man.updateLocalBlocks, 2)
man.queue = SyncQueue.init(A, headSlot, wallSlot,
man.chunkSize, man.syncUpdate, 2)
debug "Synchronization loop starting new worker", peer = peer,
wall_head_slot = wallSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), topics = "syncman"
@ -513,7 +785,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
temp.add(fut)
pending = temp
debug "Synchronization loop end tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, workers_count = workersCount(),
waiting_for_new_peer = $not(isNil(acquireFut)),
topics = "syncman"
sync_speed = syncSpeed, topics = "syncman"

View File

@ -4,6 +4,15 @@ import unittest
import chronos
import ../beacon_chain/sync_manager
type
SomeTPeer = ref object
proc `$`*(peer: SomeTPeer): string =
"SomeTPeer"
proc updateScore(peer: SomeTPeer, score: int) =
discard
suite "SyncManager test suite":
proc createChain(start, finish: Slot): seq[SignedBeaconBlock] =
doAssert(start <= finish)
@ -14,48 +23,64 @@ suite "SyncManager test suite":
item.message.slot = curslot
curslot = curslot + 1'u64
proc syncUpdate(req: SyncRequest[SomeTPeer],
data: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
discard
test "[SyncQueue] Start and finish slots equal":
var queue = SyncQueue.init(Slot(0), Slot(0), 1'u64, nil)
let p1 = SomeTPeer()
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, syncUpdate)
check len(queue) == 1
var r11 = queue.pop(Slot(0))
var r11 = queue.pop(Slot(0), p1)
check len(queue) == 0
queue.push(r11)
check len(queue) == 1
var r11e = queue.pop(Slot(0))
var r11e = queue.pop(Slot(0), p1)
check:
len(queue) == 0
r11e == r11
r11.item == p1
r11e.item == r11.item
r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
test "[SyncQueue] Two full requests success/fail":
var queue = SyncQueue.init(Slot(0), Slot(1), 1'u64, nil)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, syncUpdate)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
check len(queue) == 2
var r21 = queue.pop(Slot(1))
var r21 = queue.pop(Slot(1), p1)
check len(queue) == 1
var r22 = queue.pop(Slot(1))
var r22 = queue.pop(Slot(1), p2)
check len(queue) == 0
queue.push(r22)
check len(queue) == 1
queue.push(r21)
check len(queue) == 2
var r21e = queue.pop(Slot(1))
var r21e = queue.pop(Slot(1), p1)
check len(queue) == 1
var r22e = queue.pop(Slot(1))
var r22e = queue.pop(Slot(1), p2)
check:
len(queue) == 0
r21 == r21e
r22 == r22e
r21.item == p1
r22.item == p2
r21.item == r21e.item
r22.item == r22e.item
r21.slot == Slot(0) and r21.count == 1'u64 and r21.step == 1'u64
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from zero":
var queue = SyncQueue.init(Slot(0), Slot(4), 2'u64, nil)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, syncUpdate)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
check len(queue) == 5
var r31 = queue.pop(Slot(4))
var r31 = queue.pop(Slot(4), p1)
check len(queue) == 3
var r32 = queue.pop(Slot(4))
var r32 = queue.pop(Slot(4), p2)
check len(queue) == 1
var r33 = queue.pop(Slot(4))
var r33 = queue.pop(Slot(4), p3)
check len(queue) == 0
queue.push(r33)
check len(queue) == 1
@ -63,76 +88,98 @@ suite "SyncManager test suite":
check len(queue) == 3
queue.push(r31)
check len(queue) == 5
var r31e = queue.pop(Slot(4))
var r31e = queue.pop(Slot(4), p1)
check len(queue) == 3
var r32e = queue.pop(Slot(4))
var r32e = queue.pop(Slot(4), p2)
check len(queue) == 1
var r33e = queue.pop(Slot(4))
var r33e = queue.pop(Slot(4), p3)
check:
len(queue) == 0
r31 == r31e
r32 == r32e
r33 == r33e
r31.item == r31e.item
r32.item == r32e.item
r33.item == r33e.item
r31.item == p1
r32.item == p2
r33.item == p3
r31.slot == Slot(0) and r31.count == 2'u64 and r31.step == 1'u64
r32.slot == Slot(2) and r32.count == 2'u64 and r32.step == 1'u64
r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from non-zero":
var queue = SyncQueue.init(Slot(1), Slot(5), 3'u64, nil)
var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, syncUpdate)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
check len(queue) == 5
var r41 = queue.pop(Slot(5))
var r41 = queue.pop(Slot(5), p1)
check len(queue) == 2
var r42 = queue.pop(Slot(5))
var r42 = queue.pop(Slot(5), p2)
check len(queue) == 0
queue.push(r42)
check len(queue) == 2
queue.push(r41)
check len(queue) == 5
var r41e = queue.pop(Slot(5))
var r41e = queue.pop(Slot(5), p1)
check len(queue) == 2
var r42e = queue.pop(Slot(5))
var r42e = queue.pop(Slot(5), p2)
check:
len(queue) == 0
r41 == r41e
r42 == r42e
r41.item == r41e.item
r42.item == r42e.item
r41.item == p1
r42.item == p2
r41.slot == Slot(1) and r41.count == 3'u64 and r41.step == 1'u64
r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64
test "[SyncQueue] Smart and stupid success/fail":
var queue = SyncQueue.init(Slot(0), Slot(4), 5'u64, nil)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
check len(queue) == 5
var r51 = queue.pop(Slot(3))
var r51 = queue.pop(Slot(3), p1)
check len(queue) == 1
var r52 = queue.pop(Slot(4))
var r52 = queue.pop(Slot(4), p2)
check len(queue) == 0
queue.push(r52)
check len(queue) == 1
queue.push(r51)
check len(queue) == 5
var r51e = queue.pop(Slot(3))
var r51e = queue.pop(Slot(3), p1)
check len(queue) == 1
var r52e = queue.pop(Slot(4))
var r52e = queue.pop(Slot(4), p2)
check:
len(queue) == 0
r51 == r51e
r52 == r52e
r51.item == r51e.item
r52.item == r52e.item
r51.item == p1
r52.item == p2
r51.slot == Slot(0) and r51.count == 4'u64 and r51.step == 1'u64
r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64
test "[SyncQueue] One smart and one stupid + debt split + empty":
var queue = SyncQueue.init(Slot(0), Slot(4), 5'u64, nil)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
let p4 = SomeTPeer()
check len(queue) == 5
var r61 = queue.pop(Slot(4))
var r61 = queue.pop(Slot(4), p1)
check len(queue) == 0
queue.push(r61)
var r61e = queue.pop(Slot(2))
var r61e = queue.pop(Slot(2), p1)
check len(queue) == 2
var r62e = queue.pop(Slot(2))
var r62e = queue.pop(Slot(2), p2)
check len(queue) == 2
check r62e.isEmpty()
var r63e = queue.pop(Slot(3))
var r63e = queue.pop(Slot(3), p3)
check len(queue) == 1
var r64e = queue.pop(Slot(4))
var r64e = queue.pop(Slot(4), p4)
check:
len(queue) == 0
r61.slot == Slot(0) and r61.count == 5'u64 and r61.step == 1'u64
@ -140,12 +187,18 @@ suite "SyncManager test suite":
r62e.isEmpty()
r63e.slot == Slot(3) and r63e.count == 1'u64 and r63e.step == 1'u64
r64e.slot == Slot(4) and r64e.count == 1'u64 and r64e.step == 1'u64
r61.item == p1
r61e.item == p1
isNil(r62e.item) == true
r63e.item == p3
r64e.item == p4
test "[SyncQueue] Async unordered push start from zero":
proc test(): Future[bool] {.async.} =
var counter = 0
proc receiver(list: openarray[SignedBeaconBlock]): bool =
proc syncReceiver(req: SyncRequest[SomeTPeer],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
result = true
for item in list:
if item.message.slot == Slot(counter):
@ -155,10 +208,14 @@ suite "SyncManager test suite":
break
var chain = createChain(Slot(0), Slot(2))
var queue = SyncQueue.init(Slot(0), Slot(2), 1'u64, receiver, 1)
var r11 = queue.pop(Slot(2))
var r12 = queue.pop(Slot(2))
var r13 = queue.pop(Slot(2))
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
syncReceiver, 1)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
var r11 = queue.pop(Slot(2), p1)
var r12 = queue.pop(Slot(2), p2)
var r13 = queue.pop(Slot(2), p3)
var f13 = queue.push(r13, @[chain[2]])
var f12 = queue.push(r12, @[chain[1]])
await sleepAsync(100.milliseconds)
@ -172,6 +229,9 @@ suite "SyncManager test suite":
doAssert(f12.finished == true and f12.failed == false)
doAssert(f13.finished == true and f13.failed == false)
doAssert(counter == 3)
doAssert(r11.item == p1)
doAssert(r12.item == p2)
doAssert(r13.item == p3)
result = true
check waitFor(test())
@ -180,7 +240,8 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} =
var counter = 5
proc receiver(list: openarray[SignedBeaconBlock]): bool =
proc syncReceiver(req: SyncRequest[SomeTPeer],
list: openarray[SignedBeaconBlock]): bool {.gcsafe.} =
result = true
for item in list:
if item.message.slot == Slot(counter):
@ -188,12 +249,19 @@ suite "SyncManager test suite":
else:
result = false
break
var chain = createChain(Slot(5), Slot(11))
var queue = SyncQueue.init(Slot(5), Slot(11), 2'u64, receiver, 2)
var r21 = queue.pop(Slot(11))
var r22 = queue.pop(Slot(11))
var r23 = queue.pop(Slot(11))
var r24 = queue.pop(Slot(11))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
syncReceiver, 2)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
let p3 = SomeTPeer()
let p4 = SomeTPeer()
var r21 = queue.pop(Slot(11), p1)
var r22 = queue.pop(Slot(11), p2)
var r23 = queue.pop(Slot(11), p3)
var r24 = queue.pop(Slot(11), p4)
var f24 = queue.push(r24, @[chain[6]])
var f22 = queue.push(r22, @[chain[2], chain[3]])
@ -210,6 +278,41 @@ suite "SyncManager test suite":
doAssert(counter == 12)
await sleepAsync(100.milliseconds)
doAssert(counter == 12)
doAssert(r21.item == p1)
doAssert(r22.item == p2)
doAssert(r23.item == p3)
doAssert(r24.item == p4)
result = true
check waitFor(test())
test "[SyncQueue] hasEndGap() test":
let chain1 = createChain(Slot(1), Slot(1))
let chain2 = newSeq[SignedBeaconBlock]()
for counter in countdown(32'u64, 2'u64):
let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter,
step: 1'u64)
let sr = SyncResult[SomeTPeer](request: req, data: chain1)
check sr.hasEndGap() == true
let req = SyncRequest[SomeTPeer](slot: Slot(1), count: 1'u64, step: 1'u64)
let sr1 = SyncResult[SomeTPeer](request: req, data: chain1)
let sr2 = SyncResult[SomeTPeer](request: req, data: chain2)
check:
sr1.hasEndGap() == false
sr2.hasEndGap() == true
test "[SyncQueue] getLastNonEmptySlot() test":
let chain1 = createChain(Slot(10), Slot(10))
let chain2 = newSeq[SignedBeaconBlock]()
for counter in countdown(32'u64, 2'u64):
let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter,
step: 1'u64)
let sr = SyncResult[SomeTPeer](request: req, data: chain1)
check sr.getLastNonEmptySlot() == Slot(10)
let req = SyncRequest[SomeTPeer](slot: Slot(100), count: 1'u64, step: 1'u64)
let sr = SyncResult[SomeTPeer](request: req, data: chain2)
check sr.getLastNonEmptySlot() == Slot(100)