add transport logic to mocknet
License: MIT Signed-off-by: Karthik Bala <karthikbala444@gmail.com>
This commit is contained in:
parent
aae565ae89
commit
536c183b37
|
@ -7,13 +7,12 @@
|
|||
package mocknet
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
host "github.com/ipfs/go-ipfs/p2p/host"
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
)
|
||||
|
@ -59,13 +58,14 @@ type Mocknet interface {
|
|||
ConnectNets(inet.Network, inet.Network) (inet.Conn, error)
|
||||
DisconnectPeers(peer.ID, peer.ID) error
|
||||
DisconnectNets(inet.Network, inet.Network) error
|
||||
LinkAll() error
|
||||
}
|
||||
|
||||
// LinkOptions are used to change aspects of the links.
|
||||
// Sorry but they dont work yet :(
|
||||
type LinkOptions struct {
|
||||
Latency time.Duration
|
||||
Bandwidth int // in bytes-per-second
|
||||
Bandwidth float64 // in bytes-per-second
|
||||
// we can make these values distributions down the road.
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package mocknet
|
||||
|
||||
import (
|
||||
// "fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
|
@ -11,17 +13,20 @@ import (
|
|||
// link implements mocknet.Link
|
||||
// and, for simplicity, inet.Conn
|
||||
type link struct {
|
||||
mock *mocknet
|
||||
nets []*peernet
|
||||
opts LinkOptions
|
||||
|
||||
mock *mocknet
|
||||
nets []*peernet
|
||||
opts LinkOptions
|
||||
ratelimiter *ratelimiter
|
||||
// this could have addresses on both sides.
|
||||
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func newLink(mn *mocknet, opts LinkOptions) *link {
|
||||
return &link{mock: mn, opts: opts}
|
||||
l := &link{mock: mn,
|
||||
opts: opts,
|
||||
ratelimiter: NewRatelimiter(opts.Bandwidth)}
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
|
||||
|
@ -57,8 +62,8 @@ func (l *link) newStreamPair() (*stream, *stream) {
|
|||
r1, w1 := io.Pipe()
|
||||
r2, w2 := io.Pipe()
|
||||
|
||||
s1 := &stream{Reader: r1, Writer: w2}
|
||||
s2 := &stream{Reader: r2, Writer: w1}
|
||||
s1 := NewStream(w2, r1)
|
||||
s2 := NewStream(w1, r2)
|
||||
return s1, s2
|
||||
}
|
||||
|
||||
|
@ -86,8 +91,17 @@ func (l *link) Peers() []peer.ID {
|
|||
|
||||
func (l *link) SetOptions(o LinkOptions) {
|
||||
l.opts = o
|
||||
l.ratelimiter.UpdateBandwidth(l.opts.Bandwidth)
|
||||
}
|
||||
|
||||
func (l *link) Options() LinkOptions {
|
||||
return l.opts
|
||||
}
|
||||
|
||||
func (l *link) GetLatency() time.Duration {
|
||||
return l.opts.Latency
|
||||
}
|
||||
|
||||
func (l *link) RateLimit(dataSize int) time.Duration {
|
||||
return l.ratelimiter.Limit(dataSize)
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ func TestNotifications(t *testing.T) {
|
|||
}
|
||||
}
|
||||
if !found {
|
||||
t.Error("connection not found")
|
||||
t.Error("connection not found", c1, len(expect), len(actual))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
package mocknet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
)
|
||||
|
@ -10,10 +14,51 @@ import (
|
|||
type stream struct {
|
||||
io.Reader
|
||||
io.Writer
|
||||
conn *conn
|
||||
conn *conn
|
||||
toDeliver chan *transportObject
|
||||
proc process.Process
|
||||
}
|
||||
|
||||
type transportObject struct {
|
||||
msg []byte
|
||||
arrivalTime time.Time
|
||||
}
|
||||
|
||||
func NewStream(w io.Writer, r io.Reader) *stream {
|
||||
s := &stream{
|
||||
Reader: r,
|
||||
Writer: w,
|
||||
toDeliver: make(chan *transportObject),
|
||||
}
|
||||
|
||||
s.proc = process.WithTeardown(s.teardown)
|
||||
s.proc.Go(s.transport)
|
||||
return s
|
||||
}
|
||||
|
||||
// How to handle errors with writes?
|
||||
func (s *stream) Write(p []byte) (n int, err error) {
|
||||
l := s.conn.link
|
||||
delay := l.GetLatency() + l.RateLimit(len(p))
|
||||
t := time.Now().Add(delay)
|
||||
select {
|
||||
case <-s.proc.Closing(): // bail out if we're closing.
|
||||
return 0, io.ErrClosedPipe
|
||||
case s.toDeliver <- &transportObject{msg: p, arrivalTime: t}:
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (s *stream) Close() error {
|
||||
return s.proc.Close()
|
||||
}
|
||||
|
||||
// teardown shuts down the stream. it is called by s.proc.Close()
|
||||
// after all the children of this s.proc (i.e. transport's proc)
|
||||
// are done.
|
||||
func (s *stream) teardown() error {
|
||||
// at this point, no streams are writing.
|
||||
|
||||
s.conn.removeStream(s)
|
||||
if r, ok := (s.Reader).(io.Closer); ok {
|
||||
r.Close()
|
||||
|
@ -30,3 +75,71 @@ func (s *stream) Close() error {
|
|||
func (s *stream) Conn() inet.Conn {
|
||||
return s.conn
|
||||
}
|
||||
|
||||
// transport will grab message arrival times, wait until that time, and
|
||||
// then write the message out when it is scheduled to arrive
|
||||
func (s *stream) transport(proc process.Process) {
|
||||
bufsize := 256
|
||||
buf := new(bytes.Buffer)
|
||||
ticker := time.NewTicker(time.Millisecond * 4)
|
||||
|
||||
// writeBuf writes the contents of buf through to the s.Writer.
|
||||
// done only when arrival time makes sense.
|
||||
drainBuf := func() {
|
||||
if buf.Len() > 0 {
|
||||
_, err := s.Writer.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
buf.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
// deliverOrWait is a helper func that processes
|
||||
// an incoming packet. it waits until the arrival time,
|
||||
// and then writes things out.
|
||||
deliverOrWait := func(o *transportObject) {
|
||||
buffered := len(o.msg) + buf.Len()
|
||||
|
||||
now := time.Now()
|
||||
if now.Before(o.arrivalTime) {
|
||||
if buffered < bufsize {
|
||||
buf.Write(o.msg)
|
||||
return
|
||||
}
|
||||
|
||||
// we do not buffer + return here, instead hanging the
|
||||
// call (i.e. not accepting any more transportObjects)
|
||||
// so that we apply back-pressure to the sender.
|
||||
// this sleep should wake up same time as ticker.
|
||||
time.Sleep(o.arrivalTime.Sub(now))
|
||||
}
|
||||
|
||||
// ok, we waited our due time. now rite the buf + msg.
|
||||
|
||||
// drainBuf first, before we write this message.
|
||||
drainBuf()
|
||||
|
||||
// write this message.
|
||||
_, err := s.Writer.Write(o.msg)
|
||||
if err != nil {
|
||||
log.Error("mock_stream", err)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-proc.Closing():
|
||||
return // bail out of here.
|
||||
|
||||
case o, ok := <-s.toDeliver:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
deliverOrWait(o)
|
||||
|
||||
case <-ticker.C: // ok, due to write it out.
|
||||
drainBuf()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,9 +3,11 @@ package mocknet
|
|||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
|
@ -478,3 +480,102 @@ func TestAdding(t *testing.T) {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRateLimiting(t *testing.T) {
|
||||
rl := NewRatelimiter(10)
|
||||
|
||||
if !within(rl.Limit(10), time.Duration(float32(time.Second)), time.Millisecond/10) {
|
||||
t.Fail()
|
||||
}
|
||||
if !within(rl.Limit(10), time.Duration(float32(time.Second*2)), time.Millisecond) {
|
||||
t.Fail()
|
||||
}
|
||||
if !within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if within(rl.Limit(10), time.Duration(float32(time.Second*3)), time.Millisecond) {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
rl.UpdateBandwidth(50)
|
||||
if !within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if within(rl.Limit(75), time.Duration(float32(time.Second)*1.5), time.Millisecond/10) {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
rl.UpdateBandwidth(100)
|
||||
if !within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
|
||||
t.Fail()
|
||||
}
|
||||
|
||||
if within(rl.Limit(1), time.Duration(time.Millisecond*10), time.Millisecond/10) {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func within(t1 time.Duration, t2 time.Duration, tolerance time.Duration) bool {
|
||||
return math.Abs(float64(t1)-float64(t2)) < float64(tolerance)
|
||||
}
|
||||
|
||||
func TestLimitedStreams(t *testing.T) {
|
||||
mn, err := FullMeshConnected(context.Background(), 2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
messages := 4
|
||||
messageSize := 500
|
||||
handler := func(s inet.Stream) {
|
||||
b := make([]byte, messageSize)
|
||||
for i := 0; i < messages; i++ {
|
||||
if _, err := io.ReadFull(s, b); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(b[:4], []byte("ping")) {
|
||||
log.Fatal("bytes mismatch")
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
s.Close()
|
||||
}
|
||||
|
||||
hosts := mn.Hosts()
|
||||
for _, h := range mn.Hosts() {
|
||||
h.SetStreamHandler(protocol.TestingID, handler)
|
||||
}
|
||||
|
||||
peers := mn.Peers()
|
||||
links := mn.LinksBetweenPeers(peers[0], peers[1])
|
||||
// 1000 byte per second bandwidth
|
||||
bps := float64(1000)
|
||||
opts := links[0].Options()
|
||||
opts.Bandwidth = bps
|
||||
for _, link := range links {
|
||||
link.SetOptions(opts)
|
||||
}
|
||||
|
||||
s, err := hosts[0].NewStream(protocol.TestingID, hosts[1].ID())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
filler := make([]byte, messageSize-4)
|
||||
data := append([]byte("ping"), filler...)
|
||||
before := time.Now()
|
||||
for i := 0; i < messages; i++ {
|
||||
wg.Add(1)
|
||||
if _, err := s.Write(data); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
if !within(time.Since(before), time.Duration(time.Second*2), time.Second/3) {
|
||||
t.Fatal("Expected 2ish seconds but got ", time.Since(before))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package mocknet
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// A ratelimiter is used by a link to determine how long to wait before sending
|
||||
// data given a bandwidth cap.
|
||||
type ratelimiter struct {
|
||||
bandwidth float64 // bytes per nanosecond
|
||||
allowance float64 // in bytes
|
||||
maxAllowance float64 // in bytes
|
||||
lastUpdate time.Time // when allowance was updated last
|
||||
count int // number of times rate limiting was applied
|
||||
duration time.Duration // total delay introduced due to rate limiting
|
||||
}
|
||||
|
||||
// Creates a new ratelimiter with bandwidth (in bytes/sec)
|
||||
func NewRatelimiter(bandwidth float64) *ratelimiter {
|
||||
// convert bandwidth to bytes per nanosecond
|
||||
b := bandwidth / float64(time.Second)
|
||||
return &ratelimiter{
|
||||
bandwidth: b,
|
||||
allowance: 0,
|
||||
maxAllowance: bandwidth,
|
||||
lastUpdate: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Changes bandwidth of a ratelimiter and resets its allowance
|
||||
func (r *ratelimiter) UpdateBandwidth(bandwidth float64) {
|
||||
// Convert bandwidth from bytes/second to bytes/nanosecond
|
||||
b := bandwidth / float64(time.Second)
|
||||
r.bandwidth = b
|
||||
// Reset allowance
|
||||
r.allowance = 0
|
||||
r.maxAllowance = bandwidth
|
||||
r.lastUpdate = time.Now()
|
||||
}
|
||||
|
||||
// Returns how long to wait before sending data with length 'dataSize' bytes
|
||||
func (r *ratelimiter) Limit(dataSize int) time.Duration {
|
||||
// update time
|
||||
var duration time.Duration = time.Duration(0)
|
||||
if r.bandwidth == 0 {
|
||||
return duration
|
||||
}
|
||||
current := time.Now()
|
||||
elapsedTime := current.Sub(r.lastUpdate)
|
||||
r.lastUpdate = current
|
||||
|
||||
allowance := r.allowance + float64(elapsedTime)*r.bandwidth
|
||||
// allowance can't exceed bandwidth
|
||||
if allowance > r.maxAllowance {
|
||||
allowance = r.maxAllowance
|
||||
}
|
||||
|
||||
allowance -= float64(dataSize)
|
||||
if allowance < 0 {
|
||||
// sleep until allowance is back to 0
|
||||
duration = time.Duration(-allowance / r.bandwidth)
|
||||
// rate limiting was applied, record stats
|
||||
r.count++
|
||||
r.duration += duration
|
||||
}
|
||||
|
||||
r.allowance = allowance
|
||||
return duration
|
||||
}
|
Loading…
Reference in New Issue