Move half-open tracking into per-torrent
This commit is contained in:
parent
963918ac90
commit
e37d369864
39
client.go
39
client.go
|
@ -127,9 +127,9 @@ type Client struct {
|
|||
event sync.Cond
|
||||
quit chan struct{}
|
||||
|
||||
halfOpen int
|
||||
handshaking int
|
||||
torrents map[InfoHash]*torrent
|
||||
|
||||
torrents map[InfoHash]*torrent
|
||||
|
||||
dataWaits map[*torrent][]dataWait
|
||||
}
|
||||
|
@ -149,7 +149,6 @@ func (cl *Client) WriteStatus(w io.Writer) {
|
|||
defer cl.mu.Unlock()
|
||||
fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
|
||||
fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
|
||||
fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
|
||||
fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
|
||||
if cl.dHT != nil {
|
||||
fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
|
||||
|
@ -373,18 +372,12 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
|
|||
if peer.Id == me.peerID {
|
||||
return
|
||||
}
|
||||
addr := &net.TCPAddr{
|
||||
IP: peer.IP,
|
||||
Port: peer.Port,
|
||||
addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
|
||||
if t.addrActive(addr) {
|
||||
duplicateConnsAvoided.Add(1)
|
||||
return
|
||||
}
|
||||
// Don't connect to the same address twice for the same torrent.
|
||||
for _, c := range torrent.Conns {
|
||||
if c.Socket.RemoteAddr().String() == addr.String() {
|
||||
duplicateConnsAvoided.Add(1)
|
||||
return
|
||||
}
|
||||
}
|
||||
me.halfOpen++
|
||||
t.HalfOpen[addr] = struct{}{}
|
||||
go func() {
|
||||
// Binding to the listener address and dialing via net.Dialer gives
|
||||
// "address in use" error. It seems it's not possible to dial out from
|
||||
|
@ -401,10 +394,10 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
|
|||
go func() {
|
||||
me.mu.Lock()
|
||||
defer me.mu.Unlock()
|
||||
if me.halfOpen == 0 {
|
||||
panic("assert")
|
||||
if _, ok := t.HalfOpen[addr]; !ok {
|
||||
panic("invariant broken")
|
||||
}
|
||||
me.halfOpen--
|
||||
delete(t.HalfOpen, addr)
|
||||
me.openNewConns()
|
||||
}()
|
||||
|
||||
|
@ -1078,10 +1071,10 @@ func (me *Client) openNewConns() {
|
|||
default:
|
||||
}
|
||||
for len(t.Peers) != 0 {
|
||||
if me.halfOpen >= me.halfOpenLimit {
|
||||
if len(t.HalfOpen) >= me.halfOpenLimit {
|
||||
return
|
||||
}
|
||||
if me.halfOpen+me.handshaking+len(t.Conns) >= socketsPerTorrent {
|
||||
if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
|
||||
break
|
||||
}
|
||||
var (
|
||||
|
@ -1152,7 +1145,7 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
|
|||
// Prepare a Torrent without any attachment to a Client. That means we can
|
||||
// initialize fields all fields that don't require the Client without locking
|
||||
// it.
|
||||
func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
|
||||
func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
|
||||
t = &torrent{
|
||||
InfoHash: ih,
|
||||
Peers: make(map[peersKey]Peer, 2000),
|
||||
|
@ -1161,6 +1154,8 @@ func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
|
|||
ceasingNetworking: make(chan struct{}),
|
||||
|
||||
gotMetainfo: make(chan *metainfo.MetaInfo, 1),
|
||||
|
||||
HalfOpen: make(map[string]struct{}, halfOpenLimit),
|
||||
}
|
||||
t.GotMetainfo = t.gotMetainfo
|
||||
t.Trackers = make([][]tracker.Client, len(announceList))
|
||||
|
@ -1191,7 +1186,7 @@ func (cl *Client) AddMagnet(uri string) (t *torrent, err error) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
t, err = newTorrent(m.InfoHash, [][]string{m.Trackers})
|
||||
t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}, cl.halfOpenLimit)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -1245,7 +1240,7 @@ func (me *Client) addTorrent(t *torrent) (err error) {
|
|||
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
|
||||
var ih InfoHash
|
||||
CopyExact(&ih, metaInfo.Info.Hash)
|
||||
t, err := newTorrent(ih, metaInfo.AnnounceList)
|
||||
t, err := newTorrent(ih, metaInfo.AnnounceList, me.halfOpenLimit)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ func TestTorrentInitialState(t *testing.T) {
|
|||
tor, err := newTorrent(func() (ih InfoHash) {
|
||||
util.CopyExact(ih[:], mi.Info.Hash)
|
||||
return
|
||||
}(), nil)
|
||||
}(), nil, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
22
torrent.go
22
torrent.go
|
@ -58,9 +58,16 @@ type torrent struct {
|
|||
dataLock sync.RWMutex
|
||||
Data mmap_span.MMapSpan
|
||||
|
||||
Info *metainfo.Info
|
||||
Info *metainfo.Info
|
||||
// Active peer connections.
|
||||
Conns []*connection
|
||||
// Set of addrs to which we're attempting to connect.
|
||||
HalfOpen map[string]struct{}
|
||||
// Reserve of peers to connect to. A peer can be both here and in the
|
||||
// active connections if were told about the peer after connecting with
|
||||
// them. That encourages us to reconnect to peers that are well known.
|
||||
Peers map[peersKey]Peer
|
||||
|
||||
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
|
||||
// mirror their respective URLs from the announce-list key.
|
||||
Trackers [][]tracker.Client
|
||||
|
@ -72,6 +79,18 @@ type torrent struct {
|
|||
GotMetainfo <-chan *metainfo.MetaInfo
|
||||
}
|
||||
|
||||
func (t *torrent) addrActive(addr string) bool {
|
||||
if _, ok := t.HalfOpen[addr]; ok {
|
||||
return true
|
||||
}
|
||||
for _, c := range t.Conns {
|
||||
if c.Socket.RemoteAddr().String() == addr {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *torrent) worstConnsHeap() (wcs *worstConns) {
|
||||
wcs = &worstConns{
|
||||
c: append([]*connection{}, t.Conns...),
|
||||
|
@ -301,6 +320,7 @@ func (t *torrent) WriteStatus(w io.Writer) {
|
|||
}
|
||||
fmt.Fprintln(w)
|
||||
fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
|
||||
fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
|
||||
fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
|
||||
sort.Sort(&worstConns{
|
||||
c: t.Conns,
|
||||
|
|
|
@ -44,7 +44,7 @@ func TestTorrentRequest(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTorrentDoubleClose(t *testing.T) {
|
||||
tt, err := newTorrent(InfoHash{}, nil)
|
||||
tt, err := newTorrent(InfoHash{}, nil, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue