diff --git a/client.go b/client.go index 03bd4541..dd1c77d1 100644 --- a/client.go +++ b/client.go @@ -915,7 +915,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { } c.conn.SetWriteDeadline(time.Time{}) c.r = deadlineReader{c.conn, c.r} - completedHandshakeConnectionFlags.Add(c.ConnectionFlags(), 1) + completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1) if connIsIpv6(c.conn) { torrent.Add("completed handshake over ipv6", 1) } @@ -1344,7 +1344,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, conn: nc, writeBuffer: new(bytes.Buffer), } - c.PeerImpl = c + c.peerImpl = c c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string { return fmt.Sprintf("%v: %s", c, m.Text()) }) diff --git a/peer-impl.go b/peer-impl.go new file mode 100644 index 00000000..41540c47 --- /dev/null +++ b/peer-impl.go @@ -0,0 +1,21 @@ +package torrent + +import ( + "github.com/anacrolix/torrent/metainfo" +) + +// Contains implementation details that differ between peer types, like Webseeds and regular +// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with +// legacy PeerConn methods. +type peerImpl interface { + updateRequests() + writeInterested(interested bool) bool + cancel(request) bool + // Return true if there's room for more activity. + request(request) bool + connectionFlags() string + _close() + _postCancel(request) + onGotInfo(*metainfo.Info) + drop() +} diff --git a/peer_info.go b/peer_info.go index 049ae152..fdd74c52 100644 --- a/peer_info.go +++ b/peer_info.go @@ -20,7 +20,7 @@ type PeerInfo struct { Trusted bool } -func (me PeerInfo) Equal(other PeerInfo) bool { +func (me PeerInfo) equal(other PeerInfo) bool { return me.Id == other.Id && me.Addr.String() == other.Addr.String() && me.Source == other.Source && diff --git a/peerconn.go b/peerconn.go index 4afdd8c2..d9abf804 100644 --- a/peerconn.go +++ b/peerconn.go @@ -36,26 +36,13 @@ const ( PeerSourcePex = "X" ) -type PeerImpl interface { - UpdateRequests() - WriteInterested(interested bool) bool - Cancel(request) bool - // Return true if there's room for more activity. - Request(request) bool - ConnectionFlags() string - Close() - PostCancel(request) - onGotInfo(*metainfo.Info) - Drop() -} - type peer struct { // First to ensure 64-bit alignment for atomics. See #262. _stats ConnStats t *Torrent - PeerImpl + peerImpl connString string outgoing bool @@ -253,7 +240,7 @@ func eventAgeString(t time.Time) string { return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds()) } -func (cn *PeerConn) ConnectionFlags() (ret string) { +func (cn *PeerConn) connectionFlags() (ret string) { c := func(b byte) { ret += string([]byte{b}) } @@ -285,7 +272,7 @@ func (cn *peer) statusFlags() (ret string) { c('c') } c('-') - ret += cn.ConnectionFlags() + ret += cn.connectionFlags() c('-') if cn.peerInterested { c('i') @@ -348,10 +335,10 @@ func (cn *peer) close() { } cn.discardPieceInclination() cn._pieceRequestOrder.Clear() - cn.PeerImpl.Close() + cn.peerImpl._close() } -func (cn *PeerConn) Close() { +func (cn *PeerConn) _close() { if cn.pex.IsEnabled() { cn.pex.Close() } @@ -492,10 +479,10 @@ func (cn *peer) setInterested(interested bool) bool { } cn.updateExpectingChunks() // log.Printf("%p: setting interest: %v", cn, interested) - return cn.WriteInterested(interested) + return cn.writeInterested(interested) } -func (pc *PeerConn) WriteInterested(interested bool) bool { +func (pc *PeerConn) writeInterested(interested bool) bool { return pc.write(pp.Message{ Type: func() pp.MessageType { if interested { @@ -548,10 +535,10 @@ func (cn *peer) request(r request) bool { cn.t.pendingRequests[r]++ cn.t.requestStrategy.hooks().sentRequest(r) cn.updateExpectingChunks() - return cn.PeerImpl.Request(r) + return cn.peerImpl.request(r) } -func (me *PeerConn) Request(r request) bool { +func (me *PeerConn) request(r request) bool { return me.write(pp.Message{ Type: pp.Request, Index: r.Index, @@ -560,7 +547,7 @@ func (me *PeerConn) Request(r request) bool { }) } -func (me *PeerConn) Cancel(r request) bool { +func (me *PeerConn) cancel(r request) bool { return me.write(makeCancelMessage(r)) } @@ -573,7 +560,7 @@ func (cn *peer) doRequestState() bool { for r := range cn.requests { cn.deleteRequest(r) // log.Printf("%p: cancelling request: %v", cn, r) - if !cn.PeerImpl.Cancel(r) { + if !cn.peerImpl.cancel(r) { return false } } @@ -709,7 +696,7 @@ func (cn *PeerConn) postBitfield() { cn.sentHaves = cn.t._completedPieces.Copy() } -func (cn *PeerConn) UpdateRequests() { +func (cn *PeerConn) updateRequests() { // log.Print("update requests") cn.tickleWriter() } @@ -832,7 +819,7 @@ func (cn *PeerConn) peerPiecesChanged() { } } if prioritiesChanged { - cn.UpdateRequests() + cn.updateRequests() } } } @@ -853,7 +840,7 @@ func (cn *PeerConn) peerSentHave(piece pieceIndex) error { cn.raisePeerMinPieces(piece + 1) cn._peerPieces.Set(bitmap.BitIndex(piece), true) if cn.updatePiecePriority(piece) { - cn.UpdateRequests() + cn.updateRequests() } return nil } @@ -1096,7 +1083,7 @@ func (c *PeerConn) mainReadLoop() (err error) { c.deleteAllRequests() } // We can then reset our interest. - c.UpdateRequests() + c.updateRequests() c.updateExpectingChunks() case pp.Unchoke: c.peerChoking = false @@ -1146,7 +1133,7 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Suggest: torrent.Add("suggests received", 1) log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index).SetLevel(log.Debug).Log(c.t.logger) - c.UpdateRequests() + c.updateRequests() case pp.HaveAll: err = c.onPeerSentHaveAll() case pp.HaveNone: @@ -1157,7 +1144,7 @@ func (c *PeerConn) mainReadLoop() (err error) { torrent.Add("allowed fasts received", 1) log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger) c.peerAllowedFast.Add(int(msg.Index)) - c.UpdateRequests() + c.updateRequests() case pp.Extended: err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload) default: @@ -1319,7 +1306,7 @@ func (c *peer) receiveChunk(msg *pp.Message) error { // Cancel pending requests for this chunk. for c := range t.conns { - c.PostCancel(req) + c._postCancel(req) } err := func() error { @@ -1450,7 +1437,7 @@ another: return c.choke(msg) } -func (cn *PeerConn) Drop() { +func (cn *PeerConn) drop() { cn.t.dropConnection(cn) } @@ -1482,10 +1469,10 @@ func (c *peer) deleteRequest(r request) bool { if n < 0 { panic(n) } - c.UpdateRequests() + c.updateRequests() c.t.iterPeers(func(_c *peer) { if !_c.interested && _c != c && c.peerHasPiece(pieceIndex(r.Index)) { - _c.UpdateRequests() + _c.updateRequests() } }) return true @@ -1511,11 +1498,11 @@ func (c *peer) postCancel(r request) bool { if !c.deleteRequest(r) { return false } - c.PeerImpl.PostCancel(r) + c.peerImpl._postCancel(r) return true } -func (c *PeerConn) PostCancel(r request) { +func (c *PeerConn) _postCancel(r request) { c.post(makeCancelMessage(r)) } diff --git a/torrent.go b/torrent.go index a5225518..12402c5a 100644 --- a/torrent.go +++ b/torrent.go @@ -277,7 +277,7 @@ func (t *Torrent) addPeer(p PeerInfo) (added bool) { } if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok { torrent.Add("peers replaced", 1) - if !replaced.Equal(p) { + if !replaced.equal(p) { t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced) added = true } @@ -953,7 +953,7 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex) { t.iterPeers(func(c *peer) { if c.updatePiecePriority(piece) { // log.Print("conn piece priority changed") - c.UpdateRequests() + c.updateRequests() } }) t.maybeNewConns() @@ -1736,7 +1736,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { if len(bannableTouchers) >= 1 { c := bannableTouchers[0] t.cl.banPeerIP(c.remoteIp()) - c.Drop() + c.drop() } } t.onIncompletePiece(piece) @@ -1782,7 +1782,7 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) { // } t.iterPeers(func(conn *peer) { if conn.peerHasPiece(piece) { - conn.UpdateRequests() + conn.updateRequests() } }) } @@ -1946,7 +1946,7 @@ func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) { defer cb.t.cl.unlock() cb.t.iterPeers(func(cn *peer) { if cn.peerHasPiece(pieceIndex(r.Index)) { - cn.UpdateRequests() + cn.updateRequests() } }) @@ -1974,7 +1974,7 @@ func (t *Torrent) disallowDataDownloadLocked() { log.Printf("disallowing data download") t.dataDownloadDisallowed = true t.iterPeers(func(c *peer) { - c.UpdateRequests() + c.updateRequests() }) } @@ -1984,7 +1984,7 @@ func (t *Torrent) AllowDataDownload() { log.Printf("AllowDataDownload") t.dataDownloadDisallowed = false t.iterPeers(func(c *peer) { - c.UpdateRequests() + c.updateRequests() }) } @@ -2027,7 +2027,7 @@ func (t *Torrent) addWebSeed(url string) { }, requests: make(map[request]webseed.Request, maxRequests), } - ws.peer.PeerImpl = &ws + ws.peer.peerImpl = &ws if t.haveInfo() { ws.onGotInfo(t.info) } diff --git a/torrent_test.go b/torrent_test.go index 6c47a80c..2812a067 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -23,12 +23,12 @@ func r(i, b, l pp.Integer) request { return request{i, chunkSpec{b, l}} } -// Check the given Request is correct for various torrent offsets. +// Check the given request is correct for various torrent offsets. func TestTorrentRequest(t *testing.T) { const s = 472183431 // Length of torrent. for _, _case := range []struct { off int64 // An offset into the torrent. - req request // The expected Request. The zero value means !ok. + req request // The expected request. The zero value means !ok. }{ // Invalid offset. {-1, request{}}, diff --git a/web_seed.go b/web_seed.go index 297a817f..72bda5fd 100644 --- a/web_seed.go +++ b/web_seed.go @@ -32,22 +32,22 @@ type webSeed struct { peer peer } -var _ PeerImpl = (*webSeed)(nil) +var _ peerImpl = (*webSeed)(nil) func (ws *webSeed) onGotInfo(info *metainfo.Info) { ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles())) ws.client.Info = info } -func (ws *webSeed) PostCancel(r request) { - ws.Cancel(r) +func (ws *webSeed) _postCancel(r request) { + ws.cancel(r) } -func (ws *webSeed) WriteInterested(interested bool) bool { +func (ws *webSeed) writeInterested(interested bool) bool { return true } -func (ws *webSeed) Cancel(r request) bool { +func (ws *webSeed) cancel(r request) bool { ws.requests[r].Cancel() return true } @@ -56,25 +56,25 @@ func (ws *webSeed) intoSpec(r request) webseed.RequestSpec { return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} } -func (ws *webSeed) Request(r request) bool { +func (ws *webSeed) request(r request) bool { webseedRequest := ws.client.NewRequest(ws.intoSpec(r)) ws.requests[r] = webseedRequest go ws.requestResultHandler(r, webseedRequest) return true } -func (ws *webSeed) ConnectionFlags() string { +func (ws *webSeed) connectionFlags() string { return "WS" } -func (ws *webSeed) Drop() { +func (ws *webSeed) drop() { } -func (ws *webSeed) UpdateRequests() { +func (ws *webSeed) updateRequests() { ws.peer.doRequestState() } -func (ws *webSeed) Close() {} +func (ws *webSeed) _close() {} func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) { result := <-webseedRequest.Result