Prioritize pending nodes with BEP 40

This commit is contained in:
Matt Joiner 2018-04-04 17:59:28 +10:00
parent d950677f67
commit 92f6209c5f
4 changed files with 122 additions and 46 deletions

32
Peer.go Normal file
View File

@ -0,0 +1,32 @@
package torrent
import (
"net"
"github.com/anacrolix/dht/krpc"
)
type Peer struct {
Id [20]byte
IP net.IP
Port int
Source peerSource
// Peer is known to support encryption.
SupportsEncryption bool
pexPeerFlags
}
func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
me.IP = append([]byte(nil), na.IP...)
me.Port = na.Port
me.Source = peerSourcePEX
// If they prefer encryption, they must support it.
if fs.Get(pexPrefersEncryption) {
me.SupportsEncryption = true
}
me.pexPeerFlags = fs
}
func (me Peer) addr() ipPort {
return ipPort{me.IP, uint16(me.Port)}
}

View File

@ -24,6 +24,7 @@ import (
"github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/sync"
"github.com/dustin/go-humanize"
"github.com/google/btree"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
@ -1029,7 +1030,12 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
t = &Torrent{
cl: cl,
infoHash: ih,
peers: make(map[peersKey]Peer),
peers: prioritizedPeers{
om: btree.New(2),
getPrio: func(p Peer) peerPriority {
return bep40Priority(cl.publicAddr(p.IP), p.addr())
},
},
conns: make(map[*connection]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
halfOpen: make(map[string]Peer),
@ -1251,3 +1257,26 @@ func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
Source: peerSourceDHTAnnouncePeer,
}})
}
func firstNotNil(ips ...net.IP) net.IP {
for _, ip := range ips {
if ip != nil {
return ip
}
}
return nil
}
func (cl *Client) publicIp(peer net.IP) net.IP {
// TODO: Use BEP 10 to determine how peers are seeing us.
if peer.To4() != nil {
return firstNotNil(cl.config.PublicIp4, missinggo.AddrIP(cl.ListenAddr()).To4())
} else {
return firstNotNil(cl.config.PublicIp6, missinggo.AddrIP(cl.ListenAddr()).To16())
}
}
// Our IP as a peer should see it.
func (cl *Client) publicAddr(peer net.IP) ipPort {
return ipPort{cl.publicIp(peer), uint16(cl.incomingPeerPort())}
}

43
prioritized_peers.go Normal file
View File

@ -0,0 +1,43 @@
package torrent
import "github.com/google/btree"
// Peers are stored with their priority at insertion. Their priority may
// change if our apparent IP changes, we don't currently handle that.
type prioritizedPeersItem struct {
prio peerPriority
p Peer
}
func (me prioritizedPeersItem) Less(than btree.Item) bool {
return me.prio < than.(prioritizedPeersItem).prio
}
type prioritizedPeers struct {
om *btree.BTree
getPrio func(Peer) peerPriority
}
func (me *prioritizedPeers) Each(f func(Peer)) {
me.om.Ascend(func(i btree.Item) bool {
f(i.(prioritizedPeersItem).p)
return true
})
}
func (me *prioritizedPeers) Len() int {
return me.om.Len()
}
// Returns true if a peer is replaced.
func (me *prioritizedPeers) Add(p Peer) bool {
return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil
}
func (me *prioritizedPeers) DeleteMin() {
me.om.DeleteMin()
}
func (me *prioritizedPeers) PopMax() Peer {
return me.om.DeleteMax().(prioritizedPeersItem).p
}

View File

