2
0
mirror of synced 2025-02-23 14:18:13 +00:00

More cleaning of public interface

This commit is contained in:
Matt Joiner 2015-03-08 17:28:14 +11:00
parent 3e753bb8ad
commit ad6ac3f2cd
13 changed files with 114 additions and 126 deletions

View File

@ -114,7 +114,7 @@ type Client struct {
listeners []net.Listener
utpSock *utp.Socket
disableTrackers bool
downloadStrategy DownloadStrategy
downloadStrategy downloadStrategy
dHT *dht.Server
disableUTP bool
disableTCP bool
@ -192,6 +192,8 @@ func (cl *Client) sortedTorrents() (ret []*torrent) {
return
}
// Writes out a human readable status of the client, such as for writing to a
// HTTP status page.
func (cl *Client) WriteStatus(_w io.Writer) {
cl.mu.RLock()
defer cl.mu.RUnlock()
@ -290,7 +292,7 @@ type SectionOpener interface {
OpenSection(off, n int64) (io.ReadCloser, error)
}
func dataReadAt(d Data, b []byte, off int64) (n int, err error) {
func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
if ra, ok := d.(io.ReaderAt); ok {
return ra.ReadAt(b, off)
}
@ -447,15 +449,14 @@ func NewClient(cfg *Config) (cl *Client, err error) {
}
cl = &Client{
noUpload: cfg.NoUpload,
disableTrackers: cfg.DisableTrackers,
downloadStrategy: cfg.DownloadStrategy,
halfOpenLimit: socketsPerTorrent,
dataDir: cfg.DataDir,
disableUTP: cfg.DisableUTP,
disableTCP: cfg.DisableTCP,
_configDir: cfg.ConfigDir,
config: *cfg,
noUpload: cfg.NoUpload,
disableTrackers: cfg.DisableTrackers,
halfOpenLimit: socketsPerTorrent,
dataDir: cfg.DataDir,
disableUTP: cfg.DisableUTP,
disableTCP: cfg.DisableTCP,
_configDir: cfg.ConfigDir,
config: *cfg,
torrentDataOpener: func(md *metainfo.Info) data.Data {
return filePkg.TorrentData(md, cfg.DataDir)
},
@ -483,7 +484,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
if cfg.PeerID != "" {
CopyExact(&cl.peerID, cfg.PeerID)
} else {
o := copy(cl.peerID[:], BEP20)
o := copy(cl.peerID[:], bep20)
_, err = rand.Read(cl.peerID[o:])
if err != nil {
panic("error generating peer id")
@ -491,7 +492,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
}
if cl.downloadStrategy == nil {
cl.downloadStrategy = &DefaultDownloadStrategy{}
cl.downloadStrategy = &defaultDownloadStrategy{}
}
// Returns the laddr string to listen on for the next Listen call.
@ -557,7 +558,7 @@ func (cl *Client) stopped() bool {
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
func (me *Client) Stop() {
func (me *Client) Close() {
me.mu.Lock()
defer me.mu.Unlock()
close(me.quit)
@ -622,12 +623,7 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
}
func (me *Client) torrent(ih InfoHash) *torrent {
for _, t := range me.torrents {
if t.InfoHash == ih {
return t
}
}
return nil
return me.torrents[ih]
}
type dialResult struct {
@ -1342,7 +1338,8 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
break
}
go func() {
err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
me.mu.Lock()
me.addPeers(t, func() (ret []Peer) {
for _, cp := range pexMsg.Added {
p := Peer{
IP: make([]byte, 4),
@ -1356,10 +1353,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
}
return
}())
if err != nil {
log.Printf("error adding PEX peers: %s", err)
return
}
me.mu.Unlock()
peersFoundByPEX.Add(int64(len(pexMsg.Added)))
}()
default:
@ -1513,34 +1507,19 @@ func (me *Client) openNewConns(t *torrent) {
func (me *Client) addPeers(t *torrent, peers []Peer) {
blocked := 0
for i, p := range peers {
if me.ipBlockRange(p.IP) == nil {
for _, p := range peers {
if me.ipBlockRange(p.IP) != nil {
blocked++
continue
}
peers[i] = peers[len(peers)-1]
peers = peers[:len(peers)-1]
i--
blocked++
t.addPeer(p)
}
if blocked != 0 {
log.Printf("IP blocklist screened %d peers from being added", blocked)
}
t.AddPeers(peers)
me.openNewConns(t)
}
// Adds peers to the swarm for the torrent corresponding to infoHash.
func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
me.mu.Lock()
defer me.mu.Unlock()
t := me.torrent(infoHash)
if t == nil {
return errors.New("no such torrent")
}
me.addPeers(t, peers)
return nil
}
func (cl *Client) torrentFileCachePath(ih InfoHash) string {
return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
}
@ -1592,7 +1571,7 @@ func (cl *Client) startTorrent(t *torrent) {
}
// Storage cannot be changed once it's set.
func (cl *Client) setStorage(t *torrent, td Data) (err error) {
func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
err = t.setStorage(td)
cl.event.Broadcast()
if err != nil {
@ -1881,7 +1860,11 @@ func (t Torrent) MetainfoFilepath() string {
}
func (t Torrent) AddPeers(pp []Peer) error {
return t.cl.AddPeers(t.torrent.InfoHash, pp)
cl := t.cl
cl.mu.Lock()
defer cl.mu.Unlock()
cl.addPeers(t.torrent, pp)
return nil
}
func (t Torrent) DownloadAll() {
@ -2142,12 +2125,10 @@ func (cl *Client) announceTorrentSingleTracker(tr tracker.Client, req *tracker.A
Port: peer.Port,
})
}
err = cl.AddPeers(t.InfoHash, peers)
if err != nil {
log.Printf("error adding peers to torrent %s: %s", t, err)
} else {
log.Printf("%s: %d new peers from %s", t, len(peers), tr)
}
cl.mu.Lock()
cl.addPeers(t, peers)
cl.mu.Unlock()
log.Printf("%s: %d new peers from %s", t, len(peers), tr)
time.Sleep(time.Second * time.Duration(resp.Interval))
return nil
@ -2393,14 +2374,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
cl.pieceHashed(t, index, sum == p.Hash)
}
func (cl *Client) Torrent(ih InfoHash) (t Torrent, ok bool) {
cl.mu.Lock()
defer cl.mu.Unlock()
t.torrent, ok = cl.torrents[ih]
t.cl = cl
return
}
// Returns handles to all the torrents loaded in the Client.
func (me *Client) Torrents() (ret []Torrent) {
me.mu.Lock()
for _, t := range me.torrents {

View File

@ -37,7 +37,7 @@ func TestClientDefault(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cl.Stop()
cl.Close()
}
func TestAddDropTorrent(t *testing.T) {
@ -47,7 +47,7 @@ func TestAddDropTorrent(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer cl.Stop()
defer cl.Close()
dir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(dir)
tt, err := cl.AddTorrent(mi)
@ -223,13 +223,13 @@ func TestTwoClientsArbitraryPorts(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer cl.Stop()
defer cl.Close()
}
}
func TestAddDropManyTorrents(t *testing.T) {
cl, _ := NewClient(&TestingConfig)
defer cl.Stop()
defer cl.Close()
var m Magnet
for i := range iter.N(1000) {
binary.PutVarint(m.InfoHash[:], int64(i))
@ -246,7 +246,7 @@ func TestClientTransfer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer seeder.Stop()
defer seeder.Close()
seeder.AddTorrent(mi)
leecherDataDir, err := ioutil.TempDir("", "")
if err != nil {
@ -258,7 +258,7 @@ func TestClientTransfer(t *testing.T) {
// }
cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent
leecher, _ := NewClient(&cfg)
defer leecher.Stop()
defer leecher.Close()
leecherGreeting, _ := leecher.AddTorrent(mi)
leecherGreeting.AddPeers([]Peer{
Peer{

View File

@ -42,7 +42,7 @@ func main() {
http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
client.WriteStatus(w)
})
defer client.Stop()
defer client.Close()
if flag.NArg() == 0 {
fmt.Fprintln(os.Stderr, "no torrents specified")
return

View File

@ -69,7 +69,7 @@ func exitSignalHandlers(fs *torrentfs.TorrentFS) {
func addTestPeer(client *torrent.Client) {
for _, t := range client.Torrents() {
if testPeerAddr != nil {
if err := client.AddPeers(t.InfoHash, []torrent.Peer{{
if err := t.AddPeers([]torrent.Peer{{
IP: testPeerAddr.IP,
Port: testPeerAddr.Port,
}}); err != nil {

View File

@ -4,20 +4,38 @@ import (
"bitbucket.org/anacrolix/go.torrent/dht"
)
// Override Client defaults.
type Config struct {
DataDir string
ListenAddr string
DisableTrackers bool
DownloadStrategy DownloadStrategy
NoDHT bool
DHTConfig *dht.ServerConfig
NoUpload bool
PeerID string
DisableUTP bool
DisableTCP bool
// Store torrent file data in this directory unless TorrentDataOpener is
// specified.
DataDir string
// The address to listen for new uTP and TCP bittorrent protocol
// connections. DHT shares a UDP socket with uTP unless configured
// otherwise.
ListenAddr string
// Don't announce to trackers. This only leaves DHT to discover peers.
DisableTrackers bool
// Don't create a DHT.
NoDHT bool
// Overrides the default DHT configuration.
DHTConfig *dht.ServerConfig
// Don't chunks to peers.
NoUpload bool
// User-provided Client peer ID. If not present, one is generated automatically.
PeerID string
// For the bittorrent protocol.
DisableUTP bool
// For the bittorrent protocol.
DisableTCP bool
// Don't automatically load "$ConfigDir/blocklist".
NoDefaultBlocklist bool
// Defaults to "$HOME/.config/torrent"
ConfigDir string
// Defaults to "$HOME/.config/torrent". This is where "blocklist",
// "torrents" and other operational files are stored.
ConfigDir string
// Don't save or load to a cache of torrent files stored in
// "$ConfigDir/torrents".
DisableMetainfoCache bool
// Called to instantiate storage for each added torrent. Provided backends
// are in $REPO/data. If not set, the "file" implementation is used.
TorrentDataOpener
}

View File

@ -10,6 +10,8 @@ type Store interface {
OpenTorrent(*metainfo.Info) Data
}
// Represents data storage for a Torrent. Additional optional interfaces to
// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
type Data interface {
// OpenSection(off, n int64) (io.ReadCloser, error)
// ReadAt(p []byte, off int64) (n int, err error)

View File

@ -6,7 +6,7 @@ import (
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
)
type DownloadStrategy interface {
type downloadStrategy interface {
// Tops up the outgoing pending requests.
FillRequests(*torrent, *connection)
TorrentStarted(*torrent)
@ -20,17 +20,17 @@ type DownloadStrategy interface {
PendingData(*torrent) bool
}
type DefaultDownloadStrategy struct{}
type defaultDownloadStrategy struct{}
func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool {
func (me *defaultDownloadStrategy) PendingData(t *torrent) bool {
return !t.haveAllPieces()
}
func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
func (me *defaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
func (me *defaultDownloadStrategy) WriteStatus(w io.Writer) {}
func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
func (s *defaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
@ -64,14 +64,14 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
return
}
func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {}
func (s *defaultDownloadStrategy) TorrentStarted(t *torrent) {}
func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
func (s *defaultDownloadStrategy) TorrentStopped(t *torrent) {
}
func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
func (s *defaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
}
func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
func (me *defaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
func (me *defaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
func (*defaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}

View File

@ -98,7 +98,7 @@ func TestUnmountWedged(t *testing.T) {
NoDefaultBlocklist: true,
})
defer client.Stop()
defer client.Close()
client.AddTorrent(layout.Metainfo)
fs := New(client)
fuseConn, err := fuse.Mount(layout.MountDir)
@ -177,7 +177,7 @@ func TestDownloadOnDemand(t *testing.T) {
t.Fatalf("error creating seeder client: %s", err)
}
seeder.SetIPBlockList(nil)
defer seeder.Stop()
defer seeder.Close()
http.HandleFunc("/seeder", func(w http.ResponseWriter, req *http.Request) {
seeder.WriteStatus(w)
})
@ -207,11 +207,9 @@ func TestDownloadOnDemand(t *testing.T) {
http.HandleFunc("/leecher", func(w http.ResponseWriter, req *http.Request) {
leecher.WriteStatus(w)
})
defer leecher.Stop()
leecher.AddTorrent(layout.Metainfo)
var ih torrent.InfoHash
util.CopyExact(ih[:], layout.Metainfo.Info.Hash)
leecher.AddPeers(ih, []torrent.Peer{func() torrent.Peer {
defer leecher.Close()
leecherTorrent, _ := leecher.AddTorrent(layout.Metainfo)
leecherTorrent.AddPeers([]torrent.Peer{func() torrent.Peer {
_, port, err := net.SplitHostPort(seeder.ListenAddr().String())
if err != nil {
panic(err)

View File

@ -21,8 +21,9 @@ func (r *Range) String() string {
return fmt.Sprintf("%s-%s (%s)", r.First, r.Last, r.Description)
}
// Create a new IP list. The given range must already sorted by the lower IP
// in the range. Behaviour is undefined for lists of overlapping ranges.
// Create a new IP list. The given ranges must already sorted by the lower
// bound IP in each range. Behaviour is undefined for lists of overlapping
// ranges.
func New(initSorted []Range) *IPList {
return &IPList{
ranges: initSorted,

View File

@ -15,7 +15,7 @@ const (
pieceHash = crypto.SHA1
maxRequests = 250 // Maximum pending requests we allow peers to send us.
chunkSize = 0x4000 // 16KiB
BEP20 = "-GT0000-" // Peer ID client identifier prefix
bep20 = "-GT0000-" // Peer ID client identifier prefix
nominalDialTimeout = time.Second * 30
minDialTimeout = 5 * time.Second
)
@ -97,7 +97,7 @@ func newRequest(index, begin, length peer_protocol.Integer) request {
var (
// Requested data not yet available.
ErrDataNotReady = errors.New("data not ready")
errDataNotReady = errors.New("data not ready")
)
// The size in bytes of a metadata extension piece.

View File

@ -4,23 +4,26 @@ import (
"container/list"
)
type OrderedList struct {
// This was used to maintain pieces in order of bytes left to download. I
// don't think it's currently in use.
type orderedList struct {
list *list.List
lessFunc func(a, b interface{}) bool
}
func (me *OrderedList) Len() int {
func (me *orderedList) Len() int {
return me.list.Len()
}
func NewList(lessFunc func(a, b interface{}) bool) *OrderedList {
return &OrderedList{
func newOrderedList(lessFunc func(a, b interface{}) bool) *orderedList {
return &orderedList{
list: list.New(),
lessFunc: lessFunc,
}
}
func (me *OrderedList) ValueChanged(e *list.Element) {
func (me *orderedList) ValueChanged(e *list.Element) {
for prev := e.Prev(); prev != nil && me.lessFunc(e.Value, prev.Value); prev = e.Prev() {
me.list.MoveBefore(e, prev)
}
@ -29,16 +32,16 @@ func (me *OrderedList) ValueChanged(e *list.Element) {
}
}
func (me *OrderedList) Insert(value interface{}) (ret *list.Element) {
func (me *orderedList) Insert(value interface{}) (ret *list.Element) {
ret = me.list.PushFront(value)
me.ValueChanged(ret)
return
}
func (me *OrderedList) Front() *list.Element {
func (me *orderedList) Front() *list.Element {
return me.list.Front()
}
func (me *OrderedList) Remove(e *list.Element) interface{} {
func (me *orderedList) Remove(e *list.Element) interface{} {
return me.list.Remove(e)
}

View File

@ -5,7 +5,7 @@ import (
)
func TestOrderedList(t *testing.T) {
ol := NewList(func(a, b interface{}) bool {
ol := newOrderedList(func(a, b interface{}) bool {
return a.(int) < b.(int)
})
if ol.Len() != 0 {

View File

@ -44,15 +44,9 @@ type peersKey struct {
Port int
}
// Represents data storage for a Torrent. Additional optional interfaces to
// implement are io.Closer, io.ReaderAt, StatefulData, and SectionOpener.
type Data interface {
data.Data
}
// Data maintains per-piece persistent state.
type StatefulData interface {
Data
data.Data
// We believe the piece data will pass a hash check.
PieceCompleted(index int) error
// Returns true if the piece is complete.
@ -72,7 +66,7 @@ type torrent struct {
Pieces []*piece
length int64
data Data
data data.Data
Info *metainfo.Info
// Active peer connections.
@ -229,10 +223,8 @@ func (t *torrent) ceaseNetworking() {
t.pruneTimer.Stop()
}
func (t *torrent) AddPeers(pp []Peer) {
for _, p := range pp {
t.Peers[peersKey{string(p.IP), p.Port}] = p
}
func (t *torrent) addPeer(p Peer) {
t.Peers[peersKey{string(p.IP), p.Port}] = p
}
func (t *torrent) invalidateMetadata() {
@ -305,7 +297,7 @@ func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sy
return
}
func (t *torrent) setStorage(td Data) (err error) {
func (t *torrent) setStorage(td data.Data) (err error) {
if c, ok := t.data.(io.Closer); ok {
c.Close()
}
@ -539,7 +531,7 @@ func (t *torrent) piecePartiallyDownloaded(index int) bool {
return t.PieceNumPendingBytes(pp.Integer(index)) != t.PieceLength(pp.Integer(index))
}
func NumChunksForPiece(chunkSize int, pieceSize int) int {
func numChunksForPiece(chunkSize int, pieceSize int) int {
return (pieceSize + chunkSize - 1) / chunkSize
}