2
0
mirror of synced 2025-02-24 14:48:27 +00:00
This commit is contained in:
lovedboy 2016-05-24 13:19:55 +08:00
commit 0b36de4be9
13 changed files with 323 additions and 306 deletions

227
client.go
View File

@ -33,7 +33,6 @@ import (
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
)
// Currently doesn't really queue, but should in the future.
@ -74,6 +73,7 @@ type Client struct {
// include ourselves if we end up trying to connect to our own address
// through legitimate channels.
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
defaultStorage storage.Client
@ -157,6 +157,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
fmt.Fprintln(w, "Not listening!")
}
fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPs))
if cl.dHT != nil {
dhtStats := cl.dHT.Stats()
fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
@ -397,12 +398,12 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
acceptTCP.Add(1)
}
cl.mu.RLock()
doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
_, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
reject := cl.badPeerIPPort(
missinggo.AddrIP(conn.RemoteAddr()),
missinggo.AddrPort(conn.RemoteAddr()))
cl.mu.RUnlock()
if blocked || doppleganger {
if reject {
acceptReject.Add(1)
// log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
conn.Close()
continue
}
@ -474,13 +475,11 @@ func (cl *Client) initiateConn(peer Peer, t *Torrent) {
if peer.Id == cl.peerID {
return
}
addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
duplicateConnsAvoided.Add(1)
if cl.badPeerIPPort(peer.IP, peer.Port) {
return
}
if r, ok := cl.ipBlockRange(peer.IP); ok {
log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
if t.addrActive(addr) {
return
}
t.halfOpen[addr] = struct{}{}
@ -876,8 +875,6 @@ func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainf
func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
if c.PeerID == cl.peerID {
// Only if we initiated the connection is the remote address a
// listen addr for a doppleganger.
connsToSelf.Add(1)
addr := c.conn.RemoteAddr().String()
cl.dopplegangerAddrs[addr] = struct{}{}
@ -904,6 +901,10 @@ func (cl *Client) runReceivedConn(c *connection) {
cl.mu.Lock()
defer cl.mu.Unlock()
if c.PeerID == cl.peerID {
// Because the remote address is not necessarily the same as its
// client's torrent listen address, we won't record the remote address
// as a doppleganger. Instead, the initiator can record *us* as the
// doppleganger.
return
}
cl.runHandshookConn(c, t)
@ -1375,6 +1376,7 @@ func (cl *Client) wantConns(t *Torrent) bool {
}
func (cl *Client) openNewConns(t *Torrent) {
defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
if !cl.wantConns(t) {
return
@ -1392,22 +1394,27 @@ func (cl *Client) openNewConns(t *Torrent) {
delete(t.peers, k)
cl.initiateConn(p, t)
}
t.wantPeers.Broadcast()
}
func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
if port == 0 {
return true
}
if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
return true
}
if _, ok := cl.ipBlockRange(ip); ok {
return true
}
if _, ok := cl.badPeerIPs[ip.String()]; ok {
return true
}
return false
}
func (cl *Client) addPeers(t *Torrent, peers []Peer) {
for _, p := range peers {
if cl.dopplegangerAddr(net.JoinHostPort(
p.IP.String(),
strconv.FormatInt(int64(p.Port), 10),
)) {
continue
}
if _, ok := cl.ipBlockRange(p.IP); ok {
continue
}
if p.Port == 0 {
// The spec says to scrub these yourselves. Fine.
if cl.badPeerIPPort(p.IP, p.Port) {
continue
}
t.addPeer(p, cl)
@ -1429,7 +1436,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
storageOpener: cl.defaultStorage,
}
t.wantPeers.L = &cl.mu
return
}
@ -1450,26 +1456,6 @@ func shuffleTier(tier trackerTier) {
}
}
func copyTrackers(base []trackerTier) (copy []trackerTier) {
for _, tier := range base {
copy = append(copy, append(trackerTier(nil), tier...))
}
return
}
func mergeTier(tier trackerTier, newURLs []string) trackerTier {
nextURL:
for _, url := range newURLs {
for _, trURL := range tier {
if trURL == url {
continue nextURL
}
}
tier = append(tier, url)
}
return tier
}
// A file-like handle to some torrent data resource.
type Handle interface {
io.Reader
@ -1513,10 +1499,8 @@ func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
DisplayName: mi.Info.Name,
InfoHash: mi.Info.Hash(),
}
if len(spec.Trackers) == 0 {
spec.Trackers = [][]string{[]string{mi.Announce}}
} else {
spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
if spec.Trackers == nil && mi.Announce != "" {
spec.Trackers = [][]string{{mi.Announce}}
}
return
}
@ -1530,13 +1514,11 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
}
new = true
t = cl.newTorrent(infoHash)
if !cl.config.DisableTrackers {
go cl.announceTorrentTrackers(t)
}
if cl.dHT != nil {
go cl.announceTorrentDHT(t, true)
}
cl.torrents[infoHash] = t
t.updateWantPeersEvent()
return
}
@ -1580,7 +1562,12 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
}
func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
for t.waitWantPeers() {
for {
select {
case <-t.wantPeersEvent.LockedChan(&cl.mu):
case <-t.closed.LockedChan(&cl.mu):
return
}
// log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
@ -1630,128 +1617,30 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
}
}
func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
url_, err := url.Parse(trRawURL)
func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
_url, err := url.Parse(announceURL)
if err != nil {
return
}
host, _, err := net.SplitHostPort(url_.Host)
if err != nil {
host = url_.Host
hmp := missinggo.SplitHostMaybePort(_url.Host)
if hmp.Err != nil {
err = hmp.Err
return
}
addr, err := net.ResolveIPAddr("ip", host)
addr, err := net.ResolveIPAddr("ip", hmp.Host)
if err != nil {
return
}
cl.mu.RLock()
_, blocked = cl.ipBlockRange(addr.IP)
cl.mu.RUnlock()
host = _url.Host
hmp.Host = addr.String()
_url.Host = hmp.String()
urlToUse = _url.String()
return
}
func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) (interval time.Duration, err error) {
blocked, err := cl.trackerBlockedUnlocked(tr)
if err != nil {
err = fmt.Errorf("error determining if tracker blocked: %s", err)
return
}
if blocked {
err = errors.New("tracker has blocked IP")
return
}
resp, err := tracker.Announce(tr, req)
if err != nil {
return
}
var peers []Peer
for _, peer := range resp.Peers {
peers = append(peers, Peer{
IP: peer.IP,
Port: peer.Port,
})
}
t.AddPeers(peers)
interval = time.Second * time.Duration(resp.Interval)
return
}
func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
oks := make(chan bool)
outstanding := 0
for _, tier := range trackers {
for _, tr := range tier {
outstanding++
go func(tr string) {
_, err := cl.announceTorrentSingleTracker(tr, req, t)
oks <- err == nil
}(tr)
}
}
for outstanding > 0 {
ok := <-oks
outstanding--
if ok {
atLeastOne = true
}
}
return
}
// Announce torrent to its trackers.
func (cl *Client) announceTorrentTrackers(t *Torrent) {
req := tracker.AnnounceRequest{
Event: tracker.Started,
NumWant: -1,
Port: uint16(cl.incomingPeerPort()),
PeerId: cl.peerID,
InfoHash: t.infoHash,
}
if !t.waitWantPeers() {
return
}
cl.mu.RLock()
req.Left = t.bytesLeftAnnounce()
trackers := t.trackers
cl.mu.RUnlock()
if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
req.Event = tracker.None
}
newAnnounce:
for t.waitWantPeers() {
cl.mu.RLock()
req.Left = t.bytesLeftAnnounce()
trackers = t.trackers
cl.mu.RUnlock()
numTrackersTried := 0
for _, tier := range trackers {
for trIndex, tr := range tier {
numTrackersTried++
interval, err := cl.announceTorrentSingleTracker(tr, &req, t)
if err != nil {
// Try the next tracker.
continue
}
// Float the successful announce to the top of the tier. If
// the trackers list has been changed, we'll be modifying an
// old copy so it won't matter.
cl.mu.Lock()
tier[0], tier[trIndex] = tier[trIndex], tier[0]
cl.mu.Unlock()
req.Event = tracker.None
// Wait the interval before attempting another announce.
time.Sleep(interval)
continue newAnnounce
}
}
if numTrackersTried != 0 {
log.Printf("%s: all trackers failed", t)
}
// TODO: Wait until trackers are added if there are none.
time.Sleep(10 * time.Second)
}
}
func (cl *Client) allTorrentsCompleted() bool {
for _, t := range cl.torrents {
if !t.haveInfo() {
@ -1877,14 +1766,19 @@ func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
p.EverHashed = true
touchers := cl.reapPieceTouches(t, piece)
if correct {
for _, c := range touchers {
c.goodPiecesDirtied++
}
err := p.Storage().MarkComplete()
if err != nil {
log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
}
t.updatePieceCompletion(piece)
} else if len(touchers) != 0 {
log.Printf("dropping %d conns that touched piece", len(touchers))
log.Printf("dropping and banning %d conns that touched piece", len(touchers))
for _, c := range touchers {
c.badPiecesDirtied++
t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
t.dropConnection(c)
}
}
@ -2014,3 +1908,10 @@ func (cl *Client) AddDHTNodes(nodes []string) {
cl.DHT().AddNode(ni)
}
}
func (cl *Client) banPeerIP(ip net.IP) {
if cl.badPeerIPs == nil {
cl.badPeerIPs = make(map[string]struct{})
}
cl.badPeerIPs[ip.String()] = struct{}{}
}

View File

@ -453,11 +453,10 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) {
}
spec.Trackers = [][]string{{"http://a"}, {"udp://b"}}
_, new, _ = cl.AddTorrentSpec(&spec)
if new {
t.FailNow()
}
assert.EqualValues(t, T.trackers[0][0], "http://a")
assert.EqualValues(t, T.trackers[1][0], "udp://b")
assert.False(t, new)
assert.EqualValues(t, [][]string{{"http://a"}, {"udp://b"}}, T.metainfo.AnnounceList)
// Because trackers are disabled in TestingConfig.
assert.EqualValues(t, 0, len(T.trackerAnnouncers))
}
type badStorage struct{}
@ -762,7 +761,7 @@ func TestAddMetainfoWithNodes(t *testing.T) {
assert.EqualValues(t, cl.DHT().NumNodes(), 0)
tt, err := cl.AddTorrentFromFile("metainfo/testdata/issue_65a.torrent")
require.NoError(t, err)
assert.Len(t, tt.trackers, 5)
assert.Len(t, tt.metainfo.AnnounceList, 5)
assert.EqualValues(t, 6, cl.DHT().NumNodes())
}
@ -889,3 +888,12 @@ func TestPieceCompletedInStorageButNotClient(t *testing.T) {
Info: &greetingMetainfo.Info,
})
}
func TestPrepareTrackerAnnounce(t *testing.T) {
cl := &Client{}
blocked, urlToUse, host, err := cl.prepareTrackerAnnounceUnlocked("http://localhost:1234/announce?herp")
require.NoError(t, err)
assert.False(t, blocked)
assert.EqualValues(t, "localhost:1234", host)
assert.EqualValues(t, "http://127.0.0.1:1234/announce?herp", urlToUse)
}

