feat: upload/download rate for waku v1 messages (#2286)

* feat: upload/download rate for waku v1 messages

* reorganize code

* fix failing test
This commit is contained in:
RichΛrd 2021-08-03 15:27:15 -04:00 committed by GitHub
parent c61cf4e1b2
commit facad9f07e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 439 additions and 63 deletions

View File

@ -141,6 +141,10 @@ func (w *gethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, er
return id, nil
}
func (w *gethWakuWrapper) GetStats() types.StatsSummary {
return w.waku.GetStats()
}
func (w *gethWakuWrapper) GetFilter(id string) types.Filter {
return NewWakuFilterWrapper(w.waku.GetFilter(id), id)
}

View File

@ -145,6 +145,10 @@ func (w *gethWakuV2Wrapper) Subscribe(opts *types.SubscriptionOptions) (string,
return id, nil
}
func (w *gethWakuV2Wrapper) GetStats() types.StatsSummary {
return w.waku.GetStats()
}
func (w *gethWakuV2Wrapper) GetFilter(id string) types.Filter {
return NewWakuV2FilterWrapper(w.waku.GetFilter(id), id)
}

6
eth-node/types/stats.go Normal file
View File

@ -0,0 +1,6 @@
package types
type StatsSummary struct {
UploadRate uint64 `json:"uploadRate"`
DownloadRate uint64 `json:"downloadRate"`
}

View File

@ -40,6 +40,8 @@ type Waku interface {
GetSymKey(id string) ([]byte, error)
MaxMessageSize() uint32
GetStats() StatsSummary
Subscribe(opts *SubscriptionOptions) (string, error)
GetFilter(id string) Filter
Unsubscribe(id string) error

View File

@ -2291,6 +2291,10 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
return m.handleRetrievedMessages(chatWithMessages)
}
func (m *Messenger) GetStats() types.StatsSummary {
return m.transport.GetStats()
}
type CurrentMessageState struct {
// Message is the protobuf message received
Message protobuf.ChatMessage

View File

@ -209,6 +209,10 @@ func (t *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*Filter, error)
return filters, nil
}
func (t *Transport) GetStats() types.StatsSummary {
return t.waku.GetStats()
}
func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
result := make(map[Filter][]*types.Message)

View File

@ -160,6 +160,7 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) {
}
go s.retrieveMessagesLoop(time.Second, s.cancelMessenger)
go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger)
go s.retrieveStats(5*time.Second, s.cancelMessenger)
return response, nil
}
@ -192,6 +193,21 @@ func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{
}
}
func (s *Service) retrieveStats(tick time.Duration, cancel <-chan struct{}) {
ticker := time.NewTicker(tick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
response := s.messenger.GetStats()
PublisherSignalHandler{}.Stats(response)
case <-cancel:
return
}
}
}
type verifyTransactionClient struct {
chainID *big.Int
url string

View File

@ -45,6 +45,10 @@ func (h PublisherSignalHandler) NewMessages(response *protocol.MessengerResponse
signal.SendNewMessages(response)
}
func (h PublisherSignalHandler) Stats(stats types.StatsSummary) {
signal.SendStats(stats)
}
// MessengerSignalHandler sends signals on messenger events
type MessengerSignalsHandler struct{}

11
signal/events_stats.go Normal file
View File

@ -0,0 +1,11 @@
package signal
const (
// EventsStats is sent periodically with stats like upload/download rate
EventStats = "stats"
)
// SendStats sends stats signal.
func SendStats(stats interface{}) {
send(EventStats, stats)
}

View File

@ -37,6 +37,8 @@ type Peer interface {
SendP2PMessages([]*Envelope) error
SendRawP2PDirect([]rlp.RawValue) error
SendBundle(bundle []*Envelope) (rst common.Hash, err error)
// Mark marks an envelope known to the peer so that it won't be sent back.
Mark(*Envelope)
// Marked checks if an envelope is already known to the remote peer.

118
waku/common/stats.go Normal file
View File

@ -0,0 +1,118 @@
package common
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/types"
)
type Measure struct {
Timestamp int64
Size uint64
}
type StatsTracker struct {
Uploads []Measure
Downloads []Measure
statsMutex sync.Mutex
}
const measurementPeriod = 15 * time.Second
func measure(input interface{}) (*Measure, error) {
b, err := rlp.EncodeToBytes(input)
if err != nil {
return nil, err
}
return &Measure{
Timestamp: time.Now().UnixNano(),
Size: uint64(len(b)),
}, nil
}
func (s *StatsTracker) AddUpload(input interface{}) {
go func(input interface{}) {
m, err := measure(input)
if err != nil {
return
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Uploads = append(s.Uploads, *m)
}(input)
}
func (s *StatsTracker) AddDownload(input interface{}) {
go func(input interface{}) {
m, err := measure(input)
if err != nil {
return
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Downloads = append(s.Downloads, *m)
}(input)
}
func (s *StatsTracker) AddUploadBytes(size uint64) {
go func(size uint64) {
m := Measure{
Timestamp: time.Now().UnixNano(),
Size: size,
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Uploads = append(s.Uploads, m)
}(size)
}
func (s *StatsTracker) AddDownloadBytes(size uint64) {
go func(size uint64) {
m := Measure{
Timestamp: time.Now().UnixNano(),
Size: size,
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Downloads = append(s.Downloads, m)
}(size)
}
func calculateAverage(measures []Measure, minTime int64) (validMeasures []Measure, rate uint64) {
for _, m := range measures {
if m.Timestamp > minTime {
// Only use recent measures
validMeasures = append(validMeasures, m)
rate += m.Size
}
}
rate /= (uint64(measurementPeriod) / uint64(1*time.Second))
return
}
func (s *StatsTracker) GetRatePerSecond() (uploadRate uint64, downloadRate uint64) {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
minTime := time.Now().Add(-measurementPeriod).UnixNano()
s.Uploads, uploadRate = calculateAverage(s.Uploads, minTime)
s.Downloads, downloadRate = calculateAverage(s.Downloads, minTime)
return
}
func (s *StatsTracker) GetStats() types.StatsSummary {
uploadRate, downloadRate := s.GetRatePerSecond()
summary := types.StatsSummary{
UploadRate: uploadRate,
DownloadRate: downloadRate,
}
return summary
}

View File

@ -54,10 +54,12 @@ type Peer struct {
rateLimitsMu sync.Mutex
rateLimits common.RateLimits
stats *common.StatsTracker
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
}
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) common.Peer {
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger, stats *common.StatsTracker) common.Peer {
if logger == nil {
logger = zap.NewNop()
}
@ -73,6 +75,7 @@ func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logg
quit: make(chan struct{}),
bloomFilter: common.MakeFullNodeBloom(),
fullNode: true,
stats: stats,
}
}
@ -542,7 +545,7 @@ func (p *Peer) broadcast() error {
return nil
}
batchHash, err := sendBundle(p.rw, bundle)
batchHash, err := p.SendBundle(bundle)
if err != nil {
p.logger.Debug("failed to deliver envelopes", zap.String("peerID", types.EncodeHex(p.ID())), zap.Error(err))
return err
@ -565,12 +568,12 @@ func (p *Peer) broadcast() error {
return nil
}
func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Hash, err error) {
func (p *Peer) SendBundle(bundle []*common.Envelope) (rst gethcommon.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
err = p.rw.WriteMsg(p2p.Msg{
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),

View File

@ -94,7 +94,7 @@ func TestPeerBasic(t *testing.T) {
t.Fatalf("failed Wrap with seed %d.", seed)
}
p := NewPeer(nil, nil, nil, nil)
p := NewPeer(nil, nil, nil, nil, nil)
p.Mark(env)
if !p.Marked(env) {
t.Fatalf("failed mark with seed %d.", seed)
@ -114,7 +114,9 @@ func TestSendBundle(t *testing.T) {
errc := make(chan error)
go func() {
_, err := sendBundle(rw1, envelopes)
stats := &common.StatsTracker{}
p := NewPeer(nil, nil, rw1, nil, stats)
_, err := p.SendBundle(envelopes)
errc <- err
}()
require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes))

View File

@ -54,10 +54,12 @@ type Peer struct {
bytesRateLimitsMu sync.Mutex
bytesRateLimits common.RateLimits
stats *common.StatsTracker
known mapset.Set // Messages already known by the peer to avoid wasting bandwidth
}
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) common.Peer {
func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger, stats *common.StatsTracker) common.Peer {
if logger == nil {
logger = zap.NewNop()
}
@ -73,6 +75,7 @@ func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logg
quit: make(chan struct{}),
bloomFilter: common.MakeFullNodeBloom(),
fullNode: true,
stats: stats,
}
}
@ -90,47 +93,81 @@ func (p *Peer) Stop() {
p.logger.Debug("stopping peer", zap.String("peerID", types.EncodeHex(p.ID())))
}
func (p *Peer) NotifyAboutPowRequirementChange(pow float64) error {
func (p *Peer) NotifyAboutPowRequirementChange(pow float64) (err error) {
i := math.Float64bits(pow)
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{PoWRequirement: &i})
data := StatusOptions{PoWRequirement: &i}
err = p2p.Send(p.rw, statusUpdateCode, data)
if err != nil {
p.stats.AddUpload(data)
}
return
}
func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) error {
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{BloomFilter: bloom})
func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) (err error) {
data := StatusOptions{BloomFilter: bloom}
err = p2p.Send(p.rw, statusUpdateCode, data)
if err != nil {
p.stats.AddUpload(data)
}
return
}
func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) error {
return p2p.Send(p.rw, statusUpdateCode, StatusOptions{TopicInterest: topics})
func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) (err error) {
data := StatusOptions{TopicInterest: topics}
err = p2p.Send(p.rw, statusUpdateCode, data)
if err != nil {
p.stats.AddUpload(data)
}
return
}
func (p *Peer) SetPeerTrusted(trusted bool) {
p.trusted = trusted
}
func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error {
return p2p.Send(p.rw, p2pRequestCode, envelope)
func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) (err error) {
err = p2p.Send(p.rw, p2pRequestCode, envelope)
if err != nil {
p.stats.AddUpload(envelope)
}
return
}
func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error {
return p2p.Send(p.rw, p2pRequestCode, request)
func (p *Peer) SendMessagesRequest(request common.MessagesRequest) (err error) {
err = p2p.Send(p.rw, p2pRequestCode, request)
if err != nil {
p.stats.AddUpload(request)
}
return
}
func (p *Peer) SendHistoricMessageResponse(payload []byte) error {
func (p *Peer) SendHistoricMessageResponse(payload []byte) (err error) {
size, r, err := rlp.EncodeToReader(payload)
if err != nil {
return err
}
return p.rw.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
err = p.rw.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r})
if err != nil {
p.stats.AddUpload(payload)
}
return
}
func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) error {
return p2p.Send(p.rw, p2pMessageCode, envelopes)
func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) (err error) {
err = p2p.Send(p.rw, p2pMessageCode, envelopes)
if err != nil {
p.stats.AddUpload(envelopes)
}
return
}
func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) error {
return p2p.Send(p.rw, p2pMessageCode, envelopes)
func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) (err error) {
err = p2p.Send(p.rw, p2pMessageCode, envelopes)
if err != nil {
p.stats.AddUpload(envelopes)
}
return
}
func (p *Peer) SetRWWriter(rw p2p.MsgReadWriter) {
@ -189,6 +226,8 @@ func (p *Peer) Run() error {
return err
}
p.stats.AddDownloadBytes(uint64(packet.Size))
if packet.Size > p.host.MaxMessageSize() {
logger.Warn("oversize message received", zap.String("peerID", types.EncodeHex(p.ID())), zap.Uint32("size", packet.Size))
return errors.New("oversize message received")
@ -384,11 +423,18 @@ func (p *Peer) handleP2PRequestCompleteCode(packet p2p.Msg) error {
// sendConfirmation sends messageResponseCode and batchAcknowledgedCode messages.
func (p *Peer) sendConfirmation(data []byte, envelopeErrors []common.EnvelopeError) (err error) {
batchHash := crypto.Keccak256Hash(data)
err = p2p.Send(p.rw, messageResponseCode, NewMessagesResponse(batchHash, envelopeErrors))
msg := NewMessagesResponse(batchHash, envelopeErrors)
err = p2p.Send(p.rw, messageResponseCode, msg)
if err != nil {
p.stats.AddUpload(msg)
return
}
err = p2p.Send(p.rw, batchAcknowledgedCode, batchHash) // DEPRECATED
if err != nil {
p.stats.AddUpload(batchHash)
}
return
}
@ -399,7 +445,11 @@ func (p *Peer) handshake() error {
errc := make(chan error, 1)
opts := StatusOptionsFromHost(p.host)
go func() {
errc <- p2p.Send(p.rw, statusCode, opts)
err := p2p.Send(p.rw, statusCode, opts)
if err != nil {
p.stats.AddUpload(statusCode)
}
errc <- err
}()
// Fetch the remote status packet and verify protocol match
@ -407,6 +457,9 @@ func (p *Peer) handshake() error {
if err != nil {
return err
}
p.stats.AddDownloadBytes(uint64(packet.Size))
if packet.Code != statusCode {
return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code)
}
@ -418,6 +471,7 @@ func (p *Peer) handshake() error {
if err := s.Decode(&peerOptions); err != nil {
return fmt.Errorf("p [%x]: failed to decode status options: %v", p.ID(), err)
}
if err := p.setOptions(peerOptions.WithDefaults()); err != nil {
return fmt.Errorf("p [%x]: failed to set options: %v", p.ID(), err)
}
@ -532,7 +586,7 @@ func (p *Peer) broadcast() error {
return nil
}
batchHash, err := sendBundle(p.rw, bundle)
batchHash, err := p.SendBundle(bundle)
if err != nil {
p.logger.Debug("failed to deliver envelopes", zap.String("peerID", types.EncodeHex(p.ID())), zap.Error(err))
return err
@ -555,19 +609,25 @@ func (p *Peer) broadcast() error {
return nil
}
func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Hash, err error) {
func (p *Peer) SendBundle(bundle []*common.Envelope) (rst gethcommon.Hash, err error) {
data, err := rlp.EncodeToBytes(bundle)
if err != nil {
return
}
err = rw.WriteMsg(p2p.Msg{
msg := p2p.Msg{
Code: messagesCode,
Size: uint32(len(data)),
Payload: bytes.NewBuffer(data),
})
}
err = p.rw.WriteMsg(msg)
if err != nil {
return
}
p.stats.AddUpload(bundle)
return crypto.Keccak256Hash(data), nil
}

View File

@ -94,7 +94,7 @@ func TestPeerBasic(t *testing.T) {
t.Fatalf("failed Wrap with seed %d.", seed)
}
p := NewPeer(nil, nil, nil, nil)
p := NewPeer(nil, nil, nil, nil, nil)
p.Mark(env)
if !p.Marked(env) {
t.Fatalf("failed mark with seed %d.", seed)
@ -114,7 +114,9 @@ func TestSendBundle(t *testing.T) {
errc := make(chan error)
go func() {
_, err := sendBundle(rw1, envelopes)
stats := &common.StatsTracker{}
p := NewPeer(nil, nil, rw1, nil, stats)
_, err := p.SendBundle(envelopes)
errc <- err
}()
require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes))

View File

@ -87,6 +87,7 @@ type Waku struct {
expirations map[uint32]mapset.Set // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
stats *common.StatsTracker
peers map[common.Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set
@ -160,6 +161,7 @@ func New(cfg *Config, logger *zap.Logger) *Waku {
}
waku.filters = common.NewFilters()
waku.stats = &common.StatsTracker{}
// p2p waku sub-protocol handler
waku.protocols = []p2p.Protocol{{
@ -193,6 +195,10 @@ func New(cfg *Config, logger *zap.Logger) *Waku {
return waku
}
func (w *Waku) GetStats() types.StatsSummary {
return w.stats.GetStats()
}
// MinPow returns the PoW value required by this node.
func (w *Waku) MinPow() float64 {
w.settingsMu.RLock()
@ -1081,11 +1087,11 @@ func (w *Waku) Stop() error {
}
func (w *Waku) handlePeerV0(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
return w.HandlePeer(v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv0")), rw)
return w.HandlePeer(v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv0"), w.stats), rw)
}
func (w *Waku) handlePeerV1(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error {
return w.HandlePeer(v1.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv1")), rw)
return w.HandlePeer(v1.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv1"), w.stats), rw)
}
// HandlePeer is called by the underlying P2P layer when the waku sub-protocol

View File

@ -1087,8 +1087,9 @@ func TestOnNewEnvelopesSoftBlacklist(t *testing.T) {
w1 := New(nil, nil)
envelope := &common.Envelope{}
stats := &common.StatsTracker{}
p2pPeer := p2p.NewPeer(enode.ID{0x4}, "test", []p2p.Cap{})
peer := v1.NewPeer(w1, p2pPeer, nil, nil)
peer := v1.NewPeer(w1, p2pPeer, nil, nil, stats)
// Pre-condition, we need to make sure this envelope returns an EnvelopeError
envelopeError, err := w1.OnNewEnvelopes([]*common.Envelope{envelope}, peer)

View File

@ -56,12 +56,14 @@ func TestWakuV1(t *testing.T) {
type WakuTestSuite struct {
suite.Suite
seed int64
newPeer func(common.WakuHost, *p2p.Peer, p2p.MsgReadWriter, *zap.Logger) common.Peer
stats *common.StatsTracker
newPeer func(common.WakuHost, *p2p.Peer, p2p.MsgReadWriter, *zap.Logger, *common.StatsTracker) common.Peer
}
// Set up random seed
func (s *WakuTestSuite) SetupTest() {
s.seed = time.Now().Unix()
s.stats = &common.StatsTracker{}
mrand.Seed(s.seed)
}
@ -96,7 +98,7 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() {
rw1, rw2 := p2p.MsgPipe()
go func() {
s.Require().Error(w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1))
s.Require().Error(w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil, s.stats), rw1))
}()
timer := time.AfterFunc(time.Second*5, func() {
@ -104,7 +106,7 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() {
handleError(s.T(), rw2.Close())
})
peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil)
peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil, s.stats)
peer1.SetPeerTrusted(true)
err = peer1.Start()
@ -137,14 +139,14 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) {
handleError(s.T(), rw2.Close())
})
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil)
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil, s.stats)
go func() {
// This will always fail eventually as we close the channels
s.Require().Error(w1.HandlePeer(p1, rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats)
err := p2.Start()
s.Require().NoError(err)
peers := w1.getPeers()
@ -181,8 +183,8 @@ func (s *WakuTestSuite) TestMessagesResponseWithError() {
s.T().Errorf("error closing MsgPipe 2, '%s'", err)
}
}()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil)
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil, s.stats)
errorc := make(chan error, 1)
go func() { errorc <- w1.HandlePeer(p1, rw2) }()
@ -244,8 +246,8 @@ func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelop
rw1, rw2 := p2p.MsgPipe()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil)
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil, s.stats)
errorc := make(chan error, 1)
go func() { errorc <- w1.HandlePeer(p1, rw2) }()
@ -342,14 +344,14 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() {
defer sub.Unsubscribe()
rw1, rw2 := p2p.MsgPipe()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats)
go func() { handleError(s.T(), w1.HandlePeer(p1, rw2)) }()
timer := time.AfterFunc(5*time.Second, func() {
handleError(s.T(), rw1.Close())
})
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil)
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil, s.stats)
s.Require().NoError(peer2.Start())
go func() { handleError(s.T(), peer2.Run()) }()
@ -404,8 +406,8 @@ func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() {
}
}()
w1, w2 := New(c, nil), New(c, nil)
p1 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil)
p2 := s.newPeer(w1, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 1}}), rw2, nil)
p1 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil, s.stats)
p2 := s.newPeer(w1, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 1}}), rw2, nil, s.stats)
errc := make(chan error)
go func() { errc <- w1.HandlePeer(p2, rw2) }()
@ -439,7 +441,7 @@ func (s *WakuTestSuite) TestRequestSentEventWithExpiry() {
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}})
rw := discardPipe()
defer func() { handleError(s.T(), rw.Close()) }()
w.peers[s.newPeer(w, p, rw, nil)] = struct{}{}
w.peers[s.newPeer(w, p, rw, nil, s.stats)] = struct{}{}
events := make(chan common.EnvelopeEvent, 1)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
@ -490,14 +492,14 @@ func (s *WakuTestSuite) TestDeprecatedDeliverMail() {
})
rw1, rw2 := p2p.MsgPipe()
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats)
go func() { handleError(s.T(), w1.HandlePeer(p1, rw2)) }()
timer := time.AfterFunc(5*time.Second, func() {
handleError(s.T(), rw1.Close())
})
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil)
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil, s.stats)
s.Require().NoError(peer2.Start())
go func() { handleError(s.T(), peer2.Run()) }()
@ -542,7 +544,7 @@ func (s *WakuTestSuite) TestSendMessagesRequest() {
p := p2p.NewPeer(enode.ID{0x01}, "peer01", nil)
rw1, rw2 := p2p.MsgPipe()
w := New(nil, nil)
w.peers[s.newPeer(w, p, rw1, nil)] = struct{}{}
w.peers[s.newPeer(w, p, rw1, nil, s.stats)] = struct{}{}
go func() {
// Read out so that it's consumed
@ -573,7 +575,7 @@ func (s *WakuTestSuite) TestRateLimiterIntegration() {
s.T().Errorf("error closing MsgPipe, '%s'", err)
}
}()
p := s.newPeer(w, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil)
p := s.newPeer(w, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats)
errorc := make(chan error, 1)
go func() { errorc <- w.HandlePeer(p, rw2) }()
@ -595,7 +597,7 @@ func (s *WakuTestSuite) TestMailserverCompletionEvent() {
rw1, rw2 := p2p.MsgPipe()
errorc := make(chan error, 1)
go func() {
err := w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "1", []p2p.Cap{}), rw1, nil), rw1)
err := w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "1", []p2p.Cap{}), rw1, nil, s.stats), rw1)
errorc <- err
}()
@ -603,7 +605,7 @@ func (s *WakuTestSuite) TestMailserverCompletionEvent() {
s.Require().NoError(w2.Start())
defer func() { handleError(s.T(), w2.Stop()) }()
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil)
peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil, s.stats)
peer2.SetPeerTrusted(true)
events := make(chan common.EnvelopeEvent)
@ -656,10 +658,10 @@ func (s *WakuTestSuite) TestPeerHandshakeWithTwoFullNode() {
w2 := New(nil, nil)
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats)
err = p2.Start()
s.Require().NoError(err)
@ -678,10 +680,10 @@ func (s *WakuTestSuite) TestHandshakeWithOldVersionWithoutLightModeFlag() {
w2 := New(nil, nil)
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats)
err := p2.Start()
s.Require().NoError(err)
}
@ -701,10 +703,10 @@ func (s *WakuTestSuite) TestTwoLightPeerHandshakeRestrictionOff() {
w2.settings.RestrictLightClientsConn = false
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats)
s.Require().NoError(p2.Start())
}
@ -723,10 +725,10 @@ func (s *WakuTestSuite) TestTwoLightPeerHandshakeError() {
w2.settings.RestrictLightClientsConn = true
go func() {
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1))
handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1))
}()
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil)
p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats)
s.Require().Error(p2.Start())
}

