2
0
mirror of synced 2025-02-23 06:08:07 +00:00

More consistent cancellation management

This commit is contained in:
Matt Joiner 2021-12-12 00:04:06 +11:00
parent 30375615b3
commit 04beb8937f
6 changed files with 102 additions and 102 deletions

View File

@ -15,9 +15,9 @@ type peerImpl interface {
isLowOnRequests() bool
writeInterested(interested bool) bool
// Neither of these return buffer room anymore, because they're currently both posted. There's
// also PeerConn.writeBufferFull for when/where it matters.
_cancel(RequestIndex)
// _cancel initiates cancellation of a request and returns acked if it expects the cancel to be
// handled by a follow-up event.
_cancel(RequestIndex) (acked bool)
_request(Request) bool
connectionFlags() string
onClose()

View File

@ -52,7 +52,7 @@ type PeerRemoteAddr interface {
// indexable with the memory space available.
type (
maxRequests = int
requestState = request_strategy.PeerNextRequestState
requestState = request_strategy.PeerRequestState
)
type Peer struct {
@ -84,9 +84,8 @@ type Peer struct {
// Stuff controlled by the local peer.
needRequestUpdate string
actualRequestState requestState
requestState requestState
updateRequestsTimer *time.Timer
cancelledRequests roaring.Bitmap
lastBecameInterested time.Time
priorInterest time.Duration
@ -97,9 +96,9 @@ type Peer struct {
choking bool
piecesReceivedSinceLastRequestUpdate maxRequests
maxPiecesReceivedBetweenRequestUpdates maxRequests
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
// Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering,
// and implementation differences, we may receive chunks that are no longer in the set of
// requests actually want. This could use a roaring.BSI if the memory use becomes noticeable.
validReceiveChunks map[RequestIndex]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
@ -177,10 +176,10 @@ func (cn *Peer) updateExpectingChunks() {
}
func (cn *Peer) expectingChunks() bool {
if cn.actualRequestState.Requests.IsEmpty() {
if cn.requestState.Requests.IsEmpty() {
return false
}
if !cn.actualRequestState.Interested {
if !cn.requestState.Interested {
return false
}
if !cn.peerChoking {
@ -189,7 +188,7 @@ func (cn *Peer) expectingChunks() bool {
haveAllowedFastRequests := false
cn.peerAllowedFast.Iterate(func(i uint32) bool {
haveAllowedFastRequests = roaringBitmapRangeCardinality(
&cn.actualRequestState.Requests,
&cn.requestState.Requests,
cn.t.pieceRequestIndexOffset(pieceIndex(i)),
cn.t.pieceRequestIndexOffset(pieceIndex(i+1)),
) == 0
@ -230,7 +229,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
func (cn *Peer) cumInterest() time.Duration {
ret := cn.priorInterest
if cn.actualRequestState.Interested {
if cn.requestState.Interested {
ret += time.Since(cn.lastBecameInterested)
}
return ret
@ -318,7 +317,7 @@ func (cn *Peer) statusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
if cn.actualRequestState.Interested {
if cn.requestState.Interested {
c('i')
}
if cn.choking {
@ -346,7 +345,7 @@ func (cn *Peer) downloadRate() float64 {
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
cn.requestState.Requests.Iterate(func(x uint32) bool {
ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
return true
})
@ -373,14 +372,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
" %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d-%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
" %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
cn.actualRequestState.Requests.GetCardinality(),
cn.cancelledRequests.GetCardinality(),
cn.requestState.Requests.GetCardinality(),
cn.requestState.Cancelled.GetCardinality(),
cn.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
@ -537,10 +536,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
}
func (cn *Peer) setInterested(interested bool) bool {
if cn.actualRequestState.Interested == interested {
if cn.requestState.Interested == interested {
return true
}
cn.actualRequestState.Interested = interested
cn.requestState.Interested = interested
if interested {
cn.lastBecameInterested = time.Now()
} else if !cn.lastBecameInterested.IsZero() {
@ -589,7 +588,7 @@ func (cn *Peer) shouldRequest(r RequestIndex) error {
if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
// This could occur if we made a request with the fast extension, and then got choked and
// haven't had the request rejected yet.
if !cn.actualRequestState.Requests.Contains(r) {
if !cn.requestState.Requests.Contains(r) {
panic("peer choking and piece not allowed fast")
}
}
@ -608,13 +607,13 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
if cn.actualRequestState.Requests.Contains(r) {
if cn.requestState.Requests.Contains(r) {
return true, nil
}
if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
cn.actualRequestState.Requests.Add(r)
cn.requestState.Requests.Add(r)
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[RequestIndex]int)
}
@ -639,30 +638,24 @@ func (me *PeerConn) _request(r Request) bool {
}
func (me *Peer) cancel(r RequestIndex) {
if !me.actualRequestState.Requests.Contains(r) {
panic(r)
if !me.deleteRequest(r) {
panic("request not existing should have been guarded")
}
if me._cancel(r) {
if !me.requestState.Cancelled.CheckedAdd(r) {
panic("request already cancelled")
}
}
if me.isLowOnRequests() {
me.updateRequests("Peer.cancel")
}
me._cancel(r)
}
func (me *PeerConn) _cancel(r RequestIndex) {
if me.cancelledRequests.Contains(r) {
// Already cancelled and waiting for a response.
panic(r)
}
func (me *PeerConn) _cancel(r RequestIndex) bool {
me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
// Transmission does not send rejects for received cancels. See
// https://github.com/transmission/transmission/pull/2275.
if me.fastEnabled() && !me.remoteIsTransmission() {
me.cancelledRequests.Add(r)
} else {
if !me.deleteRequest(r) {
panic("request not existing should have been guarded")
}
if me.isLowOnRequests() {
me.updateRequests("Peer.cancel")
}
}
me.write(makeCancelMessage(me.t.requestIndexToRequest(r)))
return me.fastEnabled() && !me.remoteIsTransmission()
}
func (cn *PeerConn) fillWriteBuffer() {
@ -1102,18 +1095,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
c.deleteAllRequests()
} else {
// We don't decrement pending requests here, let's wait for the peer to either
// reject or satisfy the outstanding requests. Additionally some peers may unchoke
// reject or satisfy the outstanding requests. Additionally, some peers may unchoke
// us and resume where they left off, we don't want to have piled on to those chunks
// in the meanwhile. I think a peers ability to abuse this should be limited: they
// in the meanwhile. I think a peer's ability to abuse this should be limited: they
// could let us request a lot of stuff, then choke us and never reject, but they're
// only a single peer, our chunk balancing should smooth over this abuse.
}
c.peerChoking = true
// We can now reset our interest. I think we do this after setting the flag in case the
// peerImpl updates synchronously (webseeds?).
if !c.actualRequestState.Requests.IsEmpty() {
c.updateRequests("choked")
}
c.updateExpectingChunks()
case pp.Unchoke:
if !c.peerChoking {
@ -1124,7 +1112,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
}
c.peerChoking = false
preservedCount := 0
c.actualRequestState.Requests.Iterate(func(x uint32) bool {
c.requestState.Requests.Iterate(func(x uint32) bool {
if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) {
preservedCount++
}
@ -1194,7 +1182,11 @@ func (c *PeerConn) mainReadLoop() (err error) {
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Reject:
c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
req := newRequestFromMessage(&msg)
if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) {
log.Printf("received invalid reject [request=%v, peer=%v]", req, c)
err = fmt.Errorf("received invalid reject [request=%v]", req)
}
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@ -1210,13 +1202,16 @@ func (c *PeerConn) mainReadLoop() (err error) {
}
}
func (c *Peer) remoteRejectedRequest(r RequestIndex) {
if c.deleteRequest(r) {
if c.isLowOnRequests() {
c.updateRequests("Peer.remoteRejectedRequest")
}
c.decExpectedChunkReceive(r)
// Returns true if it was valid to reject the request.
func (c *Peer) remoteRejectedRequest(r RequestIndex) bool {
if !c.deleteRequest(r) && !c.requestState.Cancelled.CheckedRemove(r) {
return false
}
if c.isLowOnRequests() {
c.updateRequests("Peer.remoteRejectedRequest")
}
c.decExpectedChunkReceive(r)
return true
}
func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
@ -1342,16 +1337,16 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// The request needs to be deleted immediately to prevent cancels occurring asynchronously when
// have actually already received the piece, while we have the Client unlocked to write the data
// out.
deletedRequest := false
intended := false
{
if c.actualRequestState.Requests.Contains(req) {
if c.requestState.Requests.Contains(req) {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
}
// Request has been satisfied.
if c.deleteRequest(req) {
deletedRequest = true
if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) {
intended = true
if !c.peerChoking {
c._chunksReceivedWhileExpecting++
}
@ -1359,7 +1354,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
c.updateRequests("Peer.receiveChunk deleted request")
}
} else {
chunksReceived.Add("unwanted", 1)
chunksReceived.Add("unintended", 1)
}
}
@ -1369,7 +1364,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// Do we actually want this chunk?
if t.haveChunk(ppReq) {
// panic(fmt.Sprintf("%+v", ppReq))
chunksReceived.Add("wasted", 1)
chunksReceived.Add("redundant", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
@ -1378,7 +1373,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
if deletedRequest {
if intended {
c.piecesReceivedSinceLastRequestUpdate++
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
}
@ -1541,29 +1536,33 @@ func (c *Peer) peerHasWantedPieces() bool {
return c.peerPieces().Intersects(&c.t._pendingPieces)
}
// Returns true if an outstanding request is removed. Cancelled requests should be handled
// separately.
func (c *Peer) deleteRequest(r RequestIndex) bool {
if !c.actualRequestState.Requests.CheckedRemove(r) {
if !c.requestState.Requests.CheckedRemove(r) {
return false
}
c.cancelledRequests.Remove(r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
}
c.updateExpectingChunks()
if c.t.requestingPeer(r) == c {
delete(c.t.pendingRequests, r)
delete(c.t.lastRequested, r)
if c.t.requestingPeer(r) != c {
panic("only one peer should have a given request at a time")
}
delete(c.t.pendingRequests, r)
delete(c.t.lastRequested, r)
return true
}
func (c *Peer) deleteAllRequests() {
c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
c.deleteRequest(x)
c.requestState.Requests.Clone().Iterate(func(x uint32) bool {
if !c.deleteRequest(x) {
panic("request should exist")
}
return true
})
if !c.actualRequestState.Requests.IsEmpty() {
panic(c.actualRequestState.Requests.GetCardinality())
if !c.requestState.Requests.IsEmpty() {
panic(c.requestState.Requests.GetCardinality())
}
}
@ -1693,11 +1692,11 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
}
func (pc *PeerConn) isLowOnRequests() bool {
return pc.actualRequestState.Requests.IsEmpty()
return pc.requestState.Requests.IsEmpty() && pc.requestState.Cancelled.IsEmpty()
}
func (p *Peer) uncancelledRequests() uint64 {
return p.actualRequestState.Requests.GetCardinality() - p.cancelledRequests.GetCardinality()
return p.requestState.Requests.GetCardinality()
}
func (pc *PeerConn) remoteIsTransmission() bool {

View File

@ -4,7 +4,10 @@ import (
"github.com/RoaringBitmap/roaring"
)
type PeerNextRequestState struct {
type PeerRequestState struct {
Interested bool
Requests roaring.Bitmap
// Expecting
Requests roaring.Bitmap
// Cancelled and waiting response
Cancelled roaring.Bitmap
}

View File

@ -101,8 +101,8 @@ func (p *peerRequests) Less(i, j int) bool {
if leftPeer != nil {
// The right peer should also be set, or we'd have resolved the computation by now.
ml = ml.Uint64(
rightPeer.actualRequestState.Requests.GetCardinality(),
leftPeer.actualRequestState.Requests.GetCardinality(),
rightPeer.requestState.Requests.GetCardinality(),
leftPeer.requestState.Requests.GetCardinality(),
)
// Could either of the lastRequested be Zero? That's what checking an existing peer is for.
leftLast := t.lastRequested[leftRequest]
@ -171,7 +171,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
// if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
// if p.t.pendingRequests.Get(r) != 0 && !p.requestState.Requests.Contains(r) {
// return
// }
if !allowedFast {
@ -183,11 +183,15 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
// have made the request previously (presumably while unchoked), and haven't had
// the peer respond yet (and the request was retained because we are using the
// fast extension).
if p.peerChoking && !p.actualRequestState.Requests.Contains(r) {
if p.peerChoking && !p.requestState.Requests.Contains(r) {
// We can't request this right now.
return
}
}
if p.requestState.Cancelled.Contains(r) {
// Can't re-request.
return
}
requestHeap.requestIndexes = append(requestHeap.requestIndexes, r)
})
},
@ -215,7 +219,7 @@ func (p *Peer) maybeUpdateActualRequestState() bool {
// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next desiredRequestState) bool {
current := &p.actualRequestState
current := &p.requestState
if !p.setInterested(next.Interested) {
return false
}
@ -225,11 +229,6 @@ func (p *Peer) applyRequestState(next desiredRequestState) bool {
heap.Init(requestHeap)
for requestHeap.Len() != 0 && maxRequests(current.Requests.GetCardinality()) < p.nominalMaxRequests() {
req := heap.Pop(requestHeap).(RequestIndex)
if p.cancelledRequests.Contains(req) {
// Waiting for a reject or piece message, which will suitably trigger us to update our
// requests, so we can skip this one with no additional consideration.
continue
}
existing := t.requestingPeer(req)
if existing != nil && existing != p {
// Don't steal from the poor.

View File

@ -1099,7 +1099,7 @@ func (t *Torrent) maybeNewConns() {
func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
if t._pendingPieces.Contains(uint32(piece)) {
t.iterPeers(func(c *Peer) {
// if c.actualRequestState.Interested {
// if c.requestState.Interested {
// return
// }
if !c.isLowOnRequests() {
@ -1420,7 +1420,7 @@ func (t *Torrent) assertPendingRequests() {
// actual.m = make([]int, t.numRequests())
// }
// t.iterPeers(func(p *Peer) {
// p.actualRequestState.Requests.Iterate(func(x uint32) bool {
// p.requestState.Requests.Iterate(func(x uint32) bool {
// actual.Inc(x)
// return true
// })

View File

@ -48,17 +48,14 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
return true
}
func (ws *webseedPeer) _cancel(r RequestIndex) {
active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]
if ok {
func (ws *webseedPeer) _cancel(r RequestIndex) bool {
if active, ok := ws.activeRequests[ws.peer.t.requestIndexToRequest(r)]; ok {
active.Cancel()
// The requester is running and will handle the result.
return true
}
if !ws.peer.deleteRequest(r) {
panic("cancelled webseed request should exist")
}
if ws.peer.isLowOnRequests() {
ws.peer.updateRequests("webseedPeer._cancel")
}
// There should be no requester handling this, so no further events will occur.
return false
}
func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
@ -88,7 +85,7 @@ func (ws *webseedPeer) requester(i int) {
start:
for !ws.peer.closed.IsSet() {
restart := false
ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
ws.peer.requestState.Requests.Iterate(func(x uint32) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
return true
@ -170,7 +167,9 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
log.Printf("closing %v", ws)
ws.peer.close()
}
ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
panic("invalid reject")
}
return err
}
err = ws.peer.receiveChunk(&pp.Message{
@ -186,7 +185,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}
func (me *webseedPeer) isLowOnRequests() bool {
return me.peer.actualRequestState.Requests.GetCardinality() < uint64(me.maxRequests)
return me.peer.requestState.Requests.GetCardinality() < uint64(me.maxRequests)
}
func (me *webseedPeer) peerPieces() *roaring.Bitmap {