View File

@ -35,12 +35,14 @@ func main() {
continue
}
d := map[string]interface{}{
"Name": info.Name,
"NumPieces": info.NumPieces(),
"PieceLength": info.PieceLength,
"InfoHash": metainfo.Info.Hash().HexString(),
"NumFiles": len(info.UpvertedFiles()),
"TotalLength": info.TotalLength(),
"Name": info.Name,
"NumPieces": info.NumPieces(),
"PieceLength": info.PieceLength,
"InfoHash": metainfo.Info.Hash().HexString(),
"NumFiles": len(info.UpvertedFiles()),
"TotalLength": info.TotalLength(),
"Announce": metainfo.Announce,
"AnnounceList": metainfo.AnnounceList,
}
if flags.Files {
d["Files"] = info.Files

View File

@ -47,6 +47,8 @@ type connection struct {
UnwantedChunksReceived int
UsefulChunksReceived int
chunksSent int
goodPiecesDirtied int
badPiecesDirtied int
lastMessageReceived time.Time
completedHandshake time.Time

View File

@ -60,10 +60,9 @@ var (
peersAddedBySource = expvar.NewMap("peersAddedBySource")
uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
unexpectedCancels = expvar.NewInt("unexpectedCancels")
postedCancels = expvar.NewInt("postedCancels")
duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
unexpectedCancels = expvar.NewInt("unexpectedCancels")
postedCancels = expvar.NewInt("postedCancels")
pieceHashedCorrect = expvar.NewInt("pieceHashedCorrect")
pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")

View File

@ -34,7 +34,6 @@ func GreetingMetaInfo() (mi *metainfo.MetaInfo) {
mi = new(metainfo.MetaInfo)
mi.Info.Name = GreetingFileName
mi.Info.Length = int64(len(GreetingFileContents))
mi.Announce = "lol://cheezburger"
mi.Info.PieceLength = 5
err := mi.Info.GeneratePieces(func(metainfo.FileInfo) (io.ReadCloser, error) {
return ioutil.NopCloser(strings.NewReader(GreetingFileContents)), nil

8
t.go
View File

@ -120,7 +120,7 @@ func (t *Torrent) Length() int64 {
func (t *Torrent) Metainfo() *metainfo.MetaInfo {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
return t.metainfo()
return t.newMetaInfo()
}
func (t *Torrent) addReader(r *Reader) {
@ -197,3 +197,9 @@ func (t *Torrent) String() string {
}
return s
}
func (t *Torrent) AddTrackers(announceList [][]string) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
t.addTrackers(announceList)
}

View File

@ -25,6 +25,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
)
func (t *Torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
@ -55,6 +56,8 @@ type Torrent struct {
// Storage for torrent data.
storage storage.Torrent
metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet).
info *metainfo.InfoEx
// Active peer connections, running message stream loops.
@ -66,12 +69,11 @@ type Torrent struct {
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
// them. That encourages us to reconnect to peers that are well known.
peers map[peersKey]Peer
wantPeers sync.Cond
peers map[peersKey]Peer
wantPeersEvent missinggo.Event
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list metainfo key.
trackers []trackerTier
// An announcer for each tracker URL.
trackerAnnouncers map[string]*trackerScraper
// Name used if the info name isn't available.
displayName string
// The bencoded bytes of the info dict.
@ -216,6 +218,7 @@ func (t *Torrent) setInfoBytes(b []byte) error {
if err != nil {
return fmt.Errorf("bad info: %s", err)
}
defer t.updateWantPeersEvent()
t.info = ie
t.cl.event.Broadcast()
t.gotMetainfo.Set()
@ -424,10 +427,8 @@ func (t *Torrent) writeStatus(w io.Writer, cl *Client) {
})
fmt.Fprintln(w)
fmt.Fprintf(w, "Trackers: ")
for _, tier := range t.trackers {
for _, tr := range tier {
fmt.Fprintf(w, "%q ", tr)
}
for _url := range t.trackerAnnouncers {
fmt.Fprintf(w, "%q ", _url)
}
fmt.Fprintf(w, "\n")
fmt.Fprintf(w, "Pending peers: %d\n", len(t.peers))
@ -450,23 +451,22 @@ func (t *Torrent) haveInfo() bool {
// TODO: Include URIs that weren't converted to tracker clients.
func (t *Torrent) announceList() (al [][]string) {
missinggo.CastSlice(&al, t.trackers)
return
return t.metainfo.AnnounceList
}
// Returns a run-time generated MetaInfo that includes the info bytes and
// announce-list as currently known to the client.
func (t *Torrent) metainfo() *metainfo.MetaInfo {
if t.metadataBytes == nil {
panic("info bytes not set")
}
return &metainfo.MetaInfo{
Info: *t.info,
func (t *Torrent) newMetaInfo() (mi *metainfo.MetaInfo) {
mi = &metainfo.MetaInfo{
CreationDate: time.Now().Unix(),
Comment: "dynamic metainfo from client",
CreatedBy: "go.torrent",
AnnounceList: t.announceList(),
}
if t.info != nil {
mi.Info = *t.info
}
return
}
func (t *Torrent) bytesLeft() (left int64) {
@ -520,6 +520,7 @@ func (t *Torrent) close() (err error) {
conn.Close()
}
t.pieceStateChanges.Close()
t.updateWantPeersEvent()
return
}
@ -1048,17 +1049,36 @@ func (t *Torrent) needData() bool {
})
}
func (t *Torrent) addTrackers(announceList [][]string) {
newTrackers := copyTrackers(t.trackers)
for tierIndex, tier := range announceList {
if tierIndex < len(newTrackers) {
newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
} else {
newTrackers = append(newTrackers, mergeTier(nil, tier))
func appendMissingStrings(old, new []string) (ret []string) {
ret = old
new:
for _, n := range new {
for _, o := range old {
if o == n {
continue new
}
}
shuffleTier(newTrackers[tierIndex])
ret = append(ret, n)
}
t.trackers = newTrackers
return
}
func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]string) {
ret = existing
for minNumTiers > len(ret) {
ret = append(ret, nil)
}
return
}
func (t *Torrent) addTrackers(announceList [][]string) {
fullAnnounceList := &t.metainfo.AnnounceList
t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
for tierIndex, trackerURLs := range announceList {
(*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
}
t.startMissingTrackerScrapers()
t.updateWantPeersEvent()
}
// Don't call this before the info is available.
@ -1102,22 +1122,21 @@ func (t *Torrent) dropConnection(c *connection) {
}
}
// Returns true when peers are required, or false if the torrent is closing.
func (t *Torrent) waitWantPeers() bool {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
for {
if t.closed.IsSet() {
return false
}
if len(t.peers) > torrentPeersLowWater {
goto wait
}
if t.needData() || t.seeding() {
return true
}
wait:
t.wantPeers.Wait()
func (t *Torrent) wantPeers() bool {
if t.closed.IsSet() {
return false
}
if len(t.peers) > torrentPeersLowWater {
return false
}
return t.needData() || t.seeding()
}
func (t *Torrent) updateWantPeersEvent() {
if t.wantPeers() {
t.wantPeersEvent.Set()
} else {
t.wantPeersEvent.Clear()
}
}
@ -1135,3 +1154,40 @@ func (t *Torrent) seeding() bool {
}
return true
}
// Adds and starts tracker scrapers for tracker URLs that aren't already
// running.
func (t *Torrent) startMissingTrackerScrapers() {
if t.cl.config.DisableTrackers {
return
}
for _, tier := range t.announceList() {
for _, trackerURL := range tier {
if _, ok := t.trackerAnnouncers[trackerURL]; ok {
continue
}
newAnnouncer := &trackerScraper{
url: trackerURL,
t: t,
}
if t.trackerAnnouncers == nil {
t.trackerAnnouncers = make(map[string]*trackerScraper)
}
t.trackerAnnouncers[trackerURL] = newAnnouncer
go newAnnouncer.Run()
}
}
}
// Returns an AnnounceRequest with fields filled out to defaults and current
// values.
func (t *Torrent) announceRequest() tracker.AnnounceRequest {
return tracker.AnnounceRequest{
Event: tracker.None,
NumWant: -1,
Port: uint16(t.cl.incomingPeerPort()),
PeerId: t.cl.peerID,
InfoHash: t.infoHash,
Left: t.bytesLeftAnnounce(),
}
}

View File

@ -10,26 +10,12 @@ import (
"net/url"
"strconv"
"github.com/anacrolix/missinggo/httptoo"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/util"
)
func init() {
registerClientScheme("http", newHTTPClient)
}
type httpClient struct {
url url.URL
}
func (httpClient) Close() error { return nil }
func newHTTPClient(url *url.URL) client {
return &httpClient{
url: *url,
}
}
type httpResponse struct {
FailureReason string `bencode:"failure reason"`
Interval int32 `bencode:"interval"`
@ -56,9 +42,8 @@ func (r *httpResponse) UnmarshalPeers() (ret []Peer, err error) {
return
}
func (c *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err error) {
// retain query parameters from announce URL
q := c.url.Query()
func setAnnounceParams(_url *url.URL, ar *AnnounceRequest) {
q := _url.Query()
q.Set("info_hash", string(ar.InfoHash[:]))
q.Set("peer_id", string(ar.PeerId[:]))
@ -73,14 +58,21 @@ func (c *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err er
q.Set("compact", "1")
// According to https://wiki.vuze.com/w/Message_Stream_Encryption.
q.Set("supportcrypto", "1")
var reqURL url.URL = c.url
reqURL.RawQuery = q.Encode()
resp, err := http.Get(reqURL.String())
_url.RawQuery = q.Encode()
}
func announceHTTP(ar *AnnounceRequest, _url *url.URL, host string) (ret AnnounceResponse, err error) {
_url = httptoo.CopyURL(_url)
setAnnounceParams(_url, ar)
req, err := http.NewRequest("GET", _url.String(), nil)
req.Host = host
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
buf := bytes.Buffer{}
var buf bytes.Buffer
io.Copy(&buf, resp.Body)
if resp.StatusCode != 200 {
err = fmt.Errorf("response from tracker: %s: %s", resp.Status, buf.String())

View File

@ -13,11 +13,13 @@ type AnnounceRequest struct {
Downloaded int64
Left uint64
Uploaded int64
Event AnnounceEvent
IPAddress int32
Key int32
NumWant int32 // How many peer addresses are desired. -1 for default.
Port uint16
// Apparently this is optional. None can be used for announces done at
// regular intervals.
Event AnnounceEvent
IPAddress int32
Key int32
NumWant int32 // How many peer addresses are desired. -1 for default.
Port uint16
} // 82 bytes
type AnnounceResponse struct {
@ -46,42 +48,26 @@ const (
Stopped // The local peer is leaving the swarm.
)
type client interface {
Announce(*AnnounceRequest) (AnnounceResponse, error)
Close() error
}
var (
ErrBadScheme = errors.New("unknown scheme")
schemes = make(map[string]func(*url.URL) client)
)
func registerClientScheme(scheme string, newFunc func(*url.URL) client) {
schemes[scheme] = newFunc
func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
return AnnounceHost(urlStr, req, "")
}
// Returns ErrBadScheme if the tracker scheme isn't recognised.
func newClient(rawurl string) (cl client, err error) {
url_s, err := url.Parse(rawurl)
func AnnounceHost(urlStr string, req *AnnounceRequest, host string) (res AnnounceResponse, err error) {
_url, err := url.Parse(urlStr)
if err != nil {
return
}
newFunc, ok := schemes[url_s.Scheme]
if !ok {
switch _url.Scheme {
case "http", "https":
return announceHTTP(req, _url, host)
case "udp":
return announceUDP(req, _url)
default:
err = ErrBadScheme
return
}
cl = newFunc(url_s)
return
}
func Announce(urlStr string, req *AnnounceRequest) (res AnnounceResponse, err error) {
cl, err := newClient(urlStr)
if err != nil {
return
}
defer cl.Close()
return cl.Announce(req)
}

View File

@ -60,16 +60,6 @@ type AnnounceResponseHeader struct {
Seeders int32
}
func init() {
registerClientScheme("udp", newUDPClient)
}
func newUDPClient(url *url.URL) client {
return &udpClient{
url: *url,
}
}
func newTransactionId() int32 {
return int32(rand.Uint32())
}
@ -85,7 +75,7 @@ func timeout(contiguousTimeouts int) (d time.Duration) {
return
}
type udpClient struct {
type udpAnnounce struct {
contiguousTimeouts int
connectionIdReceived time.Time
connectionId int64
@ -93,14 +83,14 @@ type udpClient struct {
url url.URL
}
func (c *udpClient) Close() error {
func (c *udpAnnounce) Close() error {
if c.socket != nil {
return c.socket.Close()
}
return nil
}
func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err error) {
func (c *udpAnnounce) Do(req *AnnounceRequest) (res AnnounceResponse, err error) {
err = c.connect()
if err != nil {
return
@ -140,7 +130,7 @@ func (c *udpClient) Announce(req *AnnounceRequest) (res AnnounceResponse, err er
// body is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
func (c *udpClient) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
func (c *udpAnnounce) write(h *RequestHeader, body interface{}, trailer []byte) (err error) {
var buf bytes.Buffer
err = binary.Write(&buf, binary.BigEndian, h)
if err != nil {
@ -176,7 +166,7 @@ func write(w io.Writer, data interface{}) error {
// args is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
func (c *udpClient) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
tid := newTransactionId()
err = c.write(&RequestHeader{
ConnectionId: c.connectionId,
@ -232,11 +222,11 @@ func readBody(r io.Reader, data ...interface{}) (err error) {
return
}
func (c *udpClient) connected() bool {
func (c *udpAnnounce) connected() bool {
return !c.connectionIdReceived.IsZero() && time.Now().Before(c.connectionIdReceived.Add(time.Minute))
}
func (c *udpClient) connect() (err error) {
func (c *udpAnnounce) connect() (err error) {
if c.connected() {
return nil
}
@ -266,3 +256,11 @@ func (c *udpClient) connect() (err error) {
c.connectionIdReceived = time.Now()
return
}
func announceUDP(ar *AnnounceRequest, _url *url.URL) (AnnounceResponse, error) {
ua := udpAnnounce{
url: *_url,
}
defer ua.Close()
return ua.Do(ar)
}

View File

@ -131,18 +131,9 @@ func TestUDPTracker(t *testing.T) {
rand.Read(req.PeerId[:])
copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
ar, err := Announce("udp://tracker.openbittorrent.com:80/announce", &req)
// Skip temporary errors as we don't control the server.
if ne, ok := err.(net.Error); ok {
if ne.Timeout() {
t.Skip(err)
}
}
// Skip DNS errors because the network might not be available, and we
// don't control the domains we're testing.
if oe, ok := err.(*net.OpError); ok {
if _, ok := oe.Err.(*net.DNSError); ok {
t.Skip(err)
}
// Skip any net errors as we don't control the server.
if _, ok := err.(net.Error); ok {
t.Skip(err)
}
require.NoError(t, err)
t.Log(ar)

77
tracker_scraper.go Normal file
View File

@ -0,0 +1,77 @@
package torrent
import (
"log"
"time"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/tracker"
)
// Announces a torrent to a tracker at regular intervals, when peers are
// required.
type trackerScraper struct {
url string
// Causes the trackerScraper to stop running.
stop missinggo.Event
t *Torrent
}
func trackerToTorrentPeers(ps []tracker.Peer) (ret []Peer) {
ret = make([]Peer, 0, len(ps))
for _, p := range ps {
ret = append(ret, Peer{
IP: p.IP,
Port: p.Port,
Source: peerSourceTracker,
})
}
return
}
// Return how long to wait before trying again. For most errors, we return 5
// minutes, a relatively quick turn around for DNS changes.
func (me *trackerScraper) announce() time.Duration {
blocked, urlToUse, host, err := me.t.cl.prepareTrackerAnnounceUnlocked(me.url)
if err != nil {
log.Printf("error preparing announce to %q: %s", me.url, err)
return 5 * time.Minute
}
if blocked {
log.Printf("announce to tracker %q blocked by IP", me.url)
return 5 * time.Minute
}
me.t.cl.mu.Lock()
req := me.t.announceRequest()
me.t.cl.mu.Unlock()
res, err := tracker.AnnounceHost(urlToUse, &req, host)
if err != nil {
log.Printf("error announcing %s %q to %q: %s", me.t.InfoHash().HexString(), me.t.Name(), me.url, err)
return 5 * time.Minute
}
me.t.AddPeers(trackerToTorrentPeers(res.Peers))
return time.Duration(res.Interval) * time.Second
}
func (me *trackerScraper) Run() {
for {
select {
case <-me.t.closed.LockedChan(&me.t.cl.mu):
return
case <-me.stop.LockedChan(&me.t.cl.mu):
return
case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
}
intervalChan := time.After(me.announce())
select {
case <-me.t.closed.LockedChan(&me.t.cl.mu):
return
case <-me.stop.LockedChan(&me.t.cl.mu):
return
case <-intervalChan:
}
}
}