Combine pending and last requested

This commit is contained in:
Matt Joiner 2022-05-09 19:34:43 +10:00
parent 5fe7007d34
commit 7d4f64ce3c
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
3 changed files with 31 additions and 22 deletions

View File

@ -48,11 +48,10 @@ type PeerRemoteAddr interface {
String() string
}
// Since we have to store all the requests in memory, we can't reasonably exceed what would be
// indexable with the memory space available.
type (
maxRequests = int
requestState = request_strategy.PeerRequestState
// Since we have to store all the requests in memory, we can't reasonably exceed what could be
// indexed with the memory space available.
maxRequests = int
)
type Peer struct {
@ -85,7 +84,7 @@ type Peer struct {
// Stuff controlled by the local peer.
needRequestUpdate string
requestState requestState
requestState request_strategy.PeerRequestState
updateRequestsTimer *time.Timer
lastRequestUpdate time.Time
peakRequests maxRequests
@ -637,8 +636,10 @@ func (cn *Peer) request(r RequestIndex) (more bool, err error) {
cn.validReceiveChunks = make(map[RequestIndex]int)
}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r] = cn
cn.t.lastRequested[r] = time.Now()
cn.t.requestState[r] = requestState{
peer: cn,
when: time.Now(),
}
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
@ -1464,7 +1465,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
// Cancel pending requests for this chunk from *other* peers.
if p := t.pendingRequests[req]; p != nil {
if p := t.requestingPeer(req); p != nil {
if p == c {
panic("should not be pending request from conn that just received it")
}
@ -1627,8 +1628,7 @@ func (c *Peer) deleteRequest(r RequestIndex) bool {
if c.t.requestingPeer(r) != c {
panic("only one peer should have a given request at a time")
}
delete(c.t.pendingRequests, r)
delete(c.t.lastRequested, r)
delete(c.t.requestState, r)
// c.t.iterPeers(func(p *Peer) {
// if p.isLowOnRequests() {
// p.updateRequests("Peer.deleteRequest")

View File

@ -118,8 +118,10 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
if ml.Ok() {
return ml.MustLess()
}
leftPeer := t.pendingRequests[leftRequest]
rightPeer := t.pendingRequests[rightRequest]
leftRequestState := t.requestState[leftRequest]
rightRequestState := t.requestState[rightRequest]
leftPeer := leftRequestState.peer
rightPeer := rightRequestState.peer
// Prefer chunks already requested from this peer.
ml = ml.Bool(rightPeer == p.peer, leftPeer == p.peer)
// Prefer unrequested chunks.
@ -134,8 +136,8 @@ func (p *desiredPeerRequests) lessByValue(leftRequest, rightRequest RequestIndex
leftPeer.requestState.Requests.GetCardinality(),
)
// Could either of the lastRequested be Zero? That's what checking an existing peer is for.
leftLast := t.lastRequested[leftRequest]
rightLast := t.lastRequested[rightRequest]
leftLast := leftRequestState.when
rightLast := rightRequestState.when
if leftLast.IsZero() || rightLast.IsZero() {
panic("expected non-zero last requested times")
}

View File

@ -140,9 +140,8 @@ type Torrent struct {
initialPieceCheckDisabled bool
connsWithAllPieces map[*Peer]struct{}
// Count of each request across active connections.
pendingRequests map[RequestIndex]*Peer
lastRequested map[RequestIndex]time.Time
requestState map[RequestIndex]requestState
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks typedRoaring.Bitmap[RequestIndex]
@ -472,8 +471,7 @@ func (t *Torrent) onSetInfo() {
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
t.pendingRequests = make(map[RequestIndex]*Peer)
t.lastRequested = make(map[RequestIndex]time.Time)
t.requestState = make(map[RequestIndex]requestState)
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
@ -2453,16 +2451,20 @@ func (t *Torrent) updateComplete() {
}
func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
p := t.pendingRequests[r]
p := t.requestingPeer(r)
if p != nil {
p.cancel(r)
}
delete(t.pendingRequests, r)
// TODO: This is a check that an old invariant holds. It can be removed after some testing.
//delete(t.pendingRequests, r)
if _, ok := t.requestState[r]; ok {
panic("expected request state to be gone")
}
return p
}
func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
return t.pendingRequests[r]
return t.requestState[r].peer
}
func (t *Torrent) addConnWithAllPieces(p *Peer) {
@ -2494,3 +2496,8 @@ func (t *Torrent) hasStorageCap() bool {
func (t *Torrent) pieceIndexOfRequestIndex(ri RequestIndex) pieceIndex {
return pieceIndex(ri / t.chunksPerRegularPiece())
}
type requestState struct {
peer *Peer
when time.Time
}