2
0
mirror of synced 2025-02-23 14:18:13 +00:00

Make blob data stateful

This commit is contained in:
Matt Joiner 2015-02-27 12:45:55 +11:00
parent bd6d72fa5f
commit a2d69b4931
6 changed files with 96 additions and 10 deletions

View File

@ -99,7 +99,7 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
// been checked before.
func (cl *Client) queueFirstHash(t *torrent, piece int) {
p := t.Pieces[piece]
if p.EverHashed || p.Hashing || p.QueuedForHash {
if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
return
}
cl.queuePieceCheck(t, pp.Integer(piece))
@ -409,7 +409,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
disableTCP: cfg.DisableTCP,
_configDir: cfg.ConfigDir,
config: *cfg,
torrentDataOpener: func(md *metainfo.Info) (Data, error) {
torrentDataOpener: func(md *metainfo.Info) (StatelessData, error) {
return filePkg.TorrentData(md, cfg.DataDir), nil
},
@ -1553,7 +1553,16 @@ func (cl *Client) setStorage(t *torrent, td Data) (err error) {
return
}
type TorrentDataOpener func(*metainfo.Info) (Data, error)
type TorrentDataOpener func(*metainfo.Info) (StatelessData, error)
type statelessDataWrapper struct {
StatelessData
}
func (statelessDataWrapper) PieceComplete(int) bool { return false }
func (statelessDataWrapper) PieceCompleted(int) error { return nil }
var _ Data = statelessDataWrapper{}
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
err = t.setMetadata(md, bytes, &cl.mu)
@ -1571,10 +1580,14 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
return
}
close(t.gotMetainfo)
td, err := cl.torrentDataOpener(&md)
stateless, err := cl.torrentDataOpener(&md)
if err != nil {
return
}
td, ok := stateless.(Data)
if !ok {
td = statelessDataWrapper{stateless}
}
err = cl.setStorage(t, td)
return
}
@ -2205,9 +2218,17 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
failedPieceHashes.Add(1)
}
p.EverHashed = true
if correct {
err := t.data.PieceCompleted(int(piece))
if err != nil {
log.Printf("error completing piece: %s", err)
correct = false
}
}
if correct {
p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
p.complete = true
p.Event.Broadcast()
me.downloadStrategy.TorrentGotPiece(t, int(piece))
} else {
@ -2250,11 +2271,11 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
for p.Hashing || t.data == nil {
cl.event.Wait()
}
if t.isClosed() {
p.QueuedForHash = false
if t.isClosed() || p.complete {
return
}
p.Hashing = true
p.QueuedForHash = false
cl.mu.Unlock()
sum := t.hashPiece(index)
cl.mu.Lock()

View File

@ -254,7 +254,7 @@ func TestClientTransfer(t *testing.T) {
t.Fatal(err)
}
defer os.RemoveAll(leecherDataDir)
cfg.TorrentDataOpener = func(info *metainfo.Info) (Data, error) {
cfg.TorrentDataOpener = func(info *metainfo.Info) (StatelessData, error) {
return blob.TorrentData(info, leecherDataDir), nil
}
leecher, _ := NewClient(&cfg)

View File

@ -1,14 +1,22 @@
package blob
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"errors"
"io"
"os"
"path/filepath"
"github.com/anacrolix/libtorgo/metainfo"
)
const (
filePerm = 0640
dirPerm = 0750
)
type data struct {
info *metainfo.Info
baseDir string
@ -125,3 +133,47 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er
}
return
}
func (me *data) incompletePiecePath(piece int) string {
return filepath.Join(me.baseDir, "incomplete", me.pieceHashHex(piece))
}
func (me *data) completedPiecePath(piece int) string {
return filepath.Join(me.baseDir, "complete", me.pieceHashHex(piece))
}
func (me *data) PieceCompleted(index int) (err error) {
var (
incompletePiecePath = me.incompletePiecePath(index)
completedPiecePath = me.completedPiecePath(index)
)
fSrc, err := os.Open(incompletePiecePath)
if err != nil {
return
}
defer fSrc.Close()
os.MkdirAll(filepath.Dir(completedPiecePath), dirPerm)
fDst, err := os.OpenFile(completedPiecePath, os.O_EXCL|os.O_CREATE|os.O_WRONLY, filePerm)
if err != nil {
return
}
defer fDst.Close()
hasher := sha1.New()
r := io.TeeReader(io.LimitReader(fSrc, me.info.Piece(index).Length()), hasher)
_, err = io.Copy(fDst, r)
if err != nil {
return
}
if !bytes.Equal(hasher.Sum(nil), me.info.Piece(index).Hash()) {
err = errors.New("piece incomplete")
os.Remove(completedPiecePath)
return
}
os.Remove(incompletePiecePath)
return
}
func (me *data) PieceComplete(piece int) bool {
_, err := os.Stat(me.completedPiecePath(piece))
return err == nil
}

View File

@ -191,7 +191,7 @@ func TestDownloadOnDemand(t *testing.T) {
NoDefaultBlocklist: true,
TorrentDataOpener: func(info *metainfo.Info) (torrent.Data, error) {
TorrentDataOpener: func(info *metainfo.Info) (torrent.StatelessData, error) {
return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download"))
},

View File

@ -45,6 +45,7 @@ const (
type piece struct {
Hash pieceSum
complete bool
PendingChunkSpecs map[chunkSpec]struct{}
Hashing bool
QueuedForHash bool
@ -72,7 +73,7 @@ func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) {
}
func (p *piece) Complete() bool {
return len(p.PendingChunkSpecs) == 0 && p.EverHashed
return p.complete
}
func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) {

View File

@ -38,13 +38,21 @@ type peersKey struct {
Port int
}
type Data interface {
type StatelessData interface {
ReadAt(p []byte, off int64) (n int, err error)
Close()
WriteAt(p []byte, off int64) (n int, err error)
WriteSectionTo(w io.Writer, off, n int64) (written int64, err error)
}
type Data interface {
StatelessData
// We believe the piece data will pass a hash check.
PieceCompleted(index int) error
// Returns true if the piece is complete.
PieceComplete(index int) bool
}
// Is not aware of Client. Maintains state of torrent for with-in a Client.
type torrent struct {
stateMu sync.Mutex
@ -211,6 +219,9 @@ func (t *torrent) setStorage(td Data) (err error) {
t.data.Close()
}
t.data = td
for i, p := range t.Pieces {
p.complete = t.data.PieceComplete(i)
}
return
}
@ -586,6 +597,7 @@ func (t *torrent) hashPiece(piece pp.Integer) (ps pieceSum) {
util.CopyExact(ps[:], hash.Sum(nil))
return
}
func (t *torrent) haveAllPieces() bool {
if !t.haveInfo() {
return false