2
0
mirror of synced 2025-02-23 14:18:13 +00:00

Implement piece request ordering with retained state

This commit is contained in:
Matt Joiner 2021-12-01 14:38:47 +11:00
parent b99dd505b5
commit 94bb5d40ba
7 changed files with 243 additions and 43 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/google/btree"
@ -74,6 +75,7 @@ type Client struct {
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
torrents map[InfoHash]*Torrent
pieceRequestOrder map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder
acceptLimiter map[ipStr]int
dialRateLimiter *rate.Limiter

View File

@ -77,11 +77,13 @@ func (p *Piece) numDirtyChunks() chunkIndexType {
func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p.t.dirtyChunks.Add(p.requestIndexOffset() + i)
p.t.updatePieceRequestOrder(p.index)
p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i RequestIndex) {
p.t.dirtyChunks.Remove(p.requestIndexOffset() + i)
p.t.updatePieceRequestOrder(p.index)
}
func (p *Piece) numChunks() chunkIndexType {

View File

@ -3,12 +3,14 @@ package request_strategy
import (
"bytes"
"expvar"
"log"
"runtime"
"sort"
"sync"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
"github.com/google/btree"
"github.com/anacrolix/torrent/types"
)
@ -60,6 +62,15 @@ func (me pieceSorter) Swap(i, j int) {
func (me pieceSorter) Less(_i, _j int) bool {
i := me.get(_i)
j := me.get(_j)
return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
}
type pieceOrderInput struct {
PieceRequestOrderState
PieceRequestOrderKey
}
func pieceOrderLess(i, j pieceOrderInput) multiless.Computation {
return multiless.New().Int(
int(j.Priority), int(i.Priority),
).Bool(
@ -67,13 +78,13 @@ func (me pieceSorter) Less(_i, _j int) bool {
).Int64(
i.Availability, j.Availability,
).Int(
i.index, j.index,
i.Index, j.Index,
).Lazy(func() multiless.Computation {
return multiless.New().Cmp(bytes.Compare(
i.t.InfoHash[:],
j.t.InfoHash[:],
i.InfoHash[:],
j.InfoHash[:],
))
}).MustLess()
})
}
type requestsPeer struct {
@ -120,6 +131,15 @@ type filterPiece struct {
*Piece
}
func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
ret.Partial = fp.Partial
ret.InfoHash = fp.t.InfoHash
ret.Availability = fp.Availability
ret.Priority = fp.Priority
ret.Index = fp.index
return
}
var (
sortsMu sync.Mutex
sorts = map[*[]filterPiece][]int{}
@ -186,19 +206,13 @@ type pieceOrderingFinalizer struct {
}
// Calls f with requestable pieces in order.
func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
if false {
maxPieces := 0
for i := range input.Torrents {
maxPieces += len(input.Torrents[i].Pieces)
}
pieces := make([]filterPiece, 0, maxPieces)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
var storageLeft *int64
if input.Capacity != nil {
storageLeft = new(int64)
*storageLeft = *input.Capacity
}
for _t := range input.Torrents {
// TODO: We could do metainfo requests here.
t := &input.Torrents[_t]
@ -211,30 +225,68 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i
}
}
pieces = getSortedFilterPieces(pieces)
{
if len(pieces) != pro.tree.Len() {
panic("length doesn't match")
}
pieces := pieces
pro.tree.Ascend(func(i btree.Item) bool {
_i := i.(pieceRequestOrderItem)
ii := pieceOrderInput{
_i.state,
_i.key,
}
if pieces[0].toPieceOrderInput() != ii {
panic(_i)
}
pieces = pieces[1:]
return true
})
}
log.Printf("%v pieces passed", len(pieces))
}
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
var storageLeft *int64
if input.Capacity != nil {
storageLeft = new(int64)
*storageLeft = *input.Capacity
}
var allTorrentsUnverifiedBytes int64
torrentUnverifiedBytes := map[metainfo.Hash]int64{}
for _, piece := range pieces {
pro.tree.Ascend(func(i btree.Item) bool {
_i := i.(pieceRequestOrderItem)
var piece *Piece
var t Torrent
for _, t = range input.Torrents {
if t.InfoHash == _i.key.InfoHash {
piece = &t.Pieces[_i.key.Index]
break
}
}
if left := storageLeft; left != nil {
if *left < piece.Length {
continue
return true
}
*left -= piece.Length
}
if !piece.Request || piece.NumPendingChunks == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
continue
return true
}
if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
continue
if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
return true
}
if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
continue
return true
}
torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
torrentUnverifiedBytes[t.InfoHash] += piece.Length
allTorrentsUnverifiedBytes += piece.Length
f(piece.t, piece.Piece, piece.index)
}
f(&t, piece, _i.key.Index)
return true
})
return
}

View File

