From 360e1dbdd14eb2925d5d80d5f439501277424523 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 12 Jun 2018 20:21:53 +1000 Subject: [PATCH] Track ConnStats with atomics --- client.go | 4 +-- client_test.go | 13 ++++++--- conn_stats.go | 75 +++++++++++++++++++++++++++++--------------------- connection.go | 20 +++++++------- torrent.go | 31 +++++++++++---------- 5 files changed, 81 insertions(+), 62 deletions(-) diff --git a/client.go b/client.go index 2a65b906..975f0eee 100644 --- a/client.go +++ b/client.go @@ -908,7 +908,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect return fmt.Errorf("data has bad offset in payload: %d", begin) } t.saveMetadataPiece(piece, payload[begin:]) - c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful })) + c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.lastUsefulChunkReceived = time.Now() return t.maybeCompleteMetadata() case pp.RequestMetadataExtensionMsgType: @@ -1159,7 +1159,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) { writeBuffer: new(bytes.Buffer), } c.writerCond.L = &cl.mu - c.setRW(connStatsReadWriter{nc, &cl.mu, c}) + c.setRW(connStatsReadWriter{nc, c}) c.r = &rateLimitedReader{ l: cl.downloadLimit, r: c.r, diff --git a/client_test.go b/client_test.go index d74b4e7d..63bcd663 100644 --- a/client_test.go +++ b/client_test.go @@ -415,10 +415,15 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { r.SetReadahead(ps.Readahead) } assertReadAllGreeting(t, r) - assert.True(t, 13 <= seederTorrent.Stats().BytesWrittenData) - assert.True(t, 8 <= seederTorrent.Stats().ChunksWritten) - assert.True(t, 13 <= leecherTorrent.Stats().BytesReadData) - assert.True(t, 8 <= leecherTorrent.Stats().ChunksRead) + + seederStats := seederTorrent.Stats() + assert.True(t, 13 <= seederStats.BytesWrittenData.Int64()) + assert.True(t, 8 <= seederStats.ChunksWritten.Int64()) + + leecherStats := leecherTorrent.Stats() + assert.True(t, 13 <= leecherStats.BytesReadData.Int64()) + assert.True(t, 8 <= leecherStats.ChunksRead.Int64()) + // Try reading through again for the cases where the torrent data size // exceeds the size of the cache. assertReadAllGreeting(t, r) diff --git a/conn_stats.go b/conn_stats.go index 1f45fa10..79554bc0 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -2,7 +2,9 @@ package torrent import ( "io" - "sync" + "log" + "reflect" + "sync/atomic" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -14,81 +16,92 @@ import ( // is things sent to the peer, and Read is stuff received from them. type ConnStats struct { // Total bytes on the wire. Includes handshakes and encryption. - BytesWritten int64 - BytesWrittenData int64 + BytesWritten Count + BytesWrittenData Count - BytesRead int64 - BytesReadData int64 - BytesReadUsefulData int64 + BytesRead Count + BytesReadData Count + BytesReadUsefulData Count - ChunksWritten int64 + ChunksWritten Count - ChunksRead int64 - ChunksReadUseful int64 - ChunksReadUnwanted int64 + ChunksRead Count + ChunksReadUseful Count + ChunksReadUnwanted Count // Number of pieces data was written to, that subsequently passed verification. - PiecesDirtiedGood int64 + PiecesDirtiedGood Count // Number of pieces data was written to, that subsequently failed // verification. Note that a connection may not have been the sole dirtier // of a piece. - PiecesDirtiedBad int64 + PiecesDirtiedBad Count +} + +func (me *ConnStats) Copy() (ret ConnStats) { + for i := 0; i < reflect.TypeOf(ConnStats{}).NumField(); i++ { + n := reflect.ValueOf(me).Elem().Field(i).Addr().Interface().(*Count).Int64() + reflect.ValueOf(&ret).Elem().Field(i).Addr().Interface().(*Count).Add(n) + } + return +} + +type Count struct { + n int64 +} + +func (me *Count) Add(n int64) { + atomic.AddInt64(&me.n, n) +} + +func (me *Count) Int64() int64 { + return atomic.LoadInt64(&me.n) } func (cs *ConnStats) wroteMsg(msg *pp.Message) { // TODO: Track messages and not just chunks. switch msg.Type { case pp.Piece: - cs.ChunksWritten++ - cs.BytesWrittenData += int64(len(msg.Piece)) + cs.ChunksWritten.Add(1) + cs.BytesWrittenData.Add(int64(len(msg.Piece))) } } func (cs *ConnStats) readMsg(msg *pp.Message) { switch msg.Type { case pp.Piece: - cs.ChunksRead++ - cs.BytesReadData += int64(len(msg.Piece)) + cs.ChunksRead.Add(1) + cs.BytesReadData.Add(int64(len(msg.Piece))) } } func (cs *ConnStats) incrementPiecesDirtiedGood() { - cs.PiecesDirtiedGood++ + cs.PiecesDirtiedGood.Add(1) } func (cs *ConnStats) incrementPiecesDirtiedBad() { - cs.PiecesDirtiedBad++ + cs.PiecesDirtiedBad.Add(1) } -func add(n int64, f func(*ConnStats) *int64) func(*ConnStats) { +func add(n int64, f func(*ConnStats) *Count) func(*ConnStats) { return func(cs *ConnStats) { p := f(cs) - *p += n + p.Add(n) } } type connStatsReadWriter struct { rw io.ReadWriter - l sync.Locker c *connection } func (me connStatsReadWriter) Write(b []byte) (n int, err error) { n, err = me.rw.Write(b) - go func() { - me.l.Lock() - me.c.wroteBytes(int64(n)) - me.l.Unlock() - }() + me.c.wroteBytes(int64(n)) return } func (me connStatsReadWriter) Read(b []byte) (n int, err error) { n, err = me.rw.Read(b) - go func() { - me.l.Lock() - me.c.readBytes(int64(n)) - me.l.Unlock() - }() + me.c.readBytes(int64(n)) return } diff --git a/connection.go b/connection.go index 6aa007b7..70aa1ccb 100644 --- a/connection.go +++ b/connection.go @@ -223,7 +223,7 @@ func (cn *connection) statusFlags() (ret string) { // } func (cn *connection) downloadRate() float64 { - return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds() + return float64(cn.stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds() } func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { @@ -324,7 +324,7 @@ func (cn *connection) requestedMetadataPiece(index int) bool { // The actual value to use as the maximum outbound requests. func (cn *connection) nominalMaxRequests() (ret int) { - return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful)))) + return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64())))) } func (cn *connection) onPeerSentCancel(r request) { @@ -854,7 +854,7 @@ func (cn *connection) readMsg(msg *pp.Message) { // connection. func (cn *connection) postHandshakeStats(f func(*ConnStats)) { t := cn.t - f(&t.stats.ConnStats) + f(&t.stats) f(&t.cl.stats) } @@ -869,11 +869,11 @@ func (cn *connection) allStats(f func(*ConnStats)) { } func (cn *connection) wroteBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesWritten })) + cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) } func (cn *connection) readBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *int64 { return &cs.BytesRead })) + cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) } // Returns whether the connection could be useful to us. We're seeding and @@ -1199,15 +1199,15 @@ func (c *connection) receiveChunk(msg *pp.Message) { // Do we actually want this chunk? if !t.wantPiece(req) { unwantedChunksReceived.Add(1) - c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUnwanted })) + c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUnwanted })) return } index := int(req.Index) piece := &t.pieces[index] - c.allStats(add(1, func(cs *ConnStats) *int64 { return &cs.ChunksReadUseful })) - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *int64 { return &cs.BytesReadUsefulData })) + c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) + c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) c.lastUsefulChunkReceived = time.Now() // if t.fastestConn != c { // log.Printf("setting fastest connection %p", c) @@ -1283,7 +1283,7 @@ func (c *connection) uploadAllowed() bool { return false } // Don't upload more than 100 KiB more than we download. - if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 { + if c.stats.BytesWrittenData.Int64() >= c.stats.BytesReadData.Int64()+100<<10 { return false } return true @@ -1353,7 +1353,7 @@ func (cn *connection) Drop() { } func (cn *connection) netGoodPiecesDirtied() int64 { - return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad + return cn.stats.PiecesDirtiedGood.Int64() - cn.stats.PiecesDirtiedBad.Int64() } func (c *connection) peerHasWantedPieces() bool { diff --git a/torrent.go b/torrent.go index 9b3ff31e..fc1fb5df 100644 --- a/torrent.go +++ b/torrent.go @@ -135,7 +135,7 @@ type Torrent struct { // different pieces. connPieceInclinationPool sync.Pool // Torrent-level statistics. - stats TorrentStats + stats ConnStats // Count of each request across active connections. pendingRequests map[request]int @@ -851,7 +851,7 @@ func (t *Torrent) worstBadConn() *connection { heap.Init(&wcs) for wcs.Len() != 0 { c := heap.Pop(&wcs).(*connection) - if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful { + if c.stats.ChunksReadUnwanted.Int64() >= 6 && c.stats.ChunksReadUnwanted.Int64() > c.stats.ChunksReadUseful.Int64() { return c } // If the connection is in the worst half of the established @@ -1343,9 +1343,9 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest { // The following are vaguely described in BEP 3. Left: t.bytesLeftAnnounce(), - Uploaded: t.stats.BytesWrittenData, + Uploaded: t.stats.BytesWrittenData.Int64(), // There's no mention of wasted or unwanted download in the BEP. - Downloaded: t.stats.BytesReadUsefulData, + Downloaded: t.stats.BytesReadUsefulData.Int64(), } } @@ -1440,18 +1440,19 @@ func (t *Torrent) Stats() TorrentStats { return t.statsLocked() } -func (t *Torrent) statsLocked() TorrentStats { - t.stats.ActivePeers = len(t.conns) - t.stats.HalfOpenPeers = len(t.halfOpen) - t.stats.PendingPeers = t.peers.Len() - t.stats.TotalPeers = t.numTotalPeers() - t.stats.ConnectedSeeders = 0 +func (t *Torrent) statsLocked() (ret TorrentStats) { + ret.ActivePeers = len(t.conns) + ret.HalfOpenPeers = len(t.halfOpen) + ret.PendingPeers = t.peers.Len() + ret.TotalPeers = t.numTotalPeers() + ret.ConnectedSeeders = 0 for c := range t.conns { if all, ok := c.peerHasAllPieces(); all && ok { - t.stats.ConnectedSeeders++ + ret.ConnectedSeeders++ } } - return t.stats + ret.ConnStats = t.stats.Copy() + return } // The total number of peers in the torrent. @@ -1485,8 +1486,8 @@ func (t *Torrent) reconcileHandshakeStats(c *connection) { panic("bad stats") } c.postHandshakeStats(func(cs *ConnStats) { - cs.BytesRead += c.stats.BytesRead - cs.BytesWritten += c.stats.BytesWritten + cs.BytesRead.Add(c.stats.BytesRead.Int64()) + cs.BytesWritten.Add(c.stats.BytesWritten.Int64()) }) c.reconciledHandshakeStats = true } @@ -1764,6 +1765,6 @@ func (t *Torrent) AddClientPeer(cl *Client) { // All stats that include this Torrent. Useful when we want to increment // ConnStats but not for every connection. func (t *Torrent) allStats(f func(*ConnStats)) { - f(&t.stats.ConnStats) + f(&t.stats) f(&t.cl.stats) }