From 2fc814b5eb308c0e7f3cd72069309850734a0074 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 11 Sep 2016 14:32:56 +1000 Subject: [PATCH] Move Client.connectionLoop to connection.mainReadLoop --- client.go | 210 +------------------------------------------------ connection.go | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 213 insertions(+), 209 deletions(-) diff --git a/client.go b/client.go index d0d34ca8..79409d72 100644 --- a/client.go +++ b/client.go @@ -929,7 +929,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) { defer t.dropConnection(c) go c.writer(time.Minute) cl.sendInitialMessages(c, t) - err := cl.connectionLoop(t, c) + err := c.mainReadLoop() if err != nil && cl.config.Debug { log.Printf("error during connection loop: %s", err) } @@ -1118,214 +1118,6 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error { return nil } -// Processes incoming bittorrent messages. The client lock is held upon entry -// and exit. Returning will end the connection. -func (cl *Client) connectionLoop(t *Torrent, c *connection) error { - decoder := pp.Decoder{ - R: bufio.NewReader(c.rw), - MaxLength: 256 * 1024, - } - for { - cl.mu.Unlock() - var msg pp.Message - err := decoder.Decode(&msg) - cl.mu.Lock() - if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF { - return nil - } - if err != nil { - return err - } - c.readMsg(&msg) - c.lastMessageReceived = time.Now() - if msg.Keepalive { - receivedKeepalives.Add(1) - continue - } - receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1) - switch msg.Type { - case pp.Choke: - c.PeerChoked = true - c.Requests = nil - // We can then reset our interest. - c.updateRequests() - case pp.Reject: - cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length)) - c.updateRequests() - case pp.Unchoke: - c.PeerChoked = false - cl.peerUnchoked(t, c) - case pp.Interested: - c.PeerInterested = true - cl.upload(t, c) - case pp.NotInterested: - c.PeerInterested = false - c.Choke() - case pp.Have: - err = c.peerSentHave(int(msg.Index)) - case pp.Request: - if c.Choked { - break - } - if !c.PeerInterested { - err = errors.New("peer sent request but isn't interested") - break - } - if !t.havePiece(msg.Index.Int()) { - // This isn't necessarily them screwing up. We can drop pieces - // from our storage, and can't communicate this to peers - // except by reconnecting. - requestsReceivedForMissingPieces.Add(1) - err = errors.New("peer requested piece we don't have") - break - } - if c.PeerRequests == nil { - c.PeerRequests = make(map[request]struct{}, maxRequests) - } - c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{} - cl.upload(t, c) - case pp.Cancel: - req := newRequest(msg.Index, msg.Begin, msg.Length) - if !c.PeerCancel(req) { - unexpectedCancels.Add(1) - } - case pp.Bitfield: - err = c.peerSentBitfield(msg.Bitfield) - case pp.HaveAll: - err = c.peerSentHaveAll() - case pp.HaveNone: - err = c.peerSentHaveNone() - case pp.Piece: - cl.downloadedChunk(t, c, &msg) - case pp.Extended: - switch msg.ExtendedID { - case pp.HandshakeExtendedID: - // TODO: Create a bencode struct for this. - var d map[string]interface{} - err = bencode.Unmarshal(msg.ExtendedPayload, &d) - if err != nil { - err = fmt.Errorf("error decoding extended message payload: %s", err) - break - } - // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d) - if reqq, ok := d["reqq"]; ok { - if i, ok := reqq.(int64); ok { - c.PeerMaxRequests = int(i) - } - } - if v, ok := d["v"]; ok { - c.PeerClientName = v.(string) - } - m, ok := d["m"] - if !ok { - err = errors.New("handshake missing m item") - break - } - mTyped, ok := m.(map[string]interface{}) - if !ok { - err = errors.New("handshake m value is not dict") - break - } - if c.PeerExtensionIDs == nil { - c.PeerExtensionIDs = make(map[string]byte, len(mTyped)) - } - for name, v := range mTyped { - id, ok := v.(int64) - if !ok { - log.Printf("bad handshake m item extension ID type: %T", v) - continue - } - if id == 0 { - delete(c.PeerExtensionIDs, name) - } else { - if c.PeerExtensionIDs[name] == 0 { - supportedExtensionMessages.Add(name, 1) - } - c.PeerExtensionIDs[name] = byte(id) - } - } - metadata_sizeUntyped, ok := d["metadata_size"] - if ok { - metadata_size, ok := metadata_sizeUntyped.(int64) - if !ok { - log.Printf("bad metadata_size type: %T", metadata_sizeUntyped) - } else { - err = t.setMetadataSize(metadata_size) - if err != nil { - err = fmt.Errorf("error setting metadata size to %d", metadata_size) - break - } - } - } - if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok { - c.requestPendingMetadata() - } - case metadataExtendedId: - err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c) - if err != nil { - err = fmt.Errorf("error handling metadata extension message: %s", err) - } - case pexExtendedId: - if cl.config.DisablePEX { - break - } - var pexMsg peerExchangeMessage - err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg) - if err != nil { - err = fmt.Errorf("error unmarshalling PEX message: %s", err) - break - } - go func() { - cl.mu.Lock() - t.addPeers(func() (ret []Peer) { - for i, cp := range pexMsg.Added { - p := Peer{ - IP: make([]byte, 4), - Port: cp.Port, - Source: peerSourcePEX, - } - if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 { - p.SupportsEncryption = true - } - missinggo.CopyExact(p.IP, cp.IP[:]) - ret = append(ret, p) - } - return - }()) - cl.mu.Unlock() - }() - default: - err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID) - } - if err != nil { - // That client uses its own extension IDs for outgoing message - // types, which is incorrect. - if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || - strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") { - return nil - } - } - case pp.Port: - if cl.dHT == nil { - break - } - pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String()) - if err != nil { - panic(err) - } - if msg.Port != 0 { - pingAddr.Port = int(msg.Port) - } - cl.dHT.Ping(pingAddr) - default: - err = fmt.Errorf("received unknown message type: %#v", msg.Type) - } - if err != nil { - return err - } - } -} - func (cl *Client) openNewConns(t *Torrent) { defer t.updateWantPeersEvent() for len(t.peers) != 0 { diff --git a/connection.go b/connection.go index 3e29e319..d98b1c27 100644 --- a/connection.go +++ b/connection.go @@ -8,9 +8,11 @@ import ( "expvar" "fmt" "io" + "log" "math/rand" "net" "strconv" + "strings" "sync" "time" @@ -690,3 +692,213 @@ func (c *connection) lastHelpful() (ret time.Time) { } return } + +// Processes incoming bittorrent messages. The client lock is held upon entry +// and exit. Returning will end the connection. +func (c *connection) mainReadLoop() error { + t := c.t + cl := t.cl + decoder := pp.Decoder{ + R: bufio.NewReader(c.rw), + MaxLength: 256 * 1024, + } + for { + cl.mu.Unlock() + var msg pp.Message + err := decoder.Decode(&msg) + cl.mu.Lock() + if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF { + return nil + } + if err != nil { + return err + } + c.readMsg(&msg) + c.lastMessageReceived = time.Now() + if msg.Keepalive { + receivedKeepalives.Add(1) + continue + } + receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1) + switch msg.Type { + case pp.Choke: + c.PeerChoked = true + c.Requests = nil + // We can then reset our interest. + c.updateRequests() + case pp.Reject: + cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length)) + c.updateRequests() + case pp.Unchoke: + c.PeerChoked = false + cl.peerUnchoked(t, c) + case pp.Interested: + c.PeerInterested = true + cl.upload(t, c) + case pp.NotInterested: + c.PeerInterested = false + c.Choke() + case pp.Have: + err = c.peerSentHave(int(msg.Index)) + case pp.Request: + if c.Choked { + break + } + if !c.PeerInterested { + err = errors.New("peer sent request but isn't interested") + break + } + if !t.havePiece(msg.Index.Int()) { + // This isn't necessarily them screwing up. We can drop pieces + // from our storage, and can't communicate this to peers + // except by reconnecting. + requestsReceivedForMissingPieces.Add(1) + err = errors.New("peer requested piece we don't have") + break + } + if c.PeerRequests == nil { + c.PeerRequests = make(map[request]struct{}, maxRequests) + } + c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{} + cl.upload(t, c) + case pp.Cancel: + req := newRequest(msg.Index, msg.Begin, msg.Length) + if !c.PeerCancel(req) { + unexpectedCancels.Add(1) + } + case pp.Bitfield: + err = c.peerSentBitfield(msg.Bitfield) + case pp.HaveAll: + err = c.peerSentHaveAll() + case pp.HaveNone: + err = c.peerSentHaveNone() + case pp.Piece: + cl.downloadedChunk(t, c, &msg) + case pp.Extended: + switch msg.ExtendedID { + case pp.HandshakeExtendedID: + // TODO: Create a bencode struct for this. + var d map[string]interface{} + err = bencode.Unmarshal(msg.ExtendedPayload, &d) + if err != nil { + err = fmt.Errorf("error decoding extended message payload: %s", err) + break + } + // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d) + if reqq, ok := d["reqq"]; ok { + if i, ok := reqq.(int64); ok { + c.PeerMaxRequests = int(i) + } + } + if v, ok := d["v"]; ok { + c.PeerClientName = v.(string) + } + m, ok := d["m"] + if !ok { + err = errors.New("handshake missing m item") + break + } + mTyped, ok := m.(map[string]interface{}) + if !ok { + err = errors.New("handshake m value is not dict") + break + } + if c.PeerExtensionIDs == nil { + c.PeerExtensionIDs = make(map[string]byte, len(mTyped)) + } + for name, v := range mTyped { + id, ok := v.(int64) + if !ok { + log.Printf("bad handshake m item extension ID type: %T", v) + continue + } + if id == 0 { + delete(c.PeerExtensionIDs, name) + } else { + if c.PeerExtensionIDs[name] == 0 { + supportedExtensionMessages.Add(name, 1) + } + c.PeerExtensionIDs[name] = byte(id) + } + } + metadata_sizeUntyped, ok := d["metadata_size"] + if ok { + metadata_size, ok := metadata_sizeUntyped.(int64) + if !ok { + log.Printf("bad metadata_size type: %T", metadata_sizeUntyped) + } else { + err = t.setMetadataSize(metadata_size) + if err != nil { + err = fmt.Errorf("error setting metadata size to %d", metadata_size) + break + } + } + } + if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok { + c.requestPendingMetadata() + } + case metadataExtendedId: + err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c) + if err != nil { + err = fmt.Errorf("error handling metadata extension message: %s", err) + } + case pexExtendedId: + if cl.config.DisablePEX { + break + } + var pexMsg peerExchangeMessage + err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg) + if err != nil { + err = fmt.Errorf("error unmarshalling PEX message: %s", err) + break + } + go func() { + cl.mu.Lock() + t.addPeers(func() (ret []Peer) { + for i, cp := range pexMsg.Added { + p := Peer{ + IP: make([]byte, 4), + Port: cp.Port, + Source: peerSourcePEX, + } + if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 { + p.SupportsEncryption = true + } + missinggo.CopyExact(p.IP, cp.IP[:]) + ret = append(ret, p) + } + return + }()) + cl.mu.Unlock() + }() + default: + err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID) + } + if err != nil { + // That client uses its own extension IDs for outgoing message + // types, which is incorrect. + if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || + strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") { + return nil + } + } + case pp.Port: + if cl.dHT == nil { + break + } + pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String()) + if err != nil { + panic(err) + } + if msg.Port != 0 { + pingAddr.Port = int(msg.Port) + } + cl.dHT.Ping(pingAddr) + default: + err = fmt.Errorf("received unknown message type: %#v", msg.Type) + } + if err != nil { + return err + } + } +}