Move Client.connectionLoop to connection.mainReadLoop
This commit is contained in:
parent
5d7882a79d
commit
2fc814b5eb
210
client.go
210
client.go
@ -929,7 +929,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
|
|||||||
defer t.dropConnection(c)
|
defer t.dropConnection(c)
|
||||||
go c.writer(time.Minute)
|
go c.writer(time.Minute)
|
||||||
cl.sendInitialMessages(c, t)
|
cl.sendInitialMessages(c, t)
|
||||||
err := cl.connectionLoop(t, c)
|
err := c.mainReadLoop()
|
||||||
if err != nil && cl.config.Debug {
|
if err != nil && cl.config.Debug {
|
||||||
log.Printf("error during connection loop: %s", err)
|
log.Printf("error during connection loop: %s", err)
|
||||||
}
|
}
|
||||||
@ -1118,214 +1118,6 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Processes incoming bittorrent messages. The client lock is held upon entry
|
|
||||||
// and exit. Returning will end the connection.
|
|
||||||
func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
|
|
||||||
decoder := pp.Decoder{
|
|
||||||
R: bufio.NewReader(c.rw),
|
|
||||||
MaxLength: 256 * 1024,
|
|
||||||
}
|
|
||||||
for {
|
|
||||||
cl.mu.Unlock()
|
|
||||||
var msg pp.Message
|
|
||||||
err := decoder.Decode(&msg)
|
|
||||||
cl.mu.Lock()
|
|
||||||
if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
c.readMsg(&msg)
|
|
||||||
c.lastMessageReceived = time.Now()
|
|
||||||
if msg.Keepalive {
|
|
||||||
receivedKeepalives.Add(1)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
|
|
||||||
switch msg.Type {
|
|
||||||
case pp.Choke:
|
|
||||||
c.PeerChoked = true
|
|
||||||
c.Requests = nil
|
|
||||||
// We can then reset our interest.
|
|
||||||
c.updateRequests()
|
|
||||||
case pp.Reject:
|
|
||||||
cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
|
|
||||||
c.updateRequests()
|
|
||||||
case pp.Unchoke:
|
|
||||||
c.PeerChoked = false
|
|
||||||
cl.peerUnchoked(t, c)
|
|
||||||
case pp.Interested:
|
|
||||||
c.PeerInterested = true
|
|
||||||
cl.upload(t, c)
|
|
||||||
case pp.NotInterested:
|
|
||||||
c.PeerInterested = false
|
|
||||||
c.Choke()
|
|
||||||
case pp.Have:
|
|
||||||
err = c.peerSentHave(int(msg.Index))
|
|
||||||
case pp.Request:
|
|
||||||
if c.Choked {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !c.PeerInterested {
|
|
||||||
err = errors.New("peer sent request but isn't interested")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !t.havePiece(msg.Index.Int()) {
|
|
||||||
// This isn't necessarily them screwing up. We can drop pieces
|
|
||||||
// from our storage, and can't communicate this to peers
|
|
||||||
// except by reconnecting.
|
|
||||||
requestsReceivedForMissingPieces.Add(1)
|
|
||||||
err = errors.New("peer requested piece we don't have")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if c.PeerRequests == nil {
|
|
||||||
c.PeerRequests = make(map[request]struct{}, maxRequests)
|
|
||||||
}
|
|
||||||
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
|
|
||||||
cl.upload(t, c)
|
|
||||||
case pp.Cancel:
|
|
||||||
req := newRequest(msg.Index, msg.Begin, msg.Length)
|
|
||||||
if !c.PeerCancel(req) {
|
|
||||||
unexpectedCancels.Add(1)
|
|
||||||
}
|
|
||||||
case pp.Bitfield:
|
|
||||||
err = c.peerSentBitfield(msg.Bitfield)
|
|
||||||
case pp.HaveAll:
|
|
||||||
err = c.peerSentHaveAll()
|
|
||||||
case pp.HaveNone:
|
|
||||||
err = c.peerSentHaveNone()
|
|
||||||
case pp.Piece:
|
|
||||||
cl.downloadedChunk(t, c, &msg)
|
|
||||||
case pp.Extended:
|
|
||||||
switch msg.ExtendedID {
|
|
||||||
case pp.HandshakeExtendedID:
|
|
||||||
// TODO: Create a bencode struct for this.
|
|
||||||
var d map[string]interface{}
|
|
||||||
err = bencode.Unmarshal(msg.ExtendedPayload, &d)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error decoding extended message payload: %s", err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
|
|
||||||
if reqq, ok := d["reqq"]; ok {
|
|
||||||
if i, ok := reqq.(int64); ok {
|
|
||||||
c.PeerMaxRequests = int(i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if v, ok := d["v"]; ok {
|
|
||||||
c.PeerClientName = v.(string)
|
|
||||||
}
|
|
||||||
m, ok := d["m"]
|
|
||||||
if !ok {
|
|
||||||
err = errors.New("handshake missing m item")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
mTyped, ok := m.(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
err = errors.New("handshake m value is not dict")
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if c.PeerExtensionIDs == nil {
|
|
||||||
c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
|
|
||||||
}
|
|
||||||
for name, v := range mTyped {
|
|
||||||
id, ok := v.(int64)
|
|
||||||
if !ok {
|
|
||||||
log.Printf("bad handshake m item extension ID type: %T", v)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if id == 0 {
|
|
||||||
delete(c.PeerExtensionIDs, name)
|
|
||||||
} else {
|
|
||||||
if c.PeerExtensionIDs[name] == 0 {
|
|
||||||
supportedExtensionMessages.Add(name, 1)
|
|
||||||
}
|
|
||||||
c.PeerExtensionIDs[name] = byte(id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
metadata_sizeUntyped, ok := d["metadata_size"]
|
|
||||||
if ok {
|
|
||||||
metadata_size, ok := metadata_sizeUntyped.(int64)
|
|
||||||
if !ok {
|
|
||||||
log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
|
|
||||||
} else {
|
|
||||||
err = t.setMetadataSize(metadata_size)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error setting metadata size to %d", metadata_size)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
|
|
||||||
c.requestPendingMetadata()
|
|
||||||
}
|
|
||||||
case metadataExtendedId:
|
|
||||||
err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error handling metadata extension message: %s", err)
|
|
||||||
}
|
|
||||||
case pexExtendedId:
|
|
||||||
if cl.config.DisablePEX {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
var pexMsg peerExchangeMessage
|
|
||||||
err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("error unmarshalling PEX message: %s", err)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
cl.mu.Lock()
|
|
||||||
t.addPeers(func() (ret []Peer) {
|
|
||||||
for i, cp := range pexMsg.Added {
|
|
||||||
p := Peer{
|
|
||||||
IP: make([]byte, 4),
|
|
||||||
Port: cp.Port,
|
|
||||||
Source: peerSourcePEX,
|
|
||||||
}
|
|
||||||
if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
|
|
||||||
p.SupportsEncryption = true
|
|
||||||
}
|
|
||||||
missinggo.CopyExact(p.IP, cp.IP[:])
|
|
||||||
ret = append(ret, p)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}())
|
|
||||||
cl.mu.Unlock()
|
|
||||||
}()
|
|
||||||
default:
|
|
||||||
err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
// That client uses its own extension IDs for outgoing message
|
|
||||||
// types, which is incorrect.
|
|
||||||
if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
|
|
||||||
strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case pp.Port:
|
|
||||||
if cl.dHT == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if msg.Port != 0 {
|
|
||||||
pingAddr.Port = int(msg.Port)
|
|
||||||
}
|
|
||||||
cl.dHT.Ping(pingAddr)
|
|
||||||
default:
|
|
||||||
err = fmt.Errorf("received unknown message type: %#v", msg.Type)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cl *Client) openNewConns(t *Torrent) {
|
func (cl *Client) openNewConns(t *Torrent) {
|
||||||
defer t.updateWantPeersEvent()
|
defer t.updateWantPeersEvent()
|
||||||
for len(t.peers) != 0 {
|
for len(t.peers) != 0 {
|
||||||
|
212
connection.go
212
connection.go
@ -8,9 +8,11 @@ import (
|
|||||||
"expvar"
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -690,3 +692,213 @@ func (c *connection) lastHelpful() (ret time.Time) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Processes incoming bittorrent messages. The client lock is held upon entry
|
||||||
|
// and exit. Returning will end the connection.
|
||||||
|
func (c *connection) mainReadLoop() error {
|
||||||
|
t := c.t
|
||||||
|
cl := t.cl
|
||||||
|
decoder := pp.Decoder{
|
||||||
|
R: bufio.NewReader(c.rw),
|
||||||
|
MaxLength: 256 * 1024,
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
cl.mu.Unlock()
|
||||||
|
var msg pp.Message
|
||||||
|
err := decoder.Decode(&msg)
|
||||||
|
cl.mu.Lock()
|
||||||
|
if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.readMsg(&msg)
|
||||||
|
c.lastMessageReceived = time.Now()
|
||||||
|
if msg.Keepalive {
|
||||||
|
receivedKeepalives.Add(1)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
|
||||||
|
switch msg.Type {
|
||||||
|
case pp.Choke:
|
||||||
|
c.PeerChoked = true
|
||||||
|
c.Requests = nil
|
||||||
|
// We can then reset our interest.
|
||||||
|
c.updateRequests()
|
||||||
|
case pp.Reject:
|
||||||
|
cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
|
||||||
|
c.updateRequests()
|
||||||
|
case pp.Unchoke:
|
||||||
|
c.PeerChoked = false
|
||||||
|
cl.peerUnchoked(t, c)
|
||||||
|
case pp.Interested:
|
||||||
|
c.PeerInterested = true
|
||||||
|
cl.upload(t, c)
|
||||||
|
case pp.NotInterested:
|
||||||
|
c.PeerInterested = false
|
||||||
|
c.Choke()
|
||||||
|
case pp.Have:
|
||||||
|
err = c.peerSentHave(int(msg.Index))
|
||||||
|
case pp.Request:
|
||||||
|
if c.Choked {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !c.PeerInterested {
|
||||||
|
err = errors.New("peer sent request but isn't interested")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !t.havePiece(msg.Index.Int()) {
|
||||||
|
// This isn't necessarily them screwing up. We can drop pieces
|
||||||
|
// from our storage, and can't communicate this to peers
|
||||||
|
// except by reconnecting.
|
||||||
|
requestsReceivedForMissingPieces.Add(1)
|
||||||
|
err = errors.New("peer requested piece we don't have")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if c.PeerRequests == nil {
|
||||||
|
c.PeerRequests = make(map[request]struct{}, maxRequests)
|
||||||
|
}
|
||||||
|
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
|
||||||
|
cl.upload(t, c)
|
||||||
|
case pp.Cancel:
|
||||||
|
req := newRequest(msg.Index, msg.Begin, msg.Length)
|
||||||
|
if !c.PeerCancel(req) {
|
||||||
|
unexpectedCancels.Add(1)
|
||||||
|
}
|
||||||
|
case pp.Bitfield:
|
||||||
|
err = c.peerSentBitfield(msg.Bitfield)
|
||||||
|
case pp.HaveAll:
|
||||||
|
err = c.peerSentHaveAll()
|
||||||
|
case pp.HaveNone:
|
||||||
|
err = c.peerSentHaveNone()
|
||||||
|
case pp.Piece:
|
||||||
|
cl.downloadedChunk(t, c, &msg)
|
||||||
|
case pp.Extended:
|
||||||
|
switch msg.ExtendedID {
|
||||||
|
case pp.HandshakeExtendedID:
|
||||||
|
// TODO: Create a bencode struct for this.
|
||||||
|
var d map[string]interface{}
|
||||||
|
err = bencode.Unmarshal(msg.ExtendedPayload, &d)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error decoding extended message payload: %s", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
|
||||||
|
if reqq, ok := d["reqq"]; ok {
|
||||||
|
if i, ok := reqq.(int64); ok {
|
||||||
|
c.PeerMaxRequests = int(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if v, ok := d["v"]; ok {
|
||||||
|
c.PeerClientName = v.(string)
|
||||||
|
}
|
||||||
|
m, ok := d["m"]
|
||||||
|
if !ok {
|
||||||
|
err = errors.New("handshake missing m item")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
mTyped, ok := m.(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
err = errors.New("handshake m value is not dict")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if c.PeerExtensionIDs == nil {
|
||||||
|
c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
|
||||||
|
}
|
||||||
|
for name, v := range mTyped {
|
||||||
|
id, ok := v.(int64)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("bad handshake m item extension ID type: %T", v)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if id == 0 {
|
||||||
|
delete(c.PeerExtensionIDs, name)
|
||||||
|
} else {
|
||||||
|
if c.PeerExtensionIDs[name] == 0 {
|
||||||
|
supportedExtensionMessages.Add(name, 1)
|
||||||
|
}
|
||||||
|
c.PeerExtensionIDs[name] = byte(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
metadata_sizeUntyped, ok := d["metadata_size"]
|
||||||
|
if ok {
|
||||||
|
metadata_size, ok := metadata_sizeUntyped.(int64)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
|
||||||
|
} else {
|
||||||
|
err = t.setMetadataSize(metadata_size)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error setting metadata size to %d", metadata_size)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
|
||||||
|
c.requestPendingMetadata()
|
||||||
|
}
|
||||||
|
case metadataExtendedId:
|
||||||
|
err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error handling metadata extension message: %s", err)
|
||||||
|
}
|
||||||
|
case pexExtendedId:
|
||||||
|
if cl.config.DisablePEX {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
var pexMsg peerExchangeMessage
|
||||||
|
err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("error unmarshalling PEX message: %s", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
cl.mu.Lock()
|
||||||
|
t.addPeers(func() (ret []Peer) {
|
||||||
|
for i, cp := range pexMsg.Added {
|
||||||
|
p := Peer{
|
||||||
|
IP: make([]byte, 4),
|
||||||
|
Port: cp.Port,
|
||||||
|
Source: peerSourcePEX,
|
||||||
|
}
|
||||||
|
if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
|
||||||
|
p.SupportsEncryption = true
|
||||||
|
}
|
||||||
|
missinggo.CopyExact(p.IP, cp.IP[:])
|
||||||
|
ret = append(ret, p)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}())
|
||||||
|
cl.mu.Unlock()
|
||||||
|
}()
|
||||||
|
default:
|
||||||
|
err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// That client uses its own extension IDs for outgoing message
|
||||||
|
// types, which is incorrect.
|
||||||
|
if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
|
||||||
|
strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case pp.Port:
|
||||||
|
if cl.dHT == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
if msg.Port != 0 {
|
||||||
|
pingAddr.Port = int(msg.Port)
|
||||||
|
}
|
||||||
|
cl.dHT.Ping(pingAddr)
|
||||||
|
default:
|
||||||
|
err = fmt.Errorf("received unknown message type: %#v", msg.Type)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user