From b8b403f83eb019ffccaa51675d3ff5a44be23e07 Mon Sep 17 00:00:00 2001 From: Alexander Baranov Date: Mon, 11 May 2015 17:50:59 +0300 Subject: [PATCH 1/6] Basic pick-file functionality --- client.go | 83 ++++++++++++++++++++++ cmd/picker/picker.go | 160 +++++++++++++++++++++++++++++++++++++++++++ torrent.go | 9 +++ 3 files changed, 252 insertions(+) create mode 100644 cmd/picker/picker.go diff --git a/client.go b/client.go index f5ecac7c..9055955f 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,7 @@ import ( "strings" "syscall" "time" + "math" "github.com/anacrolix/sync" "github.com/anacrolix/utp" @@ -2016,6 +2017,62 @@ func (t Torrent) DownloadAll() { t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead) } +// Marks the entire torrent for download. Requires the info first, see +// GotInfo. +func (t Torrent) DownloadFile(Path []string) { + t.cl.mu.Lock() + defer t.cl.mu.Unlock() + + // log.Printf("File to Download: %s", Path) + + // log.Printf("Pieces: %s", t.torrent.Info.NumPieces()) + // log.Printf("Length: %s", t.torrent.Info.TotalLength()) + // log.Printf("Torrent info: %s", t.torrent.Info.UpvertedFiles()) + + var offset int64 + var pickedFile metainfo.FileInfo + + found := false + pathStr := strings.Join(Path, "") + + for _, file := range t.torrent.Info.UpvertedFiles() { + if strings.Join(file.Path, "/") == pathStr { + log.Printf("Found file: %s", file) + + found = true + pickedFile = file + break + } + // log.Printf("%d %d `%s` `%s`", len(file.Path), len(Path), strings.Join(file.Path, "/"), strings.Join(Path, "")) + log.Printf("File: %s", strings.Join(file.Path, "/")) + offset += file.Length; + } + + if !found { + panic(fmt.Sprintf("File not found")) + } + + log.Printf("Donwloading file: `%s`", Path) + log.Printf("Calculated offset: %s", offset) + log.Printf("File length: %s", pickedFile.Length) + log.Printf("Piece length: %s", t.torrent.Info.PieceLength) + + + firstChunk := int(offset/t.torrent.Info.PieceLength) + nChunks := int(math.Ceil(float64(pickedFile.Length) / float64(t.torrent.Info.PieceLength))) + + log.Printf("First chunk: %s", offset/t.torrent.Info.PieceLength) + log.Printf("Number of chunks: %s", nChunks) + log.Printf("Total chunks: %s", t.torrent.Info.NumPieces()) + + + for chunk := firstChunk; chunk < firstChunk + nChunks; chunk++ { + log.Printf("Piece #%d: %s %s", chunk, t.torrent.Info.Piece(chunk).Length(), t.torrent.Info.Piece(chunk).Offset()) + t.cl.raisePiecePriority(t.torrent, chunk, piecePriorityNormal) + } +} + + // Returns nil metainfo if it isn't in the cache. Checks that the retrieved // metainfo has the correct infohash. func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { @@ -2400,6 +2457,18 @@ func (cl *Client) allTorrentsCompleted() bool { return true } +func (cl *Client) allNeededTorrentsCompleted() bool { + for _, t := range cl.torrents { + if !t.haveInfo() { + return false + } + if ! t.neededPiecesDownloaded() { + return false + } + } + return true +} + // Returns true when all torrents are completely downloaded and false if the // client is stopped before that. func (me *Client) WaitAll() bool { @@ -2414,6 +2483,20 @@ func (me *Client) WaitAll() bool { return true } +// Returns true when all requested chunks are completely downloaded and false if the +// client is stopped before that. +func (me *Client) WaitNeeded() bool { + me.mu.Lock() + defer me.mu.Unlock() + for !me.allNeededTorrentsCompleted() { + if me.stopped() { + return false + } + me.event.Wait() + } + return true +} + func (me *Client) fillRequests(t *torrent, c *connection) { if c.Interested { if c.PeerChoked { diff --git a/cmd/picker/picker.go b/cmd/picker/picker.go new file mode 100644 index 00000000..e882b62b --- /dev/null +++ b/cmd/picker/picker.go @@ -0,0 +1,160 @@ +// Downloads torrents from the command-line. +package main + +import ( + "fmt" + "log" + "net" + "net/http" + _ "net/http/pprof" + "os" + "strings" + "time" + + _ "github.com/anacrolix/envpprof" + "github.com/dustin/go-humanize" + "github.com/jessevdk/go-flags" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/metainfo" +) + +// fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0]) + +func resolvedPeerAddrs(ss []string) (ret []torrent.Peer, err error) { + for _, s := range ss { + var addr *net.TCPAddr + addr, err = net.ResolveTCPAddr("tcp", s) + if err != nil { + return + } + ret = append(ret, torrent.Peer{ + IP: addr.IP, + Port: addr.Port, + }) + } + return +} + +func bytesCompleted(tc *torrent.Client) (ret int64) { + for _, t := range tc.Torrents() { + if t.Info != nil { + ret += t.BytesCompleted() + } + } + return +} + +// Returns an estimate of the total bytes for all torrents. +func totalBytesEstimate(tc *torrent.Client) (ret int64) { + var noInfo, hadInfo int64 + for _, t := range tc.Torrents() { + info := t.Info() + if info == nil { + noInfo++ + continue + } + ret += info.TotalLength() + hadInfo++ + } + if hadInfo != 0 { + // Treat each torrent without info as the average of those with, + // rounded up. + ret += (noInfo*ret + hadInfo - 1) / hadInfo + } + return +} + +func progressLine(tc *torrent.Client) string { + return fmt.Sprintf("\033[K%s / %s\r", humanize.Bytes(uint64(bytesCompleted(tc))), humanize.Bytes(uint64(totalBytesEstimate(tc)))) +} + +func main() { + log.SetFlags(log.LstdFlags | log.Lshortfile) + var rootGroup struct { + Client torrent.Config `group:"Client Options"` + Seed bool `long:"seed" description:"continue seeding torrents after completed"` + TestPeers []string `long:"test-peer" description:"address of peer to inject to every torrent"` + Pick []string `long:"pick" description:"filename to pick"` + } + // Don't pass flags.PrintError because it's inconsistent with printing. + // https://github.com/jessevdk/go-flags/issues/132 + parser := flags.NewParser(&rootGroup, flags.HelpFlag|flags.PassDoubleDash) + parser.Usage = "[OPTIONS] (magnet URI or .torrent file path)..." + posArgs, err := parser.Parse() + if err != nil { + fmt.Fprintln(os.Stderr, "Download from the BitTorrent network.\n") + fmt.Println(err) + os.Exit(2) + } + log.Printf("File to pick: %s", rootGroup.Pick) + + testPeers, err := resolvedPeerAddrs(rootGroup.TestPeers) + if err != nil { + log.Fatal(err) + } + + if len(posArgs) == 0 { + fmt.Fprintln(os.Stderr, "no torrents specified") + return + } + client, err := torrent.NewClient(&rootGroup.Client) + if err != nil { + log.Fatalf("error creating client: %s", err) + } + http.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + client.WriteStatus(w) + }) + defer client.Close() + for _, arg := range posArgs { + t := func() torrent.Torrent { + if strings.HasPrefix(arg, "magnet:") { + t, err := client.AddMagnet(arg) + if err != nil { + log.Fatalf("error adding magnet: %s", err) + } + return t + } else { + metaInfo, err := metainfo.LoadFromFile(arg) + if err != nil { + log.Fatal(err) + } + t, err := client.AddTorrent(metaInfo) + if err != nil { + log.Fatal(err) + } + return t + } + }() + err := t.AddPeers(testPeers) + if err != nil { + log.Fatal(err) + } + go func() { + <-t.GotInfo() + t.DownloadFile(rootGroup.Pick) + }() + } + done := make(chan struct{}) + go func() { + defer close(done) + if client.WaitNeeded() { + log.Print("downloaded ALL the torrents") + } else { + log.Fatal("y u no complete torrents?!") + } + }() + ticker := time.NewTicker(time.Second) +waitDone: + for { + select { + case <-done: + break waitDone + case <-ticker.C: + os.Stdout.WriteString(progressLine(client)) + } + } + if rootGroup.Seed { + select {} + } +} diff --git a/torrent.go b/torrent.go index fd213d5b..d025185b 100644 --- a/torrent.go +++ b/torrent.go @@ -515,6 +515,15 @@ func (t *torrent) numPieces() int { return t.Info.NumPieces() } +func (t *torrent) neededPiecesDownloaded() bool { + for i := range iter.N(t.Info.NumPieces()) { + if t.Pieces[i].Priority != piecePriorityNone && !t.pieceComplete(i) { + return false + } + } + return true +} + func (t *torrent) numPiecesCompleted() (num int) { for i := range iter.N(t.Info.NumPieces()) { if t.pieceComplete(i) { From 1bb31b979da7461cc2835a98934384d2f6b3dd2a Mon Sep 17 00:00:00 2001 From: Alexander Baranov Date: Thu, 28 May 2015 23:55:15 +0300 Subject: [PATCH 2/6] Rename picker tool --- cmd/{picker/picker.go => torrent-pick/main.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cmd/{picker/picker.go => torrent-pick/main.go} (100%) diff --git a/cmd/picker/picker.go b/cmd/torrent-pick/main.go similarity index 100% rename from cmd/picker/picker.go rename to cmd/torrent-pick/main.go From 577b129df540730d6e21c2f77469c3549557abfa Mon Sep 17 00:00:00 2001 From: Alexander Baranov Date: Mon, 1 Jun 2015 22:56:10 +0300 Subject: [PATCH 3/6] Use higher-level API to get file --- client.go | 55 ---------------------------------------- cmd/torrent-pick/main.go | 46 +++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 57 deletions(-) diff --git a/client.go b/client.go index 9055955f..62bfb8ee 100644 --- a/client.go +++ b/client.go @@ -23,7 +23,6 @@ import ( "strings" "syscall" "time" - "math" "github.com/anacrolix/sync" "github.com/anacrolix/utp" @@ -2017,60 +2016,6 @@ func (t Torrent) DownloadAll() { t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead) } -// Marks the entire torrent for download. Requires the info first, see -// GotInfo. -func (t Torrent) DownloadFile(Path []string) { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() - - // log.Printf("File to Download: %s", Path) - - // log.Printf("Pieces: %s", t.torrent.Info.NumPieces()) - // log.Printf("Length: %s", t.torrent.Info.TotalLength()) - // log.Printf("Torrent info: %s", t.torrent.Info.UpvertedFiles()) - - var offset int64 - var pickedFile metainfo.FileInfo - - found := false - pathStr := strings.Join(Path, "") - - for _, file := range t.torrent.Info.UpvertedFiles() { - if strings.Join(file.Path, "/") == pathStr { - log.Printf("Found file: %s", file) - - found = true - pickedFile = file - break - } - // log.Printf("%d %d `%s` `%s`", len(file.Path), len(Path), strings.Join(file.Path, "/"), strings.Join(Path, "")) - log.Printf("File: %s", strings.Join(file.Path, "/")) - offset += file.Length; - } - - if !found { - panic(fmt.Sprintf("File not found")) - } - - log.Printf("Donwloading file: `%s`", Path) - log.Printf("Calculated offset: %s", offset) - log.Printf("File length: %s", pickedFile.Length) - log.Printf("Piece length: %s", t.torrent.Info.PieceLength) - - - firstChunk := int(offset/t.torrent.Info.PieceLength) - nChunks := int(math.Ceil(float64(pickedFile.Length) / float64(t.torrent.Info.PieceLength))) - - log.Printf("First chunk: %s", offset/t.torrent.Info.PieceLength) - log.Printf("Number of chunks: %s", nChunks) - log.Printf("Total chunks: %s", t.torrent.Info.NumPieces()) - - - for chunk := firstChunk; chunk < firstChunk + nChunks; chunk++ { - log.Printf("Piece #%d: %s %s", chunk, t.torrent.Info.Piece(chunk).Length(), t.torrent.Info.Piece(chunk).Offset()) - t.cl.raisePiecePriority(t.torrent, chunk, piecePriorityNormal) - } -} // Returns nil metainfo if it isn't in the cache. Checks that the retrieved diff --git a/cmd/torrent-pick/main.go b/cmd/torrent-pick/main.go index e882b62b..88e3c32b 100644 --- a/cmd/torrent-pick/main.go +++ b/cmd/torrent-pick/main.go @@ -8,8 +8,12 @@ import ( "net/http" _ "net/http/pprof" "os" + "io" + "io/ioutil" "strings" "time" + "bufio" + _ "github.com/anacrolix/envpprof" "github.com/dustin/go-humanize" @@ -69,13 +73,18 @@ func progressLine(tc *torrent.Client) string { return fmt.Sprintf("\033[K%s / %s\r", humanize.Bytes(uint64(bytesCompleted(tc))), humanize.Bytes(uint64(totalBytesEstimate(tc)))) } +func dstFileName(picked string) string { + parts := strings.Split(picked, "/") + return parts[len(parts)-1] +} + func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) var rootGroup struct { Client torrent.Config `group:"Client Options"` Seed bool `long:"seed" description:"continue seeding torrents after completed"` TestPeers []string `long:"test-peer" description:"address of peer to inject to every torrent"` - Pick []string `long:"pick" description:"filename to pick"` + Pick string `long:"pick" description:"filename to pick"` } // Don't pass flags.PrintError because it's inconsistent with printing. // https://github.com/jessevdk/go-flags/issues/132 @@ -98,6 +107,16 @@ func main() { fmt.Fprintln(os.Stderr, "no torrents specified") return } + + tmpdir, err := ioutil.TempDir("", "torrent-pick-") + if err != nil { + log.Fatal(err) + } + + defer os.RemoveAll(tmpdir) + + rootGroup.Client.DataDir = tmpdir + client, err := torrent.NewClient(&rootGroup.Client) if err != nil { log.Fatalf("error creating client: %s", err) @@ -106,6 +125,17 @@ func main() { client.WriteStatus(w) }) defer client.Close() + + + dstName := dstFileName(rootGroup.Pick) + + f, err := os.Create(dstName) + if err != nil { + log.Fatal(err) + } + dstWriter := bufio.NewWriter(f) + + for _, arg := range posArgs { t := func() torrent.Torrent { if strings.HasPrefix(arg, "magnet:") { @@ -130,11 +160,23 @@ func main() { if err != nil { log.Fatal(err) } + go func() { <-t.GotInfo() - t.DownloadFile(rootGroup.Pick) + files := t.Files() + for _, file := range files { + if file.Path() == rootGroup.Pick { + + log.Printf("Downloading file: %s", file.Path()) + + srcReader := io.NewSectionReader(t.NewReader(), file.Offset(), file.Length()) + io.Copy(dstWriter, srcReader) + break + } + } }() } + done := make(chan struct{}) go func() { defer close(done) From 0b529da1d940acfdb09db0fc23844098cc6763cf Mon Sep 17 00:00:00 2001 From: Alexander Baranov Date: Mon, 1 Jun 2015 22:58:21 +0300 Subject: [PATCH 4/6] Remove obsolete spaces --- client.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/client.go b/client.go index 62bfb8ee..e43c9f19 100644 --- a/client.go +++ b/client.go @@ -2016,8 +2016,6 @@ func (t Torrent) DownloadAll() { t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, piecePriorityReadahead) } - - // Returns nil metainfo if it isn't in the cache. Checks that the retrieved // metainfo has the correct infohash. func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { From 05d597a9f739c2f7b064e35032c3878a2560ed0f Mon Sep 17 00:00:00 2001 From: Alexander Baranov Date: Mon, 1 Jun 2015 23:11:45 +0300 Subject: [PATCH 5/6] Only wait for io.Copy; Remove obsolete from client --- client.go | 24 ------------------------ cmd/torrent-pick/main.go | 13 +++---------- torrent.go | 8 -------- 3 files changed, 3 insertions(+), 42 deletions(-) diff --git a/client.go b/client.go index e43c9f19..f4e6bbb0 100644 --- a/client.go +++ b/client.go @@ -2400,17 +2400,6 @@ func (cl *Client) allTorrentsCompleted() bool { return true } -func (cl *Client) allNeededTorrentsCompleted() bool { - for _, t := range cl.torrents { - if !t.haveInfo() { - return false - } - if ! t.neededPiecesDownloaded() { - return false - } - } - return true -} // Returns true when all torrents are completely downloaded and false if the // client is stopped before that. @@ -2426,19 +2415,6 @@ func (me *Client) WaitAll() bool { return true } -// Returns true when all requested chunks are completely downloaded and false if the -// client is stopped before that. -func (me *Client) WaitNeeded() bool { - me.mu.Lock() - defer me.mu.Unlock() - for !me.allNeededTorrentsCompleted() { - if me.stopped() { - return false - } - me.event.Wait() - } - return true -} func (me *Client) fillRequests(t *torrent, c *connection) { if c.Interested { diff --git a/cmd/torrent-pick/main.go b/cmd/torrent-pick/main.go index 88e3c32b..3b64152a 100644 --- a/cmd/torrent-pick/main.go +++ b/cmd/torrent-pick/main.go @@ -135,7 +135,7 @@ func main() { } dstWriter := bufio.NewWriter(f) - + done := make(chan struct{}) for _, arg := range posArgs { t := func() torrent.Torrent { if strings.HasPrefix(arg, "magnet:") { @@ -171,21 +171,14 @@ func main() { srcReader := io.NewSectionReader(t.NewReader(), file.Offset(), file.Length()) io.Copy(dstWriter, srcReader) + close(done) break } } }() } - done := make(chan struct{}) - go func() { - defer close(done) - if client.WaitNeeded() { - log.Print("downloaded ALL the torrents") - } else { - log.Fatal("y u no complete torrents?!") - } - }() + ticker := time.NewTicker(time.Second) waitDone: for { diff --git a/torrent.go b/torrent.go index d025185b..74e49831 100644 --- a/torrent.go +++ b/torrent.go @@ -515,14 +515,6 @@ func (t *torrent) numPieces() int { return t.Info.NumPieces() } -func (t *torrent) neededPiecesDownloaded() bool { - for i := range iter.N(t.Info.NumPieces()) { - if t.Pieces[i].Priority != piecePriorityNone && !t.pieceComplete(i) { - return false - } - } - return true -} func (t *torrent) numPiecesCompleted() (num int) { for i := range iter.N(t.Info.NumPieces()) { From 2982ad07ca57bd31f18febbb4a57b15d6d2f3e1f Mon Sep 17 00:00:00 2001 From: Alexander Baranov Date: Mon, 1 Jun 2015 23:12:27 +0300 Subject: [PATCH 6/6] remove obsolete spaces --- client.go | 2 -- torrent.go | 1 - 2 files changed, 3 deletions(-) diff --git a/client.go b/client.go index f4e6bbb0..f5ecac7c 100644 --- a/client.go +++ b/client.go @@ -2400,7 +2400,6 @@ func (cl *Client) allTorrentsCompleted() bool { return true } - // Returns true when all torrents are completely downloaded and false if the // client is stopped before that. func (me *Client) WaitAll() bool { @@ -2415,7 +2414,6 @@ func (me *Client) WaitAll() bool { return true } - func (me *Client) fillRequests(t *torrent, c *connection) { if c.Interested { if c.PeerChoked { diff --git a/torrent.go b/torrent.go index 74e49831..fd213d5b 100644 --- a/torrent.go +++ b/torrent.go @@ -515,7 +515,6 @@ func (t *torrent) numPieces() int { return t.Info.NumPieces() } - func (t *torrent) numPiecesCompleted() (num int) { for i := range iter.N(t.Info.NumPieces()) { if t.pieceComplete(i) {