2
0
mirror of synced 2025-02-24 06:38:14 +00:00

Make use of trackers

This commit is contained in:
Matt Joiner 2014-03-17 02:30:10 +11:00
parent e68d5fec1f
commit 31530899e4
3 changed files with 176 additions and 24 deletions

132
client.go
View File

@ -2,6 +2,8 @@ package torrent
import (
"bitbucket.org/anacrolix/go.torrent/peer_protocol"
"bitbucket.org/anacrolix/go.torrent/tracker"
_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
"bufio"
"container/list"
"crypto"
@ -13,6 +15,7 @@ import (
"io"
"launchpad.net/gommap"
"log"
mathRand "math/rand"
"net"
"os"
"path/filepath"
@ -222,6 +225,13 @@ type Torrent struct {
Conns []*Connection
Peers []Peer
Priorities *list.List
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
}
func (t *Torrent) Length() int64 {
return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
}
func (t *Torrent) Close() (err error) {
@ -261,6 +271,7 @@ func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
return slice.Indices
}
// Currently doesn't really queue, but should in the future.
func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
piece := t.Pieces[pieceIndex]
if piece.Hashing {
@ -347,8 +358,7 @@ func (t *Torrent) requestHeat() (ret map[Request]int) {
type Peer struct {
Id [20]byte
IP net.IP
Port int
tracker.Peer
}
func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
@ -536,6 +546,15 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
}()
}
func (t *Torrent) haveAllPieces() bool {
for _, piece := range t.Pieces {
if !piece.Complete() {
return false
}
}
return true
}
func (me *Torrent) haveAnyPieces() bool {
for _, piece := range me.Pieces {
if piece.Complete() {
@ -594,7 +613,10 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte
conn.post <- peer_protocol.Bytes(me.PeerId[:])
}
me.mu.Lock()
me.addConnection(torrent, conn)
defer me.mu.Unlock()
if !me.addConnection(torrent, conn) {
return
}
if torrent.haveAnyPieces() {
conn.Post(peer_protocol.Message{
Type: peer_protocol.Bitfield,
@ -606,7 +628,6 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte
err = fmt.Errorf("during Connection loop: %s", err)
}
me.dropConnection(torrent, conn)
me.mu.Unlock()
return
}
@ -714,8 +735,9 @@ func (me *Client) dropConnection(torrent *Torrent, conn *Connection) {
}
func (me *Client) addConnection(t *Torrent, c *Connection) bool {
for _, c := range t.Conns {
if c.PeerId == c.PeerId {
for _, c0 := range t.Conns {
if c.PeerId == c0.PeerId {
log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
return false
}
}
@ -748,22 +770,53 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
return nil
}
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
torrent := &Torrent{
// Prepare a Torrent without any attachment to a Client. That means we can
// initialize fields all fields that don't require the Client without locking
// it.
func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) {
torrent = &Torrent{
InfoHash: BytesInfoHash(metaInfo.InfoHash),
MetaInfo: metaInfo,
}
for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
if len(hash) != PieceHash.Size() {
return errors.New("bad piece hash in metainfo")
err = errors.New("bad piece hash in metainfo")
return
}
piece := &piece{}
copyHashSum(piece.Hash[:], hash)
torrent.Pieces = append(torrent.Pieces, piece)
}
var err error
torrent.Data, err = mmapTorrentData(metaInfo, me.DataDir)
torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
if err != nil {
return
}
torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
for tierIndex := range metaInfo.AnnounceList {
tier := torrent.Trackers[tierIndex]
for _, url := range metaInfo.AnnounceList[tierIndex] {
tr, err := tracker.New(url)
if err != nil {
log.Print(err)
continue
}
tier = append(tier, tr)
}
// The trackers within each tier must be shuffled before use.
// http://stackoverflow.com/a/12267471/149482
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
for i := range tier {
j := mathRand.Intn(i + 1)
tier[i], tier[j] = tier[j], tier[i]
}
torrent.Trackers[tierIndex] = tier
}
return
}
func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
torrent, err := newTorrent(metaInfo, me.DataDir)
if err != nil {
return err
}
@ -773,12 +826,66 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
return torrent.Close()
}
me.torrents[torrent.InfoHash] = torrent
go me.announceTorrent(torrent)
go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
// for i := range torrent.Pieces {
// me.queuePieceCheck(torrent, peer_protocol.Integer(i))
// }
return nil
}
func (cl *Client) announceTorrent(t *Torrent) {
req := tracker.AnnounceRequest{
Event: tracker.Started,
NumWant: -1,
}
req.PeerId = cl.PeerId
req.InfoHash = t.InfoHash
newAnnounce:
for {
for _, tier := range t.Trackers {
for trIndex, tr := range tier {
if err := tr.Connect(); err != nil {
log.Print(err)
continue
}
resp, err := tr.Announce(&req)
if err != nil {
log.Print(err)
continue
}
var peers []Peer
for _, peer := range resp.Peers {
peers = append(peers, Peer{
Peer: peer,
})
}
if err := cl.AddPeers(t.InfoHash, peers); err != nil {
log.Print(err)
return
}
log.Printf("%d new peers from %s", len(peers), "TODO")
tier[0], tier[trIndex] = tier[trIndex], tier[0]
time.Sleep(time.Second * time.Duration(resp.Interval))
continue newAnnounce
}
}
time.Sleep(time.Second)
}
}
func (cl *Client) allTorrentsCompleted() bool {
for _, t := range cl.torrents {
if !t.haveAllPieces() {
return false
}
}
return true
}
func (me *Client) WaitAll() {
me.mu.Lock()
for len(me.torrents) != 0 {
for !me.allTorrentsCompleted() {
me.event.Wait()
}
me.mu.Unlock()
@ -900,6 +1007,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
}
}
}
me.event.Broadcast()
}
func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {

View File

@ -2,6 +2,7 @@ package main
import (
"bitbucket.org/anacrolix/go.torrent"
"bitbucket.org/anacrolix/go.torrent/tracker"
"flag"
"fmt"
metainfo "github.com/nsf/libtorgo/torrent"
@ -29,6 +30,7 @@ func main() {
}
client := torrent.Client{
DataDir: *downloadDir,
// HalfOpenLimit: 2,
}
client.Start()
defer client.Stop()
@ -54,9 +56,10 @@ func main() {
log.Fatal(err)
}
return []torrent.Peer{{
Peer: tracker.Peer{
IP: addr.IP,
Port: addr.Port,
}}
}}}
}())
if err != nil {
log.Fatal(err)

View File

@ -5,13 +5,15 @@ import (
"bytes"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"io"
"net"
"sync"
"syscall"
"testing"
)
// Ensure net.IPs are stored big-endian, to match the way they're read from
// the wire.
func TestNetIPv4Bytes(t *testing.T) {
ip := net.IP([]byte{127, 0, 0, 1})
if ip.String() != "127.0.0.1" {
@ -95,16 +97,55 @@ func TestUDPTracker(t *testing.T) {
Event: tracker.Started,
}
rand.Read(req.PeerId[:])
n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb"))
if err != nil {
t.Fatal(err)
}
if n != len(req.InfoHash) {
panic("nope")
}
copy(req.InfoHash[:], []uint8{0xa3, 0x56, 0x41, 0x43, 0x74, 0x23, 0xe6, 0x26, 0xd9, 0x38, 0x25, 0x4a, 0x6b, 0x80, 0x49, 0x10, 0xa6, 0x67, 0xa, 0xc1})
// TODO: Find out what torrent this info hash corresponds to.
// n, err := hex.Decode(req.InfoHash[:], []byte("c833bb2b5e7bcb9c07f4c020b4be430c28ba7cdb"))
// if err != nil {
// t.Fatal(err)
// }
// if n != len(req.InfoHash) {
// panic("nope")
// }
resp, err := tr.Announce(&req)
if err != nil {
t.Fatal(err)
}
t.Log(resp)
}
func TestAnnounceRandomInfoHash(t *testing.T) {
wg := sync.WaitGroup{}
for _, url := range []string{
"udp://tracker.openbittorrent.com:80/announce",
"udp://tracker.publicbt.com:80",
"udp://tracker.istole.it:6969",
"udp://tracker.ccc.de:80",
"udp://tracker.open.demonii.com:1337",
} {
go func(url string) {
defer wg.Done()
tr, err := tracker.New(url)
if err != nil {
t.Fatal(err)
}
if err := tr.Connect(); err != nil {
t.Log(err)
return
}
req := tracker.AnnounceRequest{
Event: tracker.Stopped,
}
rand.Read(req.PeerId[:])
rand.Read(req.InfoHash[:])
resp, err := tr.Announce(&req)
if err != nil {
t.Fatal(err)
}
if resp.Leechers != 0 || resp.Seeders != 0 || len(resp.Peers) != 0 {
t.Fatal(resp)
}
}(url)
wg.Add(1)
}
wg.Wait()
}