From 1aa106386398eb37a8e26f4eb2d08aa4d30b9849 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 9 Apr 2014 02:36:05 +1000 Subject: [PATCH] Big visibility/doc clean-up, and extract mmap_span package --- client.go | 147 +++++++++++++++---------- client_test.go | 4 +- cmd/torrent-verify/main.go | 6 +- cmd/torrent/main.go | 7 +- cmd/torrentfs/main.go | 9 -- connection.go | 20 ++-- fs/torrentfs.go | 8 +- misc.go | 20 ++-- mmap_span.go => mmap_span/mmap_span.go | 20 ++-- span.go => mmap_span/span.go | 2 +- torrent.go | 49 +++++---- torrent_test.go | 2 +- 12 files changed, 159 insertions(+), 135 deletions(-) rename mmap_span.go => mmap_span/mmap_span.go (80%) rename span.go => mmap_span/span.go (94%) diff --git a/client.go b/client.go index 6858f953..76425ccc 100644 --- a/client.go +++ b/client.go @@ -1,3 +1,18 @@ +/* +Package torrent implements a torrent client. + +Simple example: + + c := &Client{} + c.Start() + defer c.Stop() + if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil { + return fmt.Errors("error adding torrent: %s", err) + } + c.WaitAll() + log.Print("erhmahgerd, torrent downloaded") + +*/ package torrent import ( @@ -24,7 +39,7 @@ import ( ) // Currently doesn't really queue, but should in the future. -func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) { +func (cl *Client) queuePieceCheck(t *torrent, pieceIndex peer_protocol.Integer) { piece := t.Pieces[pieceIndex] if piece.QueuedForHash { return @@ -33,10 +48,13 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) go cl.verifyPiece(t, pieceIndex) } -func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error { - cl.mu.Lock() - defer cl.mu.Unlock() - t := cl.torrent(ih) +// Queues the torrent data for the given region for download. The beginning of +// the region is given highest priority to allow a subsequent read at the same +// offset to return data ASAP. +func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error { + me.mu.Lock() + defer me.mu.Unlock() + t := me.torrent(ih) if t == nil { return errors.New("no such active torrent") } @@ -68,12 +86,12 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error { t.Priorities.PushBack(req) } for _, cn := range t.Conns { - cl.replenishConnRequests(t, cn) + me.replenishConnRequests(t, cn) } return nil } -type DataSpec struct { +type dataSpec struct { InfoHash Request } @@ -85,16 +103,17 @@ type Client struct { Listener net.Listener DisableTrackers bool - sync.Mutex - mu *sync.Mutex + mu sync.Mutex event sync.Cond quit chan struct{} halfOpen int - torrents map[InfoHash]*Torrent + torrents map[InfoHash]*torrent dataWaiter chan struct{} } +// Read torrent data at the given offset. Returns ErrDataNotReady if the data +// isn't available. func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) { cl.mu.Lock() defer cl.mu.Unlock() @@ -144,10 +163,10 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er return t.Data.ReadAt(p, off) } +// Starts the client. Defaults are applied. The client will begin accepting connections and tracking. func (c *Client) Start() { - c.mu = &c.Mutex - c.event.L = c.mu - c.torrents = make(map[InfoHash]*Torrent) + c.event.L = &c.mu + c.torrents = make(map[InfoHash]*torrent) if c.HalfOpenLimit == 0 { c.HalfOpenLimit = 10 } @@ -171,8 +190,10 @@ func (cl *Client) stopped() bool { } } +// Stops the client. All connections to peers are closed and all activity will +// come to a halt. func (me *Client) Stop() { - me.Lock() + me.mu.Lock() close(me.quit) me.event.Broadcast() for _, t := range me.torrents { @@ -180,7 +201,7 @@ func (me *Client) Stop() { c.Close() } } - me.Unlock() + me.mu.Unlock() } func (cl *Client) acceptConnections() { @@ -203,7 +224,7 @@ func (cl *Client) acceptConnections() { } } -func (me *Client) torrent(ih InfoHash) *Torrent { +func (me *Client) torrent(ih InfoHash) *torrent { for _, t := range me.torrents { if t.InfoHash == ih { return t @@ -212,7 +233,7 @@ func (me *Client) torrent(ih InfoHash) *Torrent { return nil } -func (me *Client) initiateConn(peer Peer, torrent *Torrent) { +func (me *Client) initiateConn(peer Peer, torrent *torrent) { if peer.Id == me.PeerId { return } @@ -255,8 +276,8 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) { }() } -func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) { - conn := &Connection{ +func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) { + conn := &connection{ Socket: sock, Choked: true, PeerChoked: true, @@ -331,7 +352,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) { return } -func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) { +func (me *Client) peerGotPiece(torrent *torrent, conn *connection, piece int) { if conn.PeerPieces == nil { conn.PeerPieces = make([]bool, len(torrent.Pieces)) } @@ -341,11 +362,11 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) { } } -func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) { +func (me *Client) peerUnchoked(torrent *torrent, conn *connection) { me.replenishConnRequests(torrent, conn) } -func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { +func (me *Client) connectionLoop(torrent *torrent, conn *connection) error { decoder := peer_protocol.Decoder{ R: bufio.NewReader(conn.Socket), MaxLength: 256 * 1024, @@ -386,7 +407,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { } request := Request{ Index: msg.Index, - ChunkSpec: ChunkSpec{msg.Begin, msg.Length}, + chunkSpec: chunkSpec{msg.Begin, msg.Length}, } conn.PeerRequests[request] = struct{}{} // TODO: Requests should be satisfied from a dedicated upload routine. @@ -420,7 +441,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { } } case peer_protocol.Piece: - request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} + request_ := Request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} if _, ok := conn.Requests[request_]; !ok { err = fmt.Errorf("unexpected piece: %s", request_) break @@ -437,7 +458,7 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error { } } -func (me *Client) dropConnection(torrent *Torrent, conn *Connection) { +func (me *Client) dropConnection(torrent *torrent, conn *connection) { conn.Socket.Close() for i0, c := range torrent.Conns { if c != conn { @@ -453,7 +474,7 @@ func (me *Client) dropConnection(torrent *Torrent, conn *Connection) { panic("no such Connection") } -func (me *Client) addConnection(t *Torrent, c *Connection) bool { +func (me *Client) addConnection(t *torrent, c *connection) bool { for _, c0 := range t.Conns { if c.PeerId == c0.PeerId { log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId) @@ -477,6 +498,7 @@ func (me *Client) openNewConns() { } } +// Adds peers to the swarm for the torrent corresponding to infoHash. func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { me.mu.Lock() t := me.torrent(infoHash) @@ -492,29 +514,29 @@ func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error { // Prepare a Torrent without any attachment to a Client. That means we can // initialize fields all fields that don't require the Client without locking // it. -func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) { - torrent = &Torrent{ +func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (t *torrent, err error) { + t = &torrent{ InfoHash: BytesInfoHash(metaInfo.InfoHash), MetaInfo: metaInfo, } - torrent.Data, err = mmapTorrentData(metaInfo, dataDir) + t.Data, err = mmapTorrentData(metaInfo, dataDir) if err != nil { return } - for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() { - hash := metaInfo.Pieces[offset : offset+PieceHash.Size()] - if len(hash) != PieceHash.Size() { + for offset := 0; offset < len(metaInfo.Pieces); offset += pieceHash.Size() { + hash := metaInfo.Pieces[offset : offset+pieceHash.Size()] + if len(hash) != pieceHash.Size() { err = errors.New("bad piece hash in metainfo") return } piece := &piece{} copyHashSum(piece.Hash[:], hash) - torrent.Pieces = append(torrent.Pieces, piece) - torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1)) + t.Pieces = append(t.Pieces, piece) + t.pendAllChunkSpecs(peer_protocol.Integer(len(t.Pieces) - 1)) } - torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList)) + t.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList)) for tierIndex := range metaInfo.AnnounceList { - tier := torrent.Trackers[tierIndex] + tier := t.Trackers[tierIndex] for _, url := range metaInfo.AnnounceList[tierIndex] { tr, err := tracker.New(url) if err != nil { @@ -530,11 +552,12 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, j := mathRand.Intn(i + 1) tier[i], tier[j] = tier[j], tier[i] } - torrent.Trackers[tierIndex] = tier + t.Trackers[tierIndex] = tier } return } +// Adds the torrent to the client. func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { torrent, err := newTorrent(metaInfo, me.DataDir) if err != nil { @@ -572,7 +595,7 @@ func (cl *Client) listenerAnnouncePort() (port int16) { return } -func (cl *Client) announceTorrent(t *Torrent) { +func (cl *Client) announceTorrent(t *torrent) { req := tracker.AnnounceRequest{ Event: tracker.Started, NumWant: -1, @@ -623,15 +646,21 @@ func (cl *Client) allTorrentsCompleted() bool { return true } -func (me *Client) WaitAll() { +// Returns true when all torrents are completely downloaded and false if the +// client is stopped. +func (me *Client) WaitAll() bool { me.mu.Lock() + defer me.mu.Unlock() for !me.allTorrentsCompleted() { + if me.stopped() { + return false + } me.event.Wait() } - me.mu.Unlock() + return true } -func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) { +func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) { requestHeatMap := torrent.requestHeat() addRequest := func(req Request) (again bool) { piece := torrent.Pieces[req.Index] @@ -673,9 +702,9 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) { } } -func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) { - request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} - if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok { +func (me *Client) downloadedChunk(torrent *torrent, msg *peer_protocol.Message) (err error) { + request := Request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}} + if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.chunkSpec]; !ok { log.Printf("got unnecessary chunk: %s", request) return } @@ -683,7 +712,7 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) if err != nil { return } - delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec) + delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.chunkSpec) if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 { me.queuePieceCheck(torrent, request.Index) } @@ -694,27 +723,29 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) torrent.Priorities.Remove(e) } } - me.dataReady(DataSpec{torrent.InfoHash, request}) + me.dataReady(dataSpec{torrent.InfoHash, request}) return } -func (cl *Client) dataReady(ds DataSpec) { +func (cl *Client) dataReady(ds dataSpec) { if cl.dataWaiter != nil { close(cl.dataWaiter) } cl.dataWaiter = nil } -func (cl *Client) DataWaiter() <-chan struct{} { - cl.Lock() - defer cl.Unlock() - if cl.dataWaiter == nil { - cl.dataWaiter = make(chan struct{}) +// Returns a channel that is closed when new data has become available in the +// client. +func (me *Client) DataWaiter() <-chan struct{} { + me.mu.Lock() + defer me.mu.Unlock() + if me.dataWaiter == nil { + me.dataWaiter = make(chan struct{}) } - return cl.dataWaiter + return me.dataWaiter } -func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) { +func (me *Client) pieceHashed(t *torrent, piece peer_protocol.Integer, correct bool) { p := t.Pieces[piece] p.EverHashed = true if correct { @@ -729,11 +760,11 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b } } } - me.dataReady(DataSpec{ + me.dataReady(dataSpec{ t.InfoHash, Request{ peer_protocol.Integer(piece), - ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))}, + chunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))}, }, }) } else { @@ -757,7 +788,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b me.event.Broadcast() } -func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) { +func (cl *Client) verifyPiece(t *torrent, index peer_protocol.Integer) { cl.mu.Lock() p := t.Pieces[index] for p.Hashing { @@ -773,7 +804,7 @@ func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) { cl.mu.Unlock() } -func (me *Client) Torrents() (ret []*Torrent) { +func (me *Client) Torrents() (ret []*torrent) { me.mu.Lock() for _, t := range me.torrents { ret = append(ret, t) diff --git a/client_test.go b/client_test.go index e2b9cc33..89e94b10 100644 --- a/client_test.go +++ b/client_test.go @@ -21,7 +21,7 @@ func TestAddPeersToUnknownTorrent(t *testing.T) { } func TestPieceHashSize(t *testing.T) { - if PieceHash.Size() != 20 { + if pieceHash.Size() != 20 { t.FailNow() } } @@ -40,7 +40,7 @@ func TestTorrentInitialState(t *testing.T) { if len(p.PendingChunkSpecs) != 1 { t.Fatalf("should only be 1 chunk: %s", p.PendingChunkSpecs) } - if _, ok := p.PendingChunkSpecs[ChunkSpec{ + if _, ok := p.PendingChunkSpecs[chunkSpec{ Length: 13, }]; !ok { t.Fatal("pending chunk spec is incorrect") diff --git a/cmd/torrent-verify/main.go b/cmd/torrent-verify/main.go index a107a5da..8cfc707f 100644 --- a/cmd/torrent-verify/main.go +++ b/cmd/torrent-verify/main.go @@ -1,12 +1,12 @@ package main import ( + "bitbucket.org/anacrolix/go.torrent/mmap_span" "bytes" "crypto/sha1" "flag" "fmt" - "bitbucket.org/anacrolix/go.torrent" // "github.com/davecheney/profile" "log" "os" @@ -36,7 +36,7 @@ func main() { log.Print(err) } defer devZero.Close() - var mMapSpan torrent.MMapSpan + var mMapSpan mmap_span.MMapSpan for _, file := range metaInfo.Files { filename := filepath.Join(append([]string{*dirPath, metaInfo.Name}, file.Path...)...) osFile, err := os.Open(filename) @@ -56,7 +56,7 @@ func main() { log.Printf("file mmap has wrong size: %#v", filename) } osFile.Close() - mMapSpan = append(mMapSpan, torrent.MMap{goMMap}) + mMapSpan = append(mMapSpan, goMMap) } log.Println(len(metaInfo.Files)) log.Println(mMapSpan.Size()) diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go index f78bfe07..93b02407 100644 --- a/cmd/torrent/main.go +++ b/cmd/torrent/main.go @@ -76,6 +76,9 @@ func main() { log.Fatal(err) } } - client.WaitAll() - log.Print("all torrents completed!") + if client.WaitAll() { + log.Print("all torrents completed!") + } else { + log.Fatal("y u no complete torrents?!") + } } diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index e4bcea2a..5233b22b 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -96,16 +96,7 @@ func addTorrentDir(c *torrent.Client, _path string) { } func addTestPeer(client *torrent.Client) { -torrents: for _, t := range client.Torrents() { - client.Lock() - for _, c := range t.Conns { - if c.Socket.RemoteAddr().String() == testPeerAddr.String() { - client.Unlock() - continue torrents - } - } - client.Unlock() if testPeerAddr != nil { if err := client.AddPeers(t.InfoHash, []torrent.Peer{{ IP: testPeerAddr.IP, diff --git a/connection.go b/connection.go index cbe0e91c..ee54f10c 100644 --- a/connection.go +++ b/connection.go @@ -12,7 +12,7 @@ import ( ) // Maintains the state of a connection with a peer. -type Connection struct { +type connection struct { Socket net.Conn closed bool mu sync.Mutex // Only for closing. @@ -33,7 +33,7 @@ type Connection struct { PeerPieces []bool } -func (c *Connection) Close() { +func (c *connection) Close() { c.mu.Lock() if c.closed { return @@ -44,25 +44,25 @@ func (c *Connection) Close() { c.mu.Unlock() } -func (c *Connection) getClosed() bool { +func (c *connection) getClosed() bool { c.mu.Lock() defer c.mu.Unlock() return c.closed } -func (c *Connection) PeerHasPiece(index peer_protocol.Integer) bool { +func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool { if c.PeerPieces == nil { return false } return c.PeerPieces[index] } -func (c *Connection) Post(msg encoding.BinaryMarshaler) { +func (c *connection) Post(msg encoding.BinaryMarshaler) { c.post <- msg } // Returns true if more requests can be sent. -func (c *Connection) Request(chunk Request) bool { +func (c *connection) Request(chunk Request) bool { if len(c.Requests) >= maxRequests { return false } @@ -88,7 +88,7 @@ func (c *Connection) Request(chunk Request) bool { return true } -func (c *Connection) Unchoke() { +func (c *connection) Unchoke() { if !c.Choked { return } @@ -98,7 +98,7 @@ func (c *Connection) Unchoke() { c.Choked = false } -func (c *Connection) SetInterested(interested bool) { +func (c *connection) SetInterested(interested bool) { if c.Interested == interested { return } @@ -119,7 +119,7 @@ var ( keepAliveBytes [4]byte ) -func (conn *Connection) writer() { +func (conn *connection) writer() { timer := time.NewTimer(0) defer timer.Stop() for { @@ -146,7 +146,7 @@ func (conn *Connection) writer() { } } -func (conn *Connection) writeOptimizer() { +func (conn *connection) writeOptimizer() { pending := list.New() var nextWrite []byte defer close(conn.write) diff --git a/fs/torrentfs.go b/fs/torrentfs.go index fba0e09e..914c2697 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -3,7 +3,6 @@ package torrentfs import ( "log" "os" - "sync" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" @@ -16,9 +15,7 @@ const ( ) type torrentFS struct { - Client *torrent.Client - DataSubs map[chan torrent.DataSpec]struct{} - sync.Mutex + Client *torrent.Client } var _ fusefs.NodeForgetter = rootNode{} @@ -229,8 +226,7 @@ func (tfs *torrentFS) Root() (fusefs.Node, fuse.Error) { func New(cl *torrent.Client) *torrentFS { fs := &torrentFS{ - Client: cl, - DataSubs: make(map[chan torrent.DataSpec]struct{}), + Client: cl, } return fs } diff --git a/misc.go b/misc.go index e5644005..ce0997f2 100644 --- a/misc.go +++ b/misc.go @@ -1,6 +1,7 @@ package torrent import ( + "bitbucket.org/anacrolix/go.torrent/mmap_span" "crypto" "errors" "os" @@ -13,10 +14,10 @@ import ( ) const ( - PieceHash = crypto.SHA1 + pieceHash = crypto.SHA1 maxRequests = 250 - chunkSize = 0x4000 // 16KiB - BEP20 = "-GT0000-" + chunkSize = 0x4000 // 16KiB + BEP20 = "-GT0000-" // Peer ID client identifier prefix dialTimeout = time.Second * 15 ) @@ -39,7 +40,7 @@ func BytesInfoHash(b []byte) (ih InfoHash) { type piece struct { Hash pieceSum - PendingChunkSpecs map[ChunkSpec]struct{} + PendingChunkSpecs map[chunkSpec]struct{} Hashing bool QueuedForHash bool EverHashed bool @@ -49,19 +50,19 @@ func (p *piece) Complete() bool { return len(p.PendingChunkSpecs) == 0 && p.EverHashed } -func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) { +func lastChunkSpec(pieceLength peer_protocol.Integer) (cs chunkSpec) { cs.Begin = (pieceLength - 1) / chunkSize * chunkSize cs.Length = pieceLength - cs.Begin return } -type ChunkSpec struct { +type chunkSpec struct { Begin, Length peer_protocol.Integer } type Request struct { Index peer_protocol.Integer - ChunkSpec + chunkSpec } type pieceByBytesPendingSlice struct { @@ -81,10 +82,11 @@ func (me pieceByBytesPendingSlice) Swap(i, j int) { } var ( + // Requested data not yet available. ErrDataNotReady = errors.New("data not ready") ) -func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) { +func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms mmap_span.MMapSpan, err error) { defer func() { if err != nil { mms.Close() @@ -123,7 +125,7 @@ func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan if int64(len(mMap)) != miFile.Length { panic("mmap has wrong length") } - mms = append(mms, MMap{mMap}) + mms = append(mms, mMap) }() if err != nil { return diff --git a/mmap_span.go b/mmap_span/mmap_span.go similarity index 80% rename from mmap_span.go rename to mmap_span/mmap_span.go index 810c8257..66635a12 100644 --- a/mmap_span.go +++ b/mmap_span/mmap_span.go @@ -1,4 +1,4 @@ -package torrent +package mmap_span import ( "io" @@ -6,19 +6,19 @@ import ( "launchpad.net/gommap" ) -type MMap struct { +type segment struct { gommap.MMap } -func (me MMap) Size() int64 { +func (me segment) Size() int64 { return int64(len(me.MMap)) } -type MMapSpan []MMap +type MMapSpan []gommap.MMap func (me MMapSpan) span() (s span) { for _, mmap := range me { - s = append(s, mmap) + s = append(s, segment{mmap}) } return } @@ -30,15 +30,15 @@ func (me MMapSpan) Close() { } func (me MMapSpan) Size() (ret int64) { - for _, mmap := range me { - ret += mmap.Size() + for _, seg := range me.span() { + ret += seg.Size() } return } func (me MMapSpan) ReadAt(p []byte, off int64) (n int, err error) { me.span().ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) { - _n := copy(p, interval.(MMap).MMap[intervalOffset:]) + _n := copy(p, interval.(segment).MMap[intervalOffset:]) p = p[_n:] n += _n return len(p) == 0 @@ -52,7 +52,7 @@ func (me MMapSpan) ReadAt(p []byte, off int64) (n int, err error) { func (me MMapSpan) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { me.span().ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) { var _n int - p := interval.(MMap).MMap[intervalOffset:] + p := interval.(segment).MMap[intervalOffset:] if n < int64(len(p)) { p = p[:n] } @@ -69,7 +69,7 @@ func (me MMapSpan) WriteSectionTo(w io.Writer, off, n int64) (written int64, err func (me MMapSpan) WriteAt(p []byte, off int64) (n int, err error) { me.span().ApplyTo(off, func(iOff int64, i sizer) (stop bool) { - mMap := i.(MMap) + mMap := i.(segment) _n := copy(mMap.MMap[iOff:], p) // err = mMap.Sync(gommap.MS_ASYNC) // if err != nil { diff --git a/span.go b/mmap_span/span.go similarity index 94% rename from span.go rename to mmap_span/span.go index b744926b..5db95885 100644 --- a/span.go +++ b/mmap_span/span.go @@ -1,4 +1,4 @@ -package torrent +package mmap_span type sizer interface { Size() int64 diff --git a/torrent.go b/torrent.go index d6279f2d..8c026e96 100644 --- a/torrent.go +++ b/torrent.go @@ -6,12 +6,13 @@ import ( "net" "sort" + "bitbucket.org/anacrolix/go.torrent/mmap_span" "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" metainfo "github.com/nsf/libtorgo/torrent" ) -func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) { +func (t *torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_protocol.Integer) { pendingChunks := t.Pieces[index].PendingChunkSpecs count = peer_protocol.Integer(len(pendingChunks)) * chunkSize _lastChunkSpec := lastChunkSpec(t.PieceLength(index)) @@ -23,12 +24,12 @@ func (t *Torrent) PieceNumPendingBytes(index peer_protocol.Integer) (count peer_ return } -type Torrent struct { +type torrent struct { InfoHash InfoHash Pieces []*piece - Data MMapSpan + Data mmap_span.MMapSpan MetaInfo *metainfo.MetaInfo - Conns []*Connection + Conns []*connection Peers []Peer Priorities *list.List // BEP 12 Multitracker Metadata Extension. The tracker.Client instances @@ -36,11 +37,11 @@ type Torrent struct { Trackers [][]tracker.Client } -func (t *Torrent) NumPieces() int { - return len(t.MetaInfo.Pieces) / PieceHash.Size() +func (t *torrent) NumPieces() int { + return len(t.MetaInfo.Pieces) / pieceHash.Size() } -func (t *Torrent) NumPiecesCompleted() (num int) { +func (t *torrent) NumPiecesCompleted() (num int) { for _, p := range t.Pieces { if p.Complete() { num++ @@ -49,11 +50,11 @@ func (t *Torrent) NumPiecesCompleted() (num int) { return } -func (t *Torrent) Length() int64 { +func (t *torrent) Length() int64 { return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0)) } -func (t *Torrent) Close() (err error) { +func (t *torrent) Close() (err error) { t.Data.Close() for _, conn := range t.Conns { conn.Close() @@ -61,7 +62,7 @@ func (t *Torrent) Close() (err error) { return } -func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { +func (t *torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) { slice := pieceByBytesPendingSlice{ Pending: make([]peer_protocol.Integer, 0, len(t.Pieces)), Indices: make([]peer_protocol.Integer, 0, len(t.Pieces)), @@ -100,35 +101,35 @@ func torrentRequestOffset(torrentLength, pieceSize int64, r Request) (off int64) return } -func (t *Torrent) requestOffset(r Request) int64 { +func (t *torrent) requestOffset(r Request) int64 { return torrentRequestOffset(t.Length(), t.MetaInfo.PieceLength, r) } // Return the request that would include the given offset into the torrent data. -func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) { +func (t *torrent) offsetRequest(off int64) (req Request, ok bool) { return torrentOffsetRequest(t.Length(), t.MetaInfo.PieceLength, chunkSize, off) } -func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { +func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { _, err = t.Data.WriteAt(data, int64(piece)*t.MetaInfo.PieceLength+begin) return } -func (t *Torrent) bitfield() (bf []bool) { +func (t *torrent) bitfield() (bf []bool) { for _, p := range t.Pieces { bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0) } return } -func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) { +func (t *torrent) pendAllChunkSpecs(index peer_protocol.Integer) { piece := t.Pieces[index] if piece.PendingChunkSpecs == nil { piece.PendingChunkSpecs = make( - map[ChunkSpec]struct{}, + map[chunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize) } - c := ChunkSpec{ + c := chunkSpec{ Begin: 0, } cs := piece.PendingChunkSpecs @@ -143,7 +144,7 @@ func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) { return } -func (t *Torrent) requestHeat() (ret map[Request]int) { +func (t *torrent) requestHeat() (ret map[Request]int) { ret = make(map[Request]int) for _, conn := range t.Conns { for req, _ := range conn.Requests { @@ -159,7 +160,7 @@ type Peer struct { Port int } -func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) { +func (t *torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) { if int(piece) == t.NumPieces()-1 { len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength) } @@ -169,8 +170,8 @@ func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.I return } -func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) { - hash := PieceHash.New() +func (t *torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) { + hash := pieceHash.New() n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength) if err != nil { panic(err) @@ -181,7 +182,7 @@ func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) { copyHashSum(ps[:], hash.Sum(nil)) return } -func (t *Torrent) haveAllPieces() bool { +func (t *torrent) haveAllPieces() bool { for _, piece := range t.Pieces { if !piece.Complete() { return false @@ -190,7 +191,7 @@ func (t *Torrent) haveAllPieces() bool { return true } -func (me *Torrent) haveAnyPieces() bool { +func (me *torrent) haveAnyPieces() bool { for _, piece := range me.Pieces { if piece.Complete() { return true @@ -199,7 +200,7 @@ func (me *Torrent) haveAnyPieces() bool { return false } -func (t *Torrent) wantPiece(index int) bool { +func (t *torrent) wantPiece(index int) bool { p := t.Pieces[index] return p.EverHashed && len(p.PendingChunkSpecs) != 0 } diff --git a/torrent_test.go b/torrent_test.go index e31a16a2..8e102df3 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -6,7 +6,7 @@ import ( ) func r(i, b, l peer_protocol.Integer) Request { - return Request{i, ChunkSpec{b, l}} + return Request{i, chunkSpec{b, l}} } // Check the given Request is correct for various torrent offsets.