diff --git a/client_test.go b/client_test.go index cbfe48bb..f4c30851 100644 --- a/client_test.go +++ b/client_test.go @@ -537,8 +537,10 @@ func TestPeerInvalidHave(t *testing.T) { assert.True(t, _new) defer tt.Drop() cn := &PeerConn{Peer: Peer{ - t: tt, + t: tt, + callbacks: &cfg.Callbacks, }} + tt.conns[cn] = struct{}{} cn.peerImpl = cn cl.lock() defer cl.unlock() diff --git a/misc.go b/misc.go index 139af2d4..db924bbe 100644 --- a/misc.go +++ b/misc.go @@ -4,6 +4,7 @@ import ( "errors" "net" + "github.com/RoaringBitmap/roaring" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/torrent/types" "golang.org/x/time/rate" @@ -169,3 +170,12 @@ type ( InfoHash = metainfo.Hash IpPort = missinggo.IpPort ) + +func boolSliceToBitmap(slice []bool) (rb roaring.Bitmap) { + for i, b := range slice { + if b { + rb.AddInt(i) + } + } + return +} diff --git a/peerconn.go b/peerconn.go index 4a299714..40692225 100644 --- a/peerconn.go +++ b/peerconn.go @@ -774,28 +774,49 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error { // Ignore known excess pieces. bf = bf[:cn.t.numPieces()] } - pp := cn.newPeerPieces() - cn.peerSentHaveAll = false - for i, have := range bf { - if have { - cn.raisePeerMinPieces(pieceIndex(i) + 1) - if !pp.Contains(bitmap.BitIndex(i)) { - cn.t.incPieceAvailability(i) - } - } else { - if pp.Contains(bitmap.BitIndex(i)) { - cn.t.decPieceAvailability(i) - } - } - if have { - cn._peerPieces.Add(uint32(i)) - if cn.t.wantPieceIndex(i) { - cn.updateRequests("bitfield") - } - } else { - cn._peerPieces.Remove(uint32(i)) - } + bm := boolSliceToBitmap(bf) + if cn.t.haveInfo() && pieceIndex(bm.GetCardinality()) == cn.t.numPieces() { + cn.onPeerHasAllPieces() + return nil } + if !bm.IsEmpty() { + cn.raisePeerMinPieces(pieceIndex(bm.Maximum()) + 1) + } + shouldUpdateRequests := false + if cn.peerSentHaveAll { + if !cn.t.deleteConnWithAllPieces(&cn.Peer) { + panic(cn) + } + cn.peerSentHaveAll = false + if !cn._peerPieces.IsEmpty() { + panic("if peer has all, we expect no individual peer pieces to be set") + } + } else { + bm.Xor(&cn._peerPieces) + } + cn.peerSentHaveAll = false + // bm is now 'on' for pieces that are changing + bm.Iterate(func(x uint32) bool { + pi := pieceIndex(x) + if cn._peerPieces.Contains(x) { + // Then we must be losing this piece + cn.t.decPieceAvailability(pi) + } else { + if !shouldUpdateRequests && cn.t.wantPieceIndex(pieceIndex(x)) { + shouldUpdateRequests = true + } + // We must be gaining this piece + cn.t.incPieceAvailability(pieceIndex(x)) + } + return true + }) + // Apply the changes. If we had everything previously, this should be empty, so xor is the same + // as or. + cn._peerPieces.Xor(&bm) + if shouldUpdateRequests { + cn.updateRequests("bitfield") + } + // We didn't guard this before, I see no reason to do it now. cn.peerPiecesChanged() return nil } @@ -803,13 +824,12 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error { func (cn *PeerConn) onPeerHasAllPieces() { t := cn.t if t.haveInfo() { - npp, pc := cn.newPeerPieces(), t.numPieces() - for i := 0; i < pc; i += 1 { - if !npp.Contains(bitmap.BitIndex(i)) { - t.incPieceAvailability(i) - } - } + cn._peerPieces.Iterate(func(x uint32) bool { + t.decPieceAvailability(pieceIndex(x)) + return true + }) } + t.addConnWithAllPieces(&cn.Peer) cn.peerSentHaveAll = true cn._peerPieces.Clear() if !cn.t._pendingPieces.IsEmpty() { @@ -824,7 +844,9 @@ func (cn *PeerConn) onPeerSentHaveAll() error { } func (cn *PeerConn) peerSentHaveNone() error { - cn.t.decPeerPieceAvailability(&cn.Peer) + if cn.peerSentHaveAll { + cn.t.decPeerPieceAvailability(&cn.Peer) + } cn._peerPieces.Clear() cn.peerSentHaveAll = false cn.peerPiecesChanged() diff --git a/peerconn_test.go b/peerconn_test.go index 93e512a0..7ecb6933 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -222,8 +222,10 @@ func TestHaveAllThenBitfield(t *testing.T) { pc.peerImpl = &pc tt.conns[&pc] = struct{}{} c.Assert(pc.onPeerSentHaveAll(), qt.IsNil) + c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}}) pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false}) c.Check(pc.peerMinPieces, qt.Equals, 6) + c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0) c.Assert(pc.t.setInfo(&metainfo.Info{ PieceLength: 0, Pieces: make([]byte, pieceHash.Size()*7), diff --git a/piece.go b/piece.go index 6caa7628..1c4375f1 100644 --- a/piece.go +++ b/piece.go @@ -30,7 +30,10 @@ type Piece struct { publicPieceState PieceState priority piecePriority - availability int64 + // Availability adjustment for this piece relative to len(Torrent.connsWithAllPieces). This is + // incremented for any piece a peer has when a peer has a piece, Torrent.haveInfo is true, and + // the Peer isn't recorded in Torrent.connsWithAllPieces. + relativeAvailability int // This can be locked when the Client lock is taken, but probably not vice versa. pendingWritesMutex sync.Mutex @@ -279,3 +282,7 @@ func (me *undirtiedChunksIter) Iter(f func(chunkIndexType)) { func (p *Piece) requestIndexOffset() RequestIndex { return p.t.pieceRequestIndexOffset(p.index) } + +func (p *Piece) availability() int { + return len(p.t.connsWithAllPieces) + p.relativeAvailability +} diff --git a/request-strategy/order.go b/request-strategy/order.go index 50c9838f..cb69d23d 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -24,9 +24,12 @@ type ( func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation { return multiless.New().Int( int(j.state.Priority), int(i.state.Priority), + // TODO: Should we match on complete here to prevent churn when availability changes? ).Bool( j.state.Partial, i.state.Partial, - ).Int64( + ).Int( + // If this is done with relative availability, do we lose some determinism? If completeness + // is used, would that push this far enough down? i.state.Availability, j.state.Availability, ).Int( i.key.Index, j.key.Index, diff --git a/request-strategy/piece-request-order.go b/request-strategy/piece-request-order.go index c9a89663..dbdac738 100644 --- a/request-strategy/piece-request-order.go +++ b/request-strategy/piece-request-order.go @@ -27,7 +27,7 @@ type PieceRequestOrderKey struct { type PieceRequestOrderState struct { Priority piecePriority Partial bool - Availability int64 + Availability int } type pieceRequestOrderItem struct { diff --git a/requesting.go b/requesting.go index 59f6693c..53d5f324 100644 --- a/requesting.go +++ b/requesting.go @@ -19,7 +19,7 @@ func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRe return request_strategy.PieceRequestOrderState{ Priority: t.piece(i).purePriority(), Partial: t.piecePartiallyDownloaded(i), - Availability: t.piece(i).availability, + Availability: t.piece(i).availability(), } } @@ -125,8 +125,8 @@ func (p *peerRequests) Less(i, j int) bool { -int(rightPiece.purePriority()), ) ml = ml.Int( - int(leftPiece.availability), - int(rightPiece.availability)) + int(leftPiece.relativeAvailability), + int(rightPiece.relativeAvailability)) return ml.Less() } diff --git a/torrent.go b/torrent.go index ce4d7f59..f82032be 100644 --- a/torrent.go +++ b/torrent.go @@ -85,7 +85,6 @@ type Torrent struct { files *[]*File webSeeds map[string]*Peer - // Active peer connections, running message stream loops. TODO: Make this // open (not-closed) connections only. conns map[*PeerConn]struct{} @@ -137,6 +136,7 @@ type Torrent struct { activePieceHashes int initialPieceCheckDisabled bool + connsWithAllPieces map[*Peer]struct{} // Count of each request across active connections. pendingRequests map[RequestIndex]*Peer lastRequested map[RequestIndex]time.Time @@ -149,8 +149,12 @@ type Torrent struct { Complete chansync.Flag } -func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) { +func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) { + // This could be done with roaring.BitSliceIndexing. t.iterPeers(func(peer *Peer) { + if _, ok := t.connsWithAllPieces[peer]; ok { + return + } if peer.peerHasPiece(i) { count++ } @@ -163,10 +167,10 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) { return } p := t.piece(i) - if p.availability <= 0 { - panic(p.availability) + if p.relativeAvailability <= 0 { + panic(p.relativeAvailability) } - p.availability-- + p.relativeAvailability-- t.updatePieceRequestOrder(i) } @@ -174,7 +178,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) { // If we don't the info, this should be reconciled when we do. if t.haveInfo() { p := t.piece(i) - p.availability++ + p.relativeAvailability++ t.updatePieceRequestOrder(i) } } @@ -443,12 +447,12 @@ func (t *Torrent) onSetInfo() { t.initPieceRequestOrder() for i := range t.pieces { p := &t.pieces[i] - // Need to add availability before updating piece completion, as that may result in conns + // Need to add relativeAvailability before updating piece completion, as that may result in conns // being dropped. - if p.availability != 0 { - panic(p.availability) + if p.relativeAvailability != 0 { + panic(p.relativeAvailability) } - p.availability = int64(t.pieceAvailabilityFromPeers(i)) + p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i) t.addRequestOrderPiece(i) t.updatePieceCompletion(pieceIndex(i)) if !t.initialPieceCheckDisabled && !p.storageCompletionOk { @@ -571,7 +575,7 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMe type pieceAvailabilityRun struct { Count pieceIndex - Availability int64 + Availability int } func (me pieceAvailabilityRun) String() string { @@ -580,10 +584,10 @@ func (me pieceAvailabilityRun) String() string { func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) { rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) { - ret = append(ret, pieceAvailabilityRun{Availability: el.(int64), Count: int(count)}) + ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)}) }) for i := range t.pieces { - rle.Append(t.pieces[i].availability, 1) + rle.Append(t.pieces[i].availability(), 1) } rle.Flush() return @@ -836,6 +840,12 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) { if t.storage != nil { t.deletePieceRequestOrder() } + for i := range t.pieces { + p := t.piece(i) + if p.relativeAvailability != 0 { + panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability)) + } + } t.pex.Reset() t.cl.event.Broadcast() t.pieceStateChanges.Close() @@ -1407,14 +1417,20 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) { }) } t.assertPendingRequests() + if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 { + panic(t.connsWithAllPieces) + } return } func (t *Torrent) decPeerPieceAvailability(p *Peer) { + if t.deleteConnWithAllPieces(p) { + return + } if !t.haveInfo() { return } - p.newPeerPieces().Iterate(func(i uint32) bool { + p.peerPieces().Iterate(func(i uint32) bool { p.t.decPieceAvailability(pieceIndex(i)) return true }) @@ -2319,3 +2335,20 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer { func (t *Torrent) requestingPeer(r RequestIndex) *Peer { return t.pendingRequests[r] } + +func (t *Torrent) addConnWithAllPieces(p *Peer) { + if t.connsWithAllPieces == nil { + t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns) + } + t.connsWithAllPieces[p] = struct{}{} +} + +func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool { + _, ok := t.connsWithAllPieces[p] + delete(t.connsWithAllPieces, p) + return ok +} + +func (t *Torrent) numActivePeers() int { + return len(t.conns) + len(t.webSeeds) +}