From 73dc72583fd0a6ef2287db939090fab3aebc3312 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Tue, 21 Jan 2020 20:30:21 +0200 Subject: [PATCH 01/14] Initial commit. --- beacon_chain/peer_pool.nim | 50 +- beacon_chain/sync_manager.nim | 985 ++++++++++++++++++++++++++++++++++ tests/test_sync_manager.nim | 862 +++++++++++++++++++++++++++++ 3 files changed, 1896 insertions(+), 1 deletion(-) create mode 100644 beacon_chain/sync_manager.nim create mode 100644 tests/test_sync_manager.nim diff --git a/beacon_chain/peer_pool.nim b/beacon_chain/peer_pool.nim index 02f5fad37..316ac682f 100644 --- a/beacon_chain/peer_pool.nim +++ b/beacon_chain/peer_pool.nim @@ -34,6 +34,8 @@ type acqIncPeersCount: int acqOutPeersCount: int + PeerPoolError* = object of CatchableError + proc `<`*(a, b: PeerIndex): bool = result = a.cmp(b, a) @@ -219,9 +221,17 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = proc addPeer*[A, B](pool: PeerPool[A, B], peer: A, peerType: PeerType): bool = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## - ## Returns ``true`` on success. + ## Procedure returns ``false`` in case + ## * if ``peer`` is already closed. + ## * if ``pool`` currently has a maximum of peers. + ## * if ``pool`` currently has a maximum of `Incoming` or `Outgoing` peers. + ## + ## Procedure returns ``true`` on success. mixin getKey, getFuture + if peer.getFuture().finished: + return false + if len(pool.registry) >= pool.maxPeersCount: return false @@ -289,6 +299,22 @@ proc acquire*[A, B](pool: PeerPool[A, B], result = item[].data break +proc acquireNoWait*[A, B](pool: PeerPool[A, B], + filter = {PeerType.Incoming, + PeerType.Outgoing}): A = + doAssert(filter != {}, "Filter must not be empty") + var count = 0 + if PeerType.Incoming in filter: + count = count + len(pool.incQueue) + if PeerType.Outgoing in filter: + count = count + len(pool.outQueue) + if count < 1: + raise newException(PeerPoolError, "Not enough peers in pool") + var item = pool.getItem(filter) + doAssert(PeerFlags.Acquired notin item[].flags) + item[].flags.incl(PeerFlags.Acquired) + result = item[].data + proc release*[A, B](pool: PeerPool[A, B], peer: A) = ## Release peer ``peer`` back to PeerPool ``pool`` mixin getKey @@ -356,6 +382,28 @@ proc acquire*[A, B](pool: PeerPool[A, B], raise result = peers +proc acquireNoWait*[A, B](pool: PeerPool[A, B], + number: int, + filter = {PeerType.Incoming, + PeerType.Outgoing}): seq[A] = + ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the + ## filter ``filter``. + doAssert(filter != {}, "Filter must not be empty") + var peers = newSeq[A]() + var count = 0 + if PeerType.Incoming in filter: + count = count + len(pool.incQueue) + if PeerType.Outgoing in filter: + count = count + len(pool.outQueue) + if count < number: + raise newException(PeerPoolError, "Not enough peers in pool") + for i in 0 ..< number: + var item = pool.getItem(filter) + doAssert(PeerFlags.Acquired notin item[].flags) + item[].flags.incl(PeerFlags.Acquired) + peers.add(item[].data) + result = peers + proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = ## Acquire single incoming peer from PeerPool ``pool``. 
pool.acquire({PeerType.Incoming}) diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim new file mode 100644 index 000000000..84ae2e667 --- /dev/null +++ b/beacon_chain/sync_manager.nim @@ -0,0 +1,985 @@ +import chronicles +import options, deques, heapqueue +import spec/datatypes, spec/digest, stew/bitseqs, chronos +import peer_pool +export datatypes, digest + +# logScope: +# topics = "syncman" + +const MAX_REQUESTED_BLOCKS* = 20'u64 + +type + # A - Peer type + # B - PeerID type + # + # getLastSlot(Peer): Slot + # getHeadRoot(Peer): Eth2Digest + # getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64, uint64): Future[Option[seq[SignedBeaconBlock]]] + # updateStatus(Peer): void + + PeerSlot*[A, B] = ref object + peers*: seq[A] + man: SyncManager[A, B] + + PeerGroup*[A, B] = ref object + slots*: seq[PeerSlot[A, B]] + man: SyncManager[A, B] + + GetLastLocalSlotCallback* = proc(): Slot + UpdateLocalBlocksCallback* = proc(list: openarray[SignedBeaconBlock]): bool + + SyncManager*[A, B] = ref object + groups*: seq[PeerGroup[A, B]] + pool: PeerPool[A, B] + peersInSlot: int + slotsInGroup: int + groupsCount: int + failuresCount: int + failurePause: chronos.Duration + peerSlotTimeout: chronos.Duration + peerGroupTimeout: chronos.Duration + statusPeriod: chronos.Duration + getLastLocalSlot: GetLastLocalSlotCallback + updateLocalBlocks: UpdateLocalBlocksCallback + + BlockList* = object + list*: seq[SignedBeaconBlock] + map*: BitSeq + start*: Slot + + OptionBlockList* = Option[BlockList] + OptionBeaconBlockSeq* = Option[seq[SignedBeaconBlock]] + + SyncRequest* = object + slot*: Slot + count*: uint64 + step*: uint64 + group*: int + + SyncResult* = object + request*: SyncRequest + data*: seq[SignedBeaconBlock] + + SyncQueue* = ref object + inpSlot*: Slot + outSlot*: Slot + + startSlot*: Slot + lastSlot: Slot + chunkSize*: uint64 + queueSize*: int + + notFullEvent*: AsyncEvent + syncUpdate*: UpdateLocalBlocksCallback + + debtsQueue: HeapQueue[SyncRequest] + debtsCount: uint64 + readyQueue: HeapQueue[SyncResult] + readyData: seq[seq[SignedBeaconBlock]] + + SyncManagerError* = object of CatchableError + +proc init*(t: typedesc[SyncQueue], start, last: Slot, chunkSize: uint64, + updateCb: UpdateLocalBlocksCallback, + queueSize: int = -1): SyncQueue = + ## Create new synchronization queue with parameters + ## + ## ``start`` and ``last`` are starting and finishing Slots. + ## + ## ``chunkSize`` maximum number of slots in one request. + ## + ## ``queueSize`` maximum queue size for incoming data. If ``queueSize > 0`` + ## queue will help to keep backpressure under control. If ``queueSize <= 0`` + ## then queue size is unlimited (default). + ## + ## ``updateCb`` procedure which will be used to send downloaded blocks to + ## consumer. Block sequences will be sent sequentially. Procedure should + ## return ``false`` only when it receives incorrect blocks, and ``true`` + ## if sequence of blocks is correct. 
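To make the SyncQueue contract described above concrete, here is a minimal sketch (not part of the patch) of how a caller is expected to drive the queue: requests are taken with pop(), successful responses are handed back together with their blocks so the queue can deliver them to the callback in slot order, and failed requests are returned to the queue. fetchBlocks is a hypothetical download helper standing in for a real peer request.

proc storeBlocks(list: openarray[SignedBeaconBlock]): bool =
  ## Consumer callback: return ``false`` only for an invalid block sequence.
  result = true

proc syncLoop() {.async.} =
  var queue = SyncQueue.init(Slot(0), Slot(99), MAX_REQUESTED_BLOCKS,
                             storeBlocks, queueSize = 4)
  while len(queue) > 0'u64:
    let request = queue.pop()
    # fetchBlocks() is hypothetical and not part of this patch.
    let blocks = await fetchBlocks(request.slot, request.count, request.step)
    if blocks.isSome():
      # Buffered and passed to ``storeBlocks`` strictly in slot order.
      await queue.push(request, blocks.get())
    else:
      # Failed work goes back to the debts queue and will be popped again.
      queue.push(request)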
+ doAssert(chunkSize > 0'u64, "Chunk size should not be zero") + result = SyncQueue(startSlot: start, lastSlot: last, chunkSize: chunkSize, + queueSize: queueSize, syncUpdate: updateCb, + notFullEvent: newAsyncEvent(), + debtsQueue: initHeapQueue[SyncRequest](), + inpSlot: start, outSlot: start) + +proc `<`*(a, b: SyncRequest): bool {.inline.} = + result = (a.slot < b.slot) + +proc `<`*(a, b: SyncResult): bool {.inline.} = + result = (a.request.slot < b.request.slot) + +proc `==`*(a, b: SyncRequest): bool {.inline.} = + result = ((a.slot == b.slot) and (a.count == b.count) and + (a.step == b.step)) + +proc lastSlot*(req: SyncRequest): Slot {.inline.} = + ## Returns last slot for request ``req``. + result = req.slot + req.count - 1'u64 + +proc updateLastSlot*(sq: SyncQueue, last: Slot) {.inline.} = + ## Update last slot stored in queue ``sq`` with value ``last``. + doAssert(sq.lastSlot <= last, "Last slot could not be lower then stored one") + sq.lastSlot = last + +proc push*(sq: SyncQueue, sr: SyncRequest, + data: seq[SignedBeaconBlock]) {.async.} = + ## Push successfull result to queue ``sq``. + while true: + if (sq.queueSize > 0) and (sr.slot > sq.inpSlot + uint64(sq.queueSize)): + await sq.notFullEvent.wait() + sq.notFullEvent.clear() + continue + let res = SyncResult(request: sr, data: data) + sq.readyQueue.push(res) + break + + while len(sq.readyQueue) > 0: + let minSlot = sq.readyQueue[0].request.slot + if sq.outSlot != minSlot: + break + let item = sq.readyQueue.pop() + if not(sq.syncUpdate(item.data)): + sq.debtsQueue.push(item.request) + sq.debtsCount = sq.debtsCount + item.request.count + break + sq.outSlot = sq.outSlot + item.request.count + sq.notFullEvent.fire() + +proc push*(sq: SyncQueue, sr: SyncRequest) = + ## Push failed request back to queue. + sq.debtsQueue.push(sr) + sq.debtsCount = sq.debtsCount + sr.count + +proc push*(sq: SyncQueue, sr: SyncRequest, newstep: uint64) = + ## Push request with changed number of steps. + doAssert(sr.step > newstep, "The new step should be less than the original") + var count = sr.count + var slot = sr.slot + var newcount = 0'u64 + + for i in 0 ..< (sr.step div newstep): + if newstep * sq.chunkSize <= count: + newcount = newstep * sq.chunkSize + else: + newcount = count + var newsr = SyncRequest(slot: slot, count: newcount, step: newstep) + slot = slot + newcount + count = count - newcount + sq.debtsQueue.push(newsr) + sq.debtsCount = sq.debtsCount + newsr.count + if count == 0: + break + + if count > 0'u64: + let step = sr.step mod newstep + doAssert(step * sq.chunkSize <= count) + var newsr = SyncRequest(slot: slot, count: count, step: step) + sq.debtsQueue.push(newsr) + sq.debtsCount = sq.debtsCount + newsr.count + +proc pop*(sq: SyncQueue, step = 0'u64): SyncRequest = + ## Obtain request from queue ``sq``. + if len(sq.debtsQueue) > 0: + var sr = sq.debtsQueue.pop() + if step != 0'u64: + if sr.step > step: + sq.push(sr, step) + sr = sq.debtsQueue.pop() + sq.debtsCount = sq.debtsCount - sr.count + result = sr + else: + let nstep = if step == 0'u64: 1'u64 else: step + if sq.inpSlot <= sq.lastSlot: + let count = min(sq.lastSlot + 1'u64 - sq.inpSlot, sq.chunkSize * nstep) + result = SyncRequest(slot: sq.inpSlot, count: count, step: nstep) + sq.inpSlot = sq.inpSlot + count + else: + raise newException(ValueError, "Queue is already empty!") + +proc len*(sq: SyncQueue): uint64 {.inline.} = + ## Returns number of slots left in queue ``sq``. 
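The bookkeeping done by pop(), the failure push() and len() above is easiest to see on a small example. The sketch below, written in the style of the unit tests added later in this patch, walks a queue covering slots 0..9 with a chunk size of 4, then shows how a wide step-5 request is re-split with push(request, newstep).

var q1 = SyncQueue.init(Slot(0), Slot(9), 4'u64, nil)
doAssert(len(q1) == 10)            # nothing handed out yet
let r1 = q1.pop()                  # slots 0..3, step 1
doAssert(r1.slot == Slot(0) and r1.count == 4 and r1.step == 1)
doAssert(len(q1) == 6)             # six slots not requested yet
q1.push(r1)                        # request failed: its slots are owed again
doAssert(len(q1) == 10)
let r2 = q1.pop()                  # debts are served before new slots
doAssert(r2 == r1)

var q2 = SyncQueue.init(Slot(1), Slot(30), 2'u64, nil)
let wide = q2.pop(5)               # slots 1..10 as one step-5 request
doAssert(wide.count == 10 and wide.step == 5)
q2.push(wide, 3'u64)               # re-split for three remaining peer slots
doAssert(len(q2) == 30)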
+ if sq.inpSlot > sq.lastSlot: + result = sq.debtsCount + else: + result = sq.lastSlot - sq.inpSlot + 1'u64 + sq.debtsCount + +proc total*(sq: SyncQueue): uint64 {.inline.} = + ## Returns total number of slots in queue ``sq``. + result = sq.lastSlot - sq.startSlot + 1'u64 + +proc progress*(sq: SyncQueue): string = + ## Returns queue's ``sq`` progress string. + result = $len(sq) & "/" & $sq.total() + +proc init*(t: typedesc[BlockList], start: Slot, count, step: uint64, + list: openarray[SignedBeaconBlock]): Option[BlockList] = + mixin getSlot + var res: BlockList + var error = false + var current = start + var index = 0 + + res.map = BitSeq.init(0) + + for i in 0'u64 ..< count: + if index < len(list): + let slot = list[index].message.slot + if slot < current: + error = true + break + elif slot == current: + res.map.add(true) + inc(index) + else: + res.map.add(false) + else: + res.map.add(false) + + let next = current + step + current = current + 1'u64 + if i < (count - 1): + while current < next: + res.map.add(false) + current = current + 1'u64 + + if not(error) and index == len(list): + res.list = @list + res.start = start + result = some(res) + +proc init*(t: typedesc[BlockList], start, finish: Slot): BlockList = + result = BlockList(start: start) + result.map = BitSeq.init(int((finish - start) + 1'u64)) + +proc `$`*(blist: BlockList): string = + var index = 0 + for i in 0 ..< len(blist.map): + if blist.map[i]: + result = result & $blist.list[index].message.slot & ", " + index = index + 1 + else: + result = result & ", " + if len(result) > 2: + result.setLen(len(result) - 2) + +proc startSlot*(blist: BlockList): Slot {.inline.} = + result = blist.start + +proc lastSlot*(blist: BlockList): Slot {.inline.} = + doAssert(len(blist.map) > 0) + result = blist.start + uint64(len(blist.map) - 1) + +proc contains*(blist: BlockList, slot: Slot): bool {.inline.} = + if (blist.startSlot() <= slot) and (slot <= blist.lastSlot()): + result = true + +proc merge*(optlists: varargs[Option[BlockList]]): Option[BlockList] = + if len(optlists) > 0: + if len(optlists) == 1: + result = optlists[0] + else: + var blists = newSeq[BlockList](len(optlists)) + for i in 0 ..< len(optlists): + doAssert(optlists[i].isSome()) # Must not be happens + blists[i] = optlists[i].get() + + var minSlot, maxSlot: Slot + for i in 0 ..< len(blists): + if i == 0: + minSlot = blists[i].startSlot() + maxSlot = blists[i].lastSlot() + else: + let candidateMinSlot = blists[i].startSlot() + let candidateMaxSlot = blists[i].lastSlot() + if candidateMinSlot < minSlot: + minSlot = candidateMinSlot + if candidateMaxSlot > maxSlot: + maxSlot = candidateMaxSlot + + var res = BlockList.init(minSlot, maxSlot) + var slot = minSlot + var indexes = newSeq[int](len(blists)) + var resIndex = 0 + while slot <= maxSlot: + for i in 0 ..< len(blists): + if blists[i].contains(slot): + let slotIndex = slot - blists[i].startSlot() + if blists[i].map[slotIndex]: + res.map.setBit(resIndex) + res.list.add(blists[i].list[indexes[i]]) + inc(indexes[i]) + inc(resIndex) + slot = slot + 1'u64 + result = some(res) + +proc newSyncManager*[A, B](pool: PeerPool[A, B], + getLastLocalSlotCb: GetLastLocalSlotCallback, + updateLocalBlocksCb: UpdateLocalBlocksCallback, + peersInSlot = 3, peerSlotTimeout = 6.seconds, + slotsInGroup = 2, peerGroupTimeout = 10.seconds, + groupsCount = 10, + statusPeriod = 10.minutes, + failuresCount = 3, + failurePause = 5.seconds): SyncManager[A, B] = + ## ``pool`` - PeerPool object which will be used as source of peers. 
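BlockList.init and merge above are what turn interleaved range responses back into one contiguous sequence. The sketch below (illustrative only) splits slots 0..9 into two step-2 downloads and merges them again.

var chain = newSeq[SignedBeaconBlock](10)
for i in 0 ..< 10:
  chain[i].message.slot = Slot(i)

let evens = BlockList.init(Slot(0), 5'u64, 2'u64,
                           @[chain[0], chain[2], chain[4], chain[6], chain[8]])
let odds = BlockList.init(Slot(1), 5'u64, 2'u64,
                          @[chain[1], chain[3], chain[5], chain[7], chain[9]])
doAssert(evens.isSome() and odds.isSome())

let whole = merge(evens, odds)
doAssert(whole.isSome())
doAssert(whole.get().startSlot() == Slot(0))
doAssert(whole.get().lastSlot() == Slot(9))
doAssert(len(whole.get().list) == 10)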
+ ## + ## ``peersInSlot`` - maximum number of peers in slot. + ## + ## ``peerSlotTimeout`` - timeout for PeerSlot.getBlocks() execution. + ## + ## ``slotsInGroup`` - maximum number of slots in group. + ## + ## ``peerGroupTimeout`` - timeout for PeerGroup.getBlocks() execution. + ## + ## ``groupsCount`` - maximum number of groups used in sync process. + ## + ## ``statusPeriod`` - period of time between status updates. + ## + ## ``getLastLocalSlotCb`` - function which provides current latest `Slot` in + ## local database. + ## + ## ``updateLocalBlocksCb`` - function which accepts list of downloaded blocks + ## and stores it to local database. + ## + ## ``failuresCount`` - number of consecutive failures, after which the + ## procedure will exit. + ## + ## ``failurePause`` - period of time which will be waited by sync manager, if + ## all the nodes could not satisfy requested slot. + result = SyncManager[A, B](pool: pool, peersInSlot: peersInSlot, + slotsInGroup: slotsInGroup, + groupsCount: groupsCount, + peerSlotTimeout: peerSlotTimeout, + peerGroupTimeout: peerGroupTimeout, + statusPeriod: statusPeriod, + getLastLocalSlot: getLastLocalSlotCb, + updateLocalBlocks: updateLocalBlocksCb, + failuresCount: failuresCount, + failurePause: failurePause) + +template nearestOdd(number: int): int = + number - ((number - 1) mod 2) + +proc newPeerSlot*[A, B](man: SyncManager[A, B]): PeerSlot[A, B] = + result = PeerSlot[A, B]() + result.man = man + result.peers = newSeq[A]() + +proc `$`*[A, B](peerslot: PeerSlot[A, B]): string = + ## Returns string representation of peer's slot ``peerslot``. + mixin getKey, getLastSlot + if len(peerslot.peers) == 0: + result = "<>" + else: + result = "<" + for item in peerslot.peers: + result.add("\"" & getKey(item) & "\"") + result.add(":" & $getLastSlot(item)) + result.add(", ") + result.setLen(len(result) - 2) + result.add(">") + +proc isFull*[A, B](peerslot: PeerSlot[A, B]): bool {.inline.} = + ## Returns ``true`` if peer's slot ``peerslot`` is full of peers. + result = (len(peerslot.peers) == peerslot.man.peersInSlot) + +proc isEmpty*[A, B](peerslot: PeerSlot[A, B]): bool {.inline.} = + ## Returns ``true`` if peer's slot ``peerslot`` is empty (out of peers). + result = (len(peerslot.peers) == 0) + +proc fillPeers*[A, B](slot: PeerSlot[A, B]) {.async.} = + doAssert(slot.man.peersInSlot > 0 and + (slot.man.peersInSlot mod 2 == 1)) + doAssert(len(slot.peers) == 0 or (len(slot.peers) mod 2 == 0)) + doAssert(len(slot.peers) <= slot.man.peersInSlot) + if len(slot.peers) == 0: + # This is new slot + var peer = await slot.man.pool.acquire() + let available = slot.man.pool.lenAvailable() + slot.peers.add(peer) + if available > 0: + if available + len(slot.peers) < slot.man.peersInSlot: + # There not enoug available peers in pool, so we add only some of them, + # but we still want to keep number of peers in slot odd. + let count = nearestOdd(available + len(slot.peers)) + if count > len(slot.peers): + let peers = slot.man.pool.acquireNoWait(count - len(slot.peers)) + slot.peers.add(peers) + else: + # There enough peers to fill a slot. + let peers = slot.man.pool.acquireNoWait(slot.man.peersInSlot - + len(slot.peers)) + slot.peers.add(peers) + else: + # Only one peer obtained and there no more available peers, so we are + # starting with just one peer. + discard + else: + # If slot already has some peers, then we are not going to wait for peers, + # we will consume everything available. 
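Putting the constructor above to work requires only the two callbacks and a peer pool. The sketch below wires a manager to an in-memory chain, reusing the SimplePeer/SimplePeerKey mock types from tests/test_sync_manager.nim in this patch; localChain, localHeadSlot and storeBlocks are illustrative names.

var pool = newPeerPool[SimplePeer, SimplePeerKey]()
var localChain = newSeq[SignedBeaconBlock]()

proc localHeadSlot(): Slot =
  # Latest slot known locally; syncing stops once peers offer nothing newer.
  if len(localChain) == 0:
    result = Slot(0)
  else:
    result = localChain[^1].message.slot

proc storeBlocks(list: openarray[SignedBeaconBlock]): bool =
  # Accept blocks delivered in slot order by the manager.
  for blck in list:
    localChain.add(blck)
  result = true

var sman = newSyncManager[SimplePeer, SimplePeerKey](
  pool, localHeadSlot, storeBlocks,
  peersInSlot = 3, slotsInGroup = 2, groupsCount = 10)
# ... add peers to ``pool``, then: waitFor sman.synchronize()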
+ if len(slot.peers) < slot.man.peersInSlot: + # Slot do not have enough peers inside, we need to add missing peers. + let available = slot.man.pool.lenAvailable() + if available == 0: + # There no peers available so we just exiting + discard + else: + if available + len(slot.peers) < slot.man.peersInSlot: + let count = nearestOdd(available + len(slot.peers)) + let peers = slot.man.pool.acquireNoWait(count - len(slot.peers)) + slot.peers.add(peers) + else: + let peers = slot.man.pool.acquireNoWait(slot.man.peersInSlot - + len(slot.peers)) + slot.peers.add(peers) + else: + # Slot has enough peers inside, we do nothing here + discard + +proc newPeerGroup*[A, B](man: SyncManager[A, B]): PeerGroup[A, B] = + result = PeerGroup[A, B]() + result.man = man + result.slots = newSeq[PeerSlot[A, B]]() + +proc fillSlots*[A, B](group: PeerGroup[A, B]) {.async.} = + ## Filling peer's group ``group`` with peers from PeerPool. + if len(group.slots) == 0: + while len(group.slots) < group.man.slotsInGroup: + var slot = newPeerSlot[A, B](group.man) + await slot.fillPeers() + doAssert(not(slot.isEmpty())) + group.slots.add(slot) + if not(slot.isFull()) or (group.man.pool.lenAvailable() == 0): + break + else: + for i in 0 ..< group.man.slotsInGroup: + if i < len(group.slots): + if group.man.pool.lenAvailable() == 0: + break + # PeerPool is not empty, so this call will be finished immediately. + await group.slots[i].fillPeers() + if not(group.slots[i].isFull()): + break + else: + if group.man.pool.lenAvailable() == 0: + break + var slot = newPeerSlot[A, B](group.man) + # PeerPool is not empty, so this call will be finished immediately. + await slot.fillPeers() + doAssert(not(slot.isEmpty())) + group.slots.add(slot) + if not(slot.isFull()): + break + +proc isFull*[A, B](group: PeerGroup[A, B]): bool = + result = false + if len(group.slots) >= group.man.slotsInGroup: + result = true + for item in group.slots: + if not(item.isFull()): + result = false + break + +proc isEmpty*[A, B](group: PeerGroup[A, B]): bool = + result = (len(group.slots) == 0) + +proc `$`*[A, B](group: PeerGroup[A, B]): string = + if len(group.slots) == 0: + result = "[]" + else: + result = "[" + for item in group.slots: + result.add($item) + result.add(", ") + result.setLen(len(result) - 2) + result.add("]") + +proc `$`*[A, B](man: SyncManager[A, B]): string = + result = "" + for i in 0 ..< man.groupsCount: + result.add($i & ":") + if i < len(man.groups): + result.add($man.groups[i]) + else: + result.add("[]") + result.add(", ") + + if len(result) > 0: + result.setLen(len(result) - 2) + +proc fillGroups*[A, B](man: SyncManager[A, B]) {.async.} = + if len(man.groups) == 0: + while len(man.groups) < man.groupsCount: + var group = newPeerGroup[A, B](man) + await group.fillSlots() + doAssert(not(group.isEmpty())) + man.groups.add(group) + if not(group.isFull()) or (man.pool.lenAvailable() == 0): + break + else: + for i in 0 ..< man.groupsCount: + if i < len(man.groups): + if man.pool.lenAvailable() == 0: + break + # PeerPool is not empty, so this call will be finished immediately. + await man.groups[i].fillSlots() + if not(man.groups[i].isFull()): + break + else: + if man.pool.lenAvailable() == 0: + break + var group = newPeerGroup[A, B](man) + # PeerPool is not empty, so this call will be finished immediately. + await group.fillSlots() + doAssert(not(group.isEmpty())) + man.groups.add(group) + if not(group.isFull()): + break + +proc compactGroups*[A, B](man: SyncManager[A, B]) = + ## Removes empty slots from SyncManager's groups list. 
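One invariant worth calling out: fillPeers above always keeps an odd number of peers in a slot (via the private nearestOdd template) so that majority voting over their responses can never tie. A tiny worked check of that helper, re-declared here only for illustration:

template nearestOddSketch(number: int): int =
  number - ((number - 1) mod 2)

doAssert(nearestOddSketch(4) == 3)   # four available peers -> use three
doAssert(nearestOddSketch(5) == 5)   # already odd -> keep all five
doAssert(nearestOddSketch(1) == 1)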
+ var ngroups = newSeq[PeerGroup[A, B]]() + for i in 0 ..< len(man.groups): + if not(man.groups[i].isEmpty()): + ngroups.add(man.groups[i]) + man.groups = ngroups + +proc isFull*[A, B](man: SyncManager[A, B]): bool = + result = false + if len(man.groups) >= man.groupsCount: + result = true + for item in man.groups: + if not(item.isFull()): + result = false + break + +proc isEmpty*[A, B](man: SyncManager[A, B]): bool = + result = (len(man.groups) == 0) + +proc reorderGroups*[A, B](man: SyncManager[A, B]) = + mixin getLastSlot + doAssert(not(man.isEmpty())) + + var x, y, z: int + for i0 in 0 ..< len(man.groups): + let group0 = man.groups[i0] + for j0 in 0 ..< len(group0.slots): + let slot0 = group0.slots[j0] + for k0 in 0 ..< len(slot0.peers): + var curSlot = getLastSlot(slot0.peers[k0]) + x = -1; y = -1; z = -1 + + for i1 in i0 ..< len(man.groups): + let group1 = man.groups[i1] + for j1 in j0 ..< len(group1.slots): + let slot1 = group1.slots[j1] + let start = if (i1 == i0) and (j1 == j0): k0 + 1 else: 0 + for k1 in start ..< len(slot1.peers): + let newSlot = getLastSlot(slot1.peers[k1]) + if curSlot < newSlot: + curSlot = newSlot + x = i1; y = j1; z = k1 + + if x >= 0: + swap(man.groups[i0].slots[j0].peers[k0], + man.groups[x].slots[y].peers[z]) + +proc disband*[A, B](peerslot: PeerSlot[A, B]) = + ## Releases all the peers back to the PeerPool, and make ``peerslot`` empty. + for peer in peerslot.peers: + peerslot.man.pool.release(peer) + peerslot.peers.setLen(0) + +proc disband*[A, B](peergroup: PeerGroup[A, B]) = + ## Releases all the slots back to the PeerPool, and make ``peergroup`` empty. + for slot in peergroup.slots: + disband(slot) + peergroup.slots.setLen(0) + +proc disband*[A, B](syncman: SyncManager[A, B]) = + ## Releases all the groups to the PeerPool, and make SyncManager peer groups + ## empty. + for group in syncman.groups: + disband(group) + syncman.groups.setLen(0) + +proc getLastSlot*[A, B](peerslot: PeerSlot[A, B]): Slot = + ## Returns minimal available beacon chain slot, for peer's slot ``peerslot``. + mixin getLastSlot + doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero") + for i in 0 ..< len(peerslot.peers): + if i == 0: + result = getLastSlot(peerslot.peers[i]) + else: + let slot = getLastSlot(peerslot.peers[i]) + if slot < result: + result = slot + +proc getLastSlot*[A, B](peergroup: PeerGroup[A, B]): Slot = + ## Returns minimal available beacon chain slot, for peer's group + ## ``peergroup``. + doAssert(len(peergroup.slots) > 0, + "Number of slots in group must not be zero") + for i in 0 ..< len(peergroup.slots): + if i == 0: + result = getLastSlot(peergroup.slots[i]) + else: + let slot = getLastSlot(peergroup.slots[i]) + if slot < result: + result = slot + +proc getLastSlot*[A, B](sman: SyncManager[A, B]): Slot = + ## Returns minimal available beacon chain slot, for all peers in sync manager + ## ``sman``. 
+ for i in 0 ..< len(sman.groups): + if i == 0: + result = getLastSlot(sman.groups[i]) + else: + let slot = getLastSlot(sman.groups[i]) + if slot < result: + result = slot + +proc getBlocks*[A, B](peerslot: PeerSlot[A, B], slot: Slot, count: uint64, + step: uint64): Future[Option[BlockList]] {.async.} = + mixin getBeaconBlocksByRange, getHeadRoot, `==` + doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero") + var pending = newSeq[Future[OptionBeaconBlockSeq]](len(peerslot.peers)) + var allFut, timeFut: Future[void] + try: + for i in 0 ..< len(peerslot.peers): + let root = getHeadRoot(peerslot.peers[i]) + pending[i] = getBeaconBlocksByRange(peerslot.peers[i], root, slot, count, + step) + + allFut = allFutures(pending) + if peerslot.man.peerSlotTimeout == InfiniteDuration: + timeFut = newFuture[void]() + else: + timeFut = sleepAsync(peerslot.man.peerSlotTimeout) + + discard await one(allFut, timeFut) + # We do not care about who finished first, because we are waiting for all + # peers it can happens that some peers returned data, and some are not. + var results = newSeq[seq[SignedBeaconBlock]]() + for i in 0 ..< len(pending): + if pending[i].finished() and + not(pending[i].failed()) and not(pending[i].cancelled()): + var fdata = pending[i].read() + if fdata.isSome(): + results.add(fdata.get()) + else: + # remote peer did not returns any data + discard + else: + # getBeaconBlocksByRange() returns failure + discard + + if len(results) > 0: + var m: seq[SignedBeaconBlock] + var i = 0 + if len(results) > (len(peerslot.peers) div 2): + # Now we going to obtain major sequence of blocks by using + # Boyer–Moore majority vote algorithm. + for x in results: + if i == 0: + m = x + i = 1 + elif m == x: + i = i + 1 + else: + i = i - 1 + i = 0 + for x in results: + if m == x: + i = i + 1 + if i > (len(peerslot.peers) div 2): + # Major sequence of blocks found, so we going to return such result + # and penalize all the peers which returned different sequences of + # blocks. + for i in 0 ..< len(pending): + if pending[i].finished() and + not(pending[i].failed()) and not(pending[i].cancelled()): + let fdata = pending[i].read() + if fdata.isSome(): + if fdata.get() != m: + # peer returned data which is not major + discard + result = BlockList.init(slot, count, step, m) + else: + # Major sequence could not be found, so we going to penalize all the + # peers. + discard + else: + # Timeout exceeded while we waiting data from peers, or peers returned + # an error. 
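The selection logic above applies the Boyer-Moore majority vote to whole block sequences returned by the peers of one slot. A standalone sketch of that idea follows: a generic helper written for illustration, not code from this patch, relying on the std options module already imported here.

proc majority[T](results: openarray[T]): Option[T] =
  # First pass: find the only possible majority candidate.
  var candidate: T
  var votes = 0
  for x in results:
    if votes == 0:
      candidate = x
      votes = 1
    elif candidate == x:
      inc(votes)
    else:
      dec(votes)
  # Second pass: confirm the candidate occurs in more than half of the results.
  votes = 0
  for x in results:
    if candidate == x:
      inc(votes)
  if votes > (len(results) div 2):
    result = some(candidate)

doAssert(majority([1, 2, 1, 1, 3]).get() == 1)
doAssert(majority([1, 2, 3]).isNone())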
+ discard + except CancelledError as exc: + if not allFut.finished: + allFut.cancel() + if not timeFut.finished: + timeFut.cancel() + for i in 0 ..< len(peerslot.peers): + if not pending[i].finished: + pending[i].cancel() + raise exc + +proc getParams*[T](peerslots: int, index: int, slot: T, + count: uint64): tuple[start: T, count: uint64, step: uint64] = + mixin `+` + doAssert(peerslots > 0, "Number of peerslots must not be zero") + doAssert(count > 0'u64, "Number of requested blocks must not be zero") + doAssert(index < peerslots, "Peer slot index must be lower then slots count") + result.start = slot + uint64(index) + let more = if uint64(index) < (count mod uint64(peerslots)): 1'u64 else: 0'u64 + result.count = (count div uint64(peerslots)) + more + result.step = uint64(peerslots) + +proc getBlocks*[A, B](peergroup: PeerGroup[A, B], slot: Slot, + count: uint64): Future[Option[BlockList]] {.async.} = + doAssert(len(peergroup.slots) > 0, + "Number of slots in group must not be zero") + doAssert(count > 0'u64) + let slotsCount = len(peergroup.slots) + var + params = newSeq[tuple[start: Slot, count: uint64, step: uint64]](slotsCount) + results = newSeq[Option[BlockList]](slotsCount) + pending = newSeq[Future[OptionBlockList]]() + requests = newSeq[tuple[slot: int, param: int]]() + failures = newSeq[int]() + + var allFut, timeFut: Future[void] + try: + for i in 0 ..< slotsCount: + params[i] = getParams(slotsCount, i, slot, count) + requests.add((slot: i, param: i)) + pending.add(getBlocks(peergroup.slots[i], params[i].start, + params[i].count, params[i].step)) + + if peergroup.man.peerGroupTimeout == InfiniteDuration: + timeFut = newFuture[void]() + else: + timeFut = sleepAsync(peergroup.man.peerGroupTimeout) + + while true: + allFut = allFutures(pending) + if not timeFut.finished(): + discard await one(allFut, timeFut) + # We do not care about who finished first, because we are waiting for + # all slots and it can happens that some slots returned data, and some + # are not. + for i in 0 ..< len(pending): + let slotIndex = requests[i].slot + let resIndex = requests[i].param + if pending[i].finished() and + not(pending[i].failed()) and not(pending[i].cancelled()): + results[resIndex] = pending[i].read() + if results[resIndex].isNone(): + failures.add(slotIndex) + else: + failures.add(slotIndex) + + if len(failures) == len(peergroup.slots): + # All the slots in group are failed to download blocks. + peergroup.disband() + break + else: + pending.setLen(0) + requests.setLen(0) + + var missing = 0 + for i in 0 ..< len(results): + if results[i].isNone(): + inc(missing) + + if missing > 0: + for k in 0 ..< len(peergroup.slots): + if (missing > 0) and (k notin failures): + for i in 0 ..< len(results): + if results[i].isNone(): + requests.add((slot: k, param: i)) + pending.add(getBlocks(peergroup.slots[k], params[i].start, + params[i].count, params[i].step)) + break + dec(missing) + else: + # All the blocks downloaded. 
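getParams above partitions one slot range deterministically across the peer slots of a group. A worked example with illustrative numbers, splitting ten slots starting at slot 100 across three peer slots:

let p0 = getParams(3, 0, Slot(100), 10'u64)
let p1 = getParams(3, 1, Slot(100), 10'u64)
let p2 = getParams(3, 2, Slot(100), 10'u64)
# Peer slot 0 requests 100, 103, 106, 109; slot 1 requests 101, 104, 107;
# slot 2 requests 102, 105, 108 -- together exactly slots 100..109.
doAssert(p0.start == Slot(100) and p0.count == 4 and p0.step == 3)
doAssert(p1.start == Slot(101) and p1.count == 3 and p1.step == 3)
doAssert(p2.start == Slot(102) and p2.count == 3 and p2.step == 3)
doAssert(p0.count + p1.count + p2.count == 10)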
+ if len(failures) > 0: + var slots = newSeq[PeerSlot[A, B]]() + for i in 0 ..< len(peergroup.slots): + if i notin failures: + slots.add(peergroup.slots[i]) + else: + disband(peergroup.slots[i]) + peergroup.slots = slots + result = merge(results) + break + + except CancelledError as exc: + if not allFut.finished: + allFut.cancel() + if not timeFut.finished: + timeFut.cancel() + for i in 0 ..< len(peergroup.slots): + if not pending[i].finished: + pending[i].cancel() + raise exc + +proc updateStatus*[A, B](peerslot: PeerSlot[A, B]) {.async.} = + mixin updateStatus + doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero") + let peersCount = len(peerslot.peers) + var pending = newSeq[Future[void]](peersCount) + var failed = newSeq[int]() + var allFut, timeFut: Future[void] + + try: + for i in 0 ..< peersCount: + pending.add(updateStatus(peerslot.peers[i])) + + if peerslot.man.peerSlotTimeout == InfiniteDuration: + timeFut = newFuture[void]() + else: + timeFut = sleepAsync(peerslot.man.peerSlotTimeout) + + allFut = allFutures(pending) + discard await one(allFut, timeFut) + for i in 0 ..< len(pending): + if pending[i].finished() and pending[i].failed(): + failed.add(i) + + if len(failed) > 0: + for index in failed: + peerslot.man.pool.release(peerslot.peers[index]) + peerslot.peers.del(index) + + except CancelledError as exc: + if not allFut.finished: + allFut.cancel() + if not timeFut.finished: + timeFut.cancel() + for i in 0 ..< peersCount: + if not pending[i].finished: + pending[i].cancel() + raise exc + +proc updateStatus*[A, B](sman: SyncManager[A, B]) {.async.} = + var pending = newSeq[Future[void]]() + try: + for i in 0 ..< len(sman.groups): + for k in 0 ..< len(sman.groups[i].slots): + pending.add(updateStatus(sman.groups[i].slots[k])) + await allFutures(pending) + except CancelledError as exc: + for i in 0 ..< len(pending): + if not pending[i].finished: + pending[i].cancel() + raise exc + +proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} = + var + squeue: SyncQueue + remoteLastKnownSlot: Slot + localLastSlot: Slot = sman.getLastLocalSlot() + pending = newSeq[Future[OptionBlockList]]() + requests = newSeq[SyncRequest]() + checkMoment = Moment.now() + errorsCount = 0 + counter = 0'u64 + + squeue = SyncQueue.init(localLastSlot + 1'u64, localLastSlot + 2'u64, + MAX_REQUESTED_BLOCKS, sman.updateLocalBlocks, + sman.groupsCount) + while true: + if errorsCount == sman.failuresCount: + # Number of consecutive errors exceeds limit + break + + pending.setLen(0) + requests.setLen(0) + + await sman.fillGroups() + sman.reorderGroups() + + var localLastSlot = sman.getLastLocalSlot() + let remoteLastSlot = sman.getLastSlot() + if remoteLastSlot > remoteLastKnownSlot: + remoteLastKnownSlot = remoteLastSlot + squeue.updateLastSlot(remoteLastKnownSlot) + + if localLastSlot >= remoteLastKnownSlot: + info "Synchronization successfully finished" + break + else: + # if counter == 0: + # info "Starting synchronization", local_slot = localLastSlot, + # remote_slot = remoteLastKnownSlot, + # count = len(squeue) + # else: + # info "Synchronization progress", progress = squeue.progress() + discard + + counter = counter + 1'u64 + + for i in countdown(len(sman.groups) - 1, 0): + if len(squeue) == 0: + break + var req = squeue.pop(uint64(len(sman.groups[i].slots))) + if sman.groups[i].getLastSlot() >= req.lastSlot(): + req.group = i + pending.add(getBlocks(sman.groups[i], req.slot, req.count)) + requests.add(req) + # trace "Send request to a group of peers", group = i + else: + 
squeue.push(req) + + if len(pending) == 0: + # All the peer groups do not satisfy slot requirements + # Disbanding all the peers + sman.disband() + await sleepAsync(sman.failurePause) + inc(errorsCount) + continue + else: + errorsCount = 0 + + # TODO: If getBeaconBlocksByRange() will properly support cancellation, + # then this can be done more efficiently at the end, so you do not need + # to wait for all futures here. + await allFutures(pending) + + var failedCount = 0 + for i in 0 ..< len(pending): + if pending[i].finished() and not(pending[i].failed()): + let res = pending[i].read() + if res.isSome(): + # trace "Peer's group successfully delivered data" + await squeue.push(requests[i], res.get().list) + else: + inc(failedCount) + # trace "Peer's group failed to deliver data" + squeue.push(requests[i]) + sman.groups[requests[i].group].disband() + + else: + inc(failedCount) + # trace "Peer's group failed to deliver data" + squeue.push(requests[i]) + sman.groups[requests[i].group].disband() + + if failedCount == len(pending): + # All the peer groups failed to download requests. + await sleepAsync(sman.failurePause) + inc(errorsCount) + continue + else: + errorsCount = 0 + + sman.compactGroups() + + # if `statusPeriod` time passed, we are updating peers status. + let stamp = Moment.now() + if stamp - checkMoment > sman.statusPeriod: + checkMoment = stamp + await sman.updateStatus() diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim new file mode 100644 index 000000000..1ca0b5837 --- /dev/null +++ b/tests/test_sync_manager.nim @@ -0,0 +1,862 @@ +import options, hashes, unittest +import chronos +import ../beacon_chain/peer_pool, ../beacon_chain/sync_manager + +type + PeerRequest = object + headRoot: Eth2Digest + startSlot: Slot + count: uint64 + step: uint64 + data: seq[Slot] + + SimplePeerKey = string + + SimplePeer = ref object + id: SimplePeerKey + weight: int + lifu: Future[void] + blockchain: seq[SignedBeaconBlock] + latestSlot: Slot + delay: Duration + malicious: bool + failure: bool + disconnect: bool + requests: seq[PeerRequest] + +proc getKey*(peer: SimplePeer): SimplePeerKey = + result = peer.id + +proc getFuture*(peer: SimplePeer): Future[void] = + result = peer.lifu + +proc `<`*(a, b: SimplePeer): bool = + result = `<`(a.weight, b.weight) + +proc getLastSlot*(peer: SimplePeer): Slot = + if len(peer.blockchain) == 0: + result = peer.latestSlot + else: + result = peer.blockchain[len(peer.blockchain) - 1].message.slot + +proc init*(t: typedesc[SimplePeer], id: string = "", malicious = false, + weight: int = 0, slot: int = 0, + delay: Duration = ZeroDuration): SimplePeer = + result = SimplePeer(id: id, weight: weight, lifu: newFuture[void](), + delay: delay, latestSlot: Slot(slot), + malicious: malicious) + +proc update*(peer: SimplePeer, chain: openarray[SignedBeaconBlock], + malicious = false, failure = false, disconnect = false, + delay: Duration = ZeroDuration) = + peer.malicious = malicious + peer.delay = delay + peer.failure = failure + peer.disconnect = disconnect + peer.blockchain.setLen(0) + for item in chain: + peer.blockchain.add(item) + +proc close*(peer: SimplePeer) = + peer.lifu.complete() + +proc getHeadRoot*(peer: SimplePeer): Eth2Digest = + discard + +proc updateStatus*[A](peer: A): Future[void] = + var res = newFuture[void]("updateStatus") + res.complete() + return res + +proc getBeaconBlocksByRange*[A](peer: A, headRoot: Eth2Digest, startSlot: Slot, + count: uint64, + step: uint64): Future[OptionBeaconBlockSeq] {.async.} = + var req = 
PeerRequest(headRoot: headRoot, startSlot: startSlot, count: count, + step: step) + var res = newSeq[SignedBeaconBlock]() + var reqres = newSeq[Slot]() + if peer.delay != ZeroDuration: + await sleepAsync(peer.delay) + + var counter = 0'u64 + + if peer.failure: + peer.requests.add(req) + if peer.disconnect: + peer.close() + raise newException(SyncManagerError, "Error") + + if peer.malicious: + var index = 0 + while counter < count: + if index < len(peer.blockchain): + res.add(peer.blockchain[index]) + reqres.add(peer.blockchain[index].message.slot) + else: + break + index = index + int(step) + counter = counter + 1'u64 + req.data = reqres + peer.requests.add(req) + result = some(res) + else: + var index = -1 + for i in 0 ..< len(peer.blockchain): + if peer.blockchain[i].message.slot == startSlot: + index = i + break + + if index >= 0: + while counter < count: + if index < len(peer.blockchain): + res.add(peer.blockchain[index]) + reqres.add(peer.blockchain[index].message.slot) + else: + break + index = index + int(step) + counter = counter + 1'u64 + req.data = reqres + result = some(res) + peer.requests.add(req) + +proc newTempChain*(number: int, start: Slot): seq[SignedBeaconBlock] = + result = newSeq[SignedBeaconBlock](number) + for i in 0 ..< number: + result[i].message.slot = start + uint64(i) + +proc `==`*(a1, a2: SignedBeaconBlock): bool {.inline.} = + result = (a1.message.slot == a2.message.slot) and + (a1.message.parent_root == a2.message.parent_root) and + (a1.message.state_root == a2.message.state_root) + +proc peerSlotTests(): Future[bool] {.async.} = + # slot0: 3 ok + # slot1: 2 ok 1 timeout + # slot2: 1 ok 2 timeout + # slot3: 2 ok 1 bad + # slot4: 1 ok 2 bad + # slot5: 2 ok 1 failure + # slot6: 1 ok 2 failure + # slot7: 1 ok 1 bad 1 failure + # slot8: 1 bad 1 timeout 1 failure + # slot9: 3 bad + # slot10: 3 timeout + # slot11: 3 failure + var pool = newPeerPool[SimplePeer, SimplePeerKey]() + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, nil, nil, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 6) + + var chain1 = newTempChain(10, Slot(10000)) + var chain2 = newTempChain(10, Slot(11000)) + + var peers = newSeq[SimplePeer]() + for i in 0 ..< 36: + var peer = SimplePeer.init("id" & $i) + peers.add(peer) + + peers[0].update(chain1) + peers[1].update(chain1) + peers[2].update(chain1) + + peers[3].update(chain1) + peers[4].update(chain1, delay = 2.seconds) + peers[5].update(chain1) + + peers[6].update(chain1) + peers[7].update(chain1, delay = 2.seconds) + peers[8].update(chain1, delay = 2.seconds) + + peers[9].update(chain1) + peers[10].update(chain1) + peers[11].update(chain2, malicious = true) + + peers[12].update(chain1) + peers[13].update(chain2, malicious = true) + peers[14].update(chain2, malicious = true) + + peers[15].update(chain1) + peers[16].update(chain1) + peers[17].update(chain1, failure = true) + + peers[18].update(chain1) + peers[19].update(chain1, failure = true) + peers[20].update(chain1, failure = true) + + peers[21].update(chain1) + peers[22].update(chain2, malicious = true) + peers[23].update(chain1, failure = true) + + peers[24].update(chain2, malicious = true) + peers[25].update(chain1, failure = true) + peers[26].update(chain1, delay = 2.seconds) + + peers[27].update(chain2, malicious = true) + peers[28].update(chain2, malicious = true) + peers[29].update(chain2, malicious = true) + + peers[30].update(chain1, delay = 2.seconds) + peers[31].update(chain1, delay = 2.seconds) + peers[32].update(chain1, delay = 2.seconds) + + 
peers[33].update(chain1, failure = true) + peers[34].update(chain1, failure = true) + peers[35].update(chain1, failure = true) + + var slot0 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot0.peers = @[peers[0], peers[1], peers[2]] + + var slot1 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot1.peers = @[peers[3], peers[4], peers[5]] + + var slot2 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot2.peers = @[peers[6], peers[7], peers[8]] + + var slot3 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot3.peers = @[peers[9], peers[10], peers[11]] + + var slot4 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot4.peers = @[peers[12], peers[13], peers[14]] + + var slot5 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot5.peers = @[peers[15], peers[16], peers[17]] + + var slot6 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot6.peers = @[peers[18], peers[19], peers[20]] + + var slot7 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot7.peers = @[peers[21], peers[22], peers[23]] + + var slot8 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot8.peers = @[peers[24], peers[25], peers[26]] + + var slot9 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot9.peers = @[peers[27], peers[28], peers[29]] + + var slot10 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot10.peers = @[peers[30], peers[31], peers[32]] + + var slot11 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot11.peers = @[peers[33], peers[34], peers[35]] + + var s0 = await slot0.getBlocks(Slot(10000), 10'u64, 1'u64) + var s1 = await slot1.getBlocks(Slot(10000), 10'u64, 1'u64) + var s2 = await slot2.getBlocks(Slot(10000), 10'u64, 1'u64) + var s3 = await slot3.getBlocks(Slot(10000), 10'u64, 1'u64) + var s4 = await slot4.getBlocks(Slot(10000), 10'u64, 1'u64) + var s5 = await slot5.getBlocks(Slot(10000), 10'u64, 1'u64) + var s6 = await slot6.getBlocks(Slot(10000), 10'u64, 1'u64) + var s7 = await slot7.getBlocks(Slot(10000), 10'u64, 1'u64) + var s8 = await slot8.getBlocks(Slot(10000), 10'u64, 1'u64) + var s9 = await slot9.getBlocks(Slot(10000), 10'u64, 1'u64) + var s10 = await slot10.getBlocks(Slot(10000), 10'u64, 1'u64) + var s11 = await slot11.getBlocks(Slot(10000), 10'u64, 1'u64) + + var expected = BlockList.init(Slot(10000), 10'u64, 1'u64, chain1).get() + + doAssert(s0.isSome()) + doAssert(s1.isSome()) + doAssert(s2.isNone()) + doAssert(s3.isSome()) + doAssert(s4.isNone()) + doAssert(s5.isSome()) + doAssert(s6.isNone()) + doAssert(s7.isNone()) + doAssert(s8.isNone()) + doAssert(s9.isNone()) + doAssert(s10.isNone()) + doAssert(s11.isNone()) + doAssert($s0.get() == $expected) + doAssert($s1.get() == $expected) + doAssert($s3.get() == $expected) + doAssert($s5.get() == $expected) + + result = true + +proc peerGroupTests(): Future[bool] {.async.} = + # group0: 3 ok + # group1: 2 ok 1 bad + # group2: 1 ok 2 bad + # group3: 3 bad + var pool = newPeerPool[SimplePeer, SimplePeerKey]() + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, nil, nil, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 6) + + var chain1 = newTempChain(10, Slot(10000)) + var chain2 = newTempChain(10, Slot(11000)) + + var peers = newSeq[SimplePeer]() + for i in 0 ..< 18: + var peer = SimplePeer.init("id" & $i) + peers.add(peer) + + proc cleanup() = + for i in 0 ..< 18: + peers[i].requests.setLen(0) + + peers[0].update(chain1) + peers[1].update(chain1) + peers[2].update(chain1) + + peers[3].update(chain1) + peers[4].update(chain1) + peers[5].update(chain1) + + peers[6].update(chain1) + peers[7].update(chain1) + 
peers[8].update(chain1) + + peers[9].update(chain1) + peers[10].update(chain2, malicious = true) + peers[11].update(chain2, malicious = true) + + peers[12].update(chain1, delay = 2.seconds) + peers[13].update(chain1, delay = 2.seconds) + peers[14].update(chain1, delay = 2.seconds) + + peers[15].update(chain1, failure = true) + peers[16].update(chain1, failure = true) + peers[17].update(chain1, failure = true) + + var slot0 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot0.peers = @[peers[0], peers[1], peers[2]] + var slot1 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot1.peers = @[peers[3], peers[4], peers[5]] + var slot2 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot2.peers = @[peers[6], peers[7], peers[8]] + var slot3 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot3.peers = @[peers[9], peers[10], peers[11]] + var slot4 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot4.peers = @[peers[12], peers[13], peers[14]] + var slot5 = newPeerSlot[SimplePeer, SimplePeerKey](sman) + slot5.peers = @[peers[15], peers[16], peers[17]] + + var group0 = newPeerGroup(sman) + group0.slots = @[slot0, slot1, slot2] + var group1 = newPeerGroup(sman) + group1.slots = @[slot0, slot1, slot3] + var group2 = newPeerGroup(sman) + group2.slots = @[slot0, slot3, slot4] + var group3 = newPeerGroup(sman) + group3.slots = @[slot3, slot4, slot5] + + var s0 = await group0.getBlocks(Slot(10000), 10'u64) + cleanup() + var s1 = await group1.getBlocks(Slot(10000), 10'u64) + cleanup() + var s2 = await group2.getBlocks(Slot(10000), 10'u64) + cleanup() + var s3 = await group3.getBlocks(Slot(10000), 10'u64) + cleanup() + + var expected = BlockList.init(Slot(10000), 10'u64, 1'u64, chain1).get() + + doAssert(s0.isSome()) + doAssert(s1.isSome()) + doAssert(s2.isSome()) + doAssert(s3.isNone()) + + doAssert($s0.get() == $expected) + doAssert($s1.get() == $expected) + doAssert($s2.get() == $expected) + + result = true + +proc syncQueueNonAsyncTests(): bool = + var q1 = SyncQueue.init(Slot(0), Slot(0), 1'u64, nil) + doAssert(len(q1) == 1) + var r11 = q1.pop() + doAssert(len(q1) == 0) + q1.push(r11) + doAssert(len(q1) == 1) + var r11e = q1.pop() + doAssert(len(q1) == 0) + doAssert(r11e == r11) + doAssert(r11.slot == Slot(0) and r11.count == 1'u64) + + var q2 = SyncQueue.init(Slot(0), Slot(1), 1'u64, nil) + doAssert(len(q2) == 2) + var r21 = q2.pop() + doAssert(len(q2) == 1) + var r22 = q2.pop() + doAssert(len(q2) == 0) + q2.push(r22) + doAssert(len(q2) == 1) + q2.push(r21) + doAssert(len(q2) == 2) + var r21e = q2.pop() + doAssert(len(q2) == 1) + var r22e = q2.pop() + doAssert(len(q2) == 0) + doAssert(r21 == r21e) + doAssert(r22 == r22e) + doAssert(r21.slot == Slot(0) and r21.count == 1'u64) + doAssert(r22.slot == Slot(1) and r22.count == 1'u64) + + var q3 = SyncQueue.init(Slot(0), Slot(4), 2'u64, nil) + doAssert(len(q3) == 5) + var r31 = q3.pop() + doAssert(len(q3) == 3) + var r32 = q3.pop() + doAssert(len(q3) == 1) + var r33 = q3.pop() + doAssert(len(q3) == 0) + q3.push(r33) + doAssert(len(q3) == 1) + q3.push(r32) + doAssert(len(q3) == 3) + q3.push(r31) + doAssert(len(q3) == 5) + var r31e = q3.pop() + doAssert(len(q3) == 3) + var r32e = q3.pop() + doAssert(len(q3) == 1) + var r33e = q3.pop() + doAssert(len(q3) == 0) + doAssert(r31 == r31e) + doAssert(r32 == r32e) + doAssert(r33 == r33e) + doAssert(r31.slot == Slot(0) and r31.count == 2'u64) + doAssert(r32.slot == Slot(2) and r32.count == 2'u64) + doAssert(r33.slot == Slot(4) and r33.count == 1'u64) + + var q4 = SyncQueue.init(Slot(1), Slot(5), 3'u64, nil) + 
doAssert(len(q4) == 5) + var r41 = q4.pop() + doAssert(len(q4) == 2) + var r42 = q4.pop() + doAssert(len(q4) == 0) + q4.push(r42) + doAssert(len(q4) == 2) + q4.push(r41) + doAssert(len(q4) == 5) + var r41e = q4.pop() + doAssert(len(q4) == 2) + var r42e = q4.pop() + doAssert(len(q4) == 0) + doAssert(r41 == r41e) + doAssert(r42 == r42e) + doAssert(r41.slot == Slot(1) and r41.count == 3'u64) + doAssert(r42.slot == Slot(4) and r42.count == 2'u64) + + var q5 = SyncQueue.init(Slot(1), Slot(30), 2'u64, nil) + doAssert(len(q5) == 30) + var r51 = q5.pop(5) + doAssert(len(q5) == 20) + doAssert(r51.slot == Slot(1) and r51.count == 10 and r51.step == 5) + q5.push(r51, 3'u64) + doAssert(len(q5) == 30) + var r511 = q5.pop() + var r512 = q5.pop() + doAssert(len(q5) == 20) + doAssert(r511.slot == Slot(1) and r511.count == 6 and r511.step == 3) + doAssert(r512.slot == Slot(7) and r512.count == 4 and r512.step == 2) + q5.push(r511, 2'u64) + q5.push(r512, 1'u64) + doAssert(len(q5) == 30) + var r5111 = q5.pop() + var r5112 = q5.pop() + var r5121 = q5.pop() + var r5122 = q5.pop() + doAssert(len(q5) == 20) + doAssert(r5111.slot == Slot(1) and r5111.count == 4 and r5111.step == 2) + doAssert(r5112.slot == Slot(5) and r5112.count == 2 and r5112.step == 1) + doAssert(r5121.slot == Slot(7) and r5121.count == 2 and r5121.step == 1) + doAssert(r5122.slot == Slot(9) and r5122.count == 2 and r5122.step == 1) + + var q6 = SyncQueue.init(Slot(1), Slot(7), 10'u64, nil) + doAssert(len(q6) == 7) + var r61 = q6.pop() + doAssert(r61.slot == Slot(1) and r61.count == 7 and r61.step == 1) + doAssert(len(q6) == 0) + + var q7 = SyncQueue.init(Slot(1), Slot(7), 10'u64, nil) + doAssert(len(q7) == 7) + var r71 = q7.pop(5) + doAssert(len(q7) == 0) + doAssert(r71.slot == Slot(1) and r71.count == 7 and r71.step == 5) + q7.push(r71, 3'u64) + doAssert(len(q7) == 7) + var r72 = q7.pop() + doAssert(r72.slot == Slot(1) and r72.count == 7 and r72.step == 3) + q7.push(r72, 2'u64) + doAssert(len(q7) == 7) + var r73 = q7.pop() + doAssert(len(q7) == 0) + doAssert(r73.slot == Slot(1) and r73.count == 7 and r73.step == 2) + q7.push(r73, 1'u64) + doAssert(len(q7) == 7) + var r74 = q7.pop() + doAssert(len(q7) == 0) + doAssert(r74.slot == Slot(1) and r74.count == 7 and r74.step == 1) + + result = true + +proc syncQueueAsyncTests(): Future[bool] {.async.} = + var chain1 = newSeq[SignedBeaconBlock](3) + chain1[0].message.slot = Slot(0) + chain1[1].message.slot = Slot(1) + chain1[2].message.slot = Slot(2) + var chain2 = newSeq[SignedBeaconBlock](7) + chain2[0].message.slot = Slot(5) + chain2[1].message.slot = Slot(6) + chain2[2].message.slot = Slot(7) + chain2[3].message.slot = Slot(8) + chain2[4].message.slot = Slot(9) + chain2[5].message.slot = Slot(10) + chain2[6].message.slot = Slot(11) + + var counter = 0 + proc receiver1(list: openarray[SignedBeaconBlock]): bool = + result = true + for item in list: + if item.message.slot == uint64(counter): + inc(counter) + else: + result = false + break + + var q1 = SyncQueue.init(Slot(0), Slot(2), 1'u64, receiver1, 1) + var r11 = q1.pop() + var r12 = q1.pop() + var r13 = q1.pop() + var f13 = q1.push(r13, @[chain1[2]]) + var f12 = q1.push(r12, @[chain1[1]]) + await sleepAsync(100.milliseconds) + doAssert(f12.finished == false) + doAssert(f13.finished == false) + doAssert(counter == 0) + var f11 = q1.push(r11, @[chain1[0]]) + doAssert(counter == 1) + doAssert(f11.finished == true and f11.failed == false) + await sleepAsync(100.milliseconds) + doAssert(f12.finished == true and f12.failed == false) + 
doAssert(f13.finished == true and f13.failed == false) + doAssert(counter == 3) + + var q2 = SyncQueue.init(Slot(5), Slot(11), 2'u64, receiver1, 2) + var r21 = q2.pop() + var r22 = q2.pop() + var r23 = q2.pop() + var r24 = q2.pop() + + counter = 5 + + var f24 = q2.push(r24, @[chain2[6]]) + var f22 = q2.push(r22, @[chain2[2], chain2[3]]) + doAssert(f24.finished == false) + doAssert(f22.finished == true and f22.failed == false) + doAssert(counter == 5) + var f21 = q2.push(r21, @[chain2[0], chain2[1]]) + doAssert(f21.finished == true and f21.failed == false) + await sleepAsync(100.milliseconds) + doAssert(f24.finished == true and f24.failed == false) + doAssert(counter == 9) + var f23 = q2.push(r23, @[chain2[4], chain2[5]]) + doAssert(f23.finished == true and f23.failed == false) + doAssert(counter == 12) + await sleepAsync(100.milliseconds) + doAssert(counter == 12) + + result = true + +proc checkRequest(req: PeerRequest, slot, count, step: int, + data: varargs[int]): bool = + result = (req.startSlot == Slot(slot)) and (req.count == uint64(count)) and + (req.step == uint64(step)) + if result: + if len(data) != len(req.data): + result = false + else: + for i in 0 ..< len(data): + if Slot(data[i]) != req.data[i]: + result = false + break + +proc checkRequest(peer: SimplePeer, index: int, slot, count, step: int, + data: varargs[int]): bool {.inline.} = + result = checkRequest(peer.requests[index], slot, count, step, data) + +proc syncManagerOnePeerTest(): Future[bool] {.async.} = + # Syncing with one peer only. + var pool = newPeerPool[SimplePeer, SimplePeerKey]() + var peer = SimplePeer.init("id1") + var srcChain = newTempChain(100, Slot(10000)) + var dstChain = newSeq[SignedBeaconBlock]() + + proc lastLocalSlot(): Slot = + if len(dstChain) == 0: + result = Slot(9999) + else: + result = dstChain[^1].message.slot + + proc updateBlocks(list: openarray[SignedBeaconBlock]): bool = + for item in list: + dstChain.add(item) + result = true + + peer.update(srcChain) + doAssert(pool.addIncomingPeer(peer) == true) + + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, lastLocalSlot, updateBlocks, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 6) + await sman.synchronize() + doAssert(checkRequest(peer, 0, 10000, 20, 1, + 10000, 10001, 10002, 10003, 10004, + 10005, 10006, 10007, 10008, 10009, + 10010, 10011, 10012, 10013, 10014, + 10015, 10016, 10017, 10018, 10019) == true) + doAssert(checkRequest(peer, 1, 10020, 20, 1, + 10020, 10021, 10022, 10023, 10024, + 10025, 10026, 10027, 10028, 10029, + 10030, 10031, 10032, 10033, 10034, + 10035, 10036, 10037, 10038, 10039) == true) + doAssert(checkRequest(peer, 2, 10040, 20, 1, + 10040, 10041, 10042, 10043, 10044, + 10045, 10046, 10047, 10048, 10049, + 10050, 10051, 10052, 10053, 10054, + 10055, 10056, 10057, 10058, 10059) == true) + doAssert(checkRequest(peer, 3, 10060, 20, 1, + 10060, 10061, 10062, 10063, 10064, + 10065, 10066, 10067, 10068, 10069, + 10070, 10071, 10072, 10073, 10074, + 10075, 10076, 10077, 10078, 10079) == true) + doAssert(checkRequest(peer, 4, 10080, 20, 1, + 10080, 10081, 10082, 10083, 10084, + 10085, 10086, 10087, 10088, 10089, + 10090, 10091, 10092, 10093, 10094, + 10095, 10096, 10097, 10098, 10099) == true) + result = true + +proc syncManagerOneSlotTest(): Future[bool] {.async.} = + # Syncing with one slot (2n + 1 number of peers) only. 
+ var pool = newPeerPool[SimplePeer, SimplePeerKey]() + + var peers = newSeq[SimplePeer](3) + for i in 0 ..< len(peers): + peers[i] = SimplePeer.init("id" & $(i + 1)) + + var srcChain = newTempChain(100, Slot(10000)) + var dstChain = newSeq[SignedBeaconBlock]() + + proc lastLocalSlot(): Slot = + if len(dstChain) == 0: + result = Slot(9999) + else: + result = dstChain[^1].message.slot + + proc updateBlocks(list: openarray[SignedBeaconBlock]): bool = + for item in list: + dstChain.add(item) + result = true + + for i in 0 ..< len(peers): + peers[i].update(srcChain) + doAssert(pool.addIncomingPeer(peers[0]) == true) + doAssert(pool.addOutgoingPeer(peers[1]) == true) + doAssert(pool.addOutgoingPeer(peers[2]) == true) + + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, lastLocalSlot, updateBlocks, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 6) + await sman.synchronize() + for i in 0 ..< len(peers): + doAssert(checkRequest(peers[i], 0, 10000, 20, 1, + 10000, 10001, 10002, 10003, 10004, + 10005, 10006, 10007, 10008, 10009, + 10010, 10011, 10012, 10013, 10014, + 10015, 10016, 10017, 10018, 10019) == true) + doAssert(checkRequest(peers[i], 1, 10020, 20, 1, + 10020, 10021, 10022, 10023, 10024, + 10025, 10026, 10027, 10028, 10029, + 10030, 10031, 10032, 10033, 10034, + 10035, 10036, 10037, 10038, 10039) == true) + doAssert(checkRequest(peers[i], 2, 10040, 20, 1, + 10040, 10041, 10042, 10043, 10044, + 10045, 10046, 10047, 10048, 10049, + 10050, 10051, 10052, 10053, 10054, + 10055, 10056, 10057, 10058, 10059) == true) + doAssert(checkRequest(peers[i], 3, 10060, 20, 1, + 10060, 10061, 10062, 10063, 10064, + 10065, 10066, 10067, 10068, 10069, + 10070, 10071, 10072, 10073, 10074, + 10075, 10076, 10077, 10078, 10079) == true) + doAssert(checkRequest(peers[i], 4, 10080, 20, 1, + 10080, 10081, 10082, 10083, 10084, + 10085, 10086, 10087, 10088, 10089, + 10090, 10091, 10092, 10093, 10094, + 10095, 10096, 10097, 10098, 10099) == true) + result = true + +proc syncManagerOneGroupTest(): Future[bool] {.async.} = + # Syncing with one group of peers (n peer slots). 
+ var pool = newPeerPool[SimplePeer, SimplePeerKey]() + var peers = newSeq[SimplePeer](6) + for i in 0 ..< len(peers): + peers[i] = SimplePeer.init("id" & $(i + 1), weight = 10 - i) + + var srcChain = newTempChain(100, Slot(10000)) + var dstChain = newSeq[SignedBeaconBlock]() + + proc lastLocalSlot(): Slot = + if len(dstChain) == 0: + result = Slot(9999) + else: + result = dstChain[^1].message.slot + + proc updateBlocks(list: openarray[SignedBeaconBlock]): bool = + for item in list: + dstChain.add(item) + result = true + + for i in 0 ..< len(peers): + peers[i].update(srcChain) + if i mod 2 == 0: + doAssert(pool.addIncomingPeer(peers[i]) == true) + else: + doAssert(pool.addOutgoingPeer(peers[i]) == true) + + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, lastLocalSlot, updateBlocks, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 2) + await sman.synchronize() + + for i in 0 ..< len(peers): + if i in {0, 1, 2}: + doAssert(checkRequest(peers[i], 0, 10000, 20, 2, + 10000, 10002, 10004, 10006, 10008, + 10010, 10012, 10014, 10016, 10018, + 10020, 10022, 10024, 10026, 10028, + 10030, 10032, 10034, 10036, 10038) == true) + doAssert(checkRequest(peers[i], 1, 10040, 20, 2, + 10040, 10042, 10044, 10046, 10048, + 10050, 10052, 10054, 10056, 10058, + 10060, 10062, 10064, 10066, 10068, + 10070, 10072, 10074, 10076, 10078) == true) + doAssert(checkRequest(peers[i], 2, 10080, 10, 2, + 10080, 10082, 10084, 10086, 10088, + 10090, 10092, 10094, 10096, 10098) == true) + elif i in {3, 4, 5}: + doAssert(checkRequest(peers[i], 0, 10001, 20, 2, + 10001, 10003, 10005, 10007, 10009, + 10011, 10013, 10015, 10017, 10019, + 10021, 10023, 10025, 10027, 10029, + 10031, 10033, 10035, 10037, 10039) == true) + doAssert(checkRequest(peers[i], 1, 10041, 20, 2, + 10041, 10043, 10045, 10047, 10049, + 10051, 10053, 10055, 10057, 10059, + 10061, 10063, 10065, 10067, 10069, + 10071, 10073, 10075, 10077, 10079) == true) + doAssert(checkRequest(peers[i], 2, 10081, 10, 2, + 10081, 10083, 10085, 10087, 10089, + 10091, 10093, 10095, 10097, 10099) == true) + + result = true + +proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} = + # Syncing with two groups of peers (n peer slots), when one groups is failed + # to deliver request, and this request is bigger then other group. 
+ var pool = newPeerPool[SimplePeer, SimplePeerKey]() + var peers = newSeq[SimplePeer](6 + 3) + for i in 0 ..< len(peers): + peers[i] = SimplePeer.init("id" & $(i + 1), weight = 9 - i) + + var srcChain = newTempChain(100, Slot(10000)) + var dstChain = newSeq[SignedBeaconBlock]() + + for i in 0 ..< 6: + peers[i].update(srcChain, failure = true, disconnect = true) + for i in 6 ..< len(peers): + peers[i].update(srcChain) + + proc lastLocalSlot(): Slot = + if len(dstChain) == 0: + result = Slot(9999) + else: + result = dstChain[^1].message.slot + + proc updateBlocks(list: openarray[SignedBeaconBlock]): bool = + for item in list: + dstChain.add(item) + result = true + + for i in 0 ..< len(peers): + if i mod 2 == 0: + doAssert(pool.addIncomingPeer(peers[i]) == true) + else: + doAssert(pool.addOutgoingPeer(peers[i]) == true) + + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, lastLocalSlot, updateBlocks, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 2) + await sman.synchronize() + + for i in 0 ..< len(peers): + if i in {0, 1, 2}: + doAssert(checkRequest(peers[i], 0, 10020, 20, 2) == true) + elif i in {3, 4, 5}: + doAssert(checkRequest(peers[i], 0, 10021, 20, 2) == true) + elif i in {6, 7, 8}: + doAssert(checkRequest(peers[i], 0, 10000, 20, 1, + 10000, 10001, 10002, 10003, 10004, + 10005, 10006, 10007, 10008, 10009, + 10010, 10011, 10012, 10013, 10014, + 10015, 10016, 10017, 10018, 10019) == true) + doAssert(checkRequest(peers[i], 1, 10020, 20, 1, + 10020, 10021, 10022, 10023, 10024, + 10025, 10026, 10027, 10028, 10029, + 10030, 10031, 10032, 10033, 10034, + 10035, 10036, 10037, 10038, 10039) == true) + doAssert(checkRequest(peers[i], 2, 10040, 20, 1, + 10040, 10041, 10042, 10043, 10044, + 10045, 10046, 10047, 10048, 10049, + 10050, 10051, 10052, 10053, 10054, + 10055, 10056, 10057, 10058, 10059) == true) + doAssert(checkRequest(peers[i], 3, 10060, 20, 1, + 10060, 10061, 10062, 10063, 10064, + 10065, 10066, 10067, 10068, 10069, + 10070, 10071, 10072, 10073, 10074, + 10075, 10076, 10077, 10078, 10079) == true) + doAssert(checkRequest(peers[i], 4, 10080, 20, 1, + 10080, 10081, 10082, 10083, 10084, + 10085, 10086, 10087, 10088, 10089, + 10090, 10091, 10092, 10093, 10094, + 10095, 10096, 10097, 10098, 10099) == true) + result = true + + +when isMainModule: + suite "SyncManager test suite": + test "BlockList tests": + # TODO + discard + # test "PeerSlot tests": + # check waitFor(peerSlotTests()) == true + # test "PeerGroup tests": + # check waitFor(peerGroupTests()) == true + # test "SyncQueue non-async tests": + # check syncQueueNonAsyncTests() == true + # test "SyncQueue async tests": + # check waitFor(syncQueueAsyncTests()) == true + # test "SyncManager one-peer test": + # check waitFor(syncManagerOnePeerTest()) == true + # test "SyncManager one-peer-slot test": + # check waitFor(syncManagerOneSlotTest()) == true + # test "SyncManager one-peer-group test": + # check waitFor(syncManagerOneGroupTest()) == true + test "SyncManager group-recovery test": + check waitFor(syncManagerGroupRecoveryTest()) == true From db20fc11720516641790d53f680ca072a01c68d6 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 22 Jan 2020 14:47:55 +0200 Subject: [PATCH 02/14] Fix SyncQueue push(data) bug. Rename lastSlot to HeadSlot. Add failure test. 
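
The corrected check compares an incoming result against the consumer
position (sq.outSlot) rather than the producer position (sq.inpSlot), so
push() now blocks as soon as a result would land outside the window the
consumer can still drain. A rough standalone sketch of the new condition,
with plain uint64 values standing in for the real Slot/SyncQueue fields
(the withinWindow helper and the numbers below are made up for
illustration only):

    # withinWindow() is the negation of the new wait condition in push():
    #   (sq.queueSize > 0) and (sr.slot >= sq.outSlot + uint64(sq.queueSize))
    let
      outSlot   = 100'u64   # stands in for sq.outSlot (consumer position)
      queueSize = 2'u64     # stands in for sq.queueSize (allowed look-ahead)

    proc withinWindow(requestSlot: uint64): bool =
      not (queueSize > 0'u64 and requestSlot >= outSlot + queueSize)

    doAssert(withinWindow(100'u64))        # pushed immediately
    doAssert(withinWindow(101'u64))        # still inside the window
    doAssert(not withinWindow(102'u64))    # push() waits on notFullEvent first
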
--- beacon_chain/sync_manager.nim | 176 +++++++++++++++++++++------------- tests/test_sync_manager.nim | 88 +++++++++++------ 2 files changed, 171 insertions(+), 93 deletions(-) diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index 84ae2e667..dbd16d203 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -1,22 +1,26 @@ import chronicles import options, deques, heapqueue -import spec/datatypes, spec/digest, stew/bitseqs, chronos -import peer_pool -export datatypes, digest +import stew/bitseqs, chronos, chronicles +import spec/datatypes, spec/digest, peer_pool +export datatypes, digest, chronos, chronicles -# logScope: -# topics = "syncman" +logScope: + topics = "syncman" const MAX_REQUESTED_BLOCKS* = 20'u64 type - # A - Peer type - # B - PeerID type - # - # getLastSlot(Peer): Slot - # getHeadRoot(Peer): Eth2Digest - # getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64, uint64): Future[Option[seq[SignedBeaconBlock]]] - # updateStatus(Peer): void + ## A - `Peer` type + ## B - `PeerID` type + ## + ## Procedures which needs to be implemented and will be mixed to SyncManager's + ## code: + ## + ## getHeadSlot(Peer): Slot + ## getHeadRoot(Peer): Eth2Digest + ## getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64, + ## uint64): Future[Option[seq[SignedBeaconBlock]]] + ## updateStatus(Peer): void PeerSlot*[A, B] = ref object peers*: seq[A] @@ -26,7 +30,7 @@ type slots*: seq[PeerSlot[A, B]] man: SyncManager[A, B] - GetLastLocalSlotCallback* = proc(): Slot + GetLocalHeadSlotCallback* = proc(): Slot UpdateLocalBlocksCallback* = proc(list: openarray[SignedBeaconBlock]): bool SyncManager*[A, B] = ref object @@ -40,7 +44,7 @@ type peerSlotTimeout: chronos.Duration peerGroupTimeout: chronos.Duration statusPeriod: chronos.Duration - getLastLocalSlot: GetLastLocalSlotCallback + getLocalHeadSlot: GetLocalHeadSlotCallback updateLocalBlocks: UpdateLocalBlocksCallback BlockList* = object @@ -127,7 +131,7 @@ proc push*(sq: SyncQueue, sr: SyncRequest, data: seq[SignedBeaconBlock]) {.async.} = ## Push successfull result to queue ``sq``. while true: - if (sq.queueSize > 0) and (sr.slot > sq.inpSlot + uint64(sq.queueSize)): + if (sq.queueSize > 0) and (sr.slot >= sq.outSlot + uint64(sq.queueSize)): await sq.notFullEvent.wait() sq.notFullEvent.clear() continue @@ -196,7 +200,7 @@ proc pop*(sq: SyncQueue, step = 0'u64): SyncRequest = result = SyncRequest(slot: sq.inpSlot, count: count, step: nstep) sq.inpSlot = sq.inpSlot + count else: - raise newException(ValueError, "Queue is already empty!") + raise newException(SyncManagerError, "Queue is already empty!") proc len*(sq: SyncQueue): uint64 {.inline.} = ## Returns number of slots left in queue ``sq``. @@ -211,7 +215,8 @@ proc total*(sq: SyncQueue): uint64 {.inline.} = proc progress*(sq: SyncQueue): string = ## Returns queue's ``sq`` progress string. 
- result = $len(sq) & "/" & $sq.total() + let curSlot = sq.outSlot - sq.startSlot + result = $curSlot & "/" & $sq.total() proc init*(t: typedesc[BlockList], start: Slot, count, step: uint64, list: openarray[SignedBeaconBlock]): Option[BlockList] = @@ -315,7 +320,7 @@ proc merge*(optlists: varargs[Option[BlockList]]): Option[BlockList] = result = some(res) proc newSyncManager*[A, B](pool: PeerPool[A, B], - getLastLocalSlotCb: GetLastLocalSlotCallback, + getLocalHeadSlotCb: GetLocalHeadSlotCallback, updateLocalBlocksCb: UpdateLocalBlocksCallback, peersInSlot = 3, peerSlotTimeout = 6.seconds, slotsInGroup = 2, peerGroupTimeout = 10.seconds, @@ -337,7 +342,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], ## ## ``statusPeriod`` - period of time between status updates. ## - ## ``getLastLocalSlotCb`` - function which provides current latest `Slot` in + ## ``getLocalHeadSlotCb`` - function which provides current latest `Slot` in ## local database. ## ## ``updateLocalBlocksCb`` - function which accepts list of downloaded blocks @@ -354,7 +359,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], peerSlotTimeout: peerSlotTimeout, peerGroupTimeout: peerGroupTimeout, statusPeriod: statusPeriod, - getLastLocalSlot: getLastLocalSlotCb, + getLocalHeadSlot: getLocalHeadSlotCb, updateLocalBlocks: updateLocalBlocksCb, failuresCount: failuresCount, failurePause: failurePause) @@ -369,14 +374,14 @@ proc newPeerSlot*[A, B](man: SyncManager[A, B]): PeerSlot[A, B] = proc `$`*[A, B](peerslot: PeerSlot[A, B]): string = ## Returns string representation of peer's slot ``peerslot``. - mixin getKey, getLastSlot + mixin getKey, getHeadSlot if len(peerslot.peers) == 0: result = "<>" else: result = "<" for item in peerslot.peers: result.add("\"" & getKey(item) & "\"") - result.add(":" & $getLastSlot(item)) + result.add(":" & $getHeadSlot(item)) result.add(", ") result.setLen(len(result) - 2) result.add(">") @@ -509,6 +514,12 @@ proc `$`*[A, B](man: SyncManager[A, B]): string = if len(result) > 0: result.setLen(len(result) - 2) +proc peersCount*[A, B](man: SyncManager[A, B]): int = + ## Returns number of peers which is managed by Sync Manager ``man``. 
+ for i in 0 ..< len(man.groups): + for k in 0 ..< len(man.groups[i].slots): + result = result + len(man.groups[i].slots[k].peers) + proc fillGroups*[A, B](man: SyncManager[A, B]) {.async.} = if len(man.groups) == 0: while len(man.groups) < man.groupsCount: @@ -559,7 +570,7 @@ proc isEmpty*[A, B](man: SyncManager[A, B]): bool = result = (len(man.groups) == 0) proc reorderGroups*[A, B](man: SyncManager[A, B]) = - mixin getLastSlot + mixin getHeadSlot doAssert(not(man.isEmpty())) var x, y, z: int @@ -568,7 +579,7 @@ proc reorderGroups*[A, B](man: SyncManager[A, B]) = for j0 in 0 ..< len(group0.slots): let slot0 = group0.slots[j0] for k0 in 0 ..< len(slot0.peers): - var curSlot = getLastSlot(slot0.peers[k0]) + var curSlot = getHeadSlot(slot0.peers[k0]) x = -1; y = -1; z = -1 for i1 in i0 ..< len(man.groups): @@ -577,7 +588,7 @@ proc reorderGroups*[A, B](man: SyncManager[A, B]) = let slot1 = group1.slots[j1] let start = if (i1 == i0) and (j1 == j0): k0 + 1 else: 0 for k1 in start ..< len(slot1.peers): - let newSlot = getLastSlot(slot1.peers[k1]) + let newSlot = getHeadSlot(slot1.peers[k1]) if curSlot < newSlot: curSlot = newSlot x = i1; y = j1; z = k1 @@ -605,39 +616,39 @@ proc disband*[A, B](syncman: SyncManager[A, B]) = disband(group) syncman.groups.setLen(0) -proc getLastSlot*[A, B](peerslot: PeerSlot[A, B]): Slot = +proc getHeadSlot*[A, B](peerslot: PeerSlot[A, B]): Slot = ## Returns minimal available beacon chain slot, for peer's slot ``peerslot``. - mixin getLastSlot + mixin getHeadSlot doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero") for i in 0 ..< len(peerslot.peers): if i == 0: - result = getLastSlot(peerslot.peers[i]) + result = getHeadSlot(peerslot.peers[i]) else: - let slot = getLastSlot(peerslot.peers[i]) + let slot = getHeadSlot(peerslot.peers[i]) if slot < result: result = slot -proc getLastSlot*[A, B](peergroup: PeerGroup[A, B]): Slot = +proc getHeadSlot*[A, B](peergroup: PeerGroup[A, B]): Slot = ## Returns minimal available beacon chain slot, for peer's group ## ``peergroup``. doAssert(len(peergroup.slots) > 0, "Number of slots in group must not be zero") for i in 0 ..< len(peergroup.slots): if i == 0: - result = getLastSlot(peergroup.slots[i]) + result = getHeadSlot(peergroup.slots[i]) else: - let slot = getLastSlot(peergroup.slots[i]) + let slot = getHeadSlot(peergroup.slots[i]) if slot < result: result = slot -proc getLastSlot*[A, B](sman: SyncManager[A, B]): Slot = +proc getHeadSlot*[A, B](sman: SyncManager[A, B]): Slot = ## Returns minimal available beacon chain slot, for all peers in sync manager ## ``sman``. for i in 0 ..< len(sman.groups): if i == 0: - result = getLastSlot(sman.groups[i]) + result = getHeadSlot(sman.groups[i]) else: - let slot = getLastSlot(sman.groups[i]) + let slot = getHeadSlot(sman.groups[i]) if slot < result: result = slot @@ -878,22 +889,27 @@ proc updateStatus*[A, B](sman: SyncManager[A, B]) {.async.} = raise exc proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} = + ## TODO: This synchronization procedure is not optimal, we can do it better + ## if spawn N parallel tasks, where N is number of peer groups. 
var squeue: SyncQueue - remoteLastKnownSlot: Slot - localLastSlot: Slot = sman.getLastLocalSlot() + remoteKnownHeadSlot: Slot + localHeadSlot: Slot = sman.getLocalHeadSlot() pending = newSeq[Future[OptionBlockList]]() requests = newSeq[SyncRequest]() - checkMoment = Moment.now() + startMoment = Moment.now() + checkMoment = startMoment errorsCount = 0 counter = 0'u64 - squeue = SyncQueue.init(localLastSlot + 1'u64, localLastSlot + 2'u64, + squeue = SyncQueue.init(localHeadSlot + 1'u64, localHeadSlot + 2'u64, MAX_REQUESTED_BLOCKS, sman.updateLocalBlocks, sman.groupsCount) while true: if errorsCount == sman.failuresCount: # Number of consecutive errors exceeds limit + error "Synchronization failed", errors = errorsCount, + duration = $(Moment.now() - startMoment) break pending.setLen(0) @@ -902,51 +918,64 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} = await sman.fillGroups() sman.reorderGroups() - var localLastSlot = sman.getLastLocalSlot() - let remoteLastSlot = sman.getLastSlot() - if remoteLastSlot > remoteLastKnownSlot: - remoteLastKnownSlot = remoteLastSlot - squeue.updateLastSlot(remoteLastKnownSlot) + localHeadSlot = sman.getLocalHeadSlot() + let remoteHeadSlot = sman.getHeadSlot() + if remoteHeadSlot > remoteKnownHeadSlot: + remoteKnownHeadSlot = remoteHeadSlot + squeue.updateLastSlot(remoteKnownHeadSlot) - if localLastSlot >= remoteLastKnownSlot: - info "Synchronization successfully finished" + if localHeadSlot >= remoteKnownHeadSlot: + info "Synchronization finished", progress = squeue.progress(), + peers = sman.peersCount(), + groups = len(sman.groups), + duration = $(Moment.now() - startMoment) break else: - # if counter == 0: - # info "Starting synchronization", local_slot = localLastSlot, - # remote_slot = remoteLastKnownSlot, - # count = len(squeue) - # else: - # info "Synchronization progress", progress = squeue.progress() - discard + if counter == 0: + info "Starting synchronization", local_head_slot = localHeadSlot, + remote_head_slot = remoteKnownHeadSlot, + count = len(squeue), + peers = sman.peersCount(), + groups = len(sman.groups), + progress = squeue.progress() + + else: + info "Synchronization progress", progress = squeue.progress(), + peers = sman.peersCount(), + groups = len(sman.groups), + iteration = counter counter = counter + 1'u64 for i in countdown(len(sman.groups) - 1, 0): if len(squeue) == 0: break + let groupLastSlot = sman.groups[i].getHeadSlot() var req = squeue.pop(uint64(len(sman.groups[i].slots))) - if sman.groups[i].getLastSlot() >= req.lastSlot(): + trace "Request created", slot = req.slot, step = req.step, + count = req.count + if groupLastSlot >= req.lastSlot(): req.group = i pending.add(getBlocks(sman.groups[i], req.slot, req.count)) requests.add(req) - # trace "Send request to a group of peers", group = i + trace "Request sent to a group", group = i, slot = req.slot, + step = req.step, + count = req.count else: + trace "Request returned to queue", slot = req.slot, step = req.step, + count = req.count, + group_last_slot = groupLastSlot squeue.push(req) if len(pending) == 0: # All the peer groups do not satisfy slot requirements # Disbanding all the peers sman.disband() - await sleepAsync(sman.failurePause) inc(errorsCount) + warn "Unable to create requests, disbanding peers", errors = errorsCount + await sleepAsync(sman.failurePause) continue - else: - errorsCount = 0 - # TODO: If getBeaconBlocksByRange() will properly support cancellation, - # then this can be done more efficiently at the end, so you do not need - # to wait for 
all futures here. await allFutures(pending) var failedCount = 0 @@ -954,24 +983,34 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} = if pending[i].finished() and not(pending[i].failed()): let res = pending[i].read() if res.isSome(): - # trace "Peer's group successfully delivered data" + trace "Request data received", group = requests[i].group, + slot = requests[i].slot, + step = requests[i].step, + count = requests[i].count await squeue.push(requests[i], res.get().list) else: inc(failedCount) - # trace "Peer's group failed to deliver data" + trace "Request failed", group = requests[i].group, + slot = requests[i].slot, + step = requests[i].step, + count = requests[i].count squeue.push(requests[i]) sman.groups[requests[i].group].disband() - else: inc(failedCount) - # trace "Peer's group failed to deliver data" + trace "Request failed", group = requests[i].group, + slot = requests[i].slot, + step = requests[i].step, + count = requests[i].count squeue.push(requests[i]) sman.groups[requests[i].group].disband() if failedCount == len(pending): # All the peer groups failed to download requests. - await sleepAsync(sman.failurePause) inc(errorsCount) + warn "All requests failed to deliver data, disbanding peers", + errors = errorsCount + await sleepAsync(sman.failurePause) continue else: errorsCount = 0 @@ -982,4 +1021,9 @@ proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} = let stamp = Moment.now() if stamp - checkMoment > sman.statusPeriod: checkMoment = stamp + info "Updating peers status" await sman.updateStatus() + info "Peers status updated", duration = $(Moment.now() - checkMoment) + + # Returning all the peers back to PeerPool. + sman.disband() diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 1ca0b5837..f5298f173 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -33,7 +33,7 @@ proc getFuture*(peer: SimplePeer): Future[void] = proc `<`*(a, b: SimplePeer): bool = result = `<`(a.weight, b.weight) -proc getLastSlot*(peer: SimplePeer): Slot = +proc getHeadSlot*(peer: SimplePeer): Slot = if len(peer.blockchain) == 0: result = peer.latestSlot else: @@ -550,17 +550,19 @@ proc syncQueueAsyncTests(): Future[bool] {.async.} = var f24 = q2.push(r24, @[chain2[6]]) var f22 = q2.push(r22, @[chain2[2], chain2[3]]) doAssert(f24.finished == false) - doAssert(f22.finished == true and f22.failed == false) + doAssert(f22.finished == false) doAssert(counter == 5) var f21 = q2.push(r21, @[chain2[0], chain2[1]]) doAssert(f21.finished == true and f21.failed == false) await sleepAsync(100.milliseconds) - doAssert(f24.finished == true and f24.failed == false) + doAssert(f22.finished == true and f22.failed == false) + doAssert(f24.finished == false) doAssert(counter == 9) var f23 = q2.push(r23, @[chain2[4], chain2[5]]) doAssert(f23.finished == true and f23.failed == false) - doAssert(counter == 12) + doAssert(counter == 11) await sleepAsync(100.milliseconds) + doAssert(f24.finished == true and f24.failed == false) doAssert(counter == 12) result = true @@ -732,7 +734,6 @@ proc syncManagerOneGroupTest(): Future[bool] {.async.} = peerSlotTimeout = 1.seconds, slotsInGroup = 2) await sman.synchronize() - for i in 0 ..< len(peers): if i in {0, 1, 2}: doAssert(checkRequest(peers[i], 0, 10000, 20, 2, @@ -804,7 +805,6 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} = peerSlotTimeout = 1.seconds, slotsInGroup = 2) await sman.synchronize() - for i in 0 ..< len(peers): if i in {0, 1, 2}: doAssert(checkRequest(peers[i], 0, 10020, 20, 2) == 
true) @@ -838,25 +838,59 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} = 10095, 10096, 10097, 10098, 10099) == true) result = true +proc syncManagerFailureTest(): Future[bool] {.async.} = + # Failure test + const FailuresCount = 3 + var pool = newPeerPool[SimplePeer, SimplePeerKey]() + var peer = SimplePeer.init("id1", weight = 0) -when isMainModule: - suite "SyncManager test suite": - test "BlockList tests": - # TODO - discard - # test "PeerSlot tests": - # check waitFor(peerSlotTests()) == true - # test "PeerGroup tests": - # check waitFor(peerGroupTests()) == true - # test "SyncQueue non-async tests": - # check syncQueueNonAsyncTests() == true - # test "SyncQueue async tests": - # check waitFor(syncQueueAsyncTests()) == true - # test "SyncManager one-peer test": - # check waitFor(syncManagerOnePeerTest()) == true - # test "SyncManager one-peer-slot test": - # check waitFor(syncManagerOneSlotTest()) == true - # test "SyncManager one-peer-group test": - # check waitFor(syncManagerOneGroupTest()) == true - test "SyncManager group-recovery test": - check waitFor(syncManagerGroupRecoveryTest()) == true + var srcChain = newTempChain(100, Slot(10000)) + var dstChain = newSeq[SignedBeaconBlock]() + + peer.update(srcChain, failure = true) + + proc lastLocalSlot(): Slot = + if len(dstChain) == 0: + result = Slot(9999) + else: + result = dstChain[^1].message.slot + + proc updateBlocks(list: openarray[SignedBeaconBlock]): bool = + for item in list: + dstChain.add(item) + result = true + + doAssert(pool.addIncomingPeer(peer) == true) + + var sman = newSyncManager[SimplePeer, + SimplePeerKey](pool, lastLocalSlot, updateBlocks, + peersInSlot = 3, + peerSlotTimeout = 1.seconds, + slotsInGroup = 2, + failuresCount = FailuresCount, + failurePause = 100.milliseconds) + await sman.synchronize() + doAssert(len(peer.requests) == FailuresCount) + for i in 0 ..< len(peer.requests): + doAssert(checkRequest(peer, i, 10000, 20, 1) == true) + result = true + +suite "SyncManager test suite": + test "PeerSlot tests": + check waitFor(peerSlotTests()) == true + test "PeerGroup tests": + check waitFor(peerGroupTests()) == true + test "SyncQueue non-async tests": + check syncQueueNonAsyncTests() == true + test "SyncQueue async tests": + check waitFor(syncQueueAsyncTests()) == true + test "SyncManager one-peer test": + check waitFor(syncManagerOnePeerTest()) == true + test "SyncManager one-peer-slot test": + check waitFor(syncManagerOneSlotTest()) == true + test "SyncManager one-peer-group test": + check waitFor(syncManagerOneGroupTest()) == true + test "SyncManager group-recovery test": + check waitFor(syncManagerGroupRecoveryTest()) == true + test "SyncManager failure test": + check waitFor(syncManagerFailureTest()) == true From 3b809616ec4116a0a1e6df06d858268abd6fbd6a Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 22 Jan 2020 14:50:14 +0200 Subject: [PATCH 03/14] Add SyncManager tests to test suite. --- tests/all_tests.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/all_tests.nim b/tests/all_tests.nim index 06426231f..3c7c1860c 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -25,7 +25,8 @@ import # Unit test ./test_sync_protocol, # ./test_validator # Empty! 
./test_zero_signature, - ./test_peer_pool + ./test_peer_pool, + ./test_sync_manager import # Refactor state transition unit tests # TODO re-enable when useful From 8b229d68adc6f43a902ce5bb68a8f32ff5c9a623 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 22 Jan 2020 15:41:20 +0200 Subject: [PATCH 04/14] Add testutil and timedTest. --- tests/test_sync_manager.nim | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index f5298f173..a5ab56169 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -1,4 +1,14 @@ +# beacon_chain +# Copyright (c) 2019 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.used.} + import options, hashes, unittest +import ./testutil import chronos import ../beacon_chain/peer_pool, ../beacon_chain/sync_manager @@ -876,21 +886,21 @@ proc syncManagerFailureTest(): Future[bool] {.async.} = result = true suite "SyncManager test suite": - test "PeerSlot tests": + timedTest "PeerSlot tests": check waitFor(peerSlotTests()) == true - test "PeerGroup tests": + timedTest "PeerGroup tests": check waitFor(peerGroupTests()) == true - test "SyncQueue non-async tests": + timedTest "SyncQueue non-async tests": check syncQueueNonAsyncTests() == true - test "SyncQueue async tests": + timedTest "SyncQueue async tests": check waitFor(syncQueueAsyncTests()) == true - test "SyncManager one-peer test": + timedTest "SyncManager one-peer test": check waitFor(syncManagerOnePeerTest()) == true - test "SyncManager one-peer-slot test": + timedTest "SyncManager one-peer-slot test": check waitFor(syncManagerOneSlotTest()) == true - test "SyncManager one-peer-group test": + timedTest "SyncManager one-peer-group test": check waitFor(syncManagerOneGroupTest()) == true - test "SyncManager group-recovery test": + timedTest "SyncManager group-recovery test": check waitFor(syncManagerGroupRecoveryTest()) == true - test "SyncManager failure test": + timedTest "SyncManager failure test": check waitFor(syncManagerFailureTest()) == true From 98dc7014736b8b48bed3d5eeb9dbf421fcdfb38f Mon Sep 17 00:00:00 2001 From: cheatfate Date: Fri, 24 Jan 2020 03:35:38 +0200 Subject: [PATCH 05/14] Add PeerPool.addPeer async version and tests. 
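
The new awaitable addPeer() waits on the incNotFullEvent/outNotFullEvent
pair until space is released, while addPeerNoWait() keeps the previous
synchronous behaviour and reports failure immediately when the pool is
full. A minimal usage sketch of the difference, assuming the
PeerTest/PeerTestID helpers from tests/test_peer_pool.nim are in scope
(addPeerExample and the peer ids are invented for illustration):

    import chronos
    import ../beacon_chain/peer_pool

    proc addPeerExample(): Future[bool] {.async.} =
      var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 1,
                                                   maxIncomingPeers = 1,
                                                   maxOutgoingPeers = 0)
      var peerA = PeerTest.init("peerA")
      var peerB = PeerTest.init("peerB")

      # No-wait variant: reports failure immediately once the pool is full.
      doAssert(pool.addPeerNoWait(peerA, PeerType.Incoming) == true)
      doAssert(pool.addPeerNoWait(peerB, PeerType.Incoming) == false)

      # Awaitable variant: stays pending until a slot is freed.
      var fut = pool.addPeer(peerB, PeerType.Incoming)
      doAssert(fut.finished == false)
      peerA.close()          # completing the peer's future frees its slot
      await sleepAsync(100.milliseconds)
      doAssert(fut.finished == true and fut.failed == false)
      result = true

    # e.g. doAssert(waitFor(addPeerExample()) == true)
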
--- beacon_chain/peer_pool.nim | 188 +++++++++++++++++++++++++----------- tests/test_peer_pool.nim | 145 ++++++++++++++++++++------- tests/test_sync_manager.nim | 18 ++-- 3 files changed, 254 insertions(+), 97 deletions(-) diff --git a/beacon_chain/peer_pool.nim b/beacon_chain/peer_pool.nim index 316ac682f..0042bff1c 100644 --- a/beacon_chain/peer_pool.nim +++ b/beacon_chain/peer_pool.nim @@ -19,8 +19,10 @@ type cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.} PeerPool*[A, B] = ref object - incNeEvent: AsyncEvent - outNeEvent: AsyncEvent + incNotEmptyEvent: AsyncEvent + outNotEmptyEvent: AsyncEvent + incNotFullEvent: AsyncEvent + outNotFullEvent: AsyncEvent incQueue: HeapQueue[PeerIndex] outQueue: HeapQueue[PeerIndex] registry: Table[B, PeerIndex] @@ -39,27 +41,35 @@ type proc `<`*(a, b: PeerIndex): bool = result = a.cmp(b, a) -proc fireEvent[A, B](pool: PeerPool[A, B], item: PeerItem[A]) {.inline.} = +proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B], + item: PeerItem[A]) {.inline.} = if item.peerType == PeerType.Incoming: - pool.incNeEvent.fire() + pool.incNotEmptyEvent.fire() elif item.peerType == PeerType.Outgoing: - pool.outNeEvent.fire() + pool.outNotEmptyEvent.fire() -proc waitEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]) {.async.} = +proc fireNotFullEvent[A, B](pool: PeerPool[A, B], + item: PeerItem[A]) {.inline.} = + if item.peerType == PeerType.Incoming: + pool.incNotFullEvent.fire() + elif item.peerType == PeerType.Outgoing: + pool.outNotFullEvent.fire() + +proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], + filter: set[PeerType]) {.async.} = if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: - var fut1 = pool.incNeEvent.wait() - var fut2 = pool.outNeEvent.wait() + var fut1 = pool.incNotEmptyEvent.wait() + var fut2 = pool.outNotEmptyEvent.wait() try: discard await one(fut1, fut2) if fut1.finished: if not(fut2.finished): fut2.cancel() - pool.incNeEvent.clear() + pool.incNotEmptyEvent.clear() else: if not(fut1.finished): fut1.cancel() - pool.outNeEvent.clear() + pool.outNotEmptyEvent.clear() except CancelledError: if not(fut1.finished): fut1.cancel() @@ -67,11 +77,20 @@ proc waitEvent[A, B](pool: PeerPool[A, B], fut2.cancel() raise elif PeerType.Incoming in filter: - await pool.incNeEvent.wait() - pool.incNeEvent.clear() + await pool.incNotEmptyEvent.wait() + pool.incNotEmptyEvent.clear() elif PeerType.Outgoing in filter: - await pool.outNeEvent.wait() - pool.outNeEvent.clear() + await pool.outNotEmptyEvent.wait() + pool.outNotEmptyEvent.clear() + +proc waitNotFullEvent[A, B](pool: PeerPool[A, B], + peerType: PeerType) {.async.} = + if peerType == PeerType.Incoming: + await pool.incNotFullEvent.wait() + pool.incNotFullEvent.clear() + elif peerType == PeerType.Outgoing: + await pool.outNotFullEvent.wait() + pool.outNotFullEvent.clear() template getItem[A, B](pool: PeerPool[A, B], filter: set[PeerType]): ptr PeerItem[A] = @@ -126,13 +145,15 @@ proc newPeerPool*[A, B](maxPeers = -1, doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers) res.maxPeersCount = if maxPeers < 0: high(int) - else: maxPeers + else: maxPeers res.maxIncPeersCount = if maxIncomingPeers < 0: high(int) - else: maxIncomingPeers + else: maxIncomingPeers res.maxOutPeersCount = if maxOutgoingPeers < 0: high(int) - else: maxOutgoingPeers - res.incNeEvent = newAsyncEvent() - res.outNeEvent = newAsyncEvent() + else: maxOutgoingPeers + res.incNotEmptyEvent = newAsyncEvent() + res.outNotEmptyEvent = newAsyncEvent() + res.incNotFullEvent = newAsyncEvent() + res.outNotFullEvent = 
newAsyncEvent() res.incQueue = initHeapQueue[PeerIndex]() res.outQueue = initHeapQueue[PeerIndex]() res.registry = initTable[B, PeerIndex]() @@ -192,6 +213,8 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = elif item[].peerType == PeerType.Outgoing: dec(pool.curOutPeersCount) dec(pool.acqOutPeersCount) + + pool.fireNotFullEvent(item[]) # Cleanup storage with default item, and removing key from hashtable. pool.storage[pindex] = PeerItem[A]() pool.registry.del(key) @@ -212,68 +235,125 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = pool.outQueue.del(i) break dec(pool.curOutPeersCount) + + pool.fireNotFullEvent(item[]) # Cleanup storage with default item, and removing key from hashtable. pool.storage[pindex] = PeerItem[A]() pool.registry.del(key) result = true -proc addPeer*[A, B](pool: PeerPool[A, B], peer: A, peerType: PeerType): bool = +proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, + peerType: PeerType): PeerIndex = + proc onPeerClosed(udata: pointer) {.gcsafe.} = + discard pool.deletePeer(peer) + + var item = PeerItem[A](data: peer, peerType: peerType, + index: len(pool.storage)) + pool.storage.add(item) + var pitem = addr(pool.storage[^1]) + let pindex = PeerIndex(data: item.index, cmp: pool.cmp) + pool.registry[peerKey] = pindex + pitem[].data.getFuture().addCallback(onPeerClosed) + result = pindex + +proc addPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A, peerType: PeerType): bool = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## Procedure returns ``false`` in case ## * if ``peer`` is already closed. + ## * if ``pool`` already has peer ``peer`` inside. ## * if ``pool`` currently has a maximum of peers. ## * if ``pool`` currently has a maximum of `Incoming` or `Outgoing` peers. ## ## Procedure returns ``true`` on success. mixin getKey, getFuture - if peer.getFuture().finished: - return false + result = false + let peerKey = getKey(peer) - if len(pool.registry) >= pool.maxPeersCount: - return false + if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished): + if len(pool.registry) < pool.maxPeersCount: + if peerType == PeerType.Incoming: + if pool.curIncPeersCount < pool.maxIncPeersCount: + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curIncPeersCount) + pool.incQueue.push(pindex) + pool.incNotEmptyEvent.fire() + result = true + elif peerType == PeerType.Outgoing: + if pool.curOutPeersCount < pool.maxOutPeersCount: + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curOutPeersCount) + pool.outQueue.push(pindex) + pool.outNotEmptyEvent.fire() + result = true - var item = PeerItem[A](data: peer, peerType: peerType, - index: len(pool.storage)) - var key = getKey(peer) +proc addPeer*[A, B](pool: PeerPool[A, B], + peer: A, peerType: PeerType): Future[bool] {.async.} = + ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. + ## + ## This procedure will wait for an empty space in PeerPool ``pool``, if + ## PeerPool ``pool`` is full. + ## + ## Procedure returns ``false`` in case: + ## * if ``peer`` is already closed. + ## * if ``pool`` already has peer ``peer`` inside. + ## + ## Procedure returns ``true`` on success. 
+ mixin getKey, getFuture - if not(pool.registry.hasKey(key)): - pool.storage.add(item) - var pitem = addr(pool.storage[^1]) - let pindex = PeerIndex(data: item.index, cmp: pool.cmp) - pool.registry[key] = pindex - - proc onPeerClosed(udata: pointer) {.gcsafe.} = - discard pool.deletePeer(peer) - - pitem[].data.getFuture().addCallback(onPeerClosed) + var res = false + let peerKey = getKey(peer) + if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished): + if len(pool.registry) >= pool.maxPeersCount: + await pool.waitNotFullEvent(peerType) if peerType == PeerType.Incoming: if pool.curIncPeersCount >= pool.maxIncPeersCount: - return false - else: - inc(pool.curIncPeersCount) - pool.incQueue.push(pindex) - pool.incNeEvent.fire() + await pool.waitNotFullEvent(peerType) + + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curIncPeersCount) + pool.incQueue.push(pindex) + pool.incNotEmptyEvent.fire() + res = true elif peerType == PeerType.Outgoing: if pool.curOutPeersCount >= pool.maxOutPeersCount: - return false - else: - inc(pool.curOutPeersCount) - pool.outQueue.push(pindex) - pool.outNeEvent.fire() + await pool.waitNotFullEvent(peerType) - result = true + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curOutPeersCount) + pool.outQueue.push(pindex) + pool.outNotEmptyEvent.fire() + res = true -proc addIncomingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = + result = res + +proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A): bool {.inline.} = + ## Add incoming peer ``peer`` to PeerPool ``pool``. + ## + ## Returns ``true`` on success. + result = pool.addPeerNoWait(peer, PeerType.Incoming) + +proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A): bool {.inline.} = + ## Add outgoing peer ``peer`` to PeerPool ``pool``. + ## + ## Returns ``true`` on success. + result = pool.addPeerNoWait(peer, PeerType.Outgoing) + +proc addIncomingPeer*[A, B](pool: PeerPool[A, B], + peer: A): Future[bool] {.inline.} = ## Add incoming peer ``peer`` to PeerPool ``pool``. ## ## Returns ``true`` on success. result = pool.addPeer(peer, PeerType.Incoming) -proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = +proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], + peer: A): Future[bool] {.inline.} = ## Add outgoing peer ``peer`` to PeerPool ``pool``. ## ## Returns ``true`` on success. @@ -291,7 +371,7 @@ proc acquire*[A, B](pool: PeerPool[A, B], if PeerType.Outgoing in filter: count = count + len(pool.outQueue) if count == 0: - await pool.waitEvent(filter) + await pool.waitNotEmptyEvent(filter) else: var item = pool.getItem(filter) doAssert(PeerFlags.Acquired notin item[].flags) @@ -341,7 +421,7 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = elif item[].peerType == PeerType.Outgoing: pool.outQueue.push(titem) dec(pool.acqOutPeersCount) - pool.fireEvent(item[]) + pool.fireNotEmptyEvent(item[]) proc release*[A, B](pool: PeerPool[A, B], peers: openarray[A]) {.inline.} = ## Release array of peers ``peers`` back to PeerPool ``pool``. 
@@ -367,7 +447,7 @@ proc acquire*[A, B](pool: PeerPool[A, B], if PeerType.Outgoing in filter: count = count + len(pool.outQueue) if count == 0: - await pool.waitEvent(filter) + await pool.waitNotEmptyEvent(filter) else: var item = pool.getItem(filter) doAssert(PeerFlags.Acquired notin item[].flags) diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 56fcd6ed4..b689ad270 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -37,7 +37,7 @@ proc close*(peer: PeerTest) = peer.future.complete() suite "PeerPool testing suite": - timedTest "addPeer() test": + timedTest "addPeerNoWait() test": const peersCount = [ [10, 5, 5, 10, 5, 5], [-1, 5, 5, 10, 5, 5], @@ -47,23 +47,100 @@ suite "PeerPool testing suite": var pool = newPeerPool[PeerTest, PeerTestID](item[0], item[1], item[2]) for i in 0 ..< item[4]: var peer = PeerTest.init("idInc" & $i) - check pool.addIncomingPeer(peer) == true + check pool.addIncomingPeerNoWait(peer) == true for i in 0 ..< item[5]: var peer = PeerTest.init("idOut" & $i) - check pool.addOutgoingPeer(peer) == true + check pool.addOutgoingPeerNoWait(peer) == true var peer = PeerTest.init("idCheck") if item[1] != -1: for i in 0 ..< item[3]: - check pool.addIncomingPeer(peer) == false + check pool.addIncomingPeerNoWait(peer) == false if item[2] != -1: for i in 0 ..< item[3]: - check pool.addOutgoingPeer(peer) == false + check pool.addOutgoingPeerNoWait(peer) == false check: pool.lenAvailable == item[3] pool.lenAvailable({PeerType.Incoming}) == item[4] pool.lenAvailable({PeerType.Outgoing}) == item[5] + + timedTest "addPeer() test": + proc testAddPeer1(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 1, + maxIncomingPeers = 1, + maxOutgoingPeers = 0) + var peer0 = PeerTest.init("idInc0") + var peer1 = PeerTest.init("idOut0") + var peer2 = PeerTest.init("idInc1") + var fut0 = pool.addIncomingPeer(peer0) + var fut1 = pool.addOutgoingPeer(peer1) + var fut2 = pool.addIncomingPeer(peer2) + doAssert(fut0.finished == true and fut0.failed == false) + doAssert(fut1.finished == false) + doAssert(fut2.finished == false) + peer0.close() + await sleepAsync(100.milliseconds) + doAssert(fut1.finished == false) + doAssert(fut2.finished == true and fut2.failed == false) + result = true + + proc testAddPeer2(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 2, + maxIncomingPeers = 1, + maxOutgoingPeers = 1) + var peer0 = PeerTest.init("idInc0") + var peer1 = PeerTest.init("idOut0") + var peer2 = PeerTest.init("idInc1") + var peer3 = PeerTest.init("idOut1") + var fut0 = pool.addIncomingPeer(peer0) + var fut1 = pool.addOutgoingPeer(peer1) + var fut2 = pool.addIncomingPeer(peer2) + var fut3 = pool.addOutgoingPeer(peer3) + doAssert(fut0.finished == true and fut0.failed == false) + doAssert(fut1.finished == true and fut1.failed == false) + doAssert(fut2.finished == false) + doAssert(fut3.finished == false) + peer0.close() + await sleepAsync(100.milliseconds) + doAssert(fut2.finished == true and fut2.failed == false) + doAssert(fut3.finished == false) + peer1.close() + await sleepAsync(100.milliseconds) + doAssert(fut3.finished == true and fut3.failed == false) + result = true + + proc testAddPeer3(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 3, + maxIncomingPeers = 1, + maxOutgoingPeers = 1) + var peer0 = PeerTest.init("idInc0") + var peer1 = PeerTest.init("idInc1") + var peer2 = PeerTest.init("idOut0") + var peer3 = PeerTest.init("idOut1") + + var fut0 
= pool.addIncomingPeer(peer0) + var fut1 = pool.addIncomingPeer(peer1) + var fut2 = pool.addOutgoingPeer(peer2) + var fut3 = pool.addOutgoingPeer(peer3) + doAssert(fut0.finished == true and fut0.failed == false) + doAssert(fut1.finished == false) + doAssert(fut2.finished == true and fut2.failed == false) + doAssert(fut3.finished == false) + peer0.close() + await sleepAsync(100.milliseconds) + doAssert(fut1.finished == true and fut1.failed == false) + doAssert(fut3.finished == false) + peer2.close() + await sleepAsync(100.milliseconds) + doAssert(fut3.finished == true and fut3.failed == false) + result = true + + check: + waitFor(testAddPeer1()) == true + waitFor(testAddPeer2()) == true + waitFor(testAddPeer3()) == true + timedTest "Acquire from empty pool": var pool0 = newPeerPool[PeerTest, PeerTestID]() var pool1 = newPeerPool[PeerTest, PeerTestID]() @@ -92,10 +169,10 @@ suite "PeerPool testing suite": var peer21 = PeerTest.init("peer21") var peer22 = PeerTest.init("peer22") check: - pool1.addPeer(peer11, PeerType.Incoming) == true - pool1.addPeer(peer12, PeerType.Incoming) == true - pool2.addPeer(peer21, PeerType.Outgoing) == true - pool2.addPeer(peer22, PeerType.Outgoing) == true + pool1.addPeerNoWait(peer11, PeerType.Incoming) == true + pool1.addPeerNoWait(peer12, PeerType.Incoming) == true + pool2.addPeerNoWait(peer21, PeerType.Outgoing) == true + pool2.addPeerNoWait(peer22, PeerType.Outgoing) == true var itemFut11 = pool1.acquire({PeerType.Outgoing}) var itemFut12 = pool1.acquire(10, {PeerType.Outgoing}) @@ -179,9 +256,9 @@ suite "PeerPool testing suite": var peer = PeerTest.init("peer" & $i, rand(MaxNumber)) # echo repr peer if rand(100) mod 2 == 0: - check pool.addPeer(peer, PeerType.Incoming) == true + check pool.addPeerNoWait(peer, PeerType.Incoming) == true else: - check pool.addPeer(peer, PeerType.Outgoing) == true + check pool.addPeerNoWait(peer, PeerType.Outgoing) == true check waitFor(testAcquireRelease()) == TestsCount @@ -191,7 +268,7 @@ suite "PeerPool testing suite": var peer = PeerTest.init("deletePeer") ## Delete available peer - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -204,7 +281,7 @@ suite "PeerPool testing suite": ## Delete acquired peer peer = PeerTest.init("closingPeer") - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -223,7 +300,7 @@ suite "PeerPool testing suite": ## Force delete acquired peer peer = PeerTest.init("closingPeer") - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -244,7 +321,7 @@ suite "PeerPool testing suite": var peer = PeerTest.init("closingPeer") ## Close available peer - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -259,7 +336,7 @@ suite "PeerPool testing suite": ## Close acquired peer peer = PeerTest.init("closingPeer") - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) 
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -292,9 +369,9 @@ suite "PeerPool testing suite": var peer3 = PeerTest.init("peer3", 8) check: - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true pool.lenAvailable == 3 pool.lenAvailable({PeerType.Outgoing}) == 1 pool.lenAvailable({PeerType.Incoming}) == 2 @@ -311,9 +388,9 @@ suite "PeerPool testing suite": pool.len == 0 check: - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true pool.lenAvailable == 3 pool.lenAvailable({PeerType.Outgoing}) == 1 pool.lenAvailable({PeerType.Incoming}) == 2 @@ -339,9 +416,9 @@ suite "PeerPool testing suite": var peer3 = PeerTest.init("peer3", 8) check: - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true pool.hasPeer("peer4") == false pool.hasPeer("peer1") == true pool.hasPeer("peer2") == true @@ -374,16 +451,16 @@ suite "PeerPool testing suite": var peer9 = PeerTest.init("peer9", 2) check: - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Incoming) == true - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer4, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Incoming) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer4, PeerType.Incoming) == true - pool.addPeer(peer5, PeerType.Outgoing) == true - pool.addPeer(peer8, PeerType.Outgoing) == true - pool.addPeer(peer7, PeerType.Outgoing) == true - pool.addPeer(peer6, PeerType.Outgoing) == true - pool.addPeer(peer9, PeerType.Outgoing) == true + pool.addPeerNoWait(peer5, PeerType.Outgoing) == true + pool.addPeerNoWait(peer8, PeerType.Outgoing) == true + pool.addPeerNoWait(peer7, PeerType.Outgoing) == true + pool.addPeerNoWait(peer6, PeerType.Outgoing) == true + pool.addPeerNoWait(peer9, PeerType.Outgoing) == true var total1, total2, total3: seq[PeerTest] var avail1, avail2, avail3: seq[PeerTest] diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index a5ab56169..100a9bd50 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -613,7 +613,7 @@ proc syncManagerOnePeerTest(): Future[bool] {.async.} = result = true peer.update(srcChain) - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -672,9 +672,9 @@ proc syncManagerOneSlotTest(): Future[bool] {.async.} = for i in 0 ..< len(peers): peers[i].update(srcChain) - doAssert(pool.addIncomingPeer(peers[0]) == true) - doAssert(pool.addOutgoingPeer(peers[1]) == true) - doAssert(pool.addOutgoingPeer(peers[2]) == true) + doAssert(pool.addIncomingPeerNoWait(peers[0]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[1]) == 
true) + doAssert(pool.addOutgoingPeerNoWait(peers[2]) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -734,9 +734,9 @@ proc syncManagerOneGroupTest(): Future[bool] {.async.} = for i in 0 ..< len(peers): peers[i].update(srcChain) if i mod 2 == 0: - doAssert(pool.addIncomingPeer(peers[i]) == true) + doAssert(pool.addIncomingPeerNoWait(peers[i]) == true) else: - doAssert(pool.addOutgoingPeer(peers[i]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[i]) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -805,9 +805,9 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} = for i in 0 ..< len(peers): if i mod 2 == 0: - doAssert(pool.addIncomingPeer(peers[i]) == true) + doAssert(pool.addIncomingPeerNoWait(peers[i]) == true) else: - doAssert(pool.addOutgoingPeer(peers[i]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[i]) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -870,7 +870,7 @@ proc syncManagerFailureTest(): Future[bool] {.async.} = dstChain.add(item) result = true - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, From 6b45c563192876e9b7cc78acba54fb56c5413302 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C8=98tefan=20Talpalaru?= Date: Wed, 29 Jan 2020 18:16:12 +0100 Subject: [PATCH 06/14] update .appveyor.yml --- .appveyor.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.appveyor.yml b/.appveyor.yml index 890b0b86c..af7e52d38 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -37,6 +37,7 @@ build_script: test_script: # the "go-checks" target fails in AppVeyor, for some reason; easier to disable than to debug - mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_GO_CHECKS=1 P2PD_CACHE=p2pdCache + - mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_GO_CHECKS=1 P2PD_CACHE=p2pdCache NIMFLAGS="-d:NETWORK_TYPE=libp2p" - mingw32-make -j2 ARCH_OVERRIDE=%PLATFORM% DISABLE_TEST_FIXTURES_SCRIPT=1 DISABLE_GO_CHECKS=1 test deploy: off From 1c21e01de98f6c58eaae300d57692fa535b569a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C8=98tefan=20Talpalaru?= Date: Tue, 28 Jan 2020 11:20:55 +0100 Subject: [PATCH 07/14] Azure Pipelines workaround for a broken "Git for Windows" install in their latest image --- azure-pipelines.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index d75d41e07..969fc54a2 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -41,6 +41,8 @@ jobs: - bash: | set -e + # https://developercommunity.visualstudio.com/content/problem/891929/windows-2019-cygheap-base-mismatch-detected-git-ba.html + export PATH="/mingw64/bin:/usr/bin:$PATH" echo "Installing MinGW-w64" if [[ $PLATFORM == "x86" ]]; then MINGW_FILE="i686-8.1.0-release-posix-dwarf-rt_v6-rev0.7z" @@ -64,6 +66,7 @@ jobs: export PATH="/c/custom/${MINGW_DIR}/bin:$PATH" echo "Fetching submodules" git config --global core.longpaths true + git config --global core.autocrlf false git submodule --quiet update --init --recursive scripts/setup_official_tests.sh jsonTestsCache mingw32-make -j2 ARCH_OVERRIDE=${PLATFORM} CI_CACHE=NimBinaries update From 45dd12cf3fcfaa1ca155c2738f0d39e783d9d553 Mon Sep 17 00:00:00 2001 From: Dustin Brody Date: Tue, 28 Jan 2020 22:24:45 +0100 Subject: [PATCH 08/14] update process_deposit() to actually check is_valid_merkle_branch() unless skipValidation 
specified --- beacon_chain/spec/beaconstate.nim | 23 ++++++++--------------- research/state_sim.nim | 5 +++-- tests/test_beaconstate.nim | 7 ++++--- tests/test_state_transition.nim | 6 +++--- 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index 45a46cabb..e5e14bbec 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -47,27 +47,20 @@ func decrease_balance*( else: state.balances[index] - delta -# https://github.com/ethereum/eth2.0-specs/blob/v0.8.4/specs/core/0_beacon-chain.md#deposits -func process_deposit*( +# https://github.com/ethereum/eth2.0-specs/blob/v0.9.4/specs/core/0_beacon-chain.md#deposits +proc process_deposit*( state: var BeaconState, deposit: Deposit, flags: UpdateFlags = {}): bool {.nbench.}= # Process an Eth1 deposit, registering a validator or increasing its balance. # Verify the Merkle branch - # TODO enable this check, but don't use doAssert - if not is_valid_merkle_branch( - hash_tree_root(deposit.getDepositMessage), - deposit.proof, - DEPOSIT_CONTRACT_TREE_DEPTH, - state.eth1_deposit_index, + if skipValidation notin flags and not is_valid_merkle_branch( + hash_tree_root(deposit.data), + deposit.proof, + DEPOSIT_CONTRACT_TREE_DEPTH + 1, + state.eth1_deposit_index, state.eth1_data.deposit_root, ): - ## TODO: a notice-like mechanism which works in a func - ## here and elsewhere, one minimal approach is a check-if-true - ## and return false iff so. - ## obviously, better/more principled ones exist, but - ## generally require broader rearchitecting, and this is what - ## mostly happens now, just haphazardly. - discard + return false # Deposits must be processed in order state.eth1_deposit_index += 1 diff --git a/research/state_sim.nim b/research/state_sim.nim index b980cab08..361caf8b7 100644 --- a/research/state_sim.nim +++ b/research/state_sim.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2019 Status Research & Development GmbH +# Copyright (c) 2019-2020 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -78,7 +78,8 @@ cli do(slots = SLOTS_PER_EPOCH * 6, let genesisState = - initialize_beacon_state_from_eth1(Eth2Digest(), 0, deposits, flags) + initialize_beacon_state_from_eth1( + Eth2Digest(), 0, deposits, {skipValidation}) genesisBlock = get_initial_beacon_block(genesisState) echo "Starting simulation..." diff --git a/tests/test_beaconstate.nim b/tests/test_beaconstate.nim index a9de1ecb5..26f81a8c1 100644 --- a/tests/test_beaconstate.nim +++ b/tests/test_beaconstate.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018 Status Research & Development GmbH +# Copyright (c) 2018-2020 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). 
@@ -10,11 +10,12 @@ import times, unittest, ./testutil, ./testblockutil, - ../beacon_chain/spec/[beaconstate, datatypes, digest] + ../beacon_chain/spec/[beaconstate, datatypes, digest], + ../beacon_chain/extras suite "Beacon state" & preset(): timedTest "Smoke test initialize_beacon_state_from_eth1" & preset(): let state = initialize_beacon_state_from_eth1( Eth2Digest(), 0, - makeInitialDeposits(SLOTS_PER_EPOCH, {}), {}) + makeInitialDeposits(SLOTS_PER_EPOCH, {}), {skipValidation}) check: state.validators.len == SLOTS_PER_EPOCH diff --git a/tests/test_state_transition.nim b/tests/test_state_transition.nim index e067e34a7..566ac7e06 100644 --- a/tests/test_state_transition.nim +++ b/tests/test_state_transition.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018 Status Research & Development GmbH +# Copyright (c) 2018-2020 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -11,7 +11,7 @@ import unittest, ./testutil, ./testblockutil, ../beacon_chain/spec/[beaconstate, datatypes, digest, validator], - ../beacon_chain/[state_transition, ssz] + ../beacon_chain/[extras, state_transition, ssz] suite "Block processing" & preset(): ## For now just test that we can compile and execute block processing with @@ -22,7 +22,7 @@ suite "Block processing" & preset(): # TODO bls verification is a bit of a bottleneck here genesisState = initialize_beacon_state_from_eth1( Eth2Digest(), 0, - makeInitialDeposits(), {}) + makeInitialDeposits(), {skipValidation}) genesisBlock = get_initial_beacon_block(genesisState) genesisRoot = hash_tree_root(genesisBlock.message) From 9748b2606ecd6f1c80567b6aa5b0626f4c5706f6 Mon Sep 17 00:00:00 2001 From: Dustin Brody Date: Thu, 30 Jan 2020 11:54:15 +0100 Subject: [PATCH 09/14] update attestations and voluntary exit operations test to 0.10.1 --- .../test_fixture_operations_attestations.nim | 29 ++++++++++++------- ...test_fixture_operations_voluntary_exit.nim | 10 ++++--- tests/official/test_fixture_sanity_slots.nim | 2 ++ 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/tests/official/test_fixture_operations_attestations.nim b/tests/official/test_fixture_operations_attestations.nim index 4127c99b0..8a9d6ff6e 100644 --- a/tests/official/test_fixture_operations_attestations.nim +++ b/tests/official/test_fixture_operations_attestations.nim @@ -66,18 +66,27 @@ template runTest(testName: string, identifier: untyped) = `testImpl _ operations_attestations _ identifier`() suite "Official - Operations - Attestations " & preset(): - runTest("success", success) - runTest("success previous epoch", success_previous_epoch) - runTest("invalid attestation signature", invalid_attestation_signature) - runTest("before inclusion delay", before_inclusion_delay) + # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/minimal/phase0/operations/attestation/pyspec_tests + # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/mainnet/phase0/operations/attestation/pyspec_tests runTest("after_epoch_slots", after_epoch_slots) + runTest("bad source root", bad_source_root) + runTest("before inclusion delay", before_inclusion_delay) + runTest("empty aggregation bits", empty_aggregation_bits) + runTest("future target epoch", future_target_epoch) + runTest("invalid attestation signature", invalid_attestation_signature) + 
runTest("invalid current source root", invalid_current_source_root) + runTest("invalid index", invalid_index) + runTest("mismatched target and slot", mismatched_target_and_slot) + runTest("new source epoch", new_source_epoch) runTest("old source epoch", old_source_epoch) runTest("old target epoch", old_target_epoch) - runTest("future target epoch", future_target_epoch) - runTest("new source epoch", new_source_epoch) runTest("source root is target root", source_root_is_target_root) - runTest("invalid current source root", invalid_current_source_root) - runTest("bad source root", bad_source_root) - runTest("empty aggregation bits", empty_aggregation_bits) - runTest("too many aggregation bits", too_many_aggregation_bits) + runTest("success", success) + runTest("success multi-proposer index interations", + success_multi_proposer_index_iterations) + runTest("success previous epoch", success_previous_epoch) runTest("too few aggregation bits", too_few_aggregation_bits) + runTest("too many aggregation bits", too_many_aggregation_bits) + runTest("wrong index for committee signature", + wrong_index_for_committee_signature) + runTest("wrong index for slot", wrong_index_for_slot) diff --git a/tests/official/test_fixture_operations_voluntary_exit.nim b/tests/official/test_fixture_operations_voluntary_exit.nim index 00521f631..943921e6d 100644 --- a/tests/official/test_fixture_operations_voluntary_exit.nim +++ b/tests/official/test_fixture_operations_voluntary_exit.nim @@ -64,16 +64,18 @@ template runTest(identifier: untyped) = `testImpl _ voluntary_exit _ identifier`() suite "Official - Operations - Voluntary exit " & preset(): + # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/minimal/phase0/operations/voluntary_exit/pyspec_tests + # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/mainnet/phase0/operations/voluntary_exit/pyspec_tests runTest(success) when false: # TODO not sure how this particularly could falsely succeed runTest(invalid_signature) + runTest(validator_invalid_validator_index) + runTest(validator_already_exited) runTest(success_exit_queue) runTest(validator_exit_in_future) - runTest(validator_invalid_validator_index) - runTest(validator_not_active) - runTest(validator_already_exited) + runTest(default_exit_epoch_subsequent_exit) runTest(validator_not_active_long_enough) - + runTest(validator_not_active) diff --git a/tests/official/test_fixture_sanity_slots.nim b/tests/official/test_fixture_sanity_slots.nim index 8c6f69369..a20479750 100644 --- a/tests/official/test_fixture_sanity_slots.nim +++ b/tests/official/test_fixture_sanity_slots.nim @@ -47,6 +47,8 @@ template runTest(testName: string, identifier: untyped, num_slots: uint64): unty # --------------------------------------------------------------- suite "Official - Sanity - Slots " & preset(): + # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/minimal/phase0/sanity/slots/pyspec_tests + # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/mainnet/phase0/sanity/slots/pyspec_tests runTest("Advance 1 slot", slots_1, 1) runTest("Advance 2 slots", slots_2, 2) runTest("Advance an empty epoch", empty_epoch, SLOTS_PER_EPOCH) From 2591be8796d3f09f2ead643120a0fe0d4096c0aa Mon Sep 17 00:00:00 2001 From: Dustin Brody Date: Thu, 30 Jan 2020 13:40:08 +0100 Subject: [PATCH 10/14] re-organize/shuffle proposer_slashing operations test runner for easy consistency-with-alphabetical-GitHub checking --- .../test_fixture_operations_proposer_slashings.nim | 12 +++++++----- 1 file changed, 7 
From 2591be8796d3f09f2ead643120a0fe0d4096c0aa Mon Sep 17 00:00:00 2001
From: Dustin Brody
Date: Thu, 30 Jan 2020 13:40:08 +0100
Subject: [PATCH 10/14] re-organize/shuffle proposer_slashing operations test
 runner for easy consistency-with-alphabetical-GitHub checking

---
 .../test_fixture_operations_proposer_slashings.nim | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/tests/official/test_fixture_operations_proposer_slashings.nim b/tests/official/test_fixture_operations_proposer_slashings.nim
index f5e1b0409..50375bc1f 100644
--- a/tests/official/test_fixture_operations_proposer_slashings.nim
+++ b/tests/official/test_fixture_operations_proposer_slashings.nim
@@ -66,13 +66,15 @@ template runTest(identifier: untyped) =
   `testImpl_proposer_slashing _ identifier`()
 
 suite "Official - Operations - Proposer slashing " & preset():
-  runTest(success)
-  runTest(invalid_sig_1)
-  runTest(invalid_sig_2)
-  runTest(invalid_sig_1_and_2)
-  runTest(invalid_proposer_index)
+  # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/minimal/phase0/operations/proposer_slashing/pyspec_tests
+  # https://github.com/ethereum/eth2.0-spec-tests/tree/v0.10.1/tests/mainnet/phase0/operations/proposer_slashing/pyspec_tests
   runTest(epochs_are_different)
   runTest(headers_are_same)
+  runTest(invalid_proposer_index)
+  runTest(invalid_sig_1)
+  runTest(invalid_sig_1_and_2)
+  runTest(invalid_sig_2)
   runTest(proposer_is_not_activated)
   runTest(proposer_is_slashed)
   runTest(proposer_is_withdrawn)
+  runTest(success)

From 28264aa29368cdd3af0c0d2e291acb92c7017223 Mon Sep 17 00:00:00 2001
From: Ștefan Talpalaru
Date: Sat, 18 Jan 2020 16:42:04 +0100
Subject: [PATCH 11/14] Nim 1.0.6 RC

---
 vendor/nimbus-build-system | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system
index 6cfabf782..0dcb8e0c3 160000
--- a/vendor/nimbus-build-system
+++ b/vendor/nimbus-build-system
@@ -1 +1 @@
-Subproject commit 6cfabf7820834cb99bd30fc664d3ab91eb0493bf
+Subproject commit 0dcb8e0c37b7654fee29ed1c1eefdfe9794db396

From 2a7ef4e5e52b873a683394943edbc66eff9f3244 Mon Sep 17 00:00:00 2001
From: Ștefan Talpalaru
Date: Mon, 27 Jan 2020 15:26:43 +0100
Subject: [PATCH 12/14] Nim 1.0.6

---
 vendor/nimbus-build-system | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/vendor/nimbus-build-system b/vendor/nimbus-build-system
index 0dcb8e0c3..2a70c4f15 160000
--- a/vendor/nimbus-build-system
+++ b/vendor/nimbus-build-system
@@ -1 +1 @@
-Subproject commit 0dcb8e0c37b7654fee29ed1c1eefdfe9794db396
+Subproject commit 2a70c4f152ee849db1ededa92c1d80f7102dd718

From 0d9503ee499fb7e39face7c82b871ab911e66069 Mon Sep 17 00:00:00 2001
From: Zahary Karadjov
Date: Mon, 3 Feb 2020 17:06:35 +0100
Subject: [PATCH 13/14] Allow run-time switching to the TRACE log level

---
 scripts/connect_to_testnet.nims | 2 +-
 tests/simulation/start.sh       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/connect_to_testnet.nims b/scripts/connect_to_testnet.nims
index f3f6e23e3..c90aeb975 100644
--- a/scripts/connect_to_testnet.nims
+++ b/scripts/connect_to_testnet.nims
@@ -75,7 +75,7 @@ cli do (testnetName {.argument.}: string):
     validatorsDir = dataDir / "validators"
     dumpDir = dataDir / "dump"
     beaconNodeBinary = buildDir / "beacon_node_" & dataDirName
-    nimFlags = "-d:chronicles_log_level=DEBUG " & getEnv("NIM_PARAMS")
+    nimFlags = "-d:chronicles_log_level=TRACE " & getEnv("NIM_PARAMS")
 
   let depositContractFile = testnetDir / depositContractFileName
   if system.fileExists(depositContractFile):
diff --git a/tests/simulation/start.sh b/tests/simulation/start.sh
index 7edcbbb7c..3c4d63fd4 100755
--- a/tests/simulation/start.sh
+++ b/tests/simulation/start.sh
@@ -16,7 +16,7 @@ mkdir -p "$VALIDATORS_DIR"
 
 cd "$GIT_ROOT"
 
-NIMFLAGS="-d:chronicles_log_level=DEBUG --hints:off --warnings:off --verbosity:0 --opt:speed --debuginfo"
+NIMFLAGS="-d:chronicles_log_level=TRACE --hints:off --warnings:off --verbosity:0 --opt:speed --debuginfo"
 
 # Run with "SLOTS_PER_EPOCH=8 ./start.sh" to change these
 DEFS=""
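A note on the two build-script changes above: chronicles drops any log statement finer than the compile-time chronicles_log_level, so a binary built with DEBUG can never emit TRACE output no matter what is requested at run time. Compiling with TRACE only makes the level reachable; the effective level can still be tightened or relaxed while the process runs. A minimal sketch of that interplay, assuming chronicles' run-time filtering is enabled and that setLogLevel is its run-time entry point (file name hypothetical):

    # trace_demo.nim -- build with:
    #   nim c -d:chronicles_log_level=TRACE -d:chronicles_runtime_filtering=on trace_demo.nim
    import chronicles

    setLogLevel(LogLevel.DEBUG)
    trace "compiled in, filtered out", step = 1   # suppressed at run time
    debug "visible", step = 2

    setLogLevel(LogLevel.TRACE)                   # run-time switch to TRACE
    trace "now visible", step = 3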
From e4922cb1776f1c530f5af61f966b5145659628dc Mon Sep 17 00:00:00 2001
From: Dustin Brody
Date: Thu, 30 Jan 2020 15:03:26 +0100
Subject: [PATCH 14/14] implement compute_signing_root() and mark some
 functions as 0.10.1-compatible

---
 beacon_chain/spec/beaconstate.nim            |  2 +-
 beacon_chain/spec/helpers.nim                | 12 +++++++++++-
 beacon_chain/spec/state_transition_block.nim |  4 ++--
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim
index e5e14bbec..14f44fbe9 100644
--- a/beacon_chain/spec/beaconstate.nim
+++ b/beacon_chain/spec/beaconstate.nim
@@ -373,7 +373,7 @@ proc is_valid_indexed_attestation*(
     notice "indexed attestation: validator index beyond max validators per committee"
     return false
 
-  # Verify indices are sorted
+  # Verify indices are sorted and unique
   # TODO but why? this is a local artifact
   if indices != sorted(indices, system.cmp):
     notice "indexed attestation: indices not sorted"
diff --git a/beacon_chain/spec/helpers.nim b/beacon_chain/spec/helpers.nim
index def556861..572a7d316 100644
--- a/beacon_chain/spec/helpers.nim
+++ b/beacon_chain/spec/helpers.nim
@@ -13,7 +13,7 @@ import
   # Third-party
   blscurve, # defines Domain
   # Internal
-  ./datatypes, ./digest
+  ./datatypes, ./digest, ../ssz
 
 # https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#integer_squareroot
 func integer_squareroot*(n: SomeInteger): SomeInteger =
@@ -144,6 +144,16 @@ func get_domain*(
 func get_domain*(state: BeaconState, domain_type: DomainType): Domain =
   get_domain(state, domain_type, get_current_epoch(state))
 
+# https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#compute_signing_root
+func compute_signing_root*(ssz_object: auto, domain: Domain): Eth2Digest =
+  # Return the signing root of an object by calculating the root of the
+  # object-domain tree.
+  let domain_wrapped_object = SigningRoot(
+    object_root: hash_tree_root(ssz_object),
+    domain: domain
+  )
+  hash_tree_root(domain_wrapped_object)
+
 # https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#get_seed
 func get_seed*(state: BeaconState, epoch: Epoch, domain_type: DomainType): Eth2Digest =
   # Return the seed at ``epoch``.
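For context on the new helper: under spec v0.10.x a signature no longer covers hash_tree_root(object) directly; it covers the root of a SigningRoot container that pairs that root with a domain, which ties every signature to a fork and a purpose. A rough sketch of the intended call pattern for a voluntary exit follows; the names exit, validator and signed_exit, as well as the blsVerify call, are illustrative placeholders and not code from this patch.

    # Illustrative only -- how a verifier combines get_domain() and
    # compute_signing_root() once both helpers exist.
    let
      domain = get_domain(state, DOMAIN_VOLUNTARY_EXIT, exit.epoch)
      signing_root = compute_signing_root(exit, domain)
    # The BLS check targets signing_root rather than hash_tree_root(exit);
    # mixing the domain into the signed root prevents cross-fork and
    # cross-purpose replay of the same signature.
    if not blsVerify(validator.pubkey, signing_root.data, signed_exit.signature):
      notice "voluntary exit: signature verification failed"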
diff --git a/beacon_chain/spec/state_transition_block.nim b/beacon_chain/spec/state_transition_block.nim
index 3b33c2664..b568a62f6 100644
--- a/beacon_chain/spec/state_transition_block.nim
+++ b/beacon_chain/spec/state_transition_block.nim
@@ -213,7 +213,7 @@ func is_slashable_attestation_data(
     (data_1.source.epoch < data_2.source.epoch and
      data_2.target.epoch < data_1.target.epoch)
 
-# https://github.com/ethereum/eth2.0-specs/blob/v0.9.4/specs/core/0_beacon-chain.md#attester-slashings
+# https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#attester-slashings
 proc process_attester_slashing*(
        state: var BeaconState,
        attester_slashing: AttesterSlashing,
@@ -250,7 +250,7 @@ proc process_attester_slashing*(
       return false
   return true
 
-# https://github.com/ethereum/eth2.0-specs/blob/v0.9.4/specs/core/0_beacon-chain.md#attester-slashings
+# https://github.com/ethereum/eth2.0-specs/blob/v0.10.1/specs/phase0/beacon-chain.md#attester-slashings
 proc processAttesterSlashings(state: var BeaconState, blck: BeaconBlock,
     stateCache: var StateCache): bool {.nbench.}=
   # Process ``AttesterSlashing`` operation.
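The predicate at the top of this hunk encodes the two slashing conditions: a double vote (two distinct attestations with the same target epoch) and a surround vote (one attestation's source/target span strictly containing the other's). A standalone toy version with bare epoch numbers, purely to illustrate the arithmetic (the helper name is made up and is not part of state_transition_block.nim):

    # toy_slashing.nim -- illustration only
    proc isSlashablePair(src1, tgt1, src2, tgt2: uint64, sameData: bool): bool =
      let doubleVote = (not sameData) and tgt1 == tgt2
      let surroundVote = src1 < src2 and tgt2 < tgt1
      result = doubleVote or surroundVote

    doAssert isSlashablePair(2, 9, 4, 6, sameData = false)       # surround vote
    doAssert isSlashablePair(3, 7, 5, 7, sameData = false)       # double vote
    doAssert not isSlashablePair(3, 7, 7, 11, sameData = false)  # honest progression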