mirror of
https://github.com/logos-messaging/telemetry.git
synced 2026-01-05 23:53:12 +00:00
feat: accept batch (#4)
This commit is contained in:
parent
f0eca9dad5
commit
c92bd95a8b
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/status-im/dev-telemetry/telemetry"
|
"github.com/status-im/dev-telemetry/telemetry"
|
||||||
@ -20,9 +21,14 @@ func main() {
|
|||||||
|
|
||||||
aggregator := telemetry.NewAggregator(db)
|
aggregator := telemetry.NewAggregator(db)
|
||||||
c := cron.New()
|
c := cron.New()
|
||||||
c.AddFunc("0 * * * *", func() {
|
_, err := c.AddFunc("0 * * * *", func() {
|
||||||
aggregator.Run(time.Hour)
|
aggregator.Run(time.Hour)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error adding cron job: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
c.Start()
|
c.Start()
|
||||||
defer c.Stop()
|
defer c.Stop()
|
||||||
|
|
||||||
|
|||||||
@ -35,9 +35,9 @@ func handleHealthCheck(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
var receivedMessage ReceivedMessage
|
var receivedMessages []ReceivedMessage
|
||||||
decoder := json.NewDecoder(r.Body)
|
decoder := json.NewDecoder(r.Body)
|
||||||
if err := decoder.Decode(&receivedMessage); err != nil {
|
if err := decoder.Decode(&receivedMessages); err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
|
|
||||||
err := respondWithError(w, http.StatusBadRequest, "Invalid request payload")
|
err := respondWithError(w, http.StatusBadRequest, "Invalid request payload")
|
||||||
@ -48,16 +48,24 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
defer r.Body.Close()
|
defer r.Body.Close()
|
||||||
|
|
||||||
|
var ids []int
|
||||||
|
for _, receivedMessage := range receivedMessages {
|
||||||
if err := receivedMessage.put(s.DB); err != nil {
|
if err := receivedMessage.put(s.DB); err != nil {
|
||||||
log.Println(err)
|
log.Println("could not save message", err, receivedMessage)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ids = append(ids, receivedMessage.ID)
|
||||||
|
}
|
||||||
|
|
||||||
err := respondWithError(w, http.StatusInternalServerError, err.Error())
|
if len(ids) != len(receivedMessages) {
|
||||||
|
err := respondWithError(w, http.StatusInternalServerError, "Could not save all record")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err := respondWithJSON(w, http.StatusCreated, receivedMessage)
|
|
||||||
|
err := respondWithJSON(w, http.StatusCreated, receivedMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(err)
|
log.Println(err)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user