Unbiased request ordering was requesting chunks the peer doesn't have

This commit is contained in:
Matt Joiner 2017-11-06 14:53:00 +11:00
parent 82093937d5
commit 62b1e1b749
1 changed files with 34 additions and 16 deletions

View File

@ -335,6 +335,29 @@ func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool)
})
}
// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool
func (cn *connection) request(r request, mw messageWriter) bool {
if cn.requests == nil {
cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
}
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
}
if !cn.PeerHasPiece(r.Index.Int()) {
panic("requesting piece peer doesn't have")
}
cn.requests[r] = struct{}{}
return mw(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
})
}
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
numFillBuffers.Add(1)
cancel, new, i := cn.desiredRequestState()
@ -354,23 +377,13 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
if len(new) != 0 {
fillBufferSentRequests.Add(1)
for _, r := range new {
if cn.requests == nil {
cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
}
cn.requests[r] = struct{}{}
// log.Printf("%p: requesting %v", cn, r)
if !msg(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}) {
if !cn.request(r, msg) {
// If we didn't completely top up the requests, we shouldn't
// mark the low water, since we'll want to top up the requests
// as soon as we have more write buffer space.
return
}
}
// If we didn't completely top up the requests, we shouldn't mark the
// low water, since we'll want to top up the requests as soon as we
// have more write buffer space.
cn.requestsLowWater = len(cn.requests) / 2
}
cn.upload(msg)
@ -503,6 +516,9 @@ func (cn *connection) updateRequests() {
cn.tickleWriter()
}
// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
return func(cb iter.Callback) {
for _, bm := range bms {
@ -518,7 +534,9 @@ func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
now, readahead := cn.t.readerPiecePriorities()
return iterBitmapsDistinct(cn.t.completedPieces.Copy(), now, readahead, cn.t.pendingPieces)
skip := bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
skip.Union(cn.t.completedPieces)
return iterBitmapsDistinct(skip, now, readahead, cn.t.pendingPieces)
}
// The connection should download highest priority pieces first, without any
@ -831,7 +849,7 @@ func (c *connection) mainReadLoop() error {
// from our storage, and can't communicate this to peers
// except by reconnecting.
requestsReceivedForMissingPieces.Add(1)
err = errors.New("peer requested piece we don't have")
err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int())
break
}
if c.PeerRequests == nil {