Allow configuring Client torrent data opener, config dir, disabling metainfo cache, and prune with a timer instead of goroutine

This commit is contained in:
Matt Joiner 2015-02-25 14:48:39 +11:00
parent 61adeee308
commit 0eb418360b
7 changed files with 131 additions and 81 deletions

100
client.go
View File

@ -36,6 +36,8 @@ import (
"syscall" "syscall"
"time" "time"
filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
"bitbucket.org/anacrolix/go.torrent/dht" "bitbucket.org/anacrolix/go.torrent/dht"
"bitbucket.org/anacrolix/go.torrent/internal/pieceordering" "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
"bitbucket.org/anacrolix/go.torrent/iplist" "bitbucket.org/anacrolix/go.torrent/iplist"
@ -80,6 +82,8 @@ const (
// impact of a few bad apples. 4s loses 1% of successful handshakes that // impact of a few bad apples. 4s loses 1% of successful handshakes that
// are obtained with 60s timeout, and 5% of unsuccessful handshakes. // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
handshakeTimeout = 4 * time.Second handshakeTimeout = 4 * time.Second
pruneInterval = 10 * time.Second
) )
// Currently doesn't really queue, but should in the future. // Currently doesn't really queue, but should in the future.
@ -116,6 +120,11 @@ type Client struct {
disableTCP bool disableTCP bool
ipBlockList *iplist.IPList ipBlockList *iplist.IPList
bannedTorrents map[InfoHash]struct{} bannedTorrents map[InfoHash]struct{}
_configDir string
config Config
pruneTimer *time.Timer
torrentDataOpener TorrentDataOpener
mu sync.RWMutex mu sync.RWMutex
event sync.Cond event sync.Cond
@ -219,8 +228,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
} }
} }
// Read torrent data at the given offset. Returns ErrDataNotReady if the data // Read torrent data at the given offset. Will block until it is available.
// isn't available.
func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) { func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
cl.mu.Lock() cl.mu.Lock()
defer cl.mu.Unlock() defer cl.mu.Unlock()
@ -248,10 +256,10 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err
if len(p) == 0 { if len(p) == 0 {
panic(len(p)) panic(len(p))
} }
for !piece.Complete() { for !piece.Complete() && !t.isClosed() {
piece.Event.Wait() piece.Event.Wait()
} }
return t.Data.ReadAt(p, off) return t.data.ReadAt(p, off)
} }
func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) { func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
@ -272,8 +280,11 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
} }
func (cl *Client) configDir() string { func (cl *Client) configDir() string {
if cl._configDir == "" {
return filepath.Join(os.Getenv("HOME"), ".config/torrent") return filepath.Join(os.Getenv("HOME"), ".config/torrent")
} }
return cl._configDir
}
func (cl *Client) ConfigDir() string { func (cl *Client) ConfigDir() string {
return cl.configDir() return cl.configDir()
@ -393,6 +404,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
dataDir: cfg.DataDir, dataDir: cfg.DataDir,
disableUTP: cfg.DisableUTP, disableUTP: cfg.DisableUTP,
disableTCP: cfg.DisableTCP, disableTCP: cfg.DisableTCP,
_configDir: cfg.ConfigDir,
config: *cfg,
torrentDataOpener: func(md *metainfo.Info) (TorrentData, error) {
return filePkg.TorrentData(md, cfg.DataDir), nil
},
quit: make(chan struct{}), quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent), torrents: make(map[InfoHash]*torrent),
@ -1163,7 +1179,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
// routine. // routine.
// c.PeerRequests[request] = struct{}{} // c.PeerRequests[request] = struct{}{}
p := make([]byte, msg.Length) p := make([]byte, msg.Length)
n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin)) n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
if err != nil { if err != nil {
return fmt.Errorf("reading t data to serve request %q: %s", request, err) return fmt.Errorf("reading t data to serve request %q: %s", request, err)
} }
@ -1499,22 +1515,10 @@ func (cl *Client) saveTorrentFile(t *torrent) error {
return nil return nil
} }
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { func (cl *Client) startTorrent(t *torrent) {
err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu) if t.Info == nil || t.data == nil {
if err != nil { panic("nope")
return
} }
if err := cl.saveTorrentFile(t); err != nil {
log.Printf("error saving torrent file for %s: %s", t, err)
}
if strings.Contains(strings.ToLower(md.Name), "porn") {
cl.dropTorrent(t.InfoHash)
err = errors.New("no porn plx")
return
}
// If the client intends to upload, it needs to know what state pieces are // If the client intends to upload, it needs to know what state pieces are
// in. // in.
if !cl.noUpload { if !cl.noUpload {
@ -1529,9 +1533,43 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
} }
}() }()
} }
cl.downloadStrategy.TorrentStarted(t) cl.downloadStrategy.TorrentStarted(t)
}
// Storage cannot be changed once it's set.
func (cl *Client) setStorage(t *torrent, td TorrentData) (err error) {
err = t.setStorage(td)
cl.event.Broadcast()
if err != nil {
return
}
cl.startTorrent(t)
return
}
type TorrentDataOpener func(*metainfo.Info) (TorrentData, error)
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
err = t.setMetadata(md, bytes, &cl.mu)
if err != nil {
return
}
if !cl.config.DisableMetainfoCache {
if err := cl.saveTorrentFile(t); err != nil {
log.Printf("error saving torrent file for %s: %s", t, err)
}
}
if strings.Contains(strings.ToLower(md.Name), "porn") {
cl.dropTorrent(t.InfoHash)
err = errors.New("no porn plx")
return
}
close(t.gotMetainfo) close(t.gotMetainfo)
td, err := cl.torrentDataOpener(&md)
if err != nil {
return
}
err = cl.setStorage(t, td)
return return
} }
@ -1722,6 +1760,9 @@ func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
// Returns nil metainfo if it isn't in the cache. // Returns nil metainfo if it isn't in the cache.
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
if cl.config.DisableMetainfoCache {
return
}
f, err := os.Open(cl.torrentFileCachePath(ih)) f, err := os.Open(cl.torrentFileCachePath(ih))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -1768,16 +1809,15 @@ func (cl *Client) AddMagnet(uri string) (T Torrent, err error) {
return return
} }
// Actively prunes unused connections. This is required to make space to dial // Prunes unused connections. This is required to make space to dial for
// for replacements. // replacements.
func (cl *Client) connectionPruner(t *torrent) { func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
for {
select { select {
case <-t.ceasingNetworking: case <-t.ceasingNetworking:
return return
case <-t.closing: case <-t.closing:
return return
case <-time.After(15 * time.Second): default:
} }
cl.mu.Lock() cl.mu.Lock()
license := len(t.Conns) - (socketsPerTorrent+1)/2 license := len(t.Conns) - (socketsPerTorrent+1)/2
@ -1795,7 +1835,7 @@ func (cl *Client) connectionPruner(t *torrent) {
license-- license--
} }
cl.mu.Unlock() cl.mu.Unlock()
} t.pruneTimer.Reset(pruneInterval)
} }
func (me *Client) dropTorrent(infoHash InfoHash) (err error) { func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
@ -1835,7 +1875,9 @@ func (me *Client) addOrMergeTorrent(ih InfoHash, announceList [][]string) (T Tor
if me.dHT != nil { if me.dHT != nil {
go me.announceTorrentDHT(T.torrent, true) go me.announceTorrentDHT(T.torrent, true)
} }
go me.connectionPruner(T.torrent) T.torrent.pruneTimer = time.AfterFunc(0, func() {
me.pruneConnectionsUnlocked(T.torrent)
})
} }
return return
} }
@ -2178,7 +2220,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
cl.mu.Lock() cl.mu.Lock()
defer cl.mu.Unlock() defer cl.mu.Unlock()
p := t.Pieces[index] p := t.Pieces[index]
for p.Hashing { for p.Hashing || t.data == nil {
cl.event.Wait() cl.event.Wait()
} }
if t.isClosed() { if t.isClosed() {

View File

@ -70,7 +70,7 @@ func TestTorrentInitialState(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes, nil) err = tor.setMetadata(mi.Info.Info, mi.Info.Bytes, nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -16,4 +16,8 @@ type Config struct {
DisableUTP bool DisableUTP bool
DisableTCP bool DisableTCP bool
NoDefaultBlocklist bool NoDefaultBlocklist bool
// Defaults to "$HOME/.config/torrent"
ConfigDir string
DisableMetainfoCache bool
TorrentDataOpener
} }

View File

@ -13,12 +13,11 @@ type data struct {
loc string loc string
} }
func TorrentData(md *metainfo.Info, location string) (ret *data, err error) { func TorrentData(md *metainfo.Info, location string) data {
ret = &data{md, location} return data{md, location}
return
} }
func (me *data) ReadAt(p []byte, off int64) (n int, err error) { func (me data) ReadAt(p []byte, off int64) (n int, err error) {
for _, fi := range me.info.UpvertedFiles() { for _, fi := range me.info.UpvertedFiles() {
if off >= fi.Length { if off >= fi.Length {
off -= fi.Length off -= fi.Length
@ -48,9 +47,9 @@ func (me *data) ReadAt(p []byte, off int64) (n int, err error) {
return return
} }
func (me *data) Close() {} func (me data) Close() {}
func (me *data) WriteAt(p []byte, off int64) (n int, err error) { func (me data) WriteAt(p []byte, off int64) (n int, err error) {
for _, fi := range me.info.UpvertedFiles() { for _, fi := range me.info.UpvertedFiles() {
if off >= fi.Length { if off >= fi.Length {
off -= fi.Length off -= fi.Length
@ -82,7 +81,7 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) {
return return
} }
func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { func (me data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) {
for _, fi := range me.info.UpvertedFiles() { for _, fi := range me.info.UpvertedFiles() {
if off >= fi.Length { if off >= fi.Length {
off -= fi.Length off -= fi.Length
@ -112,6 +111,6 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er
return return
} }
func (me *data) fileInfoName(fi metainfo.FileInfo) string { func (me data) fileInfoName(fi metainfo.FileInfo) string {
return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...) return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...)
} }

View File

@ -169,6 +169,9 @@ func TestDownloadOnDemand(t *testing.T) {
ListenAddr: ":0", ListenAddr: ":0",
NoDefaultBlocklist: true, NoDefaultBlocklist: true,
// Ensure that the metainfo is obtained over the wire, since we added
// the torrent to the seeder by magnet.
DisableMetainfoCache: true,
}) })
if err != nil { if err != nil {
t.Fatalf("error creating seeder client: %s", err) t.Fatalf("error creating seeder client: %s", err)
@ -183,7 +186,6 @@ func TestDownloadOnDemand(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
leecher, err := torrent.NewClient(&torrent.Config{ leecher, err := torrent.NewClient(&torrent.Config{
DataDir: filepath.Join(layout.BaseDir, "download"),
DisableTrackers: true, DisableTrackers: true,
NoDHT: true, NoDHT: true,
ListenAddr: ":0", ListenAddr: ":0",
@ -191,6 +193,10 @@ func TestDownloadOnDemand(t *testing.T) {
NoDefaultBlocklist: true, NoDefaultBlocklist: true,
TorrentDataOpener: func(info *metainfo.Info) (torrent.TorrentData, error) {
return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download"))
},
// This can be used to check if clients can connect to other clients // This can be used to check if clients can connect to other clients
// with the same ID. // with the same ID.

View File

@ -10,8 +10,6 @@ import (
"sync" "sync"
"time" "time"
"bitbucket.org/anacrolix/go.torrent/data/file"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker" "bitbucket.org/anacrolix/go.torrent/tracker"
"bitbucket.org/anacrolix/go.torrent/util" "bitbucket.org/anacrolix/go.torrent/util"
@ -40,7 +38,7 @@ type peersKey struct {
Port int Port int
} }
type torrentData interface { type TorrentData interface {
ReadAt(p []byte, off int64) (n int, err error) ReadAt(p []byte, off int64) (n int, err error)
Close() Close()
WriteAt(p []byte, off int64) (n int, err error) WriteAt(p []byte, off int64) (n int, err error)
@ -60,9 +58,7 @@ type torrent struct {
Pieces []*piece Pieces []*piece
length int64 length int64
// Prevent mutations to Data memory maps while in use as they're not safe. data TorrentData
dataLock sync.RWMutex
Data torrentData
Info *MetaInfo Info *MetaInfo
// Active peer connections. // Active peer connections.
@ -85,6 +81,8 @@ type torrent struct {
gotMetainfo chan struct{} gotMetainfo chan struct{}
GotMetainfo <-chan struct{} GotMetainfo <-chan struct{}
pruneTimer *time.Timer
} }
func (t *torrent) numConnsUnchoked() (num int) { func (t *torrent) numConnsUnchoked() (num int) {
@ -129,6 +127,7 @@ func (t *torrent) ceaseNetworking() {
for _, c := range t.Conns { for _, c := range t.Conns {
c.Close() c.Close()
} }
t.pruneTimer.Stop()
} }
func (t *torrent) AddPeers(pp []Peer) { func (t *torrent) AddPeers(pp []Peer) {
@ -183,7 +182,7 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) {
} }
// Called when metadata for a torrent becomes available. // Called when metadata for a torrent becomes available.
func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) { func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) {
t.Info = newMetaInfo(&md) t.Info = newMetaInfo(&md)
t.length = 0 t.length = 0
for _, f := range t.Info.UpvertedFiles() { for _, f := range t.Info.UpvertedFiles() {
@ -204,11 +203,14 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
conn.Close() conn.Close()
} }
} }
t.Data, err = file.TorrentData(&md, dataDir)
if err != nil {
err = fmt.Errorf("error mmap'ing torrent data: %s", err)
return return
} }
func (t *torrent) setStorage(td TorrentData) (err error) {
if t.data != nil {
t.data.Close()
}
t.data = td
return return
} }
@ -477,12 +479,9 @@ func (t *torrent) close() (err error) {
} }
t.ceaseNetworking() t.ceaseNetworking()
close(t.closing) close(t.closing)
t.dataLock.Lock() if t.data != nil {
if t.Data != nil { t.data.Close()
t.Data.Close()
t.Data = nil
} }
t.dataLock.Unlock()
for _, conn := range t.Conns { for _, conn := range t.Conns {
conn.Close() conn.Close()
} }
@ -525,7 +524,7 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
} }
func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
_, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin) _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
return return
} }
@ -583,9 +582,7 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) { func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
hash := pieceHash.New() hash := pieceHash.New()
t.dataLock.RLock() t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
t.dataLock.RUnlock()
util.CopyExact(ps[:], hash.Sum(nil)) util.CopyExact(ps[:], hash.Sum(nil))
return return
} }

View File

@ -3,6 +3,7 @@ package torrent
import ( import (
"sync" "sync"
"testing" "testing"
"time"
"bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/peer_protocol"
) )
@ -45,6 +46,7 @@ func TestTorrentRequest(t *testing.T) {
func TestTorrentDoubleClose(t *testing.T) { func TestTorrentDoubleClose(t *testing.T) {
tt, err := newTorrent(InfoHash{}, nil, 0) tt, err := newTorrent(InfoHash{}, nil, 0)
tt.pruneTimer = time.NewTimer(0)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }