2
0
mirror of synced 2025-02-23 22:28:11 +00:00

Track completion known to implementation state

Addresses #193
This commit is contained in:
Matt Joiner 2017-10-12 16:09:32 +11:00
parent 083d2120f3
commit dd083a4e11
19 changed files with 275 additions and 228 deletions

View File

@ -537,12 +537,14 @@ type badStoragePiece struct {
p metainfo.Piece
}
var _ storage.PieceImpl = badStoragePiece{}
func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
return 0, nil
}
func (p badStoragePiece) GetIsComplete() bool {
return true
func (p badStoragePiece) Completion() storage.Completion {
return storage.Completion{Complete: true, Ok: true}
}
func (p badStoragePiece) MarkComplete() error {

View File

@ -58,8 +58,8 @@ func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
return me
}
func (me *torrentStorage) GetIsComplete() bool {
return false
func (me *torrentStorage) Completion() storage.Completion {
return storage.Completion{}
}
func (me *torrentStorage) MarkComplete() error {

View File

@ -1,6 +1,8 @@
package torrent
import (
"fmt"
"log"
"sync"
"github.com/anacrolix/missinggo/bitmap"
@ -39,9 +41,9 @@ type Piece struct {
dirtyChunks bitmap.Bitmap
hashing bool
queuedForHash bool
everHashed bool
numVerifies int64
storageCompletionOk bool
publicPieceState PieceState
priority piecePriority
@ -51,6 +53,10 @@ type Piece struct {
noPendingWrites sync.Cond
}
func (p *Piece) String() string {
return fmt.Sprintf("%s/%d", p.t.infoHash.HexString(), p.index)
}
func (p *Piece) Info() metainfo.Piece {
return p.t.info.Piece(p.index)
}
@ -168,8 +174,15 @@ func (p *Piece) VerifyData() {
if p.hashing {
target++
}
log.Printf("target: %d", target)
p.t.queuePieceCheck(p.index)
for p.numVerifies < target {
log.Printf("got %d verifies", p.numVerifies)
p.t.cl.event.Wait()
}
log.Print("done")
}
func (p *Piece) queuedForHash() bool {
return p.t.piecesQueuedForHash.Get(p.index)
}

View File

@ -11,14 +11,21 @@ import (
"github.com/anacrolix/torrent/metainfo"
)
const (
boltDbCompleteValue = "c"
boltDbIncompleteValue = "i"
)
var (
value = []byte{}
completionBucketKey = []byte("completion")
)
type boltPieceCompletion struct {
db *bolt.DB
}
var _ PieceCompletion = (*boltPieceCompletion)(nil)
func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) {
os.MkdirAll(dir, 0770)
p := filepath.Join(dir, ".torrent.bolt.db")
@ -32,27 +39,35 @@ func NewBoltPieceCompletion(dir string) (ret PieceCompletion, err error) {
return
}
func (me *boltPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) {
func (me boltPieceCompletion) Get(pk metainfo.PieceKey) (cn Completion, err error) {
err = me.db.View(func(tx *bolt.Tx) error {
c := tx.Bucket(completed)
if c == nil {
cb := tx.Bucket(completionBucketKey)
if cb == nil {
return nil
}
ih := c.Bucket(pk.InfoHash[:])
ih := cb.Bucket(pk.InfoHash[:])
if ih == nil {
return nil
}
var key [4]byte
binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
ret = ih.Get(key[:]) != nil
cn.Ok = true
switch string(ih.Get(key[:])) {
case boltDbCompleteValue:
cn.Complete = true
case boltDbIncompleteValue:
cn.Complete = false
default:
cn.Ok = false
}
return nil
})
return
}
func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
func (me boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
return me.db.Update(func(tx *bolt.Tx) error {
c, err := tx.CreateBucketIfNotExists(completed)
c, err := tx.CreateBucketIfNotExists(completionBucketKey)
if err != nil {
return err
}
@ -62,11 +77,13 @@ func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
}
var key [4]byte
binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
return ih.Put(key[:], []byte(func() string {
if b {
return ih.Put(key[:], value)
return boltDbCompleteValue
} else {
return ih.Delete(key[:])
return boltDbIncompleteValue
}
}()))
})
}

100
storage/bolt_piece.go Normal file
View File

@ -0,0 +1,100 @@
package storage
import (
"encoding/binary"
"github.com/anacrolix/missinggo/x"
"github.com/anacrolix/torrent/metainfo"
"github.com/boltdb/bolt"
)
type boltDBPiece struct {
db *bolt.DB
p metainfo.Piece
ih metainfo.Hash
key [24]byte
}
var (
_ PieceImpl = (*boltDBPiece)(nil)
dataBucketKey = []byte("data")
)
func (me *boltDBPiece) pc() PieceCompletionGetSetter {
return boltPieceCompletion{me.db}
}
func (me *boltDBPiece) pk() metainfo.PieceKey {
return metainfo.PieceKey{me.ih, me.p.Index()}
}
func (me *boltDBPiece) Completion() Completion {
c, err := me.pc().Get(me.pk())
x.Pie(err)
return c
}
func (me *boltDBPiece) MarkComplete() error {
return me.pc().Set(me.pk(), true)
}
func (me *boltDBPiece) MarkNotComplete() error {
return me.pc().Set(me.pk(), false)
}
func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
err = me.db.View(func(tx *bolt.Tx) error {
db := tx.Bucket(dataBucketKey)
if db == nil {
return nil
}
ci := off / chunkSize
off %= chunkSize
for len(b) != 0 {
ck := me.chunkKey(int(ci))
_b := db.Get(ck[:])
if len(_b) != chunkSize {
break
}
n1 := copy(b, _b[off:])
off = 0
ci++
b = b[n1:]
n += n1
}
return nil
})
return
}
func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
copy(ret[:], me.key[:])
binary.BigEndian.PutUint16(ret[24:], uint16(index))
return
}
func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
err = me.db.Update(func(tx *bolt.Tx) error {
db, err := tx.CreateBucketIfNotExists(dataBucketKey)
if err != nil {
return err
}
ci := off / chunkSize
off %= chunkSize
for len(b) != 0 {
_b := make([]byte, chunkSize)
ck := me.chunkKey(int(ci))
copy(_b, db.Get(ck[:]))
n1 := copy(_b[off:], b)
db.Put(ck[:], _b)
if n1 > len(b) {
break
}
b = b[n1:]
off = 0
ci++
n += n1
}
return nil
})
return
}

View File

@ -15,16 +15,6 @@ const (
chunkSize = 1 << 14
)
var (
// The key for the data bucket.
data = []byte("data")
// The key for the completion flag bucket.
completed = []byte("completed")
// The value to assigned to pieces that are complete in the completed
// bucket.
completedValue = []byte{1}
)
type boltDBClient struct {
db *bolt.DB
}
@ -34,12 +24,6 @@ type boltDBTorrent struct {
ih metainfo.Hash
}
type boltDBPiece struct {
db *bolt.DB
p metainfo.Piece
key [24]byte
}
func NewBoltDB(filePath string) ClientImpl {
ret := &boltDBClient{}
var err error
@ -59,102 +43,14 @@ func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash)
}
func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
ret := &boltDBPiece{p: p, db: me.cl.db}
ret := &boltDBPiece{
p: p,
db: me.cl.db,
ih: me.ih,
}
copy(ret.key[:], me.ih[:])
binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index()))
return ret
}
func (boltDBTorrent) Close() error { return nil }
func (me *boltDBPiece) GetIsComplete() (complete bool) {
err := me.db.View(func(tx *bolt.Tx) error {
cb := tx.Bucket(completed)
// db := tx.Bucket(data)
complete =
cb != nil && len(cb.Get(me.key[:])) != 0
// db != nil && int64(len(db.Get(me.key[:]))) == me.p.Length()
return nil
})
if err != nil {
panic(err)
}
return
}
func (me *boltDBPiece) MarkComplete() error {
return me.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(completed)
if err != nil {
return err
}
return b.Put(me.key[:], completedValue)
})
}
func (me *boltDBPiece) MarkNotComplete() error {
return me.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(completed)
if b == nil {
return nil
}
return b.Delete(me.key[:])
})
}
func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
err = me.db.View(func(tx *bolt.Tx) error {
db := tx.Bucket(data)
if db == nil {
return nil
}
ci := off / chunkSize
off %= chunkSize
for len(b) != 0 {
ck := me.chunkKey(int(ci))
_b := db.Get(ck[:])
if len(_b) != chunkSize {
break
}
n1 := copy(b, _b[off:])
off = 0
ci++
b = b[n1:]
n += n1
}
return nil
})
return
}
func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
copy(ret[:], me.key[:])
binary.BigEndian.PutUint16(ret[24:], uint16(index))
return
}
func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
err = me.db.Update(func(tx *bolt.Tx) error {
db, err := tx.CreateBucketIfNotExists(data)
if err != nil {
return err
}
ci := off / chunkSize
off %= chunkSize
for len(b) != 0 {
_b := make([]byte, chunkSize)
ck := me.chunkKey(int(ci))
copy(_b, db.Get(ck[:]))
n1 := copy(_b[off:], b)
db.Put(ck[:], _b)
if n1 > len(b) {
break
}
b = b[n1:]
off = 0
ci++
n += n1
}
return nil
})
return
}

