2
0
mirror of synced 2025-02-23 22:28:11 +00:00

Track ConnStats with atomics

This commit is contained in:
Matt Joiner 2018-06-12 20:21:53 +10:00
parent 2007f2c234
commit 360e1dbdd1
5 changed files with 81 additions and 62 deletions

View File

@ -908,7 +908,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
return fmt.Errorf("data has bad offset in payload: %d", begin) return fmt.Errorf("data has bad offset in payload: %d", begin)
} }
t.saveMetadataPiece(piece, payload[begin:]) t.saveMetadataPiece(piece, payload[begin:])
c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful })) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.lastUsefulChunkReceived = time.Now() c.lastUsefulChunkReceived = time.Now()
return t.maybeCompleteMetadata() return t.maybeCompleteMetadata()
case pp.RequestMetadataExtensionMsgType: case pp.RequestMetadataExtensionMsgType:
@ -1159,7 +1159,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
writeBuffer: new(bytes.Buffer), writeBuffer: new(bytes.Buffer),
} }
c.writerCond.L = &cl.mu c.writerCond.L = &cl.mu
c.setRW(connStatsReadWriter{nc, &cl.mu, c}) c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{ c.r = &rateLimitedReader{
l: cl.downloadLimit, l: cl.downloadLimit,
r: c.r, r: c.r,

View File

@ -415,10 +415,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
r.SetReadahead(ps.Readahead) r.SetReadahead(ps.Readahead)
} }
assertReadAllGreeting(t, r) assertReadAllGreeting(t, r)
assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData)
assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten) seederStats := seederTorrent.Stats()
assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData) assert.True(t, 13 <= seederStats.BytesWrittenData.Int64())
assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead) assert.True(t, 8 <= seederStats.ChunksWritten.Int64())
leecherStats := leecherTorrent.Stats()
assert.True(t, 13 <= leecherStats.BytesReadData.Int64())
assert.True(t, 8 <= leecherStats.ChunksRead.Int64())
// Try reading through again for the cases where the torrent data size // Try reading through again for the cases where the torrent data size
// exceeds the size of the cache. // exceeds the size of the cache.
assertReadAllGreeting(t, r) assertReadAllGreeting(t, r)

View File

