diff --git a/connection.go b/connection.go index 54297b56..4195683e 100644 --- a/connection.go +++ b/connection.go @@ -512,7 +512,17 @@ func (cn *connection) request(r request, mw messageWriter) bool { } cn.validReceiveChunks[r] = struct{}{} cn.t.pendingRequests[r]++ - cn.t.lastRequested[r] = time.Now() + cn.t.lastRequested[r] = time.AfterFunc(cn.t.duplicateRequestTimeout, func() { + torrent.Add("duplicate request timeouts", 1) + cn.mu().Lock() + defer cn.mu().Unlock() + delete(cn.t.lastRequested, r) + for cn := range cn.t.conns { + if cn.PeerHasPiece(pieceIndex(r.Index)) { + cn.updateRequests() + } + } + }) cn.updateExpectingChunks() return mw(pp.Message{ Type: pp.Request, @@ -761,13 +771,10 @@ func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool { r := request{pp.Integer(piece), cs} if cn.t.requestStrategy == 3 { - lr := cn.t.lastRequested[r] - if !lr.IsZero() { - if time.Since(lr) < cn.t.duplicateRequestTimeout { - return true - } else { - torrent.Add("requests duplicated due to timeout", 1) - } + if _, ok := cn.t.lastRequested[r]; ok { + // This piece has been requested on another connection, and + // the duplicate request timer is still running. + return true } } return f(r) @@ -1470,7 +1477,10 @@ func (c *connection) deleteRequest(r request) bool { } delete(c.requests, r) c.updateExpectingChunks() - delete(c.t.lastRequested, r) + if t, ok := c.t.lastRequested[r]; ok { + t.Stop() + delete(c.t.lastRequested, r) + } pr := c.t.pendingRequests pr[r]-- n := pr[r] diff --git a/torrent.go b/torrent.go index d7bb2bdb..26da79c5 100644 --- a/torrent.go +++ b/torrent.go @@ -147,7 +147,7 @@ type Torrent struct { pendingRequests map[request]int // The last time we requested a chunk. Deleting the request from any // connection will clear this value. - lastRequested map[request]time.Time + lastRequested map[request]*time.Timer } func (t *Torrent) tickleReaders() { @@ -407,7 +407,7 @@ func (t *Torrent) onSetInfo() { t.gotMetainfo.Set() t.updateWantPeersEvent() t.pendingRequests = make(map[request]int) - t.lastRequested = make(map[request]time.Time) + t.lastRequested = make(map[request]*time.Timer) } // Called when metadata for a torrent becomes available. @@ -1240,6 +1240,9 @@ func (t *Torrent) assertNoPendingRequests() { if len(t.pendingRequests) != 0 { panic(t.pendingRequests) } + if len(t.lastRequested) != 0 { + panic(t.lastRequested) + } } func (t *Torrent) dropConnection(c *connection) {