Implement sqlite directly without using piece resources
This commit is contained in:
parent
8706d326ba
commit
afea28091f
2
go.sum
2
go.sum
@ -268,6 +268,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
|
|||||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||||
|
github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0 h1:zvFSvII5rTbMZ3idAqSUjUCDgZFbWMKzxQot3/Y7nzA=
|
||||||
|
github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
|
||||||
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
|
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
|
||||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||||
github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=
|
github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=
|
||||||
|
162
storage/sqlite/new.go
Normal file
162
storage/sqlite/new.go
Normal file
@ -0,0 +1,162 @@
|
|||||||
|
package sqliteStorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"crawshaw.io/sqlite"
|
||||||
|
"crawshaw.io/sqlite/sqlitex"
|
||||||
|
"github.com/anacrolix/torrent/metainfo"
|
||||||
|
"github.com/anacrolix/torrent/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NewDirectStorageOpts struct {
|
||||||
|
NewPoolOpts
|
||||||
|
ProvOpts func(*ProviderOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// A convenience function that creates a connection pool, resource provider, and a pieces storage
|
||||||
|
// ClientImpl and returns them all with a Close attached.
|
||||||
|
func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
|
||||||
|
conns, provOpts, err := NewPool(opts.NewPoolOpts)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if f := opts.ProvOpts; f != nil {
|
||||||
|
f(&provOpts)
|
||||||
|
}
|
||||||
|
provOpts.BatchWrites = false
|
||||||
|
prov, err := NewProvider(conns, provOpts)
|
||||||
|
if err != nil {
|
||||||
|
conns.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return &client{
|
||||||
|
prov: prov,
|
||||||
|
conn: prov.pool.Get(nil),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type client struct {
|
||||||
|
l sync.Mutex
|
||||||
|
prov *provider
|
||||||
|
conn conn
|
||||||
|
blob *sqlite.Blob
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
|
||||||
|
return torrent{c}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *client) Close() error {
|
||||||
|
if c.blob != nil {
|
||||||
|
c.blob.Close()
|
||||||
|
}
|
||||||
|
c.prov.pool.Put(c.conn)
|
||||||
|
return c.prov.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type torrent struct {
|
||||||
|
c *client
|
||||||
|
}
|
||||||
|
|
||||||
|
func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
|
||||||
|
err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
|
||||||
|
rowid = stmt.ColumnInt64(0)
|
||||||
|
return nil
|
||||||
|
}, name)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if rowid != 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rowid = c.LastInsertRowID()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
|
||||||
|
t.c.l.Lock()
|
||||||
|
defer t.c.l.Unlock()
|
||||||
|
name := p.Hash().HexString()
|
||||||
|
return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t torrent) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type piece struct {
|
||||||
|
conn conn
|
||||||
|
name string
|
||||||
|
l *sync.Mutex
|
||||||
|
length int64
|
||||||
|
blob **sqlite.Blob
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p2 piece) getBlob() *sqlite.Blob {
|
||||||
|
if *p2.blob != nil {
|
||||||
|
err := (*p2.blob).Close()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
*p2.blob = nil
|
||||||
|
}
|
||||||
|
rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
*p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return *p2.blob
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
|
||||||
|
p2.l.Lock()
|
||||||
|
defer p2.l.Unlock()
|
||||||
|
blob := p2.getBlob()
|
||||||
|
return blob.ReadAt(p, off)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
|
||||||
|
p2.l.Lock()
|
||||||
|
defer p2.l.Unlock()
|
||||||
|
return p2.getBlob().WriteAt(p, off)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p2 piece) MarkComplete() error {
|
||||||
|
p2.l.Lock()
|
||||||
|
defer p2.l.Unlock()
|
||||||
|
err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
changes := p2.conn.Changes()
|
||||||
|
if changes != 1 {
|
||||||
|
panic(changes)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p2 piece) MarkNotComplete() error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p2 piece) Completion() (ret storage.Completion) {
|
||||||
|
p2.l.Lock()
|
||||||
|
defer p2.l.Unlock()
|
||||||
|
err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
|
||||||
|
ret.Complete = stmt.ColumnInt(0) != 0
|
||||||
|
return nil
|
||||||
|
}, p2.name)
|
||||||
|
ret.Ok = err == nil
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
@ -68,6 +68,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
|
|||||||
name text,
|
name text,
|
||||||
last_used timestamp default (datetime('now')),
|
last_used timestamp default (datetime('now')),
|
||||||
data blob,
|
data blob,
|
||||||
|
verified bool,
|
||||||
primary key (name)
|
primary key (name)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -1,5 +1,11 @@
|
|||||||
package test
|
package test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
_ "github.com/anacrolix/envpprof"
|
_ "github.com/anacrolix/envpprof"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
log.SetFlags(log.Flags() | log.Lshortfile)
|
||||||
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
@ -110,6 +111,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
|
|||||||
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
|
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
|
||||||
}
|
}
|
||||||
cfg.Seed = false
|
cfg.Seed = false
|
||||||
|
cfg.Debug = true
|
||||||
if ps.ConfigureLeecher.Config != nil {
|
if ps.ConfigureLeecher.Config != nil {
|
||||||
ps.ConfigureLeecher.Config(cfg)
|
ps.ConfigureLeecher.Config(cfg)
|
||||||
}
|
}
|
||||||
@ -330,6 +332,20 @@ func TestClientTransferVarious(t *testing.T) {
|
|||||||
Wrapper: fileCachePieceResourceStorage,
|
Wrapper: fileCachePieceResourceStorage,
|
||||||
}), 0},
|
}), 0},
|
||||||
{"Boltdb", storage.NewBoltDB, 0},
|
{"Boltdb", storage.NewBoltDB, 0},
|
||||||
|
{"SqliteDirect", func(s string) storage.ClientImplCloser {
|
||||||
|
path := filepath.Join(s, "sqlite3.db")
|
||||||
|
log.Print(path)
|
||||||
|
cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{
|
||||||
|
NewPoolOpts: sqliteStorage.NewPoolOpts{
|
||||||
|
Path: path,
|
||||||
|
},
|
||||||
|
ProvOpts: nil,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return cl
|
||||||
|
}, 0},
|
||||||
sqliteLeecherStorageTestCase(1),
|
sqliteLeecherStorageTestCase(1),
|
||||||
sqliteLeecherStorageTestCase(2),
|
sqliteLeecherStorageTestCase(2),
|
||||||
// This should use a number of connections equal to the number of CPUs
|
// This should use a number of connections equal to the number of CPUs
|
||||||
@ -362,7 +378,7 @@ func TestClientTransferVarious(t *testing.T) {
|
|||||||
GOMAXPROCS: ls.gomaxprocs,
|
GOMAXPROCS: ls.gomaxprocs,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
|
for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
|
||||||
t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
|
t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
|
||||||
testClientTransfer(t, testClientTransferParams{
|
testClientTransfer(t, testClientTransferParams{
|
||||||
SeederStorage: ss.f,
|
SeederStorage: ss.f,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user