feat_: batch all telemetry data and send request every 10 seconds (#5251)
* fix_: add status telemetry client to cli * feat_: call telemetry when pushing an envelope * feat_: log status version in all telemetry calls * feat_: batch all telemetry data and send request every 10 seconds
This commit is contained in:
parent
ea5c444dbe
commit
1bbb2537b4
|
@ -1,6 +1,7 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
@ -12,6 +13,7 @@ import (
|
||||||
"github.com/status-im/status-go/multiaccounts"
|
"github.com/status-im/status-go/multiaccounts"
|
||||||
"github.com/status-im/status-go/protocol/requests"
|
"github.com/status-im/status-go/protocol/requests"
|
||||||
"github.com/status-im/status-go/services/wakuv2ext"
|
"github.com/status-im/status-go/services/wakuv2ext"
|
||||||
|
"github.com/status-im/status-go/telemetry"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -41,7 +43,7 @@ func start(name string, port int, apiModules string, telemetryUrl string, useExi
|
||||||
)
|
)
|
||||||
setupLogger(name)
|
setupLogger(name)
|
||||||
nlog := logger.Named(name)
|
nlog := logger.Named(name)
|
||||||
nlog.Info("starting messager")
|
nlog.Info("starting messenger")
|
||||||
|
|
||||||
backend := api.NewGethStatusBackend()
|
backend := api.NewGethStatusBackend()
|
||||||
if useExistingAccount {
|
if useExistingAccount {
|
||||||
|
@ -60,6 +62,12 @@ func start(name string, port int, apiModules string, telemetryUrl string, useExi
|
||||||
if wakuService == nil {
|
if wakuService == nil {
|
||||||
return nil, errors.New("waku service is not available")
|
return nil, errors.New("waku service is not available")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if telemetryUrl != "" {
|
||||||
|
telemetryClient := telemetry.NewClient(nlog.Desugar(), telemetryUrl, backend.SelectedAccountKeyID(), name, "cli")
|
||||||
|
go telemetryClient.Start(context.Background())
|
||||||
|
backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient)
|
||||||
|
}
|
||||||
wakuAPI := wakuv2ext.NewPublicAPI(wakuService)
|
wakuAPI := wakuv2ext.NewPublicAPI(wakuService)
|
||||||
|
|
||||||
messenger := wakuAPI.Messenger()
|
messenger := wakuAPI.Messenger()
|
||||||
|
|
|
@ -280,6 +280,7 @@ func main() {
|
||||||
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
|
gethbridge.NewNodeBridge(backend.StatusNode().GethNode(), backend.StatusNode().WakuService(), backend.StatusNode().WakuV2Service()),
|
||||||
installationID.String(),
|
installationID.String(),
|
||||||
nil,
|
nil,
|
||||||
|
config.Version,
|
||||||
options...,
|
options...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -299,6 +299,7 @@ func NewMessenger(
|
||||||
node types.Node,
|
node types.Node,
|
||||||
installationID string,
|
installationID string,
|
||||||
peerStore *mailservers.PeerStore,
|
peerStore *mailservers.PeerStore,
|
||||||
|
version string,
|
||||||
opts ...Option,
|
opts ...Option,
|
||||||
) (*Messenger, error) {
|
) (*Messenger, error) {
|
||||||
var messenger *Messenger
|
var messenger *Messenger
|
||||||
|
@ -428,10 +429,11 @@ func NewMessenger(
|
||||||
|
|
||||||
var telemetryClient *telemetry.Client
|
var telemetryClient *telemetry.Client
|
||||||
if c.telemetryServerURL != "" {
|
if c.telemetryServerURL != "" {
|
||||||
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName)
|
telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version)
|
||||||
if c.wakuService != nil {
|
if c.wakuService != nil {
|
||||||
c.wakuService.SetStatusTelemetryClient(telemetryClient)
|
c.wakuService.SetStatusTelemetryClient(telemetryClient)
|
||||||
}
|
}
|
||||||
|
go telemetryClient.Start(messenger.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize push notification server
|
// Initialize push notification server
|
||||||
|
@ -3861,7 +3863,11 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
|
||||||
statusMessages := handleMessagesResponse.StatusMessages
|
statusMessages := handleMessagesResponse.StatusMessages
|
||||||
|
|
||||||
if m.telemetryClient != nil {
|
if m.telemetryClient != nil {
|
||||||
go m.telemetryClient.PushReceivedMessages(filter, shhMessage, statusMessages)
|
m.telemetryClient.PushReceivedMessages(telemetry.ReceivedMessages{
|
||||||
|
Filter: filter,
|
||||||
|
SSHMessage: shhMessage,
|
||||||
|
Messages: statusMessages,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.handleDatasyncMetadata(handleMessagesResponse)
|
err = m.handleDatasyncMetadata(handleMessagesResponse)
|
||||||
|
|
|
@ -92,6 +92,7 @@ func newTestMessenger(waku types.Waku, config testMessengerConfig) (*Messenger,
|
||||||
&testNode{shh: waku},
|
&testNode{shh: waku},
|
||||||
uuid.New().String(),
|
uuid.New().String(),
|
||||||
nil,
|
nil,
|
||||||
|
"testVersion",
|
||||||
options...,
|
options...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -174,6 +174,7 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, appD
|
||||||
s.n,
|
s.n,
|
||||||
s.config.ShhextConfig.InstallationID,
|
s.config.ShhextConfig.InstallationID,
|
||||||
s.peerStore,
|
s.peerStore,
|
||||||
|
params.Version,
|
||||||
options...,
|
options...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package telemetry
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -11,56 +12,187 @@ import (
|
||||||
|
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
"github.com/status-im/status-go/protocol/transport"
|
"github.com/status-im/status-go/protocol/transport"
|
||||||
v1protocol "github.com/status-im/status-go/protocol/v1"
|
"github.com/status-im/status-go/wakuv2"
|
||||||
|
|
||||||
|
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||||
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type TelemetryType string
|
||||||
serverURL string
|
|
||||||
httpClient *http.Client
|
const (
|
||||||
logger *zap.Logger
|
ProtocolStatsMetric TelemetryType = "ProtocolStats"
|
||||||
keyUID string
|
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
|
||||||
nodeName string
|
SentEnvelopeMetric TelemetryType = "SentEnvelope"
|
||||||
|
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
|
||||||
|
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TelemetryRequest struct {
|
||||||
|
Id int `json:"id"`
|
||||||
|
TelemetryType TelemetryType `json:"telemetry_type"`
|
||||||
|
TelemetryData *json.RawMessage `json:"telemetry_data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string) *Client {
|
func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) {
|
||||||
|
c.receivedMessagesCh <- receivedMessages
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) {
|
||||||
|
c.sentEnvelopeCh <- sentEnvelope
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope) {
|
||||||
|
c.receivedEnvelopeCh <- receivedEnvelope
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReceivedMessages struct {
|
||||||
|
Filter transport.Filter
|
||||||
|
SSHMessage *types.Message
|
||||||
|
Messages []*v1protocol.StatusMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
serverURL string
|
||||||
|
httpClient *http.Client
|
||||||
|
logger *zap.Logger
|
||||||
|
keyUID string
|
||||||
|
nodeName string
|
||||||
|
version string
|
||||||
|
receivedMessagesCh chan ReceivedMessages
|
||||||
|
receivedEnvelopeCh chan *v2protocol.Envelope
|
||||||
|
sentEnvelopeCh chan wakuv2.SentEnvelope
|
||||||
|
telemetryCh chan TelemetryRequest
|
||||||
|
nextId int
|
||||||
|
sendPeriod time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
serverURL: serverURL,
|
serverURL: serverURL,
|
||||||
httpClient: &http.Client{Timeout: time.Minute},
|
httpClient: &http.Client{Timeout: time.Minute},
|
||||||
logger: logger,
|
logger: logger,
|
||||||
keyUID: keyUID,
|
keyUID: keyUID,
|
||||||
nodeName: nodeName,
|
nodeName: nodeName,
|
||||||
|
version: version,
|
||||||
|
receivedMessagesCh: make(chan ReceivedMessages),
|
||||||
|
receivedEnvelopeCh: make(chan *v2protocol.Envelope),
|
||||||
|
sentEnvelopeCh: make(chan wakuv2.SentEnvelope),
|
||||||
|
telemetryCh: make(chan TelemetryRequest),
|
||||||
|
nextId: 0,
|
||||||
|
sendPeriod: 10 * time.Second,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) PushReceivedMessages(filter transport.Filter, sshMessage *types.Message, messages []*v1protocol.StatusMessage) {
|
func (c *Client) CollectAndProcessTelemetry(ctx context.Context) {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case receivedMessages := <-c.receivedMessagesCh:
|
||||||
|
c.processAndPushTelemetry(receivedMessages)
|
||||||
|
case receivedEnvelope := <-c.receivedEnvelopeCh:
|
||||||
|
c.processAndPushTelemetry(receivedEnvelope)
|
||||||
|
case sentEnvelope := <-c.sentEnvelopeCh:
|
||||||
|
c.processAndPushTelemetry(sentEnvelope)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Start(ctx context.Context) {
|
||||||
|
go c.CollectAndProcessTelemetry(ctx)
|
||||||
|
go func() {
|
||||||
|
ticker := time.NewTicker(c.sendPeriod)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
var telemetryRequests []TelemetryRequest
|
||||||
|
collecting := true
|
||||||
|
for collecting {
|
||||||
|
select {
|
||||||
|
case telemetryRequest := <-c.telemetryCh:
|
||||||
|
telemetryRequests = append(telemetryRequests, telemetryRequest)
|
||||||
|
default:
|
||||||
|
collecting = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(telemetryRequests) > 0 {
|
||||||
|
c.pushTelemetryRequest(telemetryRequests)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) processAndPushTelemetry(data interface{}) {
|
||||||
|
var telemetryRequest TelemetryRequest
|
||||||
|
switch v := data.(type) {
|
||||||
|
case ReceivedMessages:
|
||||||
|
telemetryRequest = TelemetryRequest{
|
||||||
|
Id: c.nextId,
|
||||||
|
TelemetryType: ReceivedMessagesMetric,
|
||||||
|
TelemetryData: c.ProcessReceivedMessages(v),
|
||||||
|
}
|
||||||
|
case *v2protocol.Envelope:
|
||||||
|
telemetryRequest = TelemetryRequest{
|
||||||
|
Id: c.nextId,
|
||||||
|
TelemetryType: ReceivedEnvelopeMetric,
|
||||||
|
TelemetryData: c.ProcessReceivedEnvelope(v),
|
||||||
|
}
|
||||||
|
case wakuv2.SentEnvelope:
|
||||||
|
telemetryRequest = TelemetryRequest{
|
||||||
|
Id: c.nextId,
|
||||||
|
TelemetryType: SentEnvelopeMetric,
|
||||||
|
TelemetryData: c.ProcessSentEnvelope(v),
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
c.logger.Error("Unknown telemetry data type")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.nextId++
|
||||||
|
c.telemetryCh <- telemetryRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) pushTelemetryRequest(request []TelemetryRequest) {
|
||||||
|
url := fmt.Sprintf("%s/record-metrics", c.serverURL)
|
||||||
|
body, _ := json.Marshal(request)
|
||||||
|
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error("Error sending telemetry data", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage {
|
||||||
c.logger.Debug("Pushing received messages to telemetry server")
|
c.logger.Debug("Pushing received messages to telemetry server")
|
||||||
url := fmt.Sprintf("%s/received-messages", c.serverURL)
|
|
||||||
var postBody []map[string]interface{}
|
var postBody []map[string]interface{}
|
||||||
for _, message := range messages {
|
for _, message := range receivedMessages.Messages {
|
||||||
postBody = append(postBody, map[string]interface{}{
|
postBody = append(postBody, map[string]interface{}{
|
||||||
"chatId": filter.ChatID,
|
"chatId": receivedMessages.Filter.ChatID,
|
||||||
"messageHash": types.EncodeHex(sshMessage.Hash),
|
"messageHash": types.EncodeHex(receivedMessages.SSHMessage.Hash),
|
||||||
"messageId": message.ApplicationLayer.ID,
|
"messageId": message.ApplicationLayer.ID,
|
||||||
"sentAt": sshMessage.Timestamp,
|
"sentAt": receivedMessages.SSHMessage.Timestamp,
|
||||||
"pubsubTopic": filter.PubsubTopic,
|
"pubsubTopic": receivedMessages.Filter.PubsubTopic,
|
||||||
"topic": filter.ContentTopic.String(),
|
"topic": receivedMessages.Filter.ContentTopic.String(),
|
||||||
"messageType": message.ApplicationLayer.Type.String(),
|
"messageType": message.ApplicationLayer.Type.String(),
|
||||||
"receiverKeyUID": c.keyUID,
|
"receiverKeyUID": c.keyUID,
|
||||||
"nodeName": c.nodeName,
|
"nodeName": c.nodeName,
|
||||||
"messageSize": len(sshMessage.Payload),
|
"messageSize": len(receivedMessages.SSHMessage.Payload),
|
||||||
|
"statusVersion": c.version,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
body, _ := json.Marshal(postBody)
|
body, _ := json.Marshal(postBody)
|
||||||
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
|
jsonRawMessage := json.RawMessage(body)
|
||||||
if err != nil {
|
return &jsonRawMessage
|
||||||
c.logger.Error("Error sending message to telemetry server", zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) {
|
func (c *Client) ProcessReceivedEnvelope(envelope *v2protocol.Envelope) *json.RawMessage {
|
||||||
url := fmt.Sprintf("%s/received-envelope", c.serverURL)
|
|
||||||
postBody := map[string]interface{}{
|
postBody := map[string]interface{}{
|
||||||
"messageHash": envelope.Hash().String(),
|
"messageHash": envelope.Hash().String(),
|
||||||
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
|
"sentAt": uint32(envelope.Message().GetTimestamp() / int64(time.Second)),
|
||||||
|
@ -68,12 +200,27 @@ func (c *Client) PushReceivedEnvelope(envelope *v2protocol.Envelope) {
|
||||||
"topic": envelope.Message().ContentTopic,
|
"topic": envelope.Message().ContentTopic,
|
||||||
"receiverKeyUID": c.keyUID,
|
"receiverKeyUID": c.keyUID,
|
||||||
"nodeName": c.nodeName,
|
"nodeName": c.nodeName,
|
||||||
|
"statusVersion": c.version,
|
||||||
}
|
}
|
||||||
body, _ := json.Marshal(postBody)
|
body, _ := json.Marshal(postBody)
|
||||||
_, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body))
|
jsonRawMessage := json.RawMessage(body)
|
||||||
if err != nil {
|
return &jsonRawMessage
|
||||||
c.logger.Error("Error sending envelope to telemetry server", zap.Error(err))
|
}
|
||||||
|
|
||||||
|
func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage {
|
||||||
|
postBody := map[string]interface{}{
|
||||||
|
"messageHash": sentEnvelope.Envelope.Hash().String(),
|
||||||
|
"sentAt": uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)),
|
||||||
|
"pubsubTopic": sentEnvelope.Envelope.PubsubTopic(),
|
||||||
|
"topic": sentEnvelope.Envelope.Message().ContentTopic,
|
||||||
|
"senderKeyUID": c.keyUID,
|
||||||
|
"nodeName": c.nodeName,
|
||||||
|
"publishMethod": sentEnvelope.PublishMethod.String(),
|
||||||
|
"statusVersion": c.version,
|
||||||
}
|
}
|
||||||
|
body, _ := json.Marshal(postBody)
|
||||||
|
jsonRawMessage := json.RawMessage(body)
|
||||||
|
return &jsonRawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
|
func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) {
|
||||||
|
|
|
@ -0,0 +1,157 @@
|
||||||
|
package telemetry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
|
"github.com/status-im/status-go/protocol/transport"
|
||||||
|
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||||
|
"github.com/status-im/status-go/wakuv2"
|
||||||
|
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createMockServer(t *testing.T) *httptest.Server {
|
||||||
|
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != "POST" {
|
||||||
|
t.Errorf("Expected 'POST' request, got '%s'", r.Method)
|
||||||
|
}
|
||||||
|
if r.URL.EscapedPath() != "/record-metrics" {
|
||||||
|
t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the request body is as expected
|
||||||
|
var received []TelemetryRequest
|
||||||
|
err := json.NewDecoder(r.Body).Decode(&received)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(received) != 1 {
|
||||||
|
t.Errorf("Unexpected data received: %+v", received)
|
||||||
|
} else {
|
||||||
|
// If the data is as expected, respond with success
|
||||||
|
t.Log("Responding with success")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_ProcessReceivedMessages(t *testing.T) {
|
||||||
|
// Setup a mock server to handle post requests
|
||||||
|
mockServer := createMockServer(t)
|
||||||
|
defer mockServer.Close()
|
||||||
|
|
||||||
|
// Create a client with the mock server URL
|
||||||
|
config := zap.NewDevelopmentConfig()
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||||
|
logger, err := config.Build()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create logger: %v", err)
|
||||||
|
}
|
||||||
|
client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0")
|
||||||
|
|
||||||
|
// Create a telemetry request to send
|
||||||
|
data := ReceivedMessages{
|
||||||
|
Filter: transport.Filter{
|
||||||
|
ChatID: "testChat",
|
||||||
|
PubsubTopic: "testTopic",
|
||||||
|
ContentTopic: types.StringToTopic("testContentTopic"),
|
||||||
|
},
|
||||||
|
SSHMessage: &types.Message{
|
||||||
|
Hash: []byte("hash"),
|
||||||
|
Timestamp: uint32(time.Now().Unix()),
|
||||||
|
},
|
||||||
|
Messages: []*v1protocol.StatusMessage{
|
||||||
|
{
|
||||||
|
ApplicationLayer: v1protocol.ApplicationLayer{
|
||||||
|
ID: types.HexBytes("123"),
|
||||||
|
Type: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
telemetryData := client.ProcessReceivedMessages(data)
|
||||||
|
telemetryRequest := TelemetryRequest{
|
||||||
|
Id: 1,
|
||||||
|
TelemetryType: ReceivedMessagesMetric,
|
||||||
|
TelemetryData: telemetryData,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the telemetry request
|
||||||
|
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_ProcessReceivedEnvelope(t *testing.T) {
|
||||||
|
// Setup a mock server to handle post requests
|
||||||
|
mockServer := createMockServer(t)
|
||||||
|
defer mockServer.Close()
|
||||||
|
|
||||||
|
// Create a client with the mock server URL
|
||||||
|
config := zap.NewDevelopmentConfig()
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||||
|
logger, err := config.Build()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create logger: %v", err)
|
||||||
|
}
|
||||||
|
client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0")
|
||||||
|
|
||||||
|
// Create a telemetry request to send
|
||||||
|
envelope := v2protocol.NewEnvelope(&pb.WakuMessage{
|
||||||
|
Payload: []byte{1, 2, 3, 4, 5},
|
||||||
|
ContentTopic: "testContentTopic",
|
||||||
|
Version: proto.Uint32(0),
|
||||||
|
Timestamp: proto.Int64(time.Now().Unix()),
|
||||||
|
}, 0, "")
|
||||||
|
telemetryData := client.ProcessReceivedEnvelope(envelope)
|
||||||
|
telemetryRequest := TelemetryRequest{
|
||||||
|
Id: 2,
|
||||||
|
TelemetryType: ReceivedEnvelopeMetric,
|
||||||
|
TelemetryData: telemetryData,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the telemetry request
|
||||||
|
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClient_ProcessSentEnvelope(t *testing.T) {
|
||||||
|
// Setup a mock server to handle post requests
|
||||||
|
mockServer := createMockServer(t)
|
||||||
|
defer mockServer.Close()
|
||||||
|
|
||||||
|
// Create a client with the mock server URL
|
||||||
|
config := zap.NewDevelopmentConfig()
|
||||||
|
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
|
||||||
|
logger, err := config.Build()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to create logger: %v", err)
|
||||||
|
}
|
||||||
|
client := NewClient(logger, mockServer.URL, "testUID", "testNode", "1.0")
|
||||||
|
|
||||||
|
// Create a telemetry request to send
|
||||||
|
sentEnvelope := wakuv2.SentEnvelope{
|
||||||
|
Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{
|
||||||
|
Payload: []byte{1, 2, 3, 4, 5},
|
||||||
|
ContentTopic: "testContentTopic",
|
||||||
|
Version: proto.Uint32(0),
|
||||||
|
Timestamp: proto.Int64(time.Now().Unix()),
|
||||||
|
}, 0, ""),
|
||||||
|
PublishMethod: wakuv2.LightPush,
|
||||||
|
}
|
||||||
|
telemetryData := client.ProcessSentEnvelope(sentEnvelope)
|
||||||
|
telemetryRequest := TelemetryRequest{
|
||||||
|
Id: 3,
|
||||||
|
TelemetryType: SentEnvelopeMetric,
|
||||||
|
TelemetryData: telemetryData,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the telemetry request
|
||||||
|
client.pushTelemetryRequest([]TelemetryRequest{telemetryRequest})
|
||||||
|
}
|
|
@ -75,6 +75,7 @@ import (
|
||||||
"github.com/status-im/status-go/wakuv2/persistence"
|
"github.com/status-im/status-go/wakuv2/persistence"
|
||||||
|
|
||||||
node "github.com/waku-org/go-waku/waku/v2/node"
|
node "github.com/waku-org/go-waku/waku/v2/node"
|
||||||
|
v2protocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -87,8 +88,14 @@ const maxHashQueryLength = 100
|
||||||
const hashQueryInterval = 5 * time.Second
|
const hashQueryInterval = 5 * time.Second
|
||||||
const messageSentPeriod = 5 // in seconds
|
const messageSentPeriod = 5 // in seconds
|
||||||
|
|
||||||
|
type SentEnvelope struct {
|
||||||
|
Envelope *v2protocol.Envelope
|
||||||
|
PublishMethod PublishMethod
|
||||||
|
}
|
||||||
|
|
||||||
type ITelemetryClient interface {
|
type ITelemetryClient interface {
|
||||||
PushReceivedEnvelope(*protocol.Envelope)
|
PushReceivedEnvelope(receivedEnvelope *v2protocol.Envelope)
|
||||||
|
PushSentEnvelope(sentEnvelope SentEnvelope)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Waku represents a dark communication interface through the Ethereum
|
// Waku represents a dark communication interface through the Ethereum
|
||||||
|
@ -947,24 +954,45 @@ func (w *Waku) SkipPublishToTopic(value bool) {
|
||||||
w.cfg.SkipPublishToTopic = value
|
w.cfg.SkipPublishToTopic = value
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PublishMethod int
|
||||||
|
|
||||||
|
const (
|
||||||
|
LightPush PublishMethod = iota
|
||||||
|
Relay
|
||||||
|
)
|
||||||
|
|
||||||
|
func (pm PublishMethod) String() string {
|
||||||
|
switch pm {
|
||||||
|
case LightPush:
|
||||||
|
return "LightPush"
|
||||||
|
case Relay:
|
||||||
|
return "Relay"
|
||||||
|
default:
|
||||||
|
return "Unknown"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (w *Waku) broadcast() {
|
func (w *Waku) broadcast() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case envelope := <-w.sendQueue:
|
case envelope := <-w.sendQueue:
|
||||||
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
|
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
|
||||||
var fn publishFn
|
var fn publishFn
|
||||||
|
var publishMethod PublishMethod
|
||||||
if w.cfg.SkipPublishToTopic {
|
if w.cfg.SkipPublishToTopic {
|
||||||
// For now only used in testing to simulate going offline
|
// For now only used in testing to simulate going offline
|
||||||
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
||||||
return errors.New("test send failure")
|
return errors.New("test send failure")
|
||||||
}
|
}
|
||||||
} else if w.cfg.LightClient {
|
} else if w.cfg.LightClient {
|
||||||
|
publishMethod = LightPush
|
||||||
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
||||||
logger.Info("publishing message via lightpush")
|
logger.Info("publishing message via lightpush")
|
||||||
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()))
|
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
publishMethod = Relay
|
||||||
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
||||||
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
|
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
|
||||||
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
|
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
|
||||||
|
@ -973,6 +1001,22 @@ func (w *Waku) broadcast() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wraps the publish function with a call to the telemetry client
|
||||||
|
if w.statusTelemetryClient != nil {
|
||||||
|
sendFn := fn
|
||||||
|
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
|
||||||
|
err := sendFn(env, logger)
|
||||||
|
if err == nil {
|
||||||
|
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod})
|
||||||
|
}
|
||||||
|
// else {
|
||||||
|
// TODO: send error from Relay or LightPush to Telemetry
|
||||||
|
// w.statusTelemetryClient.PushError(err)
|
||||||
|
// }
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
w.wg.Add(1)
|
w.wg.Add(1)
|
||||||
go w.publishEnvelope(envelope, fn, logger)
|
go w.publishEnvelope(envelope, fn, logger)
|
||||||
case <-w.ctx.Done():
|
case <-w.ctx.Done():
|
||||||
|
|
Loading…
Reference in New Issue