Refresh requests after a second of no updates

This commit is contained in:
Matt Joiner 2021-10-18 18:40:33 +11:00
parent 700542de89
commit 1201ccc53b
3 changed files with 24 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import (
"expvar" "expvar"
"fmt" "fmt"
"io" "io"
"math"
"net" "net"
"net/http" "net/http"
"sort" "sort"
@ -956,6 +957,16 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
defer t.dropConnection(c) defer t.dropConnection(c)
c.startWriter() c.startWriter()
cl.sendInitialMessages(c, t) cl.sendInitialMessages(c, t)
c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, func() {
if c.needRequestUpdate != "" {
return
}
if c.actualRequestState.Requests.IsEmpty() {
panic("updateRequestsTimer should have been stopped")
}
c.updateRequests("updateRequestsTimer")
})
c.updateRequestsTimer.Stop()
err := c.mainReadLoop() err := c.mainReadLoop()
if err != nil { if err != nil {
return fmt.Errorf("main read loop: %w", err) return fmt.Errorf("main read loop: %w", err)

View File

@ -84,6 +84,7 @@ type Peer struct {
// Stuff controlled by the local peer. // Stuff controlled by the local peer.
needRequestUpdate string needRequestUpdate string
actualRequestState requestState actualRequestState requestState
updateRequestsTimer *time.Timer
cancelledRequests roaring.Bitmap cancelledRequests roaring.Bitmap
lastBecameInterested time.Time lastBecameInterested time.Time
priorInterest time.Duration priorInterest time.Duration
@ -414,6 +415,9 @@ func (cn *PeerConn) onClose() {
if cn.pex.IsEnabled() { if cn.pex.IsEnabled() {
cn.pex.Close() cn.pex.Close()
} }
if cn.updateRequestsTimer != nil {
cn.updateRequestsTimer.Stop()
}
cn.tickleWriter() cn.tickleWriter()
if cn.conn != nil { if cn.conn != nil {
cn.conn.Close() cn.conn.Close()

View File

@ -253,7 +253,7 @@ func (p *Peer) applyNextRequestState() bool {
} }
func (p *Peer) applyRequestState(next requestState) bool { func (p *Peer) applyRequestState(next requestState) bool {
current := p.actualRequestState current := &p.actualRequestState
if !p.setInterested(next.Interested) { if !p.setInterested(next.Interested) {
return false return false
} }
@ -268,8 +268,9 @@ func (p *Peer) applyRequestState(next requestState) bool {
} }
next.Requests.Iterate(func(req uint32) bool { next.Requests.Iterate(func(req uint32) bool {
if p.cancelledRequests.Contains(req) { if p.cancelledRequests.Contains(req) {
log.Printf("waiting for cancelled request %v", req) // Waiting for a reject or piece message, which will suitably trigger us to update our
return false // requests, so we can skip this one with no additional consideration.
return true
} }
if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]", log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]",
@ -288,6 +289,11 @@ func (p *Peer) applyRequestState(next requestState) bool {
}) })
if more { if more {
p.needRequestUpdate = "" p.needRequestUpdate = ""
if current.Requests.IsEmpty() {
p.updateRequestsTimer.Stop()
} else {
p.updateRequestsTimer.Reset(time.Second)
}
} }
return more return more
} }