Rework torrent tracker scraping
New scrapers are started whenever trackers are added to a torrent. In the future they will also be stopped as soon as the trackers are removed. All trackers are now scraped concurrently; the old approach of sticking to one tracker that works is abandoned for now.
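For a sense of the new shape: every tracker URL on a torrent now gets its own trackerScraper goroutine (see startMissingTrackerScrapers and trackerScraper.Run in the diff below), instead of one announce loop that settles on the first tracker that responds. A rough caller-side sketch, assuming the existing TorrentSpec/AddTorrentSpec API that the tests below use; this snippet is illustrative and not part of the commit:

	spec := torrent.TorrentSpec{
		InfoHash: infoHash,
		Trackers: [][]string{{"http://a/announce"}, {"udp://b:6969/announce"}},
	}
	t, _, _ := cl.AddTorrentSpec(&spec)
	// Both tracker URLs are announced concurrently from here on; trackers added
	// later for the same torrent start additional scrapers for the new URLs.
	_ = t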
parent 7f6f921aa9
commit 20870ec4ff
client.go (50 changes)
@@ -1372,6 +1372,7 @@ func (cl *Client) wantConns(t *Torrent) bool {
 }
 
 func (cl *Client) openNewConns(t *Torrent) {
+	defer t.updateWantPeersEvent()
 	for len(t.peers) != 0 {
 		if !cl.wantConns(t) {
 			return
@@ -1389,7 +1390,6 @@ func (cl *Client) openNewConns(t *Torrent) {
 		delete(t.peers, k)
 		cl.initiateConn(p, t)
 	}
-	t.wantPeers.Broadcast()
 }
 
 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
@@ -1426,7 +1426,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
 
 		storageOpener: cl.defaultStorage,
 	}
-	t.wantPeers.L = &cl.mu
 	return
 }
 
@@ -1447,26 +1446,6 @@ func shuffleTier(tier trackerTier) {
 	}
 }
 
-func copyTrackers(base []trackerTier) (copy []trackerTier) {
-	for _, tier := range base {
-		copy = append(copy, append(trackerTier(nil), tier...))
-	}
-	return
-}
-
-func mergeTier(tier trackerTier, newURLs []string) trackerTier {
-nextURL:
-	for _, url := range newURLs {
-		for _, trURL := range tier {
-			if trURL == url {
-				continue nextURL
-			}
-		}
-		tier = append(tier, url)
-	}
-	return tier
-}
-
 // A file-like handle to some torrent data resource.
 type Handle interface {
 	io.Reader
@@ -1527,9 +1506,6 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
 	}
 	new = true
 	t = cl.newTorrent(infoHash)
-	if !cl.config.DisableTrackers {
-		go t.announceTrackers()
-	}
 	if cl.dHT != nil {
 		go cl.announceTorrentDHT(t, true)
 	}
@@ -1577,7 +1553,12 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
 }
 
 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
-	for t.waitWantPeers() {
+	for {
+		select {
+		case <-t.wantPeersEvent.LockedChan(&cl.mu):
+		case <-t.closed.LockedChan(&cl.mu):
+			return
+		}
 		// log.Printf("getting peers for %q from DHT", t)
 		ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
 		if err != nil {
@@ -1627,22 +1608,27 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
 	}
 }
 
-func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
-	url_, err := url.Parse(trRawURL)
+func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
+	_url, err := url.Parse(announceURL)
 	if err != nil {
 		return
 	}
-	host, _, err := net.SplitHostPort(url_.Host)
-	if err != nil {
-		host = url_.Host
+	hmp := missinggo.SplitHostMaybePort(_url.Host)
+	if hmp.Err != nil {
+		err = hmp.Err
+		return
 	}
-	addr, err := net.ResolveIPAddr("ip", host)
+	addr, err := net.ResolveIPAddr("ip", hmp.Host)
 	if err != nil {
 		return
 	}
 	cl.mu.RLock()
 	_, blocked = cl.ipBlockRange(addr.IP)
 	cl.mu.RUnlock()
+	host = _url.Host
+	hmp.Host = addr.String()
+	_url.Host = hmp.String()
+	urlToUse = _url.String()
 	return
 }
 
client_test.go

@@ -453,11 +453,10 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) {
 	}
 	spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
 	_, new, _ = cl.AddTorrentSpec(&spec)
-	if new {
-		t.FailNow()
-	}
-	assert.EqualValues(t, T.trackers[0][0], "http://a")
-	assert.EqualValues(t, T.trackers[1][0], "udp://b")
+	assert.False(t, new)
+	assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
+	// Because trackers are disabled in TestingConfig.
+	assert.EqualValues(t, 0, len(T.trackerAnnouncers))
 }
 
 type badStorage struct{}
@@ -762,7 +761,7 @@ func TestAddMetainfoWithNodes(t *testing.T) {
 	assert.EqualValues(t, cl.DHT().NumNodes(), 0)
 	tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
 	require.NoError(t, err)
-	assert.Len(t, tt.trackers, 5)
+	assert.Len(t, tt.metainfo.AnnounceList, 5)
 	assert.EqualValues(t, 6, cl.DHT().NumNodes())
 }
 
@@ -889,3 +888,12 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) {
 		Info: &greetingMetainfo.Info,
 	})
 }
+
+func TestPrepareTrackerAnnounce(t *testing.T) {
+	cl := &Client{}
+	blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
+	require.NoError(t, err)
+	assert.False(t, blocked)
+	assert.EqualValues(t, "localhost:1234", host)
+	assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
+}
t.go (2 changes)

@@ -120,7 +120,7 @@ func (t *Torrent) Length() int64 {
 func (t *Torrent) Metainfo() *metainfo.MetaInfo {
 	t.cl.mu.Lock()
 	defer t.cl.mu.Unlock()
-	return t.metainfo()
+	return t.newMetaInfo()
 }
 
 func (t *Torrent) addReader(r *Reader) {
torrent.go (211 changes)
@@ -56,6 +56,8 @@ type Torrent struct {
 	// Storage for torrent data.
 	storage storage.Torrent
 
+	metainfo metainfo.MetaInfo
+
 	// The info dict. nil if we don't have it (yet).
 	info *metainfo.InfoEx
 	// Active peer connections, running message stream loops.
@@ -67,12 +69,11 @@ type Torrent struct {
 	// Reserve of peers to connect to. A peer can be both here and in the
 	// active connections if were told about the peer after connecting with
 	// them. That encourages us to reconnect to peers that are well known.
-	peers     map[peersKey]Peer
-	wantPeers sync.Cond
+	peers          map[peersKey]Peer
+	wantPeersEvent missinggo.Event
 
-	// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
-	// mirror their respective URLs from the announce-list metainfo key.
-	trackers []trackerTier
+	// An announcer for each tracker URL.
+	trackerAnnouncers map[string]*trackerScraper
 	// Name used if the info name isn't available.
 	displayName string
 	// The bencoded bytes of the info dict.
@@ -425,10 +426,8 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
 	})
 	fmt.Fprintln(w)
 	fmt.Fprintf(w, "Trackers: ")
-	for _, tier := range t.trackers {
-		for _, tr := range tier {
-			fmt.Fprintf(w, "%q ", tr)
-		}
+	for _url := range t.trackerAnnouncers {
+		fmt.Fprintf(w, "%q ", _url)
 	}
 	fmt.Fprintf(w, "\n")
 	fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
@@ -451,13 +450,12 @@ func (t *Torrent) haveInfo() bool {
 
 // TODO: Include URIs that weren't converted to tracker clients.
 func (t *Torrent) announceList() (al [][]string) {
-	missinggo.CastSlice(&al, t.trackers)
-	return
+	return t.metainfo.AnnounceList
 }
 
 // Returns a run-time generated MetaInfo that includes the info bytes and
 // announce-list as currently known to the client.
-func (t *Torrent) metainfo() *metainfo.MetaInfo {
+func (t *Torrent) newMetaInfo() *metainfo.MetaInfo {
 	if t.metadataBytes == nil {
 		panic("info bytes not set")
 	}
@@ -1049,17 +1047,35 @@ func (t *Torrent) needData() bool {
 	})
 }
 
-func (t *Torrent) addTrackers(announceList [][]string) {
-	newTrackers := copyTrackers(t.trackers)
-	for tierIndex, tier := range announceList {
-		if tierIndex < len(newTrackers) {
-			newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
-		} else {
-			newTrackers = append(newTrackers, mergeTier(nil, tier))
+func appendMissingStrings(old, new []string) (ret []string) {
+	ret = old
+new:
+	for _, n := range new {
+		for _, o := range old {
+			if o == n {
+				continue new
+			}
 		}
-		shuffleTier(newTrackers[tierIndex])
+		ret = append(ret, n)
 	}
-	t.trackers = newTrackers
+	return
 }
 
+func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
+	ret = existing
+	for minNumTiers > len(ret) {
+		ret = append(ret, nil)
+	}
+	return
+}
+
+func (t *Torrent) addTrackers(announceList [][]string) {
+	fullAnnounceList := &t.metainfo.AnnounceList
+	t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+	for tierIndex, trackerURLs := range announceList {
+		(*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
+	}
+	t.startMissingTrackerScrapers()
+}
+
 // Don't call this before the info is available.
@@ -1103,22 +1119,21 @@ func (t *Torrent) dropConnection(c *connection) {
 	}
 }
 
-// Returns true when peers are required, or false if the torrent is closing.
-func (t *Torrent) waitWantPeers() bool {
-	t.cl.mu.Lock()
-	defer t.cl.mu.Unlock()
-	for {
-		if t.closed.IsSet() {
-			return false
-		}
-		if len(t.peers) > torrentPeersLowWater {
-			goto wait
-		}
-		if t.needData() || t.seeding() {
-			return true
-		}
-	wait:
-		t.wantPeers.Wait()
+func (t *Torrent) wantPeers() bool {
+	if t.closed.IsSet() {
+		return false
+	}
+	if len(t.peers) > torrentPeersLowWater {
+		return false
+	}
+	return t.needData() || t.seeding()
+}
+
+func (t *Torrent) updateWantPeersEvent() {
+	if t.wantPeers() {
+		t.wantPeersEvent.Set()
+	} else {
+		t.wantPeersEvent.Clear()
 	}
 }
 
@@ -1137,106 +1152,36 @@ func (t *Torrent) seeding() bool {
 	return true
 }
 
-// Announce torrent to its trackers.
-func (t *Torrent) announceTrackers() {
-	cl := t.cl
-	req := tracker.AnnounceRequest{
-		Event:    tracker.Started,
-		NumWant:  -1,
-		Port:     uint16(cl.incomingPeerPort()),
-		PeerId:   cl.peerID,
-		InfoHash: t.infoHash,
-	}
-	if !t.waitWantPeers() {
-		return
-	}
-	cl.mu.RLock()
-	req.Left = t.bytesLeftAnnounce()
-	trackers := t.trackers
-	cl.mu.RUnlock()
-	if t.announceTrackersFastStart(&req, trackers) {
-		req.Event = tracker.None
-	}
-newAnnounce:
-	for t.waitWantPeers() {
-		cl.mu.RLock()
-		req.Left = t.bytesLeftAnnounce()
-		trackers = t.trackers
-		cl.mu.RUnlock()
-		numTrackersTried := 0
-		for _, tier := range trackers {
-			for trIndex, tr := range tier {
-				numTrackersTried++
-				interval, err := t.announceSingleTracker(tr, &req)
-				if err != nil {
-					// Try the next tracker.
-					continue
-				}
-				// Float the successful announce to the top of the tier. If
-				// the trackers list has been changed, we'll be modifying an
-				// old copy so it won't matter.
-				cl.mu.Lock()
-				tier[0], tier[trIndex] = tier[trIndex], tier[0]
-				cl.mu.Unlock()
-
-				req.Event = tracker.None
-				// Wait the interval before attempting another announce.
-				time.Sleep(interval)
-				continue newAnnounce
-			}
-		}
-		if numTrackersTried != 0 {
-			log.Printf("%s: all trackers failed", t)
-		}
-		// TODO: Wait until trackers are added if there are none.
-		time.Sleep(10 * time.Second)
-	}
-}
-
-func (t *Torrent) announceTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier) (atLeastOne bool) {
-	oks := make(chan bool)
-	outstanding := 0
-	for _, tier := range trackers {
-		for _, tr := range tier {
-			outstanding++
-			go func(tr string) {
-				_, err := t.announceSingleTracker(tr, req)
-				oks <- err == nil
-			}(tr)
-		}
-	}
-	for outstanding > 0 {
-		ok := <-oks
-		outstanding--
-		if ok {
-			atLeastOne = true
-		}
-	}
-	return
-}
-
-func (t *Torrent) announceSingleTracker(tr string, req *tracker.AnnounceRequest) (interval time.Duration, err error) {
-	blocked, err := t.cl.trackerBlockedUnlocked(tr)
-	if err != nil {
-		err = fmt.Errorf("error determining if tracker blocked: %s", err)
-		return
-	}
-	if blocked {
-		err = errors.New("tracker has blocked IP")
-		return
-	}
-	resp, err := tracker.Announce(tr, req)
-	if err != nil {
-		return
-	}
-	var peers []Peer
-	for _, peer := range resp.Peers {
-		peers = append(peers, Peer{
-			IP:   peer.IP,
-			Port: peer.Port,
-		})
-	}
-	t.AddPeers(peers)
-	interval = time.Second * time.Duration(resp.Interval)
-	return
+// Adds and starts tracker scrapers for tracker URLs that aren't already
+// running.
+func (t *Torrent) startMissingTrackerScrapers() {
+	for _, tier := range t.announceList() {
+		for _, trackerURL := range tier {
+			if _, ok := t.trackerAnnouncers[trackerURL]; ok {
+				continue
+			}
+			newAnnouncer := &trackerScraper{
+				url: trackerURL,
+				t:   t,
+			}
+			if t.trackerAnnouncers == nil {
+				t.trackerAnnouncers = make(map[string]*trackerScraper)
+			}
+			t.trackerAnnouncers[trackerURL] = newAnnouncer
+			go newAnnouncer.Run()
+		}
+	}
+}
+
+// Returns an AnnounceRequest with fields filled out to defaults and current
+// values.
+func (t *Torrent) announceRequest() tracker.AnnounceRequest {
+	return tracker.AnnounceRequest{
+		Event:    tracker.None,
+		NumWant:  -1,
+		Port:     uint16(t.cl.incomingPeerPort()),
+		PeerId:   t.cl.peerID,
+		InfoHash: t.infoHash,
+		Left:     t.bytesLeftAnnounce(),
+	}
 }
tracker_scraper.go (new file, 73 lines)
@@ -0,0 +1,73 @@
+package torrent
+
+import (
+	"log"
+	"time"
+
+	"github.com/anacrolix/missinggo"
+
+	"github.com/anacrolix/torrent/tracker"
+)
+
+// Announces a torrent to a tracker at regular intervals, when peers are
+// required.
+type trackerScraper struct {
+	url string
+	// Causes the trackerScraper to stop running.
+	stop missinggo.Event
+	t    *Torrent
+}
+
+func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
+	ret = make([]Peer, 0, len(ps))
+	for _, p := range ps {
+		ret = append(ret, Peer{
+			IP:     p.IP,
+			Port:   p.Port,
+			Source: peerSourceTracker,
+		})
+	}
+	return
+}
+
+// Return how long to wait before trying again.
+func (me *trackerScraper) announce() time.Duration {
+	blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
+	if blocked {
+		// Wait for DNS to potentially change. Very few people do it faster
+		// than 5 minutes.
+		return 5 * time.Minute
+	}
+	me.t.cl.mu.Lock()
+	req := me.t.announceRequest()
+	me.t.cl.mu.Unlock()
+	res, err := tracker.AnnounceHost(urlToUse, &req, host)
+	if err != nil {
+		log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err)
+		return 5 * time.Minute
+	}
+	me.t.AddPeers(trackerToTorrentPeers(res.Peers))
+	return time.Duration(res.Interval) * time.Second
+}
+
+func (me *trackerScraper) Run() {
+	for {
+		select {
+		case <-me.t.closed.LockedChan(&me.t.cl.mu):
+			return
+		case <-me.stop.LockedChan(&me.t.cl.mu):
+			return
+		case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+		}
+
+		intervalChan := time.After(me.announce())
+
+		select {
+		case <-me.t.closed.LockedChan(&me.t.cl.mu):
+			return
+		case <-me.stop.LockedChan(&me.t.cl.mu):
+			return
+		case <-intervalChan:
+		}
+	}
+}