@ -2,7 +2,9 @@ package torrent
import ( import (
"io" "io"
"sync" "log"
"reflect"
"sync/atomic"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
) )
@ -14,81 +16,92 @@ import (
// is things sent to the peer, and Read is stuff received from them. // is things sent to the peer, and Read is stuff received from them.
type ConnStats struct { type ConnStats struct {
// Total bytes on the wire. Includes handshakes and encryption. // Total bytes on the wire. Includes handshakes and encryption.
BytesWritten int64 BytesWritten Count
BytesWrittenData int64 BytesWrittenData Count
BytesRead int64 BytesRead Count
BytesReadData int64 BytesReadData Count
BytesReadUsefulData int64 BytesReadUsefulData Count
ChunksWritten int64 ChunksWritten Count
ChunksRead int64 ChunksRead Count
ChunksReadUseful int64 ChunksReadUseful Count
ChunksReadUnwanted int64 ChunksReadUnwanted Count
// Number of pieces data was written to, that subsequently passed verification. // Number of pieces data was written to, that subsequently passed verification.
PiecesDirtiedGood int64 PiecesDirtiedGood Count
// Number of pieces data was written to, that subsequently failed // Number of pieces data was written to, that subsequently failed
// verification. Note that a connection may not have been the sole dirtier // verification. Note that a connection may not have been the sole dirtier
// of a piece. // of a piece.
PiecesDirtiedBad int64 PiecesDirtiedBad Count
}
func (me *ConnStats) Copy() (ret ConnStats) {
for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ {
n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64()
reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n)
}
return
}
type Count struct {
n int64
}
func (me *Count) Add(n int64) {
atomic.AddInt64(&me.n, n)
}
func (me *Count) Int64() int64 {
return atomic.LoadInt64(&me.n)
} }
func (cs *ConnStats) wroteMsg(msg *pp.Message) { func (cs *ConnStats) wroteMsg(msg *pp.Message) {
// TODO: Track messages and not just chunks. // TODO: Track messages and not just chunks.
switch msg.Type { switch msg.Type {
case pp.Piece: case pp.Piece:
cs.ChunksWritten++ cs.ChunksWritten.Add(1)
cs.BytesWrittenData += int64(len(msg.Piece)) cs.BytesWrittenData.Add(int64(len(msg.Piece)))
} }
} }
func (cs *ConnStats) readMsg(msg *pp.Message) { func (cs *ConnStats) readMsg(msg *pp.Message) {
switch msg.Type { switch msg.Type {
case pp.Piece: case pp.Piece:
cs.ChunksRead++ cs.ChunksRead.Add(1)
cs.BytesReadData += int64(len(msg.Piece)) cs.BytesReadData.Add(int64(len(msg.Piece)))
} }
} }
func (cs *ConnStats) incrementPiecesDirtiedGood() { func (cs *ConnStats) incrementPiecesDirtiedGood() {
cs.PiecesDirtiedGood++ cs.PiecesDirtiedGood.Add(1)
} }
func (cs *ConnStats) incrementPiecesDirtiedBad() { func (cs *ConnStats) incrementPiecesDirtiedBad() {
cs.PiecesDirtiedBad++ cs.PiecesDirtiedBad.Add(1)
} }
func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) { func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) {
return func(cs *ConnStats) { return func(cs *ConnStats) {
p := f(cs) p := f(cs)
*p += n p.Add(n)
} }
} }
type connStatsReadWriter struct { type connStatsReadWriter struct {
rw io.ReadWriter rw io.ReadWriter
l sync.Locker
c *connection c *connection
} }
func (me connStatsReadWriter) Write(b []byte) (n int, err error) { func (me connStatsReadWriter) Write(b []byte) (n int, err error) {
n, err = me.rw.Write(b) n, err = me.rw.Write(b)
go func() {
me.l.Lock()
me.c.wroteBytes(int64(n)) me.c.wroteBytes(int64(n))
me.l.Unlock()
}()
return return
} }
func (me connStatsReadWriter) Read(b []byte) (n int, err error) { func (me connStatsReadWriter) Read(b []byte) (n int, err error) {
n, err = me.rw.Read(b) n, err = me.rw.Read(b)
go func() {
me.l.Lock()
me.c.readBytes(int64(n)) me.c.readBytes(int64(n))
me.l.Unlock()
}()
return return
} }

View File