118
wakuv2/common/stats.go Normal file
View File

@ -0,0 +1,118 @@
package common
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/types"
)
type Measure struct {
Timestamp int64
Size uint64
}
type StatsTracker struct {
Uploads []Measure
Downloads []Measure
statsMutex sync.Mutex
}
const measurementPeriod = 15 * time.Second
func measure(input interface{}) (*Measure, error) {
b, err := rlp.EncodeToBytes(input)
if err != nil {
return nil, err
}
return &Measure{
Timestamp: time.Now().UnixNano(),
Size: uint64(len(b)),
}, nil
}
func (s *StatsTracker) AddUpload(input interface{}) {
go func(input interface{}) {
m, err := measure(input)
if err != nil {
return
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Uploads = append(s.Uploads, *m)
}(input)
}
func (s *StatsTracker) AddDownload(input interface{}) {
go func(input interface{}) {
m, err := measure(input)
if err != nil {
return
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Downloads = append(s.Downloads, *m)
}(input)
}
func (s *StatsTracker) AddUploadBytes(size uint64) {
go func(size uint64) {
m := Measure{
Timestamp: time.Now().UnixNano(),
Size: size,
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Uploads = append(s.Uploads, m)
}(size)
}
func (s *StatsTracker) AddDownloadBytes(size uint64) {
go func(size uint64) {
m := Measure{
Timestamp: time.Now().UnixNano(),
Size: size,
}
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
s.Downloads = append(s.Downloads, m)
}(size)
}
func calculateAverage(measures []Measure, minTime int64) (validMeasures []Measure, rate uint64) {
for _, m := range measures {
if m.Timestamp > minTime {
// Only use recent measures
validMeasures = append(validMeasures, m)
rate += m.Size
}
}
rate /= (uint64(measurementPeriod) / uint64(1*time.Second))
return
}
func (s *StatsTracker) GetRatePerSecond() (uploadRate uint64, downloadRate uint64) {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()
minTime := time.Now().Add(-measurementPeriod).UnixNano()
s.Uploads, uploadRate = calculateAverage(s.Uploads, minTime)
s.Downloads, downloadRate = calculateAverage(s.Downloads, minTime)
return
}
func (s *StatsTracker) GetStats() types.StatsSummary {
uploadRate, downloadRate := s.GetRatePerSecond()
summary := types.StatsSummary{
UploadRate: uploadRate,
DownloadRate: downloadRate,
}
return summary
}

View File

@ -78,6 +78,8 @@ type Waku struct {
expirations map[uint32]mapset.Set // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools
stats *common.StatsTracker
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
quit chan struct{} // Channel used for graceful exit
@ -120,6 +122,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
}
waku.filters = common.NewFilters()
waku.stats = &common.StatsTracker{}
var privateKey *ecdsa.PrivateKey
var err error
@ -176,6 +179,10 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) {
return waku, nil
}
func (w *Waku) GetStats() types.StatsSummary {
return w.stats.GetStats()
}
func (w *Waku) runMsgLoop() {
sub, err := w.node.Subscribe(nil)
if err != nil {