diff --git a/client_test.go b/client_test.go index 5696add4..0e9f1299 100644 --- a/client_test.go +++ b/client_test.go @@ -294,9 +294,14 @@ func TestClientTransferSmallCache(t *testing.T) { } func TestClientTransferVarious(t *testing.T) { - for _, lsf := range []func(*filecache.Cache) storage.Client{ - fileCachePieceFileStorage, - fileCachePieceResourceStorage, + for _, ls := range []storageFactory{ + NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ + Wrapper: fileCachePieceFileStorage, + }), + NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ + Wrapper: fileCachePieceResourceStorage, + }), + storage.NewBoltDB, } { for _, ss := range []func(string) storage.Client{ storage.NewFile, @@ -304,21 +309,17 @@ func TestClientTransferVarious(t *testing.T) { } { for _, responsive := range []bool{false, true} { testClientTransfer(t, testClientTransferParams{ - Responsive: responsive, - SeederStorage: ss, - LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ - Wrapper: lsf, - }), + Responsive: responsive, + SeederStorage: ss, + LeecherStorage: ls, }) for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} { testClientTransfer(t, testClientTransferParams{ - SeederStorage: ss, - Responsive: responsive, - SetReadahead: true, - Readahead: readahead, - LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ - Wrapper: lsf, - }), + SeederStorage: ss, + Responsive: responsive, + SetReadahead: true, + Readahead: readahead, + LeecherStorage: ls, }) } } diff --git a/storage/boltdb.go b/storage/boltdb.go new file mode 100644 index 00000000..4a6928a5 --- /dev/null +++ b/storage/boltdb.go @@ -0,0 +1,145 @@ +package storage + +import ( + "encoding/binary" + "path/filepath" + + "github.com/boltdb/bolt" + + "github.com/anacrolix/torrent/metainfo" +) + +var ( + data = []byte("data") + completed = []byte("completed") +) + +type boltDBClient struct { + db *bolt.DB +} + +type boltDBTorrent struct { + cl *boltDBClient + ih metainfo.Hash +} + +type boltDBPiece struct { + db *bolt.DB + p metainfo.Piece + key [24]byte +} + +func NewBoltDB(filePath string) Client { + ret := &boltDBClient{} + var err error + ret.db, err = bolt.Open(filepath.Join(filePath, "bolt.db"), 0600, nil) + if err != nil { + panic(err) + } + return ret +} + +func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) { + return &boltDBTorrent{me, infoHash}, nil +} + +func (me *boltDBTorrent) Piece(p metainfo.Piece) Piece { + ret := &boltDBPiece{p: p, db: me.cl.db} + copy(ret.key[:], me.ih[:]) + binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index())) + return ret +} + +func (boltDBTorrent) Close() error { return nil } + +func (me *boltDBPiece) GetIsComplete() (complete bool) { + err := me.db.View(func(tx *bolt.Tx) error { + cb := tx.Bucket(completed) + // db := tx.Bucket(data) + complete = + cb != nil && len(cb.Get(me.key[:])) != 0 + // db != nil && int64(len(db.Get(me.key[:]))) == me.p.Length() + return nil + }) + if err != nil { + panic(err) + } + return +} + +func (me *boltDBPiece) MarkComplete() error { + return me.db.Update(func(tx *bolt.Tx) (err error) { + b, err := tx.CreateBucketIfNotExists(completed) + if err != nil { + return + } + b.Put(me.key[:], make([]byte, 1)) + return + }) +} + +func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) { + err = me.db.View(func(tx *bolt.Tx) error { + db := tx.Bucket(data) + if db == nil { + return nil + } + ci := off / (1 << 14) + off %= 1 << 14 + for len(b) != 0 { + ck := me.chunkKey(int(ci)) + _b := db.Get(ck[:]) + if len(_b) != 1<<14 { + break + } + n1 := copy(b, _b[off:]) + off = 0 + ci++ + b = b[n1:] + n += n1 + } + return nil + }) + // if n == 0 && err == nil { + // if off < me.p.Length() { + // err = io.ErrUnexpectedEOF + // } else { + // err = io.EOF + // } + // } + // // log.Println(n, err) + return +} + +func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) { + copy(ret[:], me.key[:]) + binary.BigEndian.PutUint16(ret[24:], uint16(index)) + return +} + +func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) { + err = me.db.Update(func(tx *bolt.Tx) error { + db, err := tx.CreateBucketIfNotExists(data) + if err != nil { + return err + } + ci := off / (1 << 14) + off %= 1 << 14 + for len(b) != 0 { + _b := make([]byte, 1<<14) + ck := me.chunkKey(int(ci)) + copy(_b, db.Get(ck[:])) + n1 := copy(_b[off:], b) + db.Put(ck[:], _b) + if n1 > len(b) { + break + } + b = b[n1:] + off = 0 + ci++ + n += n1 + } + return nil + }) + return +} diff --git a/storage/issue96_test.go b/storage/issue96_test.go index 81fd178c..3339b9b0 100644 --- a/storage/issue96_test.go +++ b/storage/issue96_test.go @@ -33,3 +33,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) { func TestMarkedCompleteMissingOnReadFile(t *testing.T) { testMarkedCompleteMissingOnRead(t, NewFile) } + +func TestMarkedCompleteMissingOnReadFileBoltDB(t *testing.T) { + testMarkedCompleteMissingOnRead(t, NewBoltDB) +}