@ -223,7 +223,7 @@ func (cn *connection) statusFlags() (ret string) {
// } // }
func (cn *connection) downloadRate() float64 { func (cn *connection) downloadRate() float64 {
return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds() return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
} }
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
@ -324,7 +324,7 @@ func (cn *connection) requestedMetadataPiece(index int) bool {
// The actual value to use as the maximum outbound requests. // The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) { func (cn *connection) nominalMaxRequests() (ret int) {
return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful)))) return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
} }
func (cn *connection) onPeerSentCancel(r request) { func (cn *connection) onPeerSentCancel(r request) {
@ -854,7 +854,7 @@ func (cn *connection) readMsg(msg *pp.Message) {
// connection. // connection.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) { func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
t := cn.t t := cn.t
f(&t.stats.ConnStats) f(&t.stats)
f(&t.cl.stats) f(&t.cl.stats)
} }
@ -869,11 +869,11 @@ func (cn *connection) allStats(f func(*ConnStats)) {
} }
func (cn *connection) wroteBytes(n int64) { func (cn *connection) wroteBytes(n int64) {
cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten })) cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten }))
} }
func (cn *connection) readBytes(n int64) { func (cn *connection) readBytes(n int64) {
cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead })) cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead }))
} }
// Returns whether the connection could be useful to us. We're seeding and // Returns whether the connection could be useful to us. We're seeding and
@ -1199,15 +1199,15 @@ func (c *connection) receiveChunk(msg *pp.Message) {
// Do we actually want this chunk? // Do we actually want this chunk?
if !t.wantPiece(req) { if !t.wantPiece(req) {
unwantedChunksReceived.Add(1) unwantedChunksReceived.Add(1)
c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted })) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted }))
return return
} }
index := int(req.Index) index := int(req.Index)
piece := &t.pieces[index] piece := &t.pieces[index]
c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful })) c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
c.lastUsefulChunkReceived = time.Now() c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c { // if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c) // log.Printf("setting fastest connection %p", c)
@ -1283,7 +1283,7 @@ func (c *connection) uploadAllowed() bool {
return false return false
} }
// Don't upload more than 100 KiB more than we download. // Don't upload more than 100 KiB more than we download.
if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 { if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 {
return false return false
} }
return true return true
@ -1353,7 +1353,7 @@ func (cn *connection) Drop() {
} }
func (cn *connection) netGoodPiecesDirtied() int64 { func (cn *connection) netGoodPiecesDirtied() int64 {
return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64()
} }
func (c *connection) peerHasWantedPieces() bool { func (c *connection) peerHasWantedPieces() bool {

View File

@ -135,7 +135,7 @@ type Torrent struct {
// different pieces. // different pieces.
connPieceInclinationPool sync.Pool connPieceInclinationPool sync.Pool
// Torrent-level statistics. // Torrent-level statistics.
stats TorrentStats stats ConnStats
// Count of each request across active connections. // Count of each request across active connections.
pendingRequests map[request]int pendingRequests map[request]int
@ -851,7 +851,7 @@ func (t *Torrent) worstBadConn() *connection {
heap.Init(&wcs) heap.Init(&wcs)
for wcs.Len() != 0 { for wcs.Len() != 0 {
c := heap.Pop(&wcs).(*connection) c := heap.Pop(&wcs).(*connection)
if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful { if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() {
return c return c
} }
// If the connection is in the worst half of the established // If the connection is in the worst half of the established
@ -1343,9 +1343,9 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
// The following are vaguely described in BEP 3. // The following are vaguely described in BEP 3.
Left: t.bytesLeftAnnounce(), Left: t.bytesLeftAnnounce(),
Uploaded: t.stats.BytesWrittenData, Uploaded: t.stats.BytesWrittenData.Int64(),
// There's no mention of wasted or unwanted download in the BEP. // There's no mention of wasted or unwanted download in the BEP.
Downloaded: t.stats.BytesReadUsefulData, Downloaded: t.stats.BytesReadUsefulData.Int64(),
} }
} }
@ -1440,18 +1440,19 @@ func (t *Torrent) Stats() TorrentStats {
return t.statsLocked() return t.statsLocked()
} }
func (t *Torrent) statsLocked() TorrentStats { func (t *Torrent) statsLocked() (ret TorrentStats) {
t.stats.ActivePeers = len(t.conns) ret.ActivePeers = len(t.conns)
t.stats.HalfOpenPeers = len(t.halfOpen) ret.HalfOpenPeers = len(t.halfOpen)
t.stats.PendingPeers = t.peers.Len() ret.PendingPeers = t.peers.Len()
t.stats.TotalPeers = t.numTotalPeers() ret.TotalPeers = t.numTotalPeers()
t.stats.ConnectedSeeders = 0 ret.ConnectedSeeders = 0
for c := range t.conns { for c := range t.conns {
if all, ok := c.peerHasAllPieces(); all && ok { if all, ok := c.peerHasAllPieces(); all && ok {
t.stats.ConnectedSeeders++ ret.ConnectedSeeders++
} }
} }
return t.stats ret.ConnStats = t.stats.Copy()
return
} }
// The total number of peers in the torrent. // The total number of peers in the torrent.
@ -1485,8 +1486,8 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) {
panic("bad stats") panic("bad stats")
} }
c.postHandshakeStats(func(cs *ConnStats) { c.postHandshakeStats(func(cs *ConnStats) {
cs.BytesRead += c.stats.BytesRead cs.BytesRead.Add(c.stats.BytesRead.Int64())
cs.BytesWritten += c.stats.BytesWritten cs.BytesWritten.Add(c.stats.BytesWritten.Int64())
}) })
c.reconciledHandshakeStats = true c.reconciledHandshakeStats = true
} }
@ -1764,6 +1765,6 @@ func (t *Torrent) AddClientPeer(cl *Client) {
// All stats that include this Torrent. Useful when we want to increment // All stats that include this Torrent. Useful when we want to increment
// ConnStats but not for every connection. // ConnStats but not for every connection.
func (t *Torrent) allStats(f func(*ConnStats)) { func (t *Torrent) allStats(f func(*ConnStats)) {
f(&t.stats.ConnStats) f(&t.stats)
f(&t.cl.stats) f(&t.cl.stats)
} }