From db20fc11720516641790d53f680ca072a01c68d6 Mon Sep 17 00:00:00 2001
From: cheatfate
Date: Wed, 22 Jan 2020 14:47:55 +0200
Subject: [PATCH] Fix SyncQueue push(data) bug. Rename lastSlot to HeadSlot.
 Add failure test.

---
 beacon_chain/sync_manager.nim | 176 +++++++++++++++++++++-------------
 tests/test_sync_manager.nim   |  88 +++++++++++------
 2 files changed, 171 insertions(+), 93 deletions(-)

diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim
index 84ae2e667..dbd16d203 100644
--- a/beacon_chain/sync_manager.nim
+++ b/beacon_chain/sync_manager.nim
@@ -1,22 +1,26 @@
 import chronicles
 import options, deques, heapqueue
-import spec/datatypes, spec/digest, stew/bitseqs, chronos
-import peer_pool
-export datatypes, digest
+import stew/bitseqs, chronos, chronicles
+import spec/datatypes, spec/digest, peer_pool
+export datatypes, digest, chronos, chronicles
 
-# logScope:
-#   topics = "syncman"
+logScope:
+  topics = "syncman"
 
 const MAX_REQUESTED_BLOCKS* = 20'u64
 
 type
-  # A - Peer type
-  # B - PeerID type
-  #
-  # getLastSlot(Peer): Slot
-  # getHeadRoot(Peer): Eth2Digest
-  # getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64, uint64): Future[Option[seq[SignedBeaconBlock]]]
-  # updateStatus(Peer): void
+  ## A - `Peer` type
+  ## B - `PeerID` type
+  ##
+  ## Procedures which need to be implemented and will be mixed into
+  ## SyncManager's code:
+  ##
+  ## getHeadSlot(Peer): Slot
+  ## getHeadRoot(Peer): Eth2Digest
+  ## getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64,
+  ##                        uint64): Future[Option[seq[SignedBeaconBlock]]]
+  ## updateStatus(Peer): void
 
   PeerSlot*[A, B] = ref object
     peers*: seq[A]
@@ -26,7 +30,7 @@ type
     slots*: seq[PeerSlot[A, B]]
     man: SyncManager[A, B]
 
-  GetLastLocalSlotCallback* = proc(): Slot
+  GetLocalHeadSlotCallback* = proc(): Slot
   UpdateLocalBlocksCallback* = proc(list: openarray[SignedBeaconBlock]): bool
 
   SyncManager*[A, B] = ref object
@@ -40,7 +44,7 @@ type
     peerSlotTimeout: chronos.Duration
     peerGroupTimeout: chronos.Duration
     statusPeriod: chronos.Duration
-    getLastLocalSlot: GetLastLocalSlotCallback
+    getLocalHeadSlot: GetLocalHeadSlotCallback
     updateLocalBlocks: UpdateLocalBlocksCallback
 
   BlockList* = object
@@ -127,7 +131,7 @@ proc push*(sq: SyncQueue, sr: SyncRequest,
            data: seq[SignedBeaconBlock]) {.async.} =
   ## Push successful result to queue ``sq``.
   while true:
-    if (sq.queueSize > 0) and (sr.slot > sq.inpSlot + uint64(sq.queueSize)):
+    if (sq.queueSize > 0) and (sr.slot >= sq.outSlot + uint64(sq.queueSize)):
       await sq.notFullEvent.wait()
       sq.notFullEvent.clear()
       continue
@@ -196,7 +200,7 @@ proc pop*(sq: SyncQueue, step = 0'u64): SyncRequest =
     result = SyncRequest(slot: sq.inpSlot, count: count, step: nstep)
     sq.inpSlot = sq.inpSlot + count
   else:
-    raise newException(SyncManagerError, "Queue is already empty!")
+    raise newException(SyncManagerError, "Queue is already empty!")
 
 proc len*(sq: SyncQueue): uint64 {.inline.} =
  ## Returns number of slots left in queue ``sq``.
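The `>=` comparison against `outSlot` above is the heart of the push() fix:
a result whose starting slot sits exactly at the edge of the in-flight
window no longer slips past the queue's flow control. A minimal,
self-contained sketch of the corrected check; ToyQueue and fitsWindow are
illustrative names only, and Slot is reduced to uint64 here:

type
  ToyQueue = object
    outSlot: uint64   # next slot to be written out to the local database
    queueSize: int    # size of the in-flight window; 0 means unbounded

proc fitsWindow(q: ToyQueue, reqSlot: uint64): bool =
  ## The old code used `>` against `inpSlot`, so a request starting exactly
  ## at `outSlot + queueSize` slipped through; `>=` keeps the window tight.
  not ((q.queueSize > 0) and (reqSlot >= q.outSlot + uint64(q.queueSize)))

when isMainModule:
  let q = ToyQueue(outSlot: 100, queueSize: 8)
  doAssert fitsWindow(q, 107)       # still inside the window
  doAssert not fitsWindow(q, 108)   # boundary case the old check let through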
@@ -211,7 +215,8 @@ proc total*(sq: SyncQueue): uint64 {.inline.} =
 
 proc progress*(sq: SyncQueue): string =
   ## Returns queue's ``sq`` progress string.
-  result = $len(sq) & "/" & $sq.total()
+  let curSlot = sq.outSlot - sq.startSlot
+  result = $curSlot & "/" & $sq.total()
 
 proc init*(t: typedesc[BlockList], start: Slot, count, step: uint64,
            list: openarray[SignedBeaconBlock]): Option[BlockList] =
@@ -315,7 +320,7 @@ proc merge*(optlists: varargs[Option[BlockList]]): Option[BlockList] =
       result = some(res)
 
 proc newSyncManager*[A, B](pool: PeerPool[A, B],
-                           getLastLocalSlotCb: GetLastLocalSlotCallback,
+                           getLocalHeadSlotCb: GetLocalHeadSlotCallback,
                            updateLocalBlocksCb: UpdateLocalBlocksCallback,
                            peersInSlot = 3, peerSlotTimeout = 6.seconds,
                            slotsInGroup = 2, peerGroupTimeout = 10.seconds,
@@ -337,7 +342,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
   ##
   ## ``statusPeriod`` - period of time between status updates.
   ##
-  ## ``getLastLocalSlotCb`` - function which provides current latest `Slot` in
+  ## ``getLocalHeadSlotCb`` - function which provides the current latest `Slot` in
   ## local database.
   ##
   ## ``updateLocalBlocksCb`` - function which accepts list of downloaded blocks
@@ -354,7 +359,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
                          peerSlotTimeout: peerSlotTimeout,
                          peerGroupTimeout: peerGroupTimeout,
                          statusPeriod: statusPeriod,
-                         getLastLocalSlot: getLastLocalSlotCb,
+                         getLocalHeadSlot: getLocalHeadSlotCb,
                          updateLocalBlocks: updateLocalBlocksCb,
                          failuresCount: failuresCount,
                          failurePause: failurePause)
@@ -369,14 +374,14 @@ proc newPeerSlot*[A, B](man: SyncManager[A, B]): PeerSlot[A, B] =
 
 proc `$`*[A, B](peerslot: PeerSlot[A, B]): string =
   ## Returns string representation of peer's slot ``peerslot``.
-  mixin getKey, getLastSlot
+  mixin getKey, getHeadSlot
   if len(peerslot.peers) == 0:
     result = "<>"
   else:
     result = "<"
     for item in peerslot.peers:
       result.add("\"" & getKey(item) & "\"")
-      result.add(":" & $getLastSlot(item))
+      result.add(":" & $getHeadSlot(item))
       result.add(", ")
     result.setLen(len(result) - 2)
     result.add(">")
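newSyncManager() only ever sees the local chain through these two callback
types, so callers wire it up with closures. A sketch of that wiring, in the
same shape as the failure test added below; FakeBlock and the 9999 sentinel
are stand-ins for the real block type and the slot just before sync start:

type
  FakeBlock = object
    slot: uint64

var chain: seq[FakeBlock]

proc getLocalHeadSlot(): uint64 =
  ## GetLocalHeadSlotCallback analogue: the head slot of the local chain,
  ## or a sentinel just before the sync starting point while it is empty.
  if len(chain) == 0: 9999'u64 else: chain[^1].slot

proc updateLocalBlocks(list: openarray[FakeBlock]): bool =
  ## UpdateLocalBlocksCallback analogue: append downloaded blocks and
  ## report success; returning false signals a failure to the manager.
  for item in list:
    chain.add(item)
  true

when isMainModule:
  doAssert updateLocalBlocks([FakeBlock(slot: 10000'u64)])
  doAssert getLocalHeadSlot() == 10000'u64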
@@ -509,6 +514,12 @@ proc `$`*[A, B](man: SyncManager[A, B]): string =
   if len(result) > 0:
     result.setLen(len(result) - 2)
 
+proc peersCount*[A, B](man: SyncManager[A, B]): int =
+  ## Returns number of peers which are managed by Sync Manager ``man``.
+  for i in 0 ..< len(man.groups):
+    for k in 0 ..< len(man.groups[i].slots):
+      result = result + len(man.groups[i].slots[k].peers)
+
 proc fillGroups*[A, B](man: SyncManager[A, B]) {.async.} =
   if len(man.groups) == 0:
     while len(man.groups) < man.groupsCount:
@@ -559,7 +570,7 @@ proc isEmpty*[A, B](man: SyncManager[A, B]): bool =
   result = (len(man.groups) == 0)
 
 proc reorderGroups*[A, B](man: SyncManager[A, B]) =
-  mixin getLastSlot
+  mixin getHeadSlot
   doAssert(not(man.isEmpty()))
 
   var x, y, z: int
@@ -568,7 +579,7 @@ proc reorderGroups*[A, B](man: SyncManager[A, B]) =
     for j0 in 0 ..< len(group0.slots):
       let slot0 = group0.slots[j0]
       for k0 in 0 ..< len(slot0.peers):
-        var curSlot = getLastSlot(slot0.peers[k0])
+        var curSlot = getHeadSlot(slot0.peers[k0])
         x = -1; y = -1; z = -1
 
         for i1 in i0 ..< len(man.groups):
@@ -577,7 +588,7 @@ proc reorderGroups*[A, B](man: SyncManager[A, B]) =
             let slot1 = group1.slots[j1]
             let start = if (i1 == i0) and (j1 == j0): k0 + 1 else: 0
             for k1 in start ..< len(slot1.peers):
-              let newSlot = getLastSlot(slot1.peers[k1])
+              let newSlot = getHeadSlot(slot1.peers[k1])
               if curSlot < newSlot:
                 curSlot = newSlot
                 x = i1; y = j1; z = k1
@@ -605,39 +616,39 @@ proc disband*[A, B](syncman: SyncManager[A, B]) =
     disband(group)
   syncman.groups.setLen(0)
 
-proc getLastSlot*[A, B](peerslot: PeerSlot[A, B]): Slot =
+proc getHeadSlot*[A, B](peerslot: PeerSlot[A, B]): Slot =
   ## Returns minimal available beacon chain slot, for peer's slot ``peerslot``.
-  mixin getLastSlot
+  mixin getHeadSlot
   doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero")
   for i in 0 ..< len(peerslot.peers):
     if i == 0:
-      result = getLastSlot(peerslot.peers[i])
+      result = getHeadSlot(peerslot.peers[i])
     else:
-      let slot = getLastSlot(peerslot.peers[i])
+      let slot = getHeadSlot(peerslot.peers[i])
       if slot < result:
         result = slot
 
-proc getLastSlot*[A, B](peergroup: PeerGroup[A, B]): Slot =
+proc getHeadSlot*[A, B](peergroup: PeerGroup[A, B]): Slot =
   ## Returns minimal available beacon chain slot, for peer's group
   ## ``peergroup``.
   doAssert(len(peergroup.slots) > 0,
            "Number of slots in group must not be zero")
   for i in 0 ..< len(peergroup.slots):
     if i == 0:
-      result = getLastSlot(peergroup.slots[i])
+      result = getHeadSlot(peergroup.slots[i])
     else:
-      let slot = getLastSlot(peergroup.slots[i])
+      let slot = getHeadSlot(peergroup.slots[i])
       if slot < result:
         result = slot
 
-proc getLastSlot*[A, B](sman: SyncManager[A, B]): Slot =
+proc getHeadSlot*[A, B](sman: SyncManager[A, B]): Slot =
   ## Returns minimal available beacon chain slot, for all peers in sync manager
   ## ``sman``.
   for i in 0 ..< len(sman.groups):
     if i == 0:
-      result = getLastSlot(sman.groups[i])
+      result = getHeadSlot(sman.groups[i])
     else:
-      let slot = getLastSlot(sman.groups[i])
+      let slot = getHeadSlot(sman.groups[i])
       if slot < result:
         result = slot
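All three getHeadSlot() overloads reduce with the minimum rather than the
maximum: a slot, a group, or the whole manager only advertises what its
slowest member can actually serve, so a granted request never exceeds any
participating peer's head. The same fold in isolation; commonHeadSlot is a
hypothetical helper, not part of the module:

proc commonHeadSlot(peerHeads: openarray[uint64]): uint64 =
  ## Minimum head slot across peers: any range request ending at or below
  ## this slot can be answered by every peer in the set.
  doAssert(len(peerHeads) > 0, "Number of peers must not be zero")
  result = peerHeads[0]
  for head in peerHeads:
    if head < result:
      result = head

when isMainModule:
  doAssert commonHeadSlot([10050'u64, 10020'u64, 10100'u64]) == 10020'u64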
@@ -878,22 +889,27 @@ proc updateStatus*[A, B](sman: SyncManager[A, B]) {.async.} =
       raise exc
 
 proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
+  ## TODO: This synchronization procedure is not optimal; we can do better
+  ## if we spawn N parallel tasks, where N is the number of peer groups.
   var
     squeue: SyncQueue
-    remoteLastKnownSlot: Slot
-    localLastSlot: Slot = sman.getLastLocalSlot()
+    remoteKnownHeadSlot: Slot
+    localHeadSlot: Slot = sman.getLocalHeadSlot()
     pending = newSeq[Future[OptionBlockList]]()
     requests = newSeq[SyncRequest]()
-    checkMoment = Moment.now()
+    startMoment = Moment.now()
+    checkMoment = startMoment
     errorsCount = 0
     counter = 0'u64
 
-  squeue = SyncQueue.init(localLastSlot + 1'u64, localLastSlot + 2'u64,
+  squeue = SyncQueue.init(localHeadSlot + 1'u64, localHeadSlot + 2'u64,
                           MAX_REQUESTED_BLOCKS, sman.updateLocalBlocks,
                           sman.groupsCount)
   while true:
     if errorsCount == sman.failuresCount:
       # Number of consecutive errors exceeds limit
+      error "Synchronization failed", errors = errorsCount,
+            duration = $(Moment.now() - startMoment)
       break
 
     pending.setLen(0)
@@ -902,51 +918,64 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
     await sman.fillGroups()
     sman.reorderGroups()
 
-    var localLastSlot = sman.getLastLocalSlot()
-    let remoteLastSlot = sman.getLastSlot()
-    if remoteLastSlot > remoteLastKnownSlot:
-      remoteLastKnownSlot = remoteLastSlot
-      squeue.updateLastSlot(remoteLastKnownSlot)
+    localHeadSlot = sman.getLocalHeadSlot()
+    let remoteHeadSlot = sman.getHeadSlot()
+    if remoteHeadSlot > remoteKnownHeadSlot:
+      remoteKnownHeadSlot = remoteHeadSlot
+      squeue.updateLastSlot(remoteKnownHeadSlot)
 
-    if localLastSlot >= remoteLastKnownSlot:
-      info "Synchronization successfully finished"
+    if localHeadSlot >= remoteKnownHeadSlot:
+      info "Synchronization finished", progress = squeue.progress(),
+           peers = sman.peersCount(),
+           groups = len(sman.groups),
+           duration = $(Moment.now() - startMoment)
       break
     else:
-      # if counter == 0:
-      #   info "Starting synchronization", local_slot = localLastSlot,
-      #        remote_slot = remoteLastKnownSlot,
-      #        count = len(squeue)
-      # else:
-      #   info "Synchronization progress", progress = squeue.progress()
-      discard
+      if counter == 0:
+        info "Starting synchronization", local_head_slot = localHeadSlot,
+             remote_head_slot = remoteKnownHeadSlot,
+             count = len(squeue),
+             peers = sman.peersCount(),
+             groups = len(sman.groups),
+             progress = squeue.progress()
+
+      else:
+        info "Synchronization progress", progress = squeue.progress(),
+             peers = sman.peersCount(),
+             groups = len(sman.groups),
+             iteration = counter
 
     counter = counter + 1'u64
 
     for i in countdown(len(sman.groups) - 1, 0):
       if len(squeue) == 0:
         break
+      let groupLastSlot = sman.groups[i].getHeadSlot()
       var req = squeue.pop(uint64(len(sman.groups[i].slots)))
-      if sman.groups[i].getLastSlot() >= req.lastSlot():
+      trace "Request created", slot = req.slot, step = req.step,
+            count = req.count
+      if groupLastSlot >= req.lastSlot():
         req.group = i
         pending.add(getBlocks(sman.groups[i], req.slot, req.count))
         requests.add(req)
-        # trace "Send request to a group of peers", group = i
+        trace "Request sent to a group", group = i, slot = req.slot,
+              step = req.step,
+              count = req.count
       else:
+        trace "Request returned to queue", slot = req.slot, step = req.step,
+              count = req.count,
+              group_last_slot = groupLastSlot
        squeue.push(req)
 
     if len(pending) == 0:
       # All the peer groups do not satisfy slot requirements
       # Disbanding all the peers
       sman.disband()
-      await sleepAsync(sman.failurePause)
       inc(errorsCount)
+      warn "Unable to create requests, disbanding peers", errors = errorsCount
+      await sleepAsync(sman.failurePause)
       continue
-    else:
-      errorsCount = 0
 
     # TODO: If getBeaconBlocksByRange() will properly support cancellation,
     # then this can be done more efficiently at the end, so you do not need
    # to wait for all futures here.
     await allFutures(pending)
 
     var failedCount = 0
     for i in 0 ..< len(pending):
       if pending[i].finished() and not(pending[i].failed()):
         let res = pending[i].read()
         if res.isSome():
-          # trace "Peer's group successfully delivered data"
+          trace "Request data received", group = requests[i].group,
+                slot = requests[i].slot,
+                step = requests[i].step,
+                count = requests[i].count
           await squeue.push(requests[i], res.get().list)
         else:
           inc(failedCount)
-          # trace "Peer's group failed to deliver data"
+          trace "Request failed", group = requests[i].group,
+                slot = requests[i].slot,
+                step = requests[i].step,
+                count = requests[i].count
           squeue.push(requests[i])
           sman.groups[requests[i].group].disband()
-
       else:
         inc(failedCount)
-        # trace "Peer's group failed to deliver data"
+        trace "Request failed", group = requests[i].group,
+              slot = requests[i].slot,
+              step = requests[i].step,
+              count = requests[i].count
         squeue.push(requests[i])
         sman.groups[requests[i].group].disband()
 
     if failedCount == len(pending):
       # All the peer groups failed to download requests.
-      await sleepAsync(sman.failurePause)
       inc(errorsCount)
+      warn "All requests failed to deliver data, disbanding peers",
+           errors = errorsCount
+      await sleepAsync(sman.failurePause)
       continue
     else:
       errorsCount = 0
@@ -982,4 +1021,9 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
     let stamp = Moment.now()
     if stamp - checkMoment > sman.statusPeriod:
       checkMoment = stamp
+      info "Updating peers status"
       await sman.updateStatus()
+      info "Peers status updated", duration = $(Moment.now() - checkMoment)
+
+  # Returning all the peers back to PeerPool.
+  sman.disband()
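The loop above only aborts on sustained trouble: each iteration in which
every request fails (or no request can be created) bumps errorsCount and
sleeps failurePause, while any productive iteration resets the counter to
zero. A compressed model of that control flow; runWithRetries and attempt
are hypothetical names, and a real iteration pauses and keeps looping on
success instead of returning:

proc runWithRetries(failuresCount: int, attempt: proc (): bool): bool =
  ## Returns false after `failuresCount` consecutive failed attempts,
  ## mirroring the "Synchronization failed" exit path above.
  var errorsCount = 0
  while true:
    if errorsCount == failuresCount:
      return false
    if attempt():
      return true
    inc(errorsCount)

when isMainModule:
  var calls = 0
  proc alwaysFail(): bool =
    inc(calls)
    false
  doAssert(not runWithRetries(3, alwaysFail))
  doAssert(calls == 3)   # cf. len(peer.requests) == FailuresCount below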
diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim
index 1ca0b5837..f5298f173 100644
--- a/tests/test_sync_manager.nim
+++ b/tests/test_sync_manager.nim
@@ -33,7 +33,7 @@ proc getFuture*(peer: SimplePeer): Future[void] =
 proc `<`*(a, b: SimplePeer): bool =
   result = `<`(a.weight, b.weight)
 
-proc getLastSlot*(peer: SimplePeer): Slot =
+proc getHeadSlot*(peer: SimplePeer): Slot =
   if len(peer.blockchain) == 0:
     result = peer.latestSlot
   else:
@@ -550,17 +550,19 @@ proc syncQueueAsyncTests(): Future[bool] {.async.} =
   var f24 = q2.push(r24, @[chain2[6]])
   var f22 = q2.push(r22, @[chain2[2], chain2[3]])
   doAssert(f24.finished == false)
-  doAssert(f22.finished == true and f22.failed == false)
+  doAssert(f22.finished == false)
   doAssert(counter == 5)
   var f21 = q2.push(r21, @[chain2[0], chain2[1]])
   doAssert(f21.finished == true and f21.failed == false)
   await sleepAsync(100.milliseconds)
-  doAssert(f24.finished == true and f24.failed == false)
+  doAssert(f22.finished == true and f22.failed == false)
+  doAssert(f24.finished == false)
   doAssert(counter == 9)
   var f23 = q2.push(r23, @[chain2[4], chain2[5]])
   doAssert(f23.finished == true and f23.failed == false)
-  doAssert(counter == 12)
+  doAssert(counter == 11)
   await sleepAsync(100.milliseconds)
+  doAssert(f24.finished == true and f24.failed == false)
   doAssert(counter == 12)
 
   result = true
@@ -732,7 +734,6 @@ proc syncManagerOneGroupTest(): Future[bool] {.async.} =
                                          peerSlotTimeout = 1.seconds,
                                          slotsInGroup = 2)
   await sman.synchronize()
-
   for i in 0 ..< len(peers):
     if i in {0, 1, 2}:
       doAssert(checkRequest(peers[i], 0, 10000, 20, 2,
@@ -804,7 +805,6 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} =
                                          peerSlotTimeout = 1.seconds,
                                          slotsInGroup = 2)
   await sman.synchronize()
-
   for i in 0 ..< len(peers):
     if i in {0, 1, 2}:
       doAssert(checkRequest(peers[i], 0, 10020, 20, 2) == true)
@@ -838,25 +838,59 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} =
                       10095, 10096, 10097, 10098, 10099) == true)
   result = true
 
+proc syncManagerFailureTest(): Future[bool] {.async.} =
+  # Failure test
+  const FailuresCount = 3
+  var pool = newPeerPool[SimplePeer, SimplePeerKey]()
+  var peer = SimplePeer.init("id1", weight = 0)
 
-when isMainModule:
-  suite "SyncManager test suite":
-    test "BlockList tests":
-      # TODO
-      discard
-    # test "PeerSlot tests":
-    #   check waitFor(peerSlotTests()) == true
-    # test "PeerGroup tests":
-    #   check waitFor(peerGroupTests()) == true
-    # test "SyncQueue non-async tests":
-    #   check syncQueueNonAsyncTests() == true
-    # test "SyncQueue async tests":
-    #   check waitFor(syncQueueAsyncTests()) == true
-    # test "SyncManager one-peer test":
-    #   check waitFor(syncManagerOnePeerTest()) == true
-    # test "SyncManager one-peer-slot test":
-    #   check waitFor(syncManagerOneSlotTest()) == true
-    # test "SyncManager one-peer-group test":
-    #   check waitFor(syncManagerOneGroupTest()) == true
-    test "SyncManager group-recovery test":
-      check waitFor(syncManagerGroupRecoveryTest()) == true
+  var srcChain = newTempChain(100, Slot(10000))
+  var dstChain = newSeq[SignedBeaconBlock]()
+
+  peer.update(srcChain, failure = true)
+
+  proc lastLocalSlot(): Slot =
+    if len(dstChain) == 0:
+      result = Slot(9999)
+    else:
+      result = dstChain[^1].message.slot
+
+  proc updateBlocks(list: openarray[SignedBeaconBlock]): bool =
+    for item in list:
+      dstChain.add(item)
+    result = true
+
+  doAssert(pool.addIncomingPeer(peer) == true)
+
+  var sman = newSyncManager[SimplePeer,
+                            SimplePeerKey](pool, lastLocalSlot, updateBlocks,
+                                           peersInSlot = 3,
+                                           peerSlotTimeout = 1.seconds,
+                                           slotsInGroup = 2,
+                                           failuresCount = FailuresCount,
+                                           failurePause = 100.milliseconds)
+  await sman.synchronize()
+  doAssert(len(peer.requests) == FailuresCount)
+  for i in 0 ..< len(peer.requests):
+    doAssert(checkRequest(peer, i, 10000, 20, 1) == true)
+  result = true
+
+suite "SyncManager test suite":
+  test "PeerSlot tests":
+    check waitFor(peerSlotTests()) == true
+  test "PeerGroup tests":
+    check waitFor(peerGroupTests()) == true
+  test "SyncQueue non-async tests":
+    check syncQueueNonAsyncTests() == true
+  test "SyncQueue async tests":
+    check waitFor(syncQueueAsyncTests()) == true
+  test "SyncManager one-peer test":
+    check waitFor(syncManagerOnePeerTest()) == true
+  test "SyncManager one-peer-slot test":
+    check waitFor(syncManagerOneSlotTest()) == true
+  test "SyncManager one-peer-group test":
+    check waitFor(syncManagerOneGroupTest()) == true
+  test "SyncManager group-recovery test":
+    check waitFor(syncManagerGroupRecoveryTest()) == true
+  test "SyncManager failure test":
+    check waitFor(syncManagerFailureTest()) == true
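The renamed duck-typed peer interface (see the doc comment at the top of
sync_manager.nim) is what both the tests' SimplePeer and the real network
peer satisfy via `mixin`. A toy illustration of the shape a conforming peer
takes; MockPeer is hypothetical, Slot and Eth2Digest are reduced to uint64
and string, and the async getBeaconBlocksByRange is omitted to keep the
snippet dependency-free:

type
  MockPeer = ref object
    head: uint64       # latest slot this peer claims to have

proc getHeadSlot(p: MockPeer): uint64 =
  ## Mixed into SyncManager code in place of the old getLastSlot().
  p.head

proc getHeadRoot(p: MockPeer): string =
  ## Stand-in for the Eth2Digest of the peer's head block.
  "0xroot"

proc updateStatus(p: MockPeer) =
  ## Would refresh `head` via a status exchange with the remote peer.
  discard

when isMainModule:
  let p = MockPeer(head: 10000'u64)
  doAssert getHeadSlot(p) == 10000'u64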