Move piece allocation to its own func

This commit is contained in:
Matt Joiner 2021-05-13 18:35:49 +10:00
parent 07ba6e9210
commit 4e9f707aeb
1 changed file with 92 additions and 88 deletions

View File

@ -133,94 +133,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
if !p.Request {
continue
}
peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
for _, peer := range allPeers[p.t] {
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
sortPeersForPiece := func() {
sort.Slice(peersForPiece, func(i, j int) bool {
return multiless.New().Int(
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece,
).Int(
peersForPiece[i].requestablePiecesRemaining,
peersForPiece[j].requestablePiecesRemaining,
).Float64(
peersForPiece[j].DownloadRate,
peersForPiece[i].DownloadRate,
).Int64(
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
peersForPiece[i].Id.Uintptr(),
peersForPiece[j].Id.Uintptr(),
).MustLess()
})
}
pendingChunksRemaining := int(p.NumPendingChunks)
if f := torrentPiece.IterPendingChunks; f != nil {
f(func(chunk types.ChunkSpec) {
req := Request{pp.Integer(p.index), chunk}
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece()
skipped := 0
// Try up to the number of peers that could legitimately receive the request equal to
// the number of chunks left. This should ensure that only the best peers serve the last
// few chunks in a piece.
lowestNumRequestsInPiece := math.MaxInt16
for _, peer := range peersForPiece {
if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
continue
}
if skipped+1 >= pendingChunksRemaining {
break
}
if f := peer.HasExistingRequest; f == nil || !f(req) {
skipped++
lowestNumRequestsInPiece = peer.requestsInPiece
continue
}
if peer.requestsInPiece > lowestNumRequestsInPiece {
break
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// We must stay interested for this.
peer.nextState.Interested = true
}
peer.addNextRequest(req)
return
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast
// pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
return
}
})
}
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
for _, peer := range peersForPiece {
if peer.canRequestPiece(p.index) {
peer.requestablePiecesRemaining--
}
}
allocatePendingChunks(p, allPeers[p.t])
}
ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
@ -233,3 +146,94 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
}
return ret
}
// allocatePendingChunks assigns requests for piece p's pending chunks across
// the given peers. Before each chunk it re-sorts the peer list so the
// currently "best" peer receives the request, which tends to concentrate the
// last chunks of a piece on the strongest peers. It mutates each peer's
// nextState (Interested flag, next requests) and decrements
// requestablePiecesRemaining as side effects.
func allocatePendingChunks(p pieceRequestOrderPiece, peers []*requestsPeer) {
// Wrap every peer with a per-piece request counter (requestsInPiece starts
// at 0); the counter is the primary sort key below.
peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
for _, peer := range peers {
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
// sortPeersForPiece orders peersForPiece best-first: fewest requests already
// allocated in this piece, then fewest other requestable pieces remaining,
// then highest DownloadRate (note i/j swapped for descending order), then
// larger Age first (presumably longer-lived connections — confirm upstream),
// with peer Id as the final deterministic tiebreaker.
sortPeersForPiece := func() {
sort.Slice(peersForPiece, func(i, j int) bool {
return multiless.New().Int(
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece,
).Int(
peersForPiece[i].requestablePiecesRemaining,
peersForPiece[j].requestablePiecesRemaining,
).Float64(
peersForPiece[j].DownloadRate,
peersForPiece[i].DownloadRate,
).Int64(
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
peersForPiece[i].Id.Uintptr(),
peersForPiece[j].Id.Uintptr(),
).MustLess()
})
}
// Invariant: IterPendingChunks must yield exactly NumPendingChunks chunks;
// verified by the panic after the loop.
pendingChunksRemaining := int(p.NumPendingChunks)
if f := p.IterPendingChunks; f != nil {
f(func(chunk types.ChunkSpec) {
req := Request{pp.Integer(p.index), chunk}
// Decrement exactly once per chunk, on every return path out of this
// callback (including the early returns below).
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece()
skipped := 0
// Try up to the number of peers that could legitimately receive the request equal to
// the number of chunks left. This should ensure that only the best peers serve the last
// few chunks in a piece.
lowestNumRequestsInPiece := math.MaxInt16
// First pass: prefer keeping the request with a peer that already has it,
// but only if no more than pendingChunksRemaining-1 better-ranked peers
// were passed over on the way (so the best peers still get the tail).
for _, peer := range peersForPiece {
if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.pieceAllowedFastOrDefault(p.index) && peer.Choking) {
continue
}
if skipped+1 >= pendingChunksRemaining {
break
}
// A nil HasExistingRequest is treated as "no existing request". Peers
// without this request are skipped (counted), remembering the
// requestsInPiece of the most recently skipped, better-ranked peer.
if f := peer.HasExistingRequest; f == nil || !f(req) {
skipped++
lowestNumRequestsInPiece = peer.requestsInPiece
continue
}
// This peer already has the request, but a better-ranked peer with
// fewer requests in this piece was skipped: give up and reassign.
if peer.requestsInPiece > lowestNumRequestsInPiece {
break
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// We must stay interested for this.
peer.nextState.Interested = true
}
peer.addNextRequest(req)
return
}
// Second pass (fallback): assign the chunk to the first eligible peer in
// sorted order, regardless of existing requests.
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast
// pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
return
}
})
}
// Programmer-error check: the chunk iterator disagreed with NumPendingChunks.
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
// This piece is now handled; peers that could have requested it have one
// fewer requestable piece remaining, which feeds the sort for later pieces.
for _, peer := range peersForPiece {
if peer.canRequestPiece(p.index) {
peer.requestablePiecesRemaining--
}
}
}