Initial commit.

parent 0aaf8a7555
commit 73dc72583f

@@ -34,6 +34,8 @@ type
    acqIncPeersCount: int
    acqOutPeersCount: int

  PeerPoolError* = object of CatchableError

proc `<`*(a, b: PeerIndex): bool =
  result = a.cmp(b, a)

@@ -219,9 +221,17 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
proc addPeer*[A, B](pool: PeerPool[A, B], peer: A, peerType: PeerType): bool =
  ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
  ##
  ## Returns ``false`` if:
  ## * ``peer`` is already closed.
  ## * ``pool`` already holds the maximum number of peers.
  ## * ``pool`` already holds the maximum number of `Incoming` or `Outgoing`
  ##   peers.
  ##
  ## Returns ``true`` on success.
  mixin getKey, getFuture

  if peer.getFuture().finished:
    return false

  if len(pool.registry) >= pool.maxPeersCount:
    return false

@@ -289,6 +299,22 @@ proc acquire*[A, B](pool: PeerPool[A, B],
      result = item[].data
      break

proc acquireNoWait*[A, B](pool: PeerPool[A, B],
                          filter = {PeerType.Incoming,
                                    PeerType.Outgoing}): A =
  doAssert(filter != {}, "Filter must not be empty")
  var count = 0
  if PeerType.Incoming in filter:
    count = count + len(pool.incQueue)
  if PeerType.Outgoing in filter:
    count = count + len(pool.outQueue)
  if count < 1:
    raise newException(PeerPoolError, "Not enough peers in pool")
  var item = pool.getItem(filter)
  doAssert(PeerFlags.Acquired notin item[].flags)
  item[].flags.incl(PeerFlags.Acquired)
  result = item[].data

proc release*[A, B](pool: PeerPool[A, B], peer: A) =
  ## Release peer ``peer`` back to PeerPool ``pool``.
  mixin getKey

@@ -356,6 +382,28 @@ proc acquire*[A, B](pool: PeerPool[A, B],
    raise
  result = peers

proc acquireNoWait*[A, B](pool: PeerPool[A, B],
                          number: int,
                          filter = {PeerType.Incoming,
                                    PeerType.Outgoing}): seq[A] =
  ## Acquire exactly ``number`` peers from PeerPool ``pool`` which match the
  ## filter ``filter``.
  doAssert(filter != {}, "Filter must not be empty")
  var peers = newSeq[A]()
  var count = 0
  if PeerType.Incoming in filter:
    count = count + len(pool.incQueue)
  if PeerType.Outgoing in filter:
    count = count + len(pool.outQueue)
  if count < number:
    raise newException(PeerPoolError, "Not enough peers in pool")
  for i in 0 ..< number:
    var item = pool.getItem(filter)
    doAssert(PeerFlags.Acquired notin item[].flags)
    item[].flags.incl(PeerFlags.Acquired)
    peers.add(item[].data)
  result = peers

proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} =
  ## Acquire a single incoming peer from PeerPool ``pool``.
  pool.acquire({PeerType.Incoming})
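
# A minimal usage sketch for the pool procedures above (illustrative only;
# `MyPeer`, `MyPeerKey` and `peer` are assumptions, not part of this module):
#
#   var pool = newPeerPool[MyPeer, MyPeerKey]()
#   discard pool.addPeer(peer, PeerType.Incoming)
#   let one = pool.acquireNoWait()     # raises PeerPoolError when empty
#   pool.release(one)                  # returns the peer to the pool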

@@ -0,0 +1,985 @@
import chronicles
import options, deques, heapqueue
import spec/datatypes, spec/digest, stew/bitseqs, chronos
import peer_pool
export datatypes, digest

# logScope:
#   topics = "syncman"

const MAX_REQUESTED_BLOCKS* = 20'u64

type
  # A - Peer type
  # B - PeerID type
  #
  # getLastSlot(Peer): Slot
  # getHeadRoot(Peer): Eth2Digest
  # getBeaconBlocksByRange(Peer, Eth2Digest, Slot, uint64, uint64): Future[Option[seq[SignedBeaconBlock]]]
  # updateStatus(Peer): void

  PeerSlot*[A, B] = ref object
    peers*: seq[A]
    man: SyncManager[A, B]

  PeerGroup*[A, B] = ref object
    slots*: seq[PeerSlot[A, B]]
    man: SyncManager[A, B]

  GetLastLocalSlotCallback* = proc(): Slot
  UpdateLocalBlocksCallback* = proc(list: openarray[SignedBeaconBlock]): bool

  SyncManager*[A, B] = ref object
    groups*: seq[PeerGroup[A, B]]
    pool: PeerPool[A, B]
    peersInSlot: int
    slotsInGroup: int
    groupsCount: int
    failuresCount: int
    failurePause: chronos.Duration
    peerSlotTimeout: chronos.Duration
    peerGroupTimeout: chronos.Duration
    statusPeriod: chronos.Duration
    getLastLocalSlot: GetLastLocalSlotCallback
    updateLocalBlocks: UpdateLocalBlocksCallback

  BlockList* = object
    list*: seq[SignedBeaconBlock]
    map*: BitSeq
    start*: Slot

  OptionBlockList* = Option[BlockList]
  OptionBeaconBlockSeq* = Option[seq[SignedBeaconBlock]]

  SyncRequest* = object
    slot*: Slot
    count*: uint64
    step*: uint64
    group*: int

  SyncResult* = object
    request*: SyncRequest
    data*: seq[SignedBeaconBlock]

  SyncQueue* = ref object
    inpSlot*: Slot
    outSlot*: Slot

    startSlot*: Slot
    lastSlot: Slot
    chunkSize*: uint64
    queueSize*: int

    notFullEvent*: AsyncEvent
    syncUpdate*: UpdateLocalBlocksCallback

    debtsQueue: HeapQueue[SyncRequest]
    debtsCount: uint64
    readyQueue: HeapQueue[SyncResult]
    readyData: seq[seq[SignedBeaconBlock]]

  SyncManagerError* = object of CatchableError

proc init*(t: typedesc[SyncQueue], start, last: Slot, chunkSize: uint64,
           updateCb: UpdateLocalBlocksCallback,
           queueSize: int = -1): SyncQueue =
  ## Create a new synchronization queue.
  ##
  ## ``start`` and ``last`` are the starting and finishing slots.
  ##
  ## ``chunkSize`` is the maximum number of slots in one request.
  ##
  ## ``queueSize`` is the maximum queue size for incoming data. If
  ## ``queueSize > 0`` the queue will help to keep backpressure under control.
  ## If ``queueSize <= 0`` then the queue size is unlimited (default).
  ##
  ## ``updateCb`` is the procedure which will be used to send downloaded
  ## blocks to the consumer. Block sequences will be sent sequentially. The
  ## procedure should return ``false`` only when it receives incorrect blocks,
  ## and ``true`` if the sequence of blocks is correct.
  doAssert(chunkSize > 0'u64, "Chunk size should not be zero")
  result = SyncQueue(startSlot: start, lastSlot: last, chunkSize: chunkSize,
                     queueSize: queueSize, syncUpdate: updateCb,
                     notFullEvent: newAsyncEvent(),
                     debtsQueue: initHeapQueue[SyncRequest](),
                     inpSlot: start, outSlot: start)
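
# A minimal lifecycle sketch for SyncQueue (illustrative only; `storeBlocks`
# and the slot values are assumptions, not part of this module):
#
#   proc storeBlocks(list: openarray[SignedBeaconBlock]): bool =
#     true  # accept every sequence in this sketch
#
#   var sq = SyncQueue.init(Slot(0), Slot(99), 10'u64, storeBlocks,
#                           queueSize = 4)
#   var req = sq.pop()             # -> (slot: 0, count: 10, step: 1)
#   # ... download blocks for `req` from a peer ...
#   # await sq.push(req, blocks)   # delivers them to storeBlocks in order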

proc `<`*(a, b: SyncRequest): bool {.inline.} =
  result = (a.slot < b.slot)

proc `<`*(a, b: SyncResult): bool {.inline.} =
  result = (a.request.slot < b.request.slot)

proc `==`*(a, b: SyncRequest): bool {.inline.} =
  result = ((a.slot == b.slot) and (a.count == b.count) and
            (a.step == b.step))

proc lastSlot*(req: SyncRequest): Slot {.inline.} =
  ## Returns the last slot for request ``req``.
  result = req.slot + req.count - 1'u64

proc updateLastSlot*(sq: SyncQueue, last: Slot) {.inline.} =
  ## Update the last slot stored in queue ``sq`` with value ``last``.
  doAssert(sq.lastSlot <= last, "Last slot cannot be lower than the stored one")
  sq.lastSlot = last

proc push*(sq: SyncQueue, sr: SyncRequest,
           data: seq[SignedBeaconBlock]) {.async.} =
  ## Push a successful result to queue ``sq``.
  while true:
    if (sq.queueSize > 0) and (sr.slot > sq.inpSlot + uint64(sq.queueSize)):
      await sq.notFullEvent.wait()
      sq.notFullEvent.clear()
      continue
    let res = SyncResult(request: sr, data: data)
    sq.readyQueue.push(res)
    break

  while len(sq.readyQueue) > 0:
    let minSlot = sq.readyQueue[0].request.slot
    if sq.outSlot != minSlot:
      break
    let item = sq.readyQueue.pop()
    if not(sq.syncUpdate(item.data)):
      sq.debtsQueue.push(item.request)
      sq.debtsCount = sq.debtsCount + item.request.count
      break
    sq.outSlot = sq.outSlot + item.request.count
    sq.notFullEvent.fire()

proc push*(sq: SyncQueue, sr: SyncRequest) =
  ## Push a failed request back to the queue.
  sq.debtsQueue.push(sr)
  sq.debtsCount = sq.debtsCount + sr.count

proc push*(sq: SyncQueue, sr: SyncRequest, newstep: uint64) =
  ## Push request ``sr`` back with a changed (smaller) step ``newstep``.
  doAssert(sr.step > newstep, "The new step should be less than the original")
  var count = sr.count
  var slot = sr.slot
  var newcount = 0'u64

  for i in 0 ..< (sr.step div newstep):
    if newstep * sq.chunkSize <= count:
      newcount = newstep * sq.chunkSize
    else:
      newcount = count
    var newsr = SyncRequest(slot: slot, count: newcount, step: newstep)
    slot = slot + newcount
    count = count - newcount
    sq.debtsQueue.push(newsr)
    sq.debtsCount = sq.debtsCount + newsr.count
    if count == 0:
      break

  if count > 0'u64:
    let step = sr.step mod newstep
    doAssert(step * sq.chunkSize <= count)
    var newsr = SyncRequest(slot: slot, count: count, step: step)
    sq.debtsQueue.push(newsr)
    sq.debtsCount = sq.debtsCount + newsr.count
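
# Worked example (mirrored by the unit tests): with ``chunkSize == 2``, a
# request ``(slot: 1, count: 10, step: 5)`` pushed back with ``newstep = 3``
# is split into two debts:
#
#   (slot: 1, count: 6, step: 3)   # newstep * chunkSize slots per piece
#   (slot: 7, count: 4, step: 2)   # remainder, step = 5 mod 3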

proc pop*(sq: SyncQueue, step = 0'u64): SyncRequest =
  ## Obtain the next request from queue ``sq``.
  if len(sq.debtsQueue) > 0:
    var sr = sq.debtsQueue.pop()
    if step != 0'u64:
      if sr.step > step:
        sq.push(sr, step)
        sr = sq.debtsQueue.pop()
    sq.debtsCount = sq.debtsCount - sr.count
    result = sr
  else:
    let nstep = if step == 0'u64: 1'u64 else: step
    if sq.inpSlot <= sq.lastSlot:
      let count = min(sq.lastSlot + 1'u64 - sq.inpSlot, sq.chunkSize * nstep)
      result = SyncRequest(slot: sq.inpSlot, count: count, step: nstep)
      sq.inpSlot = sq.inpSlot + count
    else:
      raise newException(ValueError, "Queue is already empty!")

proc len*(sq: SyncQueue): uint64 {.inline.} =
  ## Returns the number of slots left in queue ``sq``.
  if sq.inpSlot > sq.lastSlot:
    result = sq.debtsCount
  else:
    result = sq.lastSlot - sq.inpSlot + 1'u64 + sq.debtsCount

proc total*(sq: SyncQueue): uint64 {.inline.} =
  ## Returns the total number of slots in queue ``sq``.
  result = sq.lastSlot - sq.startSlot + 1'u64

proc progress*(sq: SyncQueue): string =
  ## Returns queue ``sq`` progress string.
  result = $len(sq) & "/" & $sq.total()
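
# Worked example (mirrored by the unit tests): for
# ``SyncQueue.init(Slot(1), Slot(30), 2'u64, nil)`` the queue starts with
# ``len == 30`` and ``total == 30``, so ``progress`` is "30/30"; after
# ``pop(5)`` hands out 10 slots, ``len`` drops to 20 and ``progress``
# becomes "20/30".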

proc init*(t: typedesc[BlockList], start: Slot, count, step: uint64,
           list: openarray[SignedBeaconBlock]): Option[BlockList] =
  mixin getSlot
  var res: BlockList
  var error = false
  var current = start
  var index = 0

  res.map = BitSeq.init(0)

  for i in 0'u64 ..< count:
    if index < len(list):
      let slot = list[index].message.slot
      if slot < current:
        error = true
        break
      elif slot == current:
        res.map.add(true)
        inc(index)
      else:
        res.map.add(false)
    else:
      res.map.add(false)

    let next = current + step
    current = current + 1'u64
    if i < (count - 1):
      while current < next:
        res.map.add(false)
        current = current + 1'u64

  if not(error) and index == len(list):
    res.list = @list
    res.start = start
    result = some(res)

proc init*(t: typedesc[BlockList], start, finish: Slot): BlockList =
  result = BlockList(start: start)
  result.map = BitSeq.init(int((finish - start) + 1'u64))

proc `$`*(blist: BlockList): string =
  var index = 0
  for i in 0 ..< len(blist.map):
    if blist.map[i]:
      result = result & $blist.list[index].message.slot & ", "
      index = index + 1
    else:
      result = result & "<empty>, "
  if len(result) > 2:
    result.setLen(len(result) - 2)

proc startSlot*(blist: BlockList): Slot {.inline.} =
  result = blist.start

proc lastSlot*(blist: BlockList): Slot {.inline.} =
  doAssert(len(blist.map) > 0)
  result = blist.start + uint64(len(blist.map) - 1)

proc contains*(blist: BlockList, slot: Slot): bool {.inline.} =
  if (blist.startSlot() <= slot) and (slot <= blist.lastSlot()):
    result = true

proc merge*(optlists: varargs[Option[BlockList]]): Option[BlockList] =
  if len(optlists) > 0:
    if len(optlists) == 1:
      result = optlists[0]
    else:
      var blists = newSeq[BlockList](len(optlists))
      for i in 0 ..< len(optlists):
        doAssert(optlists[i].isSome()) # Must not happen
        blists[i] = optlists[i].get()

      var minSlot, maxSlot: Slot
      for i in 0 ..< len(blists):
        if i == 0:
          minSlot = blists[i].startSlot()
          maxSlot = blists[i].lastSlot()
        else:
          let candidateMinSlot = blists[i].startSlot()
          let candidateMaxSlot = blists[i].lastSlot()
          if candidateMinSlot < minSlot:
            minSlot = candidateMinSlot
          if candidateMaxSlot > maxSlot:
            maxSlot = candidateMaxSlot

      var res = BlockList.init(minSlot, maxSlot)
      var slot = minSlot
      var indexes = newSeq[int](len(blists))
      var resIndex = 0
      while slot <= maxSlot:
        for i in 0 ..< len(blists):
          if blists[i].contains(slot):
            let slotIndex = slot - blists[i].startSlot()
            if blists[i].map[slotIndex]:
              res.map.setBit(resIndex)
              res.list.add(blists[i].list[indexes[i]])
              inc(indexes[i])
        inc(resIndex)
        slot = slot + 1'u64
      result = some(res)
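
# Worked example (values are illustrative): ``BlockList.init(Slot(10), 3'u64,
# 2'u64, blocks)`` describes a request for slots {10, 12, 14}; if only the
# blocks for slots 10 and 14 were returned, the map over slots 10..14 is
# [true, false, false, false, true] (the missing request slot 12 and the
# skipped slots 11, 13 are all ``false``). ``merge`` then combines several
# such lists into one BlockList covering min(startSlot)..max(lastSlot).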

proc newSyncManager*[A, B](pool: PeerPool[A, B],
                           getLastLocalSlotCb: GetLastLocalSlotCallback,
                           updateLocalBlocksCb: UpdateLocalBlocksCallback,
                           peersInSlot = 3, peerSlotTimeout = 6.seconds,
                           slotsInGroup = 2, peerGroupTimeout = 10.seconds,
                           groupsCount = 10,
                           statusPeriod = 10.minutes,
                           failuresCount = 3,
                           failurePause = 5.seconds): SyncManager[A, B] =
  ## ``pool`` - PeerPool object which will be used as the source of peers.
  ##
  ## ``peersInSlot`` - maximum number of peers in a slot.
  ##
  ## ``peerSlotTimeout`` - timeout for PeerSlot.getBlocks() execution.
  ##
  ## ``slotsInGroup`` - maximum number of slots in a group.
  ##
  ## ``peerGroupTimeout`` - timeout for PeerGroup.getBlocks() execution.
  ##
  ## ``groupsCount`` - maximum number of groups used in the sync process.
  ##
  ## ``statusPeriod`` - period of time between status updates.
  ##
  ## ``getLastLocalSlotCb`` - procedure which provides the current latest
  ## `Slot` in the local database.
  ##
  ## ``updateLocalBlocksCb`` - procedure which accepts a list of downloaded
  ## blocks and stores it to the local database.
  ##
  ## ``failuresCount`` - number of consecutive failures after which the
  ## procedure will exit.
  ##
  ## ``failurePause`` - period of time the sync manager will wait if none of
  ## the nodes could satisfy the requested slot.
  result = SyncManager[A, B](pool: pool, peersInSlot: peersInSlot,
                             slotsInGroup: slotsInGroup,
                             groupsCount: groupsCount,
                             peerSlotTimeout: peerSlotTimeout,
                             peerGroupTimeout: peerGroupTimeout,
                             statusPeriod: statusPeriod,
                             getLastLocalSlot: getLastLocalSlotCb,
                             updateLocalBlocks: updateLocalBlocksCb,
                             failuresCount: failuresCount,
                             failurePause: failurePause)

template nearestOdd(number: int): int =
  number - ((number - 1) mod 2)
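
# Examples: nearestOdd(1) == 1, nearestOdd(4) == 3, nearestOdd(5) == 5.
# Keeping the number of peers in a slot odd guarantees that the majority
# vote over their responses cannot end in a tie.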

proc newPeerSlot*[A, B](man: SyncManager[A, B]): PeerSlot[A, B] =
  result = PeerSlot[A, B]()
  result.man = man
  result.peers = newSeq[A]()

proc `$`*[A, B](peerslot: PeerSlot[A, B]): string =
  ## Returns a string representation of peer slot ``peerslot``.
  mixin getKey, getLastSlot
  if len(peerslot.peers) == 0:
    result = "<>"
  else:
    result = "<"
    for item in peerslot.peers:
      result.add("\"" & getKey(item) & "\"")
      result.add(":" & $getLastSlot(item))
      result.add(", ")
    result.setLen(len(result) - 2)
    result.add(">")

proc isFull*[A, B](peerslot: PeerSlot[A, B]): bool {.inline.} =
  ## Returns ``true`` if peer slot ``peerslot`` is full of peers.
  result = (len(peerslot.peers) == peerslot.man.peersInSlot)

proc isEmpty*[A, B](peerslot: PeerSlot[A, B]): bool {.inline.} =
  ## Returns ``true`` if peer slot ``peerslot`` is empty (out of peers).
  result = (len(peerslot.peers) == 0)

proc fillPeers*[A, B](slot: PeerSlot[A, B]) {.async.} =
  doAssert(slot.man.peersInSlot > 0 and
           (slot.man.peersInSlot mod 2 == 1))
  doAssert(len(slot.peers) == 0 or (len(slot.peers) mod 2 == 0))
  doAssert(len(slot.peers) <= slot.man.peersInSlot)
  if len(slot.peers) == 0:
    # This is a new slot.
    var peer = await slot.man.pool.acquire()
    let available = slot.man.pool.lenAvailable()
    slot.peers.add(peer)
    if available > 0:
      if available + len(slot.peers) < slot.man.peersInSlot:
        # There are not enough available peers in the pool, so we add only
        # some of them, but we still want to keep the number of peers in the
        # slot odd.
        let count = nearestOdd(available + len(slot.peers))
        if count > len(slot.peers):
          let peers = slot.man.pool.acquireNoWait(count - len(slot.peers))
          slot.peers.add(peers)
      else:
        # There are enough peers to fill the slot.
        let peers = slot.man.pool.acquireNoWait(slot.man.peersInSlot -
                                                len(slot.peers))
        slot.peers.add(peers)
    else:
      # Only one peer was obtained and there are no more available peers,
      # so we start with just one peer.
      discard
  else:
    # If the slot already has some peers, then we are not going to wait for
    # more peers; we consume everything that is available.
    if len(slot.peers) < slot.man.peersInSlot:
      # The slot does not have enough peers inside, we need to add the
      # missing peers.
      let available = slot.man.pool.lenAvailable()
      if available == 0:
        # There are no peers available, so we just exit.
        discard
      else:
        if available + len(slot.peers) < slot.man.peersInSlot:
          let count = nearestOdd(available + len(slot.peers))
          let peers = slot.man.pool.acquireNoWait(count - len(slot.peers))
          slot.peers.add(peers)
        else:
          let peers = slot.man.pool.acquireNoWait(slot.man.peersInSlot -
                                                  len(slot.peers))
          slot.peers.add(peers)
    else:
      # The slot has enough peers inside, we do nothing here.
      discard

proc newPeerGroup*[A, B](man: SyncManager[A, B]): PeerGroup[A, B] =
  result = PeerGroup[A, B]()
  result.man = man
  result.slots = newSeq[PeerSlot[A, B]]()

proc fillSlots*[A, B](group: PeerGroup[A, B]) {.async.} =
  ## Fills peer group ``group`` with slots of peers from the PeerPool.
  if len(group.slots) == 0:
    while len(group.slots) < group.man.slotsInGroup:
      var slot = newPeerSlot[A, B](group.man)
      await slot.fillPeers()
      doAssert(not(slot.isEmpty()))
      group.slots.add(slot)
      if not(slot.isFull()) or (group.man.pool.lenAvailable() == 0):
        break
  else:
    for i in 0 ..< group.man.slotsInGroup:
      if i < len(group.slots):
        if group.man.pool.lenAvailable() == 0:
          break
        # PeerPool is not empty, so this call will finish immediately.
        await group.slots[i].fillPeers()
        if not(group.slots[i].isFull()):
          break
      else:
        if group.man.pool.lenAvailable() == 0:
          break
        var slot = newPeerSlot[A, B](group.man)
        # PeerPool is not empty, so this call will finish immediately.
        await slot.fillPeers()
        doAssert(not(slot.isEmpty()))
        group.slots.add(slot)
        if not(slot.isFull()):
          break

proc isFull*[A, B](group: PeerGroup[A, B]): bool =
  result = false
  if len(group.slots) >= group.man.slotsInGroup:
    result = true
    for item in group.slots:
      if not(item.isFull()):
        result = false
        break

proc isEmpty*[A, B](group: PeerGroup[A, B]): bool =
  result = (len(group.slots) == 0)

proc `$`*[A, B](group: PeerGroup[A, B]): string =
  if len(group.slots) == 0:
    result = "[]"
  else:
    result = "["
    for item in group.slots:
      result.add($item)
      result.add(", ")
    result.setLen(len(result) - 2)
    result.add("]")

proc `$`*[A, B](man: SyncManager[A, B]): string =
  result = ""
  for i in 0 ..< man.groupsCount:
    result.add($i & ":")
    if i < len(man.groups):
      result.add($man.groups[i])
    else:
      result.add("[]")
    result.add(", ")

  if len(result) > 0:
    result.setLen(len(result) - 2)

proc fillGroups*[A, B](man: SyncManager[A, B]) {.async.} =
  if len(man.groups) == 0:
    while len(man.groups) < man.groupsCount:
      var group = newPeerGroup[A, B](man)
      await group.fillSlots()
      doAssert(not(group.isEmpty()))
      man.groups.add(group)
      if not(group.isFull()) or (man.pool.lenAvailable() == 0):
        break
  else:
    for i in 0 ..< man.groupsCount:
      if i < len(man.groups):
        if man.pool.lenAvailable() == 0:
          break
        # PeerPool is not empty, so this call will finish immediately.
        await man.groups[i].fillSlots()
        if not(man.groups[i].isFull()):
          break
      else:
        if man.pool.lenAvailable() == 0:
          break
        var group = newPeerGroup[A, B](man)
        # PeerPool is not empty, so this call will finish immediately.
        await group.fillSlots()
        doAssert(not(group.isEmpty()))
        man.groups.add(group)
        if not(group.isFull()):
          break

proc compactGroups*[A, B](man: SyncManager[A, B]) =
  ## Removes empty groups from SyncManager's groups list.
  var ngroups = newSeq[PeerGroup[A, B]]()
  for i in 0 ..< len(man.groups):
    if not(man.groups[i].isEmpty()):
      ngroups.add(man.groups[i])
  man.groups = ngroups

proc isFull*[A, B](man: SyncManager[A, B]): bool =
  result = false
  if len(man.groups) >= man.groupsCount:
    result = true
    for item in man.groups:
      if not(item.isFull()):
        result = false
        break

proc isEmpty*[A, B](man: SyncManager[A, B]): bool =
  result = (len(man.groups) == 0)

proc reorderGroups*[A, B](man: SyncManager[A, B]) =
  mixin getLastSlot
  doAssert(not(man.isEmpty()))

  var x, y, z: int
  for i0 in 0 ..< len(man.groups):
    let group0 = man.groups[i0]
    for j0 in 0 ..< len(group0.slots):
      let slot0 = group0.slots[j0]
      for k0 in 0 ..< len(slot0.peers):
        var curSlot = getLastSlot(slot0.peers[k0])
        x = -1; y = -1; z = -1

        for i1 in i0 ..< len(man.groups):
          let group1 = man.groups[i1]
          for j1 in j0 ..< len(group1.slots):
            let slot1 = group1.slots[j1]
            let start = if (i1 == i0) and (j1 == j0): k0 + 1 else: 0
            for k1 in start ..< len(slot1.peers):
              let newSlot = getLastSlot(slot1.peers[k1])
              if curSlot < newSlot:
                curSlot = newSlot
                x = i1; y = j1; z = k1

        if x >= 0:
          swap(man.groups[i0].slots[j0].peers[k0],
               man.groups[x].slots[y].peers[z])

proc disband*[A, B](peerslot: PeerSlot[A, B]) =
  ## Releases all the peers back to the PeerPool and makes ``peerslot`` empty.
  for peer in peerslot.peers:
    peerslot.man.pool.release(peer)
  peerslot.peers.setLen(0)

proc disband*[A, B](peergroup: PeerGroup[A, B]) =
  ## Releases all the slots back to the PeerPool and makes ``peergroup``
  ## empty.
  for slot in peergroup.slots:
    disband(slot)
  peergroup.slots.setLen(0)

proc disband*[A, B](syncman: SyncManager[A, B]) =
  ## Releases all the groups back to the PeerPool and makes the SyncManager's
  ## peer groups empty.
  for group in syncman.groups:
    disband(group)
  syncman.groups.setLen(0)

proc getLastSlot*[A, B](peerslot: PeerSlot[A, B]): Slot =
  ## Returns the minimal available beacon chain slot for peer slot
  ## ``peerslot``.
  mixin getLastSlot
  doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero")
  for i in 0 ..< len(peerslot.peers):
    if i == 0:
      result = getLastSlot(peerslot.peers[i])
    else:
      let slot = getLastSlot(peerslot.peers[i])
      if slot < result:
        result = slot

proc getLastSlot*[A, B](peergroup: PeerGroup[A, B]): Slot =
  ## Returns the minimal available beacon chain slot for peer group
  ## ``peergroup``.
  doAssert(len(peergroup.slots) > 0,
           "Number of slots in group must not be zero")
  for i in 0 ..< len(peergroup.slots):
    if i == 0:
      result = getLastSlot(peergroup.slots[i])
    else:
      let slot = getLastSlot(peergroup.slots[i])
      if slot < result:
        result = slot

proc getLastSlot*[A, B](sman: SyncManager[A, B]): Slot =
  ## Returns the minimal available beacon chain slot across all peers in sync
  ## manager ``sman``.
  for i in 0 ..< len(sman.groups):
    if i == 0:
      result = getLastSlot(sman.groups[i])
    else:
      let slot = getLastSlot(sman.groups[i])
      if slot < result:
        result = slot
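
# Worked example (values are illustrative): if the peers of a slot report
# last slots 100, 95 and 98, ``getLastSlot`` returns Slot(95); the group and
# manager variants likewise take the minimum, so a request is only sent to a
# group that can serve its whole range.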

proc getBlocks*[A, B](peerslot: PeerSlot[A, B], slot: Slot, count: uint64,
                      step: uint64): Future[Option[BlockList]] {.async.} =
  mixin getBeaconBlocksByRange, getHeadRoot, `==`
  doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero")
  var pending = newSeq[Future[OptionBeaconBlockSeq]](len(peerslot.peers))
  var allFut, timeFut: Future[void]
  try:
    for i in 0 ..< len(peerslot.peers):
      let root = getHeadRoot(peerslot.peers[i])
      pending[i] = getBeaconBlocksByRange(peerslot.peers[i], root, slot, count,
                                          step)

    allFut = allFutures(pending)
    if peerslot.man.peerSlotTimeout == InfiniteDuration:
      timeFut = newFuture[void]()
    else:
      timeFut = sleepAsync(peerslot.man.peerSlotTimeout)

    discard await one(allFut, timeFut)
    # We do not care which one finished first, because we are waiting for all
    # peers; it can happen that some peers returned data and some did not.
    var results = newSeq[seq[SignedBeaconBlock]]()
    for i in 0 ..< len(pending):
      if pending[i].finished() and
         not(pending[i].failed()) and not(pending[i].cancelled()):
        var fdata = pending[i].read()
        if fdata.isSome():
          results.add(fdata.get())
        else:
          # The remote peer did not return any data.
          discard
      else:
        # getBeaconBlocksByRange() returned a failure.
        discard

    if len(results) > 0:
      var m: seq[SignedBeaconBlock]
      var i = 0
      if len(results) > (len(peerslot.peers) div 2):
        # Now we are going to obtain the majority sequence of blocks by using
        # the Boyer-Moore majority vote algorithm.
        for x in results:
          if i == 0:
            m = x
            i = 1
          elif m == x:
            i = i + 1
          else:
            i = i - 1
        i = 0
        for x in results:
          if m == x:
            i = i + 1
        if i > (len(peerslot.peers) div 2):
          # A majority sequence of blocks was found, so we are going to
          # return this result and penalize all the peers which returned
          # different sequences of blocks.
          for i in 0 ..< len(pending):
            if pending[i].finished() and
               not(pending[i].failed()) and not(pending[i].cancelled()):
              let fdata = pending[i].read()
              if fdata.isSome():
                if fdata.get() != m:
                  # The peer returned data which is not in the majority.
                  discard
          result = BlockList.init(slot, count, step, m)
        else:
          # A majority sequence could not be found, so we are going to
          # penalize all the peers.
          discard
    else:
      # The timeout was exceeded while waiting for data from peers, or the
      # peers returned an error.
      discard
  except CancelledError as exc:
    if not allFut.finished:
      allFut.cancel()
    if not timeFut.finished:
      timeFut.cancel()
    for i in 0 ..< len(peerslot.peers):
      if not pending[i].finished:
        pending[i].cancel()
    raise exc
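
# The majority selection above is the Boyer-Moore majority vote algorithm.
# A generic sketch of the same two-pass scheme (illustrative only, assuming
# ``==`` is defined for T):
#
#   proc majority[T](xs: openarray[T]): Option[T] =
#     var m: T
#     var votes = 0
#     for x in xs:             # pass 1: find a candidate
#       if votes == 0:
#         m = x
#         votes = 1
#       elif m == x:
#         inc(votes)
#       else:
#         dec(votes)
#     votes = 0
#     for x in xs:             # pass 2: verify the candidate
#       if m == x:
#         inc(votes)
#     result = if votes > len(xs) div 2: some(m) else: none(T)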

proc getParams*[T](peerslots: int, index: int, slot: T,
                   count: uint64): tuple[start: T, count: uint64, step: uint64] =
  mixin `+`
  doAssert(peerslots > 0, "Number of peerslots must not be zero")
  doAssert(count > 0'u64, "Number of requested blocks must not be zero")
  doAssert(index < peerslots, "Peer slot index must be lower than slots count")
  result.start = slot + uint64(index)
  let more = if uint64(index) < (count mod uint64(peerslots)): 1'u64 else: 0'u64
  result.count = (count div uint64(peerslots)) + more
  result.step = uint64(peerslots)
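
# Worked example: distributing count = 20 blocks starting at slot 10000 over
# peerslots = 3 slots yields interleaved requests
#
#   index 0: (start: 10000, count: 7, step: 3)
#   index 1: (start: 10001, count: 7, step: 3)
#   index 2: (start: 10002, count: 6, step: 3)
#
# which together cover slots 10000..10019 exactly once.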

proc getBlocks*[A, B](peergroup: PeerGroup[A, B], slot: Slot,
                      count: uint64): Future[Option[BlockList]] {.async.} =
  doAssert(len(peergroup.slots) > 0,
           "Number of slots in group must not be zero")
  doAssert(count > 0'u64)
  let slotsCount = len(peergroup.slots)
  var
    params = newSeq[tuple[start: Slot, count: uint64, step: uint64]](slotsCount)
    results = newSeq[Option[BlockList]](slotsCount)
    pending = newSeq[Future[OptionBlockList]]()
    requests = newSeq[tuple[slot: int, param: int]]()
    failures = newSeq[int]()

  var allFut, timeFut: Future[void]
  try:
    for i in 0 ..< slotsCount:
      params[i] = getParams(slotsCount, i, slot, count)
      requests.add((slot: i, param: i))
      pending.add(getBlocks(peergroup.slots[i], params[i].start,
                            params[i].count, params[i].step))

    if peergroup.man.peerGroupTimeout == InfiniteDuration:
      timeFut = newFuture[void]()
    else:
      timeFut = sleepAsync(peergroup.man.peerGroupTimeout)

    while true:
      allFut = allFutures(pending)
      if not timeFut.finished():
        discard await one(allFut, timeFut)
      # We do not care which one finished first, because we are waiting for
      # all slots, and it can happen that some slots returned data and some
      # did not.
      for i in 0 ..< len(pending):
        let slotIndex = requests[i].slot
        let resIndex = requests[i].param
        if pending[i].finished() and
           not(pending[i].failed()) and not(pending[i].cancelled()):
          results[resIndex] = pending[i].read()
          if results[resIndex].isNone():
            failures.add(slotIndex)
        else:
          failures.add(slotIndex)

      if len(failures) == len(peergroup.slots):
        # All the slots in the group failed to download blocks.
        peergroup.disband()
        break
      else:
        pending.setLen(0)
        requests.setLen(0)

        var missing = 0
        for i in 0 ..< len(results):
          if results[i].isNone():
            inc(missing)

        if missing > 0:
          for k in 0 ..< len(peergroup.slots):
            if (missing > 0) and (k notin failures):
              for i in 0 ..< len(results):
                if results[i].isNone():
                  requests.add((slot: k, param: i))
                  pending.add(getBlocks(peergroup.slots[k], params[i].start,
                                        params[i].count, params[i].step))
                  break
              dec(missing)
        else:
          # All the blocks were downloaded.
          if len(failures) > 0:
            var slots = newSeq[PeerSlot[A, B]]()
            for i in 0 ..< len(peergroup.slots):
              if i notin failures:
                slots.add(peergroup.slots[i])
              else:
                disband(peergroup.slots[i])
            peergroup.slots = slots
          result = merge(results)
          break

  except CancelledError as exc:
    if not allFut.finished:
      allFut.cancel()
    if not timeFut.finished:
      timeFut.cancel()
    for i in 0 ..< len(peergroup.slots):
      if not pending[i].finished:
        pending[i].cancel()
    raise exc

proc updateStatus*[A, B](peerslot: PeerSlot[A, B]) {.async.} =
  mixin updateStatus
  doAssert(len(peerslot.peers) > 0, "Number of peers in slot must not be zero")
  let peersCount = len(peerslot.peers)
  var pending = newSeq[Future[void]](peersCount)
  var failed = newSeq[int]()
  var allFut, timeFut: Future[void]

  try:
    for i in 0 ..< peersCount:
      pending[i] = updateStatus(peerslot.peers[i])

    if peerslot.man.peerSlotTimeout == InfiniteDuration:
      timeFut = newFuture[void]()
    else:
      timeFut = sleepAsync(peerslot.man.peerSlotTimeout)

    allFut = allFutures(pending)
    discard await one(allFut, timeFut)
    for i in 0 ..< len(pending):
      if pending[i].finished() and pending[i].failed():
        failed.add(i)

    if len(failed) > 0:
      for index in failed:
        peerslot.man.pool.release(peerslot.peers[index])
        peerslot.peers.del(index)

  except CancelledError as exc:
    if not allFut.finished:
      allFut.cancel()
    if not timeFut.finished:
      timeFut.cancel()
    for i in 0 ..< peersCount:
      if not pending[i].finished:
        pending[i].cancel()
    raise exc

proc updateStatus*[A, B](sman: SyncManager[A, B]) {.async.} =
  var pending = newSeq[Future[void]]()
  try:
    for i in 0 ..< len(sman.groups):
      for k in 0 ..< len(sman.groups[i].slots):
        pending.add(updateStatus(sman.groups[i].slots[k]))
    await allFutures(pending)
  except CancelledError as exc:
    for i in 0 ..< len(pending):
      if not pending[i].finished:
        pending[i].cancel()
    raise exc

proc synchronize*[A, B](sman: SyncManager[A, B]) {.async.} =
  var
    squeue: SyncQueue
    remoteLastKnownSlot: Slot
    localLastSlot: Slot = sman.getLastLocalSlot()
    pending = newSeq[Future[OptionBlockList]]()
    requests = newSeq[SyncRequest]()
    checkMoment = Moment.now()
    errorsCount = 0
    counter = 0'u64

  squeue = SyncQueue.init(localLastSlot + 1'u64, localLastSlot + 2'u64,
                          MAX_REQUESTED_BLOCKS, sman.updateLocalBlocks,
                          sman.groupsCount)
  while true:
    if errorsCount == sman.failuresCount:
      # The number of consecutive errors exceeds the limit.
      break

    pending.setLen(0)
    requests.setLen(0)

    await sman.fillGroups()
    sman.reorderGroups()

    var localLastSlot = sman.getLastLocalSlot()
    let remoteLastSlot = sman.getLastSlot()
    if remoteLastSlot > remoteLastKnownSlot:
      remoteLastKnownSlot = remoteLastSlot
      squeue.updateLastSlot(remoteLastKnownSlot)

    if localLastSlot >= remoteLastKnownSlot:
      info "Synchronization successfully finished"
      break
    else:
      # if counter == 0:
      #   info "Starting synchronization", local_slot = localLastSlot,
      #                                    remote_slot = remoteLastKnownSlot,
      #                                    count = len(squeue)
      # else:
      #   info "Synchronization progress", progress = squeue.progress()
      discard

    counter = counter + 1'u64

    for i in countdown(len(sman.groups) - 1, 0):
      if len(squeue) == 0:
        break
      var req = squeue.pop(uint64(len(sman.groups[i].slots)))
      if sman.groups[i].getLastSlot() >= req.lastSlot():
        req.group = i
        pending.add(getBlocks(sman.groups[i], req.slot, req.count))
        requests.add(req)
        # trace "Send request to a group of peers", group = i
      else:
        squeue.push(req)

    if len(pending) == 0:
      # None of the peer groups satisfy the slot requirements.
      # Disbanding all the peers.
      sman.disband()
      await sleepAsync(sman.failurePause)
      inc(errorsCount)
      continue
    else:
      errorsCount = 0

    # TODO: If getBeaconBlocksByRange() gains proper cancellation support,
    # this can be done more efficiently at the end, so that there is no need
    # to wait for all futures here.
    await allFutures(pending)

    var failedCount = 0
    for i in 0 ..< len(pending):
      if pending[i].finished() and not(pending[i].failed()):
        let res = pending[i].read()
        if res.isSome():
          # trace "Peer's group successfully delivered data"
          await squeue.push(requests[i], res.get().list)
        else:
          inc(failedCount)
          # trace "Peer's group failed to deliver data"
          squeue.push(requests[i])
          sman.groups[requests[i].group].disband()

      else:
        inc(failedCount)
        # trace "Peer's group failed to deliver data"
        squeue.push(requests[i])
        sman.groups[requests[i].group].disband()

    if failedCount == len(pending):
      # All the peer groups failed to download the requests.
      await sleepAsync(sman.failurePause)
      inc(errorsCount)
      continue
    else:
      errorsCount = 0

    sman.compactGroups()

    # If ``statusPeriod`` time has passed, update the peers' status.
    let stamp = Moment.now()
    if stamp - checkMoment > sman.statusPeriod:
      checkMoment = stamp
      await sman.updateStatus()

@@ -0,0 +1,862 @@
import options, hashes, unittest
import chronos
import ../beacon_chain/peer_pool, ../beacon_chain/sync_manager

type
  PeerRequest = object
    headRoot: Eth2Digest
    startSlot: Slot
    count: uint64
    step: uint64
    data: seq[Slot]

  SimplePeerKey = string

  SimplePeer = ref object
    id: SimplePeerKey
    weight: int
    lifu: Future[void]
    blockchain: seq[SignedBeaconBlock]
    latestSlot: Slot
    delay: Duration
    malicious: bool
    failure: bool
    disconnect: bool
    requests: seq[PeerRequest]

proc getKey*(peer: SimplePeer): SimplePeerKey =
  result = peer.id

proc getFuture*(peer: SimplePeer): Future[void] =
  result = peer.lifu

proc `<`*(a, b: SimplePeer): bool =
  result = `<`(a.weight, b.weight)

proc getLastSlot*(peer: SimplePeer): Slot =
  if len(peer.blockchain) == 0:
    result = peer.latestSlot
  else:
    result = peer.blockchain[len(peer.blockchain) - 1].message.slot

proc init*(t: typedesc[SimplePeer], id: string = "", malicious = false,
           weight: int = 0, slot: int = 0,
           delay: Duration = ZeroDuration): SimplePeer =
  result = SimplePeer(id: id, weight: weight, lifu: newFuture[void](),
                      delay: delay, latestSlot: Slot(slot),
                      malicious: malicious)

proc update*(peer: SimplePeer, chain: openarray[SignedBeaconBlock],
             malicious = false, failure = false, disconnect = false,
             delay: Duration = ZeroDuration) =
  peer.malicious = malicious
  peer.delay = delay
  peer.failure = failure
  peer.disconnect = disconnect
  peer.blockchain.setLen(0)
  for item in chain:
    peer.blockchain.add(item)

proc close*(peer: SimplePeer) =
  peer.lifu.complete()

proc getHeadRoot*(peer: SimplePeer): Eth2Digest =
  discard

proc updateStatus*[A](peer: A): Future[void] =
  var res = newFuture[void]("updateStatus")
  res.complete()
  return res

proc getBeaconBlocksByRange*[A](peer: A, headRoot: Eth2Digest, startSlot: Slot,
                                count: uint64,
                                step: uint64): Future[OptionBeaconBlockSeq] {.async.} =
  var req = PeerRequest(headRoot: headRoot, startSlot: startSlot, count: count,
                        step: step)
  var res = newSeq[SignedBeaconBlock]()
  var reqres = newSeq[Slot]()
  if peer.delay != ZeroDuration:
    await sleepAsync(peer.delay)

  var counter = 0'u64

  if peer.failure:
    peer.requests.add(req)
    if peer.disconnect:
      peer.close()
    raise newException(SyncManagerError, "Error")

  if peer.malicious:
    var index = 0
    while counter < count:
      if index < len(peer.blockchain):
        res.add(peer.blockchain[index])
        reqres.add(peer.blockchain[index].message.slot)
      else:
        break
      index = index + int(step)
      counter = counter + 1'u64
    req.data = reqres
    peer.requests.add(req)
    result = some(res)
  else:
    var index = -1
    for i in 0 ..< len(peer.blockchain):
      if peer.blockchain[i].message.slot == startSlot:
        index = i
        break

    if index >= 0:
      while counter < count:
        if index < len(peer.blockchain):
          res.add(peer.blockchain[index])
          reqres.add(peer.blockchain[index].message.slot)
        else:
          break
        index = index + int(step)
        counter = counter + 1'u64
      req.data = reqres
    result = some(res)
    peer.requests.add(req)

proc newTempChain*(number: int, start: Slot): seq[SignedBeaconBlock] =
  result = newSeq[SignedBeaconBlock](number)
  for i in 0 ..< number:
    result[i].message.slot = start + uint64(i)

proc `==`*(a1, a2: SignedBeaconBlock): bool {.inline.} =
  result = (a1.message.slot == a2.message.slot) and
           (a1.message.parent_root == a2.message.parent_root) and
           (a1.message.state_root == a2.message.state_root)

proc peerSlotTests(): Future[bool] {.async.} =
  # slot0: 3 ok
  # slot1: 2 ok 1 timeout
  # slot2: 1 ok 2 timeout
  # slot3: 2 ok 1 bad
  # slot4: 1 ok 2 bad
  # slot5: 2 ok 1 failure
  # slot6: 1 ok 2 failure
  # slot7: 1 ok 1 bad 1 failure
  # slot8: 1 bad 1 timeout 1 failure
  # slot9: 3 bad
  # slot10: 3 timeout
  # slot11: 3 failure
  var pool = newPeerPool[SimplePeer, SimplePeerKey]()
  var sman = newSyncManager[SimplePeer,
                            SimplePeerKey](pool, nil, nil,
                                           peersInSlot = 3,
                                           peerSlotTimeout = 1.seconds,
                                           slotsInGroup = 6)

  var chain1 = newTempChain(10, Slot(10000))
  var chain2 = newTempChain(10, Slot(11000))

  var peers = newSeq[SimplePeer]()
  for i in 0 ..< 36:
    var peer = SimplePeer.init("id" & $i)
    peers.add(peer)

  peers[0].update(chain1)
  peers[1].update(chain1)
  peers[2].update(chain1)

  peers[3].update(chain1)
  peers[4].update(chain1, delay = 2.seconds)
  peers[5].update(chain1)

  peers[6].update(chain1)
  peers[7].update(chain1, delay = 2.seconds)
  peers[8].update(chain1, delay = 2.seconds)

  peers[9].update(chain1)
  peers[10].update(chain1)
  peers[11].update(chain2, malicious = true)

  peers[12].update(chain1)
  peers[13].update(chain2, malicious = true)
  peers[14].update(chain2, malicious = true)

  peers[15].update(chain1)
  peers[16].update(chain1)
  peers[17].update(chain1, failure = true)

  peers[18].update(chain1)
  peers[19].update(chain1, failure = true)
  peers[20].update(chain1, failure = true)

  peers[21].update(chain1)
  peers[22].update(chain2, malicious = true)
  peers[23].update(chain1, failure = true)

  peers[24].update(chain2, malicious = true)
  peers[25].update(chain1, failure = true)
  peers[26].update(chain1, delay = 2.seconds)

  peers[27].update(chain2, malicious = true)
  peers[28].update(chain2, malicious = true)
  peers[29].update(chain2, malicious = true)

  peers[30].update(chain1, delay = 2.seconds)
  peers[31].update(chain1, delay = 2.seconds)
  peers[32].update(chain1, delay = 2.seconds)

  peers[33].update(chain1, failure = true)
  peers[34].update(chain1, failure = true)
  peers[35].update(chain1, failure = true)

  var slot0 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot0.peers = @[peers[0], peers[1], peers[2]]

  var slot1 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot1.peers = @[peers[3], peers[4], peers[5]]

  var slot2 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot2.peers = @[peers[6], peers[7], peers[8]]

  var slot3 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot3.peers = @[peers[9], peers[10], peers[11]]

  var slot4 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot4.peers = @[peers[12], peers[13], peers[14]]

  var slot5 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot5.peers = @[peers[15], peers[16], peers[17]]

  var slot6 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot6.peers = @[peers[18], peers[19], peers[20]]

  var slot7 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot7.peers = @[peers[21], peers[22], peers[23]]

  var slot8 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot8.peers = @[peers[24], peers[25], peers[26]]

  var slot9 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot9.peers = @[peers[27], peers[28], peers[29]]

  var slot10 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot10.peers = @[peers[30], peers[31], peers[32]]

  var slot11 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot11.peers = @[peers[33], peers[34], peers[35]]

  var s0 = await slot0.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s1 = await slot1.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s2 = await slot2.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s3 = await slot3.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s4 = await slot4.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s5 = await slot5.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s6 = await slot6.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s7 = await slot7.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s8 = await slot8.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s9 = await slot9.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s10 = await slot10.getBlocks(Slot(10000), 10'u64, 1'u64)
  var s11 = await slot11.getBlocks(Slot(10000), 10'u64, 1'u64)

  var expected = BlockList.init(Slot(10000), 10'u64, 1'u64, chain1).get()

  doAssert(s0.isSome())
  doAssert(s1.isSome())
  doAssert(s2.isNone())
  doAssert(s3.isSome())
  doAssert(s4.isNone())
  doAssert(s5.isSome())
  doAssert(s6.isNone())
  doAssert(s7.isNone())
  doAssert(s8.isNone())
  doAssert(s9.isNone())
  doAssert(s10.isNone())
  doAssert(s11.isNone())
  doAssert($s0.get() == $expected)
  doAssert($s1.get() == $expected)
  doAssert($s3.get() == $expected)
  doAssert($s5.get() == $expected)

  result = true

proc peerGroupTests(): Future[bool] {.async.} =
  # group0: 3 ok
  # group1: 2 ok 1 bad
  # group2: 1 ok 2 bad
  # group3: 3 bad
  var pool = newPeerPool[SimplePeer, SimplePeerKey]()
  var sman = newSyncManager[SimplePeer,
                            SimplePeerKey](pool, nil, nil,
                                           peersInSlot = 3,
                                           peerSlotTimeout = 1.seconds,
                                           slotsInGroup = 6)

  var chain1 = newTempChain(10, Slot(10000))
  var chain2 = newTempChain(10, Slot(11000))

  var peers = newSeq[SimplePeer]()
  for i in 0 ..< 18:
    var peer = SimplePeer.init("id" & $i)
    peers.add(peer)

  proc cleanup() =
    for i in 0 ..< 18:
      peers[i].requests.setLen(0)

  peers[0].update(chain1)
  peers[1].update(chain1)
  peers[2].update(chain1)

  peers[3].update(chain1)
  peers[4].update(chain1)
  peers[5].update(chain1)

  peers[6].update(chain1)
  peers[7].update(chain1)
  peers[8].update(chain1)

  peers[9].update(chain1)
  peers[10].update(chain2, malicious = true)
  peers[11].update(chain2, malicious = true)

  peers[12].update(chain1, delay = 2.seconds)
  peers[13].update(chain1, delay = 2.seconds)
  peers[14].update(chain1, delay = 2.seconds)

  peers[15].update(chain1, failure = true)
  peers[16].update(chain1, failure = true)
  peers[17].update(chain1, failure = true)

  var slot0 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot0.peers = @[peers[0], peers[1], peers[2]]
  var slot1 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot1.peers = @[peers[3], peers[4], peers[5]]
  var slot2 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot2.peers = @[peers[6], peers[7], peers[8]]
  var slot3 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot3.peers = @[peers[9], peers[10], peers[11]]
  var slot4 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot4.peers = @[peers[12], peers[13], peers[14]]
  var slot5 = newPeerSlot[SimplePeer, SimplePeerKey](sman)
  slot5.peers = @[peers[15], peers[16], peers[17]]

  var group0 = newPeerGroup(sman)
  group0.slots = @[slot0, slot1, slot2]
  var group1 = newPeerGroup(sman)
  group1.slots = @[slot0, slot1, slot3]
  var group2 = newPeerGroup(sman)
  group2.slots = @[slot0, slot3, slot4]
  var group3 = newPeerGroup(sman)
  group3.slots = @[slot3, slot4, slot5]

  var s0 = await group0.getBlocks(Slot(10000), 10'u64)
  cleanup()
  var s1 = await group1.getBlocks(Slot(10000), 10'u64)
  cleanup()
  var s2 = await group2.getBlocks(Slot(10000), 10'u64)
  cleanup()
  var s3 = await group3.getBlocks(Slot(10000), 10'u64)
  cleanup()

  var expected = BlockList.init(Slot(10000), 10'u64, 1'u64, chain1).get()

  doAssert(s0.isSome())
  doAssert(s1.isSome())
  doAssert(s2.isSome())
  doAssert(s3.isNone())

  doAssert($s0.get() == $expected)
  doAssert($s1.get() == $expected)
  doAssert($s2.get() == $expected)

  result = true

proc syncQueueNonAsyncTests(): bool =
  var q1 = SyncQueue.init(Slot(0), Slot(0), 1'u64, nil)
  doAssert(len(q1) == 1)
  var r11 = q1.pop()
  doAssert(len(q1) == 0)
  q1.push(r11)
  doAssert(len(q1) == 1)
  var r11e = q1.pop()
  doAssert(len(q1) == 0)
  doAssert(r11e == r11)
  doAssert(r11.slot == Slot(0) and r11.count == 1'u64)

  var q2 = SyncQueue.init(Slot(0), Slot(1), 1'u64, nil)
  doAssert(len(q2) == 2)
  var r21 = q2.pop()
  doAssert(len(q2) == 1)
  var r22 = q2.pop()
  doAssert(len(q2) == 0)
  q2.push(r22)
  doAssert(len(q2) == 1)
  q2.push(r21)
  doAssert(len(q2) == 2)
  var r21e = q2.pop()
  doAssert(len(q2) == 1)
  var r22e = q2.pop()
  doAssert(len(q2) == 0)
  doAssert(r21 == r21e)
  doAssert(r22 == r22e)
  doAssert(r21.slot == Slot(0) and r21.count == 1'u64)
  doAssert(r22.slot == Slot(1) and r22.count == 1'u64)

  var q3 = SyncQueue.init(Slot(0), Slot(4), 2'u64, nil)
  doAssert(len(q3) == 5)
  var r31 = q3.pop()
  doAssert(len(q3) == 3)
  var r32 = q3.pop()
  doAssert(len(q3) == 1)
  var r33 = q3.pop()
  doAssert(len(q3) == 0)
  q3.push(r33)
  doAssert(len(q3) == 1)
  q3.push(r32)
  doAssert(len(q3) == 3)
  q3.push(r31)
  doAssert(len(q3) == 5)
  var r31e = q3.pop()
  doAssert(len(q3) == 3)
  var r32e = q3.pop()
  doAssert(len(q3) == 1)
  var r33e = q3.pop()
  doAssert(len(q3) == 0)
  doAssert(r31 == r31e)
  doAssert(r32 == r32e)
  doAssert(r33 == r33e)
  doAssert(r31.slot == Slot(0) and r31.count == 2'u64)
  doAssert(r32.slot == Slot(2) and r32.count == 2'u64)
  doAssert(r33.slot == Slot(4) and r33.count == 1'u64)

  var q4 = SyncQueue.init(Slot(1), Slot(5), 3'u64, nil)
  doAssert(len(q4) == 5)
  var r41 = q4.pop()
  doAssert(len(q4) == 2)
  var r42 = q4.pop()
  doAssert(len(q4) == 0)
  q4.push(r42)
  doAssert(len(q4) == 2)
  q4.push(r41)
  doAssert(len(q4) == 5)
  var r41e = q4.pop()
  doAssert(len(q4) == 2)
  var r42e = q4.pop()
  doAssert(len(q4) == 0)
  doAssert(r41 == r41e)
  doAssert(r42 == r42e)
  doAssert(r41.slot == Slot(1) and r41.count == 3'u64)
  doAssert(r42.slot == Slot(4) and r42.count == 2'u64)

  var q5 = SyncQueue.init(Slot(1), Slot(30), 2'u64, nil)
  doAssert(len(q5) == 30)
  var r51 = q5.pop(5)
  doAssert(len(q5) == 20)
  doAssert(r51.slot == Slot(1) and r51.count == 10 and r51.step == 5)
  q5.push(r51, 3'u64)
  doAssert(len(q5) == 30)
  var r511 = q5.pop()
  var r512 = q5.pop()
  doAssert(len(q5) == 20)
  doAssert(r511.slot == Slot(1) and r511.count == 6 and r511.step == 3)
  doAssert(r512.slot == Slot(7) and r512.count == 4 and r512.step == 2)
  q5.push(r511, 2'u64)
  q5.push(r512, 1'u64)
  doAssert(len(q5) == 30)
  var r5111 = q5.pop()
  var r5112 = q5.pop()
  var r5121 = q5.pop()
  var r5122 = q5.pop()
  doAssert(len(q5) == 20)
  doAssert(r5111.slot == Slot(1) and r5111.count == 4 and r5111.step == 2)
  doAssert(r5112.slot == Slot(5) and r5112.count == 2 and r5112.step == 1)
  doAssert(r5121.slot == Slot(7) and r5121.count == 2 and r5121.step == 1)
  doAssert(r5122.slot == Slot(9) and r5122.count == 2 and r5122.step == 1)

  var q6 = SyncQueue.init(Slot(1), Slot(7), 10'u64, nil)
  doAssert(len(q6) == 7)
  var r61 = q6.pop()
  doAssert(r61.slot == Slot(1) and r61.count == 7 and r61.step == 1)
  doAssert(len(q6) == 0)

  var q7 = SyncQueue.init(Slot(1), Slot(7), 10'u64, nil)
  doAssert(len(q7) == 7)
  var r71 = q7.pop(5)
  doAssert(len(q7) == 0)
  doAssert(r71.slot == Slot(1) and r71.count == 7 and r71.step == 5)
  q7.push(r71, 3'u64)
  doAssert(len(q7) == 7)
  var r72 = q7.pop()
  doAssert(r72.slot == Slot(1) and r72.count == 7 and r72.step == 3)
  q7.push(r72, 2'u64)
  doAssert(len(q7) == 7)
  var r73 = q7.pop()
  doAssert(len(q7) == 0)
  doAssert(r73.slot == Slot(1) and r73.count == 7 and r73.step == 2)
  q7.push(r73, 1'u64)
  doAssert(len(q7) == 7)
  var r74 = q7.pop()
  doAssert(len(q7) == 0)
  doAssert(r74.slot == Slot(1) and r74.count == 7 and r74.step == 1)

  result = true
|
||||
|
||||
proc syncQueueAsyncTests(): Future[bool] {.async.} =
|
||||
var chain1 = newSeq[SignedBeaconBlock](3)
|
||||
chain1[0].message.slot = Slot(0)
|
||||
chain1[1].message.slot = Slot(1)
|
||||
chain1[2].message.slot = Slot(2)
|
||||
var chain2 = newSeq[SignedBeaconBlock](7)
|
||||
chain2[0].message.slot = Slot(5)
|
||||
chain2[1].message.slot = Slot(6)
|
||||
chain2[2].message.slot = Slot(7)
|
||||
chain2[3].message.slot = Slot(8)
|
||||
chain2[4].message.slot = Slot(9)
|
||||
chain2[5].message.slot = Slot(10)
|
||||
chain2[6].message.slot = Slot(11)
|
||||
|
||||
var counter = 0
|
||||
proc receiver1(list: openarray[SignedBeaconBlock]): bool =
|
||||
result = true
|
||||
for item in list:
|
||||
if item.message.slot == uint64(counter):
|
||||
inc(counter)
|
||||
else:
|
||||
result = false
|
||||
break
|
||||
|
||||
  var q1 = SyncQueue.init(Slot(0), Slot(2), 1'u64, receiver1, 1)
  var r11 = q1.pop()
  var r12 = q1.pop()
  var r13 = q1.pop()
  var f13 = q1.push(r13, @[chain1[2]])
  var f12 = q1.push(r12, @[chain1[1]])
  await sleepAsync(100.milliseconds)
  doAssert(f12.finished == false)
  doAssert(f13.finished == false)
  doAssert(counter == 0)
  var f11 = q1.push(r11, @[chain1[0]])
  doAssert(counter == 1)
  doAssert(f11.finished == true and f11.failed == false)
  await sleepAsync(100.milliseconds)
  doAssert(f12.finished == true and f12.failed == false)
  doAssert(f13.finished == true and f13.failed == false)
  doAssert(counter == 3)

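  # With queue size 2, a push up to two requests ahead of the expected slot
  # completes immediately, while r24 is too far ahead and stays pending.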
  var q2 = SyncQueue.init(Slot(5), Slot(11), 2'u64, receiver1, 2)
  var r21 = q2.pop()
  var r22 = q2.pop()
  var r23 = q2.pop()
  var r24 = q2.pop()

  counter = 5

  var f24 = q2.push(r24, @[chain2[6]])
  var f22 = q2.push(r22, @[chain2[2], chain2[3]])
  doAssert(f24.finished == false)
  doAssert(f22.finished == true and f22.failed == false)
  doAssert(counter == 5)
  var f21 = q2.push(r21, @[chain2[0], chain2[1]])
  doAssert(f21.finished == true and f21.failed == false)
  await sleepAsync(100.milliseconds)
  doAssert(f24.finished == true and f24.failed == false)
  doAssert(counter == 9)
  var f23 = q2.push(r23, @[chain2[4], chain2[5]])
  doAssert(f23.finished == true and f23.failed == false)
  doAssert(counter == 12)
  await sleepAsync(100.milliseconds)
  doAssert(counter == 12)

  result = true

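# checkRequest verifies a recorded request against the expected start slot,
# count, step and the exact sequence of delivered slots.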
proc checkRequest(req: PeerRequest, slot, count, step: int,
                  data: varargs[int]): bool =
  result = (req.startSlot == Slot(slot)) and (req.count == uint64(count)) and
           (req.step == uint64(step))
  if result:
    if len(data) != len(req.data):
      result = false
    else:
      for i in 0 ..< len(data):
        if Slot(data[i]) != req.data[i]:
          result = false
          break

proc checkRequest(peer: SimplePeer, index: int, slot, count, step: int,
                  data: varargs[int]): bool {.inline.} =
  result = checkRequest(peer.requests[index], slot, count, step, data)

proc syncManagerOnePeerTest(): Future[bool] {.async.} =
  # Syncing with one peer only.
  var pool = newPeerPool[SimplePeer, SimplePeerKey]()
  var peer = SimplePeer.init("id1")
  var srcChain = newTempChain(100, Slot(10000))
  var dstChain = newSeq[SignedBeaconBlock]()

  proc lastLocalSlot(): Slot =
    if len(dstChain) == 0:
      result = Slot(9999)
    else:
      result = dstChain[^1].message.slot

  proc updateBlocks(list: openarray[SignedBeaconBlock]): bool =
    for item in list:
      dstChain.add(item)
    result = true

  peer.update(srcChain)
  doAssert(pool.addIncomingPeer(peer) == true)

  var sman = newSyncManager[SimplePeer,
                            SimplePeerKey](pool, lastLocalSlot, updateBlocks,
                                           peersInSlot = 3,
                                           peerSlotTimeout = 1.seconds,
                                           slotsInGroup = 6)
  await sman.synchronize()
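  # The single peer serves the entire 100-slot range as five consecutive
  # step-1 requests of 20 slots (MAX_REQUESTED_BLOCKS) each.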
  doAssert(checkRequest(peer, 0, 10000, 20, 1,
                        10000, 10001, 10002, 10003, 10004,
                        10005, 10006, 10007, 10008, 10009,
                        10010, 10011, 10012, 10013, 10014,
                        10015, 10016, 10017, 10018, 10019) == true)
  doAssert(checkRequest(peer, 1, 10020, 20, 1,
                        10020, 10021, 10022, 10023, 10024,
                        10025, 10026, 10027, 10028, 10029,
                        10030, 10031, 10032, 10033, 10034,
                        10035, 10036, 10037, 10038, 10039) == true)
  doAssert(checkRequest(peer, 2, 10040, 20, 1,
                        10040, 10041, 10042, 10043, 10044,
                        10045, 10046, 10047, 10048, 10049,
                        10050, 10051, 10052, 10053, 10054,
                        10055, 10056, 10057, 10058, 10059) == true)
  doAssert(checkRequest(peer, 3, 10060, 20, 1,
                        10060, 10061, 10062, 10063, 10064,
                        10065, 10066, 10067, 10068, 10069,
                        10070, 10071, 10072, 10073, 10074,
                        10075, 10076, 10077, 10078, 10079) == true)
  doAssert(checkRequest(peer, 4, 10080, 20, 1,
                        10080, 10081, 10082, 10083, 10084,
                        10085, 10086, 10087, 10088, 10089,
                        10090, 10091, 10092, 10093, 10094,
                        10095, 10096, 10097, 10098, 10099) == true)
  result = true

proc syncManagerOneSlotTest(): Future[bool] {.async.} =
  # Syncing with one peer slot (2n + 1 peers) only.
  var pool = newPeerPool[SimplePeer, SimplePeerKey]()

  var peers = newSeq[SimplePeer](3)
  for i in 0 ..< len(peers):
    peers[i] = SimplePeer.init("id" & $(i + 1))

  var srcChain = newTempChain(100, Slot(10000))
  var dstChain = newSeq[SignedBeaconBlock]()

  proc lastLocalSlot(): Slot =
    if len(dstChain) == 0:
      result = Slot(9999)
    else:
      result = dstChain[^1].message.slot

  proc updateBlocks(list: openarray[SignedBeaconBlock]): bool =
    for item in list:
      dstChain.add(item)
    result = true

  for i in 0 ..< len(peers):
    peers[i].update(srcChain)
  doAssert(pool.addIncomingPeer(peers[0]) == true)
  doAssert(pool.addOutgoingPeer(peers[1]) == true)
  doAssert(pool.addOutgoingPeer(peers[2]) == true)

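  # All three peers share one peer slot, so each records an identical
  # sequence of step-1 requests.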
  var sman = newSyncManager[SimplePeer,
                            SimplePeerKey](pool, lastLocalSlot, updateBlocks,
                                           peersInSlot = 3,
                                           peerSlotTimeout = 1.seconds,
                                           slotsInGroup = 6)
  await sman.synchronize()
  for i in 0 ..< len(peers):
    doAssert(checkRequest(peers[i], 0, 10000, 20, 1,
                          10000, 10001, 10002, 10003, 10004,
                          10005, 10006, 10007, 10008, 10009,
                          10010, 10011, 10012, 10013, 10014,
                          10015, 10016, 10017, 10018, 10019) == true)
    doAssert(checkRequest(peers[i], 1, 10020, 20, 1,
                          10020, 10021, 10022, 10023, 10024,
                          10025, 10026, 10027, 10028, 10029,
                          10030, 10031, 10032, 10033, 10034,
                          10035, 10036, 10037, 10038, 10039) == true)
    doAssert(checkRequest(peers[i], 2, 10040, 20, 1,
                          10040, 10041, 10042, 10043, 10044,
                          10045, 10046, 10047, 10048, 10049,
                          10050, 10051, 10052, 10053, 10054,
                          10055, 10056, 10057, 10058, 10059) == true)
    doAssert(checkRequest(peers[i], 3, 10060, 20, 1,
                          10060, 10061, 10062, 10063, 10064,
                          10065, 10066, 10067, 10068, 10069,
                          10070, 10071, 10072, 10073, 10074,
                          10075, 10076, 10077, 10078, 10079) == true)
    doAssert(checkRequest(peers[i], 4, 10080, 20, 1,
                          10080, 10081, 10082, 10083, 10084,
                          10085, 10086, 10087, 10088, 10089,
                          10090, 10091, 10092, 10093, 10094,
                          10095, 10096, 10097, 10098, 10099) == true)
  result = true

proc syncManagerOneGroupTest(): Future[bool] {.async.} =
  # Syncing with one group of peers (n peer slots).
  var pool = newPeerPool[SimplePeer, SimplePeerKey]()
  var peers = newSeq[SimplePeer](6)
  for i in 0 ..< len(peers):
    peers[i] = SimplePeer.init("id" & $(i + 1), weight = 10 - i)

  var srcChain = newTempChain(100, Slot(10000))
  var dstChain = newSeq[SignedBeaconBlock]()

  proc lastLocalSlot(): Slot =
    if len(dstChain) == 0:
      result = Slot(9999)
    else:
      result = dstChain[^1].message.slot

  proc updateBlocks(list: openarray[SignedBeaconBlock]): bool =
    for item in list:
      dstChain.add(item)
    result = true

  for i in 0 ..< len(peers):
    peers[i].update(srcChain)
    if i mod 2 == 0:
      doAssert(pool.addIncomingPeer(peers[i]) == true)
    else:
      doAssert(pool.addOutgoingPeer(peers[i]) == true)

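  # With slotsInGroup = 2 the six peers form one group of two slots:
  # peers[0..2] fetch even-offset slots and peers[3..5] odd-offset slots,
  # both with step 2.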
  var sman = newSyncManager[SimplePeer,
                            SimplePeerKey](pool, lastLocalSlot, updateBlocks,
                                           peersInSlot = 3,
                                           peerSlotTimeout = 1.seconds,
                                           slotsInGroup = 2)
  await sman.synchronize()

  for i in 0 ..< len(peers):
    if i in {0, 1, 2}:
      doAssert(checkRequest(peers[i], 0, 10000, 20, 2,
                            10000, 10002, 10004, 10006, 10008,
                            10010, 10012, 10014, 10016, 10018,
                            10020, 10022, 10024, 10026, 10028,
                            10030, 10032, 10034, 10036, 10038) == true)
      doAssert(checkRequest(peers[i], 1, 10040, 20, 2,
                            10040, 10042, 10044, 10046, 10048,
                            10050, 10052, 10054, 10056, 10058,
                            10060, 10062, 10064, 10066, 10068,
                            10070, 10072, 10074, 10076, 10078) == true)
      doAssert(checkRequest(peers[i], 2, 10080, 10, 2,
                            10080, 10082, 10084, 10086, 10088,
                            10090, 10092, 10094, 10096, 10098) == true)
    elif i in {3, 4, 5}:
      doAssert(checkRequest(peers[i], 0, 10001, 20, 2,
                            10001, 10003, 10005, 10007, 10009,
                            10011, 10013, 10015, 10017, 10019,
                            10021, 10023, 10025, 10027, 10029,
                            10031, 10033, 10035, 10037, 10039) == true)
      doAssert(checkRequest(peers[i], 1, 10041, 20, 2,
                            10041, 10043, 10045, 10047, 10049,
                            10051, 10053, 10055, 10057, 10059,
                            10061, 10063, 10065, 10067, 10069,
                            10071, 10073, 10075, 10077, 10079) == true)
      doAssert(checkRequest(peers[i], 2, 10081, 10, 2,
                            10081, 10083, 10085, 10087, 10089,
                            10091, 10093, 10095, 10097, 10099) == true)

  result = true

proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} =
  # Syncing with two groups of peers (n peer slots), where one group fails
  # to deliver its request, and that request is bigger than the other
  # group's.
  var pool = newPeerPool[SimplePeer, SimplePeerKey]()
  var peers = newSeq[SimplePeer](6 + 3)
  for i in 0 ..< len(peers):
    peers[i] = SimplePeer.init("id" & $(i + 1), weight = 9 - i)

  var srcChain = newTempChain(100, Slot(10000))
  var dstChain = newSeq[SignedBeaconBlock]()

  for i in 0 ..< 6:
    peers[i].update(srcChain, failure = true, disconnect = true)
  for i in 6 ..< len(peers):
    peers[i].update(srcChain)

  proc lastLocalSlot(): Slot =
    if len(dstChain) == 0:
      result = Slot(9999)
    else:
      result = dstChain[^1].message.slot

  proc updateBlocks(list: openarray[SignedBeaconBlock]): bool =
    for item in list:
      dstChain.add(item)
    result = true

  for i in 0 ..< len(peers):
    if i mod 2 == 0:
      doAssert(pool.addIncomingPeer(peers[i]) == true)
    else:
      doAssert(pool.addOutgoingPeer(peers[i]) == true)

  var sman = newSyncManager[SimplePeer,
                            SimplePeerKey](pool, lastLocalSlot, updateBlocks,
                                           peersInSlot = 3,
                                           peerSlotTimeout = 1.seconds,
                                           slotsInGroup = 2)
  await sman.synchronize()

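  # The failing peers record only the request they timed out on (step 2,
  # starting at slot 10020/10021), while the healthy peers end up delivering
  # the whole range as step-1 requests.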
  for i in 0 ..< len(peers):
    if i in {0, 1, 2}:
      doAssert(checkRequest(peers[i], 0, 10020, 20, 2) == true)
    elif i in {3, 4, 5}:
      doAssert(checkRequest(peers[i], 0, 10021, 20, 2) == true)
    elif i in {6, 7, 8}:
      doAssert(checkRequest(peers[i], 0, 10000, 20, 1,
                            10000, 10001, 10002, 10003, 10004,
                            10005, 10006, 10007, 10008, 10009,
                            10010, 10011, 10012, 10013, 10014,
                            10015, 10016, 10017, 10018, 10019) == true)
      doAssert(checkRequest(peers[i], 1, 10020, 20, 1,
                            10020, 10021, 10022, 10023, 10024,
                            10025, 10026, 10027, 10028, 10029,
                            10030, 10031, 10032, 10033, 10034,
                            10035, 10036, 10037, 10038, 10039) == true)
      doAssert(checkRequest(peers[i], 2, 10040, 20, 1,
                            10040, 10041, 10042, 10043, 10044,
                            10045, 10046, 10047, 10048, 10049,
                            10050, 10051, 10052, 10053, 10054,
                            10055, 10056, 10057, 10058, 10059) == true)
      doAssert(checkRequest(peers[i], 3, 10060, 20, 1,
                            10060, 10061, 10062, 10063, 10064,
                            10065, 10066, 10067, 10068, 10069,
                            10070, 10071, 10072, 10073, 10074,
                            10075, 10076, 10077, 10078, 10079) == true)
      doAssert(checkRequest(peers[i], 4, 10080, 20, 1,
                            10080, 10081, 10082, 10083, 10084,
                            10085, 10086, 10087, 10088, 10089,
                            10090, 10091, 10092, 10093, 10094,
                            10095, 10096, 10097, 10098, 10099) == true)
  result = true


when isMainModule:
  suite "SyncManager test suite":
    test "BlockList tests":
      # TODO
      discard
    # test "PeerSlot tests":
    #   check waitFor(peerSlotTests()) == true
    # test "PeerGroup tests":
    #   check waitFor(peerGroupTests()) == true
    # test "SyncQueue non-async tests":
    #   check syncQueueNonAsyncTests() == true
    # test "SyncQueue async tests":
    #   check waitFor(syncQueueAsyncTests()) == true
    # test "SyncManager one-peer test":
    #   check waitFor(syncManagerOnePeerTest()) == true
    # test "SyncManager one-peer-slot test":
    #   check waitFor(syncManagerOneSlotTest()) == true
    # test "SyncManager one-peer-group test":
    #   check waitFor(syncManagerOneGroupTest()) == true
    test "SyncManager group-recovery test":
      check waitFor(syncManagerGroupRecoveryTest()) == true