parent
ed0dbba384
commit
d4cbdc5c38
11
client.go
11
client.go
|
@ -74,8 +74,9 @@ type Client struct {
|
|||
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
|
||||
extensionBytes peerExtensionBytes
|
||||
// The net.Addr.String part that should be common to all active listeners.
|
||||
listenAddr string
|
||||
uploadLimit *rate.Limiter
|
||||
listenAddr string
|
||||
uploadLimit *rate.Limiter
|
||||
downloadLimit *rate.Limiter
|
||||
|
||||
// Set of addresses that have our client ID. This intentionally will
|
||||
// include ourselves if we end up trying to connect to our own address
|
||||
|
@ -263,6 +264,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
|
|||
} else {
|
||||
cl.uploadLimit = cfg.UploadRateLimiter
|
||||
}
|
||||
if cfg.DownloadRateLimiter == nil {
|
||||
cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
|
||||
} else {
|
||||
cl.downloadLimit = cfg.DownloadRateLimiter
|
||||
}
|
||||
missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
|
||||
cl.event.L = &cl.mu
|
||||
storageImpl := cfg.DefaultStorage
|
||||
|
@ -1583,5 +1589,6 @@ func (cl *Client) newConnection(nc net.Conn) (c *connection) {
|
|||
PeerMaxRequests: 250,
|
||||
}
|
||||
c.setRW(connStatsReadWriter{nc, &cl.mu, c})
|
||||
c.r = rateLimitedReader{cl.downloadLimit, c.r}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -271,7 +271,7 @@ func TestClientTransferDefault(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestClientTransferRateLimited(t *testing.T) {
|
||||
func TestClientTransferRateLimitedUpload(t *testing.T) {
|
||||
started := time.Now()
|
||||
testClientTransfer(t, testClientTransferParams{
|
||||
// We are uploading 13 bytes (the length of the greeting torrent). The
|
||||
|
@ -282,6 +282,12 @@ func TestClientTransferRateLimited(t *testing.T) {
|
|||
require.True(t, time.Since(started) > time.Second)
|
||||
}
|
||||
|
||||
func TestClientTransferRateLimitedDownload(t *testing.T) {
|
||||
testClientTransfer(t, testClientTransferParams{
|
||||
LeecherDownloadRateLimiter: rate.NewLimiter(512, 512),
|
||||
})
|
||||
}
|
||||
|
||||
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
|
||||
return storage.NewResourcePieces(fc.AsResourceProvider())
|
||||
}
|
||||
|
@ -344,13 +350,14 @@ func TestClientTransferVarious(t *testing.T) {
|
|||
}
|
||||
|
||||
type testClientTransferParams struct {
|
||||
Responsive bool
|
||||
Readahead int64
|
||||
SetReadahead bool
|
||||
ExportClientStatus bool
|
||||
LeecherStorage func(string) storage.ClientImpl
|
||||
SeederStorage func(string) storage.ClientImpl
|
||||
SeederUploadRateLimiter *rate.Limiter
|
||||
Responsive bool
|
||||
Readahead int64
|
||||
SetReadahead bool
|
||||
ExportClientStatus bool
|
||||
LeecherStorage func(string) storage.ClientImpl
|
||||
SeederStorage func(string) storage.ClientImpl
|
||||
SeederUploadRateLimiter *rate.Limiter
|
||||
LeecherDownloadRateLimiter *rate.Limiter
|
||||
}
|
||||
|
||||
// Creates a seeder and a leecher, and ensures the data transfers when a read
|
||||
|
@ -387,6 +394,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
|
|||
} else {
|
||||
cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
|
||||
}
|
||||
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
|
||||
// cfg.ListenAddr = "localhost:4001"
|
||||
leecher, err := NewClient(&cfg)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/anacrolix/tagflag"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/gosuri/uiprogress"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
|
@ -110,13 +111,18 @@ func addTorrents(client *torrent.Client) {
|
|||
}
|
||||
}
|
||||
|
||||
var flags struct {
|
||||
Mmap bool `help:"memory-map torrent data"`
|
||||
TestPeer []*net.TCPAddr `help:"addresses of some starting peers"`
|
||||
Seed bool `help:"seed after download is complete"`
|
||||
Addr *net.TCPAddr `help:"network listen addr"`
|
||||
var flags = struct {
|
||||
Mmap bool `help:"memory-map torrent data"`
|
||||
TestPeer []*net.TCPAddr `help:"addresses of some starting peers"`
|
||||
Seed bool `help:"seed after download is complete"`
|
||||
Addr *net.TCPAddr `help:"network listen addr"`
|
||||
UploadRate tagflag.Bytes `help:"max piece bytes to send per second"`
|
||||
DownloadRate tagflag.Bytes `help:"max bytes per second down from peers"`
|
||||
tagflag.StartPos
|
||||
Torrent []string `arity:"+" help:"torrent file path or magnet uri"`
|
||||
}{
|
||||
UploadRate: -1,
|
||||
DownloadRate: -1,
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
@ -132,6 +138,12 @@ func main() {
|
|||
if flags.Seed {
|
||||
clientConfig.Seed = true
|
||||
}
|
||||
if flags.UploadRate != -1 {
|
||||
clientConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(flags.UploadRate), 256<<10)
|
||||
}
|
||||
if flags.DownloadRate != -1 {
|
||||
clientConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(flags.DownloadRate), 1<<20)
|
||||
}
|
||||
|
||||
client, err := torrent.NewClient(&clientConfig)
|
||||
if err != nil {
|
||||
|
|
|
@ -33,7 +33,12 @@ type Config struct {
|
|||
Seed bool `long:"seed"`
|
||||
// Events are data bytes sent in pieces. The burst must be large enough to
|
||||
// fit a whole chunk.
|
||||
UploadRateLimiter *rate.Limiter
|
||||
UploadRateLimiter *rate.Limiter
|
||||
// The events are bytes read from connections. The burst must be bigger
|
||||
// than the largest Read performed on a Conn minus one. This is likely to
|
||||
// be the larger of the main read loop buffer (~4096), and the requested
|
||||
// chunk size (~16KiB).
|
||||
DownloadRateLimiter *rate.Limiter
|
||||
|
||||
// User-provided Client peer ID. If not present, one is generated automatically.
|
||||
PeerID string
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package torrent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type rateLimitedReader struct {
|
||||
l *rate.Limiter
|
||||
r io.Reader
|
||||
}
|
||||
|
||||
func (me rateLimitedReader) Read(b []byte) (n int, err error) {
|
||||
if err := me.l.WaitN(context.Background(), 1); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
n, err = me.r.Read(b)
|
||||
if !me.l.ReserveN(time.Now(), n-1).OK() {
|
||||
panic(n - 1)
|
||||
}
|
||||
return
|
||||
}
|
Loading…
Reference in New Issue