package request_strategy
import (
"bytes"
"expvar"
"runtime"
"sort"
"sync"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/types"
)
type (
RequestIndex = uint32
ChunkIndex = uint32
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
// This could be made into a type parameter later, which would be great for testing.
ChunkSpec = types.ChunkSpec
)
type ClientPieceOrder struct{}
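// equalFilterPieces reports whether two filterPiece slices are element-wise equal for the
// fields that determine sort order, so a cached ordering can be reused.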
func equalFilterPieces(l, r []filterPiece) bool {
if len(l) != len(r) {
return false
}
for i := range l {
lp := &l[i]
rp := &r[i]
if lp.Priority != rp.Priority ||
lp.Partial != rp.Partial ||
lp.Availability != rp.Availability ||
lp.index != rp.index ||
lp.t.InfoHash != rp.t.InfoHash {
return false
}
}
return true
}
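// pieceSorter adapts filterPieces to sort.Interface through callbacks, so the same ordering
// logic can sort either the pieces themselves or a separate index permutation.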
type pieceSorter struct {
swap func(i, j int)
get func(i int) *filterPiece
len int
}
func (me pieceSorter) Len() int {
return me.len
}
func (me pieceSorter) Swap(i, j int) {
me.swap(i, j)
}
func (me pieceSorter) Less(_i, _j int) bool {
i := me.get(_i)
j := me.get(_j)
return multiless.New().Int(
int(j.Priority), int(i.Priority),
).Bool(
j.Partial, i.Partial,
).Int64(
i.Availability, j.Availability,
).Int(
i.index, j.index,
).Lazy(func() multiless.Computation {
return multiless.New().Cmp(bytes.Compare(
i.t.InfoHash[:],
j.t.InfoHash[:],
))
}).MustLess()
}
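// requestsPeer carries the request state being computed for a peer during a strategy run.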
type requestsPeer struct {
Peer
nextState PeerNextRequestState
requestablePiecesRemaining int
}
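// canFitRequest reports whether the peer's next request state still has room under MaxRequests.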
func (rp *requestsPeer) canFitRequest() bool {
return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}
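// addNextRequest adds a request to the peer's next state, panicking if it is already present.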
func (rp *requestsPeer) addNextRequest(r RequestIndex) {
if !rp.nextState.Requests.CheckedAdd(r) {
panic("should only add once")
}
}
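// peersForPieceRequests wraps a requestsPeer with the number of requests allocated to it for
// the piece currently being considered.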
type peersForPieceRequests struct {
requestsInPiece int
*requestsPeer
}
func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
me.requestsPeer.addNextRequest(r)
me.requestsInPiece++
}
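// requestablePiece is a piece that survived filtering and is eligible to have its pending
// chunks allocated to peers.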
type requestablePiece struct {
index pieceIndex
t *Torrent
alwaysReallocate bool
NumPendingChunks int
IterPendingChunks ChunksIterFunc
}
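// chunkIndexToRequestIndex converts a chunk index within the piece to a torrent-wide request
// index.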
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
return p.t.ChunksPerPiece*uint32(p.index) + c
}
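// filterPiece associates a Piece with its Torrent and piece index for filtering and sorting.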
type filterPiece struct {
t *Torrent
index pieceIndex
*Piece
}
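// sorts caches computed piece orderings, keyed by the address of the slice they were computed
// from. Entries are removed by the finalizers registered in getSortedFilterPieces.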
var (
sortsMu sync.Mutex
sorts = map[*[]filterPiece][]int{}
)
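// reorderedFilterPieces applies a previously computed index permutation to pieces.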
func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
ret = make([]filterPiece, len(indices))
for i, j := range indices {
ret[i] = pieces[j]
}
return
}
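// packageExpvarMap exposes counters describing piece-ordering cache behaviour.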
var packageExpvarMap = expvar.NewMap("request-strategy")
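// getSortedFilterPieces returns the pieces in request priority order. When cachePieceSorts is
// enabled it reuses a cached permutation for inputs that compare equal; with the constant set
// to false, the pieces are simply sorted in place.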
func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
const cachePieceSorts = false
if !cachePieceSorts {
sort.Sort(pieceSorter{
len: len(unsorted),
swap: func(i, j int) {
unsorted[i], unsorted[j] = unsorted[j], unsorted[i]
},
get: func(i int) *filterPiece {
return &unsorted[i]
},
})
return unsorted
}
sortsMu.Lock()
defer sortsMu.Unlock()
for key, order := range sorts {
if equalFilterPieces(*key, unsorted) {
packageExpvarMap.Add("reused filter piece ordering", 1)
return reorderedFilterPieces(unsorted, order)
}
}
indices := make([]int, len(unsorted))
for i := 0; i < len(indices); i++ {
indices[i] = i
}
sort.Sort(pieceSorter{
len: len(unsorted),
swap: func(i, j int) {
indices[i], indices[j] = indices[j], indices[i]
},
get: func(i int) *filterPiece {
return &unsorted[indices[i]]
},
})
packageExpvarMap.Add("added filter piece ordering", 1)
sorts[&unsorted] = indices
runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
packageExpvarMap.Add("finalized filter piece ordering", 1)
sortsMu.Lock()
defer sortsMu.Unlock()
delete(sorts, me.unsorted)
})
return reorderedFilterPieces(unsorted, indices)
}
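// pieceOrderingFinalizer is the object the finalizer is attached to, so a cached ordering can
// be dropped once its source slice is no longer referenced.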
type pieceOrderingFinalizer struct {
unsorted *[]filterPiece
}
// Calls f with requestable pieces in order.
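// Pieces are yielded in the order produced by getSortedFilterPieces, subject to the capacity
// and unverified-byte limits in input. A minimal usage sketch (hypothetical caller; Torrent and
// Piece setup elided):
//
//	input := Input{Torrents: torrents, MaxUnverifiedBytes: 64 << 20}
//	GetRequestablePieces(input, func(t *Torrent, p *Piece, pieceIndex int) {
//		// Allocate requests for this piece here.
//	})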
func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex int)) {
maxPieces := 0
for i := range input.Torrents {
maxPieces += len(input.Torrents[i].Pieces)
}
pieces := make([]filterPiece, 0, maxPieces)
// Storage capacity left for this run, taken from the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
var storageLeft *int64
if input.Capacity != nil {
storageLeft = new(int64)
*storageLeft = *input.Capacity
}
for _t := range input.Torrents {
// TODO: We could do metainfo requests here.
t := &input.Torrents[_t]
for i := range t.Pieces {
pieces = append(pieces, filterPiece{
t: &input.Torrents[_t],
index: i,
Piece: &t.Pieces[i],
})
}
}
pieces = getSortedFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
torrentUnverifiedBytes := map[metainfo.Hash]int64{}
for _, piece := range pieces {
if left := storageLeft; left != nil {
if *left < piece.Length {
continue
}
*left -= piece.Length
}
if !piece.Request || piece.NumPendingChunks == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
continue
}
if piece.t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[piece.t.InfoHash]+piece.Length > piece.t.MaxUnverifiedBytes {
continue
}
if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
continue
}
torrentUnverifiedBytes[piece.t.InfoHash] += piece.Length
allTorrentsUnverifiedBytes += piece.Length
f(piece.t, piece.Piece, piece.index)
}
return
}
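// Input describes the torrents and limits a request strategy run operates over.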
type Input struct {
// All the torrents that share the capacity below. With infinite capacity this is likely a
// single torrent, since the strategy could then just be run separately for each Torrent.
Torrents []Torrent
// Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
// that share the same capacity key must be incorporated in piece ordering.
Capacity *int64
// The limit on unverified bytes across all the Torrents. This might be partitioned by storage
// capacity key now.
MaxUnverifiedBytes int64
}
// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
if !sort.IsSorted(peers) {
panic("not sorted")
}
peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
for _, p := range peers.peersForPiece {
if _, ok := peerMap[p]; ok {
panic(p)
}
peerMap[p] = struct{}{}
}
}
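// peersForPiecesPool recycles the per-piece peer slices handed out by makePeersForPiece.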
var peersForPiecesPool sync.Pool
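// makePeersForPiece returns an empty slice for collecting a piece's candidate peers, reusing a
// pooled backing array when one is available and otherwise allocating with the given capacity.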
func makePeersForPiece(cap int) []*peersForPieceRequests {
got := peersForPiecesPool.Get()
if got == nil {
return make([]*peersForPieceRequests, 0, cap)
}
return got.([]*peersForPieceRequests)[:0]
}
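// peersForPieceSorter orders the candidate peers for a piece. When req is non-nil, peers whose
// next state already contains that request tend to sort earlier.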
type peersForPieceSorter struct {
peersForPiece []*peersForPieceRequests
req *RequestIndex
p requestablePiece
}
func (me *peersForPieceSorter) Len() int {
return len(me.peersForPiece)
}
func (me *peersForPieceSorter) Swap(i, j int) {
me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}
func (me *peersForPieceSorter) Less(_i, _j int) bool {
i := me.peersForPiece[_i]
j := me.peersForPiece[_j]
req := me.req
p := &me.p
byHasRequest := func() multiless.Computation {
ml := multiless.New()
if req != nil {
iHas := i.nextState.Requests.Contains(*req)
jHas := j.nextState.Requests.Contains(*req)
ml = ml.Bool(jHas, iHas)
}
return ml
}()
ml := multiless.New()
// We always "reallocate", that is, force even striping amongst peers that are either on
// the last piece they can contribute to, or for pieces marked for this behaviour.
// Striping prevents starving peers of requests, and will always re-balance to the
// fastest known peers.
if !p.alwaysReallocate {
ml = ml.Bool(
j.requestablePiecesRemaining == 1,
i.requestablePiecesRemaining == 1)
}
if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
ml = ml.Int(
i.requestsInPiece,
j.requestsInPiece)
} else {
ml = ml.AndThen(byHasRequest)
}
ml = ml.Int(
i.requestablePiecesRemaining,
j.requestablePiecesRemaining,
).Float64(
j.DownloadRate,
i.DownloadRate,
)
if ml.Ok() {
return ml.Less()
}
ml = ml.AndThen(byHasRequest)
return ml.Int64(
int64(j.Age), int64(i.Age),
// TODO: Probably peer priority can come next
).Uintptr(
i.Id.Uintptr(),
j.Id.Uintptr(),
).MustLess()
}
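// allocatePendingChunks distributes the pending chunks of a piece among the given peers,
// preferring to keep existing requests with the peers that already hold them, and re-sorting
// the candidates for each chunk that still needs assignment.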
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
peersForPiece := makePeersForPiece(len(peers))
for _, peer := range peers {
if !peer.canRequestPiece(p.index) {
continue
}
if !peer.canFitRequest() {
peer.requestablePiecesRemaining--
continue
}
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
defer func() {
for _, peer := range peersForPiece {
peer.requestablePiecesRemaining--
}
peersForPiecesPool.Put(peersForPiece)
}()
peersForPieceSorter := peersForPieceSorter{
peersForPiece: peersForPiece,
p: p,
}
sortPeersForPiece := func(req *RequestIndex) {
peersForPieceSorter.req = req
sort.Sort(&peersForPieceSorter)
// ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
}
// Chunks can be preassigned several times if peers haven't been able to update their "actual"
// request state to the "next" state before another request strategy run occurs.
preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
p.IterPendingChunks(func(spec ChunkIndex) {
req := p.chunkIndexToRequestIndex(spec)
for _, peer := range peersForPiece {
if !peer.ExistingRequests.Contains(req) {
continue
}
if !peer.canFitRequest() {
continue
}
preallocated[spec] = append(preallocated[spec], peer)
peer.addNextRequest(req)
}
})
pendingChunksRemaining := int(p.NumPendingChunks)
p.IterPendingChunks(func(chunk ChunkIndex) {
if len(preallocated[chunk]) != 0 {
return
}
req := p.chunkIndexToRequestIndex(chunk)
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece(nil)
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
break
}
})
chunk:
for chunk, prePeers := range preallocated {
if len(prePeers) == 0 {
continue
}
pendingChunksRemaining--
req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
for _, pp := range prePeers {
pp.requestsInPiece--
}
sortPeersForPiece(&req)
for _, pp := range prePeers {
pp.nextState.Requests.Remove(req)
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that it's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
continue chunk
}
}
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
}