Add PeerPool.addPeer async version and tests.
This commit is contained in:
parent
8b229d68ad
commit
98dc701473
|
@ -19,8 +19,10 @@ type
|
|||
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
|
||||
|
||||
PeerPool*[A, B] = ref object
|
||||
incNeEvent: AsyncEvent
|
||||
outNeEvent: AsyncEvent
|
||||
incNotEmptyEvent: AsyncEvent
|
||||
outNotEmptyEvent: AsyncEvent
|
||||
incNotFullEvent: AsyncEvent
|
||||
outNotFullEvent: AsyncEvent
|
||||
incQueue: HeapQueue[PeerIndex]
|
||||
outQueue: HeapQueue[PeerIndex]
|
||||
registry: Table[B, PeerIndex]
|
||||
|
@ -39,27 +41,35 @@ type
|
|||
proc `<`*(a, b: PeerIndex): bool =
|
||||
result = a.cmp(b, a)
|
||||
|
||||
proc fireEvent[A, B](pool: PeerPool[A, B], item: PeerItem[A]) {.inline.} =
|
||||
proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B],
|
||||
item: PeerItem[A]) {.inline.} =
|
||||
if item.peerType == PeerType.Incoming:
|
||||
pool.incNeEvent.fire()
|
||||
pool.incNotEmptyEvent.fire()
|
||||
elif item.peerType == PeerType.Outgoing:
|
||||
pool.outNeEvent.fire()
|
||||
pool.outNotEmptyEvent.fire()
|
||||
|
||||
proc waitEvent[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]) {.async.} =
|
||||
proc fireNotFullEvent[A, B](pool: PeerPool[A, B],
|
||||
item: PeerItem[A]) {.inline.} =
|
||||
if item.peerType == PeerType.Incoming:
|
||||
pool.incNotFullEvent.fire()
|
||||
elif item.peerType == PeerType.Outgoing:
|
||||
pool.outNotFullEvent.fire()
|
||||
|
||||
proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]) {.async.} =
|
||||
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
|
||||
var fut1 = pool.incNeEvent.wait()
|
||||
var fut2 = pool.outNeEvent.wait()
|
||||
var fut1 = pool.incNotEmptyEvent.wait()
|
||||
var fut2 = pool.outNotEmptyEvent.wait()
|
||||
try:
|
||||
discard await one(fut1, fut2)
|
||||
if fut1.finished:
|
||||
if not(fut2.finished):
|
||||
fut2.cancel()
|
||||
pool.incNeEvent.clear()
|
||||
pool.incNotEmptyEvent.clear()
|
||||
else:
|
||||
if not(fut1.finished):
|
||||
fut1.cancel()
|
||||
pool.outNeEvent.clear()
|
||||
pool.outNotEmptyEvent.clear()
|
||||
except CancelledError:
|
||||
if not(fut1.finished):
|
||||
fut1.cancel()
|
||||
|
@ -67,11 +77,20 @@ proc waitEvent[A, B](pool: PeerPool[A, B],
|
|||
fut2.cancel()
|
||||
raise
|
||||
elif PeerType.Incoming in filter:
|
||||
await pool.incNeEvent.wait()
|
||||
pool.incNeEvent.clear()
|
||||
await pool.incNotEmptyEvent.wait()
|
||||
pool.incNotEmptyEvent.clear()
|
||||
elif PeerType.Outgoing in filter:
|
||||
await pool.outNeEvent.wait()
|
||||
pool.outNeEvent.clear()
|
||||
await pool.outNotEmptyEvent.wait()
|
||||
pool.outNotEmptyEvent.clear()
|
||||
|
||||
proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
|
||||
peerType: PeerType) {.async.} =
|
||||
if peerType == PeerType.Incoming:
|
||||
await pool.incNotFullEvent.wait()
|
||||
pool.incNotFullEvent.clear()
|
||||
elif peerType == PeerType.Outgoing:
|
||||
await pool.outNotFullEvent.wait()
|
||||
pool.outNotFullEvent.clear()
|
||||
|
||||
template getItem[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]): ptr PeerItem[A] =
|
||||
|
@ -126,13 +145,15 @@ proc newPeerPool*[A, B](maxPeers = -1,
|
|||
doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers)
|
||||
|
||||
res.maxPeersCount = if maxPeers < 0: high(int)
|
||||
else: maxPeers
|
||||
else: maxPeers
|
||||
res.maxIncPeersCount = if maxIncomingPeers < 0: high(int)
|
||||
else: maxIncomingPeers
|
||||
else: maxIncomingPeers
|
||||
res.maxOutPeersCount = if maxOutgoingPeers < 0: high(int)
|
||||
else: maxOutgoingPeers
|
||||
res.incNeEvent = newAsyncEvent()
|
||||
res.outNeEvent = newAsyncEvent()
|
||||
else: maxOutgoingPeers
|
||||
res.incNotEmptyEvent = newAsyncEvent()
|
||||
res.outNotEmptyEvent = newAsyncEvent()
|
||||
res.incNotFullEvent = newAsyncEvent()
|
||||
res.outNotFullEvent = newAsyncEvent()
|
||||
res.incQueue = initHeapQueue[PeerIndex]()
|
||||
res.outQueue = initHeapQueue[PeerIndex]()
|
||||
res.registry = initTable[B, PeerIndex]()
|
||||
|
@ -192,6 +213,8 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
|||
elif item[].peerType == PeerType.Outgoing:
|
||||
dec(pool.curOutPeersCount)
|
||||
dec(pool.acqOutPeersCount)
|
||||
|
||||
pool.fireNotFullEvent(item[])
|
||||
# Cleanup storage with default item, and removing key from hashtable.
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
|
@ -212,68 +235,125 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
|||
pool.outQueue.del(i)
|
||||
break
|
||||
dec(pool.curOutPeersCount)
|
||||
|
||||
pool.fireNotFullEvent(item[])
|
||||
# Cleanup storage with default item, and removing key from hashtable.
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
|
||||
result = true
|
||||
|
||||
proc addPeer*[A, B](pool: PeerPool[A, B], peer: A, peerType: PeerType): bool =
|
||||
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
|
||||
peerType: PeerType): PeerIndex =
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
discard pool.deletePeer(peer)
|
||||
|
||||
var item = PeerItem[A](data: peer, peerType: peerType,
|
||||
index: len(pool.storage))
|
||||
pool.storage.add(item)
|
||||
var pitem = addr(pool.storage[^1])
|
||||
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
|
||||
pool.registry[peerKey] = pindex
|
||||
pitem[].data.getFuture().addCallback(onPeerClosed)
|
||||
result = pindex
|
||||
|
||||
proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
peer: A, peerType: PeerType): bool =
|
||||
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
|
||||
##
|
||||
## Procedure returns ``false`` in case
|
||||
## * if ``peer`` is already closed.
|
||||
## * if ``pool`` already has peer ``peer`` inside.
|
||||
## * if ``pool`` currently has a maximum of peers.
|
||||
## * if ``pool`` currently has a maximum of `Incoming` or `Outgoing` peers.
|
||||
##
|
||||
## Procedure returns ``true`` on success.
|
||||
mixin getKey, getFuture
|
||||
|
||||
if peer.getFuture().finished:
|
||||
return false
|
||||
result = false
|
||||
let peerKey = getKey(peer)
|
||||
|
||||
if len(pool.registry) >= pool.maxPeersCount:
|
||||
return false
|
||||
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
|
||||
if len(pool.registry) < pool.maxPeersCount:
|
||||
if peerType == PeerType.Incoming:
|
||||
if pool.curIncPeersCount < pool.maxIncPeersCount:
|
||||
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
|
||||
inc(pool.curIncPeersCount)
|
||||
pool.incQueue.push(pindex)
|
||||
pool.incNotEmptyEvent.fire()
|
||||
result = true
|
||||
elif peerType == PeerType.Outgoing:
|
||||
if pool.curOutPeersCount < pool.maxOutPeersCount:
|
||||
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
|
||||
inc(pool.curOutPeersCount)
|
||||
pool.outQueue.push(pindex)
|
||||
pool.outNotEmptyEvent.fire()
|
||||
result = true
|
||||
|
||||
var item = PeerItem[A](data: peer, peerType: peerType,
|
||||
index: len(pool.storage))
|
||||
var key = getKey(peer)
|
||||
proc addPeer*[A, B](pool: PeerPool[A, B],
|
||||
peer: A, peerType: PeerType): Future[bool] {.async.} =
|
||||
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
|
||||
##
|
||||
## This procedure will wait for an empty space in PeerPool ``pool``, if
|
||||
## PeerPool ``pool`` is full.
|
||||
##
|
||||
## Procedure returns ``false`` in case:
|
||||
## * if ``peer`` is already closed.
|
||||
## * if ``pool`` already has peer ``peer`` inside.
|
||||
##
|
||||
## Procedure returns ``true`` on success.
|
||||
mixin getKey, getFuture
|
||||
|
||||
if not(pool.registry.hasKey(key)):
|
||||
pool.storage.add(item)
|
||||
var pitem = addr(pool.storage[^1])
|
||||
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
|
||||
pool.registry[key] = pindex
|
||||
|
||||
proc onPeerClosed(udata: pointer) {.gcsafe.} =
|
||||
discard pool.deletePeer(peer)
|
||||
|
||||
pitem[].data.getFuture().addCallback(onPeerClosed)
|
||||
var res = false
|
||||
let peerKey = getKey(peer)
|
||||
|
||||
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
|
||||
if len(pool.registry) >= pool.maxPeersCount:
|
||||
await pool.waitNotFullEvent(peerType)
|
||||
if peerType == PeerType.Incoming:
|
||||
if pool.curIncPeersCount >= pool.maxIncPeersCount:
|
||||
return false
|
||||
else:
|
||||
inc(pool.curIncPeersCount)
|
||||
pool.incQueue.push(pindex)
|
||||
pool.incNeEvent.fire()
|
||||
await pool.waitNotFullEvent(peerType)
|
||||
|
||||
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
|
||||
inc(pool.curIncPeersCount)
|
||||
pool.incQueue.push(pindex)
|
||||
pool.incNotEmptyEvent.fire()
|
||||
res = true
|
||||
elif peerType == PeerType.Outgoing:
|
||||
if pool.curOutPeersCount >= pool.maxOutPeersCount:
|
||||
return false
|
||||
else:
|
||||
inc(pool.curOutPeersCount)
|
||||
pool.outQueue.push(pindex)
|
||||
pool.outNeEvent.fire()
|
||||
await pool.waitNotFullEvent(peerType)
|
||||
|
||||
result = true
|
||||
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
|
||||
inc(pool.curOutPeersCount)
|
||||
pool.outQueue.push(pindex)
|
||||
pool.outNotEmptyEvent.fire()
|
||||
res = true
|
||||
|
||||
proc addIncomingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
result = res
|
||||
|
||||
proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): bool {.inline.} =
|
||||
## Add incoming peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeerNoWait(peer, PeerType.Incoming)
|
||||
|
||||
proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): bool {.inline.} =
|
||||
## Add outgoing peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeerNoWait(peer, PeerType.Outgoing)
|
||||
|
||||
proc addIncomingPeer*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): Future[bool] {.inline.} =
|
||||
## Add incoming peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeer(peer, PeerType.Incoming)
|
||||
|
||||
proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
proc addOutgoingPeer*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): Future[bool] {.inline.} =
|
||||
## Add outgoing peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
|
@ -291,7 +371,7 @@ proc acquire*[A, B](pool: PeerPool[A, B],
|
|||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count == 0:
|
||||
await pool.waitEvent(filter)
|
||||
await pool.waitNotEmptyEvent(filter)
|
||||
else:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
|
@ -341,7 +421,7 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) =
|
|||
elif item[].peerType == PeerType.Outgoing:
|
||||
pool.outQueue.push(titem)
|
||||
dec(pool.acqOutPeersCount)
|
||||
pool.fireEvent(item[])
|
||||
pool.fireNotEmptyEvent(item[])
|
||||
|
||||
proc release*[A, B](pool: PeerPool[A, B], peers: openarray[A]) {.inline.} =
|
||||
## Release array of peers ``peers`` back to PeerPool ``pool``.
|
||||
|
@ -367,7 +447,7 @@ proc acquire*[A, B](pool: PeerPool[A, B],
|
|||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count == 0:
|
||||
await pool.waitEvent(filter)
|
||||
await pool.waitNotEmptyEvent(filter)
|
||||
else:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
|
|
|
@ -37,7 +37,7 @@ proc close*(peer: PeerTest) =
|
|||
peer.future.complete()
|
||||
|
||||
suite "PeerPool testing suite":
|
||||
timedTest "addPeer() test":
|
||||
timedTest "addPeerNoWait() test":
|
||||
const peersCount = [
|
||||
[10, 5, 5, 10, 5, 5],
|
||||
[-1, 5, 5, 10, 5, 5],
|
||||
|
@ -47,23 +47,100 @@ suite "PeerPool testing suite":
|
|||
var pool = newPeerPool[PeerTest, PeerTestID](item[0], item[1], item[2])
|
||||
for i in 0 ..< item[4]:
|
||||
var peer = PeerTest.init("idInc" & $i)
|
||||
check pool.addIncomingPeer(peer) == true
|
||||
check pool.addIncomingPeerNoWait(peer) == true
|
||||
|
||||
for i in 0 ..< item[5]:
|
||||
var peer = PeerTest.init("idOut" & $i)
|
||||
check pool.addOutgoingPeer(peer) == true
|
||||
check pool.addOutgoingPeerNoWait(peer) == true
|
||||
|
||||
var peer = PeerTest.init("idCheck")
|
||||
if item[1] != -1:
|
||||
for i in 0 ..< item[3]:
|
||||
check pool.addIncomingPeer(peer) == false
|
||||
check pool.addIncomingPeerNoWait(peer) == false
|
||||
if item[2] != -1:
|
||||
for i in 0 ..< item[3]:
|
||||
check pool.addOutgoingPeer(peer) == false
|
||||
check pool.addOutgoingPeerNoWait(peer) == false
|
||||
check:
|
||||
pool.lenAvailable == item[3]
|
||||
pool.lenAvailable({PeerType.Incoming}) == item[4]
|
||||
pool.lenAvailable({PeerType.Outgoing}) == item[5]
|
||||
|
||||
timedTest "addPeer() test":
|
||||
proc testAddPeer1(): Future[bool] {.async.} =
|
||||
var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 1,
|
||||
maxIncomingPeers = 1,
|
||||
maxOutgoingPeers = 0)
|
||||
var peer0 = PeerTest.init("idInc0")
|
||||
var peer1 = PeerTest.init("idOut0")
|
||||
var peer2 = PeerTest.init("idInc1")
|
||||
var fut0 = pool.addIncomingPeer(peer0)
|
||||
var fut1 = pool.addOutgoingPeer(peer1)
|
||||
var fut2 = pool.addIncomingPeer(peer2)
|
||||
doAssert(fut0.finished == true and fut0.failed == false)
|
||||
doAssert(fut1.finished == false)
|
||||
doAssert(fut2.finished == false)
|
||||
peer0.close()
|
||||
await sleepAsync(100.milliseconds)
|
||||
doAssert(fut1.finished == false)
|
||||
doAssert(fut2.finished == true and fut2.failed == false)
|
||||
result = true
|
||||
|
||||
proc testAddPeer2(): Future[bool] {.async.} =
|
||||
var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 2,
|
||||
maxIncomingPeers = 1,
|
||||
maxOutgoingPeers = 1)
|
||||
var peer0 = PeerTest.init("idInc0")
|
||||
var peer1 = PeerTest.init("idOut0")
|
||||
var peer2 = PeerTest.init("idInc1")
|
||||
var peer3 = PeerTest.init("idOut1")
|
||||
var fut0 = pool.addIncomingPeer(peer0)
|
||||
var fut1 = pool.addOutgoingPeer(peer1)
|
||||
var fut2 = pool.addIncomingPeer(peer2)
|
||||
var fut3 = pool.addOutgoingPeer(peer3)
|
||||
doAssert(fut0.finished == true and fut0.failed == false)
|
||||
doAssert(fut1.finished == true and fut1.failed == false)
|
||||
doAssert(fut2.finished == false)
|
||||
doAssert(fut3.finished == false)
|
||||
peer0.close()
|
||||
await sleepAsync(100.milliseconds)
|
||||
doAssert(fut2.finished == true and fut2.failed == false)
|
||||
doAssert(fut3.finished == false)
|
||||
peer1.close()
|
||||
await sleepAsync(100.milliseconds)
|
||||
doAssert(fut3.finished == true and fut3.failed == false)
|
||||
result = true
|
||||
|
||||
proc testAddPeer3(): Future[bool] {.async.} =
|
||||
var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 3,
|
||||
maxIncomingPeers = 1,
|
||||
maxOutgoingPeers = 1)
|
||||
var peer0 = PeerTest.init("idInc0")
|
||||
var peer1 = PeerTest.init("idInc1")
|
||||
var peer2 = PeerTest.init("idOut0")
|
||||
var peer3 = PeerTest.init("idOut1")
|
||||
|
||||
var fut0 = pool.addIncomingPeer(peer0)
|
||||
var fut1 = pool.addIncomingPeer(peer1)
|
||||
var fut2 = pool.addOutgoingPeer(peer2)
|
||||
var fut3 = pool.addOutgoingPeer(peer3)
|
||||
doAssert(fut0.finished == true and fut0.failed == false)
|
||||
doAssert(fut1.finished == false)
|
||||
doAssert(fut2.finished == true and fut2.failed == false)
|
||||
doAssert(fut3.finished == false)
|
||||
peer0.close()
|
||||
await sleepAsync(100.milliseconds)
|
||||
doAssert(fut1.finished == true and fut1.failed == false)
|
||||
doAssert(fut3.finished == false)
|
||||
peer2.close()
|
||||
await sleepAsync(100.milliseconds)
|
||||
doAssert(fut3.finished == true and fut3.failed == false)
|
||||
result = true
|
||||
|
||||
check:
|
||||
waitFor(testAddPeer1()) == true
|
||||
waitFor(testAddPeer2()) == true
|
||||
waitFor(testAddPeer3()) == true
|
||||
|
||||
timedTest "Acquire from empty pool":
|
||||
var pool0 = newPeerPool[PeerTest, PeerTestID]()
|
||||
var pool1 = newPeerPool[PeerTest, PeerTestID]()
|
||||
|
@ -92,10 +169,10 @@ suite "PeerPool testing suite":
|
|||
var peer21 = PeerTest.init("peer21")
|
||||
var peer22 = PeerTest.init("peer22")
|
||||
check:
|
||||
pool1.addPeer(peer11, PeerType.Incoming) == true
|
||||
pool1.addPeer(peer12, PeerType.Incoming) == true
|
||||
pool2.addPeer(peer21, PeerType.Outgoing) == true
|
||||
pool2.addPeer(peer22, PeerType.Outgoing) == true
|
||||
pool1.addPeerNoWait(peer11, PeerType.Incoming) == true
|
||||
pool1.addPeerNoWait(peer12, PeerType.Incoming) == true
|
||||
pool2.addPeerNoWait(peer21, PeerType.Outgoing) == true
|
||||
pool2.addPeerNoWait(peer22, PeerType.Outgoing) == true
|
||||
|
||||
var itemFut11 = pool1.acquire({PeerType.Outgoing})
|
||||
var itemFut12 = pool1.acquire(10, {PeerType.Outgoing})
|
||||
|
@ -179,9 +256,9 @@ suite "PeerPool testing suite":
|
|||
var peer = PeerTest.init("peer" & $i, rand(MaxNumber))
|
||||
# echo repr peer
|
||||
if rand(100) mod 2 == 0:
|
||||
check pool.addPeer(peer, PeerType.Incoming) == true
|
||||
check pool.addPeerNoWait(peer, PeerType.Incoming) == true
|
||||
else:
|
||||
check pool.addPeer(peer, PeerType.Outgoing) == true
|
||||
check pool.addPeerNoWait(peer, PeerType.Outgoing) == true
|
||||
|
||||
check waitFor(testAcquireRelease()) == TestsCount
|
||||
|
||||
|
@ -191,7 +268,7 @@ suite "PeerPool testing suite":
|
|||
var peer = PeerTest.init("deletePeer")
|
||||
|
||||
## Delete available peer
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
|
@ -204,7 +281,7 @@ suite "PeerPool testing suite":
|
|||
|
||||
## Delete acquired peer
|
||||
peer = PeerTest.init("closingPeer")
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
|
@ -223,7 +300,7 @@ suite "PeerPool testing suite":
|
|||
|
||||
## Force delete acquired peer
|
||||
peer = PeerTest.init("closingPeer")
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
|
@ -244,7 +321,7 @@ suite "PeerPool testing suite":
|
|||
var peer = PeerTest.init("closingPeer")
|
||||
|
||||
## Close available peer
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
|
@ -259,7 +336,7 @@ suite "PeerPool testing suite":
|
|||
|
||||
## Close acquired peer
|
||||
peer = PeerTest.init("closingPeer")
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
doAssert(pool.len == 1)
|
||||
doAssert(pool.lenAvailable == 1)
|
||||
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
|
||||
|
@ -292,9 +369,9 @@ suite "PeerPool testing suite":
|
|||
var peer3 = PeerTest.init("peer3", 8)
|
||||
|
||||
check:
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
|
||||
pool.lenAvailable == 3
|
||||
pool.lenAvailable({PeerType.Outgoing}) == 1
|
||||
pool.lenAvailable({PeerType.Incoming}) == 2
|
||||
|
@ -311,9 +388,9 @@ suite "PeerPool testing suite":
|
|||
pool.len == 0
|
||||
|
||||
check:
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
|
||||
pool.lenAvailable == 3
|
||||
pool.lenAvailable({PeerType.Outgoing}) == 1
|
||||
pool.lenAvailable({PeerType.Incoming}) == 2
|
||||
|
@ -339,9 +416,9 @@ suite "PeerPool testing suite":
|
|||
var peer3 = PeerTest.init("peer3", 8)
|
||||
|
||||
check:
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
|
||||
pool.hasPeer("peer4") == false
|
||||
pool.hasPeer("peer1") == true
|
||||
pool.hasPeer("peer2") == true
|
||||
|
@ -374,16 +451,16 @@ suite "PeerPool testing suite":
|
|||
var peer9 = PeerTest.init("peer9", 2)
|
||||
|
||||
check:
|
||||
pool.addPeer(peer2, PeerType.Incoming) == true
|
||||
pool.addPeer(peer3, PeerType.Incoming) == true
|
||||
pool.addPeer(peer1, PeerType.Incoming) == true
|
||||
pool.addPeer(peer4, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer3, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
|
||||
pool.addPeerNoWait(peer4, PeerType.Incoming) == true
|
||||
|
||||
pool.addPeer(peer5, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer8, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer7, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer6, PeerType.Outgoing) == true
|
||||
pool.addPeer(peer9, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer5, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer8, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer7, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer6, PeerType.Outgoing) == true
|
||||
pool.addPeerNoWait(peer9, PeerType.Outgoing) == true
|
||||
|
||||
var total1, total2, total3: seq[PeerTest]
|
||||
var avail1, avail2, avail3: seq[PeerTest]
|
||||
|
|
|
@ -613,7 +613,7 @@ proc syncManagerOnePeerTest(): Future[bool] {.async.} =
|
|||
result = true
|
||||
|
||||
peer.update(srcChain)
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
|
||||
var sman = newSyncManager[SimplePeer,
|
||||
SimplePeerKey](pool, lastLocalSlot, updateBlocks,
|
||||
|
@ -672,9 +672,9 @@ proc syncManagerOneSlotTest(): Future[bool] {.async.} =
|
|||
|
||||
for i in 0 ..< len(peers):
|
||||
peers[i].update(srcChain)
|
||||
doAssert(pool.addIncomingPeer(peers[0]) == true)
|
||||
doAssert(pool.addOutgoingPeer(peers[1]) == true)
|
||||
doAssert(pool.addOutgoingPeer(peers[2]) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peers[0]) == true)
|
||||
doAssert(pool.addOutgoingPeerNoWait(peers[1]) == true)
|
||||
doAssert(pool.addOutgoingPeerNoWait(peers[2]) == true)
|
||||
|
||||
var sman = newSyncManager[SimplePeer,
|
||||
SimplePeerKey](pool, lastLocalSlot, updateBlocks,
|
||||
|
@ -734,9 +734,9 @@ proc syncManagerOneGroupTest(): Future[bool] {.async.} =
|
|||
for i in 0 ..< len(peers):
|
||||
peers[i].update(srcChain)
|
||||
if i mod 2 == 0:
|
||||
doAssert(pool.addIncomingPeer(peers[i]) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peers[i]) == true)
|
||||
else:
|
||||
doAssert(pool.addOutgoingPeer(peers[i]) == true)
|
||||
doAssert(pool.addOutgoingPeerNoWait(peers[i]) == true)
|
||||
|
||||
var sman = newSyncManager[SimplePeer,
|
||||
SimplePeerKey](pool, lastLocalSlot, updateBlocks,
|
||||
|
@ -805,9 +805,9 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} =
|
|||
|
||||
for i in 0 ..< len(peers):
|
||||
if i mod 2 == 0:
|
||||
doAssert(pool.addIncomingPeer(peers[i]) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peers[i]) == true)
|
||||
else:
|
||||
doAssert(pool.addOutgoingPeer(peers[i]) == true)
|
||||
doAssert(pool.addOutgoingPeerNoWait(peers[i]) == true)
|
||||
|
||||
var sman = newSyncManager[SimplePeer,
|
||||
SimplePeerKey](pool, lastLocalSlot, updateBlocks,
|
||||
|
@ -870,7 +870,7 @@ proc syncManagerFailureTest(): Future[bool] {.async.} =
|
|||
dstChain.add(item)
|
||||
result = true
|
||||
|
||||
doAssert(pool.addIncomingPeer(peer) == true)
|
||||
doAssert(pool.addIncomingPeerNoWait(peer) == true)
|
||||
|
||||
var sman = newSyncManager[SimplePeer,
|
||||
SimplePeerKey](pool, lastLocalSlot, updateBlocks,
|
||||
|
|
Loading…
Reference in New Issue