From 98dc7014736b8b48bed3d5eeb9dbf421fcdfb38f Mon Sep 17 00:00:00 2001 From: cheatfate Date: Fri, 24 Jan 2020 03:35:38 +0200 Subject: [PATCH] Add PeerPool.addPeer async version and tests. --- beacon_chain/peer_pool.nim | 188 +++++++++++++++++++++++++----------- tests/test_peer_pool.nim | 145 ++++++++++++++++++++------- tests/test_sync_manager.nim | 18 ++-- 3 files changed, 254 insertions(+), 97 deletions(-) diff --git a/beacon_chain/peer_pool.nim b/beacon_chain/peer_pool.nim index 316ac682f..0042bff1c 100644 --- a/beacon_chain/peer_pool.nim +++ b/beacon_chain/peer_pool.nim @@ -19,8 +19,10 @@ type cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.} PeerPool*[A, B] = ref object - incNeEvent: AsyncEvent - outNeEvent: AsyncEvent + incNotEmptyEvent: AsyncEvent + outNotEmptyEvent: AsyncEvent + incNotFullEvent: AsyncEvent + outNotFullEvent: AsyncEvent incQueue: HeapQueue[PeerIndex] outQueue: HeapQueue[PeerIndex] registry: Table[B, PeerIndex] @@ -39,27 +41,35 @@ type proc `<`*(a, b: PeerIndex): bool = result = a.cmp(b, a) -proc fireEvent[A, B](pool: PeerPool[A, B], item: PeerItem[A]) {.inline.} = +proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B], + item: PeerItem[A]) {.inline.} = if item.peerType == PeerType.Incoming: - pool.incNeEvent.fire() + pool.incNotEmptyEvent.fire() elif item.peerType == PeerType.Outgoing: - pool.outNeEvent.fire() + pool.outNotEmptyEvent.fire() -proc waitEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]) {.async.} = +proc fireNotFullEvent[A, B](pool: PeerPool[A, B], + item: PeerItem[A]) {.inline.} = + if item.peerType == PeerType.Incoming: + pool.incNotFullEvent.fire() + elif item.peerType == PeerType.Outgoing: + pool.outNotFullEvent.fire() + +proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], + filter: set[PeerType]) {.async.} = if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: - var fut1 = pool.incNeEvent.wait() - var fut2 = pool.outNeEvent.wait() + var fut1 = pool.incNotEmptyEvent.wait() + var fut2 = pool.outNotEmptyEvent.wait() try: discard await one(fut1, fut2) if fut1.finished: if not(fut2.finished): fut2.cancel() - pool.incNeEvent.clear() + pool.incNotEmptyEvent.clear() else: if not(fut1.finished): fut1.cancel() - pool.outNeEvent.clear() + pool.outNotEmptyEvent.clear() except CancelledError: if not(fut1.finished): fut1.cancel() @@ -67,11 +77,20 @@ proc waitEvent[A, B](pool: PeerPool[A, B], fut2.cancel() raise elif PeerType.Incoming in filter: - await pool.incNeEvent.wait() - pool.incNeEvent.clear() + await pool.incNotEmptyEvent.wait() + pool.incNotEmptyEvent.clear() elif PeerType.Outgoing in filter: - await pool.outNeEvent.wait() - pool.outNeEvent.clear() + await pool.outNotEmptyEvent.wait() + pool.outNotEmptyEvent.clear() + +proc waitNotFullEvent[A, B](pool: PeerPool[A, B], + peerType: PeerType) {.async.} = + if peerType == PeerType.Incoming: + await pool.incNotFullEvent.wait() + pool.incNotFullEvent.clear() + elif peerType == PeerType.Outgoing: + await pool.outNotFullEvent.wait() + pool.outNotFullEvent.clear() template getItem[A, B](pool: PeerPool[A, B], filter: set[PeerType]): ptr PeerItem[A] = @@ -126,13 +145,15 @@ proc newPeerPool*[A, B](maxPeers = -1, doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers) res.maxPeersCount = if maxPeers < 0: high(int) - else: maxPeers + else: maxPeers res.maxIncPeersCount = if maxIncomingPeers < 0: high(int) - else: maxIncomingPeers + else: maxIncomingPeers res.maxOutPeersCount = if maxOutgoingPeers < 0: high(int) - else: maxOutgoingPeers - res.incNeEvent = newAsyncEvent() - res.outNeEvent = newAsyncEvent() + else: maxOutgoingPeers + res.incNotEmptyEvent = newAsyncEvent() + res.outNotEmptyEvent = newAsyncEvent() + res.incNotFullEvent = newAsyncEvent() + res.outNotFullEvent = newAsyncEvent() res.incQueue = initHeapQueue[PeerIndex]() res.outQueue = initHeapQueue[PeerIndex]() res.registry = initTable[B, PeerIndex]() @@ -192,6 +213,8 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = elif item[].peerType == PeerType.Outgoing: dec(pool.curOutPeersCount) dec(pool.acqOutPeersCount) + + pool.fireNotFullEvent(item[]) # Cleanup storage with default item, and removing key from hashtable. pool.storage[pindex] = PeerItem[A]() pool.registry.del(key) @@ -212,68 +235,125 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = pool.outQueue.del(i) break dec(pool.curOutPeersCount) + + pool.fireNotFullEvent(item[]) # Cleanup storage with default item, and removing key from hashtable. pool.storage[pindex] = PeerItem[A]() pool.registry.del(key) result = true -proc addPeer*[A, B](pool: PeerPool[A, B], peer: A, peerType: PeerType): bool = +proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, + peerType: PeerType): PeerIndex = + proc onPeerClosed(udata: pointer) {.gcsafe.} = + discard pool.deletePeer(peer) + + var item = PeerItem[A](data: peer, peerType: peerType, + index: len(pool.storage)) + pool.storage.add(item) + var pitem = addr(pool.storage[^1]) + let pindex = PeerIndex(data: item.index, cmp: pool.cmp) + pool.registry[peerKey] = pindex + pitem[].data.getFuture().addCallback(onPeerClosed) + result = pindex + +proc addPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A, peerType: PeerType): bool = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## Procedure returns ``false`` in case ## * if ``peer`` is already closed. + ## * if ``pool`` already has peer ``peer`` inside. ## * if ``pool`` currently has a maximum of peers. ## * if ``pool`` currently has a maximum of `Incoming` or `Outgoing` peers. ## ## Procedure returns ``true`` on success. mixin getKey, getFuture - if peer.getFuture().finished: - return false + result = false + let peerKey = getKey(peer) - if len(pool.registry) >= pool.maxPeersCount: - return false + if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished): + if len(pool.registry) < pool.maxPeersCount: + if peerType == PeerType.Incoming: + if pool.curIncPeersCount < pool.maxIncPeersCount: + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curIncPeersCount) + pool.incQueue.push(pindex) + pool.incNotEmptyEvent.fire() + result = true + elif peerType == PeerType.Outgoing: + if pool.curOutPeersCount < pool.maxOutPeersCount: + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curOutPeersCount) + pool.outQueue.push(pindex) + pool.outNotEmptyEvent.fire() + result = true - var item = PeerItem[A](data: peer, peerType: peerType, - index: len(pool.storage)) - var key = getKey(peer) +proc addPeer*[A, B](pool: PeerPool[A, B], + peer: A, peerType: PeerType): Future[bool] {.async.} = + ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. + ## + ## This procedure will wait for an empty space in PeerPool ``pool``, if + ## PeerPool ``pool`` is full. + ## + ## Procedure returns ``false`` in case: + ## * if ``peer`` is already closed. + ## * if ``pool`` already has peer ``peer`` inside. + ## + ## Procedure returns ``true`` on success. + mixin getKey, getFuture - if not(pool.registry.hasKey(key)): - pool.storage.add(item) - var pitem = addr(pool.storage[^1]) - let pindex = PeerIndex(data: item.index, cmp: pool.cmp) - pool.registry[key] = pindex - - proc onPeerClosed(udata: pointer) {.gcsafe.} = - discard pool.deletePeer(peer) - - pitem[].data.getFuture().addCallback(onPeerClosed) + var res = false + let peerKey = getKey(peer) + if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished): + if len(pool.registry) >= pool.maxPeersCount: + await pool.waitNotFullEvent(peerType) if peerType == PeerType.Incoming: if pool.curIncPeersCount >= pool.maxIncPeersCount: - return false - else: - inc(pool.curIncPeersCount) - pool.incQueue.push(pindex) - pool.incNeEvent.fire() + await pool.waitNotFullEvent(peerType) + + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curIncPeersCount) + pool.incQueue.push(pindex) + pool.incNotEmptyEvent.fire() + res = true elif peerType == PeerType.Outgoing: if pool.curOutPeersCount >= pool.maxOutPeersCount: - return false - else: - inc(pool.curOutPeersCount) - pool.outQueue.push(pindex) - pool.outNeEvent.fire() + await pool.waitNotFullEvent(peerType) - result = true + let pindex = pool.addPeerImpl(peer, peerKey, peerType) + inc(pool.curOutPeersCount) + pool.outQueue.push(pindex) + pool.outNotEmptyEvent.fire() + res = true -proc addIncomingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = + result = res + +proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A): bool {.inline.} = + ## Add incoming peer ``peer`` to PeerPool ``pool``. + ## + ## Returns ``true`` on success. + result = pool.addPeerNoWait(peer, PeerType.Incoming) + +proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B], + peer: A): bool {.inline.} = + ## Add outgoing peer ``peer`` to PeerPool ``pool``. + ## + ## Returns ``true`` on success. + result = pool.addPeerNoWait(peer, PeerType.Outgoing) + +proc addIncomingPeer*[A, B](pool: PeerPool[A, B], + peer: A): Future[bool] {.inline.} = ## Add incoming peer ``peer`` to PeerPool ``pool``. ## ## Returns ``true`` on success. result = pool.addPeer(peer, PeerType.Incoming) -proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = +proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], + peer: A): Future[bool] {.inline.} = ## Add outgoing peer ``peer`` to PeerPool ``pool``. ## ## Returns ``true`` on success. @@ -291,7 +371,7 @@ proc acquire*[A, B](pool: PeerPool[A, B], if PeerType.Outgoing in filter: count = count + len(pool.outQueue) if count == 0: - await pool.waitEvent(filter) + await pool.waitNotEmptyEvent(filter) else: var item = pool.getItem(filter) doAssert(PeerFlags.Acquired notin item[].flags) @@ -341,7 +421,7 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = elif item[].peerType == PeerType.Outgoing: pool.outQueue.push(titem) dec(pool.acqOutPeersCount) - pool.fireEvent(item[]) + pool.fireNotEmptyEvent(item[]) proc release*[A, B](pool: PeerPool[A, B], peers: openarray[A]) {.inline.} = ## Release array of peers ``peers`` back to PeerPool ``pool``. @@ -367,7 +447,7 @@ proc acquire*[A, B](pool: PeerPool[A, B], if PeerType.Outgoing in filter: count = count + len(pool.outQueue) if count == 0: - await pool.waitEvent(filter) + await pool.waitNotEmptyEvent(filter) else: var item = pool.getItem(filter) doAssert(PeerFlags.Acquired notin item[].flags) diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index 56fcd6ed4..b689ad270 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -37,7 +37,7 @@ proc close*(peer: PeerTest) = peer.future.complete() suite "PeerPool testing suite": - timedTest "addPeer() test": + timedTest "addPeerNoWait() test": const peersCount = [ [10, 5, 5, 10, 5, 5], [-1, 5, 5, 10, 5, 5], @@ -47,23 +47,100 @@ suite "PeerPool testing suite": var pool = newPeerPool[PeerTest, PeerTestID](item[0], item[1], item[2]) for i in 0 ..< item[4]: var peer = PeerTest.init("idInc" & $i) - check pool.addIncomingPeer(peer) == true + check pool.addIncomingPeerNoWait(peer) == true for i in 0 ..< item[5]: var peer = PeerTest.init("idOut" & $i) - check pool.addOutgoingPeer(peer) == true + check pool.addOutgoingPeerNoWait(peer) == true var peer = PeerTest.init("idCheck") if item[1] != -1: for i in 0 ..< item[3]: - check pool.addIncomingPeer(peer) == false + check pool.addIncomingPeerNoWait(peer) == false if item[2] != -1: for i in 0 ..< item[3]: - check pool.addOutgoingPeer(peer) == false + check pool.addOutgoingPeerNoWait(peer) == false check: pool.lenAvailable == item[3] pool.lenAvailable({PeerType.Incoming}) == item[4] pool.lenAvailable({PeerType.Outgoing}) == item[5] + + timedTest "addPeer() test": + proc testAddPeer1(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 1, + maxIncomingPeers = 1, + maxOutgoingPeers = 0) + var peer0 = PeerTest.init("idInc0") + var peer1 = PeerTest.init("idOut0") + var peer2 = PeerTest.init("idInc1") + var fut0 = pool.addIncomingPeer(peer0) + var fut1 = pool.addOutgoingPeer(peer1) + var fut2 = pool.addIncomingPeer(peer2) + doAssert(fut0.finished == true and fut0.failed == false) + doAssert(fut1.finished == false) + doAssert(fut2.finished == false) + peer0.close() + await sleepAsync(100.milliseconds) + doAssert(fut1.finished == false) + doAssert(fut2.finished == true and fut2.failed == false) + result = true + + proc testAddPeer2(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 2, + maxIncomingPeers = 1, + maxOutgoingPeers = 1) + var peer0 = PeerTest.init("idInc0") + var peer1 = PeerTest.init("idOut0") + var peer2 = PeerTest.init("idInc1") + var peer3 = PeerTest.init("idOut1") + var fut0 = pool.addIncomingPeer(peer0) + var fut1 = pool.addOutgoingPeer(peer1) + var fut2 = pool.addIncomingPeer(peer2) + var fut3 = pool.addOutgoingPeer(peer3) + doAssert(fut0.finished == true and fut0.failed == false) + doAssert(fut1.finished == true and fut1.failed == false) + doAssert(fut2.finished == false) + doAssert(fut3.finished == false) + peer0.close() + await sleepAsync(100.milliseconds) + doAssert(fut2.finished == true and fut2.failed == false) + doAssert(fut3.finished == false) + peer1.close() + await sleepAsync(100.milliseconds) + doAssert(fut3.finished == true and fut3.failed == false) + result = true + + proc testAddPeer3(): Future[bool] {.async.} = + var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 3, + maxIncomingPeers = 1, + maxOutgoingPeers = 1) + var peer0 = PeerTest.init("idInc0") + var peer1 = PeerTest.init("idInc1") + var peer2 = PeerTest.init("idOut0") + var peer3 = PeerTest.init("idOut1") + + var fut0 = pool.addIncomingPeer(peer0) + var fut1 = pool.addIncomingPeer(peer1) + var fut2 = pool.addOutgoingPeer(peer2) + var fut3 = pool.addOutgoingPeer(peer3) + doAssert(fut0.finished == true and fut0.failed == false) + doAssert(fut1.finished == false) + doAssert(fut2.finished == true and fut2.failed == false) + doAssert(fut3.finished == false) + peer0.close() + await sleepAsync(100.milliseconds) + doAssert(fut1.finished == true and fut1.failed == false) + doAssert(fut3.finished == false) + peer2.close() + await sleepAsync(100.milliseconds) + doAssert(fut3.finished == true and fut3.failed == false) + result = true + + check: + waitFor(testAddPeer1()) == true + waitFor(testAddPeer2()) == true + waitFor(testAddPeer3()) == true + timedTest "Acquire from empty pool": var pool0 = newPeerPool[PeerTest, PeerTestID]() var pool1 = newPeerPool[PeerTest, PeerTestID]() @@ -92,10 +169,10 @@ suite "PeerPool testing suite": var peer21 = PeerTest.init("peer21") var peer22 = PeerTest.init("peer22") check: - pool1.addPeer(peer11, PeerType.Incoming) == true - pool1.addPeer(peer12, PeerType.Incoming) == true - pool2.addPeer(peer21, PeerType.Outgoing) == true - pool2.addPeer(peer22, PeerType.Outgoing) == true + pool1.addPeerNoWait(peer11, PeerType.Incoming) == true + pool1.addPeerNoWait(peer12, PeerType.Incoming) == true + pool2.addPeerNoWait(peer21, PeerType.Outgoing) == true + pool2.addPeerNoWait(peer22, PeerType.Outgoing) == true var itemFut11 = pool1.acquire({PeerType.Outgoing}) var itemFut12 = pool1.acquire(10, {PeerType.Outgoing}) @@ -179,9 +256,9 @@ suite "PeerPool testing suite": var peer = PeerTest.init("peer" & $i, rand(MaxNumber)) # echo repr peer if rand(100) mod 2 == 0: - check pool.addPeer(peer, PeerType.Incoming) == true + check pool.addPeerNoWait(peer, PeerType.Incoming) == true else: - check pool.addPeer(peer, PeerType.Outgoing) == true + check pool.addPeerNoWait(peer, PeerType.Outgoing) == true check waitFor(testAcquireRelease()) == TestsCount @@ -191,7 +268,7 @@ suite "PeerPool testing suite": var peer = PeerTest.init("deletePeer") ## Delete available peer - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -204,7 +281,7 @@ suite "PeerPool testing suite": ## Delete acquired peer peer = PeerTest.init("closingPeer") - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -223,7 +300,7 @@ suite "PeerPool testing suite": ## Force delete acquired peer peer = PeerTest.init("closingPeer") - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -244,7 +321,7 @@ suite "PeerPool testing suite": var peer = PeerTest.init("closingPeer") ## Close available peer - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -259,7 +336,7 @@ suite "PeerPool testing suite": ## Close acquired peer peer = PeerTest.init("closingPeer") - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) doAssert(pool.len == 1) doAssert(pool.lenAvailable == 1) doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0) @@ -292,9 +369,9 @@ suite "PeerPool testing suite": var peer3 = PeerTest.init("peer3", 8) check: - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true pool.lenAvailable == 3 pool.lenAvailable({PeerType.Outgoing}) == 1 pool.lenAvailable({PeerType.Incoming}) == 2 @@ -311,9 +388,9 @@ suite "PeerPool testing suite": pool.len == 0 check: - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true pool.lenAvailable == 3 pool.lenAvailable({PeerType.Outgoing}) == 1 pool.lenAvailable({PeerType.Incoming}) == 2 @@ -339,9 +416,9 @@ suite "PeerPool testing suite": var peer3 = PeerTest.init("peer3", 8) check: - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true pool.hasPeer("peer4") == false pool.hasPeer("peer1") == true pool.hasPeer("peer2") == true @@ -374,16 +451,16 @@ suite "PeerPool testing suite": var peer9 = PeerTest.init("peer9", 2) check: - pool.addPeer(peer2, PeerType.Incoming) == true - pool.addPeer(peer3, PeerType.Incoming) == true - pool.addPeer(peer1, PeerType.Incoming) == true - pool.addPeer(peer4, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Incoming) == true + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer4, PeerType.Incoming) == true - pool.addPeer(peer5, PeerType.Outgoing) == true - pool.addPeer(peer8, PeerType.Outgoing) == true - pool.addPeer(peer7, PeerType.Outgoing) == true - pool.addPeer(peer6, PeerType.Outgoing) == true - pool.addPeer(peer9, PeerType.Outgoing) == true + pool.addPeerNoWait(peer5, PeerType.Outgoing) == true + pool.addPeerNoWait(peer8, PeerType.Outgoing) == true + pool.addPeerNoWait(peer7, PeerType.Outgoing) == true + pool.addPeerNoWait(peer6, PeerType.Outgoing) == true + pool.addPeerNoWait(peer9, PeerType.Outgoing) == true var total1, total2, total3: seq[PeerTest] var avail1, avail2, avail3: seq[PeerTest] diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index a5ab56169..100a9bd50 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -613,7 +613,7 @@ proc syncManagerOnePeerTest(): Future[bool] {.async.} = result = true peer.update(srcChain) - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -672,9 +672,9 @@ proc syncManagerOneSlotTest(): Future[bool] {.async.} = for i in 0 ..< len(peers): peers[i].update(srcChain) - doAssert(pool.addIncomingPeer(peers[0]) == true) - doAssert(pool.addOutgoingPeer(peers[1]) == true) - doAssert(pool.addOutgoingPeer(peers[2]) == true) + doAssert(pool.addIncomingPeerNoWait(peers[0]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[1]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[2]) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -734,9 +734,9 @@ proc syncManagerOneGroupTest(): Future[bool] {.async.} = for i in 0 ..< len(peers): peers[i].update(srcChain) if i mod 2 == 0: - doAssert(pool.addIncomingPeer(peers[i]) == true) + doAssert(pool.addIncomingPeerNoWait(peers[i]) == true) else: - doAssert(pool.addOutgoingPeer(peers[i]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[i]) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -805,9 +805,9 @@ proc syncManagerGroupRecoveryTest(): Future[bool] {.async.} = for i in 0 ..< len(peers): if i mod 2 == 0: - doAssert(pool.addIncomingPeer(peers[i]) == true) + doAssert(pool.addIncomingPeerNoWait(peers[i]) == true) else: - doAssert(pool.addOutgoingPeer(peers[i]) == true) + doAssert(pool.addOutgoingPeerNoWait(peers[i]) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks, @@ -870,7 +870,7 @@ proc syncManagerFailureTest(): Future[bool] {.async.} = dstChain.add(item) result = true - doAssert(pool.addIncomingPeer(peer) == true) + doAssert(pool.addIncomingPeerNoWait(peer) == true) var sman = newSyncManager[SimplePeer, SimplePeerKey](pool, lastLocalSlot, updateBlocks,