Add client-level max unverified bytes

This commit is contained in:
Matt Joiner 2021-05-14 13:40:09 +10:00
parent 63b0e42731
commit ad298364aa
5 changed files with 51 additions and 32 deletions

View File

@ -173,6 +173,7 @@ type DownloadCmd struct {
TestPeer []string `help:"addresses of some starting peers"`
Seed bool `help:"seed after download is complete"`
Addr string `help:"network listen addr"`
MaxUnverifiedBytes tagflag.Bytes `help:"maximum number bytes to have pending verification"`
UploadRate *tagflag.Bytes `help:"max piece bytes to send per second"`
DownloadRate *tagflag.Bytes `help:"max bytes per second down from peers"`
PackedBlocklist string
@ -311,6 +312,7 @@ func downloadErr() error {
if flags.Quiet {
clientConfig.Logger = log.Discard
}
clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
var stop missinggo.SynchronizedEvent
defer func() {

View File

@ -59,6 +59,8 @@ type ClientConfig struct {
// (~4096), and the requested chunk size (~16KiB, see
// TorrentSpec.ChunkSize).
DownloadRateLimiter *rate.Limiter
// Maximum unverified bytes across all torrents. Not used if zero.
MaxUnverifiedBytes int64
// User-provided Client peer ID. If not present, one is generated automatically.
PeerID string

View File

@ -28,7 +28,6 @@ func (cl *Client) doRequests() {
for _, t := range cl.torrents {
rst := request_strategy.Torrent{
StableId: uintptr(unsafe.Pointer(t)),
MaxUnverifiedBytes: 10 << 20,
}
if t.storage != nil {
rst.Capacity = t.storage.Capacity
@ -72,7 +71,10 @@ func (cl *Client) doRequests() {
})
ts = append(ts, rst)
}
nextPeerStates := cl.pieceRequestOrder.DoRequests(ts)
nextPeerStates := request_strategy.Run(request_strategy.Input{
Torrents: ts,
MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
})
for p, state := range nextPeerStates {
applyPeerNextRequestState(p, state)
}

View File

@ -84,12 +84,12 @@ type filterPiece struct {
Piece
}
func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
func getRequestablePieces(input Input) (ret []requestablePiece) {
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
var pieces []filterPiece
for _, _t := range torrents {
for _, _t := range input.Torrents {
// TODO: We could do metainfo requests here.
t := &filterTorrent{
Torrent: _t,
@ -111,6 +111,7 @@ func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
}
}
sortFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
for _, piece := range pieces {
if left := piece.t.storageLeft; left != nil {
if *left < int64(piece.Length) {
@ -119,12 +120,18 @@ func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
*left -= int64(piece.Length)
}
if !piece.Request || piece.NumPendingChunks == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
continue
}
if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
continue
}
if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
continue
}
piece.t.unverifiedBytes += piece.Length
allTorrentsUnverifiedBytes += piece.Length
ret = append(ret, requestablePiece{
index: piece.index,
t: piece.t.Torrent,
@ -135,9 +142,15 @@ func getRequestablePieces(torrents []Torrent) (ret []requestablePiece) {
return
}
type Input struct {
Torrents []Torrent
MaxUnverifiedBytes int64
}
// TODO: We could do metainfo requests here.
func (requestOrder *ClientPieceOrder) DoRequests(torrents []Torrent) map[PeerId]PeerNextRequestState {
requestPieces := getRequestablePieces(torrents)
func Run(input Input) map[PeerId]PeerNextRequestState {
requestPieces := getRequestablePieces(input)
torrents := input.Torrents
allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
for _, t := range torrents {
peers := make([]*requestsPeer, 0, len(t.Peers))

View File

@ -45,7 +45,6 @@ func (i intPeerId) Uintptr() uintptr {
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
order := ClientPieceOrder{}
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
@ -64,7 +63,7 @@ func TestStealingFromSlowerPeer(t *testing.T) {
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := order.DoRequests([]Torrent{{
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 5,
@ -75,7 +74,8 @@ func TestStealingFromSlowerPeer(t *testing.T) {
firstStealer,
secondStealer,
},
}})
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
@ -93,7 +93,6 @@ func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, in
func TestStealingFromSlowerPeersBasic(t *testing.T) {
c := qt.New(t)
order := ClientPieceOrder{}
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
@ -111,7 +110,7 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := order.DoRequests([]Torrent{{
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 2,
@ -122,7 +121,8 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
firstStealer,
secondStealer,
},
}})
}}})
checkNumRequestsAndInterest(c, results[firstStealer.Id], 1, true)
checkNumRequestsAndInterest(c, results[secondStealer.Id], 1, true)
checkNumRequestsAndInterest(c, results[stealee.Id], 0, false)
@ -130,7 +130,6 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
order := ClientPieceOrder{}
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
@ -150,7 +149,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := order.DoRequests([]Torrent{{
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 4,
@ -161,7 +160,8 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
firstStealer,
secondStealer,
},
}})
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
@ -177,7 +177,6 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
order := ClientPieceOrder{}
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
@ -198,7 +197,7 @@ func TestDontStealUnnecessarily(t *testing.T) {
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := order.DoRequests([]Torrent{{
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 9,
@ -209,7 +208,8 @@ func TestDontStealUnnecessarily(t *testing.T) {
stealee,
secondStealer,
},
}})
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)