PeerPool initial commit.
This commit is contained in:
parent
1099548775
commit
fa22ba22b9
|
@ -0,0 +1,477 @@
|
|||
import tables, heapqueue
|
||||
import chronos
|
||||
|
||||
type
|
||||
PeerType* = enum
|
||||
None, Incoming, Outgoing
|
||||
|
||||
PeerFlags = enum
|
||||
Acquired, DeleteOnRelease
|
||||
|
||||
PeerItem[T] = object
|
||||
data: T
|
||||
peerType: PeerType
|
||||
flags: set[PeerFlags]
|
||||
index: int
|
||||
|
||||
PeerIndex = object
|
||||
data: int
|
||||
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
|
||||
|
||||
PeerPool*[A, B] = ref object
|
||||
incNeEvent: AsyncEvent
|
||||
outNeEvent: AsyncEvent
|
||||
incQueue: HeapQueue[PeerIndex]
|
||||
outQueue: HeapQueue[PeerIndex]
|
||||
registry: Table[B, PeerIndex]
|
||||
storage: seq[PeerItem[A]]
|
||||
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
|
||||
maxPeersCount: int
|
||||
maxIncPeersCount: int
|
||||
maxOutPeersCount: int
|
||||
curIncPeersCount: int
|
||||
curOutPeersCount: int
|
||||
acqIncPeersCount: int
|
||||
acqOutPeersCount: int
|
||||
|
||||
proc `<`*(a, b: PeerIndex): bool =
|
||||
result = a.cmp(b, a)
|
||||
|
||||
proc fireEvent[A, B](pool: PeerPool[A, B], item: PeerItem[A]) {.inline.} =
|
||||
if item.peerType == PeerType.Incoming:
|
||||
pool.incNeEvent.fire()
|
||||
elif item.peerType == PeerType.Outgoing:
|
||||
pool.outNeEvent.fire()
|
||||
|
||||
proc waitEvent[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]) {.async.} =
|
||||
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
|
||||
var fut1 = pool.incNeEvent.wait()
|
||||
var fut2 = pool.outNeEvent.wait()
|
||||
try:
|
||||
discard await one(fut1, fut2)
|
||||
if fut1.finished:
|
||||
if not(fut2.finished):
|
||||
fut2.cancel()
|
||||
pool.incNeEvent.clear()
|
||||
else:
|
||||
if not(fut1.finished):
|
||||
fut1.cancel()
|
||||
pool.outNeEvent.clear()
|
||||
except CancelledError:
|
||||
if not(fut1.finished):
|
||||
fut1.cancel()
|
||||
if not(fut2.finished):
|
||||
fut2.cancel()
|
||||
raise
|
||||
elif PeerType.Incoming in filter:
|
||||
await pool.incNeEvent.wait()
|
||||
pool.incNeEvent.clear()
|
||||
elif PeerType.Outgoing in filter:
|
||||
await pool.outNeEvent.wait()
|
||||
pool.outNeEvent.clear()
|
||||
|
||||
template getItem[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]): ptr PeerItem[A] =
|
||||
doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0))
|
||||
var pindex: int
|
||||
if filter == {PeerType.Incoming, PeerType.Outgoing}:
|
||||
if len(pool.outQueue) > 0 and len(pool.incQueue) > 0:
|
||||
# Don't think `<` is actually `<` here.
|
||||
if pool.incQueue[0] < pool.outQueue[0]:
|
||||
inc(pool.acqIncPeersCount)
|
||||
pindex = pool.incQueue.pop().data
|
||||
else:
|
||||
inc(pool.acqOutPeersCount)
|
||||
pindex = pool.outQueue.pop().data
|
||||
else:
|
||||
if len(pool.outQueue) > 0:
|
||||
inc(pool.acqOutPeersCount)
|
||||
pindex = pool.outQueue.pop().data
|
||||
else:
|
||||
inc(pool.acqIncPeersCount)
|
||||
pindex = pool.incQueue.pop().data
|
||||
else:
|
||||
if PeerType.Outgoing in filter:
|
||||
inc(pool.acqOutPeersCount)
|
||||
pindex = pool.outQueue.pop().data
|
||||
elif PeerType.Incoming in filter:
|
||||
inc(pool.acqIncPeersCount)
|
||||
pindex = pool.incQueue.pop().data
|
||||
addr(pool.storage[pindex])
|
||||
|
||||
proc newPeerPool*[A, B](maxPeers = -1,
|
||||
maxIncomingPeers = -1,
|
||||
maxOutgoingPeers = -1): PeerPool[A, B] =
|
||||
## Create new PeerPool.
|
||||
##
|
||||
## ``maxPeers`` - maximum number of peers allowed. All the peers which
|
||||
## exceeds this number will be rejected (``addPeer()`` procedure will return
|
||||
## ``false``). By default this number is infinite.
|
||||
##
|
||||
## ``maxIncomingPeers`` - maximum number of incoming peers allowed. All the
|
||||
## incoming peers exceeds this number will be rejected. By default this
|
||||
## number is infinite.
|
||||
##
|
||||
## ``maxOutgoingPeers`` - maximum number of outgoing peers allowed. All the
|
||||
## outgoing peers exceeds this number will be rejected. By default this
|
||||
## number if infinite.
|
||||
##
|
||||
## Please note, that if ``maxPeers`` is positive non-zero value, then equation
|
||||
## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``.
|
||||
var res = PeerPool[A, B]()
|
||||
if maxPeers != -1:
|
||||
doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers)
|
||||
|
||||
res.maxPeersCount = if maxPeers < 0: high(int)
|
||||
else: maxPeers
|
||||
res.maxIncPeersCount = if maxIncomingPeers < 0: high(int)
|
||||
else: maxIncomingPeers
|
||||
res.maxOutPeersCount = if maxOutgoingPeers < 0: high(int)
|
||||
else: maxOutgoingPeers
|
||||
res.incNeEvent = newAsyncEvent()
|
||||
res.outNeEvent = newAsyncEvent()
|
||||
res.incQueue = initHeapQueue[PeerIndex]()
|
||||
res.outQueue = initHeapQueue[PeerIndex]()
|
||||
res.registry = initTable[B, PeerIndex]()
|
||||
res.storage = newSeq[PeerItem[A]]()
|
||||
|
||||
proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} =
|
||||
let p1 = res.storage[a.data].data
|
||||
let p2 = res.storage[b.data].data
|
||||
result = p1 < p2
|
||||
|
||||
res.cmp = peerCmp
|
||||
result = res
|
||||
|
||||
proc len*[A, B](pool: PeerPool[A, B]): int =
|
||||
## Returns number of registered peers in PeerPool ``pool``. This number
|
||||
## includes all the peers (acquired and available).
|
||||
result = len(pool.registry)
|
||||
|
||||
proc lenAvailable*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): int {.inline.} =
|
||||
## Returns number of available peers in PeerPool ``pool`` which satisfies
|
||||
## filter ``filter``.
|
||||
if PeerType.Incoming in filter:
|
||||
result = result + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
result = result + len(pool.outQueue)
|
||||
|
||||
proc lenAcquired*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): int {.inline.} =
|
||||
## Returns number of acquired peers in PeerPool ``pool`` which satisifies
|
||||
## filter ``filter``.
|
||||
if PeerType.Incoming in filter:
|
||||
result = result + pool.acqIncPeersCount
|
||||
if PeerType.Outgoing in filter:
|
||||
result = result + pool.acqOutPeersCount
|
||||
|
||||
proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
||||
## Remove ``peer`` from PeerPool ``pool``.
|
||||
##
|
||||
## Deletion occurs immediately only if peer is available, otherwise it will
|
||||
## be deleted only when peer will be released. You can change this behavior
|
||||
## with ``force`` option.
|
||||
mixin getKey
|
||||
var key = getKey(peer)
|
||||
if pool.registry.hasKey(key):
|
||||
let pindex = pool.registry[key].data
|
||||
var item = addr(pool.storage[pindex])
|
||||
if (PeerFlags.Acquired in item[].flags):
|
||||
if not(force):
|
||||
item[].flags.incl(PeerFlags.DeleteOnRelease)
|
||||
else:
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
dec(pool.curIncPeersCount)
|
||||
dec(pool.acqIncPeersCount)
|
||||
elif item[].peerType == PeerType.Outgoing:
|
||||
dec(pool.curOutPeersCount)
|
||||
dec(pool.acqOutPeersCount)
|
||||
# Cleanup storage with default item, and removing key from hashtable.
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
else:
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
# If peer is available, then its copy present in heapqueue, so we need
|
||||
# to remove it.
|
||||
for i in 0 ..< len(pool.incQueue):
|
||||
if pool.incQueue[i].data == pindex:
|
||||
pool.incQueue.del(i)
|
||||
break
|
||||
dec(pool.curIncPeersCount)
|
||||
elif item[].peerType == PeerType.Outgoing:
|
||||
# If peer is available, then its copy present in heapqueue, so we need
|
||||
# to remove it.
|
||||
for i in 0 ..< len(pool.outQueue):
|
||||
if pool.outQueue[i].data == pindex:
|
||||
pool.outQueue.del(i)
|
||||
break
|
||||
dec(pool.curOutPeersCount)
|
||||
# Cleanup storage with default item, and removing key from hashtable.
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
|
||||
result = true
|
||||
|
||||
proc addPeer*[A, B](pool: PeerPool[A, B], peer: A, peerType: PeerType): bool =
|
||||
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
mixin getKey, getFuture
|
||||
|
||||
if len(pool.registry) >= pool.maxPeersCount:
|
||||
return false
|
||||
|
||||
var item = PeerItem[A](data: peer, peerType: peerType,
|
||||
index: len(pool.storage))
|
||||
var key = getKey(peer)
|
||||
|
||||
if not(pool.registry.hasKey(key)):
|
||||
pool.storage.add(item)
|
||||
var pitem = addr(pool.storage[^1])
|
||||
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
|
||||
pool.registry[key] = pindex
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
discard pool.deletePeer(peer)
|
||||
|
||||
pitem[].data.getFuture().addCallback(onPeerClosed)
|
||||
|
||||
if peerType == PeerType.Incoming:
|
||||
if pool.curIncPeersCount >= pool.maxIncPeersCount:
|
||||
return false
|
||||
else:
|
||||
inc(pool.curIncPeersCount)
|
||||
pool.incQueue.push(pindex)
|
||||
pool.incNeEvent.fire()
|
||||
elif peerType == PeerType.Outgoing:
|
||||
if pool.curOutPeersCount >= pool.maxOutPeersCount:
|
||||
return false
|
||||
else:
|
||||
inc(pool.curOutPeersCount)
|
||||
pool.outQueue.push(pindex)
|
||||
pool.outNeEvent.fire()
|
||||
|
||||
result = true
|
||||
|
||||
proc addIncomingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
## Add incoming peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeer(peer, PeerType.Incoming)
|
||||
|
||||
proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
## Add outgoing peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeer(peer, PeerType.Outgoing)
|
||||
|
||||
proc acquire*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): Future[A] {.async.} =
|
||||
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
|
||||
doAssert(filter != {}, "Filter must not be empty")
|
||||
while true:
|
||||
var count = 0
|
||||
if PeerType.Incoming in filter:
|
||||
count = count + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count == 0:
|
||||
await pool.waitEvent(filter)
|
||||
else:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
item[].flags.incl(PeerFlags.Acquired)
|
||||
result = item[].data
|
||||
break
|
||||
|
||||
proc release*[A, B](pool: PeerPool[A, B], peer: A) =
|
||||
## Release peer ``peer`` back to PeerPool ``pool``
|
||||
mixin getKey
|
||||
var key = getKey(peer)
|
||||
var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1))
|
||||
if titem.data >= 0:
|
||||
let pindex = titem.data
|
||||
var item = addr(pool.storage[pindex])
|
||||
if PeerFlags.Acquired in item[].flags:
|
||||
item[].flags.excl(PeerFlags.Acquired)
|
||||
if PeerFlags.DeleteOnRelease in item[].flags:
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
dec(pool.curIncPeersCount)
|
||||
dec(pool.acqIncPeersCount)
|
||||
elif item[].peerType == PeerType.Outgoing:
|
||||
dec(pool.curOutPeersCount)
|
||||
dec(pool.acqOutPeersCount)
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
else:
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
pool.incQueue.push(titem)
|
||||
dec(pool.acqIncPeersCount)
|
||||
elif item[].peerType == PeerType.Outgoing:
|
||||
pool.outQueue.push(titem)
|
||||
dec(pool.acqOutPeersCount)
|
||||
pool.fireEvent(item[])
|
||||
|
||||
proc release*[A, B](pool: PeerPool[A, B], peers: openarray[A]) {.inline.} =
|
||||
## Release array of peers ``peers`` back to PeerPool ``pool``.
|
||||
for item in peers:
|
||||
pool.release(item)
|
||||
|
||||
proc acquire*[A, B](pool: PeerPool[A, B],
|
||||
number: int,
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): Future[seq[A]] {.async.} =
|
||||
## Acquire ``number`` number of peers from PeerPool ``pool``, which match the
|
||||
## filter ``filter``.
|
||||
doAssert(filter != {}, "Filter must not be empty")
|
||||
var peers = newSeq[A]()
|
||||
try:
|
||||
if number > 0:
|
||||
while true:
|
||||
if len(peers) >= number:
|
||||
break
|
||||
var count = 0
|
||||
if PeerType.Incoming in filter:
|
||||
count = count + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count == 0:
|
||||
await pool.waitEvent(filter)
|
||||
else:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
item[].flags.incl(PeerFlags.Acquired)
|
||||
peers.add(item[].data)
|
||||
except CancelledError:
|
||||
# If we got cancelled, we need to return all the acquired peers back to
|
||||
# pool.
|
||||
for item in peers:
|
||||
pool.release(item)
|
||||
peers.setLen(0)
|
||||
raise
|
||||
result = peers
|
||||
|
||||
proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} =
|
||||
## Acquire single incoming peer from PeerPool ``pool``.
|
||||
pool.acquire({PeerType.Incoming})
|
||||
|
||||
proc acquireOutgoingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} =
|
||||
## Acquire single outgoing peer from PeerPool ``pool``.
|
||||
pool.acquire({PeerType.Outgoing})
|
||||
|
||||
proc acquireIncomingPeers*[A, B](pool: PeerPool[A, B],
|
||||
number: int): Future[seq[A]] {.inline.} =
|
||||
## Acquire ``number`` number of incoming peers from PeerPool ``pool``.
|
||||
pool.acquire(number, {PeerType.Incoming})
|
||||
|
||||
proc acquireOutgoingPeers*[A, B](pool: PeerPool[A, B],
|
||||
number: int): Future[seq[A]] {.inline.} =
|
||||
## Acquire ``number`` number of outgoing peers from PeerPool ``pool``.
|
||||
pool.acquire(number, {PeerType.Outgoing})
|
||||
|
||||
iterator peers*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): A =
|
||||
## Iterate over sorted list of peers.
|
||||
##
|
||||
## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values
|
||||
## will be first.
|
||||
var sorted = initHeapQueue[PeerIndex]()
|
||||
for i in 0 ..< len(pool.storage):
|
||||
if pool.storage[i].peerType in filter:
|
||||
sorted.push(PeerIndex(data: i, cmp: pool.cmp))
|
||||
while len(sorted) > 0:
|
||||
let pindex = sorted.pop().data
|
||||
yield pool.storage[pindex].data
|
||||
|
||||
iterator availablePeers*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): A =
|
||||
## Iterate over sorted list of available peers.
|
||||
##
|
||||
## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values
|
||||
## will be first.
|
||||
var sorted = initHeapQueue[PeerIndex]()
|
||||
for i in 0 ..< len(pool.storage):
|
||||
if (PeerFlags.Acquired notin pool.storage[i].flags) and
|
||||
(pool.storage[i].peerType in filter):
|
||||
sorted.push(PeerIndex(data: i, cmp: pool.cmp))
|
||||
while len(sorted) > 0:
|
||||
let pindex = sorted.pop().data
|
||||
yield pool.storage[pindex].data
|
||||
|
||||
iterator acquiredPeers*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): A =
|
||||
## Iterate over sorted list of acquired (non-available) peers.
|
||||
##
|
||||
## All peers will be sorted by equation `>`(Peer1, Peer2), so biggest values
|
||||
## will be first.
|
||||
var sorted = initHeapQueue[PeerIndex]()
|
||||
for i in 0 ..< len(pool.storage):
|
||||
if (PeerFlags.Acquired in pool.storage[i].flags) and
|
||||
(pool.storage[i].peerType in filter):
|
||||
sorted.push(PeerIndex(data: i, cmp: pool.cmp))
|
||||
while len(sorted) > 0:
|
||||
let pindex = sorted.pop().data
|
||||
yield pool.storage[pindex].data
|
||||
|
||||
proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
|
||||
## Retrieve peer with key ``key`` from PeerPool ``pool``.
|
||||
let pindex = pool.registry[key]
|
||||
result = pool.storage[pindex.data]
|
||||
|
||||
proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline.} =
|
||||
## Retrieve peer with key ``key`` from PeerPool ``pool``.
|
||||
let pindex = pool.registry[key]
|
||||
result = pool.storage[pindex.data].data
|
||||
|
||||
proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} =
|
||||
## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``.
|
||||
result = pool.registry.hasKey(key)
|
||||
|
||||
proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
|
||||
## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is
|
||||
## not present, default initialization value for type ``A`` is returned
|
||||
## (e.g. 0 for any integer type).
|
||||
let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1))
|
||||
if pindex.data >= 0:
|
||||
result = pool.storage[pindex.data].data
|
||||
|
||||
proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B,
|
||||
default: A): A {.inline.} =
|
||||
## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is
|
||||
## not present, default value ``default`` is returned.
|
||||
let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1))
|
||||
if pindex.data >= 0:
|
||||
result = pool.storage[pindex.data].data
|
||||
else:
|
||||
result = default
|
||||
|
||||
proc clear*[A, B](pool: PeerPool[A, B]) =
|
||||
## Performs PeerPool's ``pool`` storage and counters reset.
|
||||
pool.incQueue.clear()
|
||||
pool.outQueue.clear()
|
||||
pool.registry.clear()
|
||||
for i in 0 ..< len(pool.storage):
|
||||
pool.storage[i] = PeerItem[A]()
|
||||
pool.storage.setLen(0)
|
||||
pool.curIncPeersCount = 0
|
||||
pool.curOutPeersCount = 0
|
||||
pool.acqIncPeersCount = 0
|
||||
pool.acqOutPeersCount = 0
|
||||
|
||||
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
|
||||
## Performs "safe" clear. Safe means that it first acquires all the peers
|
||||
## in PeerPool, and only after that it will reset storage.
|
||||
var acquired = newSeq[A]()
|
||||
while len(pool.registry) > len(acquired):
|
||||
var peers = await pool.acquire(len(pool.registry) - len(acquired))
|
||||
for item in peers:
|
||||
acquired.add(item)
|
||||
pool.clear()
|
|
@ -20,7 +20,8 @@ import # Unit test
|
|||
./test_state_transition,
|
||||
./test_sync_protocol,
|
||||
# ./test_validator # Empty!
|
||||
./test_zero_signature
|
||||
./test_zero_signature,
|
||||
./test_peer_pool
|
||||
|
||||
import # Refactor state transition unit tests
|
||||
./spec_block_processing/test_genesis,
|
||||
|
|
|
@ -0,0 +1,459 @@
|
|||
# beacon_chain
|
||||
# Copyright (c) 2019 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at http://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
|
||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||
|
||||
import
|
||||
unittest, random, heapqueue, tables, strutils,
|
||||
chronos,
|
||||
../beacon_chain/peer_pool
|
||||
|
||||
type
|
||||
PeerTestID* = string
|
||||
PeerTest* = object
|
||||
id: PeerTestID
|
||||
weight: int
|
||||
future: Future[void]
|
||||
|
||||
proc getKey*(peer: PeerTest): PeerTestID =
|
||||
result = peer.id
|
||||
|
||||
proc getFuture*(peer: PeerTest): Future[void] =
|
||||
result = peer.future
|
||||
|
||||
proc `<`*(a, b: PeerTest): bool =
|
||||
result = `<`(a.weight, b.weight)
|
||||
|
||||
proc init*(t: typedesc[PeerTest], id: string = "",
|
||||
weight: int = 0): PeerTest =
|
||||
result = PeerTest(id: id, weight: weight, future: newFuture[void]())
|
||||
|
||||
proc close*(peer: PeerTest) =
|
||||
peer.future.complete()
|
||||
|
||||
suite "PeerPool testing suite":
|
||||
test "addPeer() test":
|
||||
const peersCount = [
|
||||
[10, 5, 5, 10, 5, 5],
|
||||
[-1, 5, 5, 10, 5, 5],
|
||||
[-1, -1, -1, 10, 5, 5]
|
||||
]
|
||||
for item in peersCount:
|
||||
var pool = newPeerPool[PeerTest, PeerTestID](item[0], item[1], item[2])
|
||||
for i in 0 ..< item[4]:
|
||||
var peer = PeerTest.init("idInc" & $i)
|
||||
check pool.addIncomingPeer(peer) == true
|
||||
|
||||
for i in 0 ..< item[5]:
|
||||
var peer = PeerTest.init("idOut" & $i)
|
||||
check pool.addOutgoingPeer(peer) == true
|
||||
|
||||
var peer = PeerTest.init("idCheck")
|
||||
if item[1] != -1:
|
||||
for i in 0 ..< item[3]:
|
||||
check pool.addIncomingPeer(peer) == false
|
||||
if item[2] != -1:
|
||||
for i in 0 ..< item[3]:
|
||||
check pool.addOutgoingPeer(peer) == false
|
||||
check:
|
||||
pool.lenAvailable == item[3]
|
||||
pool.lenAvailable({PeerType.Incoming}) == item[4]
|
||||
pool.lenAvailable({PeerType.Outgoing}) == item[5]
|
||||
test "Acquire from empty pool":
|
||||
var pool0 = newPeerPool[PeerTest, PeerTestID]()
|
||||
var pool1 = newPeerPool[PeerTest, PeerTestID]()
|
||||
var pool2 = newPeerPool[PeerTest, PeerTestID]()
|
||||
|
||||
var itemFut01 = pool0.acquire({PeerType.Incoming})
|
||||
var itemFut02 = pool0.acquire({PeerType.Outgoing})
|
||||
var itemFut03 = pool0.acquire({PeerType.Incoming, PeerType.Outgoing})
|
||||
var itemFut04 = pool0.acquire()
|
||||
var itemFut05 = pool0.acquire(5, {PeerType.Incoming})
|
||||
var itemFut06 = pool0.acquire(5, {PeerType.Outgoing})
|
||||
var itemFut07 = pool0.acquire(5, {PeerType.Incoming, PeerType.Outgoing})
|
||||
var itemFut08 = pool0.acquire(5)
|
||||
check:
|
||||
itemFut01.finished == false
|
||||
itemFut02.finished == false
|
||||
itemFut03.finished == false
|
||||
itemFut04.finished == false
|
||||
itemFut05.finished == false
|
||||
itemFut06.finished == false
|
||||
itemFut07.finished == false
|
||||
itemFut08.finished == false
|
||||
|
||||
var peer11 = PeerTest.init("peer11")
|
||||
var peer12 = PeerTest.init("peer12")
|
||||
var peer21 = PeerTest.init("peer21")
|
||||
var peer22 = PeerTest.init("peer22")
|
||||
check:
|
||||
pool1.addPeer(peer11, PeerType.Incoming) == true
|
||||
pool1.addPeer(peer12, PeerType.Incoming) == true
|
||||
pool2.addPeer(peer21, PeerType.Outgoing) == true
|
||||
pool2.addPeer(peer22, PeerType.Outgoing) == true
|
||||
|
||||
var itemFut11 = pool1.acquire({PeerType.Outgoing})
|
||||
var itemFut12 = pool1.acquire(10, {PeerType.Outgoing})
|
||||
var itemFut13 = pool1.acquire(3, {PeerType.Incoming})
|
||||
var itemFut14 = pool1.acquire({PeerType.Incoming})
|
||||
|
||||
var itemFut21 = pool2.acquire({PeerType.Incoming})
|
||||
var itemFut22 = pool2.acquire(10, {PeerType.Incoming})
|
||||
var itemFut23 = pool2.acquire(3, {PeerType.Outgoing})
|
||||
var itemFut24 = pool1.acquire({PeerType.Outgoing})
|
||||
check:
|
||||
itemFut11.finished == false
|
||||
itemFut12.finished == false
|
||||
itemFut13.finished == false
|
||||
itemFut14.finished == false
|
||||
itemFut21.finished == false
|
||||
itemFut22.finished == false
|
||||
itemFut23.finished == false
|
||||
itemFut24.finished == false
|
||||
|
||||
test "Acquire/Sorting and consistency test":
|
||||
const
|
||||
TestsCount = 1000
|
||||
MaxNumber = 1_000_000
|
||||
|
||||
var pool = newPeerPool[PeerTest, PeerTestID]()
|
||||
|
||||
proc testAcquireRelease(): Future[int] {.async.} =
|
||||
var weight: int
|
||||
var incoming, outgoing, total: seq[PeerTest]
|
||||
var incWeight1, outWeight1, totWeight1: int
|
||||
|
||||
incoming.setLen(0)
|
||||
for i in 0 ..< pool.lenAvailable({PeerType.Incoming}):
|
||||
var peer = await pool.acquire({PeerType.Incoming})
|
||||
incoming.add(peer)
|
||||
|
||||
outgoing.setLen(0)
|
||||
for i in 0 ..< pool.lenAvailable({PeerType.Outgoing}):
|
||||
var peer = await pool.acquire({PeerType.Outgoing})
|
||||
outgoing.add(peer)
|
||||
|
||||
weight = MaxNumber + 1
|
||||
incWeight1 = 0
|
||||
for i in 0 ..< len(incoming):
|
||||
incWeight1 = incWeight1 + incoming[i].weight
|
||||
if incoming[i].weight > weight:
|
||||
raise newException(ValueError, "Incoming items are not sorted")
|
||||
weight = incoming[i].weight
|
||||
pool.release(incoming[i])
|
||||
|
||||
weight = MaxNumber + 1
|
||||
outWeight1 = 0
|
||||
for i in 0..<len(outgoing):
|
||||
outWeight1 = outWeight1 + outgoing[i].weight
|
||||
if outgoing[i].weight > weight:
|
||||
raise newException(ValueError, "Outgoing items are not sorted")
|
||||
weight = outgoing[i].weight
|
||||
pool.release(outgoing[i])
|
||||
|
||||
for i in 0 ..< pool.lenAvailable():
|
||||
var peer = await pool.acquire()
|
||||
total.add(peer)
|
||||
|
||||
weight = MaxNumber + 1
|
||||
totWeight1 = 0
|
||||
for i in 0 ..< len(total):
|
||||
totWeight1 = totWeight1 + total[i].weight
|
||||
if total[i].weight > weight:
|
||||
raise newException(ValueError, "Outgoing items are not sorted")
|
||||
weight = total[i].weight
|
||||
pool.release(total[i])
|
||||
|
||||
doAssert(totWeight1 == incWeight1 + outWeight1)
|
||||
doAssert(len(total) == len(incoming) + len(outgoing))
|
||||
|
||||
result = TestsCount
|
||||
|
||||
randomize()
|
||||
for i in 0 ..< TestsCount:
|
||||
var peer = PeerTest.init("peer" & $i, rand(MaxNumber))
|
||||
# echo repr peer
|
||||
if rand(100) mod 2 == 0:
|
||||
check pool.addPeer(peer, PeerType.Incoming) == true
|
||||
else:
|
||||
check pool.addPeer(peer, PeerType.Outgoing) == true
|
||||
|
||||
check waitFor(testAcquireRelease()) == TestsCount
|
||||
|
||||
test "deletePeer() test":
|
||||
proc testDeletePeer(): Future[bool] {.async.} =
|
||||
var pool = newPeerPool[PeerTest, PeerTestID]()
|
||||
var peer = PeerTest.init("deletePeer")
|
||||
|
||||
## Delete available peer
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 1)
|
||||
doAssert(pool.deletePeer(peer) == true)
|
||||
doAssert(pool.len == 0)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
|
||||
## Delete acquired peer
|
||||
peer = PeerTest.init("closingPeer")
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 1)
|
||||
var apeer = await pool.acquire()
|
||||
doAssert(pool.deletePeer(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
pool.release(apeer)
|
||||
doAssert(pool.len == 0)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
|
||||
## Force delete acquired peer
|
||||
peer = PeerTest.init("closingPeer")
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 1)
|
||||
apeer = await pool.acquire()
|
||||
doAssert(pool.deletePeer(peer, true) == true)
|
||||
doAssert(pool.len == 0)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
|
||||
result = true
|
||||
check waitFor(testDeletePeer()) == true
|
||||
|
||||
test "Peer lifetime test":
|
||||
proc testPeerLifetime(): Future[bool] {.async.} =
|
||||
var pool = newPeerPool[PeerTest, PeerTestID]()
|
||||
var peer = PeerTest.init("closingPeer")
|
||||
|
||||
## Close available peer
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 1)
|
||||
close(peer)
|
||||
# We need to wait next callback scheduler
|
||||
await sleepAsync(1.milliseconds)
|
||||
doAssert(pool.len == 0)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
|
||||
## Close acquired peer
|
||||
peer = PeerTest.init("closingPeer")
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 1)
|
||||
var apeer = await pool.acquire()
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
close(peer)
|
||||
await sleepAsync(1.milliseconds)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
pool.release(apeer)
|
||||
doAssert(pool.len == 0)
|
||||
doAssert(pool.lenAvailable == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
doAssert(pool.lenAvailable({PeerType.Incoming}) == 0)
|
||||
|
||||
result = true
|
||||
|
||||
check waitFor(testPeerLifetime()) == true
|
||||
|
||||
test "Safe/Clear test":
|
||||
var pool = newPeerPool[PeerTest, PeerTestID]()
|
||||
var peer1 = PeerTest.init("peer1", 10)
|
||||
var peer2 = PeerTest.init("peer2", 9)
|
||||
var peer3 = PeerTest.init("peer3", 8)
|
||||
|
||||
check:
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Outgoing) == true
|
||||
pool.lenAvailable == 3
|
||||
pool.lenAvailable({PeerType.Outgoing}) == 1
|
||||
pool.lenAvailable({PeerType.Incoming}) == 2
|
||||
pool.lenAcquired == 0
|
||||
pool.len == 3
|
||||
|
||||
pool.clear()
|
||||
|
||||
check:
|
||||
pool.lenAvailable == 0
|
||||
pool.lenAvailable({PeerType.Outgoing}) == 0
|
||||
pool.lenAvailable({PeerType.Incoming}) == 0
|
||||
pool.lenAcquired == 0
|
||||
pool.len == 0
|
||||
|
||||
check:
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Outgoing) == true
|
||||
pool.lenAvailable == 3
|
||||
pool.lenAvailable({PeerType.Outgoing}) == 1
|
||||
pool.lenAvailable({PeerType.Incoming}) == 2
|
||||
pool.lenAcquired == 0
|
||||
pool.len == 3
|
||||
|
||||
proc testConsumer() {.async.} =
|
||||
var p = await pool.acquire()
|
||||
await sleepAsync(100.milliseconds)
|
||||
pool.release(p)
|
||||
|
||||
proc testClose(): Future[bool] {.async.} =
|
||||
await pool.clearSafe()
|
||||
result = true
|
||||
|
||||
asyncCheck testConsumer()
|
||||
check waitFor(testClose()) == true
|
||||
|
||||
test "Access peers by key test":
|
||||
var pool = newPeerPool[PeerTest, PeerTestID]()
|
||||
var peer1 = PeerTest.init("peer1", 10)
|
||||
var peer2 = PeerTest.init("peer2", 9)
|
||||
var peer3 = PeerTest.init("peer3", 8)
|
||||
|
||||
check:
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Outgoing) == true
|
||||
pool.hasPeer("peer4") == false
|
||||
pool.hasPeer("peer1") == true
|
||||
pool.hasPeer("peer2") == true
|
||||
pool.hasPeer("peer3") == true
|
||||
pool.getOrDefault("peer4").id == ""
|
||||
pool.getOrDefault("peer4", PeerTest.init("peer5")).id == "peer5"
|
||||
pool.getOrDefault("peer1").id == "peer1"
|
||||
pool.getOrDefault("peer1", PeerTest.init("peer5")).id == "peer1"
|
||||
pool["peer1"].id == "peer1"
|
||||
pool["peer1"].weight == 10
|
||||
pool["peer2"].id == "peer2"
|
||||
pool["peer2"].weight == 9
|
||||
pool["peer3"].id == "peer3"
|
||||
pool["peer3"].weight == 8
|
||||
|
||||
var ppeer = addr(pool["peer1"])
|
||||
ppeer[].weight = 100
|
||||
check pool["peer1"].weight == 100
|
||||
|
||||
test "Iterators test":
|
||||
var pool = newPeerPool[PeerTest, PeerTestID]()
|
||||
var peer1 = PeerTest.init("peer1", 10)
|
||||
var peer2 = PeerTest.init("peer2", 9)
|
||||
var peer3 = PeerTest.init("peer3", 8)
|
||||
var peer4 = PeerTest.init("peer4", 7)
|
||||
var peer5 = PeerTest.init("peer5", 6)
|
||||
var peer6 = PeerTest.init("peer6", 5)
|
||||
var peer7 = PeerTest.init("peer7", 4)
|
||||
var peer8 = PeerTest.init("peer8", 3)
|
||||
var peer9 = PeerTest.init("peer9", 2)
|
||||
|
||||
check:
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Incoming) == true
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer4, PeerType.Incoming) == true
|
||||
|
||||
pool.addPeer(peer5, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer8, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer7, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer6, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer9, PeerType.Outgoing) == true
|
||||
|
||||
var total1, total2, total3: seq[PeerTest]
|
||||
var avail1, avail2, avail3: seq[PeerTest]
|
||||
var acqui1, acqui2, acqui3: seq[PeerTest]
|
||||
|
||||
for item in pool.peers():
|
||||
total1.add(item)
|
||||
for item in pool.peers({PeerType.Incoming}):
|
||||
total2.add(item)
|
||||
for item in pool.peers({PeerType.Outgoing}):
|
||||
total3.add(item)
|
||||
|
||||
for item in pool.availablePeers():
|
||||
avail1.add(item)
|
||||
for item in pool.availablePeers({PeerType.Incoming}):
|
||||
avail2.add(item)
|
||||
for item in pool.availablePeers({PeerType.Outgoing}):
|
||||
avail3.add(item)
|
||||
|
||||
for item in pool.acquiredPeers():
|
||||
acqui1.add(item)
|
||||
for item in pool.acquiredPeers({PeerType.Incoming}):
|
||||
acqui2.add(item)
|
||||
for item in pool.acquiredPeers({PeerType.Outgoing}):
|
||||
acqui3.add(item)
|
||||
|
||||
check:
|
||||
len(total1) == 9
|
||||
len(total2) == 4
|
||||
len(total3) == 5
|
||||
len(avail1) == 9
|
||||
len(avail2) == 4
|
||||
len(avail3) == 5
|
||||
len(acqui1) == 0
|
||||
len(acqui2) == 0
|
||||
len(acqui3) == 0
|
||||
|
||||
discard waitFor(pool.acquire({PeerType.Incoming}))
|
||||
discard waitFor(pool.acquire({PeerType.Incoming}))
|
||||
discard waitFor(pool.acquire({PeerType.Outgoing}))
|
||||
|
||||
total1.setLen(0); total2.setLen(0); total3.setLen(0)
|
||||
avail1.setLen(0); avail2.setLen(0); avail3.setLen(0)
|
||||
acqui1.setLen(0); acqui2.setLen(0); acqui3.setLen(0)
|
||||
|
||||
for item in pool.peers():
|
||||
total1.add(item)
|
||||
for item in pool.peers({PeerType.Incoming}):
|
||||
total2.add(item)
|
||||
for item in pool.peers({PeerType.Outgoing}):
|
||||
total3.add(item)
|
||||
|
||||
for item in pool.availablePeers():
|
||||
avail1.add(item)
|
||||
for item in pool.availablePeers({PeerType.Incoming}):
|
||||
avail2.add(item)
|
||||
for item in pool.availablePeers({PeerType.Outgoing}):
|
||||
avail3.add(item)
|
||||
|
||||
for item in pool.acquiredPeers():
|
||||
acqui1.add(item)
|
||||
for item in pool.acquiredPeers({PeerType.Incoming}):
|
||||
acqui2.add(item)
|
||||
for item in pool.acquiredPeers({PeerType.Outgoing}):
|
||||
acqui3.add(item)
|
||||
|
||||
check:
|
||||
len(total1) == 9
|
||||
len(total2) == 4
|
||||
len(total3) == 5
|
||||
len(avail1) == 6
|
||||
len(avail2) == 2
|
||||
len(avail3) == 4
|
||||
len(acqui1) == 3
|
||||
len(acqui2) == 2
|
||||
len(acqui3) == 1
|
Loading…
Reference in New Issue