2
0
mirror of synced 2025-02-23 06:08:07 +00:00

Store peer requests in a bitmap

This commit is contained in:
Matt Joiner 2021-09-19 15:16:37 +10:00
parent cd49f75cb9
commit 1d2d1a9cde
11 changed files with 230 additions and 169 deletions

View File

@ -97,7 +97,7 @@ type Peer struct {
// Chunks that we might reasonably expect to receive from the peer. Due to
// latency, buffering, and implementation differences, we may receive
// chunks that are no longer in the set of requests actually want.
validReceiveChunks map[Request]int
validReceiveChunks map[RequestIndex]int
// Indexed by metadata piece, set to true if posted and pending a
// response.
metadataRequests []bool
@ -175,18 +175,20 @@ func (cn *Peer) updateExpectingChunks() {
}
func (cn *Peer) expectingChunks() bool {
if len(cn.actualRequestState.Requests) == 0 {
if cn.actualRequestState.Requests.IsEmpty() {
return false
}
if !cn.actualRequestState.Interested {
return false
}
for r := range cn.actualRequestState.Requests {
if !cn.remoteChokingPiece(r.Index.Int()) {
return true
}
if cn.peerAllowedFast.IterTyped(func(_i int) bool {
i := RequestIndex(_i)
return cn.actualRequestState.Requests.Rank((i+1)*cn.t.chunksPerRegularPiece())-
cn.actualRequestState.Requests.Rank(i*cn.t.chunksPerRegularPiece()) == 0
}) {
return true
}
return false
return !cn.peerChoking
}
func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool {
@ -333,9 +335,10 @@ func (cn *Peer) downloadRate() float64 {
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
for r := range cn.actualRequestState.Requests {
ret[pieceIndex(r.Index)]++
}
cn.actualRequestState.Requests.Iterate(func(x uint32) bool {
ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++
return true
})
return
}
@ -365,7 +368,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
&cn._stats.ChunksReadUseful,
&cn._stats.ChunksRead,
&cn._stats.ChunksWritten,
len(cn.actualRequestState.Requests),
cn.actualRequestState.Requests.GetCardinality(),
cn.nominalMaxRequests(),
cn.PeerMaxRequests,
len(cn.peerRequests),
@ -547,8 +550,9 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
// are okay.
type messageWriter func(pp.Message) bool
func (cn *Peer) shouldRequest(r Request) error {
if !cn.peerHasPiece(pieceIndex(r.Index)) {
func (cn *Peer) shouldRequest(r RequestIndex) error {
pi := pieceIndex(r / cn.t.chunksPerRegularPiece())
if !cn.peerHasPiece(pi) {
return errors.New("requesting piece peer doesn't have")
}
if !cn.t.peerIsActive(cn) {
@ -557,42 +561,40 @@ func (cn *Peer) shouldRequest(r Request) error {
if cn.closed.IsSet() {
panic("requesting when connection is closed")
}
if cn.t.hashingPiece(pieceIndex(r.Index)) {
if cn.t.hashingPiece(pi) {
panic("piece is being hashed")
}
if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
if cn.t.pieceQueuedForHash(pi) {
panic("piece is queued for hash")
}
if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) {
panic("peer choking and piece not allowed fast")
}
return nil
}
func (cn *Peer) request(r Request) (more bool, err error) {
func (cn *Peer) request(r RequestIndex) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
if _, ok := cn.actualRequestState.Requests[r]; ok {
if cn.actualRequestState.Requests.Contains(r) {
return true, nil
}
if len(cn.actualRequestState.Requests) >= cn.nominalMaxRequests() {
if maxRequests(cn.actualRequestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests")
}
if cn.actualRequestState.Requests == nil {
cn.actualRequestState.Requests = make(map[Request]struct{})
}
cn.actualRequestState.Requests[r] = struct{}{}
cn.actualRequestState.Requests.Add(r)
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[Request]int)
cn.validReceiveChunks = make(map[RequestIndex]int)
}
cn.validReceiveChunks[r]++
cn.t.pendingRequests[r]++
cn.updateExpectingChunks()
ppReq := cn.t.requestIndexToRequest(r)
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r})
f(PeerRequestEvent{cn, ppReq})
}
return cn.peerImpl._request(r), nil
return cn.peerImpl._request(ppReq), nil
}
func (me *PeerConn) _request(r Request) bool {
@ -604,9 +606,9 @@ func (me *PeerConn) _request(r Request) bool {
})
}
func (me *Peer) cancel(r Request) bool {
func (me *Peer) cancel(r RequestIndex) bool {
if me.deleteRequest(r) {
return me.peerImpl._cancel(r)
return me.peerImpl._cancel(me.t.requestIndexToRequest(r))
}
return true
}
@ -653,7 +655,7 @@ func (cn *PeerConn) postBitfield() {
}
func (cn *PeerConn) updateRequests() {
if len(cn.actualRequestState.Requests) != 0 {
if cn.actualRequestState.Requests.GetCardinality() != 0 {
return
}
cn.tickleWriter()
@ -1128,7 +1130,7 @@ func (c *PeerConn) mainReadLoop() (err error) {
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Reject:
c.remoteRejectedRequest(newRequestFromMessage(&msg))
c.remoteRejectedRequest(c.t.requestIndexFromRequest(newRequestFromMessage(&msg)))
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
@ -1145,13 +1147,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
}
}
func (c *Peer) remoteRejectedRequest(r Request) {
func (c *Peer) remoteRejectedRequest(r RequestIndex) {
if c.deleteRequest(r) {
c.decExpectedChunkReceive(r)
}
}
func (c *Peer) decExpectedChunkReceive(r Request) {
func (c *Peer) decExpectedChunkReceive(r RequestIndex) {
count := c.validReceiveChunks[r]
if count == 1 {
delete(c.validReceiveChunks, r)
@ -1250,7 +1252,8 @@ func (c *Peer) doChunkReadStats(size int64) {
func (c *Peer) receiveChunk(msg *pp.Message) error {
chunksReceived.Add("total", 1)
req := newRequestFromMessage(msg)
ppReq := newRequestFromMessage(msg)
req := c.t.requestIndexFromRequest(ppReq)
if c.peerChoking {
chunksReceived.Add("while choked", 1)
@ -1262,7 +1265,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
}
c.decExpectedChunkReceive(req)
if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(req.Index)) {
if c.peerChoking && c.peerAllowedFast.Get(bitmap.BitIndex(ppReq.Index)) {
chunksReceived.Add("due to allowed fast", 1)
}
@ -1271,7 +1274,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
// out.
deletedRequest := false
{
if _, ok := c.actualRequestState.Requests[req]; ok {
if c.actualRequestState.Requests.Contains(req) {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
@ -1291,13 +1294,13 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
cl := t.cl
// Do we actually want this chunk?
if t.haveChunk(req) {
if t.haveChunk(ppReq) {
chunksReceived.Add("wasted", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted }))
return nil
}
piece := &t.pieces[req.Index]
piece := &t.pieces[ppReq.Index]
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
@ -1316,7 +1319,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
piece.incrementPendingWrites()
// Record that we have the chunk, so we aren't trying to download it while
// waiting for it to be written to storage.
piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))
piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize))
// Cancel pending requests for this chunk from *other* peers.
t.iterPeers(func(p *Peer) {
@ -1349,11 +1352,11 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
return nil
}
c.onDirtiedPiece(pieceIndex(req.Index))
c.onDirtiedPiece(pieceIndex(ppReq.Index))
// We need to ensure the piece is only queued once, so only the last chunk writer gets this job.
if t.pieceAllDirty(pieceIndex(req.Index)) && piece.pendingWrites == 0 {
t.queuePieceCheck(pieceIndex(req.Index))
if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 {
t.queuePieceCheck(pieceIndex(ppReq.Index))
// We don't pend all chunks here anymore because we don't want code dependent on the dirty
// chunk status (such as the haveChunk call above) to have to check all the various other
// piece states like queued for hash, hashing etc. This does mean that we need to be sure
@ -1362,7 +1365,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
cl.event.Broadcast()
// We do this because we've written a chunk, and may change PieceState.Partial.
t.publishPieceChange(pieceIndex(req.Index))
t.publishPieceChange(pieceIndex(ppReq.Index))
return nil
}
@ -1456,14 +1459,13 @@ func (c *Peer) peerHasWantedPieces() bool {
return !c._pieceRequestOrder.IsEmpty()
}
func (c *Peer) deleteRequest(r Request) bool {
delete(c.nextRequestState.Requests, r)
if _, ok := c.actualRequestState.Requests[r]; !ok {
func (c *Peer) deleteRequest(r RequestIndex) bool {
c.nextRequestState.Requests.Remove(r)
if !c.actualRequestState.Requests.CheckedRemove(r) {
return false
}
delete(c.actualRequestState.Requests, r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, r})
f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)})
}
c.updateExpectingChunks()
pr := c.t.pendingRequests
@ -1479,13 +1481,14 @@ func (c *Peer) deleteRequest(r Request) bool {
}
func (c *Peer) deleteAllRequests() {
for r := range c.actualRequestState.Requests {
c.deleteRequest(r)
c.actualRequestState.Requests.Clone().Iterate(func(x uint32) bool {
c.deleteRequest(x)
return true
})
if !c.actualRequestState.Requests.IsEmpty() {
panic(c.actualRequestState.Requests.GetCardinality())
}
if l := len(c.actualRequestState.Requests); l != 0 {
panic(l)
}
c.nextRequestState.Requests = nil
c.nextRequestState.Requests.Clear()
// for c := range c.t.conns {
// c.tickleWriter()
// }

View File

@ -23,7 +23,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
cl.initLogger()
c := cl.newConnection(nil, false, nil, "io.Pipe", "")
c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
if err := c.t.setInfo(&metainfo.Info{ Pieces: make([]byte, metainfo.HashSize*3) }); err != nil {
if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
t.Log(err)
}
r, w := io.Pipe()
@ -129,7 +129,9 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
// The chunk must be written to storage everytime, to ensure the
// writeSem is unlocked.
t.pieces[0]._dirtyChunks.Clear()
cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
cn.validReceiveChunks = map[RequestIndex]int{
t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
}
cl.unlock()
n, err := w.Write(wb)
require.NoError(b, err)

View File

@ -55,12 +55,12 @@ func (p *Piece) Storage() storage.Piece {
return p.t.storage.Piece(p.Info())
}
func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex))
}
func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
return p.pendingChunkIndex(chunkIndexFromChunkSpec(cs, chunkSize))
}
func (p *Piece) hasDirtyChunks() bool {
@ -71,17 +71,17 @@ func (p *Piece) numDirtyChunks() pp.Integer {
return pp.Integer(p._dirtyChunks.Len())
}
func (p *Piece) unpendChunkIndex(i int) {
func (p *Piece) unpendChunkIndex(i chunkIndexType) {
p._dirtyChunks.Add(bitmap.BitIndex(i))
p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i int) {
func (p *Piece) pendChunkIndex(i RequestIndex) {
p._dirtyChunks.Remove(bitmap.BitIndex(i))
}
func (p *Piece) numChunks() pp.Integer {
return p.t.pieceNumChunks(p.index)
return pp.Integer(p.t.pieceNumChunks(p.index))
}
func (p *Piece) incrementPendingWrites() {
@ -237,11 +237,15 @@ func (p *Piece) State() PieceState {
return p.t.PieceState(p.index)
}
func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec)) {
for i := pp.Integer(0); i < p.numChunks(); i++ {
if p.chunkIndexDirty(i) {
func (p *Piece) iterUndirtiedChunks(f func(cs chunkIndexType)) {
for i := chunkIndexType(0); i < chunkIndexType(p.numChunks()); i++ {
if p.chunkIndexDirty(pp.Integer(i)) {
continue
}
f(p.chunkIndexSpec(i))
f(i)
}
}
func (p *Piece) requestIndexOffset() RequestIndex {
return RequestIndex(p.index) * p.t.chunksPerRegularPiece()
}

View File

@ -10,11 +10,12 @@ import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
)
type (
RequestIndex = uint32
ChunkIndex = uint32
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
@ -59,15 +60,13 @@ type requestsPeer struct {
}
func (rp *requestsPeer) canFitRequest() bool {
return len(rp.nextState.Requests) < rp.MaxRequests
return int(rp.nextState.Requests.GetCardinality()) < rp.MaxRequests
}
func (rp *requestsPeer) addNextRequest(r Request) {
_, ok := rp.nextState.Requests[r]
if ok {
func (rp *requestsPeer) addNextRequest(r RequestIndex) {
if !rp.nextState.Requests.CheckedAdd(r) {
panic("should only add once")
}
rp.nextState.Requests[r] = struct{}{}
}
type peersForPieceRequests struct {
@ -75,7 +74,7 @@ type peersForPieceRequests struct {
*requestsPeer
}
func (me *peersForPieceRequests) addNextRequest(r Request) {
func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
me.requestsPeer.addNextRequest(r)
me.requestsInPiece++
}
@ -88,6 +87,10 @@ type requestablePiece struct {
IterPendingChunks ChunksIter
}
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
return RequestIndex(p.t.ChunksPerPiece*p.index) + RequestIndex(c)
}
type filterPiece struct {
t *filterTorrent
index pieceIndex
@ -181,9 +184,6 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
for _, p := range t.Peers {
peers = append(peers, &requestsPeer{
Peer: p,
nextState: PeerNextRequestState{
Requests: make(map[Request]struct{}, p.MaxRequests),
},
})
}
allPeers[t.InfoHash] = peers
@ -239,7 +239,7 @@ func makePeersForPiece(cap int) []*peersForPieceRequests {
type peersForPieceSorter struct {
peersForPiece []*peersForPieceRequests
req *Request
req *RequestIndex
p requestablePiece
}
@ -259,8 +259,8 @@ func (me *peersForPieceSorter) Less(_i, _j int) bool {
byHasRequest := func() multiless.Computation {
ml := multiless.New()
if req != nil {
_, iHas := i.nextState.Requests[*req]
_, jHas := j.nextState.Requests[*req]
iHas := i.nextState.Requests.Contains(*req)
jHas := j.nextState.Requests.Contains(*req)
ml = ml.Bool(jHas, iHas)
}
return ml
@ -327,16 +327,16 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
peersForPiece: peersForPiece,
p: p,
}
sortPeersForPiece := func(req *Request) {
sortPeersForPiece := func(req *RequestIndex) {
peersForPieceSorter.req = req
sort.Sort(&peersForPieceSorter)
//ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
}
// Chunks can be preassigned several times, if peers haven't been able to update their "actual"
// with "next" request state before another request strategy run occurs.
preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
p.IterPendingChunks(func(spec ChunkSpec) {
req := Request{pp.Integer(p.index), spec}
preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
p.IterPendingChunks(func(spec ChunkIndex) {
req := p.chunkIndexToRequestIndex(spec)
for _, peer := range peersForPiece {
if h := peer.HasExistingRequest; h == nil || !h(req) {
continue
@ -349,11 +349,11 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
}
})
pendingChunksRemaining := int(p.NumPendingChunks)
p.IterPendingChunks(func(chunk types.ChunkSpec) {
if _, ok := preallocated[chunk]; ok {
p.IterPendingChunks(func(chunk ChunkIndex) {
if len(preallocated[chunk]) != 0 {
return
}
req := Request{pp.Integer(p.index), chunk}
req := p.chunkIndexToRequestIndex(chunk)
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece(nil)
for _, peer := range peersForPiece {
@ -373,14 +373,17 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
})
chunk:
for chunk, prePeers := range preallocated {
if len(prePeers) == 0 {
continue
}
pendingChunksRemaining--
req := Request{pp.Integer(p.index), chunk}
req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
for _, pp := range prePeers {
pp.requestsInPiece--
}
sortPeersForPiece(&req)
for _, pp := range prePeers {
delete(pp.nextState.Requests, req)
pp.nextState.Requests.Remove(req)
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {

View File

@ -4,36 +4,29 @@ import (
"math"
"testing"
"github.com/RoaringBitmap/roaring"
qt "github.com/frankban/quicktest"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/google/go-cmp/cmp"
)
func r(i pieceIndex, begin int) Request {
return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
}
func chunkIterRange(end int) func(func(ChunkSpec)) {
return func(f func(ChunkSpec)) {
for offset := 0; offset < end; offset += 1 {
f(ChunkSpec{pp.Integer(offset), 1})
func chunkIterRange(end ChunkIndex) ChunksIter {
return func(f func(ChunkIndex)) {
for offset := ChunkIndex(0); offset < end; offset += 1 {
f(offset)
}
}
}
func chunkIter(offsets ...int) func(func(ChunkSpec)) {
return func(f func(ChunkSpec)) {
func chunkIter(offsets ...ChunkIndex) ChunksIter {
return func(f func(ChunkIndex)) {
for _, offset := range offsets {
f(ChunkSpec{pp.Integer(offset), 1})
f(offset)
}
}
}
func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) {
ret = make(map[Request]struct{}, len(rs))
for _, r := range rs {
ret[r] = struct{}{}
}
func requestSetFromSlice(rs ...RequestIndex) (ret roaring.Bitmap) {
ret.AddMany(rs)
return
}
@ -43,6 +36,8 @@ func (i intPeerId) Uintptr() uintptr {
return uintptr(i)
}
func hasAllRequests(RequestIndex) bool { return true }
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
@ -55,15 +50,14 @@ func TestStealingFromSlowerPeer(t *testing.T) {
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
stealee.HasExistingRequest = func(r Request) bool {
return true
}
stealee.HasExistingRequest = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 5,
@ -77,8 +71,9 @@ func TestStealingFromSlowerPeer(t *testing.T) {
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
check := func(p PeerId, l uint64) {
addressableBm := results[p].Requests
c.Check(addressableBm.GetCardinality(), qt.ContentEquals, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(stealee.Id, 1)
@ -86,8 +81,9 @@ func TestStealingFromSlowerPeer(t *testing.T) {
check(secondStealer.Id, 2)
}
func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, interest bool) {
c.Check(next.Requests, qt.HasLen, num)
func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num uint64, interest bool) {
addressableBm := next.Requests
c.Check(addressableBm.GetCardinality(), qt.ContentEquals, num)
c.Check(next.Interested, qt.Equals, interest)
}
@ -102,15 +98,14 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
}
stealee := basePeer
stealee.DownloadRate = 1
stealee.HasExistingRequest = func(r Request) bool {
return true
}
stealee.HasExistingRequest = hasAllRequests
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 2,
@ -128,6 +123,10 @@ func TestStealingFromSlowerPeersBasic(t *testing.T) {
checkNumRequestsAndInterest(c, results[stealee.Id], 0, false)
}
func checkResultsRequestsLen(t *testing.T, reqs roaring.Bitmap, l uint64) {
qt.Check(t, reqs.GetCardinality(), qt.Equals, l)
}
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
@ -140,8 +139,8 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReq := r(0, 0)
stealee.HasExistingRequest = func(r Request) bool {
keepReq := RequestIndex(0)
stealee.HasExistingRequest = func(r RequestIndex) bool {
return r == keepReq
}
stealee.Id = intPeerId(1)
@ -150,6 +149,7 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 4,
@ -163,18 +163,29 @@ func TestPeerKeepsExistingIfReasonable(t *testing.T) {
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
check := func(p PeerId, l uint64) {
checkResultsRequestsLen(t, results[p].Requests, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 2)
check(secondStealer.Id, 1)
c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(keepReq),
})
c.Check(
results[stealee.Id],
peerNextRequestStateChecker,
PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(keepReq),
},
)
}
var peerNextRequestStateChecker = qt.CmpEquals(
cmp.Transformer(
"bitmap",
func(bm roaring.Bitmap) []uint32 {
return bm.ToArray()
}))
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
@ -187,12 +198,14 @@ func TestDontStealUnnecessarily(t *testing.T) {
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
r := func(i, c RequestIndex) RequestIndex {
return i*9 + c
}
keepReqs := requestSetFromSlice(
r(3, 2), r(3, 4), r(3, 6), r(3, 8),
r(4, 0), r(4, 1), r(4, 7), r(4, 8))
stealee.HasExistingRequest = func(r Request) bool {
_, ok := keepReqs[r]
return ok
stealee.HasExistingRequest = func(r RequestIndex) bool {
return keepReqs.Contains(r)
}
stealee.Id = intPeerId(1)
firstStealer := basePeer
@ -208,6 +221,7 @@ func TestDontStealUnnecessarily(t *testing.T) {
}
}
results := Run(Input{Torrents: []Torrent{{
ChunksPerPiece: 9,
Pieces: []Piece{
{
Request: true,
@ -242,16 +256,20 @@ func TestDontStealUnnecessarily(t *testing.T) {
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
check := func(p PeerId, l uint64) {
checkResultsRequestsLen(t, results[p].Requests, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 5)
check(secondStealer.Id, 7+9)
c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
})
c.Check(
results[stealee.Id],
peerNextRequestStateChecker,
PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
},
)
}
// This tests a situation where multiple peers had the same existing request, due to "actual" and
@ -260,10 +278,8 @@ func TestDontStealUnnecessarily(t *testing.T) {
func TestDuplicatePreallocations(t *testing.T) {
peer := func(id int, downloadRate float64) Peer {
return Peer{
HasExistingRequest: func(r Request) bool {
return true
},
MaxRequests: 2,
HasExistingRequest: hasAllRequests,
MaxRequests: 2,
HasPiece: func(i pieceIndex) bool {
return true
},
@ -273,6 +289,7 @@ func TestDuplicatePreallocations(t *testing.T) {
}
results := Run(Input{
Torrents: []Torrent{{
ChunksPerPiece: 1,
Pieces: []Piece{{
Request: true,
NumPendingChunks: 1,
@ -292,5 +309,7 @@ func TestDuplicatePreallocations(t *testing.T) {
}},
})
c := qt.New(t)
c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests))
req1 := results[intPeerId(1)].Requests
req2 := results[intPeerId(2)].Requests
c.Assert(uint64(2), qt.Equals, req1.GetCardinality()+req2.GetCardinality())
}

View File

@ -2,11 +2,13 @@ package request_strategy
import (
"time"
"github.com/RoaringBitmap/roaring"
)
type PeerNextRequestState struct {
Interested bool
Requests map[Request]struct{}
Requests roaring.Bitmap
}
type PeerId interface {
@ -16,7 +18,7 @@ type PeerId interface {
type Peer struct {
HasPiece func(i pieceIndex) bool
MaxRequests int
HasExistingRequest func(r Request) bool
HasExistingRequest func(r RequestIndex) bool
Choking bool
PieceAllowedFast func(pieceIndex) bool
DownloadRate float64

View File

@ -1,10 +1,6 @@
package request_strategy
import (
"github.com/anacrolix/torrent/types"
)
type ChunksIter func(func(types.ChunkSpec))
type ChunksIter func(func(ChunkIndex))
type Piece struct {
Request bool
@ -16,7 +12,7 @@ type Piece struct {
IterPendingChunks ChunksIter
}
func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
i := p.IterPendingChunks
if i != nil {
i(f)

View File

@ -11,7 +11,8 @@ type Torrent struct {
// Unclosed Peers. Not necessary for getting requestable piece ordering.
Peers []Peer
// Some value that's unique and stable between runs. Could even use the infohash?
InfoHash metainfo.Hash
InfoHash metainfo.Hash
ChunksPerPiece int
MaxUnverifiedBytes int64
}

View File

@ -5,7 +5,6 @@ import (
"unsafe"
"github.com/anacrolix/missinggo/v2/bitmap"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/chansync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
@ -43,8 +42,13 @@ func (cl *Client) tickleRequester() {
func (cl *Client) getRequestStrategyInput() request_strategy.Input {
ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
if !t.haveInfo() {
// This would be removed if metadata is handled here.
continue
}
rst := request_strategy.Torrent{
InfoHash: t.infoHash,
InfoHash: t.infoHash,
ChunksPerPiece: (t.usualPieceSize() + int(t.chunkSize) - 1) / int(t.chunkSize),
}
if t.storage != nil {
rst.Capacity = t.storage.Capacity
@ -73,9 +77,8 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
rst.Peers = append(rst.Peers, request_strategy.Peer{
HasPiece: p.peerHasPiece,
MaxRequests: p.nominalMaxRequests(),
HasExistingRequest: func(r request_strategy.Request) bool {
_, ok := p.actualRequestState.Requests[r]
return ok
HasExistingRequest: func(r RequestIndex) bool {
return p.actualRequestState.Requests.Contains(r)
},
Choking: p.peerChoking,
PieceAllowedFast: func(i pieceIndex) bool {
@ -119,8 +122,11 @@ func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.Pee
p.onNextRequestStateChanged()
}
type RequestIndex = request_strategy.RequestIndex
type chunkIndexType = request_strategy.ChunkIndex
func (p *Peer) applyNextRequestState() bool {
if len(p.actualRequestState.Requests) > p.nominalMaxRequests()/2 {
if p.actualRequestState.Requests.GetCardinality() > uint64(p.nominalMaxRequests()/2) {
return true
}
type piece struct {
@ -148,8 +154,8 @@ func (p *Peer) applyNextRequestState() bool {
for _, endGameIter := range []bool{false, true} {
for _, piece := range pieceOrder {
tp := p.t.piece(piece.index)
tp.iterUndirtiedChunks(func(cs ChunkSpec) {
req := Request{pp.Integer(piece.index), cs}
tp.iterUndirtiedChunks(func(cs chunkIndexType) {
req := cs + tp.requestIndexOffset()
if !piece.endGame && !endGameIter && p.t.pendingRequests[req] > 0 {
return
}
@ -158,10 +164,10 @@ func (p *Peer) applyNextRequestState() bool {
if !more {
return
}
if len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
if maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
return
}
if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(req.Index)) {
if p.peerChoking && !p.peerAllowedFast.Contains(bitmap.BitIndex(piece.index)) {
return
}
var err error
@ -170,7 +176,7 @@ func (p *Peer) applyNextRequestState() bool {
panic(err)
}
})
if interested && len(p.actualRequestState.Requests) >= p.nominalMaxRequests() {
if interested && maxRequests(p.actualRequestState.Requests.GetCardinality()) >= p.nominalMaxRequests() {
break
}
if !more {

View File

@ -143,7 +143,7 @@ type Torrent struct {
connPieceInclinationPool sync.Pool
// Count of each request across active connections.
pendingRequests map[Request]int
pendingRequests map[RequestIndex]int
pex pexState
}
@ -440,7 +440,7 @@ func (t *Torrent) onSetInfo() {
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
t.pendingRequests = make(map[Request]int)
t.pendingRequests = make(map[RequestIndex]int)
t.tryCreateMorePieceHashers()
}
@ -842,8 +842,12 @@ func (t *Torrent) bitfield() (bf []bool) {
return
}
func (t *Torrent) pieceNumChunks(piece pieceIndex) pp.Integer {
return (t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize
func (t *Torrent) chunksPerRegularPiece() uint32 {
return uint32((pp.Integer(t.usualPieceSize()) + t.chunkSize - 1) / t.chunkSize)
}
func (t *Torrent) pieceNumChunks(piece pieceIndex) chunkIndexType {
return chunkIndexType((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
}
func (t *Torrent) pendAllChunkSpecs(pieceIndex pieceIndex) {
@ -940,8 +944,8 @@ func (t *Torrent) haveChunk(r Request) (ret bool) {
return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
}
func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
return int(cs.Begin / chunkSize)
func chunkIndexFromChunkSpec(cs ChunkSpec, chunkSize pp.Integer) chunkIndexType {
return chunkIndexType(cs.Begin / chunkSize)
}
func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
@ -1033,7 +1037,7 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
if t.pieceComplete(piece) {
return 0
}
return t.pieceNumChunks(piece) - t.pieces[piece].numDirtyChunks()
return pp.Integer(t.pieceNumChunks(piece)) - t.pieces[piece].numDirtyChunks()
}
func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
@ -1170,9 +1174,8 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
return ret
}
func (t *Torrent) pendRequest(req Request) {
ci := chunkIndex(req.ChunkSpec, t.chunkSize)
t.pieces[req.Index].pendChunkIndex(ci)
func (t *Torrent) pendRequest(req RequestIndex) {
t.piece(int(req / t.chunksPerRegularPiece())).pendChunkIndex(req % t.chunksPerRegularPiece())
}
func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
@ -2259,3 +2262,19 @@ func (t *Torrent) peerIsActive(p *Peer) (active bool) {
})
return
}
func (t *Torrent) requestIndexToRequest(ri RequestIndex) Request {
index := ri / t.chunksPerRegularPiece()
return Request{
pp.Integer(index),
t.piece(int(index)).chunkIndexSpec(pp.Integer(ri % t.chunksPerRegularPiece())),
}
}
func (t *Torrent) requestIndexFromRequest(r Request) RequestIndex {
return t.chunksPerRegularPiece()*uint32(r.Index) + uint32(r.Begin/t.chunkSize)
}
func (t *Torrent) numChunks() RequestIndex {
return RequestIndex((t.Length() + int64(t.chunkSize) - 1) / int64(t.chunkSize))
}

View File

@ -78,11 +78,17 @@ func (ws *webseedPeer) requester() {
defer ws.requesterCond.L.Unlock()
start:
for !ws.peer.closed.IsSet() {
for r := range ws.peer.actualRequestState.Requests {
restart := false
ws.peer.actualRequestState.Requests.Iterate(func(x uint32) bool {
r := ws.peer.t.requestIndexToRequest(x)
if _, ok := ws.activeRequests[r]; ok {
continue
return true
}
ws.doRequest(r)
restart = true
return false
})
if restart {
goto start
}
ws.requesterCond.Wait()
@ -134,7 +140,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
}() {
ws.peer.close()
} else {
ws.peer.remoteRejectedRequest(r)
ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r))
}
} else {
err := ws.peer.receiveChunk(&pp.Message{