Implement prioritizing of torrent data regions based on FS activity

This commit is contained in:
Matt Joiner 2013-10-15 01:39:12 +11:00
parent 5c0ff3ff5f
commit 0a5043ba69
2 changed files with 181 additions and 73 deletions

238
client.go
View File

@ -82,7 +82,12 @@ type connection struct {
PeerPieces []bool
}
func (c *connection) PeerHasPiece(index int) bool {
func (c *connection) Close() {
c.Socket.Close()
close(c.post)
}
func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
if c.PeerPieces == nil {
return false
}
@ -97,6 +102,10 @@ func (c *connection) Request(chunk Request) bool {
if len(c.Requests) >= maxRequests {
return false
}
c.SetInterested(true)
if c.PeerChoked {
return false
}
if _, ok := c.Requests[chunk]; !ok {
c.Post(peer_protocol.Message{
Type: peer_protocol.Request,
@ -131,10 +140,12 @@ func (c *connection) SetInterested(interested bool) {
func (conn *connection) writer() {
for {
b := <-conn.write
if b == nil {
break
}
n, err := conn.Socket.Write(b)
if err != nil {
log.Print(err)
close(conn.write)
break
}
if n != len(b) {
@ -147,6 +158,7 @@ func (conn *connection) writer() {
func (conn *connection) writeOptimizer() {
pending := list.New()
var nextWrite []byte
defer close(conn.write)
for {
write := conn.write
if pending.Len() == 0 {
@ -159,7 +171,10 @@ func (conn *connection) writeOptimizer() {
}
}
select {
case msg := <-conn.post:
case msg, ok := <-conn.post:
if !ok {
return
}
pending.PushBack(msg)
case write <- nextWrite:
pending.Remove(pending.Front())
@ -168,12 +183,53 @@ func (conn *connection) writeOptimizer() {
}
type Torrent struct {
InfoHash InfoHash
Pieces []piece
Data MMapSpan
MetaInfo *metainfo.MetaInfo
Conns []*connection
Peers []Peer
InfoHash InfoHash
Pieces []piece
Data MMapSpan
MetaInfo *metainfo.MetaInfo
Conns []*connection
Peers []Peer
Priorities *list.List
}
func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
for len_ > 0 {
index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
piece := t.Pieces[index]
if piece.State == pieceStateComplete {
adv := int64(t.PieceLength(index) - pieceOff)
off += adv
len_ -= adv
continue
}
chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
adv := int64(chunkSize - pieceOff%chunkSize)
off += adv
len_ -= adv
switch piece.State {
case pieceStateIncomplete:
if _, ok := piece.PendingChunkSpecs[chunk]; !ok {
continue
}
case pieceStateUnknown:
default:
panic("unexpected piece state")
}
newPriorities = append(newPriorities, Request{index, chunk})
}
if len(newPriorities) < 1 {
return
}
log.Print(newPriorities)
if t.Priorities == nil {
t.Priorities = list.New()
}
t.Priorities.PushFront(newPriorities[0])
for _, req := range newPriorities[1:] {
t.Priorities.PushBack(req)
}
}
func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
@ -188,7 +244,7 @@ func (t *Torrent) bitfield() (bf []bool) {
return
}
func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) {
func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
c := ChunkSpec{
Begin: 0,
@ -220,8 +276,8 @@ type Peer struct {
Port int
}
func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
if piece == len(t.Pieces)-1 {
func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
if int(piece) == len(t.Pieces)-1 {
len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
}
if len_ == 0 {
@ -230,7 +286,7 @@ func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
return
}
func (t *Torrent) HashPiece(piece int) (ps pieceSum) {
func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
hash := PieceHash.New()
n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
if err != nil {
@ -278,7 +334,7 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
err = errors.New("unknown torrent")
return
}
index := int(off / int64(t.PieceLength(0)))
index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
piece := t.Pieces[index]
pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
switch piece.State {
@ -317,6 +373,9 @@ func (c *Client) Start() {
c.addTorrent = make(chan *Torrent)
c.torrentFinished = make(chan InfoHash)
c.actorTask = make(chan func())
if c.HalfOpenLimit == 0 {
c.HalfOpenLimit = 10
}
o := copy(c.PeerId[:], BEP20)
_, err := rand.Read(c.PeerId[o:])
if err != nil {
@ -422,6 +481,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
write: make(chan []byte),
post: make(chan encoding.BinaryMarshaler),
}
defer conn.Close()
go conn.writer()
go conn.writeOptimizer()
conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
@ -432,11 +492,16 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
}
var b [28]byte
_, err := io.ReadFull(conn.Socket, b[:])
if err != nil {
log.Fatal(err)
switch err {
case nil:
case io.EOF:
return
default:
err = fmt.Errorf("when reading protocol and extensions: %s", err)
return
}
if string(b[:20]) != peer_protocol.Protocol {
log.Printf("wrong protocol: %#v", string(b[:20]))
err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
return
}
if 8 != copy(conn.PeerExtensions[:], b[20:]) {
@ -460,6 +525,7 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
conn.post <- peer_protocol.Bytes(me.PeerId[:])
}
done := make(chan struct{})
me.withContext(func() {
me.addConnection(torrent, conn)
if torrent.haveAnyPieces() {
@ -468,16 +534,16 @@ func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
Bitfield: torrent.bitfield(),
})
}
go func() {
defer me.withContext(func() {
me.dropConnection(torrent, conn)
})
err := me.runConnection(torrent, conn)
if err != nil {
log.Print(err)
}
}()
close(done)
})
<-done
defer me.withContext(func() {
me.dropConnection(torrent, conn)
})
err = me.runConnection(torrent, conn)
if err != nil {
log.Print(err)
}
}
func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
@ -486,7 +552,6 @@ func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
}
conn.PeerPieces[piece] = true
if torrent.wantPiece(piece) {
conn.SetInterested(true)
me.replenishConnRequests(torrent, conn)
}
}
@ -555,20 +620,7 @@ func (me *Client) runConnection(torrent *Torrent, conn *connection) error {
break
}
delete(conn.Requests, request_)
if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok {
log.Printf("got unnecessary chunk: %s", request_)
break
}
err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
break
}
delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec)
me.downloadedChunk(torrent, request_)
if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 {
torrent.Pieces[request_.Index].State = pieceStateUnknown
go me.verifyPiece(torrent, int(request_.Index))
}
err = me.downloadedChunk(torrent, msg)
default:
log.Printf("received unknown message type: %#v", msg.Type)
}
@ -669,32 +721,56 @@ func (me *Client) withContext(f func()) {
}
func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
if len(conn.Requests) >= maxRequests {
return
}
if conn.PeerChoked {
return
}
requestHeatMap := torrent.requestHeat()
for index, has := range conn.PeerPieces {
if !has {
if torrent.Priorities == nil {
return
}
for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
req := e.Value.(Request)
if !conn.PeerPieces[req.Index] {
continue
}
for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs {
request := Request{peer_protocol.Integer(index), chunkSpec}
if heat := requestHeatMap[request]; heat > 0 {
continue
}
conn.SetInterested(true)
if !conn.Request(request) {
return
}
switch torrent.Pieces[req.Index].State {
case pieceStateUnknown:
continue
case pieceStateIncomplete:
default:
panic("prioritized chunk for invalid piece state")
}
if requestHeatMap[req] > 0 {
continue
}
if !conn.Request(req) {
break
}
}
//conn.SetInterested(false)
}
func (me *Client) downloadedChunk(t *Torrent, chunk Request) {
func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok {
log.Printf("got unnecessary chunk: %s", request)
return
}
err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
return
}
delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
torrent.Pieces[request.Index].State = pieceStateUnknown
go me.verifyPiece(torrent, request.Index)
return
}
var next *list.Element
for e := torrent.Priorities.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(Request) == request {
torrent.Priorities.Remove(e)
}
}
me.dataReady(DataSpec{torrent.InfoHash, request})
return
}
func (cl *Client) dataReady(ds DataSpec) {
@ -706,7 +782,7 @@ func (cl *Client) dataReady(ds DataSpec) {
}()
}
func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) {
torrent := me.torrents[ih]
newState := func() pieceState {
if correct {
@ -715,16 +791,24 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
return pieceStateIncomplete
}
}()
oldState := torrent.Pieces[piece].State
if newState == oldState {
return
}
// oldState := torrent.Pieces[piece].State
// if newState == oldState {
// return
// }
torrent.Pieces[piece].State = newState
switch newState {
case pieceStateIncomplete:
torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
case pieceStateComplete:
log.Print(piece)
var next *list.Element
if torrent.Priorities != nil {
for e := torrent.Priorities.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(Request).Index == piece {
torrent.Priorities.Remove(e)
}
}
}
me.dataReady(DataSpec{
torrent.InfoHash,
Request{
@ -747,11 +831,14 @@ func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
}
}
func (me *Client) verifyPiece(torrent *Torrent, index int) {
func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) {
sum := torrent.HashPiece(index)
done := make(chan struct{})
me.withContext(func() {
me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash)
close(done)
})
<-done
}
func (me *Client) run() {
@ -769,7 +856,7 @@ func (me *Client) run() {
me.torrents[torrent.InfoHash] = torrent
go func() {
for index := range torrent.Pieces {
me.verifyPiece(torrent, index)
me.verifyPiece(torrent, peer_protocol.Integer(index))
}
}()
case infoHash := <-me.torrentFinished:
@ -791,3 +878,16 @@ func (me *Client) Torrents() (ret []*Torrent) {
<-done
return
}
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
done := make(chan struct{})
cl.withContext(func() {
t := cl.torrent(ih)
t.PrioritizeDataRegion(off, len_)
for _, cn := range t.Conns {
cl.replenishConnRequests(t, cn)
}
close(done)
})
<-done
}

View File

@ -8,13 +8,13 @@ import (
"github.com/davecheney/profile"
metainfo "github.com/nsf/libtorgo/torrent"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/user"
"path/filepath"
"sync"
"syscall"
)
var (
@ -48,7 +48,6 @@ type TorrentFS struct {
func (tfs *TorrentFS) publishData() {
for {
spec := <-tfs.Client.DataReady
log.Printf("ready data: %s", spec)
tfs.Lock()
for ds := range tfs.DataSubs {
ds <- spec
@ -110,11 +109,15 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
if int64(req.Size) < _len {
return req.Size
} else {
// limit read to the end of the file
return int(_len)
}
}())
infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
torrentOff := fn.TorrentOffset + req.Offset
fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
for {
n, err := fn.FS.Client.TorrentReadAt(torrent.BytesInfoHash(fn.metaInfo.InfoHash), fn.TorrentOffset+req.Offset, data)
n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
switch err {
case nil:
resp.Data = data[:n]
@ -123,7 +126,7 @@ func (fn fileNode) Read(req *fuse.ReadRequest, resp *fuse.ReadResponse, intr fus
select {
case <-dataSpecs:
case <-intr:
return fuse.Errno(syscall.EINTR)
return fuse.EINTR
}
default:
log.Print(err)
@ -270,6 +273,7 @@ func (tfs *TorrentFS) Root() (fusefs.Node, fuse.Error) {
func main() {
pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
flag.Parse()
log.SetFlags(log.LstdFlags | log.Lshortfile)
if *pprofAddr != "" {
go http.ListenAndServe(*pprofAddr, nil)
}
@ -297,6 +301,10 @@ func main() {
if err != nil {
log.Print(err)
}
client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), []torrent.Peer{{
IP: net.IPv4(127, 0, 0, 1),
Port: 3000,
}})
}
conn, err := fuse.Mount(mountDir)
if err != nil {