From 06aff91f8cc878adc73472a7ef9302481ec2d32e Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 21 Nov 2014 00:09:55 -0600 Subject: [PATCH] Don't open new connections or announce when new connections aren't wanted --- client.go | 178 ++++++++++++++++++++++++++++++++++------------------- torrent.go | 11 +++- 2 files changed, 124 insertions(+), 65 deletions(-) diff --git a/client.go b/client.go index afec2b88..75a04fb1 100644 --- a/client.go +++ b/client.go @@ -109,6 +109,7 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error { for _, cn := range t.Conns { me.replenishConnRequests(t, cn) } + me.openNewConns(t) return nil } @@ -277,7 +278,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { noUpload: cfg.NoUpload, disableTrackers: cfg.DisableTrackers, downloadStrategy: cfg.DownloadStrategy, - halfOpenLimit: 100, + halfOpenLimit: socketsPerTorrent, dataDir: cfg.DataDir, disableUTP: cfg.DisableUTP, @@ -493,7 +494,7 @@ func (me *Client) initiateConn(peer Peer, t *torrent) { panic("invariant broken") } delete(t.HalfOpen, addr) - me.openNewConns() + me.openNewConns(t) }() if res.Conn == nil { return @@ -1131,7 +1132,7 @@ func (me *Client) dropConnection(torrent *torrent, conn *connection) { torrent.Conns[i0] = torrent.Conns[i1] } torrent.Conns = torrent.Conns[:i1] - me.openNewConns() + me.openNewConns(torrent) return } panic("connection not found") @@ -1153,6 +1154,8 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { } } t.Conns = append(t.Conns, c) + // TODO: This should probably be done by a routine that kills off bad + // connections, and extra connections killed here instead. if len(t.Conns) > socketsPerTorrent { wcs := t.worstConnsHeap() heap.Pop(wcs).(*connection).Close() @@ -1160,31 +1163,33 @@ func (me *Client) addConnection(t *torrent, c *connection) bool { return true } -func (me *Client) openNewConns() { - for _, t := range me.torrents { - select { - case <-t.ceasingNetworking: - continue - default: - } - for len(t.Peers) != 0 { - if len(t.HalfOpen) >= me.halfOpenLimit { - return - } - if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent { - break - } - var ( - k peersKey - p Peer - ) - for k, p = range t.Peers { - break - } - delete(t.Peers, k) - me.initiateConn(p, t) - } +func (me *Client) openNewConns(t *torrent) { + select { + case <-t.ceasingNetworking: + return + default: } + if t.haveInfo() && !me.downloadStrategy.PendingData(t) { + return + } + for len(t.Peers) != 0 { + if len(t.Conns) >= socketsPerTorrent { + break + } + if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit { + break + } + var ( + k peersKey + p Peer + ) + for k, p = range t.Peers { + break + } + delete(t.Peers, k) + me.initiateConn(p, t) + } + t.wantPeers.Broadcast() } // Adds peers to the swarm for the torrent corresponding to infoHash. @@ -1195,11 +1200,8 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { if t == nil { return errors.New("no such torrent") } - // for _, p := range peers { - // log.Printf("adding peer for %q: %s", infoHash, p) - // } t.AddPeers(peers) - me.openNewConns() + me.openNewConns(t) return nil } @@ -1257,30 +1259,59 @@ func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *tor HalfOpen: make(map[string]struct{}, halfOpenLimit), } + t.wantPeers.L = &t.stateMu t.GotMetainfo = t.gotMetainfo - t.Trackers = make([][]tracker.Client, len(announceList)) - for tierIndex := range announceList { - tier := t.Trackers[tierIndex] - for _, url := range announceList[tierIndex] { - tr, err := tracker.New(url) - if err != nil { - log.Print(err) - continue - } - tier = append(tier, tr) - } - // The trackers within each tier must be shuffled before use. - // http://stackoverflow.com/a/12267471/149482 - // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing - for i := range tier { - j := mathRand.Intn(i + 1) - tier[i], tier[j] = tier[j], tier[i] - } - t.Trackers[tierIndex] = tier + t.addTrackers(announceList) + return +} + +// The trackers within each tier must be shuffled before use. +// http://stackoverflow.com/a/12267471/149482 +// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing +func shuffleTier(tier []tracker.Client) { + for i := range tier { + j := mathRand.Intn(i + 1) + tier[i], tier[j] = tier[j], tier[i] + } +} + +func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) { + for _, tier := range base { + copy = append(copy, tier) } return } +func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client { +nextURL: + for _, url := range newURLs { + for _, tr := range tier { + if tr.URL() == url { + continue nextURL + } + } + tr, err := tracker.New(url) + if err != nil { + log.Printf("error creating tracker client for %q: %s", url, err) + continue + } + tier = append(tier, tr) + } + return tier +} + +func (t *torrent) addTrackers(announceList [][]string) { + newTrackers := copyTrackers(t.Trackers) + for tierIndex, tier := range announceList { + if tierIndex < len(newTrackers) { + newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier) + } else { + newTrackers = append(newTrackers, mergeTier(nil, tier)) + } + } + t.Trackers = newTrackers +} + type Torrent struct { cl *Client *torrent @@ -1409,19 +1440,40 @@ func (me *Client) AddTorrentFromFile(name string) (err error) { return me.AddTorrent(mi) } -func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { +// Returns true when peers are required, or false if the torrent is closing. +func (cl *Client) waitWantPeers(t *torrent) bool { + cl.mu.Lock() + defer cl.mu.Unlock() + t.stateMu.Lock() + defer t.stateMu.Unlock() for { + select { + case <-t.ceasingNetworking: + return false + default: + } + if len(t.Peers) < socketsPerTorrent*5 { + return true + } + cl.mu.Unlock() + t.wantPeers.Wait() + t.stateMu.Unlock() + cl.mu.Lock() + t.stateMu.Lock() + } +} + +func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { + for cl.waitWantPeers(t) { + log.Printf("announcing torrent %q to DHT", t) ps, err := cl.dHT.GetPeers(string(t.InfoHash[:])) if err != nil { log.Printf("error getting peers from dht: %s", err) return } - nextScrape := time.After(1 * time.Minute) getPeers: for { select { - case <-nextScrape: - break getPeers case v, ok := <-ps.Values: if !ok { break getPeers @@ -1447,6 +1499,7 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { } } ps.Close() + log.Printf("finished DHT peer scrape for %s", t) // After a GetPeers, we can announce on the best nodes that gave us an // announce token. @@ -1457,7 +1510,12 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) { if port != 0 { // We can't allow the port to be implied as long as the UTP and // DHT ports are different. - cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString()) + err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString()) + if err != nil { + log.Printf("error announcing torrent to DHT: %s", err) + } else { + log.Printf("announced %q to DHT", t) + } } } } @@ -1471,16 +1529,12 @@ func (cl *Client) announceTorrent(t *torrent) { InfoHash: t.InfoHash, } newAnnounce: - for { - select { - case <-t.ceasingNetworking: - return - default: - } + for cl.waitWantPeers(t) { cl.mu.Lock() req.Left = t.BytesLeft() + trackers := t.Trackers cl.mu.Unlock() - for _, tier := range t.Trackers { + for _, tier := range trackers { for trIndex, tr := range tier { if err := tr.Connect(); err != nil { log.Print(err) diff --git a/torrent.go b/torrent.go index 1fa05be1..be9ca711 100644 --- a/torrent.go +++ b/torrent.go @@ -46,8 +46,11 @@ type peersKey struct { } type torrent struct { - stateMu sync.Mutex - closing chan struct{} + stateMu sync.Mutex + closing chan struct{} + + // Closed when no more network activity is desired. This includes + // announcing, and communicating with peers. ceasingNetworking chan struct{} InfoHash InfoHash @@ -63,10 +66,12 @@ type torrent struct { Conns []*connection // Set of addrs to which we're attempting to connect. HalfOpen map[string]struct{} + // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with // them. That encourages us to reconnect to peers that are well known. - Peers map[peersKey]Peer + Peers map[peersKey]Peer + wantPeers sync.Cond // BEP 12 Multitracker Metadata Extension. The tracker.Client instances // mirror their respective URLs from the announce-list key.