@ -0,0 +1,88 @@
package request_strategy
import (
"github.com/anacrolix/torrent/metainfo"
"github.com/google/btree"
)
func NewPieceOrder() *PieceRequestOrder {
return &PieceRequestOrder{
tree: btree.New(32),
keys: make(map[PieceRequestOrderKey]PieceRequestOrderState),
}
}
type PieceRequestOrder struct {
tree *btree.BTree
keys map[PieceRequestOrderKey]PieceRequestOrderState
}
type PieceRequestOrderKey struct {
InfoHash metainfo.Hash
Index int
}
type PieceRequestOrderState struct {
Priority piecePriority
Partial bool
Availability int64
}
type pieceRequestOrderItem struct {
key PieceRequestOrderKey
state PieceRequestOrderState
}
func (me pieceRequestOrderItem) Less(other btree.Item) bool {
otherConcrete := other.(pieceRequestOrderItem)
return pieceOrderLess(
pieceOrderInput{
PieceRequestOrderState: me.state,
PieceRequestOrderKey: me.key,
},
pieceOrderInput{
PieceRequestOrderState: otherConcrete.state,
PieceRequestOrderKey: otherConcrete.key,
},
).Less()
}
func (me *PieceRequestOrder) Add(key PieceRequestOrderKey, state PieceRequestOrderState) {
if _, ok := me.keys[key]; ok {
panic(key)
}
if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
key: key,
state: state,
}) != nil {
panic("shouldn't already have this")
}
me.keys[key] = state
}
func (me *PieceRequestOrder) Update(key PieceRequestOrderKey, state PieceRequestOrderState) {
if me.tree.Delete(me.existingItemForKey(key)) == nil {
panic(key)
}
if me.tree.ReplaceOrInsert(pieceRequestOrderItem{
key: key,
state: state,
}) != nil {
panic(key)
}
me.keys[key] = state
}
func (me *PieceRequestOrder) existingItemForKey(key PieceRequestOrderKey) pieceRequestOrderItem {
return pieceRequestOrderItem{
key: key,
state: me.keys[key],
}
}
func (me *PieceRequestOrder) Delete(key PieceRequestOrderKey) {
if me.tree.Delete(me.existingItemForKey(key)) == nil {
panic(key)
}
delete(me.keys, key)
}

View File

@ -27,10 +27,12 @@ func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input reques
input.Capacity = &cap
}
}
if false {
if input.Capacity == nil {
input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
return
}
}
input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
if !t.haveInfo() {
@ -58,8 +60,22 @@ func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
}
rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
for i := range t.pieces {
rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
}
return rst
}
func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
return request_strategy.PieceRequestOrderState{
Priority: t.piece(i).purePriority(),
Partial: t.piecePartiallyDownloaded(i),
Availability: t.piece(i).availability,
}
}
func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
p := &t.pieces[i]
rst.Pieces = append(rst.Pieces, request_strategy.Piece{
return request_strategy.Piece{
Request: !t.ignorePieceForRequests(i),
Priority: p.purePriority(),
Partial: t.piecePartiallyDownloaded(i),
@ -67,9 +83,7 @@ func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
Length: int64(p.length()),
NumPendingChunks: int(t.pieceNumPendingChunks(i)),
IterPendingChunks: &p.undirtiedChunksIter,
})
}
return rst
}
func init() {
@ -205,6 +219,7 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
}
request_strategy.GetRequestablePieces(
input,
p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
if t.InfoHash != p.t.infoHash {
return

View File

@ -0,0 +1,7 @@
package torrent
func (t *Torrent) updatePieceRequestOrder(pieceIndex int) {
t.cl.pieceRequestOrder[t.storage.Capacity].Update(
t.pieceRequestOrderKey(pieceIndex),
t.requestStrategyPieceOrderState(pieceIndex))
}

View File

@ -27,6 +27,7 @@ import (
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
@ -165,6 +166,7 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
panic(p.availability)
}
p.availability--
t.updatePieceRequestOrder(i)
}
func (t *Torrent) incPieceAvailability(i pieceIndex) {
@ -172,6 +174,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
if t.haveInfo() {
p := t.piece(i)
p.availability++
t.updatePieceRequestOrder(i)
}
}
@ -424,8 +427,21 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
return nil
}
func (t *Torrent) pieceRequestOrderKey(i int) request_strategy.PieceRequestOrderKey {
return request_strategy.PieceRequestOrderKey{
InfoHash: t.infoHash,
Index: i,
}
}
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
if t.cl.pieceRequestOrder == nil {
t.cl.pieceRequestOrder = make(map[storage.TorrentCapacity]*request_strategy.PieceRequestOrder)
}
if t.cl.pieceRequestOrder[t.storage.Capacity] == nil {
t.cl.pieceRequestOrder[t.storage.Capacity] = request_strategy.NewPieceOrder()
}
for i := range t.pieces {
p := &t.pieces[i]
// Need to add availability before updating piece completion, as that may result in conns
@ -434,6 +450,9 @@ func (t *Torrent) onSetInfo() {
panic(p.availability)
}
p.availability = int64(t.pieceAvailabilityFromPeers(i))
t.cl.pieceRequestOrder[t.storage.Capacity].Add(
t.pieceRequestOrderKey(i),
t.requestStrategyPieceOrderState(i))
t.updatePieceCompletion(pieceIndex(i))
if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
// t.logger.Printf("piece %s completion unknown, queueing check", p)
@ -797,6 +816,12 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
return pieceIndex(t._completedPieces.GetCardinality())
}
func (t *Torrent) deletePieceRequestOrder() {
for i := 0; i < t.numPieces(); i++ {
t.cl.pieceRequestOrder[t.storage.Capacity].Delete(t.pieceRequestOrderKey(i))
}
}
func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
t.closed.Set()
if t.storage != nil {
@ -816,6 +841,9 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
t.iterPeers(func(p *Peer) {
p.close()
})
if t.storage != nil {
t.deletePieceRequestOrder()
}
t.pex.Reset()
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
@ -1102,6 +1130,11 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) {
}
func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) {
if !t.closed.IsSet() {
// It would be possible to filter on pure-priority changes here to avoid churning the piece
// request order.
t.updatePieceRequestOrder(piece)
}
p := &t.pieces[piece]
newPrio := p.uncachedPriority()
// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
@ -1238,6 +1271,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool {
} else {
t._completedPieces.Remove(x)
}
p.t.updatePieceRequestOrder(piece)
t.updateComplete()
if complete && len(p.dirtiers) != 0 {
t.logger.Printf("marked piece %v complete but still has dirtiers", piece)