Do chunk pooling at Torrent instead of connection level
This commit is contained in:
parent
d72d93bba7
commit
de761fb506
10
client.go
10
client.go
|
@ -1169,10 +1169,9 @@ func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
|
||||||
// Return a Torrent ready for insertion into a Client.
|
// Return a Torrent ready for insertion into a Client.
|
||||||
func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
|
func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
|
||||||
t = &Torrent{
|
t = &Torrent{
|
||||||
cl: cl,
|
cl: cl,
|
||||||
infoHash: ih,
|
infoHash: ih,
|
||||||
chunkSize: defaultChunkSize,
|
peers: make(map[peersKey]Peer),
|
||||||
peers: make(map[peersKey]Peer),
|
|
||||||
|
|
||||||
halfOpen: make(map[string]struct{}),
|
halfOpen: make(map[string]struct{}),
|
||||||
pieceStateChanges: pubsub.NewPubSub(),
|
pieceStateChanges: pubsub.NewPubSub(),
|
||||||
|
@ -1180,6 +1179,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
|
||||||
storageOpener: cl.defaultStorage,
|
storageOpener: cl.defaultStorage,
|
||||||
maxEstablishedConns: defaultEstablishedConnsPerTorrent,
|
maxEstablishedConns: defaultEstablishedConnsPerTorrent,
|
||||||
}
|
}
|
||||||
|
t.setChunkSize(defaultChunkSize)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1228,7 +1228,7 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e
|
||||||
cl.mu.Lock()
|
cl.mu.Lock()
|
||||||
defer cl.mu.Unlock()
|
defer cl.mu.Unlock()
|
||||||
if spec.ChunkSize != 0 {
|
if spec.ChunkSize != 0 {
|
||||||
t.chunkSize = pp.Integer(spec.ChunkSize)
|
t.setChunkSize(pp.Integer(spec.ChunkSize))
|
||||||
}
|
}
|
||||||
t.addTrackers(spec.Trackers)
|
t.addTrackers(spec.Trackers)
|
||||||
t.maybeNewConns()
|
t.maybeNewConns()
|
||||||
|
|
|
@ -698,16 +698,11 @@ func (c *connection) lastHelpful() (ret time.Time) {
|
||||||
func (c *connection) mainReadLoop() error {
|
func (c *connection) mainReadLoop() error {
|
||||||
t := c.t
|
t := c.t
|
||||||
cl := t.cl
|
cl := t.cl
|
||||||
pool := &sync.Pool{
|
|
||||||
New: func() interface{} {
|
|
||||||
return make([]byte, t.chunkSize)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
decoder := pp.Decoder{
|
decoder := pp.Decoder{
|
||||||
R: bufio.NewReader(c.rw),
|
R: bufio.NewReader(c.rw),
|
||||||
MaxLength: 256 * 1024,
|
MaxLength: 256 * 1024,
|
||||||
Pool: pool,
|
Pool: t.chunkPool,
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
cl.mu.Unlock()
|
cl.mu.Unlock()
|
||||||
|
@ -782,7 +777,7 @@ func (c *connection) mainReadLoop() error {
|
||||||
case pp.Piece:
|
case pp.Piece:
|
||||||
cl.downloadedChunk(t, c, &msg)
|
cl.downloadedChunk(t, c, &msg)
|
||||||
if len(msg.Piece) == int(t.chunkSize) {
|
if len(msg.Piece) == int(t.chunkSize) {
|
||||||
pool.Put(msg.Piece)
|
t.chunkPool.Put(msg.Piece)
|
||||||
}
|
}
|
||||||
case pp.Extended:
|
case pp.Extended:
|
||||||
switch msg.ExtendedID {
|
switch msg.ExtendedID {
|
||||||
|
|
|
@ -144,10 +144,10 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
|
||||||
Length: 1 << 20,
|
Length: 1 << 20,
|
||||||
PieceLength: 1 << 20,
|
PieceLength: 1 << 20,
|
||||||
},
|
},
|
||||||
chunkSize: defaultChunkSize,
|
|
||||||
storage: &storage.Torrent{ts},
|
storage: &storage.Torrent{ts},
|
||||||
pieceStateChanges: pubsub.NewPubSub(),
|
pieceStateChanges: pubsub.NewPubSub(),
|
||||||
}
|
}
|
||||||
|
t.setChunkSize(defaultChunkSize)
|
||||||
t.makePieces()
|
t.makePieces()
|
||||||
t.pendingPieces.Add(0)
|
t.pendingPieces.Add(0)
|
||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
|
|
10
torrent.go
10
torrent.go
|
@ -52,6 +52,7 @@ type Torrent struct {
|
||||||
// The size of chunks to request from peers over the wire. This is
|
// The size of chunks to request from peers over the wire. This is
|
||||||
// normally 16KiB by convention these days.
|
// normally 16KiB by convention these days.
|
||||||
chunkSize pp.Integer
|
chunkSize pp.Integer
|
||||||
|
chunkPool *sync.Pool
|
||||||
// Total length of the torrent in bytes. Stored because it's not O(1) to
|
// Total length of the torrent in bytes. Stored because it's not O(1) to
|
||||||
// get this from the info dict.
|
// get this from the info dict.
|
||||||
length int64
|
length int64
|
||||||
|
@ -115,6 +116,15 @@ type Torrent struct {
|
||||||
stats TorrentStats
|
stats TorrentStats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *Torrent) setChunkSize(size pp.Integer) {
|
||||||
|
t.chunkSize = size
|
||||||
|
t.chunkPool = &sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make([]byte, size)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *Torrent) setDisplayName(dn string) {
|
func (t *Torrent) setDisplayName(dn string) {
|
||||||
if t.haveInfo() {
|
if t.haveInfo() {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue