From 1bbb2537b47ab913f5f4802628d56acc5e0a891d Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 13 Jun 2024 15:31:09 -0700 Subject: [PATCH] feat_: batch all telemetry data and send request every 10 seconds (#5251) * fix_: add status telemetry client to cli * feat_: call telemetry when pushing an envelope * feat_: log status version in all telemetry calls * feat_: batch all telemetry data and send request every 10 seconds --- cmd/status-cli/util.go | 10 +- cmd/statusd/main.go | 1 + protocol/messenger.go | 10 +- protocol/messenger_builder_test.go | 1 + services/ext/service.go | 1 + telemetry/client.go | 209 ++++++++++++++++++++++++----- telemetry/client_test.go | 157 ++++++++++++++++++++++ wakuv2/waku.go | 46 ++++++- 8 files changed, 400 insertions(+), 35 deletions(-) create mode 100644 telemetry/client_test.go diff --git a/cmd/status-cli/util.go b/cmd/status-cli/util.go index 4bc375794..71fd8da79 100644 --- a/cmd/status-cli/util.go +++ b/cmd/status-cli/util.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "os" @@ -12,6 +13,7 @@ import ( "github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/services/wakuv2ext" + "github.com/status-im/status-go/telemetry" "go.uber.org/zap" ) @@ -41,7 +43,7 @@ func start(name string, port int, apiModules string, telemetryUrl string, useExi ) setupLogger(name) nlog := logger.Named(name) - nlog.Info("starting messager") + nlog.Info("starting messenger") backend := api.NewGethStatusBackend() if useExistingAccount { @@ -60,6 +62,12 @@ func start(name string, port int, apiModules string, telemetryUrl string, useExi if wakuService == nil { return nil, errors.New("waku service is not available") } + + if telemetryUrl != "" { + telemetryClient := telemetry.NewClient(nlog.Desugar(), telemetryUrl, backend.SelectedAccountKeyID(), name, "cli") + go telemetryClient.Start(context.Background()) + backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient) + } wakuAPI := wakuv2ext.NewPublicAPI(wakuService) messenger := wakuAPI.Messenger() diff --git a/cmd/statusd/main.go b/cmd/statusd/main.go index ecfd99ee7..d341d8c10 100644 --- a/cmd/statusd/main.go +++ b/cmd/statusd/main.go @@ -280,6 +280,7 @@ func main() { gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()), installationID.String(), nil, + config.Version, options..., ) if err != nil { diff --git a/protocol/messenger.go b/protocol/messenger.go index 6f36a579f..1f0965490 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -299,6 +299,7 @@ func NewMessenger( node types.Node, installationID string, peerStore *mailservers.PeerStore, + version string, opts ...Option, ) (*Messenger, error) { var messenger *Messenger @@ -428,10 +429,11 @@ func NewMessenger( var telemetryClient *telemetry.Client if c.telemetryServerURL != "" { - telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName) + telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version) if c.wakuService != nil { c.wakuService.SetStatusTelemetryClient(telemetryClient) } + go telemetryClient.Start(messenger.ctx) } // Initialize push notification server @@ -3861,7 +3863,11 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte statusMessages := handleMessagesResponse.StatusMessages if m.telemetryClient != nil { - go m.telemetryClient.PushReceivedMessages(filter, shhMessage, statusMessages) + m.telemetryClient.PushReceivedMessages(telemetry.ReceivedMessages{ + Filter: filter, + SSHMessage: shhMessage, + Messages: statusMessages, + }) } err = m.handleDatasyncMetadata(handleMessagesResponse) diff --git a/protocol/messenger_builder_test.go b/protocol/messenger_builder_test.go index d438373bd..7829b9134 100644 --- a/protocol/messenger_builder_test.go +++ b/protocol/messenger_builder_test.go @@ -92,6 +92,7 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger, &testNode{shh: waku}, uuid.New().String(), nil, + "testVersion", options..., ) if err != nil { diff --git a/services/ext/service.go b/services/ext/service.go index 9c6759b1c..fcb364c68 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -174,6 +174,7 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appD s.n, s.config.ShhextConfig.InstallationID, s.peerStore, + params.Version, options..., ) if err != nil { diff --git a/telemetry/client.go b/telemetry/client.go index 14b671897..5b8a2a65c 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -2,6 +2,7 @@ package telemetry import ( "bytes" + "context" "encoding/json" "fmt" "net/http" @@ -11,56 +12,187 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/transport" - v1protocol "github.com/status-im/status-go/protocol/v1" + "github.com/status-im/status-go/wakuv2" + v1protocol "github.com/status-im/status-go/protocol/v1" v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" ) -type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string +type TelemetryType string + +const ( + ProtocolStatsMetric TelemetryType = "ProtocolStats" + ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope" + SentEnvelopeMetric TelemetryType = "SentEnvelope" + UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" + ReceivedMessagesMetric TelemetryType = "ReceivedMessages" +) + +type TelemetryRequest struct { + Id int `json:"id"` + TelemetryType TelemetryType `json:"telemetry_type"` + TelemetryData *json.RawMessage `json:"telemetry_data"` } -func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string) *Client { +func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { + c.receivedMessagesCh <- receivedMessages +} + +func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { + c.sentEnvelopeCh <- sentEnvelope +} + +func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) { + c.receivedEnvelopeCh <- receivedEnvelope +} + +type ReceivedMessages struct { + Filter transport.Filter + SSHMessage *types.Message + Messages []*v1protocol.StatusMessage +} + +type Client struct { + serverURL string + httpClient *http.Client + logger *zap.Logger + keyUID string + nodeName string + version string + receivedMessagesCh chan ReceivedMessages + receivedEnvelopeCh chan *v2protocol.Envelope + sentEnvelopeCh chan wakuv2.SentEnvelope + telemetryCh chan TelemetryRequest + nextId int + sendPeriod time.Duration +} + +func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client { return &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, + serverURL: serverURL, + httpClient: &http.Client{Timeout: time.Minute}, + logger: logger, + keyUID: keyUID, + nodeName: nodeName, + version: version, + receivedMessagesCh: make(chan ReceivedMessages), + receivedEnvelopeCh: make(chan *v2protocol.Envelope), + sentEnvelopeCh: make(chan wakuv2.SentEnvelope), + telemetryCh: make(chan TelemetryRequest), + nextId: 0, + sendPeriod: 10 * time.Second, } } -func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types.Message, messages []*v1protocol.StatusMessage) { +func (c *Client) CollectAndProcessTelemetry(ctx context.Context) { + go func() { + for { + select { + case receivedMessages := <-c.receivedMessagesCh: + c.processAndPushTelemetry(receivedMessages) + case receivedEnvelope := <-c.receivedEnvelopeCh: + c.processAndPushTelemetry(receivedEnvelope) + case sentEnvelope := <-c.sentEnvelopeCh: + c.processAndPushTelemetry(sentEnvelope) + case <-ctx.Done(): + return + } + } + }() +} + +func (c *Client) Start(ctx context.Context) { + go c.CollectAndProcessTelemetry(ctx) + go func() { + ticker := time.NewTicker(c.sendPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + var telemetryRequests []TelemetryRequest + collecting := true + for collecting { + select { + case telemetryRequest := <-c.telemetryCh: + telemetryRequests = append(telemetryRequests, telemetryRequest) + default: + collecting = false + } + } + if len(telemetryRequests) > 0 { + c.pushTelemetryRequest(telemetryRequests) + } + case <-ctx.Done(): + return + } + } + }() +} + +func (c *Client) processAndPushTelemetry(data interface{}) { + var telemetryRequest TelemetryRequest + switch v := data.(type) { + case ReceivedMessages: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: ReceivedMessagesMetric, + TelemetryData: c.ProcessReceivedMessages(v), + } + case *v2protocol.Envelope: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: ReceivedEnvelopeMetric, + TelemetryData: c.ProcessReceivedEnvelope(v), + } + case wakuv2.SentEnvelope: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: SentEnvelopeMetric, + TelemetryData: c.ProcessSentEnvelope(v), + } + default: + c.logger.Error("Unknown telemetry data type") + return + } + + c.nextId++ + c.telemetryCh <- telemetryRequest +} + +func (c *Client) pushTelemetryRequest(request []TelemetryRequest) { + url := fmt.Sprintf("%s/record-metrics", c.serverURL) + body, _ := json.Marshal(request) + _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) + if err != nil { + c.logger.Error("Error sending telemetry data", zap.Error(err)) + } +} + +func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { c.logger.Debug("Pushing received messages to telemetry server") - url := fmt.Sprintf("%s/received-messages", c.serverURL) var postBody []map[string]interface{} - for _, message := range messages { + for _, message := range receivedMessages.Messages { postBody = append(postBody, map[string]interface{}{ - "chatId": filter.ChatID, - "messageHash": types.EncodeHex(sshMessage.Hash), + "chatId": receivedMessages.Filter.ChatID, + "messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash), "messageId": message.ApplicationLayer.ID, - "sentAt": sshMessage.Timestamp, - "pubsubTopic": filter.PubsubTopic, - "topic": filter.ContentTopic.String(), + "sentAt": receivedMessages.SSHMessage.Timestamp, + "pubsubTopic": receivedMessages.Filter.PubsubTopic, + "topic": receivedMessages.Filter.ContentTopic.String(), "messageType": message.ApplicationLayer.Type.String(), "receiverKeyUID": c.keyUID, "nodeName": c.nodeName, - "messageSize": len(sshMessage.Payload), + "messageSize": len(receivedMessages.SSHMessage.Payload), + "statusVersion": c.version, }) } body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending message to telemetry server", zap.Error(err)) - } + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage } -func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) { - url := fmt.Sprintf("%s/received-envelope", c.serverURL) +func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage { postBody := map[string]interface{}{ "messageHash": envelope.Hash().String(), "sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)), @@ -68,12 +200,27 @@ func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) { "topic": envelope.Message().ContentTopic, "receiverKeyUID": c.keyUID, "nodeName": c.nodeName, + "statusVersion": c.version, } body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending envelope to telemetry server", zap.Error(err)) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage +} + +func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { + postBody := map[string]interface{}{ + "messageHash": sentEnvelope.Envelope.Hash().String(), + "sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)), + "pubsubTopic": sentEnvelope.Envelope.PubsubTopic(), + "topic": sentEnvelope.Envelope.Message().ContentTopic, + "senderKeyUID": c.keyUID, + "nodeName": c.nodeName, + "publishMethod": sentEnvelope.PublishMethod.String(), + "statusVersion": c.version, } + body, _ := json.Marshal(postBody) + jsonRawMessage := json.RawMessage(body) + return &jsonRawMessage } func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { diff --git a/telemetry/client_test.go b/telemetry/client_test.go new file mode 100644 index 000000000..cc04a3a33 --- /dev/null +++ b/telemetry/client_test.go @@ -0,0 +1,157 @@ +package telemetry + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/transport" + v1protocol "github.com/status-im/status-go/protocol/v1" + "github.com/status-im/status-go/wakuv2" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +func createMockServer(t *testing.T) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("Expected 'POST' request, got '%s'", r.Method) + } + if r.URL.EscapedPath() != "/record-metrics" { + t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) + } + + // Check the request body is as expected + var received []TelemetryRequest + err := json.NewDecoder(r.Body).Decode(&received) + if err != nil { + t.Fatal(err) + } + + if len(received) != 1 { + t.Errorf("Unexpected data received: %+v", received) + } else { + // If the data is as expected, respond with success + t.Log("Responding with success") + w.WriteHeader(http.StatusOK) + } + })) +} + +func TestClient_ProcessReceivedMessages(t *testing.T) { + // Setup a mock server to handle post requests + mockServer := createMockServer(t) + defer mockServer.Close() + + // Create a client with the mock server URL + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + logger, err := config.Build() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + + // Create a telemetry request to send + data := ReceivedMessages{ + Filter: transport.Filter{ + ChatID: "testChat", + PubsubTopic: "testTopic", + ContentTopic: types.StringToTopic("testContentTopic"), + }, + SSHMessage: &types.Message{ + Hash: []byte("hash"), + Timestamp: uint32(time.Now().Unix()), + }, + Messages: []*v1protocol.StatusMessage{ + { + ApplicationLayer: v1protocol.ApplicationLayer{ + ID: types.HexBytes("123"), + Type: 1, + }, + }, + }, + } + telemetryData := client.ProcessReceivedMessages(data) + telemetryRequest := TelemetryRequest{ + Id: 1, + TelemetryType: ReceivedMessagesMetric, + TelemetryData: telemetryData, + } + + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) +} + +func TestClient_ProcessReceivedEnvelope(t *testing.T) { + // Setup a mock server to handle post requests + mockServer := createMockServer(t) + defer mockServer.Close() + + // Create a client with the mock server URL + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + logger, err := config.Build() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + + // Create a telemetry request to send + envelope := v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: "testContentTopic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, "") + telemetryData := client.ProcessReceivedEnvelope(envelope) + telemetryRequest := TelemetryRequest{ + Id: 2, + TelemetryType: ReceivedEnvelopeMetric, + TelemetryData: telemetryData, + } + + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) +} + +func TestClient_ProcessSentEnvelope(t *testing.T) { + // Setup a mock server to handle post requests + mockServer := createMockServer(t) + defer mockServer.Close() + + // Create a client with the mock server URL + config := zap.NewDevelopmentConfig() + config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + logger, err := config.Build() + if err != nil { + t.Fatalf("Failed to create logger: %v", err) + } + client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0") + + // Create a telemetry request to send + sentEnvelope := wakuv2.SentEnvelope{ + Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: "testContentTopic", + Version: proto.Uint32(0), + Timestamp: proto.Int64(time.Now().Unix()), + }, 0, ""), + PublishMethod: wakuv2.LightPush, + } + telemetryData := client.ProcessSentEnvelope(sentEnvelope) + telemetryRequest := TelemetryRequest{ + Id: 3, + TelemetryType: SentEnvelopeMetric, + TelemetryData: telemetryData, + } + + // Send the telemetry request + client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest}) +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 7ff2422d5..3f0c27af4 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -75,6 +75,7 @@ import ( "github.com/status-im/status-go/wakuv2/persistence" node "github.com/waku-org/go-waku/waku/v2/node" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -87,8 +88,14 @@ const maxHashQueryLength = 100 const hashQueryInterval = 5 * time.Second const messageSentPeriod = 5 // in seconds +type SentEnvelope struct { + Envelope *v2protocol.Envelope + PublishMethod PublishMethod +} + type ITelemetryClient interface { - PushReceivedEnvelope(*protocol.Envelope) + PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) + PushSentEnvelope(sentEnvelope SentEnvelope) } // Waku represents a dark communication interface through the Ethereum @@ -947,24 +954,45 @@ func (w *Waku) SkipPublishToTopic(value bool) { w.cfg.SkipPublishToTopic = value } +type PublishMethod int + +const ( + LightPush PublishMethod = iota + Relay +) + +func (pm PublishMethod) String() string { + switch pm { + case LightPush: + return "LightPush" + case Relay: + return "Relay" + default: + return "Unknown" + } +} + func (w *Waku) broadcast() { for { select { case envelope := <-w.sendQueue: logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) var fn publishFn + var publishMethod PublishMethod if w.cfg.SkipPublishToTopic { // For now only used in testing to simulate going offline fn = func(env *protocol.Envelope, logger *zap.Logger) error { return errors.New("test send failure") } } else if w.cfg.LightClient { + publishMethod = LightPush fn = func(env *protocol.Envelope, logger *zap.Logger) error { logger.Info("publishing message via lightpush") _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic())) return err } } else { + publishMethod = Relay fn = func(env *protocol.Envelope, logger *zap.Logger) error { peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic())) logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) @@ -973,6 +1001,22 @@ func (w *Waku) broadcast() { } } + // Wraps the publish function with a call to the telemetry client + if w.statusTelemetryClient != nil { + sendFn := fn + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + err := sendFn(env, logger) + if err == nil { + w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) + } + // else { + // TODO: send error from Relay or LightPush to Telemetry + // w.statusTelemetryClient.PushError(err) + // } + return err + } + } + w.wg.Add(1) go w.publishEnvelope(envelope, fn, logger) case <-w.ctx.Done():