diff --git a/net/mock/interface.go b/net/mock/interface.go index 9887269b..28687d70 100644 --- a/net/mock/interface.go +++ b/net/mock/interface.go @@ -7,13 +7,12 @@ package mocknet import ( - "io" - "time" - ic "github.com/ipfs/go-ipfs/p2p/crypto" host "github.com/ipfs/go-ipfs/p2p/host" inet "github.com/ipfs/go-ipfs/p2p/net" peer "github.com/ipfs/go-ipfs/p2p/peer" + "io" + "time" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) @@ -59,13 +58,14 @@ type Mocknet interface { ConnectNets(inet.Network, inet.Network) (inet.Conn, error) DisconnectPeers(peer.ID, peer.ID) error DisconnectNets(inet.Network, inet.Network) error + LinkAll() error } // LinkOptions are used to change aspects of the links. // Sorry but they dont work yet :( type LinkOptions struct { Latency time.Duration - Bandwidth int // in bytes-per-second + Bandwidth float64 // in bytes-per-second // we can make these values distributions down the road. } diff --git a/net/mock/mock_link.go b/net/mock/mock_link.go index 618b9070..28575e75 100644 --- a/net/mock/mock_link.go +++ b/net/mock/mock_link.go @@ -1,8 +1,10 @@ package mocknet import ( + // "fmt" "io" "sync" + "time" inet "github.com/ipfs/go-ipfs/p2p/net" peer "github.com/ipfs/go-ipfs/p2p/peer" @@ -11,17 +13,20 @@ import ( // link implements mocknet.Link // and, for simplicity, inet.Conn type link struct { - mock *mocknet - nets []*peernet - opts LinkOptions - + mock *mocknet + nets []*peernet + opts LinkOptions + ratelimiter *ratelimiter // this could have addresses on both sides. sync.RWMutex } func newLink(mn *mocknet, opts LinkOptions) *link { - return &link{mock: mn, opts: opts} + l := &link{mock: mn, + opts: opts, + ratelimiter: NewRatelimiter(opts.Bandwidth)} + return l } func (l *link) newConnPair(dialer *peernet) (*conn, *conn) { @@ -57,8 +62,8 @@ func (l *link) newStreamPair() (*stream, *stream) { r1, w1 := io.Pipe() r2, w2 := io.Pipe() - s1 := &stream{Reader: r1, Writer: w2} - s2 := &stream{Reader: r2, Writer: w1} + s1 := NewStream(w2, r1) + s2 := NewStream(w1, r2) return s1, s2 } @@ -86,8 +91,17 @@ func (l *link) Peers() []peer.ID { func (l *link) SetOptions(o LinkOptions) { l.opts = o + l.ratelimiter.UpdateBandwidth(l.opts.Bandwidth) } func (l *link) Options() LinkOptions { return l.opts } + +func (l *link) GetLatency() time.Duration { + return l.opts.Latency +} + +func (l *link) RateLimit(dataSize int) time.Duration { + return l.ratelimiter.Limit(dataSize) +} diff --git a/net/mock/mock_notif_test.go b/net/mock/mock_notif_test.go index d91403f8..1ea9df42 100644 --- a/net/mock/mock_notif_test.go +++ b/net/mock/mock_notif_test.go @@ -63,7 +63,7 @@ func TestNotifications(t *testing.T) { } } if !found { - t.Error("connection not found") + t.Error("connection not found", c1, len(expect), len(actual)) } } diff --git a/net/mock/mock_stream.go b/net/mock/mock_stream.go index 1820553c..56e3031e 100644 --- a/net/mock/mock_stream.go +++ b/net/mock/mock_stream.go @@ -1,7 +1,11 @@ package mocknet import ( + "bytes" "io" + "time" + + process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" inet "github.com/ipfs/go-ipfs/p2p/net" ) @@ -10,10 +14,51 @@ import ( type stream struct { io.Reader io.Writer - conn *conn + conn *conn + toDeliver chan *transportObject + proc process.Process +} + +type transportObject struct { + msg []byte + arrivalTime time.Time +} + +func NewStream(w io.Writer, r io.Reader) *stream { + s := &stream{ + Reader: r, + Writer: w, + toDeliver: make(chan *transportObject), + } + + s.proc = process.WithTeardown(s.teardown) + s.proc.Go(s.transport) + return s +} + +// How to handle errors with writes? +func (s *stream) Write(p []byte) (n int, err error) { + l := s.conn.link + delay := l.GetLatency() + l.RateLimit(len(p)) + t := time.Now().Add(delay) + select { + case <-s.proc.Closing(): // bail out if we're closing. + return 0, io.ErrClosedPipe + case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}: + } + return len(p), nil } func (s *stream) Close() error { + return s.proc.Close() +} + +// teardown shuts down the stream. it is called by s.proc.Close() +// after all the children of this s.proc (i.e. transport's proc) +// are done. +func (s *stream) teardown() error { + // at this point, no streams are writing. + s.conn.removeStream(s) if r, ok := (s.Reader).(io.Closer); ok { r.Close() @@ -30,3 +75,71 @@ func (s *stream) Close() error { func (s *stream) Conn() inet.Conn { return s.conn } + +// transport will grab message arrival times, wait until that time, and +// then write the message out when it is scheduled to arrive +func (s *stream) transport(proc process.Process) { + bufsize := 256 + buf := new(bytes.Buffer) + ticker := time.NewTicker(time.Millisecond * 4) + + // writeBuf writes the contents of buf through to the s.Writer. + // done only when arrival time makes sense. + drainBuf := func() { + if buf.Len() > 0 { + _, err := s.Writer.Write(buf.Bytes()) + if err != nil { + return + } + buf.Reset() + } + } + + // deliverOrWait is a helper func that processes + // an incoming packet. it waits until the arrival time, + // and then writes things out. + deliverOrWait := func(o *transportObject) { + buffered := len(o.msg) + buf.Len() + + now := time.Now() + if now.Before(o.arrivalTime) { + if buffered < bufsize { + buf.Write(o.msg) + return + } + + // we do not buffer + return here, instead hanging the + // call (i.e. not accepting any more transportObjects) + // so that we apply back-pressure to the sender. + // this sleep should wake up same time as ticker. + time.Sleep(o.arrivalTime.Sub(now)) + } + + // ok, we waited our due time. now rite the buf + msg. + + // drainBuf first, before we write this message. + drainBuf() + + // write this message. + _, err := s.Writer.Write(o.msg) + if err != nil { + log.Error("mock_stream", err) + } + } + + for { + select { + case <-proc.Closing(): + return // bail out of here. + + case o, ok := <-s.toDeliver: + if !ok { + return + } + deliverOrWait(o) + + case <-ticker.C: // ok, due to write it out. + drainBuf() + } + } +} diff --git a/net/mock/mock_test.go b/net/mock/mock_test.go index d402fe04..36088682 100644 --- a/net/mock/mock_test.go +++ b/net/mock/mock_test.go @@ -3,9 +3,11 @@ package mocknet import ( "bytes" "io" + "math" "math/rand" "sync" "testing" + "time" inet "github.com/ipfs/go-ipfs/p2p/net" peer "github.com/ipfs/go-ipfs/p2p/peer" @@ -478,3 +480,102 @@ func TestAdding(t *testing.T) { } } + +func TestRateLimiting(t *testing.T) { + rl := NewRatelimiter(10) + + if !within(rl.Limit(10), time.Duration(float32(time.Second)), time.Millisecond/10) { + t.Fail() + } + if !within(rl.Limit(10), time.Duration(float32(time.Second*2)), time.Millisecond) { + t.Fail() + } + if !within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) { + t.Fail() + } + + if within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) { + t.Fail() + } + + rl.UpdateBandwidth(50) + if !within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) { + t.Fail() + } + + if within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) { + t.Fail() + } + + rl.UpdateBandwidth(100) + if !within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) { + t.Fail() + } + + if within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) { + t.Fail() + } +} + +func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool { + return math.Abs(float64(t1)-float64(t2)) < float64(tolerance) +} + +func TestLimitedStreams(t *testing.T) { + mn, err := FullMeshConnected(context.Background(), 2) + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + messages := 4 + messageSize := 500 + handler := func(s inet.Stream) { + b := make([]byte, messageSize) + for i := 0; i < messages; i++ { + if _, err := io.ReadFull(s, b); err != nil { + log.Fatal(err) + } + if !bytes.Equal(b[:4], []byte("ping")) { + log.Fatal("bytes mismatch") + } + wg.Done() + } + s.Close() + } + + hosts := mn.Hosts() + for _, h := range mn.Hosts() { + h.SetStreamHandler(protocol.TestingID, handler) + } + + peers := mn.Peers() + links := mn.LinksBetweenPeers(peers[0], peers[1]) + // 1000 byte per second bandwidth + bps := float64(1000) + opts := links[0].Options() + opts.Bandwidth = bps + for _, link := range links { + link.SetOptions(opts) + } + + s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID()) + if err != nil { + t.Fatal(err) + } + + filler := make([]byte, messageSize-4) + data := append([]byte("ping"), filler...) + before := time.Now() + for i := 0; i < messages; i++ { + wg.Add(1) + if _, err := s.Write(data); err != nil { + panic(err) + } + } + + wg.Wait() + if !within(time.Since(before), time.Duration(time.Second*2), time.Second/3) { + t.Fatal("Expected 2ish seconds but got ", time.Since(before)) + } +} diff --git a/net/mock/ratelimiter.go b/net/mock/ratelimiter.go new file mode 100644 index 00000000..65eb7b6a --- /dev/null +++ b/net/mock/ratelimiter.go @@ -0,0 +1,69 @@ +package mocknet + +import ( + "time" +) + +// A ratelimiter is used by a link to determine how long to wait before sending +// data given a bandwidth cap. +type ratelimiter struct { + bandwidth float64 // bytes per nanosecond + allowance float64 // in bytes + maxAllowance float64 // in bytes + lastUpdate time.Time // when allowance was updated last + count int // number of times rate limiting was applied + duration time.Duration // total delay introduced due to rate limiting +} + +// Creates a new ratelimiter with bandwidth (in bytes/sec) +func NewRatelimiter(bandwidth float64) *ratelimiter { + // convert bandwidth to bytes per nanosecond + b := bandwidth / float64(time.Second) + return &ratelimiter{ + bandwidth: b, + allowance: 0, + maxAllowance: bandwidth, + lastUpdate: time.Now(), + } +} + +// Changes bandwidth of a ratelimiter and resets its allowance +func (r *ratelimiter) UpdateBandwidth(bandwidth float64) { + // Convert bandwidth from bytes/second to bytes/nanosecond + b := bandwidth / float64(time.Second) + r.bandwidth = b + // Reset allowance + r.allowance = 0 + r.maxAllowance = bandwidth + r.lastUpdate = time.Now() +} + +// Returns how long to wait before sending data with length 'dataSize' bytes +func (r *ratelimiter) Limit(dataSize int) time.Duration { + // update time + var duration time.Duration = time.Duration(0) + if r.bandwidth == 0 { + return duration + } + current := time.Now() + elapsedTime := current.Sub(r.lastUpdate) + r.lastUpdate = current + + allowance := r.allowance + float64(elapsedTime)*r.bandwidth + // allowance can't exceed bandwidth + if allowance > r.maxAllowance { + allowance = r.maxAllowance + } + + allowance -= float64(dataSize) + if allowance < 0 { + // sleep until allowance is back to 0 + duration = time.Duration(-allowance / r.bandwidth) + // rate limiting was applied, record stats + r.count++ + r.duration += duration + } + + r.allowance = allowance + return duration +}