View File

@ -24,17 +24,17 @@ func TestBoltPieceCompletion(t *testing.T) {
b, err := pc.Get(pk)
require.NoError(t, err)
assert.False(t, b)
assert.False(t, b.Ok)
require.NoError(t, pc.Set(pk, false))
b, err = pc.Get(pk)
require.NoError(t, err)
assert.False(t, b)
assert.Equal(t, Completion{Complete: false, Ok: true}, b)
require.NoError(t, pc.Set(pk, true))
b, err = pc.Get(pk)
require.NoError(t, err)
assert.True(t, b)
assert.Equal(t, Completion{Complete: true, Ok: true}, b)
}

View File

@ -6,10 +6,14 @@ import (
"github.com/anacrolix/torrent/metainfo"
)
type PieceCompletionGetSetter interface {
Get(metainfo.PieceKey) (Completion, error)
Set(metainfo.PieceKey, bool) error
}
// Implementations track the completion of pieces. It must be concurrent-safe.
type PieceCompletion interface {
Get(metainfo.PieceKey) (bool, error)
Set(metainfo.PieceKey, bool) error
PieceCompletionGetSetter
Close() error
}

View File

@ -8,32 +8,30 @@ import (
type mapPieceCompletion struct {
mu sync.Mutex
m map[metainfo.PieceKey]struct{}
m map[metainfo.PieceKey]bool
}
var _ PieceCompletion = (*mapPieceCompletion)(nil)
func NewMapPieceCompletion() PieceCompletion {
return &mapPieceCompletion{m: make(map[metainfo.PieceKey]struct{})}
return &mapPieceCompletion{m: make(map[metainfo.PieceKey]bool)}
}
func (*mapPieceCompletion) Close() error { return nil }
func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (bool, error) {
func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
me.mu.Lock()
_, ok := me.m[pk]
me.mu.Unlock()
return ok, nil
defer me.mu.Unlock()
c.Complete, c.Ok = me.m[pk]
return
}
func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
me.mu.Lock()
if b {
defer me.mu.Unlock()
if me.m == nil {
me.m = make(map[metainfo.PieceKey]struct{})
me.m = make(map[metainfo.PieceKey]bool)
}
me.m[pk] = struct{}{}
} else {
delete(me.m, pk)
}
me.mu.Unlock()
me.m[pk] = b
return nil
}

