2
0
mirror of synced 2025-02-24 06:38:14 +00:00

259 lines
6.2 KiB
Go
Raw Normal View History

package request_strategy
import (
"sort"
"github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
)
type (
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
2021-05-13 11:26:22 +10:00
// This can be made into a type-param later, will be great for testing.
ChunkSpec = types.ChunkSpec
)
type ClientPieceOrder struct {
pieces []pieceRequestOrderPiece
}
type pieceRequestOrderPiece struct {
t *Torrent
index pieceIndex
Piece
}
func (me *ClientPieceOrder) Len() int {
return len(me.pieces)
}
func (me ClientPieceOrder) sort() {
sort.Slice(me.pieces, me.less)
}
func (me ClientPieceOrder) less(_i, _j int) bool {
i := me.pieces[_i]
j := me.pieces[_j]
return multiless.New().Int(
int(j.Priority), int(i.Priority),
).Bool(
j.Partial, i.Partial,
).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
}
type requestsPeer struct {
2021-05-13 11:26:22 +10:00
Peer
nextState PeerNextRequestState
requestablePiecesRemaining int
}
func (rp *requestsPeer) canFitRequest() bool {
2021-05-13 11:26:22 +10:00
return len(rp.nextState.Requests) < rp.MaxRequests
}
func (rp *requestsPeer) addNextRequest(r Request) {
_, ok := rp.nextState.Requests[r]
if ok {
panic("should only add once")
}
rp.nextState.Requests[r] = struct{}{}
}
type peersForPieceRequests struct {
requestsInPiece int
*requestsPeer
}
func (me *peersForPieceRequests) addNextRequest(r Request) {
me.requestsPeer.addNextRequest(r)
me.requestsInPiece++
}
type Torrent struct {
Pieces []Piece
Capacity *func() *int64
2021-05-13 11:26:22 +10:00
Peers []Peer // not closed.
}
2021-05-13 11:26:22 +10:00
func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
requestOrder.pieces = requestOrder.pieces[:0]
allPeers := make(map[*Torrent][]*requestsPeer)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
for _, t := range torrents {
// TODO: We could do metainfo requests here.
key := t.Capacity
if key != nil {
if _, ok := storageLeft[key]; !ok {
storageLeft[key] = (*key)()
}
}
var peers []*requestsPeer
for _, p := range t.Peers {
peers = append(peers, &requestsPeer{
Peer: p,
nextState: PeerNextRequestState{
Requests: make(map[Request]struct{}),
},
})
}
for i, tp := range t.Pieces {
requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
t: t,
index: i,
Piece: tp,
})
if tp.Request {
for _, p := range peers {
if p.canRequestPiece(i) {
p.requestablePiecesRemaining++
}
}
}
}
allPeers[t] = peers
}
requestOrder.sort()
for _, p := range requestOrder.pieces {
torrentPiece := p
if left := storageLeft[p.t.Capacity]; left != nil {
if *left < int64(torrentPiece.Length) {
continue
}
*left -= int64(torrentPiece.Length)
}
if !p.Request {
continue
}
2021-05-13 18:35:49 +10:00
allocatePendingChunks(p, allPeers[p.t])
}
2021-05-13 11:26:22 +10:00
ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
for _, rp := range peers {
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}
ret[rp.Id] = rp.nextState
}
}
return ret
}
2021-05-13 18:35:49 +10:00
func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
for _, peer := range peers {
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
defer func() {
for _, peer := range peersForPiece {
if peer.canRequestPiece(p.index) {
peer.requestablePiecesRemaining--
}
}
}()
sortPeersForPiece := func(byHasRequest *Request) {
2021-05-13 18:35:49 +10:00
sort.Slice(peersForPiece, func(i, j int) bool {
ml := multiless.New().Int(
2021-05-13 18:35:49 +10:00
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece,
).Int(
peersForPiece[i].requestablePiecesRemaining,
peersForPiece[j].requestablePiecesRemaining,
).Float64(
peersForPiece[j].DownloadRate,
peersForPiece[i].DownloadRate,
)
if byHasRequest != nil {
_, iHas := peersForPiece[i].nextState.Requests[*byHasRequest]
_, jHas := peersForPiece[j].nextState.Requests[*byHasRequest]
ml = ml.Bool(jHas, iHas)
}
return ml.Int64(
2021-05-13 18:35:49 +10:00
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
peersForPiece[i].Id.Uintptr(),
peersForPiece[j].Id.Uintptr(),
).MustLess()
})
}
preallocated := make(map[ChunkSpec]*peersForPieceRequests, p.NumPendingChunks)
p.iterPendingChunksWrapper(func(spec ChunkSpec) {
req := Request{pp.Integer(p.index), spec}
for _, peer := range peersForPiece {
if h := peer.HasExistingRequest; h == nil || !h(req) {
continue
}
if !peer.canFitRequest() {
continue
}
if !peer.canRequestPiece(p.index) {
continue
}
preallocated[spec] = peer
peer.addNextRequest(req)
}
})
2021-05-13 18:35:49 +10:00
pendingChunksRemaining := int(p.NumPendingChunks)
p.iterPendingChunksWrapper(func(chunk types.ChunkSpec) {
if _, ok := preallocated[chunk]; ok {
return
}
req := Request{pp.Integer(p.index), chunk}
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece(nil)
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
2021-05-13 18:35:49 +10:00
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
2021-05-13 18:35:49 +10:00
continue
}
}
peer.addNextRequest(req)
return
}
})
chunk:
for chunk, prePeer := range preallocated {
req := Request{pp.Integer(p.index), chunk}
prePeer.requestsInPiece--
sortPeersForPiece(&req)
delete(prePeer.nextState.Requests, req)
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
2021-05-13 18:35:49 +10:00
continue
}
}
pendingChunksRemaining--
peer.addNextRequest(req)
continue chunk
}
2021-05-13 18:35:49 +10:00
}
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
}