Fix PeerPool issue with peers overflow maxPeers setting. (#1285)

This commit is contained in:
Eugene Kabanov 2020-07-07 12:49:08 +03:00 committed by GitHub
parent 0f55ab13e8
commit 293d990d43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 30 deletions

View File

@ -8,6 +8,9 @@ type
PeerFlags = enum
Acquired, DeleteOnRelease
EventType = enum
NotEmptyEvent, NotFullEvent
PeerItem[T] = object
data: T
peerType: PeerType
@ -62,21 +65,35 @@ iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) =
for peerId, peerIdx in pool.registry:
yield (peerId, pool.storage[peerIdx.data].data)
proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]) {.async.} =
template incomingEvent(eventType: EventType): AsyncEvent =
case eventType
of EventType.NotEmptyEvent:
pool.incNotEmptyEvent
of EventType.NotFullEvent:
pool.incNotFullEvent
template outgoingEvent(eventType: EventType): AsyncEvent =
case eventType
of EventType.NotEmptyEvent:
pool.outNotEmptyEvent
of EventType.NotFullEvent:
pool.outNotFullEvent
proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
filter: set[PeerType]) {.async.} =
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
var fut1 = pool.incNotEmptyEvent.wait()
var fut2 = pool.outNotEmptyEvent.wait()
var fut1 = incomingEvent(eventType).wait()
var fut2 = outgoingEvent(eventType).wait()
try:
discard await one(fut1, fut2)
if fut1.finished:
if not(fut2.finished):
fut2.cancel()
pool.incNotEmptyEvent.clear()
incomingEvent(eventType).clear()
else:
if not(fut1.finished):
fut1.cancel()
pool.outNotEmptyEvent.clear()
outgoingEvent(eventType).clear()
except CancelledError:
if not(fut1.finished):
fut1.cancel()
@ -84,20 +101,19 @@ proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
fut2.cancel()
raise
elif PeerType.Incoming in filter:
await pool.incNotEmptyEvent.wait()
pool.incNotEmptyEvent.clear()
await incomingEvent(eventType).wait()
incomingEvent(eventType).clear()
elif PeerType.Outgoing in filter:
await pool.outNotEmptyEvent.wait()
pool.outNotEmptyEvent.clear()
await outgoingEvent(eventType).wait()
outgoingEvent(eventType).clear()
proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
pool.waitForEvent(EventType.NotEmptyEvent, filter)
proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
peerType: PeerType) {.async.} =
if peerType == PeerType.Incoming:
await pool.incNotFullEvent.wait()
pool.incNotFullEvent.clear()
elif peerType == PeerType.Outgoing:
await pool.outNotFullEvent.wait()
pool.outNotFullEvent.clear()
filter: set[PeerType]): Future[void] =
pool.waitForEvent(EventType.NotFullEvent, filter)
template getItem[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): ptr PeerItem[A] =
@ -336,20 +352,19 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
if len(pool.registry) >= pool.maxPeersCount:
await pool.waitNotFullEvent(peerType)
while len(pool.registry) >= pool.maxPeersCount:
await pool.waitNotFullEvent({PeerType.Incoming, PeerType.Outgoing})
if peerType == PeerType.Incoming:
if pool.curIncPeersCount >= pool.maxIncPeersCount:
await pool.waitNotFullEvent(peerType)
while pool.curIncPeersCount >= pool.maxIncPeersCount:
await pool.waitNotFullEvent({peerType})
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curIncPeersCount)
pool.incQueue.push(pindex)
pool.incNotEmptyEvent.fire()
res = true
elif peerType == PeerType.Outgoing:
if pool.curOutPeersCount >= pool.maxOutPeersCount:
await pool.waitNotFullEvent(peerType)
while pool.curOutPeersCount >= pool.maxOutPeersCount:
await pool.waitNotFullEvent({peerType})
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curOutPeersCount)
pool.outQueue.push(pindex)

View File

@ -80,7 +80,7 @@ suiteReport "PeerPool testing suite":
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
peer0.close()
await sleepAsync(100.milliseconds)
await sleepAsync(10.milliseconds)
doAssert(fut1.finished == false)
doAssert(fut2.finished == true and fut2.failed == false)
result = true
@ -102,11 +102,11 @@ suiteReport "PeerPool testing suite":
doAssert(fut2.finished == false)
doAssert(fut3.finished == false)
peer0.close()
await sleepAsync(100.milliseconds)
await sleepAsync(10.milliseconds)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == false)
peer1.close()
await sleepAsync(100.milliseconds)
await sleepAsync(10.milliseconds)
doAssert(fut3.finished == true and fut3.failed == false)
result = true
@ -128,18 +128,61 @@ suiteReport "PeerPool testing suite":
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == false)
peer0.close()
await sleepAsync(100.milliseconds)
await sleepAsync(10.milliseconds)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut3.finished == false)
peer2.close()
await sleepAsync(100.milliseconds)
await sleepAsync(10.milliseconds)
doAssert(fut3.finished == true and fut3.failed == false)
result = true
proc testAddPeer4(): Future[bool] {.async.} =
var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 3)
var peer0 = PeerTest.init("idInc0")
var peer1 = PeerTest.init("idInc1")
var peer2 = PeerTest.init("idOut0")
var peer3 = PeerTest.init("idOut1")
var peer4 = PeerTest.init("idOut2")
var peer5 = PeerTest.init("idInc2")
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addIncomingPeer(peer1)
var fut2 = pool.addOutgoingPeer(peer2)
var fut3 = pool.addOutgoingPeer(peer3)
var fut4 = pool.addOutgoingPeer(peer4)
var fut5 = pool.addIncomingPeer(peer5)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(fut3.finished == false)
doAssert(fut4.finished == false)
doAssert(fut5.finished == false)
await sleepAsync(10.milliseconds)
doAssert(fut3.finished == false)
doAssert(fut4.finished == false)
doAssert(fut5.finished == false)
peer0.close()
await sleepAsync(10.milliseconds)
doAssert(fut3.finished == true and fut3.failed == false)
doAssert(fut4.finished == false)
doAssert(fut5.finished == false)
peer1.close()
await sleepAsync(10.milliseconds)
doAssert(fut4.finished == true and fut4.failed == false)
doAssert(fut5.finished == false)
peer2.close()
await sleepAsync(10.milliseconds)
doAssert(fut5.finished == true and fut5.failed == false)
result = true
check:
waitFor(testAddPeer1()) == true
waitFor(testAddPeer2()) == true
waitFor(testAddPeer3()) == true
waitFor(testAddPeer4()) == true
timedTest "Acquire from empty pool":
var pool0 = newPeerPool[PeerTest, PeerTestID]()
@ -399,7 +442,7 @@ suiteReport "PeerPool testing suite":
proc testConsumer() {.async.} =
var p = await pool.acquire()
await sleepAsync(100.milliseconds)
await sleepAsync(10.milliseconds)
pool.release(p)
proc testClose(): Future[bool] {.async.} =