Disable data downloading on storage write errors

Also add a hook and Torrent methods to modify this.
This commit is contained in:
Matt Joiner 2020-02-22 19:40:50 +11:00
parent ddc61845ac
commit bae791a5a2
3 changed files with 133 additions and 30 deletions

View File

@ -509,7 +509,7 @@ func (cn *PeerConn) request(r request, mw messageWriter) bool {
}
func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
if !cn.t.networkingEnabled {
if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
if !cn.setInterested(false, msg) {
return
}
@ -1246,9 +1246,10 @@ func (c *PeerConn) receiveChunk(msg *pp.Message) error {
piece.decrementPendingWrites()
if err != nil {
panic(fmt.Sprintf("error writing chunk: %v", err))
c.logger.Printf("error writing received chunk %v: %v", req, err)
t.pendRequest(req)
t.updatePieceCompletion(pieceIndex(msg.Index))
//t.updatePieceCompletion(pieceIndex(msg.Index))
t.onWriteChunkErr(err)
return nil
}

View File

@ -2,7 +2,9 @@ package test
import (
"errors"
"log"
"os"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@ -14,11 +16,17 @@ import (
"github.com/anacrolix/torrent/storage"
)
func justOneNetwork(cc *torrent.ClientConfig) {
cc.DisableTCP = true
cc.DisableIPv4 = true
}
func TestReceiveChunkStorageFailure(t *testing.T) {
seederDataDir, metainfo := testutil.GreetingTestTorrent()
defer os.RemoveAll(seederDataDir)
seederClientConfig := torrent.TestingConfig()
seederClientConfig.Debug = true
justOneNetwork(seederClientConfig)
seederClientStorage := storage.NewMMap(seederDataDir)
defer seederClientStorage.Close()
seederClientConfig.DefaultStorage = seederClientStorage
@ -29,57 +37,110 @@ func TestReceiveChunkStorageFailure(t *testing.T) {
defer testutil.ExportStatusWriter(seederClient, "s")()
leecherClientConfig := torrent.TestingConfig()
leecherClientConfig.Debug = true
justOneNetwork(leecherClientConfig)
leecherClient, err := torrent.NewClient(leecherClientConfig)
require.NoError(t, err)
defer testutil.ExportStatusWriter(leecherClient, "l")()
info, err := metainfo.UnmarshalInfo()
require.NoError(t, err)
leecherStorage := diskFullStorage{
pieces: make([]pieceState, info.NumPieces()),
data: make([]byte, info.TotalLength()),
}
defer leecherStorage.Close()
leecherTorrent, new, err := leecherClient.AddTorrentSpec(&torrent.TorrentSpec{
InfoHash: metainfo.HashInfoBytes(),
Storage: diskFullStorage{},
Storage: &leecherStorage,
})
leecherStorage.t = leecherTorrent
require.NoError(t, err)
assert.True(t, new)
seederTorrent, err := seederClient.AddTorrent(metainfo)
require.NoError(t, err)
// Tell the seeder to find the leecher. Is it guaranteed seeders will always try to do this?
seederTorrent.AddClientPeer(leecherClient)
//leecherTorrent.AddClientPeer(seederClient)
<-leecherTorrent.GotInfo()
assertReadAllGreeting(t, leecherTorrent.NewReader())
}
type diskFullStorage struct{}
func (me diskFullStorage) ReadAt(p []byte, off int64) (n int, err error) {
panic("implement me")
type pieceState struct {
complete bool
}
func (me diskFullStorage) WriteAt(p []byte, off int64) (n int, err error) {
return 1, errors.New("disk full")
type diskFullStorage struct {
pieces []pieceState
t *torrent.Torrent
defaultHandledWriteChunkError bool
data []byte
mu sync.Mutex
diskNotFull bool
}
func (me diskFullStorage) MarkComplete() error {
panic("implement me")
func (me *diskFullStorage) Piece(p metainfo.Piece) storage.PieceImpl {
return pieceImpl{
mip: p,
diskFullStorage: me,
}
func (me diskFullStorage) MarkNotComplete() error {
panic("implement me")
}
func (me diskFullStorage) Completion() storage.Completion {
return storage.Completion{
Complete: false,
Ok: true,
}
}
func (me diskFullStorage) Piece(metainfo.Piece) storage.PieceImpl {
return me
}
func (me diskFullStorage) Close() error {
panic("implement me")
return nil
}
func (d diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
return d, nil
return &d, nil
}
type pieceImpl struct {
mip metainfo.Piece
*diskFullStorage
}
func (me pieceImpl) state() *pieceState {
return &me.diskFullStorage.pieces[me.mip.Index()]
}
func (me pieceImpl) ReadAt(p []byte, off int64) (n int, err error) {
off += me.mip.Offset()
return copy(p, me.data[off:]), nil
}
func (me pieceImpl) WriteAt(p []byte, off int64) (int, error) {
off += me.mip.Offset()
if !me.defaultHandledWriteChunkError {
go func() {
me.t.SetOnWriteChunkError(func(err error) {
log.Printf("got write chunk error to custom handler: %v", err)
me.mu.Lock()
me.diskNotFull = true
me.mu.Unlock()
me.t.AllowDataDownload()
})
me.t.AllowDataDownload()
}()
me.defaultHandledWriteChunkError = true
}
me.mu.Lock()
defer me.mu.Unlock()
if me.diskNotFull {
return copy(me.data[off:], p), nil
}
return copy(me.data[off:], p[:1]), errors.New("disk full")
}
func (me pieceImpl) MarkComplete() error {
me.state().complete = true
return nil
}
func (me pieceImpl) MarkNotComplete() error {
panic("implement me")
}
func (me pieceImpl) Completion() storage.Completion {
return storage.Completion{
Complete: me.state().complete,
Ok: true,
}
}

View File

@ -41,6 +41,8 @@ type Torrent struct {
logger log.Logger
networkingEnabled bool
dataDownloadDisallowed bool
userOnWriteChunkErr func(error)
// Determines what chunks to request from peers.
requestStrategy requestStrategy
@ -1810,3 +1812,42 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
return torrentRequestStrategyCallbacks{t}
}
func (t *Torrent) onWriteChunkErr(err error) {
if t.userOnWriteChunkErr != nil {
go t.userOnWriteChunkErr(err)
return
}
t.disallowDataDownloadLocked()
}
func (t *Torrent) DisallowDataDownload() {
t.cl.lock()
defer t.cl.unlock()
t.disallowDataDownloadLocked()
}
func (t *Torrent) disallowDataDownloadLocked() {
log.Printf("disallowing data download")
t.dataDownloadDisallowed = true
for c := range t.conns {
c.updateRequests()
}
}
func (t *Torrent) AllowDataDownload() {
t.cl.lock()
defer t.cl.unlock()
log.Printf("AllowDataDownload")
t.dataDownloadDisallowed = false
for c := range t.conns {
c.updateRequests()
}
}
func (t *Torrent) SetOnWriteChunkError(f func(error)) {
t.cl.lock()
defer t.cl.unlock()
t.userOnWriteChunkErr = f
}