diff --git a/cmd/aggregator/main.go b/cmd/aggregator/main.go index 06ab7d0..d2f2e2e 100644 --- a/cmd/aggregator/main.go +++ b/cmd/aggregator/main.go @@ -1 +1,21 @@ package main + +import ( + "flag" + "time" + + "github.com/status-im/dev-telemetry/telemetry" +) + +func main() { + seconds := flag.Int("seconds", 3600, "Number of seconds to aggregate") + dataSourceName := flag.String("data-source-name", "", "DB URL") + + flag.Parse() + + db := telemetry.OpenDb(*dataSourceName) + defer db.Close() + + aggregator := telemetry.NewAggregator(db) + aggregator.Run(time.Duration(*seconds)) +} diff --git a/cmd/server/main.go b/cmd/server/main.go index abbbe40..322b33c 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -8,17 +8,11 @@ import ( func main() { port := flag.Int("port", 8080, "Port number") - dbUsername := flag.String("db-username", "", "Db username") - dbPassword := flag.String("db-password", "", "Db password") - dbName := flag.String("db-name", "", "Db name") + dataSourceName := flag.String("data-source-name", "", "DB URL") flag.Parse() - db := telemetry.OpenDb( - *dbUsername, - *dbPassword, - *dbName, - ) + db := telemetry.OpenDb(*dataSourceName) defer db.Close() server := telemetry.NewServer(db) diff --git a/go.mod b/go.mod index 93ea346..00420c8 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,7 @@ go 1.15 require ( github.com/gorilla/mux v1.8.0 + github.com/lib/pq v1.10.3 github.com/mattn/go-sqlite3 v1.14.9 + github.com/stretchr/testify v1.7.0 ) diff --git a/go.sum b/go.sum index f973f6e..5386c44 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,17 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= +github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-sqlite3 v1.14.9 h1:10HX2Td0ocZpYEjhilsuo6WWtUqttj2Kb0KtD86/KYA= github.com/mattn/go-sqlite3 v1.14.9/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/telemetry/aggregator.go b/telemetry/aggregator.go new file mode 100644 index 0000000..5932722 --- /dev/null +++ b/telemetry/aggregator.go @@ -0,0 +1,124 @@ +package telemetry + +import ( + "database/sql" + "log" + "time" +) + +type Aggregator struct { + DB *sql.DB +} + +func NewAggregator(db *sql.DB) *Aggregator { + return &Aggregator{ + DB: db, + } +} + +func (a *Aggregator) Run(d time.Duration) { + // Define the duration starts and end. + // Allow a buffer of the duration to define the start and end. + // This is to ensure we wait for people not being connected or if they received messages with delay + runAt := time.Now() + endsAt := runAt.Add(-d) + startsAt := endsAt.Add(-d) + + // Query all received message for a specific duration + receivedMessages, err := queryReceivedMessagesBetween(a.DB, startsAt, endsAt) + if err != nil { + log.Fatalf("could not query received message: %s", err) + } + + // Collect all key uids + receiverKeyUIDs := make(map[string]bool) + for _, receivedMessage := range receivedMessages { + receiverKeyUIDs[receivedMessage.ReceiverKeyUID] = true + } + + // Ensure the specific key uids received a message after the end of the duration + // That way we know that this specific key uid has been connected + for receiverKeyUID := range receiverKeyUIDs { + ok, err := didReceivedMessageAfter(a.DB, receiverKeyUID, endsAt) + if err != nil { + log.Fatalf("could not check key UID: %s, because of %s", receiverKeyUID, err) + } + if !ok { + receiverKeyUIDs[receiverKeyUID] = false + } + } + + // Group the received messages by chat id and key uid + groupedMessages := make(map[string]map[string]int) + for _, receivedMessage := range receivedMessages { + // Skip receiver key uid if it has not been connected + if !receiverKeyUIDs[receivedMessage.ReceiverKeyUID] { + continue + } + + if _, ok := groupedMessages[receivedMessage.ChatID]; !ok { + groupedMessages[receivedMessage.ChatID] = make(map[string]int) + } + groupedMessages[receivedMessage.ChatID][receivedMessage.ReceiverKeyUID] += 1 + } + + if len(groupedMessages) == 0 { + return + } + + // Rch = 1 - count_of_message_missing / total_number_of_messages + + // Calculate the reliability for each channel as: + // Rch = 1 - count_of_message_missing / total_number_of_messages + rChatID := make(map[string]float64) + for chatID, countByKeyUID := range groupedMessages { + messageMissing := 0 + totalMessages := 0 + + max := 0 + for _, count := range countByKeyUID { + if count > max { + max = count + } + } + + for _, count := range countByKeyUID { + totalMessages += count + messageMissing += max - count + } + + rChatID[chatID] = 1 - float64(messageMissing)/float64(totalMessages) + } + + // Store all aggregation + for ChatID, rChatID := range rChatID { + rma := ReceivedMessageAggregated{ + ChatID: ChatID, + DurationInSeconds: int64(d.Seconds()), + Value: rChatID, + RunAt: runAt.Unix(), + } + err := rma.put(a.DB) + if err != nil { + log.Fatalf("could not store received message aggregated: %s", err) + } + } + + // Calculate the global reliability R = (R(0) + R(1)+ .... + R(n)) / len(Rch) + rChatIDTotal := 0.0 + for _, v := range rChatID { + rChatIDTotal += v + } + + r := rChatIDTotal / float64(len(rChatID)) + rma := ReceivedMessageAggregated{ + ChatID: "", + DurationInSeconds: int64(d.Seconds()), + Value: r, + RunAt: runAt.Unix(), + } + err = rma.put(a.DB) + if err != nil { + log.Fatalf("could not store received message aggregated: %s", err) + } +} diff --git a/telemetry/aggregator_test.go b/telemetry/aggregator_test.go new file mode 100644 index 0000000..3a16ca2 --- /dev/null +++ b/telemetry/aggregator_test.go @@ -0,0 +1,149 @@ +package telemetry + +import ( + "database/sql" + "log" + "math" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver + "github.com/stretchr/testify/require" +) + +func NewMock() *sql.DB { + db, err := sql.Open("sqlite3", ":memory:") + if err != nil { + log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + + err = createTables(db) + + if err != nil { + log.Fatalf("an error '%s' was not expected when migrating the db", err) + } + + return db +} + +func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) { + rows, err := db.Query("SELECT * FROM receivedMessageAggregated") + if err != nil { + return nil, err + } + defer rows.Close() + + var receivedMessageAggregateds []*ReceivedMessageAggregated + for rows.Next() { + var receivedMessageAggregated ReceivedMessageAggregated + err = rows.Scan( + &receivedMessageAggregated.ID, + &receivedMessageAggregated.DurationInSeconds, + &receivedMessageAggregated.ChatID, + &receivedMessageAggregated.Value, + &receivedMessageAggregated.RunAt, + ) + if err != nil { + return nil, err + } + receivedMessageAggregateds = append(receivedMessageAggregateds, &receivedMessageAggregated) + } + return receivedMessageAggregateds, nil +} + +func TestRunAggregatorSimple(t *testing.T) { + db := NewMock() + + m := &ReceivedMessage{ + ChatID: "1", + MessageHash: "1", + ReceiverKeyUID: "1", + SentAt: time.Now().Unix(), + Topic: "1", + } + m.put(db) + + oneHourAndHalf := time.Hour + time.Minute*30 + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "2", + ReceiverKeyUID: "1", + SentAt: time.Now().Add(-oneHourAndHalf).Unix(), + Topic: "1", + } + m.put(db) + + agg := NewAggregator(db) + + agg.Run(time.Hour) + + res, err := queryAggregatedMessage(db) + require.NoError(t, err) + require.Len(t, res, 2) + require.Equal(t, "3", res[0].ChatID) + require.Equal(t, 1.0, res[0].Value) + require.Equal(t, "", res[1].ChatID) + require.Equal(t, 1.0, res[1].Value) +} + +func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { + db := NewMock() + + m := &ReceivedMessage{ + ChatID: "1", + MessageHash: "1", + ReceiverKeyUID: "1", + SentAt: time.Now().Unix(), + Topic: "1", + } + m.put(db) + + oneHourAndHalf := time.Hour + time.Minute*30 + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "2", + ReceiverKeyUID: "1", + SentAt: time.Now().Add(-oneHourAndHalf).Unix(), + Topic: "1", + } + m.put(db) + + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "3", + ReceiverKeyUID: "1", + SentAt: time.Now().Add(-oneHourAndHalf).Unix(), + Topic: "1", + } + m.put(db) + + m = &ReceivedMessage{ + ChatID: "1", + MessageHash: "1", + ReceiverKeyUID: "2", + SentAt: time.Now().Unix(), + Topic: "1", + } + m.put(db) + + m = &ReceivedMessage{ + ChatID: "3", + MessageHash: "2", + ReceiverKeyUID: "2", + SentAt: time.Now().Add(-oneHourAndHalf).Unix(), + Topic: "1", + } + m.put(db) + + agg := NewAggregator(db) + + agg.Run(time.Hour) + + res, err := queryAggregatedMessage(db) + require.NoError(t, err) + require.Len(t, res, 2) + require.Equal(t, "3", res[0].ChatID) + require.Equal(t, 0.67, math.Round(res[0].Value*100)/100) + require.Equal(t, "", res[1].ChatID) + require.Equal(t, 0.67, math.Round(res[1].Value*100)/100) +} diff --git a/telemetry/db.go b/telemetry/db.go index dc7bd90..efce152 100644 --- a/telemetry/db.go +++ b/telemetry/db.go @@ -4,12 +4,11 @@ import ( "database/sql" "log" - // TODO Replace with real db driver - _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver + _ "github.com/lib/pq" ) -func OpenDb(username string, password string, name string) *sql.DB { - db, err := sql.Open("sqlite3", ":memory:") +func OpenDb(dataSourceName string) *sql.DB { + db, err := sql.Open("postgres", dataSourceName) if err != nil { log.Fatalf("could not connect to database: %v", err) } @@ -38,8 +37,20 @@ func createTables(db *sql.DB) error { createdAt INTEGER NOT NULL );` _, err := db.Exec(sqlStmt) + if err != nil { return err } - return nil + + sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated ( + id INTEGER PRIMARY KEY, + durationInSeconds INTEGER NOT NULL, + chatId VARCHAR(255) NOT NULL, + value DECIMAL NOT NULL, + runAt INTEGER NOT NULL + );` + + _, err = db.Exec(sqlStmt) + + return err } diff --git a/telemetry/json.go b/telemetry/json.go index 8d35a63..db7c288 100644 --- a/telemetry/json.go +++ b/telemetry/json.go @@ -10,11 +10,14 @@ func respondWithError(w http.ResponseWriter, code int, message string) error { } func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) error { - response, _ := json.Marshal(payload) + response, err := json.Marshal(payload) + if err != nil { + return err + } w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) - _, err := w.Write(response) + _, err = w.Write(response) return err } diff --git a/telemetry/receivedmessage.go b/telemetry/receivedmessage.go index 76a5bf2..e461644 100644 --- a/telemetry/receivedmessage.go +++ b/telemetry/receivedmessage.go @@ -2,12 +2,21 @@ package telemetry import ( "database/sql" + "fmt" "time" ) +type ReceivedMessageAggregated struct { + ID int + ChatID string + DurationInSeconds int64 + Value float64 + RunAt int64 +} + type ReceivedMessage struct { ID int `json:"id"` - ChatId string `json:"chatId"` + ChatID string `json:"chatId"` MessageHash string `json:"messageHash"` ReceiverKeyUID string `json:"receiverKeyUID"` SentAt int64 `json:"sentAt"` @@ -15,14 +24,42 @@ type ReceivedMessage struct { CreatedAt int64 `json:"createdAt"` } -func (s *ReceivedMessage) put(db *sql.DB) error { +func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*ReceivedMessage, error) { + rows, err := db.Query(fmt.Sprintf("SELECT * FROM receivedMessages WHERE sentAt BETWEEN %d and %d", startsAt.Unix(), endsAt.Unix())) + if err != nil { + return nil, err + } + defer rows.Close() + + var receivedMessages []*ReceivedMessage + for rows.Next() { + var receivedMessage ReceivedMessage + err = rows.Scan(&receivedMessage.ID, &receivedMessage.ChatID, &receivedMessage.MessageHash, &receivedMessage.ReceiverKeyUID, &receivedMessage.SentAt, &receivedMessage.Topic, &receivedMessage.CreatedAt) + if err != nil { + return nil, err + } + receivedMessages = append(receivedMessages, &receivedMessage) + } + return receivedMessages, nil +} + +func didReceivedMessageAfter(db *sql.DB, receiverPublicKey string, after time.Time) (bool, error) { + var count int + err := db.QueryRow(fmt.Sprintf("SELECT COUNT(*) FROM receivedMessages WHERE receiverKeyUID = '%s' AND createdAt > %d", receiverPublicKey, after.Unix())).Scan(&count) + if err != nil { + return false, err + } + return count > 0, nil +} + +func (r *ReceivedMessage) put(db *sql.DB) error { stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, receiverKeyUID, sentAt, topic, createdAt) VALUES (?, ?, ?, ?, ?, ?)") if err != nil { return err } - s.CreatedAt = time.Now().Unix() - res, err := stmt.Exec(s.ChatId, s.MessageHash, s.ReceiverKeyUID, s.SentAt, s.Topic, s.CreatedAt) + r.CreatedAt = time.Now().Unix() + res, err := stmt.Exec(r.ChatID, r.MessageHash, r.ReceiverKeyUID, r.SentAt, r.Topic, r.CreatedAt) if err != nil { return err } @@ -31,6 +68,25 @@ func (s *ReceivedMessage) put(db *sql.DB) error { return err } - s.ID = int(id) + r.ID = int(id) + return nil +} + +func (r *ReceivedMessageAggregated) put(db *sql.DB) error { + stmt, err := db.Prepare("INSERT INTO receivedMessageAggregated (chatId, durationInSeconds, value, runAt) VALUES (?, ?, ?, ?)") + if err != nil { + return err + } + + res, err := stmt.Exec(r.ChatID, r.DurationInSeconds, r.Value, r.RunAt) + if err != nil { + return err + } + id, err := res.LastInsertId() + if err != nil { + return err + } + + r.ID = int(id) return nil } diff --git a/telemetry/server.go b/telemetry/server.go index 22501cf..e3a2e5f 100644 --- a/telemetry/server.go +++ b/telemetry/server.go @@ -43,6 +43,8 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) defer r.Body.Close() if err := receivedMessage.put(s.DB); err != nil { + log.Println(err) + err := respondWithError(w, http.StatusInternalServerError, err.Error()) if err != nil { log.Println(err)