torrent/data/pieceStore/store.go

293 lines
5.9 KiB
Go

package pieceStore
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"errors"
"io"
"log"
"os"
"path"
"strconv"
"sync"
"time"
"github.com/anacrolix/torrent/data/pieceStore/dataBackend"
"github.com/anacrolix/torrent/metainfo"
)
type store struct {
db dataBackend.I
// Limit backend requests.
requestPool chan struct{}
mu sync.Mutex
// The cached completion state for pieces.
completion map[[20]byte]bool
lastError time.Time
}
func (me *store) completedPiecePath(p metainfo.Piece) string {
return path.Join("completed", hex.EncodeToString(p.Hash()))
}
func (me *store) incompletePiecePath(p metainfo.Piece) string {
return path.Join(
"incomplete",
strconv.FormatInt(int64(os.Getpid()), 10),
hex.EncodeToString(p.Hash()))
}
func (me *store) OpenTorrentData(info *metainfo.Info) *data {
return &data{info, me}
}
func New(db dataBackend.I) *store {
s := &store{
db: db,
requestPool: make(chan struct{}, 5),
}
return s
}
// Turns 40 byte hex string into its equivalent binary byte array.
func hexStringPieceHashArray(s string) (ret [20]byte, ok bool) {
if len(s) != 40 {
return
}
n, err := hex.Decode(ret[:], []byte(s))
if err != nil {
return
}
if n != 20 {
panic(n)
}
ok = true
return
}
func sliceToPieceHashArray(b []byte) (ret [20]byte) {
n := copy(ret[:], b)
if n != 20 {
panic(n)
}
return
}
func pieceHashArray(p metainfo.Piece) [20]byte {
return sliceToPieceHashArray(p.Hash())
}
func (me *store) completionKnown(p metainfo.Piece) bool {
me.mu.Lock()
_, ok := me.completion[pieceHashArray(p)]
me.mu.Unlock()
return ok
}
func (me *store) isComplete(p metainfo.Piece) bool {
me.mu.Lock()
ret, _ := me.completion[pieceHashArray(p)]
me.mu.Unlock()
return ret
}
func (me *store) setCompletion(p metainfo.Piece, complete bool) {
me.mu.Lock()
if me.completion == nil {
me.completion = make(map[[20]byte]bool)
}
me.completion[pieceHashArray(p)] = complete
me.mu.Unlock()
}
func (me *store) pieceComplete(p metainfo.Piece) bool {
if me.completionKnown(p) {
return me.isComplete(p)
}
// Prevent a errors from stalling the caller.
if !me.lastError.IsZero() && time.Since(me.lastError) < time.Second {
return false
}
me.requestPool <- struct{}{}
defer func() {
<-me.requestPool
}()
length, err := me.db.GetLength(me.completedPiecePath(p))
if err == dataBackend.ErrNotFound {
me.setCompletion(p, false)
return false
}
if err != nil {
me.lastError = time.Now()
log.Printf("%+v", err)
return false
}
complete := length == p.Length()
if !complete {
log.Printf("completed piece %x has wrong length: %d", p.Hash(), length)
}
me.setCompletion(p, complete)
return complete
}
func (me *store) pieceWriteAt(p metainfo.Piece, b []byte, off int64) (n int, err error) {
if me.pieceComplete(p) {
err = errors.New("already have piece")
return
}
me.requestPool <- struct{}{}
defer func() {
<-me.requestPool
}()
f, err := me.db.Open(me.incompletePiecePath(p), os.O_WRONLY|os.O_CREATE)
if err != nil {
return
}
defer func() {
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()
_, err = f.Seek(off, os.SEEK_SET)
if err != nil {
return
}
n, err = f.Write(b)
return
}
// Wraps a Closer, releases a slot from a channel pool the first time Close is
// called.
type poolCloser struct {
mu sync.Mutex
released bool
pool <-chan struct{}
io.Closer
}
func (me *poolCloser) Close() (err error) {
err = me.Closer.Close()
me.mu.Lock()
if !me.released {
<-me.pool
me.released = true
}
me.mu.Unlock()
return
}
func (me *store) forgetCompletions() {
me.mu.Lock()
me.completion = nil
me.mu.Unlock()
}
func (me *store) getPieceRange(p metainfo.Piece, off, n int64) (ret io.ReadCloser, err error) {
me.requestPool <- struct{}{}
rc, err := me.db.OpenSection(me.completedPiecePath(p), off, n)
if err == dataBackend.ErrNotFound {
if me.isComplete(p) {
me.forgetCompletions()
}
me.setCompletion(p, false)
rc, err = me.db.OpenSection(me.incompletePiecePath(p), off, n)
}
if err == dataBackend.ErrNotFound {
<-me.requestPool
err = io.ErrUnexpectedEOF
return
}
if err != nil {
<-me.requestPool
return
}
// Wrap up the response body so that the request slot is released when the
// response body is closed.
ret = struct {
io.Reader
io.Closer
}{
rc,
&poolCloser{
pool: me.requestPool,
Closer: rc,
},
}
return
}
func (me *store) pieceReadAt(p metainfo.Piece, b []byte, off int64) (n int, err error) {
rc, err := me.getPieceRange(p, off, int64(len(b)))
if err != nil {
return
}
defer rc.Close()
n, err = io.ReadFull(rc, b)
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
}
func (me *store) removePath(path string) (err error) {
me.requestPool <- struct{}{}
defer func() {
<-me.requestPool
}()
err = me.db.Delete(path)
return
}
// Remove the completed piece if it exists, and mark the piece not completed.
// Mustn't fail.
func (me *store) deleteCompleted(p metainfo.Piece) {
if err := me.removePath(me.completedPiecePath(p)); err != nil {
panic(err)
}
me.setCompletion(p, false)
}
func (me *store) hashCopyFile(from, to string, n int64) (hash []byte, err error) {
// Yes, 2 requests occur here simultaneously, but we're not trying to be
// pedantic.
me.requestPool <- struct{}{}
defer func() {
<-me.requestPool
}()
src, err := me.db.OpenSection(from, 0, n)
if err != nil {
return
}
defer src.Close()
hasher := sha1.New()
tee := io.TeeReader(src, hasher)
dest, err := me.db.Open(to, os.O_WRONLY|os.O_CREATE)
if err != nil {
return
}
defer dest.Close()
_, err = io.Copy(dest, tee)
if err != nil {
return
}
hash = hasher.Sum(nil)
return
}
func (me *store) PieceCompleted(p metainfo.Piece) (err error) {
hash, err := me.hashCopyFile(me.incompletePiecePath(p), me.completedPiecePath(p), p.Length())
if err == nil && !bytes.Equal(hash, p.Hash()) {
err = errors.New("piece incomplete")
}
if err != nil {
me.deleteCompleted(p)
return
}
me.removePath(me.incompletePiecePath(p))
me.setCompletion(p, true)
return
}