Generate requests, cancels and interest state in the connection writer

This commit is contained in:
Matt Joiner 2017-08-31 23:48:52 +10:00
parent 986b9093a0
commit 13e79039f2
4 changed files with 76 additions and 139 deletions

View File

@ -1035,23 +1035,11 @@ func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
} }
} }
func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
conn.updateRequests()
}
func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
ok = cn.Cancel(r)
if ok {
postedCancels.Add(1)
}
return
}
func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool { func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
if !cn.RequestPending(r) { if !cn.RequestPending(r) {
return false return false
} }
delete(cn.Requests, r) delete(cn.requests, r)
return true return true
} }

View File

@ -66,7 +66,7 @@ type connection struct {
// Stuff controlled by the local peer. // Stuff controlled by the local peer.
Interested bool Interested bool
Choked bool Choked bool
Requests map[request]struct{} requests map[request]struct{}
requestsLowWater int requestsLowWater int
// Indexed by metadata piece, set to true if posted and pending a // Indexed by metadata piece, set to true if posted and pending a
// response. // response.
@ -204,7 +204,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
cn.UsefulChunksReceived, cn.UsefulChunksReceived,
cn.UnwantedChunksReceived+cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
cn.chunksSent, cn.chunksSent,
len(cn.Requests), cn.numLocalRequests(),
len(cn.PeerRequests), len(cn.PeerRequests),
cn.statusFlags(), cn.statusFlags(),
) )
@ -243,7 +243,7 @@ func (cn *connection) Post(msg pp.Message) {
} }
func (cn *connection) RequestPending(r request) bool { func (cn *connection) RequestPending(r request) bool {
_, ok := cn.Requests[r] _, ok := cn.requests[r]
return ok return ok
} }
@ -288,50 +288,6 @@ func (cn *connection) nominalMaxRequests() (ret int) {
return return
} }
// Returns true if more requests can be sent.
func (cn *connection) Request(chunk request) bool {
if len(cn.Requests) >= cn.nominalMaxRequests() {
return false
}
if !cn.PeerHasPiece(int(chunk.Index)) {
return true
}
if cn.RequestPending(chunk) {
return true
}
cn.SetInterested(true)
if cn.PeerChoked {
return false
}
if cn.Requests == nil {
cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
}
cn.Requests[chunk] = struct{}{}
cn.requestsLowWater = len(cn.Requests) / 2
cn.Post(pp.Message{
Type: pp.Request,
Index: chunk.Index,
Begin: chunk.Begin,
Length: chunk.Length,
})
return true
}
// Returns true if an unsatisfied request was canceled.
func (cn *connection) Cancel(r request) bool {
if !cn.RequestPending(r) {
return false
}
delete(cn.Requests, r)
cn.Post(pp.Message{
Type: pp.Cancel,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
})
return true
}
// Returns true if an unsatisfied request was canceled. // Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool { func (cn *connection) PeerCancel(r request) bool {
if cn.PeerRequests == nil { if cn.PeerRequests == nil {
@ -365,11 +321,13 @@ func (cn *connection) Unchoke() {
cn.Choked = false cn.Choked = false
} }
func (cn *connection) SetInterested(interested bool) { func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
if cn.Interested == interested { if cn.Interested == interested {
return return true
} }
cn.Post(pp.Message{ cn.Interested = interested
// log.Printf("%p: setting interest: %v", cn, interested)
return msg(pp.Message{
Type: func() pp.MessageType { Type: func() pp.MessageType {
if interested { if interested {
return pp.Interested return pp.Interested
@ -378,7 +336,6 @@ func (cn *connection) SetInterested(interested bool) {
} }
}(), }(),
}) })
cn.Interested = interested
} }
var ( var (
@ -388,6 +345,44 @@ var (
connectionWriterWrite = expvar.NewInt("connectionWriterWrite") connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
) )
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
rs, i := cn.desiredRequestState()
if !cn.SetInterested(i, msg) {
return
}
for r := range cn.requests {
if _, ok := rs[r]; !ok {
delete(cn.requests, r)
// log.Printf("%p: cancelling request: %v", cn, r)
if !msg(pp.Message{
Type: pp.Cancel,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}) {
return
}
}
}
for r := range rs {
if _, ok := cn.requests[r]; !ok {
if cn.requests == nil {
cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
}
cn.requests[r] = struct{}{}
// log.Printf("%p: requesting %v", cn, r)
if !msg(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}) {
return
}
}
}
}
// Writes buffers to the socket from the write channel. // Writes buffers to the socket from the write channel.
func (cn *connection) writer(keepAliveTimeout time.Duration) { func (cn *connection) writer(keepAliveTimeout time.Duration) {
var ( var (
@ -410,6 +405,12 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
for { for {
buf.Write(cn.postedBuffer.Bytes()) buf.Write(cn.postedBuffer.Bytes())
cn.postedBuffer.Reset() cn.postedBuffer.Reset()
if buf.Len() == 0 {
cn.fillWriteBuffer(func(msg pp.Message) bool {
buf.Write(msg.MustMarshalBinary())
return buf.Len() < 1<<16
})
}
if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout { if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary()) buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
postedKeepalives.Add(1) postedKeepalives.Add(1)
@ -500,31 +501,21 @@ func nextRequestState(
} }
func (cn *connection) updateRequests() { func (cn *connection) updateRequests() {
rs, i := nextRequestState( cn.writerCond.Broadcast()
}
func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
return nextRequestState(
cn.t.networkingEnabled, cn.t.networkingEnabled,
cn.Requests, cn.requests,
cn.PeerChoked, cn.PeerChoked,
&cn.pieceRequestOrder, &cn.pieceRequestOrder,
func(piece int, f func(chunkSpec) bool) bool { func(piece int, f func(chunkSpec) bool) bool {
return undirtiedChunks(piece, cn.t, f) return undirtiedChunks(piece, cn.t, f)
}, },
cn.requestsLowWater, cn.requestsLowWater,
cn.nominalMaxRequests()) cn.nominalMaxRequests(),
for r := range cn.Requests { )
if _, ok := rs[r]; !ok {
if !cn.Cancel(r) {
panic("wat")
}
}
}
for r := range rs {
if _, ok := cn.Requests[r]; !ok {
if !cn.Request(r) {
panic("how")
}
}
}
cn.SetInterested(i)
} }
func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool { func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
@ -536,7 +527,7 @@ func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
func (cn *connection) stopRequestingPiece(piece int) { func (cn *connection) stopRequestingPiece(piece int) {
cn.pieceRequestOrder.Remove(piece) cn.pieceRequestOrder.Remove(piece)
cn.updateRequests() cn.writerCond.Broadcast()
} }
// This is distinct from Torrent piece priority, which is the user's // This is distinct from Torrent piece priority, which is the user's
@ -754,7 +745,7 @@ func (c *connection) mainReadLoop() error {
switch msg.Type { switch msg.Type {
case pp.Choke: case pp.Choke:
c.PeerChoked = true c.PeerChoked = true
c.Requests = nil c.requests = nil
// We can then reset our interest. // We can then reset our interest.
c.updateRequests() c.updateRequests()
case pp.Reject: case pp.Reject:
@ -762,7 +753,7 @@ func (c *connection) mainReadLoop() error {
c.updateRequests() c.updateRequests()
case pp.Unchoke: case pp.Unchoke:
c.PeerChoked = false c.PeerChoked = false
cl.peerUnchoked(t, c) c.writerCond.Broadcast()
case pp.Interested: case pp.Interested:
c.PeerInterested = true c.PeerInterested = true
c.upload() c.upload()
@ -989,9 +980,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
// Cancel pending requests for this chunk. // Cancel pending requests for this chunk.
for c := range t.conns { for c := range t.conns {
if cl.connCancel(t, c, req) { c.updateRequests()
c.updateRequests()
}
} }
cl.mu.Unlock() cl.mu.Unlock()
@ -1096,3 +1085,7 @@ func (cn *connection) netGoodPiecesDirtied() int {
func (c *connection) peerHasWantedPieces() bool { func (c *connection) peerHasWantedPieces() bool {
return !c.pieceRequestOrder.IsEmpty() return !c.pieceRequestOrder.IsEmpty()
} }
func (c *connection) numLocalRequests() int {
return len(c.requests)
}

View File

@ -2,13 +2,10 @@ package torrent
import ( import (
"io" "io"
"io/ioutil"
"net"
"sync" "sync"
"testing" "testing"
"time" "time"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter" "github.com/bradfitz/iter"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -19,49 +16,6 @@ import (
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
func TestCancelRequestOptimized(t *testing.T) {
r, w := io.Pipe()
c := &connection{
PeerMaxRequests: 1,
peerPieces: func() bitmap.Bitmap {
var bm bitmap.Bitmap
bm.Set(1, true)
return bm
}(),
w: w,
conn: new(net.TCPConn),
// For the locks
t: &Torrent{cl: &Client{}},
}
assert.Len(t, c.Requests, 0)
c.Request(newRequest(1, 2, 3))
require.Len(t, c.Requests, 1)
// Posting this message should removing the pending Request.
require.True(t, c.Cancel(newRequest(1, 2, 3)))
assert.Len(t, c.Requests, 0)
// Check that write optimization filters out the Request, due to the
// Cancel. We should have received an Interested, due to the initial
// request, and then keep-alives until we close the connection.
go c.writer(0)
b := make([]byte, 9)
n, err := io.ReadFull(r, b)
require.NoError(t, err)
require.EqualValues(t, len(b), n)
require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
time.Sleep(time.Millisecond)
c.mu().Lock()
c.Close()
c.mu().Unlock()
w.Close()
b, err = ioutil.ReadAll(r)
require.NoError(t, err)
// A single keep-alive may have gone through, as writer would be stuck
// trying to flush it, and then promptly close.
if s := string(b); s != "\x00\x00\x00\x00" && s != "" {
t.Logf("expected zero or one keepalives, got %q", s)
}
}
// Ensure that no race exists between sending a bitfield, and a subsequent // Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it. // Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) { func TestSendBitfieldThenHave(t *testing.T) {

View File

@ -1422,16 +1422,18 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
} }
} }
func (t *Torrent) cancelRequestsForPiece(piece int) {
for cn := range t.conns {
cn.writerCond.Broadcast()
}
}
func (t *Torrent) onPieceCompleted(piece int) { func (t *Torrent) onPieceCompleted(piece int) {
t.pendingPieces.Remove(piece) t.pendingPieces.Remove(piece)
t.pendAllChunkSpecs(piece) t.pendAllChunkSpecs(piece)
t.cancelRequestsForPiece(piece)
for conn := range t.conns { for conn := range t.conns {
conn.Have(piece) conn.Have(piece)
for r := range conn.Requests {
if int(r.Index) == piece {
conn.Cancel(r)
}
}
// Could check here if peer doesn't have piece, but due to caching // Could check here if peer doesn't have piece, but due to caching
// some peers may have said they have a piece but they don't. // some peers may have said they have a piece but they don't.
conn.upload() conn.upload()