From ac6ba9f02156d77ebf3f22b01f19646f3450b5b3 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 10 Jun 2018 09:18:52 +1000 Subject: [PATCH] Add ConnStats at Client level ConnStats management is refactored to make this less tedious. --- client.go | 9 ++++++--- conn_stats.go | 15 +++++++++++---- connection.go | 48 +++++++++++++++++++++++++++++------------------- torrent.go | 35 ++++++++++++++++++++++++++++------- 4 files changed, 74 insertions(+), 33 deletions(-) diff --git a/client.go b/client.go index f5a82300..2bcd81f8 100644 --- a/client.go +++ b/client.go @@ -22,6 +22,7 @@ import ( "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/sync" + "github.com/davecgh/go-spew/spew" "github.com/dustin/go-humanize" "github.com/google/btree" "golang.org/x/time/rate" @@ -62,6 +63,8 @@ type Client struct { dopplegangerAddrs map[string]struct{} badPeerIPs map[string]struct{} torrents map[metainfo.Hash]*Torrent + // An aggregate of stats over all connections. + stats ConnStats } func (cl *Client) BadPeerIPs() []string { @@ -123,6 +126,7 @@ func (cl *Client) WriteStatus(_w io.Writer) { fmt.Fprintf(w, "%s DHT server at %s:\n", s.Addr().Network(), s.Addr().String()) writeDhtServerStatus(w, s) }) + spew.Fdump(w, cl.stats) fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice())) fmt.Fprintln(w) for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool { @@ -764,7 +768,7 @@ func (cl *Client) runReceivedConn(c *connection) { } func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) { - t.reconcileHandshakeStats(c) + c.setTorrent(t) if c.PeerID == cl.peerID { if outgoing { connsToSelf.Add(1) @@ -899,8 +903,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect return fmt.Errorf("data has bad offset in payload: %d", begin) } t.saveMetadataPiece(piece, payload[begin:]) - c.stats.ChunksReadUseful++ - c.t.stats.ChunksReadUseful++ + c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful })) c.lastUsefulChunkReceived = time.Now() return t.maybeCompleteMetadata() case pp.RequestMetadataExtensionMsgType: diff --git a/conn_stats.go b/conn_stats.go index 9e01104b..1f45fa10 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -52,12 +52,19 @@ func (cs *ConnStats) readMsg(msg *pp.Message) { } } -func (cs *ConnStats) wroteBytes(n int64) { - cs.BytesWritten += n +func (cs *ConnStats) incrementPiecesDirtiedGood() { + cs.PiecesDirtiedGood++ } -func (cs *ConnStats) readBytes(n int64) { - cs.BytesRead += n +func (cs *ConnStats) incrementPiecesDirtiedBad() { + cs.PiecesDirtiedBad++ +} + +func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) { + return func(cs *ConnStats) { + p := f(cs) + *p += n + } } type connStatsReadWriter struct { diff --git a/connection.go b/connection.go index 9246072c..e330bd29 100644 --- a/connection.go +++ b/connection.go @@ -50,6 +50,9 @@ type connection struct { cryptoMethod mse.CryptoMethod Discovery peerSource closed missinggo.Event + // Set true after we've added our ConnStats generated during handshake to + // other ConnStat instances as determined when the *Torrent became known. + reconciledHandshakeStats bool stats ConnStats @@ -839,27 +842,37 @@ func (c *connection) requestPendingMetadata() { func (cn *connection) wroteMsg(msg *pp.Message) { messageTypesSent.Add(msg.Type.String(), 1) - cn.stats.wroteMsg(msg) - cn.t.stats.wroteMsg(msg) + cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) }) } func (cn *connection) readMsg(msg *pp.Message) { - cn.stats.readMsg(msg) - cn.t.stats.readMsg(msg) + cn.allStats(func(cs *ConnStats) { cs.readMsg(msg) }) +} + +// After handshake, we know what Torrent and Client stats to include for a +// connection. +func (cn *connection) postHandshakeStats(f func(*ConnStats)) { + t := cn.t + f(&t.stats.ConnStats) + f(&t.cl.stats) +} + +// All ConnStats that include this connection. Some objects are not known +// until the handshake is complete, after which it's expected to reconcile the +// differences. +func (cn *connection) allStats(f func(*ConnStats)) { + f(&cn.stats) + if cn.reconciledHandshakeStats { + cn.postHandshakeStats(f) + } } func (cn *connection) wroteBytes(n int64) { - cn.stats.wroteBytes(n) - if cn.t != nil { - cn.t.stats.wroteBytes(n) - } + cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten })) } func (cn *connection) readBytes(n int64) { - cn.stats.readBytes(n) - if cn.t != nil { - cn.t.stats.readBytes(n) - } + cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead })) } // Returns whether the connection could be useful to us. We're seeding and @@ -1185,18 +1198,15 @@ func (c *connection) receiveChunk(msg *pp.Message) { // Do we actually want this chunk? if !t.wantPiece(req) { unwantedChunksReceived.Add(1) - c.stats.ChunksReadUnwanted++ - c.t.stats.ChunksReadUnwanted++ + c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted })) return } index := int(req.Index) piece := &t.pieces[index] - c.stats.ChunksReadUseful++ - c.t.stats.ChunksReadUseful++ - c.stats.BytesReadUsefulData += int64(len(msg.Piece)) - c.t.stats.BytesReadUsefulData += int64(len(msg.Piece)) + c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful })) + c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData })) c.lastUsefulChunkReceived = time.Now() // if t.fastestConn != c { // log.Printf("setting fastest connection %p", c) @@ -1415,5 +1425,5 @@ func (c *connection) setTorrent(t *Torrent) { panic("connection already associated with a torrent") } c.t = t - t.conns[c] = struct{}{} + t.reconcileHandshakeStats(c) } diff --git a/torrent.go b/torrent.go index 4a784300..b8b4282b 100644 --- a/torrent.go +++ b/torrent.go @@ -1474,8 +1474,18 @@ func (t *Torrent) numTotalPeers() int { // Reconcile bytes transferred before connection was associated with a // torrent. func (t *Torrent) reconcileHandshakeStats(c *connection) { - t.stats.wroteBytes(c.stats.BytesWritten) - t.stats.readBytes(c.stats.BytesRead) + if c.stats != (ConnStats{ + // Handshakes should only increment these fields: + BytesWritten: c.stats.BytesWritten, + BytesRead: c.stats.BytesRead, + }) { + panic("bad stats") + } + c.postHandshakeStats(func(cs *ConnStats) { + cs.BytesRead += c.stats.BytesRead + cs.BytesWritten += c.stats.BytesWritten + }) + c.reconciledHandshakeStats = true } // Returns true if the connection is added. @@ -1519,7 +1529,7 @@ func (t *Torrent) addConnection(c *connection, outgoing bool) bool { if len(t.conns) >= t.maxEstablishedConns { panic(len(t.conns)) } - c.setTorrent(t) + t.conns[c] = struct{}{} return true } @@ -1575,10 +1585,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { } if correct { if len(touchers) != 0 { - t.stats.PiecesDirtiedGood++ + // Don't increment stats above connection-level for every involved + // connection. + t.allStats((*ConnStats).incrementPiecesDirtiedGood) } for _, c := range touchers { - c.stats.PiecesDirtiedGood++ + c.stats.incrementPiecesDirtiedGood() } err := p.Storage().MarkComplete() if err != nil { @@ -1586,10 +1598,12 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { } } else { if len(touchers) != 0 { - t.stats.PiecesDirtiedBad++ + // Don't increment stats above connection-level for every involved + // connection. + t.allStats((*ConnStats).incrementPiecesDirtiedBad) for _, c := range touchers { // Y u do dis peer?! - c.stats.PiecesDirtiedBad++ + c.stats.incrementPiecesDirtiedBad() } slices.Sort(touchers, connLessTrusted) if t.cl.config.Debug { @@ -1746,3 +1760,10 @@ func (t *Torrent) AddClientPeer(cl *Client) { return }()) } + +// All stats that include this Torrent. Useful when we want to increment +// ConnStats but not for every connection. +func (t *Torrent) allStats(f func(*ConnStats)) { + f(&t.stats.ConnStats) + f(&t.cl.stats) +}