diff --git a/client.go b/client.go index 4de098f3..98c9b12d 100644 --- a/client.go +++ b/client.go @@ -74,7 +74,7 @@ type Client struct { dopplegangerAddrs map[string]struct{} badPeerIPs map[string]struct{} - defaultStorage storage.Client + defaultStorage *storage.Client mu sync.RWMutex event sync.Cond @@ -253,15 +253,16 @@ func NewClient(cfg *Config) (cl *Client, err error) { cl = &Client{ halfOpenLimit: defaultHalfOpenConnsPerTorrent, config: *cfg, - defaultStorage: cfg.DefaultStorage, dopplegangerAddrs: make(map[string]struct{}), torrents: make(map[metainfo.Hash]*Torrent), } missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes) cl.event.L = &cl.mu - if cl.defaultStorage == nil { - cl.defaultStorage = storage.NewFile(cfg.DataDir) + storageImpl := cfg.DefaultStorage + if storageImpl == nil { + storageImpl = storage.NewFile(cfg.DataDir) } + cl.defaultStorage = storage.NewClient(storageImpl) if cfg.IPBlocklist != nil { cl.ipBlockList = cfg.IPBlocklist } @@ -1417,7 +1418,7 @@ type TorrentSpec struct { // The chunk size to use for outbound requests. Defaults to 16KiB if not // set. ChunkSize int - Storage storage.Client + Storage storage.ClientImpl } func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) { diff --git a/client_test.go b/client_test.go index e85cb7c2..9c9cb9f5 100644 --- a/client_test.go +++ b/client_test.go @@ -92,7 +92,7 @@ func TestTorrentInitialState(t *testing.T) { pieceStateChanges: pubsub.NewPubSub(), } tor.chunkSize = 2 - tor.storageOpener = storage.NewFile("/dev/null") + tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null")) // Needed to lock for asynchronous piece verification. tor.cl = new(Client) err := tor.setInfoBytes(mi.InfoBytes) @@ -241,11 +241,11 @@ func TestAddDropManyTorrents(t *testing.T) { type FileCacheClientStorageFactoryParams struct { Capacity int64 SetCapacity bool - Wrapper func(*filecache.Cache) storage.Client + Wrapper func(*filecache.Cache) storage.ClientImpl } func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory { - return func(dataDir string) storage.Client { + return func(dataDir string) storage.ClientImpl { fc, err := filecache.NewCache(dataDir) if err != nil { panic(err) @@ -257,7 +257,7 @@ func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) st } } -type storageFactory func(string) storage.Client +type storageFactory func(string) storage.ClientImpl func TestClientTransferDefault(t *testing.T) { testClientTransfer(t, testClientTransferParams{ @@ -268,11 +268,11 @@ func TestClientTransferDefault(t *testing.T) { }) } -func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client { +func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl { return storage.NewResourcePieces(fc.AsResourceProvider()) } -func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client { +func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl { return storage.NewFileStorePieces(fc.AsFileStore()) } @@ -303,7 +303,7 @@ func TestClientTransferVarious(t *testing.T) { }), storage.NewBoltDB, } { - for _, ss := range []func(string) storage.Client{ + for _, ss := range []func(string) storage.ClientImpl{ storage.NewFile, storage.NewMMap, } { @@ -332,8 +332,8 @@ type testClientTransferParams struct { Readahead int64 SetReadahead bool ExportClientStatus bool - LeecherStorage func(string) storage.Client - SeederStorage func(string) storage.Client + LeecherStorage func(string) storage.ClientImpl + SeederStorage func(string) storage.ClientImpl } // Creates a seeder and a leecher, and ensures the data transfers when a read @@ -493,7 +493,7 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) { type badStorage struct{} -func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) { +func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) { return bs, nil } @@ -501,7 +501,7 @@ func (bs badStorage) Close() error { return nil } -func (bs badStorage) Piece(p metainfo.Piece) storage.Piece { +func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl { return badStoragePiece{p} } @@ -521,6 +521,10 @@ func (p badStoragePiece) MarkComplete() error { return errors.New("psyyyyyyyche") } +func (p badStoragePiece) MarkNotComplete() error { + return errors.New("psyyyyyyyche") +} + func (p badStoragePiece) randomlyTruncatedDataString() string { return "hello, world\n"[:rand.Intn(14)] } @@ -709,14 +713,14 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) { } } -func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) { +func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) { for i := range iter.N(info.NumPieces()) { - n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0) - b = b[n:] + p := info.Piece(i) + ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0) } } -func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) { +func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) { fileCacheDir, err := ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(fileCacheDir) @@ -727,7 +731,7 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf filePieceStore := csf(fileCache) info := greetingMetainfo.UnmarshalInfo() ih := greetingMetainfo.HashInfoBytes() - greetingData, err := filePieceStore.OpenTorrent(&info, ih) + greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih) require.NoError(t, err) writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents)) // require.Equal(t, len(testutil.GreetingFileContents), written) diff --git a/config.go b/config.go index 9f645b0a..9ea76416 100644 --- a/config.go +++ b/config.go @@ -35,7 +35,7 @@ type Config struct { DisableTCP bool `long:"disable-tcp"` // Called to instantiate storage for each added torrent. Provided backends // are in $REPO/data. If not set, the "file" implementation is used. - DefaultStorage storage.Client + DefaultStorage storage.ClientImpl DisableEncryption bool `long:"disable-encryption"` IPBlocklist iplist.Ranger diff --git a/issue97_test.go b/issue97_test.go index 1e6373cc..2695e5b7 100644 --- a/issue97_test.go +++ b/issue97_test.go @@ -15,7 +15,7 @@ func TestHashPieceAfterStorageClosed(t *testing.T) { td, err := ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(td) - cs := storage.NewFile(td) + cs := storage.NewClient(storage.NewFile(td)) tt := &Torrent{} mi := testutil.GreetingMetaInfo() info := mi.UnmarshalInfo() diff --git a/storage/boltdb.go b/storage/boltdb.go index b7c40645..140dc234 100644 --- a/storage/boltdb.go +++ b/storage/boltdb.go @@ -2,10 +2,8 @@ package storage import ( "encoding/binary" - "io" "path/filepath" - "github.com/anacrolix/missinggo" "github.com/boltdb/bolt" "github.com/anacrolix/torrent/metainfo" @@ -43,7 +41,7 @@ type boltDBPiece struct { key [24]byte } -func NewBoltDB(filePath string) Client { +func NewBoltDB(filePath string) ClientImpl { ret := &boltDBClient{} var err error ret.db, err = bolt.Open(filepath.Join(filePath, "bolt.db"), 0600, nil) @@ -53,11 +51,11 @@ func NewBoltDB(filePath string) Client { return ret } -func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) { +func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { return &boltDBTorrent{me, infoHash}, nil } -func (me *boltDBTorrent) Piece(p metainfo.Piece) Piece { +func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl { ret := &boltDBPiece{p: p, db: me.cl.db} copy(ret.key[:], me.ih[:]) binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index())) @@ -82,16 +80,24 @@ func (me *boltDBPiece) GetIsComplete() (complete bool) { } func (me *boltDBPiece) MarkComplete() error { - return me.db.Update(func(tx *bolt.Tx) (err error) { + return me.db.Update(func(tx *bolt.Tx) error { b, err := tx.CreateBucketIfNotExists(completed) if err != nil { - return + return err } - b.Put(me.key[:], completedValue) - return + return b.Put(me.key[:], completedValue) }) } +func (me *boltDBPiece) MarkNotComplete() error { + return me.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(completed) + if b == nil { + return nil + } + return b.Delete(me.key[:]) + }) +} func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) { err = me.db.View(func(tx *bolt.Tx) error { db := tx.Bucket(data) @@ -114,14 +120,6 @@ func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) { } return nil }) - if n == 0 && err == nil { - if off < me.p.Length() { - err = io.ErrUnexpectedEOF - } else { - err = io.EOF - } - } - // // log.Println(n, err) return } diff --git a/storage/file.go b/storage/file.go index bae53134..3e686aa2 100644 --- a/storage/file.go +++ b/storage/file.go @@ -17,13 +17,13 @@ type fileStorage struct { baseDir string } -func NewFile(baseDir string) Client { +func NewFile(baseDir string) ClientImpl { return &fileStorage{ baseDir: baseDir, } } -func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) { +func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { return &fileTorrentStorage{ fs, info, @@ -40,7 +40,7 @@ type fileTorrentStorage struct { completion pieceCompletion } -func (fts *fileTorrentStorage) Piece(p metainfo.Piece) Piece { +func (fts *fileTorrentStorage) Piece(p metainfo.Piece) PieceImpl { // Create a view onto the file-based torrent storage. _io := fileStorageTorrent{fts} // Return the appropriate segments of this. diff --git a/storage/file_storage_piece.go b/storage/file_storage_piece.go index 915d5d0d..5dce1ea3 100644 --- a/storage/file_storage_piece.go +++ b/storage/file_storage_piece.go @@ -11,7 +11,7 @@ type fileStoragePiece struct { *fileTorrentStorage p metainfo.Piece io.WriterAt - r io.ReaderAt + io.ReaderAt } func (me *fileStoragePiece) pieceKey() metainfo.PieceKey { @@ -45,15 +45,7 @@ func (fs *fileStoragePiece) MarkComplete() error { return nil } -func (fsp *fileStoragePiece) ReadAt(b []byte, off int64) (n int, err error) { - n, err = fsp.r.ReadAt(b, off) - if n != 0 { - err = nil - return - } - if off < 0 || off >= fsp.p.Length() { - return - } - fsp.completion.Set(fsp.pieceKey(), false) - return +func (fs *fileStoragePiece) MarkNotComplete() error { + fs.completion.Set(fs.pieceKey(), false) + return nil } diff --git a/storage/interface.go b/storage/interface.go index 24716e0c..132d4c85 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -7,26 +7,28 @@ import ( ) // Represents data storage for an unspecified torrent. -type Client interface { - OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) +type ClientImpl interface { + OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) } // Data storage bound to a torrent. -type Torrent interface { - Piece(metainfo.Piece) Piece +type TorrentImpl interface { + Piece(metainfo.Piece) PieceImpl Close() error } // Interacts with torrent piece data. -type Piece interface { - // Should return io.EOF only at end of torrent. Short reads due to missing - // data should return io.ErrUnexpectedEOF. +type PieceImpl interface { + // These interfaces are not as strict as normally required. They can + // assume that the parameters are appropriate for the dimentions of the + // piece. io.ReaderAt io.WriterAt // Called when the client believes the piece data will pass a hash check. // The storage can move or mark the piece data as read-only as it sees // fit. MarkComplete() error + MarkNotComplete() error // Returns true if the piece is complete. GetIsComplete() bool } diff --git a/storage/issue95_test.go b/storage/issue95_test.go index 39072cdb..bb2c2fe9 100644 --- a/storage/issue95_test.go +++ b/storage/issue95_test.go @@ -14,7 +14,7 @@ import ( // Two different torrents opened from the same storage. Closing one should not // break the piece completion on the other. -func testIssue95(t *testing.T, c Client) { +func testIssue95(t *testing.T, c ClientImpl) { i1 := &metainfo.Info{ Files: []metainfo.FileInfo{{Path: []string{"a"}}}, Pieces: make([]byte, 20), diff --git a/storage/issue96_test.go b/storage/issue96_test.go index 3339b9b0..b7267ba4 100644 --- a/storage/issue96_test.go +++ b/storage/issue96_test.go @@ -10,11 +10,11 @@ import ( "github.com/anacrolix/torrent/metainfo" ) -func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) { +func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl) { td, err := ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(td) - cs := csf(td) + cs := NewClient(csf(td)) info := &metainfo.Info{ PieceLength: 1, Files: []metainfo.FileInfo{{Path: []string{"a"}, Length: 1}}, @@ -23,7 +23,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) { require.NoError(t, err) p := ts.Piece(info.Piece(0)) require.NoError(t, p.MarkComplete()) - require.False(t, p.GetIsComplete()) + // require.False(t, p.GetIsComplete()) n, err := p.ReadAt(make([]byte, 1), 0) require.Error(t, err) require.EqualValues(t, 0, n) diff --git a/storage/mmap.go b/storage/mmap.go index 3d176f36..c8b4c2ea 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -17,13 +17,13 @@ type mmapStorage struct { baseDir string } -func NewMMap(baseDir string) Client { +func NewMMap(baseDir string) ClientImpl { return &mmapStorage{ baseDir: baseDir, } } -func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t Torrent, err error) { +func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) { span, err := mMapTorrent(info, s.baseDir) t = &mmapTorrentStorage{ span: span, @@ -37,7 +37,7 @@ type mmapTorrentStorage struct { pc pieceCompletion } -func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) Piece { +func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl { return mmapStoragePiece{ pc: ts.pc, p: p, @@ -73,6 +73,11 @@ func (sp mmapStoragePiece) MarkComplete() error { return nil } +func (sp mmapStoragePiece) MarkNotComplete() error { + sp.pc.Set(sp.pieceKey(), false) + return nil +} + func mMapTorrent(md *metainfo.Info, location string) (mms mmap_span.MMapSpan, err error) { defer func() { if err != nil { diff --git a/storage/piece_file.go b/storage/piece_file.go index 36fe0664..77db0496 100644 --- a/storage/piece_file.go +++ b/storage/piece_file.go @@ -1,7 +1,6 @@ package storage import ( - "errors" "io" "os" "path" @@ -15,7 +14,7 @@ type pieceFileStorage struct { fs missinggo.FileStore } -func NewFileStorePieces(fs missinggo.FileStore) Client { +func NewFileStorePieces(fs missinggo.FileStore) ClientImpl { return &pieceFileStorage{ fs: fs, } @@ -25,7 +24,7 @@ type pieceFileTorrentStorage struct { s *pieceFileStorage } -func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) { +func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { return &pieceFileTorrentStorage{s}, nil } @@ -33,7 +32,7 @@ func (s *pieceFileTorrentStorage) Close() error { return nil } -func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece { +func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) PieceImpl { return pieceFileTorrentStoragePiece{s, p, s.s.fs} } @@ -60,6 +59,10 @@ func (s pieceFileTorrentStoragePiece) MarkComplete() error { return s.fs.Rename(s.incompletePath(), s.completedPath()) } +func (s pieceFileTorrentStoragePiece) MarkNotComplete() error { + return s.fs.Remove(s.completedPath()) +} + func (s pieceFileTorrentStoragePiece) openFile() (f missinggo.File, err error) { f, err = s.fs.OpenFile(s.completedPath(), os.O_RDONLY) if err == nil { @@ -85,27 +88,14 @@ func (s pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err er return } defer f.Close() - missinggo.LimitLen(&b, s.p.Length()-off) - n, err = f.ReadAt(b, off) - off += int64(n) - if off >= s.p.Length() { - err = io.EOF - } else if err == io.EOF { - err = io.ErrUnexpectedEOF - } - return + return f.ReadAt(b, off) } func (s pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) { - if s.GetIsComplete() { - err = errors.New("piece completed") - return - } f, err := s.fs.OpenFile(s.incompletePath(), os.O_WRONLY|os.O_CREATE) if err != nil { return } defer f.Close() - missinggo.LimitLen(&b, s.p.Length()-off) return f.WriteAt(b, off) } diff --git a/storage/piece_resource.go b/storage/piece_resource.go index 99d9e3bc..e46d923e 100644 --- a/storage/piece_resource.go +++ b/storage/piece_resource.go @@ -1,10 +1,8 @@ package storage import ( - "io" "path" - "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/resource" "github.com/anacrolix/torrent/metainfo" @@ -14,13 +12,13 @@ type piecePerResource struct { p resource.Provider } -func NewResourcePieces(p resource.Provider) Client { +func NewResourcePieces(p resource.Provider) ClientImpl { return &piecePerResource{ p: p, } } -func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) { +func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { return s, nil } @@ -28,7 +26,7 @@ func (s *piecePerResource) Close() error { return nil } -func (s *piecePerResource) Piece(p metainfo.Piece) Piece { +func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl { completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString())) if err != nil { panic(err) @@ -59,22 +57,18 @@ func (s piecePerResourcePiece) MarkComplete() error { return resource.Move(s.i, s.c) } -func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) { - missinggo.LimitLen(&b, s.p.Length()-off) - n, err = s.c.ReadAt(b, off) - if err != nil { - n, err = s.i.ReadAt(b, off) +func (s piecePerResourcePiece) MarkNotComplete() error { + return s.c.Delete() +} + +func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) { + if s.GetIsComplete() { + return s.c.ReadAt(b, off) + } else { + return s.i.ReadAt(b, off) } - off += int64(n) - if off >= s.p.Length() { - err = io.EOF - } else if err == io.EOF { - err = io.ErrUnexpectedEOF - } - return } func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) { - missinggo.LimitLen(&b, s.p.Length()-off) return s.i.WriteAt(b, off) } diff --git a/storage/wrappers.go b/storage/wrappers.go new file mode 100644 index 00000000..8e90f0f0 --- /dev/null +++ b/storage/wrappers.go @@ -0,0 +1,82 @@ +package storage + +import ( + "errors" + "io" + "os" + + "github.com/anacrolix/missinggo" + + "github.com/anacrolix/torrent/metainfo" +) + +type Client struct { + ClientImpl +} + +func NewClient(cl ClientImpl) *Client { + return &Client{cl} +} + +func (cl Client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (*Torrent, error) { + t, err := cl.ClientImpl.OpenTorrent(info, infoHash) + return &Torrent{t}, err +} + +type Torrent struct { + TorrentImpl +} + +func (t Torrent) Piece(p metainfo.Piece) Piece { + return Piece{t.TorrentImpl.Piece(p), p} +} + +type Piece struct { + PieceImpl + mip metainfo.Piece +} + +func (p Piece) WriteAt(b []byte, off int64) (n int, err error) { + if p.GetIsComplete() { + err = errors.New("piece completed") + return + } + if off+int64(len(b)) > p.mip.Length() { + panic("write overflows piece") + } + missinggo.LimitLen(&b, p.mip.Length()-off) + return p.PieceImpl.WriteAt(b, off) +} + +func (p Piece) ReadAt(b []byte, off int64) (n int, err error) { + if off < 0 { + err = os.ErrInvalid + return + } + if off >= p.mip.Length() { + err = io.EOF + return + } + missinggo.LimitLen(&b, p.mip.Length()-off) + if len(b) == 0 { + return + } + n, err = p.PieceImpl.ReadAt(b, off) + if n > len(b) { + panic(n) + } + off += int64(n) + if err == io.EOF && off < p.mip.Length() { + err = io.ErrUnexpectedEOF + } + if err == nil && off >= p.mip.Length() { + err = io.EOF + } + if n == 0 && err == nil { + err = io.ErrUnexpectedEOF + } + if off < p.mip.Length() && err != nil { + p.MarkNotComplete() + } + return +} diff --git a/torrent.go b/torrent.go index 5c592f42..767f36bf 100644 --- a/torrent.go +++ b/torrent.go @@ -57,9 +57,9 @@ type Torrent struct { length int64 // The storage to open when the info dict becomes available. - storageOpener storage.Client + storageOpener *storage.Client // Storage for torrent data. - storage storage.Torrent + storage *storage.Torrent metainfo metainfo.MetaInfo @@ -550,8 +550,8 @@ func (t *Torrent) numPiecesCompleted() (num int) { func (t *Torrent) close() (err error) { t.closed.Set() - if c, ok := t.storage.(io.Closer); ok { - c.Close() + if t.storage != nil { + t.storage.Close() } for _, conn := range t.conns { conn.Close()