mirror of
https://github.com/status-im/status-go.git
synced 2025-01-18 10:42:07 +00:00
38308d48f2
* feat_: log error and stacktrace when panic in goroutine * test_: add test TestSafeGo * chore_: rename logAndCall to call * chore_: rename SafeGo to Go * chore_: make lint-fix * chore_: use t.Cleanup * chore_: Revert "chore_: use t.Cleanup" This reverts commit 4eb420d179cc0e208e84c13cb941e6b3d1ed9819. * chore_: Revert "chore_: make lint-fix" This reverts commit fcc995f157e671a4229b47419c3a0e4004b5fdab. * chore_: Revert "chore_: rename SafeGo to Go" This reverts commit a6d73d6df583f313032d79aac62f66328039cb55. * chore_: Revert "chore_: rename logAndCall to call" This reverts commit 8fbe993bedb9fbba67349a44f151e2dd5e3bc4cc. * chore_: Revert "test_: add test TestSafeGo" This reverts commit a1fa91839f3960398980c6bf456e6462ec944819. * chore_: Revert "feat_: log error and stacktrace when panic in goroutine" This reverts commit f612dd828fa2ce410d0e806fe773ecbe3e86a68a. * feat_: log error and stacktrace when panic in goroutine * chore_: make lint-fix * chore_: rename logAndCall to call * chore_: renaming LogOnPanic * chore_: update rest goroutine function calls * chore_: make lint-fix
235 lines
5.6 KiB
Go
235 lines
5.6 KiB
Go
package anonmetrics
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/appmetrics"
|
|
gocommon "github.com/status-im/status-go/common"
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/protocol/common"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
)
|
|
|
|
const ActiveClientPhrase = "yes i am wanting the activation of the anon metrics client, please thank you lots thank you"
|
|
|
|
type ClientConfig struct {
|
|
ShouldSend bool
|
|
SendAddress *ecdsa.PublicKey
|
|
Active string
|
|
}
|
|
|
|
type Client struct {
|
|
Config *ClientConfig
|
|
DB *appmetrics.Database
|
|
Identity *ecdsa.PrivateKey
|
|
Logger *zap.Logger
|
|
|
|
//messageSender is a message processor used to send metric batch messages
|
|
messageSender *common.MessageSender
|
|
|
|
IntervalInc *FibonacciIntervalIncrementer
|
|
|
|
// mainLoopQuit is a channel that concurrently orchestrates that the main loop that should be terminated
|
|
mainLoopQuit chan struct{}
|
|
|
|
// deleteLoopQuit is a channel that concurrently orchestrates that the delete loop that should be terminated
|
|
deleteLoopQuit chan struct{}
|
|
|
|
// DBLock prevents deletion of DB items during mainloop
|
|
DBLock sync.Mutex
|
|
}
|
|
|
|
func NewClient(sender *common.MessageSender) *Client {
|
|
return &Client{
|
|
messageSender: sender,
|
|
IntervalInc: &FibonacciIntervalIncrementer{
|
|
Last: 0,
|
|
Current: 1,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (c *Client) sendUnprocessedMetrics() {
|
|
if c.Config.Active != ActiveClientPhrase {
|
|
return
|
|
}
|
|
|
|
c.Logger.Debug("sendUnprocessedMetrics() triggered")
|
|
|
|
c.DBLock.Lock()
|
|
defer c.DBLock.Unlock()
|
|
|
|
// Get all unsent metrics grouped by session id
|
|
uam, err := c.DB.GetUnprocessedGroupedBySession()
|
|
if err != nil {
|
|
c.Logger.Error("failed to get unprocessed messages grouped by session", zap.Error(err))
|
|
}
|
|
c.Logger.Debug("unprocessed metrics from db", zap.Reflect("uam", uam))
|
|
|
|
for session, batch := range uam {
|
|
c.Logger.Debug("processing uam from session", zap.String("session", session))
|
|
|
|
// Convert the metrics into protobuf
|
|
amb, err := adaptModelsToProtoBatch(batch, &c.Identity.PublicKey)
|
|
if err != nil {
|
|
c.Logger.Error("failed to adapt models to protobuf batch", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Generate an ephemeral key per session id
|
|
ephemeralKey, err := crypto.GenerateKey()
|
|
if err != nil {
|
|
c.Logger.Error("failed to generate an ephemeral key", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Prepare the protobuf message
|
|
encodedMessage, err := proto.Marshal(amb)
|
|
if err != nil {
|
|
c.Logger.Error("failed to marshal protobuf", zap.Error(err))
|
|
return
|
|
}
|
|
rawMessage := common.RawMessage{
|
|
Payload: encodedMessage,
|
|
Sender: ephemeralKey,
|
|
SkipEncryptionLayer: true,
|
|
SendOnPersonalTopic: true,
|
|
MessageType: protobuf.ApplicationMetadataMessage_ANONYMOUS_METRIC_BATCH,
|
|
}
|
|
|
|
c.Logger.Debug("rawMessage prepared from unprocessed anonymous metrics", zap.Reflect("rawMessage", rawMessage))
|
|
|
|
// Send the metrics batch
|
|
_, err = c.messageSender.SendPrivate(context.Background(), c.Config.SendAddress, &rawMessage)
|
|
if err != nil {
|
|
c.Logger.Error("failed to send metrics batch message", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// Mark metrics as processed
|
|
err = c.DB.SetToProcessed(batch)
|
|
if err != nil {
|
|
c.Logger.Error("failed to set metrics as processed in db", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) mainLoop() error {
|
|
if c.Config.Active != ActiveClientPhrase {
|
|
return nil
|
|
}
|
|
|
|
c.Logger.Debug("mainLoop() triggered")
|
|
|
|
for {
|
|
c.sendUnprocessedMetrics()
|
|
|
|
waitFor := time.Duration(c.IntervalInc.Next()) * time.Second
|
|
c.Logger.Debug("mainLoop() wait interval set", zap.Duration("waitFor", waitFor))
|
|
select {
|
|
case <-time.After(waitFor):
|
|
case <-c.mainLoopQuit:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) startMainLoop() {
|
|
if c.Config.Active != ActiveClientPhrase {
|
|
return
|
|
}
|
|
|
|
c.Logger.Debug("startMainLoop() triggered")
|
|
|
|
c.stopMainLoop()
|
|
c.mainLoopQuit = make(chan struct{})
|
|
go func() {
|
|
defer gocommon.LogOnPanic()
|
|
c.Logger.Debug("startMainLoop() anonymous go routine triggered")
|
|
err := c.mainLoop()
|
|
if err != nil {
|
|
c.Logger.Error("main loop exited with an error", zap.Error(err))
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (c *Client) deleteLoop() error {
|
|
// Sleep to give the main lock time to process any old messages
|
|
time.Sleep(time.Second * 10)
|
|
|
|
for {
|
|
func() {
|
|
c.DBLock.Lock()
|
|
defer c.DBLock.Unlock()
|
|
|
|
oneWeekAgo := time.Now().Add(time.Hour * 24 * 7 * -1)
|
|
err := c.DB.DeleteOlderThan(&oneWeekAgo)
|
|
if err != nil {
|
|
c.Logger.Error("failed to delete metrics older than given time",
|
|
zap.Time("time given", oneWeekAgo),
|
|
zap.Error(err))
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-time.After(time.Hour):
|
|
case <-c.deleteLoopQuit:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Client) startDeleteLoop() {
|
|
c.stopDeleteLoop()
|
|
c.deleteLoopQuit = make(chan struct{})
|
|
go func() {
|
|
defer gocommon.LogOnPanic()
|
|
err := c.deleteLoop()
|
|
if err != nil {
|
|
c.Logger.Error("delete loop exited with an error", zap.Error(err))
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (c *Client) Start() error {
|
|
c.Logger.Debug("Main Start() triggered")
|
|
if c.messageSender == nil {
|
|
return errors.New("can't start, missing message processor")
|
|
}
|
|
|
|
c.startMainLoop()
|
|
c.startDeleteLoop()
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) stopMainLoop() {
|
|
c.Logger.Debug("stopMainLoop() triggered")
|
|
|
|
if c.mainLoopQuit != nil {
|
|
c.Logger.Debug("mainLoopQuit not set, attempting to close")
|
|
|
|
close(c.mainLoopQuit)
|
|
c.mainLoopQuit = nil
|
|
}
|
|
}
|
|
|
|
func (c *Client) stopDeleteLoop() {
|
|
if c.deleteLoopQuit != nil {
|
|
close(c.deleteLoopQuit)
|
|
c.deleteLoopQuit = nil
|
|
}
|
|
}
|
|
|
|
func (c *Client) Stop() error {
|
|
c.stopMainLoop()
|
|
c.stopDeleteLoop()
|
|
return nil
|
|
}
|