Tidy up extension bytes handling; don't close conn from handshake writer; force protocol encryption for now

This commit is contained in:
Matt Joiner 2015-03-13 06:21:13 +11:00
parent d57f5896d4
commit 203da0aab0
3 changed files with 68 additions and 24 deletions

View File

@ -38,6 +38,8 @@ import (
"syscall"
"time"
"bitbucket.org/anacrolix/go.torrent/mse"
"bitbucket.org/anacrolix/go.torrent/data"
filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
"bitbucket.org/anacrolix/go.torrent/dht"
@ -77,8 +79,8 @@ const (
//
// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
// DHT: http://www.bittorrent.org/beps/bep_0005.html
// Fast Extension: http://bittorrent.org/beps/bep_0006.html
extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
// Fast Extension: http://bittorrent.org/beps/bep_0006.html ([7]|=4)
defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x05"
socketsPerTorrent = 40
torrentPeersHighWater = 200
@ -87,7 +89,7 @@ const (
// Limit how long handshake can take. This is to reduce the lingering
// impact of a few bad apples. 4s loses 1% of successful handshakes that
// are obtained with 60s timeout, and 5% of unsuccessful handshakes.
handshakeTimeout = 4 * time.Second
handshakeTimeout = 45 * time.Second
pruneInterval = 10 * time.Second
)
@ -128,6 +130,7 @@ type Client struct {
_configDir string
config Config
pruneTimer *time.Timer
extensionBytes peerExtensionBytes
torrentDataOpener TorrentDataOpener
@ -469,6 +472,7 @@ func NewClient(cfg *Config) (cl *Client, err error) {
quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent),
}
CopyExact(&cl.extensionBytes, defaultExtensionBytes)
cl.event.L = &cl.mu
if cfg.TorrentDataOpener != nil {
cl.torrentDataOpener = cfg.TorrentDataOpener
@ -777,12 +781,11 @@ func addrCompactIP(addr net.Addr) (string, error) {
return string(ip.To16()), nil
}
func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
var err error
for b := range bb {
_, err = w.Write(b)
if err != nil {
w.Close()
break
}
}
@ -794,6 +797,18 @@ type (
peerID [20]byte
)
func (me *peerExtensionBytes) SupportsExtended() bool {
return me[5]&0x10 != 0
}
func (me *peerExtensionBytes) SupportsDHT() bool {
return me[7]&0x01 != 0
}
func (me *peerExtensionBytes) SupportsFast() bool {
return me[7]&0x04 != 0
}
type handshakeResult struct {
peerExtensionBytes
peerID
@ -804,7 +819,7 @@ type handshakeResult struct {
// peer initiated the connection. Returns ok if the handshake was successful,
// and err if there was an unexpected condition other than the peer simply
// abandoning the handshake.
func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
// Bytes to be sent to the peer. Should never block the sender.
postCh := make(chan []byte, 4)
// A single error value sent when the writer completes.
@ -836,7 +851,7 @@ func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res hand
}
post([]byte(pp.Protocol))
post([]byte(extensionBytes))
post(extensions[:])
if ih != nil { // We already know what we want.
post(ih[:])
post(peerID[:])
@ -907,13 +922,28 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
me.mu.Lock()
me.handshaking++
me.mu.Unlock()
hsRes, ok, err := handshake(sock, func() *InfoHash {
var rw io.ReadWriter = sock
if torrent == nil {
rw, err = mse.ReceiveHandshake(sock, func() (ret [][]byte) {
for ih := range me.torrents {
ret = append(ret, ih[:])
}
return
}())
} else {
rw, err = mse.InitiateHandshake(sock, torrent.InfoHash[:])
}
if err != nil {
err = fmt.Errorf("error during MSE handshake: %s", err)
return
}
hsRes, ok, err := handshake(rw, func() *InfoHash {
if torrent == nil {
return nil
} else {
return &torrent.InfoHash
}
}(), me.peerID)
}(), me.peerID, me.extensionBytes)
me.mu.Lock()
defer me.mu.Unlock()
if me.handshaking == 0 {
@ -936,13 +966,13 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
}
sock.SetWriteDeadline(time.Time{})
sock = peerConn{sock}
conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP, rw)
defer conn.Close()
conn.Discovery = discovery
if !me.addConnection(torrent, conn) {
return
}
if conn.PeerExtensionBytes[5]&0x10 != 0 {
if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
conn.Post(pp.Message{
Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID,
@ -969,7 +999,7 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
if p := me.incomingPeerPort(); p != 0 {
d["p"] = p
}
yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
yourip, err := addrCompactIP(conn.remoteAddr())
if err != nil {
log.Printf("error calculating yourip field value in extension handshake: %s", err)
} else {
@ -989,8 +1019,12 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
Type: pp.Bitfield,
Bitfield: torrent.bitfield(),
})
} else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
conn.Post(pp.Message{
Type: pp.HaveNone,
})
}
if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
conn.Post(pp.Message{
Type: pp.Port,
Port: uint16(AddrPort(me.dHT.LocalAddr())),
@ -998,7 +1032,6 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
}
if torrent.haveInfo() {
torrent.initRequestOrdering(conn)
me.replenishConnRequests(torrent, conn)
}
err = me.connectionLoop(torrent, conn)
if err != nil {
@ -1189,7 +1222,7 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) {
// and exit.
func (me *Client) connectionLoop(t *torrent, c *connection) error {
decoder := pp.Decoder{
R: bufio.NewReader(c.Socket),
R: bufio.NewReader(c.rw),
MaxLength: 256 * 1024,
}
for {
@ -1222,6 +1255,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
me.replenishConnRequests(t, c)
case pp.Reject:
me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
me.replenishConnRequests(t, c)
case pp.Unchoke:
c.PeerChoked = false
me.peerUnchoked(t, c)
@ -1409,7 +1443,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
if me.dHT == nil {
break
}
pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
if err != nil {
panic(err)
}

View File

@ -28,7 +28,8 @@ const (
// Maintains the state of a connection with a peer.
type connection struct {
Socket net.Conn
conn net.Conn
rw io.ReadWriter // The real slim shady
Discovery peerSource
uTP bool
closing chan struct{}
@ -68,10 +69,11 @@ type connection struct {
PeerClientName string
}
func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool, rw io.ReadWriter) (c *connection) {
c = &connection{
Socket: sock,
uTP: uTP,
conn: sock,
rw: rw,
uTP: uTP,
Choked: true,
PeerChoked: true,
@ -90,6 +92,14 @@ func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP b
return
}
func (cn *connection) remoteAddr() net.Addr {
return cn.conn.RemoteAddr()
}
func (cn *connection) localAddr() net.Addr {
return cn.conn.LocalAddr()
}
func (cn *connection) pendPiece(piece int, priority piecePriority) {
if priority == piecePriorityNone {
cn.pieceRequestOrder.DeletePiece(piece)
@ -184,7 +194,7 @@ func eventAgeString(t time.Time) string {
func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
// \t isn't preserved in <pre> blocks?
fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
fmt.Fprintf(w, "%s\n %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.localAddr(), cn.remoteAddr()), cn.completedString(t), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
c := func(b byte) {
fmt.Fprintf(w, "%c", b)
}
@ -224,7 +234,7 @@ func (c *connection) Close() {
}
close(c.closing)
// TODO: This call blocks sometimes, why?
go c.Socket.Close()
go c.conn.Close()
}
func (c *connection) PeerHasPiece(piece int) bool {
@ -347,7 +357,7 @@ func (c *connection) SetInterested(interested bool) {
// Writes buffers to the socket from the write channel.
func (conn *connection) writer() {
// Reduce write syscalls.
buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
// Receives when buf is not empty.
notEmpty := make(chan struct{}, 1)
for {

View File

@ -200,7 +200,7 @@ func (t *torrent) addrActive(addr string) bool {
return true
}
for _, c := range t.Conns {
if c.Socket.RemoteAddr().String() == addr {
if c.remoteAddr().String() == addr {
return true
}
}