Add really simple upload rate limiting and test
This commit is contained in:
parent
76c24e70b5
commit
836bb344f3
21
client.go
21
client.go
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/anacrolix/sync"
|
"github.com/anacrolix/sync"
|
||||||
"github.com/anacrolix/utp"
|
"github.com/anacrolix/utp"
|
||||||
"github.com/dustin/go-humanize"
|
"github.com/dustin/go-humanize"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/bencode"
|
"github.com/anacrolix/torrent/bencode"
|
||||||
"github.com/anacrolix/torrent/dht"
|
"github.com/anacrolix/torrent/dht"
|
||||||
@ -73,7 +74,8 @@ type Client struct {
|
|||||||
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
|
// Our BitTorrent protocol extension bytes, sent in our BT handshakes.
|
||||||
extensionBytes peerExtensionBytes
|
extensionBytes peerExtensionBytes
|
||||||
// The net.Addr.String part that should be common to all active listeners.
|
// The net.Addr.String part that should be common to all active listeners.
|
||||||
listenAddr string
|
listenAddr string
|
||||||
|
uploadLimit *rate.Limiter
|
||||||
|
|
||||||
// Set of addresses that have our client ID. This intentionally will
|
// Set of addresses that have our client ID. This intentionally will
|
||||||
// include ourselves if we end up trying to connect to our own address
|
// include ourselves if we end up trying to connect to our own address
|
||||||
@ -256,6 +258,11 @@ func NewClient(cfg *Config) (cl *Client, err error) {
|
|||||||
dopplegangerAddrs: make(map[string]struct{}),
|
dopplegangerAddrs: make(map[string]struct{}),
|
||||||
torrents: make(map[metainfo.Hash]*Torrent),
|
torrents: make(map[metainfo.Hash]*Torrent),
|
||||||
}
|
}
|
||||||
|
if cfg.UploadRateLimiter == nil {
|
||||||
|
cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
|
||||||
|
} else {
|
||||||
|
cl.uploadLimit = cfg.UploadRateLimiter
|
||||||
|
}
|
||||||
missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
|
missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
|
||||||
cl.event.L = &cl.mu
|
cl.event.L = &cl.mu
|
||||||
storageImpl := cfg.DefaultStorage
|
storageImpl := cfg.DefaultStorage
|
||||||
@ -1086,6 +1093,18 @@ another:
|
|||||||
// We want to upload to the peer.
|
// We want to upload to the peer.
|
||||||
c.Unchoke()
|
c.Unchoke()
|
||||||
for r := range c.PeerRequests {
|
for r := range c.PeerRequests {
|
||||||
|
res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
|
||||||
|
delay := res.Delay()
|
||||||
|
if delay > 0 {
|
||||||
|
res.Cancel()
|
||||||
|
go func() {
|
||||||
|
time.Sleep(delay)
|
||||||
|
cl.mu.Lock()
|
||||||
|
defer cl.mu.Unlock()
|
||||||
|
cl.upload(t, c)
|
||||||
|
}()
|
||||||
|
return
|
||||||
|
}
|
||||||
err := cl.sendChunk(t, c, r)
|
err := cl.sendChunk(t, c, r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
i := int(r.Index)
|
i := int(r.Index)
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/bradfitz/iter"
|
"github.com/bradfitz/iter"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/bencode"
|
"github.com/anacrolix/torrent/bencode"
|
||||||
"github.com/anacrolix/torrent/dht"
|
"github.com/anacrolix/torrent/dht"
|
||||||
@ -270,6 +271,17 @@ func TestClientTransferDefault(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientTransferRateLimited(t *testing.T) {
|
||||||
|
started := time.Now()
|
||||||
|
testClientTransfer(t, testClientTransferParams{
|
||||||
|
// We are uploading 13 bytes (the length of the greeting torrent). The
|
||||||
|
// chunks are 2 bytes in length. Then the smallest burst we can run
|
||||||
|
// with is 2. Time taken is (13-burst)/rate.
|
||||||
|
SeederUploadRateLimiter: rate.NewLimiter(11, 2),
|
||||||
|
})
|
||||||
|
require.True(t, time.Since(started) > time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
|
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
|
||||||
return storage.NewResourcePieces(fc.AsResourceProvider())
|
return storage.NewResourcePieces(fc.AsResourceProvider())
|
||||||
}
|
}
|
||||||
@ -332,12 +344,13 @@ func TestClientTransferVarious(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type testClientTransferParams struct {
|
type testClientTransferParams struct {
|
||||||
Responsive bool
|
Responsive bool
|
||||||
Readahead int64
|
Readahead int64
|
||||||
SetReadahead bool
|
SetReadahead bool
|
||||||
ExportClientStatus bool
|
ExportClientStatus bool
|
||||||
LeecherStorage func(string) storage.ClientImpl
|
LeecherStorage func(string) storage.ClientImpl
|
||||||
SeederStorage func(string) storage.ClientImpl
|
SeederStorage func(string) storage.ClientImpl
|
||||||
|
SeederUploadRateLimiter *rate.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a seeder and a leecher, and ensures the data transfers when a read
|
// Creates a seeder and a leecher, and ensures the data transfers when a read
|
||||||
@ -348,6 +361,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
|
|||||||
// Create seeder and a Torrent.
|
// Create seeder and a Torrent.
|
||||||
cfg := TestingConfig
|
cfg := TestingConfig
|
||||||
cfg.Seed = true
|
cfg.Seed = true
|
||||||
|
cfg.UploadRateLimiter = ps.SeederUploadRateLimiter
|
||||||
// cfg.ListenAddr = "localhost:4000"
|
// cfg.ListenAddr = "localhost:4000"
|
||||||
if ps.SeederStorage != nil {
|
if ps.SeederStorage != nil {
|
||||||
cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
|
cfg.DefaultStorage = ps.SeederStorage(greetingTempDir)
|
||||||
@ -368,7 +382,11 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
|
|||||||
leecherDataDir, err := ioutil.TempDir("", "")
|
leecherDataDir, err := ioutil.TempDir("", "")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer os.RemoveAll(leecherDataDir)
|
defer os.RemoveAll(leecherDataDir)
|
||||||
cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
|
if ps.LeecherStorage == nil {
|
||||||
|
cfg.DataDir = leecherDataDir
|
||||||
|
} else {
|
||||||
|
cfg.DefaultStorage = ps.LeecherStorage(leecherDataDir)
|
||||||
|
}
|
||||||
// cfg.ListenAddr = "localhost:4001"
|
// cfg.ListenAddr = "localhost:4001"
|
||||||
leecher, err := NewClient(&cfg)
|
leecher, err := NewClient(&cfg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package torrent
|
package torrent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/anacrolix/torrent/dht"
|
"github.com/anacrolix/torrent/dht"
|
||||||
"github.com/anacrolix/torrent/iplist"
|
"github.com/anacrolix/torrent/iplist"
|
||||||
"github.com/anacrolix/torrent/storage"
|
"github.com/anacrolix/torrent/storage"
|
||||||
@ -29,6 +31,10 @@ type Config struct {
|
|||||||
// not altruistic, we'll upload slightly more than we download from each
|
// not altruistic, we'll upload slightly more than we download from each
|
||||||
// peer.
|
// peer.
|
||||||
Seed bool `long:"seed"`
|
Seed bool `long:"seed"`
|
||||||
|
// If non-zero, is the maximum bytes per second of data pieces we'll
|
||||||
|
// upload to peers.
|
||||||
|
UploadRateLimiter *rate.Limiter
|
||||||
|
|
||||||
// User-provided Client peer ID. If not present, one is generated automatically.
|
// User-provided Client peer ID. If not present, one is generated automatically.
|
||||||
PeerID string
|
PeerID string
|
||||||
// For the bittorrent protocol.
|
// For the bittorrent protocol.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user