Implement additional tests and remove unnecessary shuffle

This commit is contained in:
Dmitry 2018-10-04 11:57:51 +03:00
parent 4e1281431d
commit 79ecad8efe
2 changed files with 81 additions and 51 deletions

View File

@ -19,7 +19,6 @@ package whisperv6
import ( import (
"fmt" "fmt"
"math" "math"
"math/rand"
"sync" "sync"
"time" "time"
@ -145,7 +144,6 @@ func (peer *Peer) handshake() error {
if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil { if err := s.Decode(&egressCfg); err == nil && peer.host.ratelimiter != nil {
peer.host.ratelimiter.E.Create(peer.peer, egressCfg) peer.host.ratelimiter.E.Create(peer.peer, egressCfg)
} }
if err := <-errc; err != nil { if err := <-errc; err != nil {
return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err) return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
} }
@ -207,9 +205,6 @@ func (peer *Peer) reduceBundle(bundle []*Envelope) []*Envelope {
if peer.host.ratelimiter == nil { if peer.host.ratelimiter == nil {
return bundle return bundle
} }
rand.Shuffle(len(bundle), func(i, j int) {
bundle[i], bundle[j] = bundle[j], bundle[i]
})
for i := range bundle { for i := range bundle {
size := int64(bundle[i].size()) size := int64(bundle[i].size())
if peer.host.ratelimiter.E.Available(peer.peer) < size { if peer.host.ratelimiter.E.Available(peer.peer) < size {
@ -235,7 +230,6 @@ func (peer *Peer) broadcast() error {
} }
} }
bundle = peer.reduceBundle(bundle) bundle = peer.reduceBundle(bundle)
if len(bundle) > 0 { if len(bundle) > 0 {
// transmit the batch of envelopes // transmit the batch of envelopes
if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil { if err := p2p.Send(peer.ws, messagesCode, bundle); err != nil {

View File

@ -21,10 +21,10 @@ const (
testCode = 42 // any non-defined code will work testCode = 42 // any non-defined code will work
) )
func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf ratelimiter.Config) (*Whisper, *p2p.MsgPipeRW, chan error) { func setupOneConnection(t *testing.T, rlconf, egressConf *ratelimiter.Config) (*Whisper, *p2p.MsgPipeRW, chan error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil) db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err) require.NoError(t, err)
rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, rlconf) rl := ratelimiter.ForWhisper(ratelimiter.IDMode, db, *rlconf)
conf := &Config{ conf := &Config{
MinimumAcceptedPOW: 0, MinimumAcceptedPOW: 0,
MaxMessageSize: 100 << 10, MaxMessageSize: 100 << 10,
@ -36,7 +36,8 @@ func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf rate
rw1, rw2 := p2p.MsgPipe() rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1) errorc := make(chan error, 1)
go func() { go func() {
errorc <- w.HandlePeer(p, rw2) err := w.HandlePeer(p, rw2)
errorc <- err
}() }()
require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, rlconf})) require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, rlconf}))
require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, egressConf)) require.NoError(t, p2p.SendItems(rw1, statusCode, ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, egressConf))
@ -44,7 +45,7 @@ func setupOneConnection(t *testing.T, rlconf ratelimiter.Config, egressConf rate
} }
func TestRatePeerDropsConnection(t *testing.T) { func TestRatePeerDropsConnection(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10} cfg := &ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
_, rw1, errorc := setupOneConnection(t, cfg, cfg) _, rw1, errorc := setupOneConnection(t, cfg, cfg)
require.NoError(t, p2p.Send(rw1, testCode, make([]byte, 11<<10))) // limit is 1024 require.NoError(t, p2p.Send(rw1, testCode, make([]byte, 11<<10))) // limit is 1024
@ -57,54 +58,89 @@ func TestRatePeerDropsConnection(t *testing.T) {
} }
func TestRateLimitedDelivery(t *testing.T) { func TestRateLimitedDelivery(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10} cfg := &ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
ecfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 2 << 10, Quantum: 1 << 10} type testCase struct {
w, rw1, _ := setupOneConnection(t, cfg, ecfg) description string
small1 := Envelope{ cfg *ratelimiter.Config
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()), received []uint64
TTL: 10, notReceived []uint64
Topic: TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
} }
small2 := small1 for _, tc := range []testCase{
small2.Nonce = 2 {
small2.Data = make([]byte, 3<<10) description: "NoEgress",
big := small1 received: []uint64{1, 2, 3},
big.Nonce = 3 },
big.Data = make([]byte, 11<<10) {
description: "EgressSmallerThanIngress",
received: []uint64{1},
notReceived: []uint64{2, 3},
cfg: &ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 2 << 10, Quantum: 1 << 10},
},
{
description: "EgressSameAsIngress",
received: []uint64{1, 2},
notReceived: []uint64{3},
cfg: cfg,
},
} {
tc := tc
t.Run(tc.description, func(t *testing.T) {
t.Parallel()
small1 := Envelope{
Expiry: uint32(time.Now().Add(10 * time.Second).Unix()),
TTL: 10,
Topic: TopicType{1},
Data: make([]byte, 1<<10),
Nonce: 1,
}
rand.Read(small1.Data)
small2 := small1
small2.Nonce = 2
small2.Data = make([]byte, 3<<10)
rand.Read(small2.Data)
big := small1
big.Nonce = 3
big.Data = make([]byte, 11<<10)
rand.Read(big.Data)
require.NoError(t, w.Send(&small1)) w, rw1, _ := setupOneConnection(t, cfg, tc.cfg)
require.NoError(t, w.Send(&big))
require.NoError(t, w.Send(&small2))
received := map[common.Hash]struct{}{} require.NoError(t, w.Send(&small1))
// we can not guarantee that all expected envelopes will be delivered in a one batch require.NoError(t, w.Send(&big))
// so allow whisper to write multiple times and read every message require.NoError(t, w.Send(&small2))
go func() {
time.Sleep(time.Second) received := map[uint64]struct{}{}
rw1.Close() // we can not guarantee that all expected envelopes will be delivered in a one batch
}() // so allow whisper to write multiple times and read every message
for { go func() {
msg, err := rw1.ReadMsg() time.Sleep(3 * time.Second)
if err == p2p.ErrPipeClosed { rw1.Close()
require.Contains(t, received, small1.Hash()) }()
require.NotContains(t, received, small2.Hash()) for {
require.NotContains(t, received, big.Hash()) msg, err := rw1.ReadMsg()
break if err == p2p.ErrPipeClosed {
} for _, n := range tc.received {
require.NoError(t, err) require.Contains(t, received, n)
require.Equal(t, uint64(1), msg.Code) }
var rst []*Envelope for _, n := range tc.notReceived {
require.NoError(t, msg.Decode(&rst)) require.NotContains(t, received, n)
for _, e := range rst { }
received[e.Hash()] = struct{}{} break
} }
require.NoError(t, err)
require.Equal(t, messagesCode, int(msg.Code))
var rst []*Envelope
require.NoError(t, msg.Decode(&rst))
for _, e := range rst {
received[e.Nonce] = struct{}{}
}
}
})
} }
} }
func TestRateRandomizedDelivery(t *testing.T) { func TestRateRandomizedDelivery(t *testing.T) {
cfg := ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10} cfg := &ratelimiter.Config{Interval: uint64(time.Hour), Capacity: 10 << 10, Quantum: 1 << 10}
w1, rw1, _ := setupOneConnection(t, cfg, cfg) w1, rw1, _ := setupOneConnection(t, cfg, cfg)
w2, rw2, _ := setupOneConnection(t, cfg, cfg) w2, rw2, _ := setupOneConnection(t, cfg, cfg)
w3, rw3, _ := setupOneConnection(t, cfg, cfg) w3, rw3, _ := setupOneConnection(t, cfg, cfg)