Rewrite handshaking and connection management
This commit is contained in:
parent
fa025bdbee
commit
a7dddd9be6
411
client.go
411
client.go
|
@ -75,6 +75,12 @@ var (
|
|||
acceptedConns = expvar.NewInt("acceptedConns")
|
||||
inboundConnsBlocked = expvar.NewInt("inboundConnsBlocked")
|
||||
peerExtensions = expvar.NewMap("peerExtensions")
|
||||
// Count of connections to peer with same client ID.
|
||||
connsToSelf = expvar.NewInt("connsToSelf")
|
||||
// Number of completed connections to a client we're already connected with.
|
||||
duplicateClientConns = expvar.NewInt("duplicateClientConns")
|
||||
receivedMessageTypes = expvar.NewMap("receivedMessageTypes")
|
||||
supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -94,7 +100,8 @@ const (
|
|||
// Limit how long handshake can take. This is to reduce the lingering
|
||||
// impact of a few bad apples. 4s loses 1% of successful handshakes that
|
||||
// are obtained with 60s timeout, and 5% of unsuccessful handshakes.
|
||||
handshakeTimeout = 45 * time.Second
|
||||
btHandshakeTimeout = 4 * time.Second
|
||||
handshakesTimeout = 20 * time.Second
|
||||
|
||||
pruneInterval = 10 * time.Second
|
||||
)
|
||||
|
@ -143,8 +150,6 @@ type Client struct {
|
|||
event sync.Cond
|
||||
quit chan struct{}
|
||||
|
||||
handshaking int
|
||||
|
||||
torrents map[InfoHash]*torrent
|
||||
}
|
||||
|
||||
|
@ -218,7 +223,6 @@ func (cl *Client) WriteStatus(_w io.Writer) {
|
|||
fmt.Fprintln(w, "Not listening!")
|
||||
}
|
||||
fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
|
||||
fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
|
||||
if cl.dHT != nil {
|
||||
dhtStats := cl.dHT.Stats()
|
||||
fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes)
|
||||
|
@ -277,7 +281,7 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err
|
|||
}
|
||||
|
||||
// Sets priorities to download from the given offset. Returns when the piece
|
||||
// at the given offset can be read. Returns the number of bytes that
|
||||
// at the given offset can be read. Returns the number of bytes that are
|
||||
// immediately available from the offset.
|
||||
func (cl *Client) prepareRead(t *torrent, off int64) (n int64) {
|
||||
index := int(off / int64(t.usualPieceSize()))
|
||||
|
@ -328,6 +332,8 @@ again:
|
|||
panic(fmt.Sprintf("can't read from %T", d))
|
||||
}
|
||||
|
||||
// Calculates the number of pieces to set to Readahead priority, after the
|
||||
// Now, and Next pieces.
|
||||
func readaheadPieces(readahead, pieceLength int64) int {
|
||||
return int((readahead+pieceLength-1)/pieceLength - 1)
|
||||
}
|
||||
|
@ -340,7 +346,7 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off int64) {
|
|||
return
|
||||
}
|
||||
cl.raisePiecePriority(t, index, piecePriorityNext)
|
||||
for i := 0; i < readaheadPieces(5*1024*1024, t.Info.PieceLength); i++ {
|
||||
for range iter.N(readaheadPieces(5*1024*1024, t.Info.PieceLength)) {
|
||||
index++
|
||||
if index >= t.numPieces() {
|
||||
break
|
||||
|
@ -366,6 +372,7 @@ func (t *torrent) connPendPiece(c *connection, piece int) {
|
|||
|
||||
func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
|
||||
if t.Pieces[piece].Priority < priority {
|
||||
cl.event.Broadcast()
|
||||
cl.prioritizePiece(t, piece, priority)
|
||||
}
|
||||
}
|
||||
|
@ -624,12 +631,35 @@ func (cl *Client) acceptConnections(l net.Listener, utp bool) {
|
|||
conn.Close()
|
||||
continue
|
||||
}
|
||||
go func() {
|
||||
if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
|
||||
go cl.incomingConnection(conn, utp)
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
|
||||
defer nc.Close()
|
||||
if tc, ok := nc.(*net.TCPConn); ok {
|
||||
tc.SetLinger(0)
|
||||
}
|
||||
c := newConnection()
|
||||
c.conn = nc
|
||||
c.rw = nc
|
||||
c.Discovery = peerSourceIncoming
|
||||
c.uTP = utp
|
||||
err := cl.runReceivedConn(c)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) {
|
||||
cl.mu.Lock()
|
||||
defer cl.mu.Unlock()
|
||||
t, ok := cl.torrents[ih]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
T = Torrent{cl, t}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *Client) torrent(ih InfoHash) *torrent {
|
||||
|
@ -637,12 +667,12 @@ func (me *Client) torrent(ih InfoHash) *torrent {
|
|||
}
|
||||
|
||||
type dialResult struct {
|
||||
net.Conn
|
||||
Conn net.Conn
|
||||
UTP bool
|
||||
}
|
||||
|
||||
func doDial(dial func(addr string) (net.Conn, error), ch chan dialResult, utp bool, addr string) {
|
||||
conn, err := dial(addr)
|
||||
func doDial(dial func(addr string, t *torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *torrent) {
|
||||
conn, err := dial(addr, t)
|
||||
if err != nil {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
|
@ -693,16 +723,30 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
|
|||
log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
|
||||
return
|
||||
}
|
||||
dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
|
||||
t.HalfOpen[addr] = struct{}{}
|
||||
go func() {
|
||||
// Binding to the listen address and dialing via net.Dialer gives
|
||||
// "address in use" error. It seems it's not possible to dial out from
|
||||
// this address so that peers associate our local address with our
|
||||
// listen address.
|
||||
go me.outgoingConnection(t, addr, peer.Source)
|
||||
}
|
||||
|
||||
// Initiate connections via TCP and UTP simultaneously. Use the first
|
||||
// one that succeeds.
|
||||
func (me *Client) dialTimeout(t *torrent) time.Duration {
|
||||
return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
|
||||
}
|
||||
|
||||
func (me *Client) dialTCP(addr string, t *torrent) (c net.Conn, err error) {
|
||||
c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
|
||||
if err == nil {
|
||||
c.(*net.TCPConn).SetLinger(0)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *Client) dialUTP(addr string, t *torrent) (c net.Conn, err error) {
|
||||
return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
|
||||
}
|
||||
|
||||
// Returns a connection over UTP or TCP.
|
||||
func (me *Client) dial(addr string, t *torrent) (conn net.Conn, utp bool) {
|
||||
// Initiate connections via TCP and UTP simultaneously. Use the first one
|
||||
// that succeeds.
|
||||
left := 0
|
||||
if !me.disableUTP {
|
||||
left++
|
||||
|
@ -712,35 +756,18 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
|
|||
}
|
||||
resCh := make(chan dialResult, left)
|
||||
if !me.disableUTP {
|
||||
go doDial(func(addr string) (net.Conn, error) {
|
||||
return me.utpSock.DialTimeout(addr, dialTimeout)
|
||||
}, resCh, true, addr)
|
||||
go doDial(me.dialUTP, resCh, true, addr, t)
|
||||
}
|
||||
if !me.disableTCP {
|
||||
go doDial(func(addr string) (net.Conn, error) {
|
||||
// time.Sleep(time.Second) // Give uTP a bit of a head start.
|
||||
return net.DialTimeout("tcp", addr, dialTimeout)
|
||||
}, resCh, false, addr)
|
||||
go doDial(me.dialTCP, resCh, false, addr, t)
|
||||
}
|
||||
var res dialResult
|
||||
// Wait for a successful connection.
|
||||
for ; left > 0 && res.Conn == nil; left-- {
|
||||
res = <-resCh
|
||||
}
|
||||
// Whether or not the connection attempt succeeds, the half open
|
||||
// counter should be decremented, and new connection attempts made.
|
||||
go func() {
|
||||
me.mu.Lock()
|
||||
defer me.mu.Unlock()
|
||||
if _, ok := t.HalfOpen[addr]; !ok {
|
||||
panic("invariant broken")
|
||||
}
|
||||
delete(t.HalfOpen, addr)
|
||||
me.openNewConns(t)
|
||||
}()
|
||||
if res.Conn == nil {
|
||||
return
|
||||
}
|
||||
if left > 0 {
|
||||
// There are still incompleted dials.
|
||||
go func() {
|
||||
for ; left > 0; left-- {
|
||||
conn := (<-resCh).Conn
|
||||
|
@ -750,13 +777,88 @@ func (me *Client) initiateConn(peer Peer, t *torrent) {
|
|||
}
|
||||
}()
|
||||
}
|
||||
conn = res.Conn
|
||||
utp = res.UTP
|
||||
return
|
||||
}
|
||||
|
||||
// log.Printf("connected to %s", conn.RemoteAddr())
|
||||
err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
|
||||
func (me *Client) noLongerHalfOpen(t *torrent, addr string) {
|
||||
if _, ok := t.HalfOpen[addr]; !ok {
|
||||
panic("invariant broken")
|
||||
}
|
||||
delete(t.HalfOpen, addr)
|
||||
me.openNewConns(t)
|
||||
}
|
||||
|
||||
// Returns nil connection and nil error if no connection could be established
|
||||
// for valid reasons.
|
||||
func (me *Client) establishOutgoingConn(t *torrent, addr string) (c *connection, err error) {
|
||||
handshakesConnection := func(nc net.Conn, encrypted, utp bool) (c *connection, err error) {
|
||||
c = newConnection()
|
||||
c.conn = nc
|
||||
c.rw = nc
|
||||
c.encrypted = encrypted
|
||||
c.uTP = utp
|
||||
err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ok, err := me.initiateHandshakes(c, t)
|
||||
if !ok {
|
||||
c = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
nc, utp := me.dial(addr, t)
|
||||
if nc == nil {
|
||||
return
|
||||
}
|
||||
c, err = handshakesConnection(nc, true, utp)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return
|
||||
} else if c != nil {
|
||||
return
|
||||
}
|
||||
nc.Close()
|
||||
if utp {
|
||||
nc, err = me.dialUTP(addr, t)
|
||||
} else {
|
||||
nc, err = me.dialTCP(addr, t)
|
||||
}
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
|
||||
return
|
||||
}
|
||||
c, err = handshakesConnection(nc, false, utp)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Called to dial out and run a connection. The addr we're given is already
|
||||
// considered half-open.
|
||||
func (me *Client) outgoingConnection(t *torrent, addr string, ps peerSource) {
|
||||
c, err := me.establishOutgoingConn(t, addr)
|
||||
me.mu.Lock()
|
||||
defer me.mu.Unlock()
|
||||
// Don't release lock between here and addConnection, unless it's for
|
||||
// failure.
|
||||
me.noLongerHalfOpen(t, addr)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
return
|
||||
}
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
c.Discovery = ps
|
||||
err = me.runInitiatedHandshookConn(c, t)
|
||||
if err != nil {
|
||||
log.Print(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// The port number for incoming peer connections. 0 if the client isn't
|
||||
|
@ -843,7 +945,7 @@ func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions pee
|
|||
// Wait until writes complete before returning from handshake.
|
||||
err = <-writeDone
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error writing during handshake: %s", err)
|
||||
err = fmt.Errorf("error writing: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -889,94 +991,173 @@ func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions pee
|
|||
|
||||
// Wraps a raw connection and provides the interface we want for using the
|
||||
// connection in the message loop.
|
||||
type peerConn struct {
|
||||
net.Conn
|
||||
type deadlineReader struct {
|
||||
nc net.Conn
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
func (pc peerConn) Read(b []byte) (n int, err error) {
|
||||
func (me deadlineReader) Read(b []byte) (n int, err error) {
|
||||
// Keep-alives should be received every 2 mins. Give a bit of gracetime.
|
||||
err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
|
||||
err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error setting read deadline: %s", err)
|
||||
}
|
||||
n, err = pc.Conn.Read(b)
|
||||
n, err = me.r.Read(b)
|
||||
// Convert common errors into io.EOF.
|
||||
// if err != nil {
|
||||
// if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
|
||||
// err = io.EOF
|
||||
// } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||
// if n != 0 {
|
||||
// panic(n)
|
||||
// }
|
||||
// err = io.EOF
|
||||
// }
|
||||
// }
|
||||
return
|
||||
}
|
||||
|
||||
type readWriter struct {
|
||||
io.Reader
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
|
||||
var protocol [len(pp.Protocol)]byte
|
||||
_, err = io.ReadFull(rw, protocol[:])
|
||||
if err != nil {
|
||||
if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
|
||||
err = io.EOF
|
||||
} else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||
if n != 0 {
|
||||
panic(n)
|
||||
return
|
||||
}
|
||||
err = io.EOF
|
||||
ret = readWriter{
|
||||
io.MultiReader(bytes.NewReader(protocol[:]), rw),
|
||||
rw,
|
||||
}
|
||||
if string(protocol[:]) == pp.Protocol {
|
||||
return
|
||||
}
|
||||
encrypted = true
|
||||
ret, err = mse.ReceiveHandshake(ret, skeys)
|
||||
return
|
||||
}
|
||||
|
||||
func (cl *Client) receiveSkeys() (ret [][]byte) {
|
||||
for ih := range cl.torrents {
|
||||
ret = append(ret, ih[:])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
|
||||
if tcpConn, ok := sock.(*net.TCPConn); ok {
|
||||
tcpConn.SetLinger(0)
|
||||
}
|
||||
defer sock.Close()
|
||||
err = sock.SetDeadline(time.Now().Add(handshakeTimeout))
|
||||
func (me *Client) initiateHandshakes(c *connection, t *torrent) (ok bool, err error) {
|
||||
if c.encrypted {
|
||||
c.rw, err = mse.InitiateHandshake(c.rw, t.InfoHash[:], nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("couldn't set handshake deadline: %s", err)
|
||||
return
|
||||
}
|
||||
me.mu.Lock()
|
||||
me.handshaking++
|
||||
me.mu.Unlock()
|
||||
var rw io.ReadWriter = sock
|
||||
if torrent == nil {
|
||||
rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) {
|
||||
for ih := range me.torrents {
|
||||
ret = append(ret, ih[:])
|
||||
}
|
||||
ih, ok, err := me.connBTHandshake(c, &t.InfoHash)
|
||||
if ih != t.InfoHash {
|
||||
ok = false
|
||||
}
|
||||
return
|
||||
}())
|
||||
} else {
|
||||
rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:])
|
||||
}
|
||||
|
||||
func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) {
|
||||
cl.mu.Lock()
|
||||
skeys := cl.receiveSkeys()
|
||||
cl.mu.Unlock()
|
||||
// TODO: Filter unmatching skey errors.
|
||||
c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error during MSE handshake: %s", err)
|
||||
if err == mse.ErrNoSecretKeyMatch {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
hsRes, ok, err := handshake(rw, func() *InfoHash {
|
||||
if torrent == nil {
|
||||
return nil
|
||||
} else {
|
||||
return &torrent.InfoHash
|
||||
}
|
||||
}(), me.peerID, me.extensionBytes)
|
||||
me.mu.Lock()
|
||||
defer me.mu.Unlock()
|
||||
if me.handshaking == 0 {
|
||||
panic("handshake count invariant is broken")
|
||||
}
|
||||
me.handshaking--
|
||||
ih, ok, err := cl.connBTHandshake(c, nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error during handshake: %s", err)
|
||||
fmt.Errorf("error during bt handshake: %s", err)
|
||||
return
|
||||
}
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if hsRes.peerID == me.peerID {
|
||||
cl.mu.Lock()
|
||||
t = cl.torrents[ih]
|
||||
cl.mu.Unlock()
|
||||
return
|
||||
}
|
||||
torrent = me.torrent(hsRes.InfoHash)
|
||||
if torrent == nil {
|
||||
|
||||
// Returns !ok if handshake failed for valid reasons.
|
||||
func (cl *Client) connBTHandshake(c *connection, ih *InfoHash) (ret InfoHash, ok bool, err error) {
|
||||
res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
|
||||
if err != nil || !ok {
|
||||
return
|
||||
}
|
||||
sock.SetWriteDeadline(time.Time{})
|
||||
sock = peerConn{sock}
|
||||
conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw)
|
||||
defer conn.Close()
|
||||
conn.Discovery = discovery
|
||||
if !me.addConnection(torrent, conn) {
|
||||
ret = res.InfoHash
|
||||
c.PeerExtensionBytes = res.peerExtensionBytes
|
||||
c.PeerID = res.peerID
|
||||
c.completedHandshake = time.Now()
|
||||
return
|
||||
}
|
||||
|
||||
func (cl *Client) runInitiatedHandshookConn(c *connection, t *torrent) (err error) {
|
||||
if c.PeerID == cl.peerID {
|
||||
// Only if we initiated the connection is the remote address a
|
||||
// listen addr for a doppleganger.
|
||||
connsToSelf.Add(1)
|
||||
addr := c.conn.RemoteAddr().String()
|
||||
cl.dopplegangerAddrs[addr] = struct{}{}
|
||||
return
|
||||
}
|
||||
return cl.runHandshookConn(c, t)
|
||||
}
|
||||
|
||||
func (cl *Client) runReceivedConn(c *connection) (err error) {
|
||||
err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
t, err := cl.receiveHandshakes(c)
|
||||
if err != nil {
|
||||
logonce.Stderr.Printf("error receiving handshakes: %s", err)
|
||||
err = nil
|
||||
return
|
||||
}
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
cl.mu.Lock()
|
||||
defer cl.mu.Unlock()
|
||||
if c.PeerID == cl.peerID {
|
||||
return
|
||||
}
|
||||
return cl.runHandshookConn(c, t)
|
||||
}
|
||||
|
||||
func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
|
||||
c.conn.SetWriteDeadline(time.Time{})
|
||||
c.rw = readWriter{
|
||||
deadlineReader{c.conn, c.rw},
|
||||
c.rw,
|
||||
}
|
||||
if !cl.addConnection(t, c) {
|
||||
return
|
||||
}
|
||||
defer cl.dropConnection(t, c)
|
||||
go c.writer()
|
||||
go c.writeOptimizer(time.Minute)
|
||||
cl.sendInitialMessages(c, t)
|
||||
if t.haveInfo() {
|
||||
t.initRequestOrdering(c)
|
||||
}
|
||||
err = cl.connectionLoop(t, c)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error during connection loop: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
|
||||
if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
|
||||
conn.Post(pp.Message{
|
||||
Type: pp.Extended,
|
||||
|
@ -1035,24 +1216,19 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
|
|||
Port: uint16(AddrPort(me.dHT.LocalAddr())),
|
||||
})
|
||||
}
|
||||
if torrent.haveInfo() {
|
||||
torrent.initRequestOrdering(conn)
|
||||
}
|
||||
err = me.connectionLoop(torrent, conn)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
|
||||
}
|
||||
me.dropConnection(torrent, conn)
|
||||
return
|
||||
}
|
||||
|
||||
// Randomizes the piece order for this connection. Every connection will be
|
||||
// given a different ordering. Having it stored per connection saves having to
|
||||
// randomize during request filling, and constantly recalculate the ordering
|
||||
// based on piece priorities.
|
||||
func (t *torrent) initRequestOrdering(c *connection) {
|
||||
if c.pieceRequestOrder != nil || c.piecePriorities != nil {
|
||||
panic("double init of request ordering")
|
||||
}
|
||||
c.piecePriorities = mathRand.Perm(t.numPieces())
|
||||
c.pieceRequestOrder = pieceordering.New()
|
||||
for i := 0; i < t.numPieces(); i++ {
|
||||
for i := range iter.N(t.Info.NumPieces()) {
|
||||
if !c.PeerHasPiece(i) {
|
||||
continue
|
||||
}
|
||||
|
@ -1527,7 +1703,8 @@ func (t *torrent) needData() bool {
|
|||
|
||||
// TODO: I'm sure there's something here to do with seeding.
|
||||
func (t *torrent) badConn(c *connection) bool {
|
||||
if time.Now().Sub(c.completedHandshake) < 30*time.Second {
|
||||
// A 30 second grace for initial messages to go through.
|
||||
if time.Since(c.completedHandshake) < 30*time.Second {
|
||||
return false
|
||||
}
|
||||
if !t.haveInfo() {
|
||||
|
@ -1546,7 +1723,7 @@ func (t *torrent) numGoodConns() (num int) {
|
|||
}
|
||||
|
||||
func (me *Client) wantConns(t *torrent) bool {
|
||||
if !t.needData() && me.noUpload {
|
||||
if me.noUpload && !t.needData() {
|
||||
return false
|
||||
}
|
||||
if t.numGoodConns() >= socketsPerTorrent {
|
||||
|
@ -1596,12 +1773,12 @@ func (me *Client) addPeers(t *torrent, peers []Peer) {
|
|||
me.openNewConns(t)
|
||||
}
|
||||
|
||||
func (cl *Client) torrentFileCachePath(ih InfoHash) string {
|
||||
func (cl *Client) cachedMetaInfoFilename(ih InfoHash) string {
|
||||
return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
|
||||
}
|
||||
|
||||
func (cl *Client) saveTorrentFile(t *torrent) error {
|
||||
path := cl.torrentFileCachePath(t.InfoHash)
|
||||
path := cl.cachedMetaInfoFilename(t.InfoHash)
|
||||
os.MkdirAll(filepath.Dir(path), 0777)
|
||||
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
|
||||
if err != nil {
|
||||
|
@ -1682,7 +1859,7 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e
|
|||
// Prepare a Torrent without any attachment to a Client. That means we can
|
||||
// initialize fields all fields that don't require the Client without locking
|
||||
// it.
|
||||
func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
|
||||
func newTorrent(ih InfoHash) (t *torrent, err error) {
|
||||
t = &torrent{
|
||||
InfoHash: ih,
|
||||
Peers: make(map[peersKey]Peer),
|
||||
|
@ -1696,7 +1873,6 @@ func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *tor
|
|||
}
|
||||
t.wantPeers.L = &t.stateMu
|
||||
t.GotMetainfo = t.gotMetainfo
|
||||
t.addTrackers(announceList)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -1763,7 +1939,9 @@ func (t Torrent) NumPieces() int {
|
|||
}
|
||||
|
||||
func (t Torrent) Drop() {
|
||||
t.cl.mu.Lock()
|
||||
t.cl.dropTorrent(t.InfoHash)
|
||||
t.cl.mu.Unlock()
|
||||
}
|
||||
|
||||
type File struct {
|
||||
|
@ -1959,12 +2137,13 @@ func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
|
|||
return me.cl.torrentReadAt(me.torrent, off, p)
|
||||
}
|
||||
|
||||
// Returns nil metainfo if it isn't in the cache.
|
||||
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
|
||||
// metainfo has the correct infohash.
|
||||
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
|
||||
if cl.config.DisableMetainfoCache {
|
||||
return
|
||||
}
|
||||
f, err := os.Open(cl.torrentFileCachePath(ih))
|
||||
f, err := os.Open(cl.cachedMetaInfoFilename(ih))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = nil
|
||||
|
|
|
@ -30,6 +30,7 @@ const (
|
|||
type connection struct {
|
||||
conn net.Conn
|
||||
rw io.ReadWriter // The real slim shady
|
||||
encrypted bool
|
||||
Discovery peerSource
|
||||
uTP bool
|
||||
closing chan struct{}
|
||||
|
@ -37,7 +38,9 @@ type connection struct {
|
|||
post chan pp.Message
|
||||
writeCh chan []byte
|
||||
|
||||
// The connections preferred order to download pieces.
|
||||
piecePriorities []int
|
||||
// The piece request order based on piece priorities.
|
||||
pieceRequestOrder *pieceordering.Instance
|
||||
|
||||
UnwantedChunksReceived int
|
||||
|
@ -69,26 +72,16 @@ type connection struct {
|
|||
PeerClientName string
|
||||
}
|
||||
|
||||
func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) {
|
||||
func newConnection() (c *connection) {
|
||||
c = &connection{
|
||||
conn: sock,
|
||||
rw: rw,
|
||||
uTP: uTP,
|
||||
|
||||
Choked: true,
|
||||
PeerChoked: true,
|
||||
PeerMaxRequests: 250,
|
||||
PeerExtensionBytes: peb,
|
||||
PeerID: peerID,
|
||||
|
||||
closing: make(chan struct{}),
|
||||
writeCh: make(chan []byte),
|
||||
post: make(chan pp.Message),
|
||||
|
||||
completedHandshake: time.Now(),
|
||||
}
|
||||
go c.writer()
|
||||
go c.writeOptimizer(time.Minute)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -100,6 +93,8 @@ func (cn *connection) localAddr() net.Addr {
|
|||
return cn.conn.LocalAddr()
|
||||
}
|
||||
|
||||
// Adjust piece position in the request order for this connection based on the
|
||||
// given piece priority.
|
||||
func (cn *connection) pendPiece(piece int, priority piecePriority) {
|
||||
if priority == piecePriorityNone {
|
||||
cn.pieceRequestOrder.DeletePiece(piece)
|
||||
|
|
14
torrent.go
14
torrent.go
|
@ -66,14 +66,18 @@ type torrent struct {
|
|||
|
||||
InfoHash InfoHash
|
||||
Pieces []*piece
|
||||
// Total length of the torrent in bytes. Stored because it's not O(1) to
|
||||
// get this from the info dict.
|
||||
length int64
|
||||
|
||||
data StatefulData
|
||||
|
||||
// The info dict. Nil if we don't have it.
|
||||
Info *metainfo.Info
|
||||
// Active peer connections.
|
||||
// Active peer connections, running message stream loops.
|
||||
Conns []*connection
|
||||
// Set of addrs to which we're attempting to connect.
|
||||
// Set of addrs to which we're attempting to connect. Connections are
|
||||
// half-open until all handshakes are completed.
|
||||
HalfOpen map[string]struct{}
|
||||
|
||||
// Reserve of peers to connect to. A peer can be both here and in the
|
||||
|
@ -85,10 +89,15 @@ type torrent struct {
|
|||
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
|
||||
// mirror their respective URLs from the announce-list metainfo key.
|
||||
Trackers [][]tracker.Client
|
||||
// Name used if the info name isn't available.
|
||||
DisplayName string
|
||||
// The bencoded bytes of the info dict.
|
||||
MetaData []byte
|
||||
// Each element corresponds to the 16KiB metadata pieces. If true, we have
|
||||
// received that piece.
|
||||
metadataHave []bool
|
||||
|
||||
// Closed when .Info is set.
|
||||
gotMetainfo chan struct{}
|
||||
GotMetainfo <-chan struct{}
|
||||
|
||||
|
@ -195,6 +204,7 @@ func (t *torrent) numConnsUnchoked() (num int) {
|
|||
return
|
||||
}
|
||||
|
||||
// There's a connection to that address already.
|
||||
func (t *torrent) addrActive(addr string) bool {
|
||||
if _, ok := t.HalfOpen[addr]; ok {
|
||||
return true
|
||||
|
|
|
@ -45,7 +45,7 @@ func TestTorrentRequest(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTorrentDoubleClose(t *testing.T) {
|
||||
tt, err := newTorrent(InfoHash{}, nil, 0)
|
||||
tt, err := newTorrent(InfoHash{})
|
||||
tt.pruneTimer = time.NewTimer(0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
Loading…
Reference in New Issue