Don't open new connections or announce when new connections aren't wanted
This commit is contained in:
parent
9c37205dde
commit
06aff91f8c
178
client.go
178
client.go
|
@ -109,6 +109,7 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
|
|||
for _, cn := range t.Conns {
|
||||
me.replenishConnRequests(t, cn)
|
||||
}
|
||||
me.openNewConns(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -277,7 +278,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
|
|||
noUpload: cfg.NoUpload,
|
||||
disableTrackers: cfg.DisableTrackers,
|
||||
downloadStrategy: cfg.DownloadStrategy,
|
||||
halfOpenLimit: 100,
|
||||
halfOpenLimit: socketsPerTorrent,
|
||||
dataDir: cfg.DataDir,
|
||||
disableUTP: cfg.DisableUTP,
|
||||
|
||||
|
@ -493,7 +494,7 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
|
|||
panic("invariant broken")
|
||||
}
|
||||
delete(t.HalfOpen, addr)
|
||||
me.openNewConns()
|
||||
me.openNewConns(t)
|
||||
}()
|
||||
if res.Conn == nil {
|
||||
return
|
||||
|
@ -1131,7 +1132,7 @@ func (me *Client) dropConnection(torrent *torrent, conn *connection) {
|
|||
torrent.Conns[i0] = torrent.Conns[i1]
|
||||
}
|
||||
torrent.Conns = torrent.Conns[:i1]
|
||||
me.openNewConns()
|
||||
me.openNewConns(torrent)
|
||||
return
|
||||
}
|
||||
panic("connection not found")
|
||||
|
@ -1153,6 +1154,8 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
|
|||
}
|
||||
}
|
||||
t.Conns = append(t.Conns, c)
|
||||
// TODO: This should probably be done by a routine that kills off bad
|
||||
// connections, and extra connections killed here instead.
|
||||
if len(t.Conns) > socketsPerTorrent {
|
||||
wcs := t.worstConnsHeap()
|
||||
heap.Pop(wcs).(*connection).Close()
|
||||
|
@ -1160,31 +1163,33 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (me *Client) openNewConns() {
|
||||
for _, t := range me.torrents {
|
||||
select {
|
||||
case <-t.ceasingNetworking:
|
||||
continue
|
||||
default:
|
||||
}
|
||||
for len(t.Peers) != 0 {
|
||||
if len(t.HalfOpen) >= me.halfOpenLimit {
|
||||
return
|
||||
}
|
||||
if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
|
||||
break
|
||||
}
|
||||
var (
|
||||
k peersKey
|
||||
p Peer
|
||||
)
|
||||
for k, p = range t.Peers {
|
||||
break
|
||||
}
|
||||
delete(t.Peers, k)
|
||||
me.initiateConn(p, t)
|
||||
}
|
||||
func (me *Client) openNewConns(t *torrent) {
|
||||
select {
|
||||
case <-t.ceasingNetworking:
|
||||
return
|
||||
default:
|
||||
}
|
||||
if t.haveInfo() && !me.downloadStrategy.PendingData(t) {
|
||||
return
|
||||
}
|
||||
for len(t.Peers) != 0 {
|
||||
if len(t.Conns) >= socketsPerTorrent {
|
||||
break
|
||||
}
|
||||
if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit {
|
||||
break
|
||||
}
|
||||
var (
|
||||
k peersKey
|
||||
p Peer
|
||||
)
|
||||
for k, p = range t.Peers {
|
||||
break
|
||||
}
|
||||
delete(t.Peers, k)
|
||||
me.initiateConn(p, t)
|
||||
}
|
||||
t.wantPeers.Broadcast()
|
||||
}
|
||||
|
||||
// Adds peers to the swarm for the torrent corresponding to infoHash.
|
||||
|
@ -1195,11 +1200,8 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
|
|||
if t == nil {
|
||||
return errors.New("no such torrent")
|
||||
}
|
||||
// for _, p := range peers {
|
||||
// log.Printf("adding peer for %q: %s", infoHash, p)
|
||||
// }
|
||||
t.AddPeers(peers)
|
||||
me.openNewConns()
|
||||
me.openNewConns(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1257,30 +1259,59 @@ func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *tor
|
|||
|
||||
HalfOpen: make(map[string]struct{}, halfOpenLimit),
|
||||
}
|
||||
t.wantPeers.L = &t.stateMu
|
||||
t.GotMetainfo = t.gotMetainfo
|
||||
t.Trackers = make([][]tracker.Client, len(announceList))
|
||||
for tierIndex := range announceList {
|
||||
tier := t.Trackers[tierIndex]
|
||||
for _, url := range announceList[tierIndex] {
|
||||
tr, err := tracker.New(url)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
continue
|
||||
}
|
||||
tier = append(tier, tr)
|
||||
}
|
||||
// The trackers within each tier must be shuffled before use.
|
||||
// http://stackoverflow.com/a/12267471/149482
|
||||
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
|
||||
for i := range tier {
|
||||
j := mathRand.Intn(i + 1)
|
||||
tier[i], tier[j] = tier[j], tier[i]
|
||||
}
|
||||
t.Trackers[tierIndex] = tier
|
||||
t.addTrackers(announceList)
|
||||
return
|
||||
}
|
||||
|
||||
// The trackers within each tier must be shuffled before use.
|
||||
// http://stackoverflow.com/a/12267471/149482
|
||||
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
|
||||
func shuffleTier(tier []tracker.Client) {
|
||||
for i := range tier {
|
||||
j := mathRand.Intn(i + 1)
|
||||
tier[i], tier[j] = tier[j], tier[i]
|
||||
}
|
||||
}
|
||||
|
||||
func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
|
||||
for _, tier := range base {
|
||||
copy = append(copy, tier)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
|
||||
nextURL:
|
||||
for _, url := range newURLs {
|
||||
for _, tr := range tier {
|
||||
if tr.URL() == url {
|
||||
continue nextURL
|
||||
}
|
||||
}
|
||||
tr, err := tracker.New(url)
|
||||
if err != nil {
|
||||
log.Printf("error creating tracker client for %q: %s", url, err)
|
||||
continue
|
||||
}
|
||||
tier = append(tier, tr)
|
||||
}
|
||||
return tier
|
||||
}
|
||||
|
||||
func (t *torrent) addTrackers(announceList [][]string) {
|
||||
newTrackers := copyTrackers(t.Trackers)
|
||||
for tierIndex, tier := range announceList {
|
||||
if tierIndex < len(newTrackers) {
|
||||
newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
|
||||
} else {
|
||||
newTrackers = append(newTrackers, mergeTier(nil, tier))
|
||||
}
|
||||
}
|
||||
t.Trackers = newTrackers
|
||||
}
|
||||
|
||||
type Torrent struct {
|
||||
cl *Client
|
||||
*torrent
|
||||
|
@ -1409,19 +1440,40 @@ func (me *Client) AddTorrentFromFile(name string) (err error) {
|
|||
return me.AddTorrent(mi)
|
||||
}
|
||||
|
||||
func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
|
||||
// Returns true when peers are required, or false if the torrent is closing.
|
||||
func (cl *Client) waitWantPeers(t *torrent) bool {
|
||||
cl.mu.Lock()
|
||||
defer cl.mu.Unlock()
|
||||
t.stateMu.Lock()
|
||||
defer t.stateMu.Unlock()
|
||||
for {
|
||||
select {
|
||||
case <-t.ceasingNetworking:
|
||||
return false
|
||||
default:
|
||||
}
|
||||
if len(t.Peers) < socketsPerTorrent*5 {
|
||||
return true
|
||||
}
|
||||
cl.mu.Unlock()
|
||||
t.wantPeers.Wait()
|
||||
t.stateMu.Unlock()
|
||||
cl.mu.Lock()
|
||||
t.stateMu.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
|
||||
for cl.waitWantPeers(t) {
|
||||
log.Printf("announcing torrent %q to DHT", t)
|
||||
ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
|
||||
if err != nil {
|
||||
log.Printf("error getting peers from dht: %s", err)
|
||||
return
|
||||
}
|
||||
nextScrape := time.After(1 * time.Minute)
|
||||
getPeers:
|
||||
for {
|
||||
select {
|
||||
case <-nextScrape:
|
||||
break getPeers
|
||||
case v, ok := <-ps.Values:
|
||||
if !ok {
|
||||
break getPeers
|
||||
|
@ -1447,6 +1499,7 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
|
|||
}
|
||||
}
|
||||
ps.Close()
|
||||
log.Printf("finished DHT peer scrape for %s", t)
|
||||
|
||||
// After a GetPeers, we can announce on the best nodes that gave us an
|
||||
// announce token.
|
||||
|
@ -1457,7 +1510,12 @@ func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
|
|||
if port != 0 {
|
||||
// We can't allow the port to be implied as long as the UTP and
|
||||
// DHT ports are different.
|
||||
cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
|
||||
err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
|
||||
if err != nil {
|
||||
log.Printf("error announcing torrent to DHT: %s", err)
|
||||
} else {
|
||||
log.Printf("announced %q to DHT", t)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1471,16 +1529,12 @@ func (cl *Client) announceTorrent(t *torrent) {
|
|||
InfoHash: t.InfoHash,
|
||||
}
|
||||
newAnnounce:
|
||||
for {
|
||||
select {
|
||||
case <-t.ceasingNetworking:
|
||||
return
|
||||
default:
|
||||
}
|
||||
for cl.waitWantPeers(t) {
|
||||
cl.mu.Lock()
|
||||
req.Left = t.BytesLeft()
|
||||
trackers := t.Trackers
|
||||
cl.mu.Unlock()
|
||||
for _, tier := range t.Trackers {
|
||||
for _, tier := range trackers {
|
||||
for trIndex, tr := range tier {
|
||||
if err := tr.Connect(); err != nil {
|
||||
log.Print(err)
|
||||
|
|
11
torrent.go
11
torrent.go
|
@ -46,8 +46,11 @@ type peersKey struct {
|
|||
}
|
||||
|
||||
type torrent struct {
|
||||
stateMu sync.Mutex
|
||||
closing chan struct{}
|
||||
stateMu sync.Mutex
|
||||
closing chan struct{}
|
||||
|
||||
// Closed when no more network activity is desired. This includes
|
||||
// announcing, and communicating with peers.
|
||||
ceasingNetworking chan struct{}
|
||||
|
||||
InfoHash InfoHash
|
||||
|
@ -63,10 +66,12 @@ type torrent struct {
|
|||
Conns []*connection
|
||||
// Set of addrs to which we're attempting to connect.
|
||||
HalfOpen map[string]struct{}
|
||||
|
||||
// Reserve of peers to connect to. A peer can be both here and in the
|
||||
// active connections if were told about the peer after connecting with
|
||||
// them. That encourages us to reconnect to peers that are well known.
|
||||
Peers map[peersKey]Peer
|
||||
Peers map[peersKey]Peer
|
||||
wantPeers sync.Cond
|
||||
|
||||
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
|
||||
// mirror their respective URLs from the announce-list key.
|
||||
|
|
Loading…
Reference in New Issue