RIP active dial tracking
It turns out active dial tracking is not actually necessary if the limiter short-circuits. The point is that if the request channel has been closed, there is no active dial any more, at which point we might as well consider it done. This fixes the goroutine leak.
This commit is contained in:
parent
f50fa5792c
commit
ebb9e45a59
|
@ -50,16 +50,15 @@ type dialWorker struct {
|
|||
pending map[ma.Multiaddr]*addrDial
|
||||
resch chan dialResult
|
||||
|
||||
active int
|
||||
done bool // true when the request channel has been closed
|
||||
connected bool // true when a connection has been successfully established
|
||||
|
||||
nextDial []ma.Multiaddr
|
||||
nextDial []ma.Multiaddr
|
||||
|
||||
// ready when we have more addresses to dial (nextDial is not empty)
|
||||
triggerDial <-chan struct{}
|
||||
|
||||
// for testing
|
||||
wg sync.WaitGroup
|
||||
eval <-chan func()
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest) *dialWorker {
|
||||
|
@ -78,23 +77,15 @@ func (w *dialWorker) loop() {
|
|||
defer w.wg.Done()
|
||||
defer w.s.limiter.clearAllPeerDials(w.peer)
|
||||
|
||||
triggerNow := make(chan struct{})
|
||||
close(triggerNow)
|
||||
// used to signal readiness to dial and completion of the dial
|
||||
ready := make(chan struct{})
|
||||
close(ready)
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case req, ok := <-w.reqch:
|
||||
if !ok {
|
||||
// request channel has been closed, wait for pending dials to complete
|
||||
if w.active > 0 {
|
||||
w.done = true
|
||||
w.reqch = nil
|
||||
w.triggerDial = nil
|
||||
continue loop
|
||||
}
|
||||
|
||||
// no active dials, we are done
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -183,7 +174,7 @@ loop:
|
|||
w.nextDial = w.rankAddrs(w.nextDial)
|
||||
|
||||
// trigger a new dial now to account for the new addrs we added
|
||||
w.triggerDial = triggerNow
|
||||
w.triggerDial = ready
|
||||
}
|
||||
|
||||
case <-w.triggerDial:
|
||||
|
@ -193,8 +184,6 @@ loop:
|
|||
err := w.s.dialNextAddr(ad.ctx, w.peer, addr, w.resch)
|
||||
if err != nil {
|
||||
w.dispatchError(ad, err)
|
||||
} else {
|
||||
w.active++
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,25 +191,10 @@ loop:
|
|||
w.triggerDial = nil
|
||||
|
||||
case res := <-w.resch:
|
||||
w.active--
|
||||
|
||||
if res.Conn != nil {
|
||||
w.connected = true
|
||||
}
|
||||
|
||||
if w.done && w.active == 0 {
|
||||
if res.Conn != nil {
|
||||
// we got an actual connection, but the dial has been cancelled
|
||||
// Should we close it? I think not, we should just add it to the swarm
|
||||
_, err := w.s.addConn(res.Conn, network.DirOutbound)
|
||||
if err != nil {
|
||||
// well duh, now we have to close it
|
||||
res.Conn.Close()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
ad := w.pending[res.Addr]
|
||||
|
||||
if res.Conn != nil {
|
||||
|
@ -230,9 +204,6 @@ loop:
|
|||
// oops no, we failed to add it to the swarm
|
||||
res.Conn.Close()
|
||||
w.dispatchError(ad, err)
|
||||
if w.active == 0 && len(w.nextDial) > 0 {
|
||||
w.triggerDial = triggerNow
|
||||
}
|
||||
continue loop
|
||||
}
|
||||
|
||||
|
@ -262,12 +233,6 @@ loop:
|
|||
}
|
||||
|
||||
w.dispatchError(ad, res.Err)
|
||||
if w.active == 0 && len(w.nextDial) > 0 {
|
||||
w.triggerDial = triggerNow
|
||||
}
|
||||
|
||||
case f := <-w.eval:
|
||||
f()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ func (dl *dialLimiter) executeDial(j *dialJob) {
|
|||
select {
|
||||
case j.resp <- dialResult{Conn: con, Addr: j.addr, Err: err}:
|
||||
case <-j.ctx.Done():
|
||||
if err == nil {
|
||||
if con != nil {
|
||||
con.Close()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue