Add ConnStats at Client level

ConnStats management is refactored to make this less tedious.
This commit is contained in:
Matt Joiner 2018-06-10 09:18:52 +10:00
parent 415c6f6654
commit ac6ba9f021
4 changed files with 74 additions and 33 deletions

View File

@ -22,6 +22,7 @@ import (
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/sync" "github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/google/btree" "github.com/google/btree"
"golang.org/x/time/rate" "golang.org/x/time/rate"
@ -62,6 +63,8 @@ type Client struct {
dopplegangerAddrs map[string]struct{} dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{} badPeerIPs map[string]struct{}
torrents map[metainfo.Hash]*Torrent torrents map[metainfo.Hash]*Torrent
// An aggregate of stats over all connections.
stats ConnStats
} }
func (cl *Client) BadPeerIPs() []string { func (cl *Client) BadPeerIPs() []string {
@ -123,6 +126,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String()) fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String())
writeDhtServerStatus(w, s) writeDhtServerStatus(w, s)
}) })
spew.Fdump(w, cl.stats)
fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice())) fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
fmt.Fprintln(w) fmt.Fprintln(w)
for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool { for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
@ -764,7 +768,7 @@ func (cl *Client) runReceivedConn(c *connection) {
} }
func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) { func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
t.reconcileHandshakeStats(c) c.setTorrent(t)
if c.PeerID == cl.peerID { if c.PeerID == cl.peerID {
if outgoing { if outgoing {
connsToSelf.Add(1) connsToSelf.Add(1)
@ -899,8 +903,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
return fmt.Errorf("data has bad offset in payload: %d", begin) return fmt.Errorf("data has bad offset in payload: %d", begin)
} }
t.saveMetadataPiece(piece, payload[begin:]) t.saveMetadataPiece(piece, payload[begin:])
c.stats.ChunksReadUseful++ c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
c.t.stats.ChunksReadUseful++
c.lastUsefulChunkReceived = time.Now() c.lastUsefulChunkReceived = time.Now()
return t.maybeCompleteMetadata() return t.maybeCompleteMetadata()
case pp.RequestMetadataExtensionMsgType: case pp.RequestMetadataExtensionMsgType:

View File

@ -52,12 +52,19 @@ func (cs *ConnStats) readMsg(msg *pp.Message) {
} }
} }
func (cs *ConnStats) wroteBytes(n int64) { func (cs *ConnStats) incrementPiecesDirtiedGood() {
cs.BytesWritten += n cs.PiecesDirtiedGood++
} }
func (cs *ConnStats) readBytes(n int64) { func (cs *ConnStats) incrementPiecesDirtiedBad() {
cs.BytesRead += n cs.PiecesDirtiedBad++
}
func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) {
return func(cs *ConnStats) {
p := f(cs)
*p += n
}
} }
type connStatsReadWriter struct { type connStatsReadWriter struct {

View File

@ -50,6 +50,9 @@ type connection struct {
cryptoMethod mse.CryptoMethod cryptoMethod mse.CryptoMethod
Discovery peerSource Discovery peerSource
closed missinggo.Event closed missinggo.Event
// Set true after we've added our ConnStats generated during handshake to
// other ConnStat instances as determined when the *Torrent became known.
reconciledHandshakeStats bool
stats ConnStats stats ConnStats
@ -839,27 +842,37 @@ func (c *connection) requestPendingMetadata() {
func (cn *connection) wroteMsg(msg *pp.Message) { func (cn *connection) wroteMsg(msg *pp.Message) {
messageTypesSent.Add(msg.Type.String(), 1) messageTypesSent.Add(msg.Type.String(), 1)
cn.stats.wroteMsg(msg) cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) })
cn.t.stats.wroteMsg(msg)
} }
func (cn *connection) readMsg(msg *pp.Message) { func (cn *connection) readMsg(msg *pp.Message) {
cn.stats.readMsg(msg) cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) })
cn.t.stats.readMsg(msg) }
// After handshake, we know what Torrent and Client stats to include for a
// connection.
func (cn *connection) postHandshakeStats(f func(*ConnStats)) {
t := cn.t
f(&t.stats.ConnStats)
f(&t.cl.stats)
}
// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
func (cn *connection) allStats(f func(*ConnStats)) {
f(&cn.stats)
if cn.reconciledHandshakeStats {
cn.postHandshakeStats(f)
}
} }
func (cn *connection) wroteBytes(n int64) { func (cn *connection) wroteBytes(n int64) {
cn.stats.wroteBytes(n) cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten }))
if cn.t != nil {
cn.t.stats.wroteBytes(n)
}
} }
func (cn *connection) readBytes(n int64) { func (cn *connection) readBytes(n int64) {
cn.stats.readBytes(n) cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead }))
if cn.t != nil {
cn.t.stats.readBytes(n)
}
} }
// Returns whether the connection could be useful to us. We're seeding and // Returns whether the connection could be useful to us. We're seeding and
@ -1185,18 +1198,15 @@ func (c *connection) receiveChunk(msg *pp.Message) {
// Do we actually want this chunk? // Do we actually want this chunk?
if !t.wantPiece(req) { if !t.wantPiece(req) {
unwantedChunksReceived.Add(1) unwantedChunksReceived.Add(1)
c.stats.ChunksReadUnwanted++ c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted }))
c.t.stats.ChunksReadUnwanted++
return return
} }
index := int(req.Index) index := int(req.Index)
piece := &t.pieces[index] piece := &t.pieces[index]
c.stats.ChunksReadUseful++ c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful }))
c.t.stats.ChunksReadUseful++ c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData }))
c.stats.BytesReadUsefulData += int64(len(msg.Piece))
c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
c.lastUsefulChunkReceived = time.Now() c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c { // if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c) // log.Printf("setting fastest connection %p", c)
@ -1415,5 +1425,5 @@ func (c *connection) setTorrent(t *Torrent) {
panic("connection already associated with a torrent") panic("connection already associated with a torrent")
} }
c.t = t c.t = t
t.conns[c] = struct{}{} t.reconcileHandshakeStats(c)
} }