View File

@ -86,7 +86,7 @@ func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
// Create a view onto the file-based torrent storage.
_io := fileTorrentImplIO{fts}
// Return the appropriate segments of this.
return &fileStoragePiece{
return &filePieceImpl{
fts,
p,
missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),

50
storage/file_piece.go Normal file
View File

@ -0,0 +1,50 @@
package storage
import (
"io"
"os"
"github.com/anacrolix/torrent/metainfo"
)
type filePieceImpl struct {
*fileTorrentImpl
p metainfo.Piece
io.WriterAt
io.ReaderAt
}
var _ PieceImpl = (*filePieceImpl)(nil)
func (me *filePieceImpl) pieceKey() metainfo.PieceKey {
return metainfo.PieceKey{me.infoHash, me.p.Index()}
}
func (fs *filePieceImpl) Completion() Completion {
c, err := fs.completion.Get(fs.pieceKey())
if err != nil || !c.Ok {
return Completion{Ok: false}
}
// If it's allegedly complete, check that its constituent files have the
// necessary length.
for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
s, err := os.Stat(fs.fileInfoName(fi))
if err != nil || s.Size() < fi.Length {
c.Complete = false
break
}
}
if !c.Complete {
// The completion was wrong, fix it.
fs.completion.Set(fs.pieceKey(), false)
}
return c
}
func (fs *filePieceImpl) MarkComplete() error {
return fs.completion.Set(fs.pieceKey(), true)
}
func (fs *filePieceImpl) MarkNotComplete() error {
return fs.completion.Set(fs.pieceKey(), false)
}

