torrentfs: Match the active torrents to those found in the given directory

This commit is contained in:
Matt Joiner 2014-07-23 01:54:11 +10:00
parent 6d52918540
commit 6158b1f6ec
4 changed files with 199 additions and 35 deletions

View File

@ -895,6 +895,22 @@ func (cl *Client) AddMagnet(uri string) (err error) {
return
}
func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
me.mu.Lock()
defer me.mu.Unlock()
t, ok := me.torrents[infoHash]
if !ok {
err = fmt.Errorf("no such torrent")
return
}
err = t.Close()
if err != nil {
panic(err)
}
delete(me.torrents, infoHash)
return
}
func (me *Client) addTorrent(t *torrent) (err error) {
if _, ok := me.torrents[t.InfoHash]; ok {
err = fmt.Errorf("torrent infohash collision")
@ -929,6 +945,15 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
return
}
func (me *Client) AddTorrentFromFile(name string) (err error) {
mi, err := metainfo.LoadFromFile(name)
if err != nil {
err = fmt.Errorf("error loading metainfo from file: %s", err)
return
}
return me.AddTorrent(mi)
}
func (cl *Client) listenerAnnouncePort() (port int16) {
l := cl.Listener
if l == nil {
@ -1206,6 +1231,10 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
for p.Hashing {
cl.event.Wait()
}
if t.isClosed() {
cl.mu.Unlock()
return
}
p.Hashing = true
p.QueuedForHash = false
cl.mu.Unlock()

View File

@ -1,6 +1,8 @@
package main
import (
"bitbucket.org/anacrolix/go.torrent/util"
"bitbucket.org/anacrolix/go.torrent/util/dirwatch"
"flag"
"log"
"net"
@ -17,7 +19,6 @@ import (
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
"bitbucket.org/anacrolix/go.torrent/fs"
"github.com/anacrolix/libtorgo/metainfo"
)
var (
@ -68,34 +69,6 @@ func setSignalHandlers() {
}()
}
func addTorrent(c *torrent.Client, file string) {
metaInfo, err := metainfo.LoadFromFile(file)
if err != nil {
log.Print(err)
return
}
err = c.AddTorrent(metaInfo)
if err != nil {
log.Print(err)
return
}
}
func addTorrentDir(c *torrent.Client, _path string) {
torrentDir, err := os.Open(torrentPath)
defer torrentDir.Close()
if err != nil {
log.Fatal(err)
}
names, err := torrentDir.Readdirnames(-1)
if err != nil {
log.Fatal(err)
}
for _, name := range names {
go addTorrent(c, filepath.Join(_path, name))
}
}
func addTestPeer(client *torrent.Client) {
for _, t := range client.Torrents() {
if testPeerAddr != nil {
@ -140,7 +113,33 @@ func main() {
client.WriteStatus(w)
})
client.Start()
addTorrentDir(client, torrentPath)
dw, err := dirwatch.New(torrentPath)
if err != nil {
log.Fatal(err)
}
go func() {
for ev := range dw.Events {
switch ev.Change {
case dirwatch.Added:
if ev.TorrentFilePath != "" {
err := client.AddTorrentFromFile(ev.TorrentFilePath)
if err != nil {
log.Printf("error adding torrent to client: %s", err)
}
} else if ev.Magnet != "" {
err := client.AddMagnet(ev.Magnet)
if err != nil {
log.Printf("error adding magnet: %s", err)
}
}
case dirwatch.Removed:
err := client.DropTorrent(ev.InfoHash)
if err != nil {
log.Printf("error dropping torrent: %s", err)
}
}
}
}()
resolveTestPeerAddr()
fs := torrentfs.New(client)
go func() {

View File

@ -11,6 +11,7 @@ import (
"io"
"log"
"net"
"sync"
)
func (t *torrent) PieceNumPendingBytes(index pp.Integer) (count pp.Integer) {
@ -35,14 +36,17 @@ type torrentPiece struct {
}
type torrent struct {
closed bool
InfoHash InfoHash
Pieces []*torrentPiece
PiecesByBytesLeft *OrderedList
Data mmap_span.MMapSpan
Info *metainfo.Info
Conns []*connection
Peers []Peer
Priorities *list.List
// Prevent mutations to Data memory maps while in use as they're not safe.
dataLock sync.RWMutex
Info *metainfo.Info
Conns []*connection
Peers []Peer
Priorities *list.List
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
@ -276,8 +280,16 @@ func (t *torrent) Length() int64 {
return int64(t.LastPieceSize()) + int64(len(t.Pieces)-1)*int64(t.UsualPieceSize())
}
func (t *torrent) isClosed() bool {
return t.closed
}
func (t *torrent) Close() (err error) {
t.closed = true
t.dataLock.Lock()
t.Data.Close()
t.Data = nil
t.dataLock.Unlock()
for _, conn := range t.Conns {
conn.Close()
}
@ -373,12 +385,14 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) {
func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) {
hash := pieceHash.New()
t.dataLock.RLock()
n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength)
t.dataLock.RUnlock()
if err != nil {
panic(err)
}
if pp.Integer(n) != t.PieceLength(piece) {
log.Print(t.Info)
// log.Print(t.Info)
panic(fmt.Sprintf("hashed wrong number of bytes: expected %d; did %d; piece %d", t.PieceLength(piece), n, piece))
}
copyHashSum(ps[:], hash.Sum(nil))

122
util/dirwatch/dirwatch.go Normal file
View File

@ -0,0 +1,122 @@
package dirwatch
import (
"bitbucket.org/anacrolix/go.torrent"
"github.com/anacrolix/libtorgo/metainfo"
"github.com/go-fsnotify/fsnotify"
"log"
"os"
"path/filepath"
)
type Change uint
const (
Added Change = iota
Removed
)
type Event struct {
Magnet string
Change
TorrentFilePath string
InfoHash torrent.InfoHash
}
type Instance struct {
w *fsnotify.Watcher
dirName string
Events chan Event
torrentFileInfoHashes map[string]torrent.InfoHash
}
func (me *Instance) handleEvents() {
for e := range me.w.Events {
log.Printf("event: %s", e)
me.processFile(e.Name)
}
}
func (me *Instance) handleErrors() {
for err := range me.w.Errors {
log.Printf("error in torrent directory watcher: %s", err)
}
}
func torrentFileInfoHash(fileName string) (ih torrent.InfoHash, ok bool) {
mi, _ := metainfo.LoadFromFile(fileName)
if mi == nil {
return
}
if 20 != copy(ih[:], mi.Info.Hash) {
panic(mi.Info.Hash)
}
ok = true
return
}
func (me *Instance) processFile(name string) {
name = filepath.Clean(name)
log.Print(name)
if filepath.Ext(name) != ".torrent" {
return
}
ih, ok := me.torrentFileInfoHashes[name]
if ok {
me.Events <- Event{
TorrentFilePath: name,
Change: Removed,
InfoHash: ih,
}
}
delete(me.torrentFileInfoHashes, name)
ih, ok = torrentFileInfoHash(name)
if ok {
me.torrentFileInfoHashes[name] = ih
me.Events <- Event{
TorrentFilePath: name,
Change: Added,
InfoHash: ih,
}
}
}
func (me *Instance) addDir() (err error) {
f, err := os.Open(me.dirName)
if err != nil {
return
}
defer f.Close()
names, err := f.Readdirnames(-1)
if err != nil {
return
}
for _, n := range names {
me.processFile(filepath.Join(me.dirName, n))
}
return
}
func New(dirName string) (i *Instance, err error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return
}
err = w.Add(dirName)
if err != nil {
w.Close()
return
}
i = &Instance{
w: w,
dirName: dirName,
Events: make(chan Event),
torrentFileInfoHashes: make(map[string]torrent.InfoHash, 20),
}
go func() {
i.addDir()
go i.handleEvents()
go i.handleErrors()
}()
return
}