2
0
mirror of synced 2025-02-23 22:28:11 +00:00

Wait for cancelled requests to be rejected per the spec

This commit is contained in:
Matt Joiner 2021-10-13 10:16:56 +11:00
parent 8b90e737c8
commit 23b4e2dc9c
4 changed files with 44 additions and 21 deletions

View File

@ -14,7 +14,7 @@ type peerImpl interface {
// Neither of these return buffer room anymore, because they're currently both posted. There's // Neither of these return buffer room anymore, because they're currently both posted. There's
// also PeerConn.writeBufferFull for when/where it matters. // also PeerConn.writeBufferFull for when/where it matters.
_cancel(Request) bool _cancel(RequestIndex) bool
_request(Request) bool _request(Request) bool
connectionFlags() string connectionFlags() string

View File

@ -84,6 +84,7 @@ type Peer struct {
// Stuff controlled by the local peer. // Stuff controlled by the local peer.
needRequestUpdate string needRequestUpdate string
actualRequestState requestState actualRequestState requestState
cancelledRequests roaring.Bitmap
lastBecameInterested time.Time lastBecameInterested time.Time
priorInterest time.Duration priorInterest time.Duration
@ -607,17 +608,28 @@ func (me *PeerConn) _request(r Request) bool {
} }
func (me *Peer) cancel(r RequestIndex) bool { func (me *Peer) cancel(r RequestIndex) bool {
if me.deleteRequest(r) { if !me.actualRequestState.Requests.Contains(r) {
return true
}
return me._cancel(r)
}
func (me *PeerConn) _cancel(r RequestIndex) bool {
if me.cancelledRequests.Contains(r) {
// Already cancelled and waiting for a response.
return true
}
if me.fastEnabled() {
me.cancelledRequests.Add(r)
} else {
if !me.deleteRequest(r) {
panic("request not existing should have been guarded")
}
if me.actualRequestState.Requests.GetCardinality() == 0 { if me.actualRequestState.Requests.GetCardinality() == 0 {
me.updateRequests("Peer.cancel") me.updateRequests("Peer.cancel")
} }
return me.peerImpl._cancel(me.t.requestIndexToRequest(r))
} }
return true return me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
}
func (me *PeerConn) _cancel(r Request) bool {
return me.write(makeCancelMessage(r))
} }
func (cn *PeerConn) fillWriteBuffer() { func (cn *PeerConn) fillWriteBuffer() {
@ -1299,6 +1311,9 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
if !c.peerChoking { if !c.peerChoking {
c._chunksReceivedWhileExpecting++ c._chunksReceivedWhileExpecting++
} }
if c.actualRequestState.Requests.GetCardinality() == 0 {
c.updateRequests("Peer.receiveChunk deleted request")
}
} else { } else {
chunksReceived.Add("unwanted", 1) chunksReceived.Add("unwanted", 1)
} }
@ -1320,9 +1335,6 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
if deletedRequest { if deletedRequest {
c.piecesReceivedSinceLastRequestUpdate++ c.piecesReceivedSinceLastRequestUpdate++
if c.actualRequestState.Requests.GetCardinality() == 0 {
c.updateRequests("Peer.receiveChunk deleted request")
}
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
} }
for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
@ -1490,6 +1502,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
if !c.actualRequestState.Requests.CheckedRemove(r) { if !c.actualRequestState.Requests.CheckedRemove(r) {
return false return false
} }
c.cancelledRequests.Remove(r)
for _, f := range c.callbacks.DeletedRequest { for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
} }

View File

@ -11,7 +11,6 @@ import (
"github.com/RoaringBitmap/roaring" "github.com/RoaringBitmap/roaring"
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless" "github.com/anacrolix/multiless"
request_strategy "github.com/anacrolix/torrent/request-strategy" request_strategy "github.com/anacrolix/torrent/request-strategy"
@ -268,18 +267,23 @@ func (p *Peer) applyRequestState(next requestState) bool {
return false return false
} }
next.Requests.Iterate(func(req uint32) bool { next.Requests.Iterate(func(req uint32) bool {
// This could happen if the peer chokes us between the next state being generated, and us if p.cancelledRequests.Contains(req) {
// trying to transmit the state. log.Printf("waiting for cancelled request %v", req)
if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req/p.t.chunksPerRegularPiece())) { return false
return true }
if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
log.Printf("not assigning all requests [desired=%v, cancelled=%v, max=%v]",
current.Requests.GetCardinality(),
p.cancelledRequests.GetCardinality(),
p.nominalMaxRequests(),
)
return false
} }
var err error var err error
more, err = p.request(req) more, err = p.request(req)
if err != nil { if err != nil {
panic(err) panic(err)
} /* else { }
log.Print(req)
} */
return more return more
}) })
if more { if more {

View File

@ -45,10 +45,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
return true return true
} }
func (ws *webseedPeer) _cancel(r Request) bool { func (ws *webseedPeer) _cancel(r RequestIndex) bool {
active, ok := ws.activeRequests[r] active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
if ok { if ok {
active.Cancel() active.Cancel()
if !ws.peer.deleteRequest(r) {
panic("cancelled webseed request should exist")
}
if ws.peer.actualRequestState.Requests.GetCardinality() == 0 {
ws.peer.updateRequests("webseedPeer._cancel")
}
} }
return true return true
} }