diff --git a/client.go b/client.go index e0aa472a..712e557d 100644 --- a/client.go +++ b/client.go @@ -22,6 +22,7 @@ import ( "github.com/anacrolix/sync" "github.com/anacrolix/utp" "github.com/dustin/go-humanize" + "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/dht" @@ -73,7 +74,8 @@ type Client struct { // Our BitTorrent protocol extension bytes, sent in our BT handshakes. extensionBytes peerExtensionBytes // The net.Addr.String part that should be common to all active listeners. - listenAddr string + listenAddr string + uploadLimit *rate.Limiter // Set of addresses that have our client ID. This intentionally will // include ourselves if we end up trying to connect to our own address @@ -256,6 +258,11 @@ func NewClient(cfg *Config) (cl *Client, err error) { dopplegangerAddrs: make(map[string]struct{}), torrents: make(map[metainfo.Hash]*Torrent), } + if cfg.UploadRateLimiter == nil { + cl.uploadLimit = rate.NewLimiter(rate.Inf, 0) + } else { + cl.uploadLimit = cfg.UploadRateLimiter + } missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes) cl.event.L = &cl.mu storageImpl := cfg.DefaultStorage @@ -1086,6 +1093,18 @@ another: // We want to upload to the peer. c.Unchoke() for r := range c.PeerRequests { + res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length)) + delay := res.Delay() + if delay > 0 { + res.Cancel() + go func() { + time.Sleep(delay) + cl.mu.Lock() + defer cl.mu.Unlock() + cl.upload(t, c) + }() + return + } err := cl.sendChunk(t, c, r) if err != nil { i := int(r.Index) diff --git a/client_test.go b/client_test.go index cc1ab594..20aa96f5 100644 --- a/client_test.go +++ b/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/bradfitz/iter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/dht" @@ -270,6 +271,17 @@ func TestClientTransferDefault(t *testing.T) { }) } +func TestClientTransferRateLimited(t *testing.T) { + started := time.Now() + testClientTransfer(t, testClientTransferParams{ + // We are uploading 13 bytes (the length of the greeting torrent). The + // chunks are 2 bytes in length. Then the smallest burst we can run + // with is 2. Time taken is (13-burst)/rate. + SeederUploadRateLimiter: rate.NewLimiter(11, 2), + }) + require.True(t, time.Since(started) > time.Second) +} + func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl { return storage.NewResourcePieces(fc.AsResourceProvider()) } @@ -332,12 +344,13 @@ func TestClientTransferVarious(t *testing.T) { } type testClientTransferParams struct { - Responsive bool - Readahead int64 - SetReadahead bool - ExportClientStatus bool - LeecherStorage func(string) storage.ClientImpl - SeederStorage func(string) storage.ClientImpl + Responsive bool + Readahead int64 + SetReadahead bool + ExportClientStatus bool + LeecherStorage func(string) storage.ClientImpl + SeederStorage func(string) storage.ClientImpl + SeederUploadRateLimiter *rate.Limiter } // Creates a seeder and a leecher, and ensures the data transfers when a read @@ -348,6 +361,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { // Create seeder and a Torrent. cfg := TestingConfig cfg.Seed = true + cfg.UploadRateLimiter = ps.SeederUploadRateLimiter // cfg.ListenAddr = "localhost:4000" if ps.SeederStorage != nil { cfg.DefaultStorage = ps.SeederStorage(greetingTempDir) @@ -368,7 +382,11 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { leecherDataDir, err := ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(leecherDataDir) - cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir) + if ps.LeecherStorage == nil { + cfg.DataDir = leecherDataDir + } else { + cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir) + } // cfg.ListenAddr = "localhost:4001" leecher, err := NewClient(&cfg) require.NoError(t, err) diff --git a/config.go b/config.go index cb52d3cd..2f87c180 100644 --- a/config.go +++ b/config.go @@ -1,6 +1,8 @@ package torrent import ( + "golang.org/x/time/rate" + "github.com/anacrolix/torrent/dht" "github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/storage" @@ -29,6 +31,10 @@ type Config struct { // not altruistic, we'll upload slightly more than we download from each // peer. Seed bool `long:"seed"` + // If non-zero, is the maximum bytes per second of data pieces we'll + // upload to peers. + UploadRateLimiter *rate.Limiter + // User-provided Client peer ID. If not present, one is generated automatically. PeerID string // For the bittorrent protocol.