2
0
mirror of synced 2025-02-23 06:08:07 +00:00

Fix closing of webseed peers

This commit is contained in:
Matt Joiner 2021-02-09 19:21:54 +11:00
parent 549ab3c160
commit 0cc655deed
2 changed files with 24 additions and 9 deletions

View File

@ -312,6 +312,9 @@ func (cn *Peer) downloadRate() float64 {
func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
// \t isn't preserved in <pre> blocks? // \t isn't preserved in <pre> blocks?
if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ")
}
fmt.Fprintln(w, cn.connStatusString()) fmt.Fprintln(w, cn.connStatusString())
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived), eventAgeString(cn.lastMessageReceived),
@ -787,11 +790,14 @@ func (cn *Peer) shouldRequestWithoutBias() bool {
return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection()) return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
} }
func (cn *Peer) iterPendingPieces(f func(pieceIndex) bool) bool { func (cn *Peer) iterPendingPieces(f func(pieceIndex) bool) {
if !cn.t.haveInfo() { if !cn.t.haveInfo() {
return false return
} }
return cn.t.requestStrategy.iterPendingPieces(cn, f) if cn.closed.IsSet() {
return
}
cn.t.requestStrategy.iterPendingPieces(cn, f)
} }
func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) { func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) {
cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) }) cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })

View File

@ -1,6 +1,8 @@
package torrent package torrent
import ( import (
"context"
"errors"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
@ -10,7 +12,6 @@ import (
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/webseed" "github.com/anacrolix/torrent/webseed"
"github.com/pkg/errors"
) )
type webseedPeer struct { type webseedPeer struct {
@ -64,9 +65,11 @@ func (ws *webseedPeer) request(r Request) bool {
func (ws *webseedPeer) doRequest(r Request) { func (ws *webseedPeer) doRequest(r Request) {
webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.activeRequests[r] = webseedRequest ws.activeRequests[r] = webseedRequest
ws.requesterCond.L.Unlock() func() {
ws.requestResultHandler(r, webseedRequest) ws.requesterCond.L.Unlock()
ws.requesterCond.L.Lock() defer ws.requesterCond.L.Lock()
ws.requestResultHandler(r, webseedRequest)
}()
delete(ws.activeRequests, r) delete(ws.activeRequests, r)
} }
@ -99,6 +102,10 @@ func (ws *webseedPeer) updateRequests() {
} }
func (ws *webseedPeer) onClose() { func (ws *webseedPeer) onClose() {
ws.peer.logger.Print("closing")
for _, r := range ws.activeRequests {
r.Cancel()
}
ws.requesterCond.Broadcast() ws.requesterCond.Broadcast()
} }
@ -107,12 +114,14 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
ws.peer.t.cl.lock() ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock() defer ws.peer.t.cl.unlock()
if result.Err != nil { if result.Err != nil {
ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err) if !errors.Is(result.Err, context.Canceled) {
ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
}
// Always close for now. We need to filter out temporary errors, but this is a nightmare in // Always close for now. We need to filter out temporary errors, but this is a nightmare in
// Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection // Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection
// algorithm. // algorithm.
const closeOnAllErrors = false const closeOnAllErrors = false
if closeOnAllErrors || strings.Contains(errors.Cause(result.Err).Error(), "unsupported protocol scheme") { if closeOnAllErrors || strings.Contains(result.Err.Error(), "unsupported protocol scheme") {
ws.peer.close() ws.peer.close()
} else { } else {
ws.peer.remoteRejectedRequest(r) ws.peer.remoteRejectedRequest(r)