View File

@ -1,51 +0,0 @@
package storage
import (
"io"
"os"
"github.com/anacrolix/torrent/metainfo"
)
type fileStoragePiece struct {
*fileTorrentImpl
p metainfo.Piece
io.WriterAt
io.ReaderAt
}
func (me *fileStoragePiece) pieceKey() metainfo.PieceKey {
return metainfo.PieceKey{me.infoHash, me.p.Index()}
}
func (fs *fileStoragePiece) GetIsComplete() bool {
ret, err := fs.completion.Get(fs.pieceKey())
if err != nil || !ret {
return false
}
// If it's allegedly complete, check that its constituent files have the
// necessary length.
for _, fi := range extentCompleteRequiredLengths(fs.p.Info, fs.p.Offset(), fs.p.Length()) {
s, err := os.Stat(fs.fileInfoName(fi))
if err != nil || s.Size() < fi.Length {
ret = false
break
}
}
if ret {
return true
}
// The completion was wrong, fix it.
fs.completion.Set(fs.pieceKey(), false)
return false
}
func (fs *fileStoragePiece) MarkComplete() error {
fs.completion.Set(fs.pieceKey(), true)
return nil
}
func (fs *fileStoragePiece) MarkNotComplete() error {
fs.completion.Set(fs.pieceKey(), false)
return nil
}

View File

@ -31,5 +31,10 @@ type PieceImpl interface {
MarkComplete() error
MarkNotComplete() error
// Returns true if the piece is complete.
GetIsComplete() bool
Completion() Completion
}
type Completion struct {
Complete bool
Ok bool
}

View File

