From facad9f07e2674dc73fe6c83a4a78f2ea0254735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rich=CE=9Brd?= Date: Tue, 3 Aug 2021 15:27:15 -0400 Subject: [PATCH] feat: upload/download rate for waku v1 messages (#2286) * feat: upload/download rate for waku v1 messages * reorganize code * fix failing test --- eth-node/bridge/geth/waku.go | 4 ++ eth-node/bridge/geth/wakuv2.go | 4 ++ eth-node/types/stats.go | 6 ++ eth-node/types/waku.go | 2 + protocol/messenger.go | 4 ++ protocol/transport/transport.go | 4 ++ services/ext/service.go | 16 +++++ services/ext/signal.go | 4 ++ signal/events_stats.go | 11 +++ waku/common/protocol.go | 2 + waku/common/stats.go | 118 ++++++++++++++++++++++++++++++++ waku/v0/peer.go | 11 +-- waku/v0/peer_test.go | 6 +- waku/v1/peer.go | 108 ++++++++++++++++++++++------- waku/v1/peer_test.go | 6 +- waku/waku.go | 10 ++- waku/waku_test.go | 3 +- waku/waku_version_test.go | 58 ++++++++-------- wakuv2/common/stats.go | 118 ++++++++++++++++++++++++++++++++ wakuv2/waku.go | 7 ++ 20 files changed, 439 insertions(+), 63 deletions(-) create mode 100644 eth-node/types/stats.go create mode 100644 signal/events_stats.go create mode 100644 waku/common/stats.go create mode 100644 wakuv2/common/stats.go diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index dd3c6538b..c607c1984 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -141,6 +141,10 @@ func (w *gethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, er return id, nil } +func (w *gethWakuWrapper) GetStats() types.StatsSummary { + return w.waku.GetStats() +} + func (w *gethWakuWrapper) GetFilter(id string) types.Filter { return NewWakuFilterWrapper(w.waku.GetFilter(id), id) } diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 5dc90ea60..e4d5b4220 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -145,6 +145,10 @@ func (w *gethWakuV2Wrapper) Subscribe(opts *types.SubscriptionOptions) (string, return id, nil } +func (w *gethWakuV2Wrapper) GetStats() types.StatsSummary { + return w.waku.GetStats() +} + func (w *gethWakuV2Wrapper) GetFilter(id string) types.Filter { return NewWakuV2FilterWrapper(w.waku.GetFilter(id), id) } diff --git a/eth-node/types/stats.go b/eth-node/types/stats.go new file mode 100644 index 000000000..c10a67663 --- /dev/null +++ b/eth-node/types/stats.go @@ -0,0 +1,6 @@ +package types + +type StatsSummary struct { + UploadRate uint64 `json:"uploadRate"` + DownloadRate uint64 `json:"downloadRate"` +} diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index 1bd3cf626..d010100fc 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -40,6 +40,8 @@ type Waku interface { GetSymKey(id string) ([]byte, error) MaxMessageSize() uint32 + GetStats() StatsSummary + Subscribe(opts *SubscriptionOptions) (string, error) GetFilter(id string) Filter Unsubscribe(id string) error diff --git a/protocol/messenger.go b/protocol/messenger.go index c9d77c769..ce3372a3d 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -2291,6 +2291,10 @@ func (m *Messenger) RetrieveAll() (*MessengerResponse, error) { return m.handleRetrievedMessages(chatWithMessages) } +func (m *Messenger) GetStats() types.StatsSummary { + return m.transport.GetStats() +} + type CurrentMessageState struct { // Message is the protobuf message received Message protobuf.ChatMessage diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index d6467dc24..188944021 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -209,6 +209,10 @@ func (t *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) ([]*Filter, error) return filters, nil } +func (t *Transport) GetStats() types.StatsSummary { + return t.waku.GetStats() +} + func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { result := make(map[Filter][]*types.Message) diff --git a/services/ext/service.go b/services/ext/service.go index 0e427d2c2..d516e4fa7 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -160,6 +160,7 @@ func (s *Service) StartMessenger() (*protocol.MessengerResponse, error) { } go s.retrieveMessagesLoop(time.Second, s.cancelMessenger) go s.verifyTransactionLoop(30*time.Second, s.cancelMessenger) + go s.retrieveStats(5*time.Second, s.cancelMessenger) return response, nil } @@ -192,6 +193,21 @@ func (s *Service) retrieveMessagesLoop(tick time.Duration, cancel <-chan struct{ } } +func (s *Service) retrieveStats(tick time.Duration, cancel <-chan struct{}) { + ticker := time.NewTicker(tick) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + response := s.messenger.GetStats() + PublisherSignalHandler{}.Stats(response) + case <-cancel: + return + } + } +} + type verifyTransactionClient struct { chainID *big.Int url string diff --git a/services/ext/signal.go b/services/ext/signal.go index 0f69f79db..9d6234773 100644 --- a/services/ext/signal.go +++ b/services/ext/signal.go @@ -45,6 +45,10 @@ func (h PublisherSignalHandler) NewMessages(response *protocol.MessengerResponse signal.SendNewMessages(response) } +func (h PublisherSignalHandler) Stats(stats types.StatsSummary) { + signal.SendStats(stats) +} + // MessengerSignalHandler sends signals on messenger events type MessengerSignalsHandler struct{} diff --git a/signal/events_stats.go b/signal/events_stats.go new file mode 100644 index 000000000..f7277c745 --- /dev/null +++ b/signal/events_stats.go @@ -0,0 +1,11 @@ +package signal + +const ( + // EventsStats is sent periodically with stats like upload/download rate + EventStats = "stats" +) + +// SendStats sends stats signal. +func SendStats(stats interface{}) { + send(EventStats, stats) +} diff --git a/waku/common/protocol.go b/waku/common/protocol.go index dd7b25434..b64c3f992 100644 --- a/waku/common/protocol.go +++ b/waku/common/protocol.go @@ -37,6 +37,8 @@ type Peer interface { SendP2PMessages([]*Envelope) error SendRawP2PDirect([]rlp.RawValue) error + SendBundle(bundle []*Envelope) (rst common.Hash, err error) + // Mark marks an envelope known to the peer so that it won't be sent back. Mark(*Envelope) // Marked checks if an envelope is already known to the remote peer. diff --git a/waku/common/stats.go b/waku/common/stats.go new file mode 100644 index 000000000..c42c2c571 --- /dev/null +++ b/waku/common/stats.go @@ -0,0 +1,118 @@ +package common + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/status-im/status-go/eth-node/types" +) + +type Measure struct { + Timestamp int64 + Size uint64 +} + +type StatsTracker struct { + Uploads []Measure + Downloads []Measure + + statsMutex sync.Mutex +} + +const measurementPeriod = 15 * time.Second + +func measure(input interface{}) (*Measure, error) { + b, err := rlp.EncodeToBytes(input) + if err != nil { + return nil, err + } + return &Measure{ + Timestamp: time.Now().UnixNano(), + Size: uint64(len(b)), + }, nil + +} + +func (s *StatsTracker) AddUpload(input interface{}) { + go func(input interface{}) { + m, err := measure(input) + if err != nil { + return + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Uploads = append(s.Uploads, *m) + }(input) +} + +func (s *StatsTracker) AddDownload(input interface{}) { + go func(input interface{}) { + m, err := measure(input) + if err != nil { + return + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Downloads = append(s.Downloads, *m) + }(input) +} + +func (s *StatsTracker) AddUploadBytes(size uint64) { + go func(size uint64) { + m := Measure{ + Timestamp: time.Now().UnixNano(), + Size: size, + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Uploads = append(s.Uploads, m) + }(size) +} + +func (s *StatsTracker) AddDownloadBytes(size uint64) { + go func(size uint64) { + m := Measure{ + Timestamp: time.Now().UnixNano(), + Size: size, + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Downloads = append(s.Downloads, m) + }(size) +} + +func calculateAverage(measures []Measure, minTime int64) (validMeasures []Measure, rate uint64) { + for _, m := range measures { + if m.Timestamp > minTime { + // Only use recent measures + validMeasures = append(validMeasures, m) + rate += m.Size + } + } + + rate /= (uint64(measurementPeriod) / uint64(1*time.Second)) + return +} + +func (s *StatsTracker) GetRatePerSecond() (uploadRate uint64, downloadRate uint64) { + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + minTime := time.Now().Add(-measurementPeriod).UnixNano() + s.Uploads, uploadRate = calculateAverage(s.Uploads, minTime) + s.Downloads, downloadRate = calculateAverage(s.Downloads, minTime) + return +} + +func (s *StatsTracker) GetStats() types.StatsSummary { + uploadRate, downloadRate := s.GetRatePerSecond() + summary := types.StatsSummary{ + UploadRate: uploadRate, + DownloadRate: downloadRate, + } + return summary +} diff --git a/waku/v0/peer.go b/waku/v0/peer.go index a8932ab29..6ba6f2e11 100644 --- a/waku/v0/peer.go +++ b/waku/v0/peer.go @@ -54,10 +54,12 @@ type Peer struct { rateLimitsMu sync.Mutex rateLimits common.RateLimits + stats *common.StatsTracker + known mapset.Set // Messages already known by the peer to avoid wasting bandwidth } -func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) common.Peer { +func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger, stats *common.StatsTracker) common.Peer { if logger == nil { logger = zap.NewNop() } @@ -73,6 +75,7 @@ func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logg quit: make(chan struct{}), bloomFilter: common.MakeFullNodeBloom(), fullNode: true, + stats: stats, } } @@ -542,7 +545,7 @@ func (p *Peer) broadcast() error { return nil } - batchHash, err := sendBundle(p.rw, bundle) + batchHash, err := p.SendBundle(bundle) if err != nil { p.logger.Debug("failed to deliver envelopes", zap.String("peerID", types.EncodeHex(p.ID())), zap.Error(err)) return err @@ -565,12 +568,12 @@ func (p *Peer) broadcast() error { return nil } -func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Hash, err error) { +func (p *Peer) SendBundle(bundle []*common.Envelope) (rst gethcommon.Hash, err error) { data, err := rlp.EncodeToBytes(bundle) if err != nil { return } - err = rw.WriteMsg(p2p.Msg{ + err = p.rw.WriteMsg(p2p.Msg{ Code: messagesCode, Size: uint32(len(data)), Payload: bytes.NewBuffer(data), diff --git a/waku/v0/peer_test.go b/waku/v0/peer_test.go index 7488dcebb..c3e68cfc2 100644 --- a/waku/v0/peer_test.go +++ b/waku/v0/peer_test.go @@ -94,7 +94,7 @@ func TestPeerBasic(t *testing.T) { t.Fatalf("failed Wrap with seed %d.", seed) } - p := NewPeer(nil, nil, nil, nil) + p := NewPeer(nil, nil, nil, nil, nil) p.Mark(env) if !p.Marked(env) { t.Fatalf("failed mark with seed %d.", seed) @@ -114,7 +114,9 @@ func TestSendBundle(t *testing.T) { errc := make(chan error) go func() { - _, err := sendBundle(rw1, envelopes) + stats := &common.StatsTracker{} + p := NewPeer(nil, nil, rw1, nil, stats) + _, err := p.SendBundle(envelopes) errc <- err }() require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes)) diff --git a/waku/v1/peer.go b/waku/v1/peer.go index 99a64b9e7..068b36738 100644 --- a/waku/v1/peer.go +++ b/waku/v1/peer.go @@ -54,10 +54,12 @@ type Peer struct { bytesRateLimitsMu sync.Mutex bytesRateLimits common.RateLimits + stats *common.StatsTracker + known mapset.Set // Messages already known by the peer to avoid wasting bandwidth } -func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger) common.Peer { +func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logger *zap.Logger, stats *common.StatsTracker) common.Peer { if logger == nil { logger = zap.NewNop() } @@ -73,6 +75,7 @@ func NewPeer(host common.WakuHost, p2pPeer *p2p.Peer, rw p2p.MsgReadWriter, logg quit: make(chan struct{}), bloomFilter: common.MakeFullNodeBloom(), fullNode: true, + stats: stats, } } @@ -90,47 +93,81 @@ func (p *Peer) Stop() { p.logger.Debug("stopping peer", zap.String("peerID", types.EncodeHex(p.ID()))) } -func (p *Peer) NotifyAboutPowRequirementChange(pow float64) error { +func (p *Peer) NotifyAboutPowRequirementChange(pow float64) (err error) { i := math.Float64bits(pow) - return p2p.Send(p.rw, statusUpdateCode, StatusOptions{PoWRequirement: &i}) + data := StatusOptions{PoWRequirement: &i} + err = p2p.Send(p.rw, statusUpdateCode, data) + if err != nil { + p.stats.AddUpload(data) + } + return } -func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) error { - return p2p.Send(p.rw, statusUpdateCode, StatusOptions{BloomFilter: bloom}) +func (p *Peer) NotifyAboutBloomFilterChange(bloom []byte) (err error) { + data := StatusOptions{BloomFilter: bloom} + err = p2p.Send(p.rw, statusUpdateCode, data) + if err != nil { + p.stats.AddUpload(data) + } + return } -func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) error { - return p2p.Send(p.rw, statusUpdateCode, StatusOptions{TopicInterest: topics}) +func (p *Peer) NotifyAboutTopicInterestChange(topics []common.TopicType) (err error) { + data := StatusOptions{TopicInterest: topics} + err = p2p.Send(p.rw, statusUpdateCode, data) + if err != nil { + p.stats.AddUpload(data) + } + return } func (p *Peer) SetPeerTrusted(trusted bool) { p.trusted = trusted } -func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) error { - return p2p.Send(p.rw, p2pRequestCode, envelope) +func (p *Peer) RequestHistoricMessages(envelope *common.Envelope) (err error) { + err = p2p.Send(p.rw, p2pRequestCode, envelope) + if err != nil { + p.stats.AddUpload(envelope) + } + return } -func (p *Peer) SendMessagesRequest(request common.MessagesRequest) error { - return p2p.Send(p.rw, p2pRequestCode, request) +func (p *Peer) SendMessagesRequest(request common.MessagesRequest) (err error) { + err = p2p.Send(p.rw, p2pRequestCode, request) + if err != nil { + p.stats.AddUpload(request) + } + return } -func (p *Peer) SendHistoricMessageResponse(payload []byte) error { +func (p *Peer) SendHistoricMessageResponse(payload []byte) (err error) { size, r, err := rlp.EncodeToReader(payload) if err != nil { return err } - return p.rw.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) - + err = p.rw.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) + if err != nil { + p.stats.AddUpload(payload) + } + return } -func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) error { - return p2p.Send(p.rw, p2pMessageCode, envelopes) +func (p *Peer) SendP2PMessages(envelopes []*common.Envelope) (err error) { + err = p2p.Send(p.rw, p2pMessageCode, envelopes) + if err != nil { + p.stats.AddUpload(envelopes) + } + return } -func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) error { - return p2p.Send(p.rw, p2pMessageCode, envelopes) +func (p *Peer) SendRawP2PDirect(envelopes []rlp.RawValue) (err error) { + err = p2p.Send(p.rw, p2pMessageCode, envelopes) + if err != nil { + p.stats.AddUpload(envelopes) + } + return } func (p *Peer) SetRWWriter(rw p2p.MsgReadWriter) { @@ -189,6 +226,8 @@ func (p *Peer) Run() error { return err } + p.stats.AddDownloadBytes(uint64(packet.Size)) + if packet.Size > p.host.MaxMessageSize() { logger.Warn("oversize message received", zap.String("peerID", types.EncodeHex(p.ID())), zap.Uint32("size", packet.Size)) return errors.New("oversize message received") @@ -384,11 +423,18 @@ func (p *Peer) handleP2PRequestCompleteCode(packet p2p.Msg) error { // sendConfirmation sends messageResponseCode and batchAcknowledgedCode messages. func (p *Peer) sendConfirmation(data []byte, envelopeErrors []common.EnvelopeError) (err error) { batchHash := crypto.Keccak256Hash(data) - err = p2p.Send(p.rw, messageResponseCode, NewMessagesResponse(batchHash, envelopeErrors)) + msg := NewMessagesResponse(batchHash, envelopeErrors) + err = p2p.Send(p.rw, messageResponseCode, msg) if err != nil { + p.stats.AddUpload(msg) return } + err = p2p.Send(p.rw, batchAcknowledgedCode, batchHash) // DEPRECATED + if err != nil { + p.stats.AddUpload(batchHash) + } + return } @@ -399,7 +445,11 @@ func (p *Peer) handshake() error { errc := make(chan error, 1) opts := StatusOptionsFromHost(p.host) go func() { - errc <- p2p.Send(p.rw, statusCode, opts) + err := p2p.Send(p.rw, statusCode, opts) + if err != nil { + p.stats.AddUpload(statusCode) + } + errc <- err }() // Fetch the remote status packet and verify protocol match @@ -407,6 +457,9 @@ func (p *Peer) handshake() error { if err != nil { return err } + + p.stats.AddDownloadBytes(uint64(packet.Size)) + if packet.Code != statusCode { return fmt.Errorf("p [%x] sent packet %x before status packet", p.ID(), packet.Code) } @@ -418,6 +471,7 @@ func (p *Peer) handshake() error { if err := s.Decode(&peerOptions); err != nil { return fmt.Errorf("p [%x]: failed to decode status options: %v", p.ID(), err) } + if err := p.setOptions(peerOptions.WithDefaults()); err != nil { return fmt.Errorf("p [%x]: failed to set options: %v", p.ID(), err) } @@ -532,7 +586,7 @@ func (p *Peer) broadcast() error { return nil } - batchHash, err := sendBundle(p.rw, bundle) + batchHash, err := p.SendBundle(bundle) if err != nil { p.logger.Debug("failed to deliver envelopes", zap.String("peerID", types.EncodeHex(p.ID())), zap.Error(err)) return err @@ -555,19 +609,25 @@ func (p *Peer) broadcast() error { return nil } -func sendBundle(rw p2p.MsgWriter, bundle []*common.Envelope) (rst gethcommon.Hash, err error) { +func (p *Peer) SendBundle(bundle []*common.Envelope) (rst gethcommon.Hash, err error) { data, err := rlp.EncodeToBytes(bundle) if err != nil { return } - err = rw.WriteMsg(p2p.Msg{ + + msg := p2p.Msg{ Code: messagesCode, Size: uint32(len(data)), Payload: bytes.NewBuffer(data), - }) + } + + err = p.rw.WriteMsg(msg) if err != nil { return } + + p.stats.AddUpload(bundle) + return crypto.Keccak256Hash(data), nil } diff --git a/waku/v1/peer_test.go b/waku/v1/peer_test.go index 99f0fe554..0a9f8c74b 100644 --- a/waku/v1/peer_test.go +++ b/waku/v1/peer_test.go @@ -94,7 +94,7 @@ func TestPeerBasic(t *testing.T) { t.Fatalf("failed Wrap with seed %d.", seed) } - p := NewPeer(nil, nil, nil, nil) + p := NewPeer(nil, nil, nil, nil, nil) p.Mark(env) if !p.Marked(env) { t.Fatalf("failed mark with seed %d.", seed) @@ -114,7 +114,9 @@ func TestSendBundle(t *testing.T) { errc := make(chan error) go func() { - _, err := sendBundle(rw1, envelopes) + stats := &common.StatsTracker{} + p := NewPeer(nil, nil, rw1, nil, stats) + _, err := p.SendBundle(envelopes) errc <- err }() require.NoError(t, p2p.ExpectMsg(rw2, messagesCode, envelopes)) diff --git a/waku/waku.go b/waku/waku.go index e66724b3f..74c6d0979 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -87,6 +87,7 @@ type Waku struct { expirations map[uint32]mapset.Set // Message expiration pool poolMu sync.RWMutex // Mutex to sync the message and expiration pools + stats *common.StatsTracker peers map[common.Peer]struct{} // Set of currently active peers peerMu sync.RWMutex // Mutex to sync the active peer set @@ -160,6 +161,7 @@ func New(cfg *Config, logger *zap.Logger) *Waku { } waku.filters = common.NewFilters() + waku.stats = &common.StatsTracker{} // p2p waku sub-protocol handler waku.protocols = []p2p.Protocol{{ @@ -193,6 +195,10 @@ func New(cfg *Config, logger *zap.Logger) *Waku { return waku } +func (w *Waku) GetStats() types.StatsSummary { + return w.stats.GetStats() +} + // MinPow returns the PoW value required by this node. func (w *Waku) MinPow() float64 { w.settingsMu.RLock() @@ -1081,11 +1087,11 @@ func (w *Waku) Stop() error { } func (w *Waku) handlePeerV0(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error { - return w.HandlePeer(v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv0")), rw) + return w.HandlePeer(v0.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv0"), w.stats), rw) } func (w *Waku) handlePeerV1(p2pPeer *p2p.Peer, rw p2p.MsgReadWriter) error { - return w.HandlePeer(v1.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv1")), rw) + return w.HandlePeer(v1.NewPeer(w, p2pPeer, rw, w.logger.Named("waku/peerv1"), w.stats), rw) } // HandlePeer is called by the underlying P2P layer when the waku sub-protocol diff --git a/waku/waku_test.go b/waku/waku_test.go index 7b1a2dc90..bf7d5d9a8 100644 --- a/waku/waku_test.go +++ b/waku/waku_test.go @@ -1087,8 +1087,9 @@ func TestOnNewEnvelopesSoftBlacklist(t *testing.T) { w1 := New(nil, nil) envelope := &common.Envelope{} + stats := &common.StatsTracker{} p2pPeer := p2p.NewPeer(enode.ID{0x4}, "test", []p2p.Cap{}) - peer := v1.NewPeer(w1, p2pPeer, nil, nil) + peer := v1.NewPeer(w1, p2pPeer, nil, nil, stats) // Pre-condition, we need to make sure this envelope returns an EnvelopeError envelopeError, err := w1.OnNewEnvelopes([]*common.Envelope{envelope}, peer) diff --git a/waku/waku_version_test.go b/waku/waku_version_test.go index 98bac7a82..28e4c456b 100644 --- a/waku/waku_version_test.go +++ b/waku/waku_version_test.go @@ -56,12 +56,14 @@ func TestWakuV1(t *testing.T) { type WakuTestSuite struct { suite.Suite seed int64 - newPeer func(common.WakuHost, *p2p.Peer, p2p.MsgReadWriter, *zap.Logger) common.Peer + stats *common.StatsTracker + newPeer func(common.WakuHost, *p2p.Peer, p2p.MsgReadWriter, *zap.Logger, *common.StatsTracker) common.Peer } // Set up random seed func (s *WakuTestSuite) SetupTest() { s.seed = time.Now().Unix() + s.stats = &common.StatsTracker{} mrand.Seed(s.seed) } @@ -96,7 +98,7 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() { rw1, rw2 := p2p.MsgPipe() go func() { - s.Require().Error(w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil), rw1)) + s.Require().Error(w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw1, nil, s.stats), rw1)) }() timer := time.AfterFunc(time.Second*5, func() { @@ -104,7 +106,7 @@ func (s *WakuTestSuite) TestHandleP2PMessageCode() { handleError(s.T(), rw2.Close()) }) - peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil) + peer1 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), rw2, nil, s.stats) peer1.SetPeerTrusted(true) err = peer1.Start() @@ -137,14 +139,14 @@ func (s *WakuTestSuite) testConfirmationsHandshake(expectConfirmations bool) { handleError(s.T(), rw2.Close()) }) - p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil) + p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil, s.stats) go func() { // This will always fail eventually as we close the channels s.Require().Error(w1.HandlePeer(p1, rw1)) }() - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats) err := p2.Start() s.Require().NoError(err) peers := w1.getPeers() @@ -181,8 +183,8 @@ func (s *WakuTestSuite) TestMessagesResponseWithError() { s.T().Errorf("error closing MsgPipe 2, '%s'", err) } }() - p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil) - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil) + p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil, s.stats) errorc := make(chan error, 1) go func() { errorc <- w1.HandlePeer(p1, rw2) }() @@ -244,8 +246,8 @@ func (s *WakuTestSuite) testConfirmationEvents(envelope common.Envelope, envelop rw1, rw2 := p2p.MsgPipe() - p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil) - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil) + p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 0}}), rw1, nil, s.stats) errorc := make(chan error, 1) go func() { errorc <- w1.HandlePeer(p1, rw2) }() @@ -342,14 +344,14 @@ func (s *WakuTestSuite) TestEventsWithoutConfirmation() { defer sub.Unsubscribe() rw1, rw2 := p2p.MsgPipe() - p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil) + p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats) go func() { handleError(s.T(), w1.HandlePeer(p1, rw2)) }() timer := time.AfterFunc(5*time.Second, func() { handleError(s.T(), rw1.Close()) }) - peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil) + peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil, s.stats) s.Require().NoError(peer2.Start()) go func() { handleError(s.T(), peer2.Run()) }() @@ -404,8 +406,8 @@ func (s *WakuTestSuite) TestWakuTimeDesyncEnvelopeIgnored() { } }() w1, w2 := New(c, nil), New(c, nil) - p1 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil) - p2 := s.newPeer(w1, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 1}}), rw2, nil) + p1 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}), rw1, nil, s.stats) + p2 := s.newPeer(w1, p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"waku", 1}}), rw2, nil, s.stats) errc := make(chan error) go func() { errc <- w1.HandlePeer(p2, rw2) }() @@ -439,7 +441,7 @@ func (s *WakuTestSuite) TestRequestSentEventWithExpiry() { p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 1}}) rw := discardPipe() defer func() { handleError(s.T(), rw.Close()) }() - w.peers[s.newPeer(w, p, rw, nil)] = struct{}{} + w.peers[s.newPeer(w, p, rw, nil, s.stats)] = struct{}{} events := make(chan common.EnvelopeEvent, 1) sub := w.SubscribeEnvelopeEvents(events) defer sub.Unsubscribe() @@ -490,14 +492,14 @@ func (s *WakuTestSuite) TestDeprecatedDeliverMail() { }) rw1, rw2 := p2p.MsgPipe() - p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil) + p1 := s.newPeer(w1, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats) go func() { handleError(s.T(), w1.HandlePeer(p1, rw2)) }() timer := time.AfterFunc(5*time.Second, func() { handleError(s.T(), rw1.Close()) }) - peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil) + peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw1, nil, s.stats) s.Require().NoError(peer2.Start()) go func() { handleError(s.T(), peer2.Run()) }() @@ -542,7 +544,7 @@ func (s *WakuTestSuite) TestSendMessagesRequest() { p := p2p.NewPeer(enode.ID{0x01}, "peer01", nil) rw1, rw2 := p2p.MsgPipe() w := New(nil, nil) - w.peers[s.newPeer(w, p, rw1, nil)] = struct{}{} + w.peers[s.newPeer(w, p, rw1, nil, s.stats)] = struct{}{} go func() { // Read out so that it's consumed @@ -573,7 +575,7 @@ func (s *WakuTestSuite) TestRateLimiterIntegration() { s.T().Errorf("error closing MsgPipe, '%s'", err) } }() - p := s.newPeer(w, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil) + p := s.newPeer(w, p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"waku", 0}}), rw2, nil, s.stats) errorc := make(chan error, 1) go func() { errorc <- w.HandlePeer(p, rw2) }() @@ -595,7 +597,7 @@ func (s *WakuTestSuite) TestMailserverCompletionEvent() { rw1, rw2 := p2p.MsgPipe() errorc := make(chan error, 1) go func() { - err := w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "1", []p2p.Cap{}), rw1, nil), rw1) + err := w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "1", []p2p.Cap{}), rw1, nil, s.stats), rw1) errorc <- err }() @@ -603,7 +605,7 @@ func (s *WakuTestSuite) TestMailserverCompletionEvent() { s.Require().NoError(w2.Start()) defer func() { handleError(s.T(), w2.Stop()) }() - peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil) + peer2 := s.newPeer(w2, p2p.NewPeer(enode.ID{1}, "1", nil), rw2, nil, s.stats) peer2.SetPeerTrusted(true) events := make(chan common.EnvelopeEvent) @@ -656,10 +658,10 @@ func (s *WakuTestSuite) TestPeerHandshakeWithTwoFullNode() { w2 := New(nil, nil) go func() { - handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1)) + handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1)) }() - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats) err = p2.Start() s.Require().NoError(err) @@ -678,10 +680,10 @@ func (s *WakuTestSuite) TestHandshakeWithOldVersionWithoutLightModeFlag() { w2 := New(nil, nil) go func() { - handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1)) + handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1)) }() - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats) err := p2.Start() s.Require().NoError(err) } @@ -701,10 +703,10 @@ func (s *WakuTestSuite) TestTwoLightPeerHandshakeRestrictionOff() { w2.settings.RestrictLightClientsConn = false go func() { - handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1)) + handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1)) }() - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats) s.Require().NoError(p2.Start()) } @@ -723,10 +725,10 @@ func (s *WakuTestSuite) TestTwoLightPeerHandshakeError() { w2.settings.RestrictLightClientsConn = true go func() { - handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil), rw1)) + handleError(s.T(), w1.HandlePeer(s.newPeer(w1, p2p.NewPeer(enode.ID{}, "test-1", []p2p.Cap{}), rw1, nil, s.stats), rw1)) }() - p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil) + p2 := s.newPeer(w2, p2p.NewPeer(enode.ID{}, "test-2", []p2p.Cap{}), rw2, nil, s.stats) s.Require().Error(p2.Start()) } diff --git a/wakuv2/common/stats.go b/wakuv2/common/stats.go new file mode 100644 index 000000000..c42c2c571 --- /dev/null +++ b/wakuv2/common/stats.go @@ -0,0 +1,118 @@ +package common + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/status-im/status-go/eth-node/types" +) + +type Measure struct { + Timestamp int64 + Size uint64 +} + +type StatsTracker struct { + Uploads []Measure + Downloads []Measure + + statsMutex sync.Mutex +} + +const measurementPeriod = 15 * time.Second + +func measure(input interface{}) (*Measure, error) { + b, err := rlp.EncodeToBytes(input) + if err != nil { + return nil, err + } + return &Measure{ + Timestamp: time.Now().UnixNano(), + Size: uint64(len(b)), + }, nil + +} + +func (s *StatsTracker) AddUpload(input interface{}) { + go func(input interface{}) { + m, err := measure(input) + if err != nil { + return + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Uploads = append(s.Uploads, *m) + }(input) +} + +func (s *StatsTracker) AddDownload(input interface{}) { + go func(input interface{}) { + m, err := measure(input) + if err != nil { + return + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Downloads = append(s.Downloads, *m) + }(input) +} + +func (s *StatsTracker) AddUploadBytes(size uint64) { + go func(size uint64) { + m := Measure{ + Timestamp: time.Now().UnixNano(), + Size: size, + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Uploads = append(s.Uploads, m) + }(size) +} + +func (s *StatsTracker) AddDownloadBytes(size uint64) { + go func(size uint64) { + m := Measure{ + Timestamp: time.Now().UnixNano(), + Size: size, + } + + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + s.Downloads = append(s.Downloads, m) + }(size) +} + +func calculateAverage(measures []Measure, minTime int64) (validMeasures []Measure, rate uint64) { + for _, m := range measures { + if m.Timestamp > minTime { + // Only use recent measures + validMeasures = append(validMeasures, m) + rate += m.Size + } + } + + rate /= (uint64(measurementPeriod) / uint64(1*time.Second)) + return +} + +func (s *StatsTracker) GetRatePerSecond() (uploadRate uint64, downloadRate uint64) { + s.statsMutex.Lock() + defer s.statsMutex.Unlock() + minTime := time.Now().Add(-measurementPeriod).UnixNano() + s.Uploads, uploadRate = calculateAverage(s.Uploads, minTime) + s.Downloads, downloadRate = calculateAverage(s.Downloads, minTime) + return +} + +func (s *StatsTracker) GetStats() types.StatsSummary { + uploadRate, downloadRate := s.GetRatePerSecond() + summary := types.StatsSummary{ + UploadRate: uploadRate, + DownloadRate: downloadRate, + } + return summary +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index c8c70dd78..938bb2177 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -78,6 +78,8 @@ type Waku struct { expirations map[uint32]mapset.Set // Message expiration pool poolMu sync.RWMutex // Mutex to sync the message and expiration pools + stats *common.StatsTracker + msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded quit chan struct{} // Channel used for graceful exit @@ -120,6 +122,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { } waku.filters = common.NewFilters() + waku.stats = &common.StatsTracker{} var privateKey *ecdsa.PrivateKey var err error @@ -176,6 +179,10 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger) (*Waku, error) { return waku, nil } +func (w *Waku) GetStats() types.StatsSummary { + return w.stats.GetStats() +} + func (w *Waku) runMsgLoop() { sub, err := w.node.Subscribe(nil) if err != nil {