refactor locking order structure
This commit is contained in:
parent
b6f19a5591
commit
b54202e768
|
@ -66,6 +66,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
|
|||
dl.rllock.Lock()
|
||||
defer dl.rllock.Unlock()
|
||||
|
||||
if addrutil.IsFDCostlyTransport(dj.addr) {
|
||||
dl.fdConsuming--
|
||||
if len(dl.waitingOnFd) > 0 {
|
||||
next := dl.waitingOnFd[0]
|
||||
dl.waitingOnFd = dl.waitingOnFd[1:]
|
||||
if len(dl.waitingOnFd) == 0 {
|
||||
dl.waitingOnFd = nil // clear out memory
|
||||
}
|
||||
dl.fdConsuming++
|
||||
|
||||
go dl.executeDial(next)
|
||||
}
|
||||
}
|
||||
|
||||
// release tokens in reverse order than we take them
|
||||
dl.activePerPeer[dj.peer]--
|
||||
if dl.activePerPeer[dj.peer] == 0 {
|
||||
|
@ -87,17 +101,6 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
|
|||
go dl.executeDial(next)
|
||||
}
|
||||
|
||||
if addrutil.IsFDCostlyTransport(dj.addr) {
|
||||
dl.fdConsuming--
|
||||
if len(dl.waitingOnFd) > 0 {
|
||||
next := dl.waitingOnFd[0]
|
||||
dl.waitingOnFd = dl.waitingOnFd[1:]
|
||||
dl.fdConsuming++
|
||||
|
||||
// now, attempt to take the 'per peer limit' token
|
||||
dl.schedulePerPeerDial(next)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AddDialJob tries to take the needed tokens for starting the given dial job.
|
||||
|
@ -107,6 +110,13 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
|
|||
dl.rllock.Lock()
|
||||
defer dl.rllock.Unlock()
|
||||
|
||||
if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
|
||||
wlist := dl.waitingOnPeerLimit[dj.peer]
|
||||
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
|
||||
return
|
||||
}
|
||||
dl.activePerPeer[dj.peer]++
|
||||
|
||||
if addrutil.IsFDCostlyTransport(dj.addr) {
|
||||
if dl.fdConsuming >= dl.fdLimit {
|
||||
dl.waitingOnFd = append(dl.waitingOnFd, dj)
|
||||
|
@ -117,7 +127,20 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
|
|||
dl.fdConsuming++
|
||||
}
|
||||
|
||||
dl.schedulePerPeerDial(dj)
|
||||
// take second needed token and start dial!
|
||||
go dl.executeDial(dj)
|
||||
}
|
||||
|
||||
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
|
||||
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
|
||||
wlist := dl.waitingOnPeerLimit[j.peer]
|
||||
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
|
||||
return
|
||||
}
|
||||
|
||||
// take second needed token and start dial!
|
||||
dl.activePerPeer[j.peer]++
|
||||
go dl.executeDial(j)
|
||||
}
|
||||
|
||||
// executeDial calls the dialFunc, and reports the result through the response
|
||||
|
@ -135,15 +158,3 @@ func (dl *dialLimiter) executeDial(j *dialJob) {
|
|||
case <-j.ctx.Done():
|
||||
}
|
||||
}
|
||||
|
||||
func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
|
||||
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
|
||||
wlist := dl.waitingOnPeerLimit[j.peer]
|
||||
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
|
||||
return
|
||||
}
|
||||
|
||||
// take second needed token and start dial!
|
||||
dl.activePerPeer[j.peer]++
|
||||
go dl.executeDial(j)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package swarm
|
|||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -262,6 +263,7 @@ func TestTokenRedistribution(t *testing.T) {
|
|||
|
||||
func TestStressLimiter(t *testing.T) {
|
||||
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
|
||||
fmt.Println("dial for peer: ", string(p))
|
||||
if tcpPortOver(a, 1000) {
|
||||
return conn.Conn(nil), nil
|
||||
} else {
|
||||
|
@ -305,6 +307,8 @@ func TestStressLimiter(t *testing.T) {
|
|||
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 1000)
|
||||
fmt.Println("NUM GOROS: ", runtime.NumGoroutine())
|
||||
for i := 0; i < 20; i++ {
|
||||
select {
|
||||
case <-success:
|
||||
|
|
Loading…
Reference in New Issue