diff --git a/go.mod b/go.mod index 111a8fa..505faf9 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,9 @@ go 1.15 require ( github.com/golang-migrate/migrate/v4 v4.15.2 + github.com/gorilla/handlers v1.5.2 // indirect github.com/gorilla/mux v1.8.0 github.com/lib/pq v1.10.3 - github.com/mattn/go-sqlite3 v1.14.10 github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.7.0 ) diff --git a/go.sum b/go.sum index 3a988e4..6329fa9 100644 --- a/go.sum +++ b/go.sum @@ -392,6 +392,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -595,6 +597,8 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= +github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= +github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= @@ -792,7 +796,6 @@ github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vq github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= -github.com/mattn/go-sqlite3 v1.14.10 h1:MLn+5bFRlWMGoSRmJour3CL1w/qL96mvipqpwQW/Sfk= github.com/mattn/go-sqlite3 v1.14.10/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= diff --git a/telemetry/db.go b/telemetry/db.go index 5281d87..4767b98 100644 --- a/telemetry/db.go +++ b/telemetry/db.go @@ -81,6 +81,28 @@ func createTables(db *sql.DB) error { return err } + sqlStmt = `CREATE TABLE IF NOT EXISTS wakuMessages ( + id SERIAL PRIMARY KEY, + walletAddress VARCHAR(255), + peerIdSender VARCHAR(255) NOT NULL, + peerIdReporter VARCHAR(255) NOT NULL, + sequenceHash VARCHAR(255) NOT NULL, + sequenceTotal VARCHAR(255) NOT NULL, + sequenceIndex VARCHAR(255) NOT NULL, + contentTopic VARCHAR(255) NOT NULL, + pubsubTopic VARCHAR(255) NOT NULL, + timestamp INTEGER NOT NULL, + createdAt INTEGER NOT NULL, + + constraint wakuMessages_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic) + + );` + _, err = db.Exec(sqlStmt) + + if err != nil { + return err + } + sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated ( id SERIAL PRIMARY KEY, durationInSeconds INTEGER NOT NULL, diff --git a/telemetry/server.go b/telemetry/server.go index 6bb7e09..73e1f54 100644 --- a/telemetry/server.go +++ b/telemetry/server.go @@ -10,6 +10,7 @@ import ( "net/http" "time" + "github.com/gorilla/handlers" "github.com/gorilla/mux" ) @@ -26,6 +27,7 @@ func NewServer(db *sql.DB) *Server { server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST") server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST") + server.Router.HandleFunc("/waku-messages", server.createWakuMessages).Methods("POST") server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST") server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST") server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET") @@ -83,6 +85,51 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) ) } +func (s *Server) createWakuMessages(w http.ResponseWriter, r *http.Request) { + start := time.Now() + var wakuMessages []WakuMessage + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(&wakuMessages); err != nil { + log.Println(err) + + err := respondWithError(w, http.StatusBadRequest, "Invalid request payload") + if err != nil { + log.Println(err) + } + return + } + defer r.Body.Close() + + var ids []int + for _, wakuMessage := range wakuMessages { + if err := wakuMessage.put(s.DB); err != nil { + log.Println("could not save message", err, wakuMessage) + continue + } + ids = append(ids, wakuMessage.ID) + } + + if len(ids) != len(wakuMessages) { + err := respondWithError(w, http.StatusInternalServerError, "Could not save all record") + if err != nil { + log.Println(err) + } + return + } + + err := respondWithJSON(w, http.StatusCreated, wakuMessages) + if err != nil { + log.Println(err) + } + + log.Printf( + "%s\t%s\t%s", + r.Method, + r.RequestURI, + time.Since(start), + ) +} + func (s *Server) createReceivedEnvelope(w http.ResponseWriter, r *http.Request) { start := time.Now() var receivedEnvelope ReceivedEnvelope @@ -190,6 +237,10 @@ func (s *Server) createProtocolStats(w http.ResponseWriter, r *http.Request) { } func (s *Server) Start(port int) { + headersOk := handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}) + originsOk := handlers.AllowedOrigins([]string{"*"}) + methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "OPTIONS"}) + log.Printf("Starting server on port %d", port) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router)) + log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), handlers.CORS(originsOk, headersOk, methodsOk)(s.Router))) } diff --git a/telemetry/wakumessage.go b/telemetry/wakumessage.go new file mode 100644 index 0000000..fbbbc39 --- /dev/null +++ b/telemetry/wakumessage.go @@ -0,0 +1,63 @@ +package telemetry + +import ( + "database/sql" + "time" +) + +type WakuMessage struct { + ID int `json:"id"` + WalletAddress string `json:"walletAddress"` + PeerIDSender string `json:"peerIdSender"` + PeerIDReporter string `json:"peerIdReporter"` + SequenceHash string `json:"sequenceHash"` + SequenceTotal uint64 `json:"sequenceTotal"` + SequenceIndex uint64 `json:"sequenceIndex"` + ContentTopic string `json:"contentTopic"` + PubsubTopic string `json:"pubsubTopic"` + Timestamp int64 `json:"timestamp"` + CreatedAt int64 `json:"createdAt"` +} + +// func queryWakuMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*WakuMessage, error) { +// rows, err := db.Query(fmt.Sprintf("SELECT id, sequenceHash, sequenceNumber, contentTopic, pubsubTopic, createdAt FROM wakuMessages WHERE createdAt BETWEEN %d and %d", startsAt.Unix(), endsAt.Unix())) +// if err != nil { +// return nil, err +// } +// defer rows.Close() + +// var wakuMessages []*WakuMessage +// for rows.Next() { +// var wakuMessage WakuMessage +// err = rows.Scan( +// &wakuMessage.ID, +// &wakuMessage.SequenceHash, +// &wakuMessage.SequenceNumber, +// &wakuMessage.ContentTopic, +// &wakuMessage.PubsubTopic, +// &wakuMessage.CreatedAt, +// ) +// if err != nil { +// return nil, err +// } +// wakuMessages = append(wakuMessages, &wakuMessage) +// } +// return wakuMessages, nil +// } + +func (r *WakuMessage) put(db *sql.DB) error { + stmt, err := db.Prepare("INSERT INTO wakuMessages (walletAddress, peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id;") + if err != nil { + return err + } + + r.CreatedAt = time.Now().Unix() + lastInsertId := 0 + err = stmt.QueryRow(r.WalletAddress, r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId) + if err != nil { + return err + } + r.ID = lastInsertId + + return nil +}