diff --git a/client.go b/client.go index 196f67a5..b96f67d0 100644 --- a/client.go +++ b/client.go @@ -99,7 +99,7 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) { // been checked before. func (cl *Client) queueFirstHash(t *torrent, piece int) { p := t.Pieces[piece] - if p.EverHashed || p.Hashing || p.QueuedForHash { + if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() { return } cl.queuePieceCheck(t, pp.Integer(piece)) @@ -409,7 +409,7 @@ func NewClient(cfg *Config) (cl *Client, err error) { disableTCP: cfg.DisableTCP, _configDir: cfg.ConfigDir, config: *cfg, - torrentDataOpener: func(md *metainfo.Info) (Data, error) { + torrentDataOpener: func(md *metainfo.Info) (StatelessData, error) { return filePkg.TorrentData(md, cfg.DataDir), nil }, @@ -1553,7 +1553,16 @@ func (cl *Client) setStorage(t *torrent, td Data) (err error) { return } -type TorrentDataOpener func(*metainfo.Info) (Data, error) +type TorrentDataOpener func(*metainfo.Info) (StatelessData, error) + +type statelessDataWrapper struct { + StatelessData +} + +func (statelessDataWrapper) PieceComplete(int) bool { return false } +func (statelessDataWrapper) PieceCompleted(int) error { return nil } + +var _ Data = statelessDataWrapper{} func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { err = t.setMetadata(md, bytes, &cl.mu) @@ -1571,10 +1580,14 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e return } close(t.gotMetainfo) - td, err := cl.torrentDataOpener(&md) + stateless, err := cl.torrentDataOpener(&md) if err != nil { return } + td, ok := stateless.(Data) + if !ok { + td = statelessDataWrapper{stateless} + } err = cl.setStorage(t, td) return } @@ -2205,9 +2218,17 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { failedPieceHashes.Add(1) } p.EverHashed = true + if correct { + err := t.data.PieceCompleted(int(piece)) + if err != nil { + log.Printf("error completing piece: %s", err) + correct = false + } + } if correct { p.Priority = piecePriorityNone p.PendingChunkSpecs = nil + p.complete = true p.Event.Broadcast() me.downloadStrategy.TorrentGotPiece(t, int(piece)) } else { @@ -2250,11 +2271,11 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { for p.Hashing || t.data == nil { cl.event.Wait() } - if t.isClosed() { + p.QueuedForHash = false + if t.isClosed() || p.complete { return } p.Hashing = true - p.QueuedForHash = false cl.mu.Unlock() sum := t.hashPiece(index) cl.mu.Lock() diff --git a/client_test.go b/client_test.go index d873fbe1..5dc8a4d3 100644 --- a/client_test.go +++ b/client_test.go @@ -254,7 +254,7 @@ func TestClientTransfer(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(leecherDataDir) - cfg.TorrentDataOpener = func(info *metainfo.Info) (Data, error) { + cfg.TorrentDataOpener = func(info *metainfo.Info) (StatelessData, error) { return blob.TorrentData(info, leecherDataDir), nil } leecher, _ := NewClient(&cfg) diff --git a/data/blob/blob.go b/data/blob/blob.go index 43657f2a..f8ece667 100644 --- a/data/blob/blob.go +++ b/data/blob/blob.go @@ -1,14 +1,22 @@ package blob import ( + "bytes" + "crypto/sha1" "encoding/hex" "errors" "io" "os" + "path/filepath" "github.com/anacrolix/libtorgo/metainfo" ) +const ( + filePerm = 0640 + dirPerm = 0750 +) + type data struct { info *metainfo.Info baseDir string @@ -125,3 +133,47 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er } return } + +func (me *data) incompletePiecePath(piece int) string { + return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece)) +} + +func (me *data) completedPiecePath(piece int) string { + return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece)) +} + +func (me *data) PieceCompleted(index int) (err error) { + var ( + incompletePiecePath = me.incompletePiecePath(index) + completedPiecePath = me.completedPiecePath(index) + ) + fSrc, err := os.Open(incompletePiecePath) + if err != nil { + return + } + defer fSrc.Close() + os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm) + fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm) + if err != nil { + return + } + defer fDst.Close() + hasher := sha1.New() + r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher) + _, err = io.Copy(fDst, r) + if err != nil { + return + } + if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) { + err = errors.New("piece incomplete") + os.Remove(completedPiecePath) + return + } + os.Remove(incompletePiecePath) + return +} + +func (me *data) PieceComplete(piece int) bool { + _, err := os.Stat(me.completedPiecePath(piece)) + return err == nil +} diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 69df66b0..4c865ba4 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -191,7 +191,7 @@ func TestDownloadOnDemand(t *testing.T) { NoDefaultBlocklist: true, - TorrentDataOpener: func(info *metainfo.Info) (torrent.Data, error) { + TorrentDataOpener: func(info *metainfo.Info) (torrent.StatelessData, error) { return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download")) }, diff --git a/misc.go b/misc.go index f14c8f74..c3da1048 100644 --- a/misc.go +++ b/misc.go @@ -45,6 +45,7 @@ const ( type piece struct { Hash pieceSum + complete bool PendingChunkSpecs map[chunkSpec]struct{} Hashing bool QueuedForHash bool @@ -72,7 +73,7 @@ func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) { } func (p *piece) Complete() bool { - return len(p.PendingChunkSpecs) == 0 && p.EverHashed + return p.complete } func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) { diff --git a/torrent.go b/torrent.go index 3804e067..5217db56 100644 --- a/torrent.go +++ b/torrent.go @@ -38,13 +38,21 @@ type peersKey struct { Port int } -type Data interface { +type StatelessData interface { ReadAt(p []byte, off int64) (n int, err error) Close() WriteAt(p []byte, off int64) (n int, err error) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) } +type Data interface { + StatelessData + // We believe the piece data will pass a hash check. + PieceCompleted(index int) error + // Returns true if the piece is complete. + PieceComplete(index int) bool +} + // Is not aware of Client. Maintains state of torrent for with-in a Client. type torrent struct { stateMu sync.Mutex @@ -211,6 +219,9 @@ func (t *torrent) setStorage(td Data) (err error) { t.data.Close() } t.data = td + for i, p := range t.Pieces { + p.complete = t.data.PieceComplete(i) + } return } @@ -586,6 +597,7 @@ func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) { util.CopyExact(ps[:], hash.Sum(nil)) return } + func (t *torrent) haveAllPieces() bool { if !t.haveInfo() { return false