@ -17,7 +17,6 @@ import (
"time"
"github.com/anacrolix/dht"
"github.com/anacrolix/dht/krpc"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap"
@ -96,7 +95,7 @@ type Torrent struct {
// active connections if were told about the peer after connecting with
// them. That encourages us to reconnect to peers that are well known in
// the swarm.
peers map[peersKey]Peer
peers prioritizedPeers
wantPeersEvent missinggo.Event
// An announcer for each tracker URL.
trackerAnnouncers map[string]*trackerScraper
@ -155,9 +154,9 @@ func (t *Torrent) Closed() <-chan struct{} {
// pending, and half-open peers.
func (t *Torrent) KnownSwarm() (ks []Peer) {
// Add pending peers to the list
for _, peer := range t.peers {
t.peers.Each(func(peer Peer) {
ks = append(ks, peer)
}
})
// Add half-open peers to the list
for _, peer := range t.halfOpen {
@ -254,13 +253,14 @@ func (t *Torrent) addPeer(p Peer) {
torrent.Add("peers not added because of bad addr", 1)
return
}
t.openNewConns()
if len(t.peers) >= cl.config.TorrentPeersHighWater {
return
if t.peers.Add(p) {
torrent.Add("peers replaced", 1)
}
t.peers[peersKey{string(p.IP), p.Port}] = p
t.openNewConns()
for t.peers.Len() > cl.config.TorrentPeersHighWater {
t.peers.DeleteMin()
torrent.Add("peers discarded", 1)
}
}
func (t *Torrent) invalidateMetadata() {
@ -735,27 +735,6 @@ func (t *Torrent) pendAllChunkSpecs(pieceIndex int) {
t.pieces[pieceIndex].dirtyChunks.Clear()
}
type Peer struct {
Id [20]byte
IP net.IP
Port int
Source peerSource
// Peer is known to support encryption.
SupportsEncryption bool
pexPeerFlags
}
func (me *Peer) FromPex(na krpc.NodeAddr, fs pexPeerFlags) {
me.IP = append([]byte(nil), na.IP...)
me.Port = na.Port
me.Source = peerSourcePEX
// If they prefer encryption, they must support it.
if fs.Get(pexPrefersEncryption) {
me.SupportsEncryption = true
}
me.pexPeerFlags = fs
}
func (t *Torrent) pieceLength(piece int) pp.Integer {
if t.info.PieceLength == 0 {
// There will be no variance amongst pieces. Only pain.
@ -1077,21 +1056,14 @@ func (t *Torrent) maxHalfOpen() int {
func (t *Torrent) openNewConns() {
defer t.updateWantPeersEvent()
for len(t.peers) != 0 {
for t.peers.Len() != 0 {
if !t.wantConns() {
return
}
if len(t.halfOpen) >= t.maxHalfOpen() {
return
}
var (
k peersKey
p Peer
)
for k, p = range t.peers {
break
}
delete(t.peers, k)
p := t.peers.PopMax()
t.initiateConn(p)
}
}
@ -1272,7 +1244,7 @@ func (t *Torrent) wantPeers() bool {
if t.closed.IsSet() {
return false
}
if len(t.peers) > t.cl.config.TorrentPeersLowWater {
if t.peers.Len() > t.cl.config.TorrentPeersLowWater {
return false
}
return t.needData() || t.seeding()
@ -1395,7 +1367,7 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
}
cl.mu.Lock()
t.addPeers(addPeers)
numPeers := len(t.peers)
numPeers := t.peers.Len()
cl.mu.Unlock()
if numPeers >= cl.config.TorrentPeersHighWater {
return
@ -1458,7 +1430,7 @@ func (t *Torrent) Stats() TorrentStats {
func (t *Torrent) statsLocked() TorrentStats {
t.stats.ActivePeers = len(t.conns)
t.stats.HalfOpenPeers = len(t.halfOpen)
t.stats.PendingPeers = len(t.peers)
t.stats.PendingPeers = t.peers.Len()
t.stats.TotalPeers = t.numTotalPeers()
t.stats.ConnectedSeeders = 0
for c := range t.conns {
@ -1483,9 +1455,9 @@ func (t *Torrent) numTotalPeers() int {
for addr := range t.halfOpen {
peers[addr] = struct{}{}
}
for _, peer := range t.peers {
t.peers.Each(func(peer Peer) {
peers[fmt.Sprintf("%s:%d", peer.IP, peer.Port)] = struct{}{}
}
})
return len(peers)
}