mirror of
https://github.com/logos-messaging/telemetry.git
synced 2026-01-02 14:13:05 +00:00
feat: calculate aggregation algorithm (#1)
This commit is contained in:
parent
98a049d541
commit
8dc19b6299
@ -1 +1,21 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"time"
|
||||
|
||||
"github.com/status-im/dev-telemetry/telemetry"
|
||||
)
|
||||
|
||||
func main() {
|
||||
seconds := flag.Int("seconds", 3600, "Number of seconds to aggregate")
|
||||
dataSourceName := flag.String("data-source-name", "", "DB URL")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
db := telemetry.OpenDb(*dataSourceName)
|
||||
defer db.Close()
|
||||
|
||||
aggregator := telemetry.NewAggregator(db)
|
||||
aggregator.Run(time.Duration(*seconds))
|
||||
}
|
||||
|
||||
@ -8,17 +8,11 @@ import (
|
||||
|
||||
func main() {
|
||||
port := flag.Int("port", 8080, "Port number")
|
||||
dbUsername := flag.String("db-username", "", "Db username")
|
||||
dbPassword := flag.String("db-password", "", "Db password")
|
||||
dbName := flag.String("db-name", "", "Db name")
|
||||
dataSourceName := flag.String("data-source-name", "", "DB URL")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
db := telemetry.OpenDb(
|
||||
*dbUsername,
|
||||
*dbPassword,
|
||||
*dbName,
|
||||
)
|
||||
db := telemetry.OpenDb(*dataSourceName)
|
||||
defer db.Close()
|
||||
|
||||
server := telemetry.NewServer(db)
|
||||
|
||||
2
go.mod
2
go.mod
@ -4,5 +4,7 @@ go 1.15
|
||||
|
||||
require (
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/lib/pq v1.10.3
|
||||
github.com/mattn/go-sqlite3 v1.14.9
|
||||
github.com/stretchr/testify v1.7.0
|
||||
)
|
||||
|
||||
13
go.sum
13
go.sum
@ -1,4 +1,17 @@
|
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
|
||||
github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA=
|
||||
github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
124
telemetry/aggregator.go
Normal file
124
telemetry/aggregator.go
Normal file
@ -0,0 +1,124 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Aggregator struct {
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
func NewAggregator(db *sql.DB) *Aggregator {
|
||||
return &Aggregator{
|
||||
DB: db,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Aggregator) Run(d time.Duration) {
|
||||
// Define the duration starts and end.
|
||||
// Allow a buffer of the duration to define the start and end.
|
||||
// This is to ensure we wait for people not being connected or if they received messages with delay
|
||||
runAt := time.Now()
|
||||
endsAt := runAt.Add(-d)
|
||||
startsAt := endsAt.Add(-d)
|
||||
|
||||
// Query all received message for a specific duration
|
||||
receivedMessages, err := queryReceivedMessagesBetween(a.DB, startsAt, endsAt)
|
||||
if err != nil {
|
||||
log.Fatalf("could not query received message: %s", err)
|
||||
}
|
||||
|
||||
// Collect all key uids
|
||||
receiverKeyUIDs := make(map[string]bool)
|
||||
for _, receivedMessage := range receivedMessages {
|
||||
receiverKeyUIDs[receivedMessage.ReceiverKeyUID] = true
|
||||
}
|
||||
|
||||
// Ensure the specific key uids received a message after the end of the duration
|
||||
// That way we know that this specific key uid has been connected
|
||||
for receiverKeyUID := range receiverKeyUIDs {
|
||||
ok, err := didReceivedMessageAfter(a.DB, receiverKeyUID, endsAt)
|
||||
if err != nil {
|
||||
log.Fatalf("could not check key UID: %s, because of %s", receiverKeyUID, err)
|
||||
}
|
||||
if !ok {
|
||||
receiverKeyUIDs[receiverKeyUID] = false
|
||||
}
|
||||
}
|
||||
|
||||
// Group the received messages by chat id and key uid
|
||||
groupedMessages := make(map[string]map[string]int)
|
||||
for _, receivedMessage := range receivedMessages {
|
||||
// Skip receiver key uid if it has not been connected
|
||||
if !receiverKeyUIDs[receivedMessage.ReceiverKeyUID] {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := groupedMessages[receivedMessage.ChatID]; !ok {
|
||||
groupedMessages[receivedMessage.ChatID] = make(map[string]int)
|
||||
}
|
||||
groupedMessages[receivedMessage.ChatID][receivedMessage.ReceiverKeyUID] += 1
|
||||
}
|
||||
|
||||
if len(groupedMessages) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Rch = 1 - count_of_message_missing / total_number_of_messages
|
||||
|
||||
// Calculate the reliability for each channel as:
|
||||
// Rch = 1 - count_of_message_missing / total_number_of_messages
|
||||
rChatID := make(map[string]float64)
|
||||
for chatID, countByKeyUID := range groupedMessages {
|
||||
messageMissing := 0
|
||||
totalMessages := 0
|
||||
|
||||
max := 0
|
||||
for _, count := range countByKeyUID {
|
||||
if count > max {
|
||||
max = count
|
||||
}
|
||||
}
|
||||
|
||||
for _, count := range countByKeyUID {
|
||||
totalMessages += count
|
||||
messageMissing += max - count
|
||||
}
|
||||
|
||||
rChatID[chatID] = 1 - float64(messageMissing)/float64(totalMessages)
|
||||
}
|
||||
|
||||
// Store all aggregation
|
||||
for ChatID, rChatID := range rChatID {
|
||||
rma := ReceivedMessageAggregated{
|
||||
ChatID: ChatID,
|
||||
DurationInSeconds: int64(d.Seconds()),
|
||||
Value: rChatID,
|
||||
RunAt: runAt.Unix(),
|
||||
}
|
||||
err := rma.put(a.DB)
|
||||
if err != nil {
|
||||
log.Fatalf("could not store received message aggregated: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate the global reliability R = (R(0) + R(1)+ .... + R(n)) / len(Rch)
|
||||
rChatIDTotal := 0.0
|
||||
for _, v := range rChatID {
|
||||
rChatIDTotal += v
|
||||
}
|
||||
|
||||
r := rChatIDTotal / float64(len(rChatID))
|
||||
rma := ReceivedMessageAggregated{
|
||||
ChatID: "",
|
||||
DurationInSeconds: int64(d.Seconds()),
|
||||
Value: r,
|
||||
RunAt: runAt.Unix(),
|
||||
}
|
||||
err = rma.put(a.DB)
|
||||
if err != nil {
|
||||
log.Fatalf("could not store received message aggregated: %s", err)
|
||||
}
|
||||
}
|
||||
149
telemetry/aggregator_test.go
Normal file
149
telemetry/aggregator_test.go
Normal file
@ -0,0 +1,149 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func NewMock() *sql.DB {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
if err != nil {
|
||||
log.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
|
||||
}
|
||||
|
||||
err = createTables(db)
|
||||
|
||||
if err != nil {
|
||||
log.Fatalf("an error '%s' was not expected when migrating the db", err)
|
||||
}
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) {
|
||||
rows, err := db.Query("SELECT * FROM receivedMessageAggregated")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var receivedMessageAggregateds []*ReceivedMessageAggregated
|
||||
for rows.Next() {
|
||||
var receivedMessageAggregated ReceivedMessageAggregated
|
||||
err = rows.Scan(
|
||||
&receivedMessageAggregated.ID,
|
||||
&receivedMessageAggregated.DurationInSeconds,
|
||||
&receivedMessageAggregated.ChatID,
|
||||
&receivedMessageAggregated.Value,
|
||||
&receivedMessageAggregated.RunAt,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
receivedMessageAggregateds = append(receivedMessageAggregateds, &receivedMessageAggregated)
|
||||
}
|
||||
return receivedMessageAggregateds, nil
|
||||
}
|
||||
|
||||
func TestRunAggregatorSimple(t *testing.T) {
|
||||
db := NewMock()
|
||||
|
||||
m := &ReceivedMessage{
|
||||
ChatID: "1",
|
||||
MessageHash: "1",
|
||||
ReceiverKeyUID: "1",
|
||||
SentAt: time.Now().Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
oneHourAndHalf := time.Hour + time.Minute*30
|
||||
m = &ReceivedMessage{
|
||||
ChatID: "3",
|
||||
MessageHash: "2",
|
||||
ReceiverKeyUID: "1",
|
||||
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
agg := NewAggregator(db)
|
||||
|
||||
agg.Run(time.Hour)
|
||||
|
||||
res, err := queryAggregatedMessage(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 2)
|
||||
require.Equal(t, "3", res[0].ChatID)
|
||||
require.Equal(t, 1.0, res[0].Value)
|
||||
require.Equal(t, "", res[1].ChatID)
|
||||
require.Equal(t, 1.0, res[1].Value)
|
||||
}
|
||||
|
||||
func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
|
||||
db := NewMock()
|
||||
|
||||
m := &ReceivedMessage{
|
||||
ChatID: "1",
|
||||
MessageHash: "1",
|
||||
ReceiverKeyUID: "1",
|
||||
SentAt: time.Now().Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
oneHourAndHalf := time.Hour + time.Minute*30
|
||||
m = &ReceivedMessage{
|
||||
ChatID: "3",
|
||||
MessageHash: "2",
|
||||
ReceiverKeyUID: "1",
|
||||
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
m = &ReceivedMessage{
|
||||
ChatID: "3",
|
||||
MessageHash: "3",
|
||||
ReceiverKeyUID: "1",
|
||||
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
m = &ReceivedMessage{
|
||||
ChatID: "1",
|
||||
MessageHash: "1",
|
||||
ReceiverKeyUID: "2",
|
||||
SentAt: time.Now().Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
m = &ReceivedMessage{
|
||||
ChatID: "3",
|
||||
MessageHash: "2",
|
||||
ReceiverKeyUID: "2",
|
||||
SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
|
||||
Topic: "1",
|
||||
}
|
||||
m.put(db)
|
||||
|
||||
agg := NewAggregator(db)
|
||||
|
||||
agg.Run(time.Hour)
|
||||
|
||||
res, err := queryAggregatedMessage(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res, 2)
|
||||
require.Equal(t, "3", res[0].ChatID)
|
||||
require.Equal(t, 0.67, math.Round(res[0].Value*100)/100)
|
||||
require.Equal(t, "", res[1].ChatID)
|
||||
require.Equal(t, 0.67, math.Round(res[1].Value*100)/100)
|
||||
}
|
||||
@ -4,12 +4,11 @@ import (
|
||||
"database/sql"
|
||||
"log"
|
||||
|
||||
// TODO Replace with real db driver
|
||||
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func OpenDb(username string, password string, name string) *sql.DB {
|
||||
db, err := sql.Open("sqlite3", ":memory:")
|
||||
func OpenDb(dataSourceName string) *sql.DB {
|
||||
db, err := sql.Open("postgres", dataSourceName)
|
||||
if err != nil {
|
||||
log.Fatalf("could not connect to database: %v", err)
|
||||
}
|
||||
@ -38,8 +37,20 @@ func createTables(db *sql.DB) error {
|
||||
createdAt INTEGER NOT NULL
|
||||
);`
|
||||
_, err := db.Exec(sqlStmt)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated (
|
||||
id INTEGER PRIMARY KEY,
|
||||
durationInSeconds INTEGER NOT NULL,
|
||||
chatId VARCHAR(255) NOT NULL,
|
||||
value DECIMAL NOT NULL,
|
||||
runAt INTEGER NOT NULL
|
||||
);`
|
||||
|
||||
_, err = db.Exec(sqlStmt)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -10,11 +10,14 @@ func respondWithError(w http.ResponseWriter, code int, message string) error {
|
||||
}
|
||||
|
||||
func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) error {
|
||||
response, _ := json.Marshal(payload)
|
||||
response, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(code)
|
||||
_, err := w.Write(response)
|
||||
_, err = w.Write(response)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -2,12 +2,21 @@ package telemetry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ReceivedMessageAggregated struct {
|
||||
ID int
|
||||
ChatID string
|
||||
DurationInSeconds int64
|
||||
Value float64
|
||||
RunAt int64
|
||||
}
|
||||
|
||||
type ReceivedMessage struct {
|
||||
ID int `json:"id"`
|
||||
ChatId string `json:"chatId"`
|
||||
ChatID string `json:"chatId"`
|
||||
MessageHash string `json:"messageHash"`
|
||||
ReceiverKeyUID string `json:"receiverKeyUID"`
|
||||
SentAt int64 `json:"sentAt"`
|
||||
@ -15,14 +24,42 @@ type ReceivedMessage struct {
|
||||
CreatedAt int64 `json:"createdAt"`
|
||||
}
|
||||
|
||||
func (s *ReceivedMessage) put(db *sql.DB) error {
|
||||
func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*ReceivedMessage, error) {
|
||||
rows, err := db.Query(fmt.Sprintf("SELECT * FROM receivedMessages WHERE sentAt BETWEEN %d and %d", startsAt.Unix(), endsAt.Unix()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var receivedMessages []*ReceivedMessage
|
||||
for rows.Next() {
|
||||
var receivedMessage ReceivedMessage
|
||||
err = rows.Scan(&receivedMessage.ID, &receivedMessage.ChatID, &receivedMessage.MessageHash, &receivedMessage.ReceiverKeyUID, &receivedMessage.SentAt, &receivedMessage.Topic, &receivedMessage.CreatedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
receivedMessages = append(receivedMessages, &receivedMessage)
|
||||
}
|
||||
return receivedMessages, nil
|
||||
}
|
||||
|
||||
func didReceivedMessageAfter(db *sql.DB, receiverPublicKey string, after time.Time) (bool, error) {
|
||||
var count int
|
||||
err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = '%s' AND createdAt > %d", receiverPublicKey, after.Unix())).Scan(&count)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func (r *ReceivedMessage) put(db *sql.DB) error {
|
||||
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, receiverKeyUID, sentAt, topic, createdAt) VALUES (?, ?, ?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.CreatedAt = time.Now().Unix()
|
||||
res, err := stmt.Exec(s.ChatId, s.MessageHash, s.ReceiverKeyUID, s.SentAt, s.Topic, s.CreatedAt)
|
||||
r.CreatedAt = time.Now().Unix()
|
||||
res, err := stmt.Exec(r.ChatID, r.MessageHash, r.ReceiverKeyUID, r.SentAt, r.Topic, r.CreatedAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -31,6 +68,25 @@ func (s *ReceivedMessage) put(db *sql.DB) error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.ID = int(id)
|
||||
r.ID = int(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ReceivedMessageAggregated) put(db *sql.DB) error {
|
||||
stmt, err := db.Prepare("INSERT INTO receivedMessageAggregated (chatId, durationInSeconds, value, runAt) VALUES (?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := stmt.Exec(r.ChatID, r.DurationInSeconds, r.Value, r.RunAt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
id, err := res.LastInsertId()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.ID = int(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -43,6 +43,8 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request)
|
||||
defer r.Body.Close()
|
||||
|
||||
if err := receivedMessage.put(s.DB); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
err := respondWithError(w, http.StatusInternalServerError, err.Error())
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user