View File

@ -1474,8 +1474,18 @@ func (t *Torrent) numTotalPeers() int {
// Reconcile bytes transferred before connection was associated with a // Reconcile bytes transferred before connection was associated with a
// torrent. // torrent.
func (t *Torrent) reconcileHandshakeStats(c *connection) { func (t *Torrent) reconcileHandshakeStats(c *connection) {
t.stats.wroteBytes(c.stats.BytesWritten) if c.stats != (ConnStats{
t.stats.readBytes(c.stats.BytesRead) // Handshakes should only increment these fields:
BytesWritten: c.stats.BytesWritten,
BytesRead: c.stats.BytesRead,
}) {
panic("bad stats")
}
c.postHandshakeStats(func(cs *ConnStats) {
cs.BytesRead += c.stats.BytesRead
cs.BytesWritten += c.stats.BytesWritten
})
c.reconciledHandshakeStats = true
} }
// Returns true if the connection is added. // Returns true if the connection is added.
@ -1519,7 +1529,7 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool {
if len(t.conns) >= t.maxEstablishedConns { if len(t.conns) >= t.maxEstablishedConns {
panic(len(t.conns)) panic(len(t.conns))
} }
c.setTorrent(t) t.conns[c] = struct{}{}
return true return true
} }
@ -1575,10 +1585,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
} }
if correct { if correct {
if len(touchers) != 0 { if len(touchers) != 0 {
t.stats.PiecesDirtiedGood++ // Don't increment stats above connection-level for every involved
// connection.
t.allStats((*ConnStats).incrementPiecesDirtiedGood)
} }
for _, c := range touchers { for _, c := range touchers {
c.stats.PiecesDirtiedGood++ c.stats.incrementPiecesDirtiedGood()
} }
err := p.Storage().MarkComplete() err := p.Storage().MarkComplete()
if err != nil { if err != nil {
@ -1586,10 +1598,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
} }
} else { } else {
if len(touchers) != 0 { if len(touchers) != 0 {
t.stats.PiecesDirtiedBad++ // Don't increment stats above connection-level for every involved
// connection.
t.allStats((*ConnStats).incrementPiecesDirtiedBad)
for _, c := range touchers { for _, c := range touchers {
// Y u do dis peer?! // Y u do dis peer?!
c.stats.PiecesDirtiedBad++ c.stats.incrementPiecesDirtiedBad()
} }
slices.Sort(touchers, connLessTrusted) slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug { if t.cl.config.Debug {
@ -1746,3 +1760,10 @@ func (t *Torrent) AddClientPeer(cl *Client) {
return return
}()) }())
} }
// All stats that include this Torrent. Useful when we want to increment
// ConnStats but not for every connection.
func (t *Torrent) allStats(f func(*ConnStats)) {
f(&t.stats.ConnStats)
f(&t.cl.stats)
}