Try a state-delta function for updating request state
Also adds Torrent.networkingEnabled, though it isn't yet useful.
This commit is contained in:
parent
493916c279
commit
76c60ffa77
@ -1178,6 +1178,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
|
|||||||
|
|
||||||
storageOpener: storageClient,
|
storageOpener: storageClient,
|
||||||
maxEstablishedConns: defaultEstablishedConnsPerTorrent,
|
maxEstablishedConns: defaultEstablishedConnsPerTorrent,
|
||||||
|
|
||||||
|
networkingEnabled: true,
|
||||||
}
|
}
|
||||||
t.setChunkSize(defaultChunkSize)
|
t.setChunkSize(defaultChunkSize)
|
||||||
return
|
return
|
||||||
|
@ -484,26 +484,67 @@ func (cn *connection) Bitfield(haves []bool) {
|
|||||||
cn.sentHaves = append([]bool(nil), haves...)
|
cn.sentHaves = append([]bool(nil), haves...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func nextRequestState(
|
||||||
|
networkingEnabled bool,
|
||||||
|
currentRequests map[request]struct{},
|
||||||
|
peerChoking bool,
|
||||||
|
nextPieces prioritybitmap.PriorityBitmap,
|
||||||
|
pendingChunks func(piece int, f func(chunkSpec) bool) bool,
|
||||||
|
requestsLowWater int,
|
||||||
|
requestsHighWater int,
|
||||||
|
) (
|
||||||
|
requests map[request]struct{},
|
||||||
|
interested bool,
|
||||||
|
) {
|
||||||
|
if !networkingEnabled || nextPieces.IsEmpty() {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if peerChoking || len(currentRequests) > requestsLowWater {
|
||||||
|
return currentRequests, true
|
||||||
|
}
|
||||||
|
requests = make(map[request]struct{}, requestsHighWater)
|
||||||
|
for r := range currentRequests {
|
||||||
|
requests[r] = struct{}{}
|
||||||
|
}
|
||||||
|
nextPieces.IterTyped(func(piece int) bool {
|
||||||
|
return pendingChunks(piece, func(cs chunkSpec) bool {
|
||||||
|
if len(requests) >= requestsHighWater {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
r := request{pp.Integer(piece), cs}
|
||||||
|
requests[r] = struct{}{}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
})
|
||||||
|
return requests, true
|
||||||
|
}
|
||||||
|
|
||||||
func (cn *connection) updateRequests() {
|
func (cn *connection) updateRequests() {
|
||||||
if !cn.t.haveInfo() {
|
rs, i := nextRequestState(
|
||||||
return
|
cn.t.networkingEnabled,
|
||||||
}
|
cn.Requests,
|
||||||
if cn.Interested {
|
cn.PeerChoked,
|
||||||
if cn.PeerChoked {
|
cn.pieceRequestOrder,
|
||||||
return
|
func(piece int, f func(chunkSpec) bool) bool {
|
||||||
}
|
return undirtiedChunks(piece, cn.t, f)
|
||||||
if len(cn.Requests) > cn.requestsLowWater {
|
},
|
||||||
return
|
cn.requestsLowWater,
|
||||||
|
cn.nominalMaxRequests())
|
||||||
|
for r := range cn.Requests {
|
||||||
|
if _, ok := rs[r]; !ok {
|
||||||
|
if !cn.Cancel(r) {
|
||||||
|
panic("wat")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cn.fillRequests()
|
|
||||||
if len(cn.Requests) == 0 && !cn.PeerChoked {
|
|
||||||
// So we're not choked, but we don't want anything right now. We may
|
|
||||||
// have completed readahead, and the readahead window has not rolled
|
|
||||||
// over to the next piece. Better to stay interested in case we're
|
|
||||||
// going to want data in the near future.
|
|
||||||
cn.SetInterested(!cn.t.haveAllPieces())
|
|
||||||
}
|
}
|
||||||
|
for r := range rs {
|
||||||
|
if _, ok := cn.Requests[r]; !ok {
|
||||||
|
if !cn.Request(r) {
|
||||||
|
panic("how")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cn.SetInterested(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cn *connection) fillRequests() {
|
func (cn *connection) fillRequests() {
|
||||||
@ -526,6 +567,13 @@ func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
|
||||||
|
chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
|
||||||
|
return iter.ForPerm(len(chunkIndices), func(i int) bool {
|
||||||
|
return f(t.chunkIndexSpec(chunkIndices[i], piece))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (cn *connection) stopRequestingPiece(piece int) {
|
func (cn *connection) stopRequestingPiece(piece int) {
|
||||||
cn.pieceRequestOrder.Remove(piece)
|
cn.pieceRequestOrder.Remove(piece)
|
||||||
cn.updateRequests()
|
cn.updateRequests()
|
||||||
|
@ -43,6 +43,8 @@ type peersKey struct {
|
|||||||
type Torrent struct {
|
type Torrent struct {
|
||||||
cl *Client
|
cl *Client
|
||||||
|
|
||||||
|
networkingEnabled bool
|
||||||
|
|
||||||
closed missinggo.Event
|
closed missinggo.Event
|
||||||
infoHash metainfo.Hash
|
infoHash metainfo.Hash
|
||||||
pieces []piece
|
pieces []piece
|
||||||
@ -1348,6 +1350,9 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *Torrent) wantConns() bool {
|
func (t *Torrent) wantConns() bool {
|
||||||
|
if !t.networkingEnabled {
|
||||||
|
return false
|
||||||
|
}
|
||||||
if t.closed.IsSet() {
|
if t.closed.IsSet() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user