Fix SyncQueue push(data) bug.
Rename lastSlot to HeadSlot. Add failure test.
This commit is contained in:
parent
73dc72583f
commit
db20fc1172
|
@ -1,22 +1,26 @@
|
||||||
import chronicles
|
import chronicles
|
||||||
import options, deques, heapqueue
|
import options, deques, heapqueue
|
||||||
import spec/datatypes, spec/digest, stew/bitseqs, chronos
|
import stew/bitseqs, chronos, chronicles
|
||||||
import peer_pool
|
import spec/datatypes, spec/digest, peer_pool
|
||||||
export datatypes, digest
|
export datatypes, digest, chronos, chronicles
|
||||||
|
|
||||||
# logScope:
|
logScope:
|
||||||
# topics = "syncman"
|
topics = "syncman"
|
||||||
|
|
||||||
const MAX_REQUESTED_BLOCKS* = 20'u64
|
const MAX_REQUESTED_BLOCKS* = 20'u64
|
||||||
|
|
||||||
type
|
type
|
||||||
# A - Peer type
|
## A - `Peer` type
|
||||||
# B - PeerID type
|
## B - `PeerID` type
|
||||||
#
|
##
|
||||||
# getLastSlot(Peer): Slot
|
## Procedures which needs to be implemented and will be mixed to SyncManager's
|
||||||
# getHeadRoot(Peer): Eth2Digest
|
## code:
|
||||||
# getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64, uint64): Future[Option[seq[SignedBeaconBlock]]]
|
##
|
||||||
# updateStatus(Peer): void
|
## getHeadSlot(Peer): Slot
|
||||||
|
## getHeadRoot(Peer): Eth2Digest
|
||||||
|
## getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64,
|
||||||
|
## uint64): Future[Option[seq[SignedBeaconBlock]]]
|
||||||
|
## updateStatus(Peer): void
|
||||||
|
|
||||||
PeerSlot*[A, B] = ref object
|
PeerSlot*[A, B] = ref object
|
||||||
peers*: seq[A]
|
peers*: seq[A]
|
||||||
|
@ -26,7 +30,7 @@ type
|
||||||
slots*: seq[PeerSlot[A, B]]
|
slots*: seq[PeerSlot[A, B]]
|
||||||
man: SyncManager[A, B]
|
man: SyncManager[A, B]
|
||||||
|
|
||||||
GetLastLocalSlotCallback* = proc(): Slot
|
GetLocalHeadSlotCallback* = proc(): Slot
|
||||||
UpdateLocalBlocksCallback* = proc(list: openarray[SignedBeaconBlock]): bool
|
UpdateLocalBlocksCallback* = proc(list: openarray[SignedBeaconBlock]): bool
|
||||||
|
|
||||||
SyncManager*[A, B] = ref object
|
SyncManager*[A, B] = ref object
|
||||||
|
@ -40,7 +44,7 @@ type
|
||||||
peerSlotTimeout: chronos.Duration
|
peerSlotTimeout: chronos.Duration
|
||||||
peerGroupTimeout: chronos.Duration
|
peerGroupTimeout: chronos.Duration
|
||||||
statusPeriod: chronos.Duration
|
statusPeriod: chronos.Duration
|
||||||
getLastLocalSlot: GetLastLocalSlotCallback
|
getLocalHeadSlot: GetLocalHeadSlotCallback
|
||||||
updateLocalBlocks: UpdateLocalBlocksCallback
|
updateLocalBlocks: UpdateLocalBlocksCallback
|
||||||
|
|
||||||
BlockList* = object
|
BlockList* = object
|
||||||
|
@ -127,7 +131,7 @@ proc push*(sq: SyncQueue, sr: SyncRequest,
|
||||||
data: seq[SignedBeaconBlock]) {.async.} =
|
data: seq[SignedBeaconBlock]) {.async.} =
|
||||||
## Push successfull result to queue ``sq``.
|
## Push successfull result to queue ``sq``.
|
||||||
while true:
|
while true:
|
||||||
if (sq.queueSize > 0) and (sr.slot > sq.inpSlot + uint64(sq.queueSize)):
|
if (sq.queueSize > 0) and (sr.slot >= sq.outSlot + uint64(sq.queueSize)):
|
||||||
await sq.notFullEvent.wait()
|
await sq.notFullEvent.wait()
|
||||||
sq.notFullEvent.clear()
|
sq.notFullEvent.clear()
|
||||||
continue
|
continue
|
||||||
|
@ -196,7 +200,7 @@ proc pop*(sq: SyncQueue, step = 0'u64): SyncRequest =
|
||||||
result = SyncRequest(slot: sq.inpSlot, count: count, step: nstep)
|
result = SyncRequest(slot: sq.inpSlot, count: count, step: nstep)
|
||||||
sq.inpSlot = sq.inpSlot + count
|
sq.inpSlot = sq.inpSlot + count
|
||||||
else:
|
else:
|
||||||
raise newException(ValueError, "Queue is already empty!")
|
raise newException(SyncManagerError, "Queue is already empty!")
|
||||||
|
|
||||||
proc len*(sq: SyncQueue): uint64 {.inline.} =
|
proc len*(sq: SyncQueue): uint64 {.inline.} =
|
||||||
## Returns number of slots left in queue ``sq``.
|
## Returns number of slots left in queue ``sq``.
|
||||||
|
@ -211,7 +215,8 @@ proc total*(sq: SyncQueue): uint64 {.inline.} =
|
||||||
|
|
||||||
proc progress*(sq: SyncQueue): string =
|
proc progress*(sq: SyncQueue): string =
|
||||||
## Returns queue's ``sq`` progress string.
|
## Returns queue's ``sq`` progress string.
|
||||||
result = $len(sq) & "/" & $sq.total()
|
let curSlot = sq.outSlot - sq.startSlot
|
||||||
|
result = $curSlot & "/" & $sq.total()
|
||||||
|
|
||||||
proc init*(t: typedesc[BlockList], start: Slot, count, step: uint64,
|
proc init*(t: typedesc[BlockList], start: Slot, count, step: uint64,
|
||||||
list: openarray[SignedBeaconBlock]): Option[BlockList] =
|
list: openarray[SignedBeaconBlock]): Option[BlockList] =
|
||||||
|
@ -315,7 +320,7 @@ proc merge*(optlists: varargs[Option[BlockList]]): Option[BlockList] =
|
||||||
result = some(res)
|
result = some(res)
|
||||||
|
|
||||||
proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
||||||
getLastLocalSlotCb: GetLastLocalSlotCallback,
|
getLocalHeadSlotCb: GetLocalHeadSlotCallback,
|
||||||
updateLocalBlocksCb: UpdateLocalBlocksCallback,
|
updateLocalBlocksCb: UpdateLocalBlocksCallback,
|
||||||
peersInSlot = 3, peerSlotTimeout = 6.seconds,
|
peersInSlot = 3, peerSlotTimeout = 6.seconds,
|
||||||
slotsInGroup = 2, peerGroupTimeout = 10.seconds,
|
slotsInGroup = 2, peerGroupTimeout = 10.seconds,
|
||||||
|
@ -337,7 +342,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
||||||
##
|
##
|
||||||
## ``statusPeriod`` - period of time between status updates.
|
## ``statusPeriod`` - period of time between status updates.
|
||||||
##
|
##
|
||||||
## ``getLastLocalSlotCb`` - function which provides current latest `Slot` in
|
## ``getLocalHeadSlotCb`` - function which provides current latest `Slot` in
|
||||||
## local database.
|
## local database.
|
||||||
##
|
##
|
||||||
## ``updateLocalBlocksCb`` - function which accepts list of downloaded blocks
|
## ``updateLocalBlocksCb`` - function which accepts list of downloaded blocks
|
||||||
|
@ -354,7 +359,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
||||||
peerSlotTimeout: peerSlotTimeout,
|
peerSlotTimeout: peerSlotTimeout,
|
||||||
peerGroupTimeout: peerGroupTimeout,
|
peerGroupTimeout: peerGroupTimeout,
|
||||||
statusPeriod: statusPeriod,
|
statusPeriod: statusPeriod,
|
||||||
getLastLocalSlot: getLastLocalSlotCb,
|
getLocalHeadSlot: getLocalHeadSlotCb,
|
||||||
updateLocalBlocks: updateLocalBlocksCb,
|
updateLocalBlocks: updateLocalBlocksCb,
|
||||||
failuresCount: failuresCount,
|
failuresCount: failuresCount,
|
||||||
failurePause: failurePause)
|
failurePause: failurePause)
|
||||||
|
@ -369,14 +374,14 @@ proc newPeerSlot*[A, B](man: SyncManager[A, B]): PeerSlot[A, B] =
|
||||||
|
|
||||||
proc `$`*[A, B](peerslot: PeerSlot[A, B]): string =
|
proc `$`*[A, B](peerslot: PeerSlot[A, B]): string =
|
||||||
## Returns string representation of peer's slot ``peerslot``.
|
## Returns string representation of peer's slot ``peerslot``.
|
||||||
mixin getKey, getLastSlot
|
mixin getKey, getHeadSlot
|
||||||
if len(peerslot.peers) == 0:
|
if len(peerslot.peers) == 0:
|
||||||
result = "<>"
|
result = "<>"
|
||||||
else:
|
else:
|
||||||
result = "<"
|
result = "<"
|
||||||
for item in peerslot.peers:
|
for item in peerslot.peers:
|
||||||
result.add("\"" & getKey(item) & "\"")
|
result.add("\"" & getKey(item) & "\"")
|
||||||
result.add(":" & $getLastSlot(item))
|
result.add(":" & $getHeadSlot(item))
|
||||||
result.add(", ")
|
result.add(", ")
|
||||||
result.setLen(len(result) - 2)
|
result.setLen(len(result) - 2)
|
||||||
result.add(">")
|
result.add(">")
|
||||||
|
@ -509,6 +514,12 @@ proc `$`*[A, B](man: SyncManager[A, B]): string =
|
||||||
if len(result) > 0:
|
if len(result) > 0:
|
||||||
result.setLen(len(result) - 2)
|
result.setLen(len(result) - 2)
|
||||||
|
|
||||||
|
proc peersCount*[A, B](man: SyncManager[A, B]): int =
|
||||||
|
## Returns number of peers which is managed by Sync Manager ``man``.
|
||||||
|
for i in 0 ..< len(man.groups):
|
||||||
|
for k in 0 ..< len(man.groups[i].slots):
|
||||||
|
result = result + len(man.groups[i].slots[k].peers)
|
||||||
|
|
||||||
proc fillGroups*[A, B](man: SyncManager[A, B]) {.async.} =
|
proc fillGroups*[A, B](man: SyncManager[A, B]) {.async.} =
|
||||||
if len(man.groups) == 0:
|
if len(man.groups) == 0:
|
||||||
while len(man.groups) < man.groupsCount:
|
while len(man.groups) < man.groupsCount:
|
||||||
|
@ -559,7 +570,7 @@ proc isEmpty*[A, B](man: SyncManager[A, B]): bool =
|
||||||
result = (len(man.groups) == 0)
|
result = (len(man.groups) == 0)
|
||||||
|
|
||||||
proc reorderGroups*[A, B](man: SyncManager[A, B]) =
|
proc reorderGroups*[A, B](man: SyncManager[A, B]) =
|
||||||
mixin getLastSlot
|
mixin getHeadSlot
|
||||||
doAssert(not(man.isEmpty()))
|
doAssert(not(man.isEmpty()))
|
||||||
|
|
||||||
var x, y, z: int
|
var x, y, z: int
|
||||||
|
@ -568,7 +579,7 @@ proc reorderGroups*[A, B](man: SyncManager[A, B]) =
|
||||||
for j0 in 0 ..< len(group0.slots):
|
for j0 in 0 ..< len(group0.slots):
|
||||||
let slot0 = group0.slots[j0]
|
let slot0 = group0.slots[j0]
|
||||||
for k0 in 0 ..< len(slot0.peers):
|
for k0 in 0 ..< len(slot0.peers):
|
||||||
var curSlot = getLastSlot(slot0.peers[k0])
|
var curSlot = getHeadSlot(slot0.peers[k0])
|
||||||
x = -1; y = -1; z = -1
|
x = -1; y = -1; z = -1
|
||||||
|
|
||||||
for i1 in i0 ..< len(man.groups):
|
for i1 in i0 ..< len(man.groups):
|
||||||
|
@ -577,7 +588,7 @@ proc reorderGroups*[A, B](man: SyncManager[A, B]) =
|
||||||
let slot1 = group1.slots[j1]
|
let slot1 = group1.slots[j1]
|
||||||
let start = if (i1 == i0) and (j1 == j0): k0 + 1 else: 0
|
let start = if (i1 == i0) and (j1 == j0): k0 + 1 else: 0
|
||||||
for k1 in start ..< len(slot1.peers):
|
for k1 in start ..< len(slot1.peers):
|
||||||
let newSlot = getLastSlot(slot1.peers[k1])
|
let newSlot = getHeadSlot(slot1.peers[k1])
|
||||||
if curSlot < newSlot:
|
if curSlot < newSlot:
|
||||||
curSlot = newSlot
|
curSlot = newSlot
|
||||||
x = i1; y = j1; z = k1
|
x = i1; y = j1; z = k1
|
||||||
|
@ -605,39 +616,39 @@ proc disband*[A, B](syncman: SyncManager[A, B]) =
|
||||||
disband(group)
|
disband(group)
|
||||||
syncman.groups.setLen(0)
|
syncman.groups.setLen(0)
|
||||||
|
|
||||||
proc getLastSlot*[A, B](peerslot: PeerSlot[A, B]): Slot =
|
proc getHeadSlot*[A, B](peerslot: PeerSlot[A, B]): Slot =
|
||||||
## Returns minimal available beacon chain slot, for peer's slot ``peerslot``.
|
## Returns minimal available beacon chain slot, for peer's slot ``peerslot``.
|
||||||
mixin getLastSlot
|
mixin getHeadSlot
|
||||||
doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero")
|
doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero")
|
||||||
for i in 0 ..< len(peerslot.peers):
|
for i in 0 ..< len(peerslot.peers):
|
||||||
if i == 0:
|
if i == 0:
|
||||||
result = getLastSlot(peerslot.peers[i])
|
result = getHeadSlot(peerslot.peers[i])
|
||||||
else:
|
else:
|
||||||
let slot = getLastSlot(peerslot.peers[i])
|
let slot = getHeadSlot(peerslot.peers[i])
|
||||||
if slot < result:
|
if slot < result:
|
||||||
result = slot
|
result = slot
|
||||||
|
|
||||||
proc getLastSlot*[A, B](peergroup: PeerGroup[A, B]): Slot =
|
proc getHeadSlot*[A, B](peergroup: PeerGroup[A, B]): Slot =
|
||||||
## Returns minimal available beacon chain slot, for peer's group
|
## Returns minimal available beacon chain slot, for peer's group
|
||||||
## ``peergroup``.
|
## ``peergroup``.
|
||||||
doAssert(len(peergroup.slots) > 0,
|
doAssert(len(peergroup.slots) > 0,
|
||||||
"Number of slots in group must not be zero")
|
"Number of slots in group must not be zero")
|
||||||
for i in 0 ..< len(peergroup.slots):
|
for i in 0 ..< len(peergroup.slots):
|
||||||
if i == 0:
|
if i == 0:
|
||||||
result = getLastSlot(peergroup.slots[i])
|
result = getHeadSlot(peergroup.slots[i])
|
||||||
else:
|
else:
|
||||||
let slot = getLastSlot(peergroup.slots[i])
|
let slot = getHeadSlot(peergroup.slots[i])
|
||||||
if slot < result:
|
if slot < result:
|
||||||
result = slot
|
result = slot
|
||||||
|
|
||||||
proc getLastSlot*[A, B](sman: SyncManager[A, B]): Slot =
|
proc getHeadSlot*[A, B](sman: SyncManager[A, B]): Slot =
|
||||||
## Returns minimal available beacon chain slot, for all peers in sync manager
|
## Returns minimal available beacon chain slot, for all peers in sync manager
|
||||||
## ``sman``.
|
## ``sman``.
|
||||||
for i in 0 ..< len(sman.groups):
|
for i in 0 ..< len(sman.groups):
|
||||||
if i == 0:
|
if i == 0:
|
||||||
result = getLastSlot(sman.groups[i])
|
result = getHeadSlot(sman.groups[i])
|
||||||
else:
|
else:
|
||||||
let slot = getLastSlot(sman.groups[i])
|
let slot = getHeadSlot(sman.groups[i])
|
||||||
if slot < result:
|
if slot < result:
|
||||||
result = slot
|
result = slot
|
||||||
|
|
||||||
|
@ -878,22 +889,27 @@ proc updateStatus*[A, B](sman: SyncManager[A, B]) {.async.} =
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
|
proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
|
||||||
|
## TODO: This synchronization procedure is not optimal, we can do it better
|
||||||
|
## if spawn N parallel tasks, where N is number of peer groups.
|
||||||
var
|
var
|
||||||
squeue: SyncQueue
|
squeue: SyncQueue
|
||||||
remoteLastKnownSlot: Slot
|
remoteKnownHeadSlot: Slot
|
||||||
localLastSlot: Slot = sman.getLastLocalSlot()
|
localHeadSlot: Slot = sman.getLocalHeadSlot()
|
||||||
pending = newSeq[Future[OptionBlockList]]()
|
pending = newSeq[Future[OptionBlockList]]()
|
||||||
requests = newSeq[SyncRequest]()
|
requests = newSeq[SyncRequest]()
|
||||||
checkMoment = Moment.now()
|
startMoment = Moment.now()
|
||||||
|
checkMoment = startMoment
|
||||||
errorsCount = 0
|
errorsCount = 0
|
||||||
counter = 0'u64
|
counter = 0'u64
|
||||||
|
|
||||||
squeue = SyncQueue.init(localLastSlot + 1'u64, localLastSlot + 2'u64,
|
squeue = SyncQueue.init(localHeadSlot + 1'u64, localHeadSlot + 2'u64,
|
||||||
MAX_REQUESTED_BLOCKS, sman.updateLocalBlocks,
|
MAX_REQUESTED_BLOCKS, sman.updateLocalBlocks,
|
||||||
sman.groupsCount)
|
sman.groupsCount)
|
||||||
while true:
|
while true:
|
||||||
if errorsCount == sman.failuresCount:
|
if errorsCount == sman.failuresCount:
|
||||||
# Number of consecutive errors exceeds limit
|
# Number of consecutive errors exceeds limit
|
||||||
|
error "Synchronization failed", errors = errorsCount,
|
||||||
|
duration = $(Moment.now() - startMoment)
|
||||||
break
|
break
|
||||||
|
|
||||||
pending.setLen(0)
|
pending.setLen(0)
|
||||||
|
@ -902,51 +918,64 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
|
||||||
await sman.fillGroups()
|
await sman.fillGroups()
|
||||||
sman.reorderGroups()
|
sman.reorderGroups()
|
||||||
|
|
||||||
var localLastSlot = sman.getLastLocalSlot()
|
localHeadSlot = sman.getLocalHeadSlot()
|
||||||
let remoteLastSlot = sman.getLastSlot()
|
let remoteHeadSlot = sman.getHeadSlot()
|
||||||
if remoteLastSlot > remoteLastKnownSlot:
|
if remoteHeadSlot > remoteKnownHeadSlot:
|
||||||
remoteLastKnownSlot = remoteLastSlot
|
remoteKnownHeadSlot = remoteHeadSlot
|
||||||
squeue.updateLastSlot(remoteLastKnownSlot)
|
squeue.updateLastSlot(remoteKnownHeadSlot)
|
||||||
|
|
||||||
if localLastSlot >= remoteLastKnownSlot:
|
if localHeadSlot >= remoteKnownHeadSlot:
|
||||||
info "Synchronization successfully finished"
|
info "Synchronization finished", progress = squeue.progress(),
|
||||||
|
peers = sman.peersCount(),
|
||||||
|
groups = len(sman.groups),
|
||||||
|
duration = $(Moment.now() - startMoment)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
# if counter == 0:
|
if counter == 0:
|
||||||
# info "Starting synchronization", local_slot = localLastSlot,
|
info "Starting synchronization", local_head_slot = localHeadSlot,
|
||||||
# remote_slot = remoteLastKnownSlot,
|
remote_head_slot = remoteKnownHeadSlot,
|
||||||
# count = len(squeue)
|
count = len(squeue),
|
||||||
# else:
|
peers = sman.peersCount(),
|
||||||
# info "Synchronization progress", progress = squeue.progress()
|
groups = len(sman.groups),
|
||||||
discard
|
progress = squeue.progress()
|
||||||
|
|
||||||
|
else:
|
||||||
|
info "Synchronization progress", progress = squeue.progress(),
|
||||||
|
peers = sman.peersCount(),
|
||||||
|
groups = len(sman.groups),
|
||||||
|
iteration = counter
|
||||||
|
|
||||||
counter = counter + 1'u64
|
counter = counter + 1'u64
|
||||||
|
|
||||||
for i in countdown(len(sman.groups) - 1, 0):
|
for i in countdown(len(sman.groups) - 1, 0):
|
||||||
if len(squeue) == 0:
|
if len(squeue) == 0:
|
||||||
break
|
break
|
||||||
|
let groupLastSlot = sman.groups[i].getHeadSlot()
|
||||||
var req = squeue.pop(uint64(len(sman.groups[i].slots)))
|
var req = squeue.pop(uint64(len(sman.groups[i].slots)))
|
||||||
if sman.groups[i].getLastSlot() >= req.lastSlot():
|
trace "Request created", slot = req.slot, step = req.step,
|
||||||
|
count = req.count
|
||||||
|
if groupLastSlot >= req.lastSlot():
|
||||||
req.group = i
|
req.group = i
|
||||||
pending.add(getBlocks(sman.groups[i], req.slot, req.count))
|
pending.add(getBlocks(sman.groups[i], req.slot, req.count))
|
||||||
requests.add(req)
|
requests.add(req)
|
||||||
# trace "Send request to a group of peers", group = i
|
trace "Request sent to a group", group = i, slot = req.slot,
|
||||||
|
step = req.step,
|
||||||
|
count = req.count
|
||||||
else:
|
else:
|
||||||
|
trace "Request returned to queue", slot = req.slot, step = req.step,
|
||||||
|
count = req.count,
|
||||||
|
group_last_slot = groupLastSlot
|
||||||
squeue.push(req)
|
squeue.push(req)
|
||||||
|
|
||||||
if len(pending) == 0:
|
if len(pending) == 0:
|
||||||
# All the peer groups do not satisfy slot requirements
|
# All the peer groups do not satisfy slot requirements
|
||||||
# Disbanding all the peers
|
# Disbanding all the peers
|
||||||
sman.disband()
|
sman.disband()
|
||||||
await sleepAsync(sman.failurePause)
|
|
||||||
inc(errorsCount)
|
inc(errorsCount)
|
||||||
|
warn "Unable to create requests, disbanding peers", errors = errorsCount
|
||||||
|
await sleepAsync(sman.failurePause)
|
||||||
continue
|
continue
|
||||||
else:
|
|
||||||
errorsCount = 0
|
|
||||||
|
|
||||||
# TODO: If getBeaconBlocksByRange() will properly support cancellation,
|
|
||||||
# then this can be done more efficiently at the end, so you do not need
|
|
||||||
# to wait for all futures here.
|
|
||||||
await allFutures(pending)
|
await allFutures(pending)
|
||||||
|
|
||||||
var failedCount = 0
|
var failedCount = 0
|
||||||
|
@ -954,24 +983,34 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
|
||||||
if pending[i].finished() and not(pending[i].failed()):
|
if pending[i].finished() and not(pending[i].failed()):
|
||||||
let res = pending[i].read()
|
let res = pending[i].read()
|
||||||
if res.isSome():
|
if res.isSome():
|
||||||
# trace "Peer's group successfully delivered data"
|
trace "Request data received", group = requests[i].group,
|
||||||
|
slot = requests[i].slot,
|
||||||
|
step = requests[i].step,
|
||||||
|
count = requests[i].count
|
||||||
await squeue.push(requests[i], res.get().list)
|
await squeue.push(requests[i], res.get().list)
|
||||||
else:
|
else:
|
||||||
inc(failedCount)
|
inc(failedCount)
|
||||||
# trace "Peer's group failed to deliver data"
|
trace "Request failed", group = requests[i].group,
|
||||||
|
slot = requests[i].slot,
|
||||||
|
step = requests[i].step,
|
||||||
|
count = requests[i].count
|
||||||
squeue.push(requests[i])
|
squeue.push(requests[i])
|
||||||
sman.groups[requests[i].group].disband()
|
sman.groups[requests[i].group].disband()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
inc(failedCount)
|
inc(failedCount)
|
||||||
# trace "Peer's group failed to deliver data"
|
trace "Request failed", group = requests[i].group,
|
||||||
|
slot = requests[i].slot,
|
||||||
|
step = requests[i].step,
|
||||||
|
count = requests[i].count
|
||||||
squeue.push(requests[i])
|
squeue.push(requests[i])
|
||||||
sman.groups[requests[i].group].disband()
|
sman.groups[requests[i].group].disband()
|
||||||
|
|
||||||
if failedCount == len(pending):
|
if failedCount == len(pending):
|
||||||
# All the peer groups failed to download requests.
|
# All the peer groups failed to download requests.
|
||||||
await sleepAsync(sman.failurePause)
|
|
||||||
inc(errorsCount)
|
inc(errorsCount)
|
||||||
|
warn "All requests failed to deliver data, disbanding peers",
|
||||||
|
errors = errorsCount
|
||||||
|
await sleepAsync(sman.failurePause)
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
errorsCount = 0
|
errorsCount = 0
|
||||||
|
@ -982,4 +1021,9 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
|
||||||
let stamp = Moment.now()
|
let stamp = Moment.now()
|
||||||
if stamp - checkMoment > sman.statusPeriod:
|
if stamp - checkMoment > sman.statusPeriod:
|
||||||
checkMoment = stamp
|
checkMoment = stamp
|
||||||
|
info "Updating peers status"
|
||||||
await sman.updateStatus()
|
await sman.updateStatus()
|
||||||
|
info "Peers status updated", duration = $(Moment.now() - checkMoment)
|
||||||
|
|
||||||
|
# Returning all the peers back to PeerPool.
|
||||||
|
sman.disband()
|
||||||
|
|
|
@ -33,7 +33,7 @@ proc getFuture*(peer: SimplePeer): Future[void] =
|
||||||
proc `<`*(a, b: SimplePeer): bool =
|
proc `<`*(a, b: SimplePeer): bool =
|
||||||
result = `<`(a.weight, b.weight)
|
result = `<`(a.weight, b.weight)
|
||||||
|
|
||||||
proc getLastSlot*(peer: SimplePeer): Slot =
|
proc getHeadSlot*(peer: SimplePeer): Slot =
|
||||||
if len(peer.blockchain) == 0:
|
if len(peer.blockchain) == 0:
|
||||||
result = peer.latestSlot
|
result = peer.latestSlot
|
||||||
else:
|
else:
|
||||||
|
@ -550,17 +550,19 @@ proc syncQueueAsyncTests(): Future[bool] {.async.} =
|
||||||
var f24 = q2.push(r24, @[chain2[6]])
|
var f24 = q2.push(r24, @[chain2[6]])
|
||||||
var f22 = q2.push(r22, @[chain2[2], chain2[3]])
|
var f22 = q2.push(r22, @[chain2[2], chain2[3]])
|
||||||
doAssert(f24.finished == false)
|
doAssert(f24.finished == false)
|
||||||
doAssert(f22.finished == true and f22.failed == false)
|
doAssert(f22.finished == false)
|
||||||
doAssert(counter == 5)
|
doAssert(counter == 5)
|
||||||
var f21 = q2.push(r21, @[chain2[0], chain2[1]])
|
var f21 = q2.push(r21, @[chain2[0], chain2[1]])
|
||||||
doAssert(f21.finished == true and f21.failed == false)
|
doAssert(f21.finished == true and f21.failed == false)
|
||||||
await sleepAsync(100.milliseconds)
|
await sleepAsync(100.milliseconds)
|
||||||
doAssert(f24.finished == true and f24.failed == false)
|
doAssert(f22.finished == true and f22.failed == false)
|
||||||
|
doAssert(f24.finished == false)
|
||||||
doAssert(counter == 9)
|
doAssert(counter == 9)
|
||||||
var f23 = q2.push(r23, @[chain2[4], chain2[5]])
|
var f23 = q2.push(r23, @[chain2[4], chain2[5]])
|
||||||
doAssert(f23.finished == true and f23.failed == false)
|
doAssert(f23.finished == true and f23.failed == false)
|
||||||
doAssert(counter == 12)
|
doAssert(counter == 11)
|
||||||
await sleepAsync(100.milliseconds)
|
await sleepAsync(100.milliseconds)
|
||||||
|
doAssert(f24.finished == true and f24.failed == false)
|
||||||
doAssert(counter == 12)
|
doAssert(counter == 12)
|
||||||
|
|
||||||
result = true
|
result = true
|
||||||
|
@ -732,7 +734,6 @@ proc syncManagerOneGroupTest(): Future[bool] {.async.} =
|
||||||
peerSlotTimeout = 1.seconds,
|
peerSlotTimeout = 1.seconds,
|
||||||
slotsInGroup = 2)
|
slotsInGroup = 2)
|
||||||
await sman.synchronize()
|
await sman.synchronize()
|
||||||
|
|
||||||
for i in 0 ..< len(peers):
|
for i in 0 ..< len(peers):
|
||||||
if i in {0, 1, 2}:
|
if i in {0, 1, 2}:
|
||||||
doAssert(checkRequest(peers[i], 0, 10000, 20, 2,
|
doAssert(checkRequest(peers[i], 0, 10000, 20, 2,
|
||||||
|
@ -804,7 +805,6 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} =
|
||||||
peerSlotTimeout = 1.seconds,
|
peerSlotTimeout = 1.seconds,
|
||||||
slotsInGroup = 2)
|
slotsInGroup = 2)
|
||||||
await sman.synchronize()
|
await sman.synchronize()
|
||||||
|
|
||||||
for i in 0 ..< len(peers):
|
for i in 0 ..< len(peers):
|
||||||
if i in {0, 1, 2}:
|
if i in {0, 1, 2}:
|
||||||
doAssert(checkRequest(peers[i], 0, 10020, 20, 2) == true)
|
doAssert(checkRequest(peers[i], 0, 10020, 20, 2) == true)
|
||||||
|
@ -838,25 +838,59 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} =
|
||||||
10095, 10096, 10097, 10098, 10099) == true)
|
10095, 10096, 10097, 10098, 10099) == true)
|
||||||
result = true
|
result = true
|
||||||
|
|
||||||
|
proc syncManagerFailureTest(): Future[bool] {.async.} =
|
||||||
|
# Failure test
|
||||||
|
const FailuresCount = 3
|
||||||
|
var pool = newPeerPool[SimplePeer, SimplePeerKey]()
|
||||||
|
var peer = SimplePeer.init("id1", weight = 0)
|
||||||
|
|
||||||
when isMainModule:
|
var srcChain = newTempChain(100, Slot(10000))
|
||||||
suite "SyncManager test suite":
|
var dstChain = newSeq[SignedBeaconBlock]()
|
||||||
test "BlockList tests":
|
|
||||||
# TODO
|
peer.update(srcChain, failure = true)
|
||||||
discard
|
|
||||||
# test "PeerSlot tests":
|
proc lastLocalSlot(): Slot =
|
||||||
# check waitFor(peerSlotTests()) == true
|
if len(dstChain) == 0:
|
||||||
# test "PeerGroup tests":
|
result = Slot(9999)
|
||||||
# check waitFor(peerGroupTests()) == true
|
else:
|
||||||
# test "SyncQueue non-async tests":
|
result = dstChain[^1].message.slot
|
||||||
# check syncQueueNonAsyncTests() == true
|
|
||||||
# test "SyncQueue async tests":
|
proc updateBlocks(list: openarray[SignedBeaconBlock]): bool =
|
||||||
# check waitFor(syncQueueAsyncTests()) == true
|
for item in list:
|
||||||
# test "SyncManager one-peer test":
|
dstChain.add(item)
|
||||||
# check waitFor(syncManagerOnePeerTest()) == true
|
result = true
|
||||||
# test "SyncManager one-peer-slot test":
|
|
||||||
# check waitFor(syncManagerOneSlotTest()) == true
|
doAssert(pool.addIncomingPeer(peer) == true)
|
||||||
# test "SyncManager one-peer-group test":
|
|
||||||
# check waitFor(syncManagerOneGroupTest()) == true
|
var sman = newSyncManager[SimplePeer,
|
||||||
test "SyncManager group-recovery test":
|
SimplePeerKey](pool, lastLocalSlot, updateBlocks,
|
||||||
check waitFor(syncManagerGroupRecoveryTest()) == true
|
peersInSlot = 3,
|
||||||
|
peerSlotTimeout = 1.seconds,
|
||||||
|
slotsInGroup = 2,
|
||||||
|
failuresCount = FailuresCount,
|
||||||
|
failurePause = 100.milliseconds)
|
||||||
|
await sman.synchronize()
|
||||||
|
doAssert(len(peer.requests) == FailuresCount)
|
||||||
|
for i in 0 ..< len(peer.requests):
|
||||||
|
doAssert(checkRequest(peer, i, 10000, 20, 1) == true)
|
||||||
|
result = true
|
||||||
|
|
||||||
|
suite "SyncManager test suite":
|
||||||
|
test "PeerSlot tests":
|
||||||
|
check waitFor(peerSlotTests()) == true
|
||||||
|
test "PeerGroup tests":
|
||||||
|
check waitFor(peerGroupTests()) == true
|
||||||
|
test "SyncQueue non-async tests":
|
||||||
|
check syncQueueNonAsyncTests() == true
|
||||||
|
test "SyncQueue async tests":
|
||||||
|
check waitFor(syncQueueAsyncTests()) == true
|
||||||
|
test "SyncManager one-peer test":
|
||||||
|
check waitFor(syncManagerOnePeerTest()) == true
|
||||||
|
test "SyncManager one-peer-slot test":
|
||||||
|
check waitFor(syncManagerOneSlotTest()) == true
|
||||||
|
test "SyncManager one-peer-group test":
|
||||||
|
check waitFor(syncManagerOneGroupTest()) == true
|
||||||
|
test "SyncManager group-recovery test":
|
||||||
|
check waitFor(syncManagerGroupRecoveryTest()) == true
|
||||||
|
test "SyncManager failure test":
|
||||||
|
check waitFor(syncManagerFailureTest()) == true
|
||||||
|
|
Loading…
Reference in New Issue