From de761fb50628009c57666d4567009963ed81f40c Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 5 Oct 2016 15:57:00 +1100 Subject: [PATCH] Do chunk pooling at Torrent instead of connection level --- client.go | 10 +++++----- connection.go | 9 ++------- connection_test.go | 2 +- torrent.go | 10 ++++++++++ 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/client.go b/client.go index b0df5fa1..29aed198 100644 --- a/client.go +++ b/client.go @@ -1169,10 +1169,9 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool { // Return a Torrent ready for insertion into a Client. func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { t = &Torrent{ - cl: cl, - infoHash: ih, - chunkSize: defaultChunkSize, - peers: make(map[peersKey]Peer), + cl: cl, + infoHash: ih, + peers: make(map[peersKey]Peer), halfOpen: make(map[string]struct{}), pieceStateChanges: pubsub.NewPubSub(), @@ -1180,6 +1179,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) { storageOpener: cl.defaultStorage, maxEstablishedConns: defaultEstablishedConnsPerTorrent, } + t.setChunkSize(defaultChunkSize) return } @@ -1228,7 +1228,7 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e cl.mu.Lock() defer cl.mu.Unlock() if spec.ChunkSize != 0 { - t.chunkSize = pp.Integer(spec.ChunkSize) + t.setChunkSize(pp.Integer(spec.ChunkSize)) } t.addTrackers(spec.Trackers) t.maybeNewConns() diff --git a/connection.go b/connection.go index b25e2662..742b90af 100644 --- a/connection.go +++ b/connection.go @@ -698,16 +698,11 @@ func (c *connection) lastHelpful() (ret time.Time) { func (c *connection) mainReadLoop() error { t := c.t cl := t.cl - pool := &sync.Pool{ - New: func() interface{} { - return make([]byte, t.chunkSize) - }, - } decoder := pp.Decoder{ R: bufio.NewReader(c.rw), MaxLength: 256 * 1024, - Pool: pool, + Pool: t.chunkPool, } for { cl.mu.Unlock() @@ -782,7 +777,7 @@ func (c *connection) mainReadLoop() error { case pp.Piece: cl.downloadedChunk(t, c, &msg) if len(msg.Piece) == int(t.chunkSize) { - pool.Put(msg.Piece) + t.chunkPool.Put(msg.Piece) } case pp.Extended: switch msg.ExtendedID { diff --git a/connection_test.go b/connection_test.go index ae6d2348..52f269b3 100644 --- a/connection_test.go +++ b/connection_test.go @@ -144,10 +144,10 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { Length: 1 << 20, PieceLength: 1 << 20, }, - chunkSize: defaultChunkSize, storage: &storage.Torrent{ts}, pieceStateChanges: pubsub.NewPubSub(), } + t.setChunkSize(defaultChunkSize) t.makePieces() t.pendingPieces.Add(0) r, w := io.Pipe() diff --git a/torrent.go b/torrent.go index 767f36bf..b9ac2f55 100644 --- a/torrent.go +++ b/torrent.go @@ -52,6 +52,7 @@ type Torrent struct { // The size of chunks to request from peers over the wire. This is // normally 16KiB by convention these days. chunkSize pp.Integer + chunkPool *sync.Pool // Total length of the torrent in bytes. Stored because it's not O(1) to // get this from the info dict. length int64 @@ -115,6 +116,15 @@ type Torrent struct { stats TorrentStats } +func (t *Torrent) setChunkSize(size pp.Integer) { + t.chunkSize = size + t.chunkPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, size) + }, + } +} + func (t *Torrent) setDisplayName(dn string) { if t.haveInfo() { return