Add support for Fast Extension
This commit is contained in:
parent
339c650085
commit
e85b7e228b
65
client.go
65
client.go
|
@ -21,6 +21,7 @@ import (
|
|||
"container/heap"
|
||||
"crypto/rand"
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
|
@ -68,6 +69,7 @@ var (
|
|||
successfulDials = expvar.NewInt("successfulDials")
|
||||
acceptedConns = expvar.NewInt("acceptedConns")
|
||||
inboundConnsBlocked = expvar.NewInt("inboundConnsBlocked")
|
||||
peerExtensions = expvar.NewMap("peerExtensions")
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -75,7 +77,8 @@ const (
|
|||
//
|
||||
// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
|
||||
// DHT: http://www.bittorrent.org/beps/bep_0005.html
|
||||
extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
|
||||
// Fast Extension: http://bittorrent.org/beps/bep_0006.html
|
||||
extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
|
||||
|
||||
socketsPerTorrent = 40
|
||||
torrentPeersHighWater = 200
|
||||
|
@ -850,7 +853,11 @@ func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res hand
|
|||
CopyExact(&res.peerExtensionBytes, b[20:28])
|
||||
CopyExact(&res.InfoHash, b[28:48])
|
||||
CopyExact(&res.peerID, b[48:68])
|
||||
peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
|
||||
|
||||
// TODO: Maybe we can just drop peers here if we're not interested. This
|
||||
// could prevent them trying to reconnect, falsely believing there was
|
||||
// just a problem.
|
||||
if ih == nil { // We were waiting for the peer to tell us what they wanted.
|
||||
post(res.InfoHash[:])
|
||||
post(peerID[:])
|
||||
|
@ -1008,7 +1015,7 @@ func (t *torrent) initRequestOrdering(c *connection) {
|
|||
c.piecePriorities = mathRand.Perm(t.numPieces())
|
||||
c.pieceRequestOrder = pieceordering.New()
|
||||
for i := 0; i < t.numPieces(); i++ {
|
||||
if !c.PeerHasPiece(pp.Integer(i)) {
|
||||
if !c.PeerHasPiece(i) {
|
||||
continue
|
||||
}
|
||||
if !t.wantPiece(i) {
|
||||
|
@ -1019,16 +1026,18 @@ func (t *torrent) initRequestOrdering(c *connection) {
|
|||
}
|
||||
|
||||
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
|
||||
if t.haveInfo() {
|
||||
if c.PeerPieces == nil {
|
||||
c.PeerPieces = make([]bool, t.numPieces())
|
||||
}
|
||||
} else {
|
||||
for piece >= len(c.PeerPieces) {
|
||||
c.PeerPieces = append(c.PeerPieces, false)
|
||||
if !c.peerHasAll {
|
||||
if t.haveInfo() {
|
||||
if c.PeerPieces == nil {
|
||||
c.PeerPieces = make([]bool, t.numPieces())
|
||||
}
|
||||
} else {
|
||||
for piece >= len(c.PeerPieces) {
|
||||
c.PeerPieces = append(c.PeerPieces, false)
|
||||
}
|
||||
}
|
||||
c.PeerPieces[piece] = true
|
||||
}
|
||||
c.PeerPieces[piece] = true
|
||||
if t.wantPiece(piece) {
|
||||
t.connPendPiece(c, piece)
|
||||
me.replenishConnRequests(t, c)
|
||||
|
@ -1166,6 +1175,16 @@ func addrPort(addr net.Addr) int {
|
|||
return AddrPort(addr)
|
||||
}
|
||||
|
||||
func (cl *Client) peerHasAll(t *torrent, cn *connection) {
|
||||
cn.peerHasAll = true
|
||||
cn.PeerPieces = nil
|
||||
if t.haveInfo() {
|
||||
for i := 0; i < t.numPieces(); i++ {
|
||||
cl.peerGotPiece(t, cn, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Processes incoming bittorrent messages. The client lock is held upon entry
|
||||
// and exit.
|
||||
func (me *Client) connectionLoop(t *torrent, c *connection) error {
|
||||
|
@ -1201,6 +1220,8 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
|
|||
}
|
||||
// We can then reset our interest.
|
||||
me.replenishConnRequests(t, c)
|
||||
case pp.Reject:
|
||||
me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
|
||||
case pp.Unchoke:
|
||||
c.PeerChoked = false
|
||||
me.peerUnchoked(t, c)
|
||||
|
@ -1248,7 +1269,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
|
|||
unexpectedCancels.Add(1)
|
||||
}
|
||||
case pp.Bitfield:
|
||||
if c.PeerPieces != nil {
|
||||
if c.PeerPieces != nil || c.peerHasAll {
|
||||
err = errors.New("received unexpected bitfield")
|
||||
break
|
||||
}
|
||||
|
@ -1265,6 +1286,24 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
|
|||
me.peerGotPiece(t, c, index)
|
||||
}
|
||||
}
|
||||
case pp.HaveAll:
|
||||
if c.PeerPieces != nil || c.peerHasAll {
|
||||
err = errors.New("unexpected have-all")
|
||||
break
|
||||
}
|
||||
me.peerHasAll(t, c)
|
||||
case pp.HaveNone:
|
||||
if c.peerHasAll || c.PeerPieces != nil {
|
||||
err = errors.New("unexpected have-none")
|
||||
break
|
||||
}
|
||||
c.PeerPieces = make([]bool, func() int {
|
||||
if t.haveInfo() {
|
||||
return t.numPieces()
|
||||
} else {
|
||||
return 0
|
||||
}
|
||||
}())
|
||||
case pp.Piece:
|
||||
err = me.downloadedChunk(t, c, &msg)
|
||||
case pp.Extended:
|
||||
|
@ -2278,7 +2317,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
|
|||
}
|
||||
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
|
||||
pieceIndex := e.Piece()
|
||||
if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
|
||||
if !c.PeerHasPiece(pieceIndex) {
|
||||
panic("piece in request order but peer doesn't have it")
|
||||
}
|
||||
if !t.wantPiece(pieceIndex) {
|
||||
|
@ -2401,7 +2440,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
|
|||
}
|
||||
conn.pieceRequestOrder.DeletePiece(int(piece))
|
||||
}
|
||||
if t.wantPiece(int(piece)) && conn.PeerHasPiece(pp.Integer(piece)) {
|
||||
if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
|
||||
t.connPendPiece(conn, int(piece))
|
||||
me.replenishConnRequests(t, conn)
|
||||
}
|
||||
|
|
|
@ -287,7 +287,7 @@ func TestReadaheadPieces(t *testing.T) {
|
|||
{5 * 1024 * 1024, 1048576, 4},
|
||||
} {
|
||||
if readaheadPieces(case_.readaheadBytes, case_.pieceLength) != case_.readaheadPieces {
|
||||
t.Fatalf("case failed: %s", case_)
|
||||
t.Fatalf("case failed: %v", case_)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,9 @@ type connection struct {
|
|||
PeerExtensionBytes peerExtensionBytes
|
||||
// Whether the peer has the given piece. nil if they've not sent any
|
||||
// related messages yet.
|
||||
PeerPieces []bool
|
||||
PeerPieces []bool
|
||||
peerHasAll bool
|
||||
|
||||
PeerMaxRequests int // Maximum pending requests the peer allows.
|
||||
PeerExtensionIDs map[string]int64
|
||||
PeerClientName string
|
||||
|
@ -115,32 +117,42 @@ func (cn *connection) supportsExtension(ext string) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
func (cn *connection) completedString() string {
|
||||
if cn.PeerPieces == nil {
|
||||
func (cn *connection) completedString(t *torrent) string {
|
||||
if cn.PeerPieces == nil && !cn.peerHasAll {
|
||||
return "?"
|
||||
}
|
||||
// f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
|
||||
// return fmt.Sprintf("%d%%", int(f*100))
|
||||
return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
|
||||
}
|
||||
|
||||
func (cn *connection) totalPiecesCount() int {
|
||||
return len(cn.PeerPieces)
|
||||
}
|
||||
|
||||
func (cn *connection) piecesPeerHasCount() (count int) {
|
||||
for _, has := range cn.PeerPieces {
|
||||
if has {
|
||||
count++
|
||||
return fmt.Sprintf("%d/%d", func() int {
|
||||
if cn.peerHasAll {
|
||||
if t.haveInfo() {
|
||||
return t.numPieces()
|
||||
}
|
||||
return -1
|
||||
}
|
||||
}
|
||||
return
|
||||
ret := 0
|
||||
for _, b := range cn.PeerPieces {
|
||||
if b {
|
||||
ret++
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}(), func() int {
|
||||
if cn.peerHasAll || cn.PeerPieces == nil {
|
||||
if t.haveInfo() {
|
||||
return t.numPieces()
|
||||
}
|
||||
return -1
|
||||
}
|
||||
return len(cn.PeerPieces)
|
||||
}())
|
||||
}
|
||||
|
||||
// Correct the PeerPieces slice length. Return false if the existing slice is
|
||||
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
|
||||
// messages.
|
||||
func (cn *connection) setNumPieces(num int) error {
|
||||
if cn.peerHasAll {
|
||||
return nil
|
||||
}
|
||||
if cn.PeerPieces == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -170,9 +182,9 @@ func eventAgeString(t time.Time) string {
|
|||
return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
|
||||
}
|
||||
|
||||
func (cn *connection) WriteStatus(w io.Writer) {
|
||||
func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
|
||||
// \t isn't preserved in <pre> blocks?
|
||||
fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
|
||||
fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
|
||||
c := func(b byte) {
|
||||
fmt.Fprintf(w, "%c", b)
|
||||
}
|
||||
|
@ -215,14 +227,14 @@ func (c *connection) Close() {
|
|||
go c.Socket.Close()
|
||||
}
|
||||
|
||||
func (c *connection) PeerHasPiece(index pp.Integer) bool {
|
||||
if c.PeerPieces == nil {
|
||||
func (c *connection) PeerHasPiece(piece int) bool {
|
||||
if c.peerHasAll {
|
||||
return true
|
||||
}
|
||||
if piece >= len(c.PeerPieces) {
|
||||
return false
|
||||
}
|
||||
if int(index) >= len(c.PeerPieces) {
|
||||
return false
|
||||
}
|
||||
return c.PeerPieces[index]
|
||||
return c.PeerPieces[piece]
|
||||
}
|
||||
|
||||
func (c *connection) Post(msg pp.Message) {
|
||||
|
@ -242,7 +254,7 @@ func (c *connection) Request(chunk request) bool {
|
|||
if len(c.Requests) >= c.PeerMaxRequests {
|
||||
return false
|
||||
}
|
||||
if !c.PeerHasPiece(chunk.Index) {
|
||||
if !c.PeerHasPiece(int(chunk.Index)) {
|
||||
return true
|
||||
}
|
||||
if c.RequestPending(chunk) {
|
||||
|
|
|
@ -34,7 +34,15 @@ const (
|
|||
Piece // 7
|
||||
Cancel // 8
|
||||
Port // 9
|
||||
Extended = 20
|
||||
|
||||
// BEP 6
|
||||
Suggest = 0xd // 13
|
||||
HaveAll = 0xe // 14
|
||||
HaveNone = 0xf // 15
|
||||
Reject = 0x10 // 16
|
||||
AllowedFast = 0x11 // 17
|
||||
|
||||
Extended = 20
|
||||
|
||||
HandshakeExtendedID = 0
|
||||
|
||||
|
@ -62,10 +70,10 @@ func (msg Message) MarshalBinary() (data []byte, err error) {
|
|||
return
|
||||
}
|
||||
switch msg.Type {
|
||||
case Choke, Unchoke, Interested, NotInterested:
|
||||
case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
|
||||
case Have:
|
||||
err = binary.Write(buf, binary.BigEndian, msg.Index)
|
||||
case Request, Cancel:
|
||||
case Request, Cancel, Reject:
|
||||
for _, i := range []Integer{msg.Index, msg.Begin, msg.Length} {
|
||||
err = binary.Write(buf, binary.BigEndian, i)
|
||||
if err != nil {
|
||||
|
@ -159,11 +167,11 @@ func (d *Decoder) Decode(msg *Message) (err error) {
|
|||
}
|
||||
msg.Type = MessageType(c)
|
||||
switch msg.Type {
|
||||
case Choke, Unchoke, Interested, NotInterested:
|
||||
case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone:
|
||||
return
|
||||
case Have:
|
||||
err = msg.Index.Read(r)
|
||||
case Request, Cancel:
|
||||
case Request, Cancel, Reject:
|
||||
for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} {
|
||||
err = data.Read(r)
|
||||
if err != nil {
|
||||
|
|
|
@ -482,7 +482,7 @@ func (t *torrent) WriteStatus(w io.Writer) {
|
|||
t: t,
|
||||
})
|
||||
for _, c := range t.Conns {
|
||||
c.WriteStatus(w)
|
||||
c.WriteStatus(w, t)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -553,7 +553,7 @@ func (t *torrent) lastPieceSize() int {
|
|||
}
|
||||
|
||||
func (t *torrent) numPieces() int {
|
||||
return len(t.Info.Pieces) / 20
|
||||
return t.Info.NumPieces()
|
||||
}
|
||||
|
||||
func (t *torrent) numPiecesCompleted() (num int) {
|
||||
|
@ -744,7 +744,7 @@ func (t *torrent) wantPiece(index int) bool {
|
|||
|
||||
func (t *torrent) connHasWantedPieces(c *connection) bool {
|
||||
for p := range t.Pieces {
|
||||
if t.wantPiece(p) && c.PeerHasPiece(pp.Integer(p)) {
|
||||
if t.wantPiece(p) && c.PeerHasPiece(p) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue