Benchmark tests (#986)
A set of tests used to benchmark Whisper and MailServer nodes.
This commit is contained in:
parent
ae7faece88
commit
bbe7fba0c9
7
Makefile
7
Makefile
|
@ -30,7 +30,12 @@ STATUSD_PRUNE_IMAGE_NAME ?= statusteam/statusd-prune
|
|||
DOCKER_TEST_WORKDIR = /go/src/github.com/status-im/status-go/
|
||||
DOCKER_TEST_IMAGE = golang:1.10
|
||||
|
||||
UNIT_TEST_PACKAGES := $(shell go list ./... | grep -v /vendor | grep -v /t/e2e | grep -v /t/destructive | grep -v /lib)
|
||||
UNIT_TEST_PACKAGES := $(shell go list ./... | \
|
||||
grep -v /vendor | \
|
||||
grep -v /t/e2e | \
|
||||
grep -v /t/destructive | \
|
||||
grep -v /t/benchmarks | \
|
||||
grep -v /lib)
|
||||
|
||||
# This is a code for automatic help generator.
|
||||
# It supports ANSI colors and categories.
|
||||
|
|
|
@ -168,7 +168,9 @@ func (s *WMailServer) Archive(env *whisper.Envelope) {
|
|||
|
||||
// DeliverMail sends mail to specified whisper peer.
|
||||
func (s *WMailServer) DeliverMail(peer *whisper.Peer, request *whisper.Envelope) {
|
||||
log.Info("Delivering mail", "peer", peer.ID)
|
||||
requestsCounter.Inc(1)
|
||||
|
||||
if peer == nil {
|
||||
requestErrorsCounter.Inc(1)
|
||||
log.Error("Whisper peer is nil")
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
Package benchmarks contains tests that can be used
|
||||
to run benchmarks and stress tests of our cluster components.
|
||||
|
||||
|
||||
Example usage:
|
||||
|
||||
1. Start a Whisper node with mail server capability:
|
||||
./build/bin/statusd \
|
||||
-networkid=4 \
|
||||
-maxpeers=100 \
|
||||
-shh \
|
||||
-shh.pow=0.002 \
|
||||
-shh.mailserver \
|
||||
-shh.passwordfile=./static/keys/wnodepassword \
|
||||
-log DEBUG
|
||||
2. Generate some messages:
|
||||
go test -v -timeout=30s -run TestSendMessages ./t/benchmarks \
|
||||
-peerurl=$ENODE_ADDR \
|
||||
-msgcount=200 \
|
||||
-msgbatchsize=50
|
||||
3. Retrieve them from the mail server:
|
||||
go test -v -timeout=30s -parallel 20 \
|
||||
-run TestConcurrentMailserverPeers
|
||||
./t/benchmarks \
|
||||
-peerurl=$ENODE_ADDR \
|
||||
-msgcount=200
|
||||
|
||||
The result of the last command will tell you how long it took to
|
||||
retrieve 200 messages with 20 concurrent peers (20 * 200 messages
|
||||
in total).
|
||||
|
||||
The result may be affected due to limitations of the host
|
||||
on which it was called. It's recommended running mail server
|
||||
on a different machine and running the third command
|
||||
from some beefy server.
|
||||
*/
|
||||
|
||||
package benchmarks
|
|
@ -0,0 +1,27 @@
|
|||
package benchmarks
|
||||
|
||||
import (
|
||||
"flag"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
)
|
||||
|
||||
var (
|
||||
// general
|
||||
peerURL = flag.String("peerurl", "", "Peer raw URL to which send messages")
|
||||
// mailserver tests
|
||||
ccyPeers = flag.Int("ccypeers", 1, "Number of concurrent peers requesting messages")
|
||||
// messages tests
|
||||
msgPass = flag.String("msgpass", "message-pass", "Password to create sym key from")
|
||||
msgCount = flag.Int64("msgcount", 100, "Number of messages to send")
|
||||
msgSize = flag.Int64("msgsize", int64(1024), "Message size in bytes")
|
||||
msgBatchSize = flag.Int64("msgbatchsize", int64(20), "Number of messages to send in a batch")
|
||||
)
|
||||
|
||||
var peerEnode *discover.Node
|
||||
|
||||
func init() {
|
||||
flag.Parse()
|
||||
|
||||
peerEnode = discover.MustParseNode(*peerURL)
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
package benchmarks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
"github.com/status-im/status-go/services/shhext"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
mailServerPass = "status-offline-inbox"
|
||||
)
|
||||
|
||||
// TestConcurrentMailserverPeers runs `ccyPeers` tests in parallel
|
||||
// that require messages from a MailServer.
|
||||
//
|
||||
// It can be used to test the maximum number of concurrent MailServer peers.
|
||||
//
|
||||
// Messages stored by the MailServer must be generated separately.
|
||||
// Take a look at TestSendMessages test.
|
||||
func TestConcurrentMailserverPeers(t *testing.T) {
|
||||
// Request for messages from mail server
|
||||
for i := 0; i < *ccyPeers; i++ {
|
||||
t.Run(fmt.Sprintf("Peer #%d", i), testMailserverPeer)
|
||||
}
|
||||
}
|
||||
|
||||
func testMailserverPeer(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
shhService := createWhisperService()
|
||||
shhAPI := whisper.NewPublicWhisperAPI(shhService)
|
||||
mailService := shhext.New(shhService, nil, nil)
|
||||
shhextAPI := shhext.NewPublicAPI(mailService)
|
||||
|
||||
// create node with services
|
||||
n, err := createNode()
|
||||
require.NoError(t, err)
|
||||
err = n.Register(func(_ *node.ServiceContext) (node.Service, error) {
|
||||
return shhService, nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
// register mail service as well
|
||||
err = n.Register(func(_ *node.ServiceContext) (node.Service, error) {
|
||||
return mailService, nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// start node
|
||||
require.NoError(t, n.Start())
|
||||
defer func() { require.NoError(t, n.Stop()) }()
|
||||
|
||||
// add mail server as a peer
|
||||
require.NoError(t, addPeerWithConfirmation(n.Server(), peerEnode))
|
||||
|
||||
// sym key to decrypt messages
|
||||
msgSymKeyID, err := shhService.AddSymKeyFromPassword(*msgPass)
|
||||
require.NoError(t, err)
|
||||
|
||||
// prepare new filter for messages from mail server
|
||||
filterID, err := shhAPI.NewMessageFilter(whisper.Criteria{
|
||||
SymKeyID: msgSymKeyID,
|
||||
Topics: []whisper.TopicType{topic},
|
||||
AllowP2P: true,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
messages, err := shhAPI.GetFilterMessages(filterID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messages, 0)
|
||||
|
||||
// request messages from mail server
|
||||
symKeyID, err := shhService.AddSymKeyFromPassword(mailServerPass)
|
||||
require.NoError(t, err)
|
||||
ok, err := shhAPI.MarkTrustedPeer(context.TODO(), *peerURL)
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
ok, err = shhextAPI.RequestMessages(context.TODO(), shhext.MessagesRequest{
|
||||
MailServerPeer: *peerURL,
|
||||
SymKeyID: symKeyID,
|
||||
Topic: topic,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
// wait for all messages
|
||||
require.NoError(t, waitForMessages(t, *msgCount, shhAPI, filterID))
|
||||
}
|
||||
|
||||
func waitForMessages(t *testing.T, messagesCount int64, shhAPI *whisper.PublicWhisperAPI, filterID string) error {
|
||||
received := int64(0)
|
||||
for range time.After(time.Second) {
|
||||
messages, err := shhAPI.GetFilterMessages(filterID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
received += int64(len(messages))
|
||||
|
||||
fmt.Printf("Received %d messages so far\n", received)
|
||||
|
||||
if received >= messagesCount {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
package benchmarks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestSendMessages sends messages to a peer.
|
||||
//
|
||||
// Because of batching outgoing messages in Whisper V6,
|
||||
// we need to pause and wait until the pending queue
|
||||
// is emptied in Whisper API. Otherwise, the batch
|
||||
// will be too large for the peer to consume it.
|
||||
// It's a potential bug that Whisper code performs
|
||||
// packet.Size > whisper.MaxMessageSize()
|
||||
// check instead of checking the size of each individual message.
|
||||
func TestSendMessages(t *testing.T) {
|
||||
shhService := createWhisperService()
|
||||
shhAPI := whisper.NewPublicWhisperAPI(shhService)
|
||||
|
||||
n, err := createNode()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = n.Register(func(_ *node.ServiceContext) (node.Service, error) {
|
||||
return shhService, nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, n.Stop()) }()
|
||||
|
||||
err = addPeerWithConfirmation(n.Server(), peerEnode)
|
||||
require.NoError(t, err)
|
||||
|
||||
symKeyID, err := shhService.AddSymKeyFromPassword(*msgPass)
|
||||
require.NoError(t, err)
|
||||
|
||||
payload := make([]byte, *msgSize)
|
||||
rand.Read(payload)
|
||||
|
||||
envelopeEvents := make(chan whisper.EnvelopeEvent, 100)
|
||||
sub := shhService.SubscribeEnvelopeEvents(envelopeEvents)
|
||||
defer sub.Unsubscribe()
|
||||
|
||||
batchSent := make(chan struct{})
|
||||
envelopesSent := int64(0)
|
||||
go func() {
|
||||
for ev := range envelopeEvents {
|
||||
if ev.Event != whisper.EventEnvelopeSent {
|
||||
continue
|
||||
}
|
||||
|
||||
envelopesSent++
|
||||
|
||||
if envelopesSent%(*msgBatchSize) == 0 {
|
||||
fmt.Printf("Sent a batch and %d messages\n", envelopesSent)
|
||||
batchSent <- struct{}{}
|
||||
}
|
||||
|
||||
if envelopesSent == *msgCount {
|
||||
fmt.Println("Sent all messages")
|
||||
close(batchSent)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for i := int64(1); i <= *msgCount; i++ {
|
||||
_, err := shhAPI.Post(context.TODO(), whisper.NewMessage{
|
||||
SymKeyID: symKeyID,
|
||||
TTL: 30,
|
||||
Topic: topic,
|
||||
Payload: payload,
|
||||
PowTime: 10,
|
||||
PowTarget: 0.005,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
if i%(*msgBatchSize) == 0 {
|
||||
fmt.Println("Waiting for a batch")
|
||||
<-batchSent
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("Waiting for all messages to be sent")
|
||||
<-batchSent
|
||||
require.Equal(t, *msgCount, envelopesSent)
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package benchmarks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
"github.com/ethereum/go-ethereum/p2p/nat"
|
||||
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
|
||||
)
|
||||
|
||||
var (
|
||||
topic = whisper.TopicType{0xfa, 0xfb, 0xfc, 0xfd}
|
||||
)
|
||||
|
||||
func createNode() (*node.Node, error) {
|
||||
key, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return node.New(&node.Config{
|
||||
DataDir: "",
|
||||
P2P: p2p.Config{
|
||||
PrivateKey: key,
|
||||
DiscoveryV5: false,
|
||||
NoDiscovery: true,
|
||||
MaxPeers: 1,
|
||||
NAT: nat.Any(),
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func addPeerWithConfirmation(server *p2p.Server, node *discover.Node) error {
|
||||
ch := make(chan *p2p.PeerEvent, server.MaxPeers)
|
||||
subscription := server.SubscribeEvents(ch)
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
server.AddPeer(node)
|
||||
|
||||
ev := <-ch
|
||||
if ev.Type != p2p.PeerEventTypeAdd || ev.Peer != node.ID {
|
||||
return fmt.Errorf("got unexpected event: %+v", ev)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createWhisperService() *whisper.Whisper {
|
||||
whisperServiceConfig := &whisper.Config{
|
||||
MaxMessageSize: whisper.DefaultMaxMessageSize,
|
||||
MinimumAcceptedPOW: 0.005,
|
||||
TimeSource: func() time.Time { return time.Now().UTC() },
|
||||
}
|
||||
return whisper.New(whisperServiceConfig)
|
||||
}
|
Loading…
Reference in New Issue