status-go/protocol/anonmetrics/client.go

232 lines
5.5 KiB
Go
Raw Normal View History

Anon Metrics Broadcast (#2198) * Protobufs and adapters * Added basic anon metric service and config init * Added fibonacci interval incrementer * Added basic Client.Start func and integrated interval incrementer * Added new processed field to app metrics table * Added id column to app metrics table * Added migration clean up * Added appmetrics GetUnprocessed and SetToProcessedByIDs and tests There was a wierd bug where metrics in the db that did not explicitly insert a value would be NULL, so could not be found by . In addition I've added a new primary id field to the app_metrics table so that updates could be done against very specific metric rows. * Updated adaptors and db to handle proto_id I need a way to distinguish individual metric items from each other so that I can ignore the ones that have been seen before. * Moved incrementer into dedicated file * Resolve incrementer test fail * Finalised the main loop functionality * Implemented delete loop framework * Updated adaptors file name * Added delete loop delay and quit, and tweak on RawMessage gen * Completed delete loop logic * Added DBLock to prevent deletion during mainLoop * Added postgres DB connection, integrated into anonmetrics.Server * Removed proto_id from SQL migration and model * Integrated postgres with Server and updated adaptors * Function name update * Added sample config files for client and server * Fixes and testing for low level e2e * make generate * Fix lint * Fix for receiving an anonMetricBatch not in server mode * Postgres test fixes * Tidy up, make vendor and make generate * delinting * Fixing database tests * Attempted fix of does: cannot open `does' (No such file or directory) not: cannot open `not' (No such file or directory) exist: cannot open `exist' (No such file or directory) error on sql resource loas * Moved all anon metric postgres migration logic and sources into a the protocol/anonmetrics package or sub packages. I don't know if this will fix the does: cannot open `does' (No such file or directory) not: cannot open `not' (No such file or directory) exist: cannot open `exist' (No such file or directory) error that happens in Jenkins but this could work * Lint for the lint god * Why doesn't the linter list all its problems at once? * test tweaks * Fix for wakuV2 change * DB reset change * Fix for postgres db migrations fails * More robust implementation of postgres test setup and teardown * Added block for anon metrics functionality * Version Bump to 0.84.0 * Added test to check anon metrics broadcast is deactivated * Protobufs and adapters * Added basic anon metric service and config init * Added new processed field to app metrics table * Added id column to app metrics table * Added migration clean up * Added appmetrics GetUnprocessed and SetToProcessedByIDs and tests There was a wierd bug where metrics in the db that did not explicitly insert a value would be NULL, so could not be found by . In addition I've added a new primary id field to the app_metrics table so that updates could be done against very specific metric rows. * Updated adaptors and db to handle proto_id I need a way to distinguish individual metric items from each other so that I can ignore the ones that have been seen before. * Added postgres DB connection, integrated into anonmetrics.Server * Removed proto_id from SQL migration and model * Integrated postgres with Server and updated adaptors * Added sample config files for client and server * Fix lint * Fix for receiving an anonMetricBatch not in server mode * Postgres test fixes * Tidy up, make vendor and make generate * Moved all anon metric postgres migration logic and sources into a the protocol/anonmetrics package or sub packages. I don't know if this will fix the does: cannot open `does' (No such file or directory) not: cannot open `not' (No such file or directory) exist: cannot open `exist' (No such file or directory) error that happens in Jenkins but this could work
2021-09-01 12:02:18 +00:00
package anonmetrics
import (
"context"
"crypto/ecdsa"
"errors"
"sync"
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/status-im/status-go/appmetrics"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf"
)
const ActiveClientPhrase = "yes i am wanting the activation of the anon metrics client, please thank you lots thank you"
type ClientConfig struct {
ShouldSend bool
SendAddress *ecdsa.PublicKey
Active string
}
type Client struct {
Config *ClientConfig
DB *appmetrics.Database
Identity *ecdsa.PrivateKey
Logger *zap.Logger
//messageSender is a message processor used to send metric batch messages
messageSender *common.MessageSender
IntervalInc *FibonacciIntervalIncrementer
// mainLoopQuit is a channel that concurrently orchestrates that the main loop that should be terminated
mainLoopQuit chan struct{}
// deleteLoopQuit is a channel that concurrently orchestrates that the delete loop that should be terminated
deleteLoopQuit chan struct{}
// DBLock prevents deletion of DB items during mainloop
DBLock sync.Mutex
}
func NewClient(sender *common.MessageSender) *Client {
return &Client{
messageSender: sender,
IntervalInc: &FibonacciIntervalIncrementer{
Last: 0,
Current: 1,
},
}
}
func (c *Client) sendUnprocessedMetrics() {
if c.Config.Active != ActiveClientPhrase {
return
}
c.Logger.Debug("sendUnprocessedMetrics() triggered")
c.DBLock.Lock()
defer c.DBLock.Unlock()
// Get all unsent metrics grouped by session id
uam, err := c.DB.GetUnprocessedGroupedBySession()
if err != nil {
c.Logger.Error("failed to get unprocessed messages grouped by session", zap.Error(err))
}
c.Logger.Debug("unprocessed metrics from db", zap.Reflect("uam", uam))
for session, batch := range uam {
c.Logger.Debug("processing uam from session", zap.String("session", session))
// Convert the metrics into protobuf
amb, err := adaptModelsToProtoBatch(batch, &c.Identity.PublicKey)
if err != nil {
c.Logger.Error("failed to adapt models to protobuf batch", zap.Error(err))
return
}
// Generate an ephemeral key per session id
ephemeralKey, err := crypto.GenerateKey()
if err != nil {
c.Logger.Error("failed to generate an ephemeral key", zap.Error(err))
return
}
// Prepare the protobuf message
encodedMessage, err := proto.Marshal(amb)
if err != nil {
c.Logger.Error("failed to marshal protobuf", zap.Error(err))
return
}
rawMessage := common.RawMessage{
Payload: encodedMessage,
Sender: ephemeralKey,
SkipEncryption: true,
SendOnPersonalTopic: true,
MessageType: protobuf.ApplicationMetadataMessage_ANONYMOUS_METRIC_BATCH,
}
c.Logger.Debug("rawMessage prepared from unprocessed anonymous metrics", zap.Reflect("rawMessage", rawMessage))
// Send the metrics batch
_, err = c.messageSender.SendPrivate(context.Background(), c.Config.SendAddress, &rawMessage)
if err != nil {
c.Logger.Error("failed to send metrics batch message", zap.Error(err))
return
}
// Mark metrics as processed
err = c.DB.SetToProcessed(batch)
if err != nil {
c.Logger.Error("failed to set metrics as processed in db", zap.Error(err))
}
}
}
func (c *Client) mainLoop() error {
if c.Config.Active != ActiveClientPhrase {
return nil
}
c.Logger.Debug("mainLoop() triggered")
for {
c.sendUnprocessedMetrics()
waitFor := time.Duration(c.IntervalInc.Next()) * time.Second
c.Logger.Debug("mainLoop() wait interval set", zap.Duration("waitFor", waitFor))
select {
case <-time.After(waitFor):
case <-c.mainLoopQuit:
return nil
}
}
}
func (c *Client) startMainLoop() {
if c.Config.Active != ActiveClientPhrase {
return
}
c.Logger.Debug("startMainLoop() triggered")
c.stopMainLoop()
c.mainLoopQuit = make(chan struct{})
go func() {
c.Logger.Debug("startMainLoop() anonymous go routine triggered")
err := c.mainLoop()
if err != nil {
c.Logger.Error("main loop exited with an error", zap.Error(err))
}
}()
}
func (c *Client) deleteLoop() error {
// Sleep to give the main lock time to process any old messages
time.Sleep(time.Second * 10)
for {
func() {
c.DBLock.Lock()
defer c.DBLock.Unlock()
oneWeekAgo := time.Now().Add(time.Hour * 24 * 7 * -1)
err := c.DB.DeleteOlderThan(&oneWeekAgo)
if err != nil {
c.Logger.Error("failed to delete metrics older than given time",
zap.Time("time given", oneWeekAgo),
zap.Error(err))
}
}()
select {
case <-time.After(time.Hour):
case <-c.deleteLoopQuit:
return nil
}
}
}
func (c *Client) startDeleteLoop() {
c.stopDeleteLoop()
c.deleteLoopQuit = make(chan struct{})
go func() {
err := c.deleteLoop()
if err != nil {
c.Logger.Error("delete loop exited with an error", zap.Error(err))
}
}()
}
func (c *Client) Start() error {
c.Logger.Debug("Main Start() triggered")
if c.messageSender == nil {
return errors.New("can't start, missing message processor")
}
c.startMainLoop()
c.startDeleteLoop()
return nil
}
func (c *Client) stopMainLoop() {
c.Logger.Debug("stopMainLoop() triggered")
if c.mainLoopQuit != nil {
c.Logger.Debug("mainLoopQuit not set, attempting to close")
close(c.mainLoopQuit)
c.mainLoopQuit = nil
}
}
func (c *Client) stopDeleteLoop() {
if c.deleteLoopQuit != nil {
close(c.deleteLoopQuit)
c.deleteLoopQuit = nil
}
}
func (c *Client) Stop() error {
c.stopMainLoop()
c.stopDeleteLoop()
return nil
}