@ -29,7 +29,7 @@ func testIssue95(t *testing.T, c ClientImpl) {
require.NoError(t, err)
t2p := t2.Piece(i2.Piece(0))
assert.NoError(t, t1.Close())
assert.NotPanics(t, func() { t2p.GetIsComplete() })
assert.NotPanics(t, func() { t2p.Completion() })
}
func TestIssue95File(t *testing.T) {

View File

@ -27,7 +27,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl)
n, err := p.ReadAt(make([]byte, 1), 0)
require.Error(t, err)
require.EqualValues(t, 0, n)
require.False(t, p.GetIsComplete())
require.False(t, p.Completion().Complete)
}
func TestMarkedCompleteMissingOnReadFile(t *testing.T) {

View File

@ -76,9 +76,9 @@ func (me mmapStoragePiece) pieceKey() metainfo.PieceKey {
return metainfo.PieceKey{me.ih, me.p.Index()}
}
func (sp mmapStoragePiece) GetIsComplete() (ret bool) {
ret, _ = sp.pc.Get(sp.pieceKey())
return
func (sp mmapStoragePiece) Completion() Completion {
c, _ := sp.pc.Get(sp.pieceKey())
return c
}
func (sp mmapStoragePiece) MarkComplete() error {

View File

@ -48,9 +48,12 @@ type piecePerResourcePiece struct {
i resource.Instance
}
func (s piecePerResourcePiece) GetIsComplete() bool {
func (s piecePerResourcePiece) Completion() Completion {
fi, err := s.c.Stat()
return err == nil && fi.Size() == s.p.Length()
return Completion{
Complete: err == nil && fi.Size() == s.p.Length(),
Ok: true,
}
}
func (s piecePerResourcePiece) MarkComplete() error {
@ -62,7 +65,7 @@ func (s piecePerResourcePiece) MarkNotComplete() error {
}
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
if s.GetIsComplete() {
if s.Completion().Complete {
return s.c.ReadAt(b, off)
} else {
return s.i.ReadAt(b, off)

View File

@ -37,8 +37,9 @@ type Piece struct {
}
func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
if p.GetIsComplete() {
err = errors.New("piece completed")
c := p.Completion()
if c.Ok && c.Complete {
err = errors.New("piece already completed")
return
}
if off+int64(len(b)) > p.mip.Length() {

View File

@ -112,6 +112,8 @@ type Torrent struct {
pendingPieces bitmap.Bitmap
// A cache of completed piece indices.
completedPieces bitmap.Bitmap
// Pieces that need to be hashed.
piecesQueuedForHash bitmap.Bitmap
// A pool of piece priorities []int for assignment to new connections.
// These "inclinations" are used to give connections preference for
@ -190,8 +192,8 @@ func (t *Torrent) pieceComplete(piece int) bool {
return t.completedPieces.Get(piece)
}
func (t *Torrent) pieceCompleteUncached(piece int) bool {
return t.pieces[piece].Storage().GetIsComplete()
func (t *Torrent) pieceCompleteUncached(piece int) storage.Completion {
return t.pieces[piece].Storage().Completion()
}
// There's a connection to that address already.
@ -332,13 +334,12 @@ func (t *Torrent) setInfoBytes(b []byte) error {
}
for i := range t.pieces {
t.updatePieceCompletion(i)
// t.pieces[i].QueuedForHash = true
p := &t.pieces[i]
if !p.storageCompletionOk {
log.Printf("piece %s completion unknown, queueing check", p)
t.queuePieceCheck(i)
}
}
// go func() {
// for i := range t.pieces {
// t.verifyPiece(i)
// }
// }()
return nil
}
@ -392,7 +393,7 @@ func (t *Torrent) pieceState(index int) (ret PieceState) {
if t.pieceComplete(index) {
ret.Complete = true
}
if p.queuedForHash || p.hashing {
if p.queuedForHash() || p.hashing {
ret.Checking = true
}
if !ret.Complete && t.piecePartiallyDownloaded(index) {
@ -738,7 +739,7 @@ func (t *Torrent) wantPieceIndex(index int) bool {
return false
}
p := &t.pieces[index]
if p.queuedForHash {
if p.queuedForHash() {
return false
}
if p.hashing {
@ -1005,8 +1006,10 @@ func (t *Torrent) putPieceInclination(pi []int) {
func (t *Torrent) updatePieceCompletion(piece int) {
pcu := t.pieceCompleteUncached(piece)
changed := t.completedPieces.Get(piece) != pcu
t.completedPieces.Set(piece, pcu)
p := &t.pieces[piece]
changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
p.storageCompletionOk = pcu.Ok
t.completedPieces.Set(piece, pcu.Complete)
if changed {
t.pieceCompletionChanged(piece)
}
@ -1517,12 +1520,19 @@ func (t *Torrent) verifyPiece(piece int) {
cl.mu.Lock()
defer cl.mu.Unlock()
p := &t.pieces[piece]
defer func() {
p.numVerifies++
cl.event.Broadcast()
}()
for p.hashing || t.storage == nil {
cl.event.Wait()
}
p.queuedForHash = false
if !p.t.piecesQueuedForHash.Remove(piece) {
panic("piece was not queued")
}
if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece)
log.Println("early return", t.closed.IsSet(), t.pieceComplete(piece))
return
}
p.hashing = true
@ -1530,7 +1540,6 @@ func (t *Torrent) verifyPiece(piece int) {
cl.mu.Unlock()
sum := t.hashPiece(piece)
cl.mu.Lock()
p.numVerifies++
p.hashing = false
t.pieceHashed(piece, sum == p.hash)
}
@ -1557,10 +1566,10 @@ func (t *Torrent) connsAsSlice() (ret []*connection) {
// Currently doesn't really queue, but should in the future.
func (t *Torrent) queuePieceCheck(pieceIndex int) {
piece := &t.pieces[pieceIndex]
if piece.queuedForHash {
if piece.queuedForHash() {
return
}
piece.queuedForHash = true
t.piecesQueuedForHash.Add(pieceIndex)
t.publishPieceChange(pieceIndex)
go t.verifyPiece(pieceIndex)
}