diff --git a/telemetry/bindata.go b/telemetry/bindata.go index 9e85efc..854e676 100644 --- a/telemetry/bindata.go +++ b/telemetry/bindata.go @@ -1,6 +1,7 @@ // Code generated by go-bindata. DO NOT EDIT. // sources: // 000001_message_type.up.sql (66B) +// 000002_bandwidth_protocol.up.sql (657B) // doc.go (73B) package telemetry @@ -85,11 +86,31 @@ func _000001_message_typeUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "000001_message_type.up.sql", size: 66, mode: os.FileMode(0664), modTime: time.Unix(1674744353, 0)} + info := bindataFileInfo{name: "000001_message_type.up.sql", size: 66, mode: os.FileMode(0664), modTime: time.Unix(1675197752, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe2, 0x43, 0xcc, 0xef, 0xad, 0x5f, 0x44, 0x58, 0x8d, 0x47, 0x70, 0x5d, 0x23, 0x30, 0xe2, 0x1f, 0xdb, 0x4d, 0xad, 0x6e, 0xd9, 0xe7, 0x50, 0x19, 0x43, 0x1c, 0x37, 0x57, 0xea, 0xc6, 0x57, 0xab}} return a, nil } +var __000002_bandwidth_protocolUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x90\x4f\x4b\xc4\x30\x10\xc5\xef\xfd\x14\x73\x6c\xa1\x27\x61\x4f\x9e\x62\x77\xd4\xc1\x9a\x2e\x93\xac\xb8\x27\x09\xdb\x80\x85\x35\xd1\x76\xfa\xfd\xa5\x0d\x2a\xfe\xd9\x2a\x1e\xf6\x94\xc3\x7b\xcc\xcb\xef\x57\x31\x2a\x8b\x60\xd5\x45\x8d\x40\x97\xa0\x1b\x0b\x78\x4f\xc6\x1a\x78\xee\xa3\xc4\x7d\x3c\x18\x71\x32\xb0\x13\x0f\x79\x06\x00\xd0\xb5\x60\x90\x49\xd5\xb0\x61\xba\x55\xbc\x83\x1b\xdc\x95\x73\xf4\x18\x07\xa1\x16\xee\x14\x57\xd7\x8a\xf3\xb3\xd5\xaa\x98\x2f\xea\x6d\x5d\xa7\xc6\xdb\x51\xed\x9e\xfc\x52\xaf\x77\xe2\x29\xc0\xba\xd9\x4e\x1f\xdb\x30\x56\x64\xa8\xd1\x3f\xb4\x9a\x51\x7e\xab\xed\x7b\xef\xc4\xb7\x4a\x80\xb4\xc5\x2b\xe4\xaf\x79\x0c\x83\xf4\xae\x0b\xf2\x1d\xfa\x61\x0c\xdd\xcb\xe8\x21\x3d\x79\x22\x2c\x3f\x71\x94\x1f\x03\x45\x56\x9c\x67\xd9\x5f\xa5\xda\x28\xee\x30\x9c\x52\xab\x4c\x8b\x14\x8e\x78\x98\xd3\xc9\xe7\x11\x4d\xef\x1a\xd7\xca\xe2\xb2\xba\x84\xf6\x0f\x79\xaf\x01\x00\x00\xff\xff\xd6\x24\x1d\x9b\x91\x02\x00\x00") + +func _000002_bandwidth_protocolUpSqlBytes() ([]byte, error) { + return bindataRead( + __000002_bandwidth_protocolUpSql, + "000002_bandwidth_protocol.up.sql", + ) +} + +func _000002_bandwidth_protocolUpSql() (*asset, error) { + bytes, err := _000002_bandwidth_protocolUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "000002_bandwidth_protocol.up.sql", size: 657, mode: os.FileMode(0664), modTime: time.Unix(1675204316, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x82, 0x33, 0x32, 0x3e, 0xa6, 0x35, 0x56, 0x34, 0xa1, 0xe7, 0x90, 0x7, 0x9b, 0x6a, 0xa1, 0x5d, 0xfe, 0xf, 0xe4, 0x26, 0xec, 0xb6, 0x20, 0x9, 0xa9, 0x61, 0x78, 0x87, 0xfd, 0x14, 0xbf, 0x1c}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -105,7 +126,7 @@ func docGo() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "doc.go", size: 73, mode: os.FileMode(0664), modTime: time.Unix(1674743239, 0)} + info := bindataFileInfo{name: "doc.go", size: 73, mode: os.FileMode(0664), modTime: time.Unix(1675197752, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xae, 0x4f, 0xb8, 0x11, 0x84, 0x79, 0xbb, 0x6c, 0xf, 0xed, 0xc, 0xfc, 0x18, 0x32, 0x9d, 0xf1, 0x7, 0x2c, 0x20, 0xde, 0xe9, 0x97, 0x0, 0x62, 0x9f, 0x5e, 0x24, 0xfc, 0x8e, 0xc2, 0xd9, 0x2d}} return a, nil } @@ -203,6 +224,8 @@ func AssetNames() []string { var _bindata = map[string]func() (*asset, error){ "000001_message_type.up.sql": _000001_message_typeUpSql, + "000002_bandwidth_protocol.up.sql": _000002_bandwidth_protocolUpSql, + "doc.go": docGo, } @@ -247,8 +270,9 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "000001_message_type.up.sql": &bintree{_000001_message_typeUpSql, map[string]*bintree{}}, - "doc.go": &bintree{docGo, map[string]*bintree{}}, + "000001_message_type.up.sql": &bintree{_000001_message_typeUpSql, map[string]*bintree{}}, + "000002_bandwidth_protocol.up.sql": &bintree{_000002_bandwidth_protocolUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. diff --git a/telemetry/protocolstats.go b/telemetry/protocolstats.go new file mode 100644 index 0000000..52bc217 --- /dev/null +++ b/telemetry/protocolstats.go @@ -0,0 +1,56 @@ +package telemetry + +import ( + "database/sql" + "time" +) + +type Metric struct { + TotalIn int64 `json:"totalIn"` + TotalOut int64 `json:"totalOut"` + RateIn float64 `json:"rateIn"` + RateOut float64 `json:"rateOut"` +} + +type ProtocolStats struct { + HostID string `json:"hostID"` + Relay Metric `json:"relay"` + Store Metric `json:"store"` +} + +func (r *ProtocolStats) insertRate(db *sql.DB, protocolName string, metric Metric) error { + stmt, err := db.Prepare("INSERT INTO protocolStatsRate (hostId, protocolName, rateIn, rateOut, createdAt) VALUES ($1, $2, $3, $4, $5);") + if err != nil { + return err + } + _, err = stmt.Exec(r.HostID, protocolName, metric.RateIn, metric.RateOut, time.Now().Unix()) + if err != nil { + return err + } + + stmt, err = db.Prepare("INSERT INTO protocolStatsTotals (hostId, protocolName, totalIn, totalOut, createdAt) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT protocolStatsTotals_unique DO UPDATE SET totalIn = $3, totalOut = $4;") + if err != nil { + return err + } + + _, err = stmt.Exec(r.HostID, protocolName, metric.TotalIn, metric.TotalOut, time.Now().Format("2006-01-02")) + if err != nil { + return err + } + + return nil +} + +func (r *ProtocolStats) put(db *sql.DB) error { + err := r.insertRate(db, "relay", r.Relay) + if err != nil { + return err + } + + err = r.insertRate(db, "store", r.Store) + if err != nil { + return err + } + + return nil +} diff --git a/telemetry/server.go b/telemetry/server.go index 35dfcf5..1dbb0e0 100644 --- a/telemetry/server.go +++ b/telemetry/server.go @@ -22,6 +22,7 @@ func NewServer(db *sql.DB) *Server { DB: db, } + server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST") server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST") server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET") @@ -78,6 +79,42 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) ) } +func (s *Server) createProtocolStats(w http.ResponseWriter, r *http.Request) { + start := time.Now() + var protocolStats ProtocolStats + decoder := json.NewDecoder(r.Body) + if err := decoder.Decode(&protocolStats); err != nil { + log.Println(err) + + err := respondWithError(w, http.StatusBadRequest, "Invalid request payload") + if err != nil { + log.Println(err) + } + return + } + defer r.Body.Close() + + if err := protocolStats.put(s.DB); err != nil { + err := respondWithError(w, http.StatusInternalServerError, "Could not save protocol stats") + if err != nil { + log.Println(err) + } + return + } + + err := respondWithJSON(w, http.StatusCreated, map[string]string{"error": ""}) + if err != nil { + log.Println(err) + } + + log.Printf( + "%s\t%s\t%s", + r.Method, + r.RequestURI, + time.Since(start), + ) +} + func (s *Server) Start(port int) { log.Printf("Starting server on port %d", port) log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router)) diff --git a/telemetry/sql/000002_bandwidth_protocol.up.sql b/telemetry/sql/000002_bandwidth_protocol.up.sql new file mode 100644 index 0000000..5ea291c --- /dev/null +++ b/telemetry/sql/000002_bandwidth_protocol.up.sql @@ -0,0 +1,19 @@ +CREATE TABLE IF NOT EXISTS protocolStatsRate ( + id SERIAL PRIMARY KEY, + hostId VARCHAR(255) NOT NULL, + protocolName VARCHAR(255) NOT NULL, + rateIn DOUBLE PRECISION NOT NULL, + rateOut DOUBLE PRECISION NOT NULL, + createdAt INTEGER NOT NULL, + constraint protocolStatsRate_unique unique(hostId, protocolName, createdAt) +); + +CREATE TABLE IF NOT EXISTS protocolStatsTotals ( + id SERIAL PRIMARY KEY, + hostId VARCHAR(255) NOT NULL, + protocolName VARCHAR(255) NOT NULL, + totalIn INTEGER NOT NULL, + totalOut INTEGER NOT NULL, + createdAt DATE, + constraint protocolStatsTotals_unique unique(hostId, protocolName, createdAt) +);