mirror of
https://github.com/logos-messaging/telemetry.git
synced 2026-01-02 14:13:05 +00:00
feat: protocol stats endpoint
This commit is contained in:
parent
4a1088b220
commit
859822226d
@ -1,6 +1,7 @@
|
||||
// Code generated by go-bindata. DO NOT EDIT.
|
||||
// sources:
|
||||
// 000001_message_type.up.sql (66B)
|
||||
// 000002_bandwidth_protocol.up.sql (657B)
|
||||
// doc.go (73B)
|
||||
|
||||
package telemetry
|
||||
@ -85,11 +86,31 @@ func _000001_message_typeUpSql() (*asset, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "000001_message_type.up.sql", size: 66, mode: os.FileMode(0664), modTime: time.Unix(1674744353, 0)}
|
||||
info := bindataFileInfo{name: "000001_message_type.up.sql", size: 66, mode: os.FileMode(0664), modTime: time.Unix(1675197752, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe2, 0x43, 0xcc, 0xef, 0xad, 0x5f, 0x44, 0x58, 0x8d, 0x47, 0x70, 0x5d, 0x23, 0x30, 0xe2, 0x1f, 0xdb, 0x4d, 0xad, 0x6e, 0xd9, 0xe7, 0x50, 0x19, 0x43, 0x1c, 0x37, 0x57, 0xea, 0xc6, 0x57, 0xab}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var __000002_bandwidth_protocolUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xbc\x90\x4f\x4b\xc4\x30\x10\xc5\xef\xfd\x14\x73\x6c\xa1\x27\x61\x4f\x9e\x62\x77\xd4\xc1\x9a\x2e\x93\xac\xb8\x27\x09\xdb\x80\x85\x35\xd1\x76\xfa\xfd\xa5\x0d\x2a\xfe\xd9\x2a\x1e\xf6\x94\xc3\x7b\xcc\xcb\xef\x57\x31\x2a\x8b\x60\xd5\x45\x8d\x40\x97\xa0\x1b\x0b\x78\x4f\xc6\x1a\x78\xee\xa3\xc4\x7d\x3c\x18\x71\x32\xb0\x13\x0f\x79\x06\x00\xd0\xb5\x60\x90\x49\xd5\xb0\x61\xba\x55\xbc\x83\x1b\xdc\x95\x73\xf4\x18\x07\xa1\x16\xee\x14\x57\xd7\x8a\xf3\xb3\xd5\xaa\x98\x2f\xea\x6d\x5d\xa7\xc6\xdb\x51\xed\x9e\xfc\x52\xaf\x77\xe2\x29\xc0\xba\xd9\x4e\x1f\xdb\x30\x56\x64\xa8\xd1\x3f\xb4\x9a\x51\x7e\xab\xed\x7b\xef\xc4\xb7\x4a\x80\xb4\xc5\x2b\xe4\xaf\x79\x0c\x83\xf4\xae\x0b\xf2\x1d\xfa\x61\x0c\xdd\xcb\xe8\x21\x3d\x79\x22\x2c\x3f\x71\x94\x1f\x03\x45\x56\x9c\x67\xd9\x5f\xa5\xda\x28\xee\x30\x9c\x52\xab\x4c\x8b\x14\x8e\x78\x98\xd3\xc9\xe7\x11\x4d\xef\x1a\xd7\xca\xe2\xb2\xba\x84\xf6\x0f\x79\xaf\x01\x00\x00\xff\xff\xd6\x24\x1d\x9b\x91\x02\x00\x00")
|
||||
|
||||
func _000002_bandwidth_protocolUpSqlBytes() ([]byte, error) {
|
||||
return bindataRead(
|
||||
__000002_bandwidth_protocolUpSql,
|
||||
"000002_bandwidth_protocol.up.sql",
|
||||
)
|
||||
}
|
||||
|
||||
func _000002_bandwidth_protocolUpSql() (*asset, error) {
|
||||
bytes, err := _000002_bandwidth_protocolUpSqlBytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "000002_bandwidth_protocol.up.sql", size: 657, mode: os.FileMode(0664), modTime: time.Unix(1675204316, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x82, 0x33, 0x32, 0x3e, 0xa6, 0x35, 0x56, 0x34, 0xa1, 0xe7, 0x90, 0x7, 0x9b, 0x6a, 0xa1, 0x5d, 0xfe, 0xf, 0xe4, 0x26, 0xec, 0xb6, 0x20, 0x9, 0xa9, 0x61, 0x78, 0x87, 0xfd, 0x14, 0xbf, 0x1c}}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00")
|
||||
|
||||
func docGoBytes() ([]byte, error) {
|
||||
@ -105,7 +126,7 @@ func docGo() (*asset, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := bindataFileInfo{name: "doc.go", size: 73, mode: os.FileMode(0664), modTime: time.Unix(1674743239, 0)}
|
||||
info := bindataFileInfo{name: "doc.go", size: 73, mode: os.FileMode(0664), modTime: time.Unix(1675197752, 0)}
|
||||
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xae, 0x4f, 0xb8, 0x11, 0x84, 0x79, 0xbb, 0x6c, 0xf, 0xed, 0xc, 0xfc, 0x18, 0x32, 0x9d, 0xf1, 0x7, 0x2c, 0x20, 0xde, 0xe9, 0x97, 0x0, 0x62, 0x9f, 0x5e, 0x24, 0xfc, 0x8e, 0xc2, 0xd9, 0x2d}}
|
||||
return a, nil
|
||||
}
|
||||
@ -203,6 +224,8 @@ func AssetNames() []string {
|
||||
var _bindata = map[string]func() (*asset, error){
|
||||
"000001_message_type.up.sql": _000001_message_typeUpSql,
|
||||
|
||||
"000002_bandwidth_protocol.up.sql": _000002_bandwidth_protocolUpSql,
|
||||
|
||||
"doc.go": docGo,
|
||||
}
|
||||
|
||||
@ -247,8 +270,9 @@ type bintree struct {
|
||||
}
|
||||
|
||||
var _bintree = &bintree{nil, map[string]*bintree{
|
||||
"000001_message_type.up.sql": &bintree{_000001_message_typeUpSql, map[string]*bintree{}},
|
||||
"doc.go": &bintree{docGo, map[string]*bintree{}},
|
||||
"000001_message_type.up.sql": &bintree{_000001_message_typeUpSql, map[string]*bintree{}},
|
||||
"000002_bandwidth_protocol.up.sql": &bintree{_000002_bandwidth_protocolUpSql, map[string]*bintree{}},
|
||||
"doc.go": &bintree{docGo, map[string]*bintree{}},
|
||||
}}
|
||||
|
||||
// RestoreAsset restores an asset under the given directory.
|
||||
|
||||
56
telemetry/protocolstats.go
Normal file
56
telemetry/protocolstats.go
Normal file
@ -0,0 +1,56 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Metric struct {
|
||||
TotalIn int64 `json:"totalIn"`
|
||||
TotalOut int64 `json:"totalOut"`
|
||||
RateIn float64 `json:"rateIn"`
|
||||
RateOut float64 `json:"rateOut"`
|
||||
}
|
||||
|
||||
type ProtocolStats struct {
|
||||
HostID string `json:"hostID"`
|
||||
Relay Metric `json:"relay"`
|
||||
Store Metric `json:"store"`
|
||||
}
|
||||
|
||||
func (r *ProtocolStats) insertRate(db *sql.DB, protocolName string, metric Metric) error {
|
||||
stmt, err := db.Prepare("INSERT INTO protocolStatsRate (hostId, protocolName, rateIn, rateOut, createdAt) VALUES ($1, $2, $3, $4, $5);")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = stmt.Exec(r.HostID, protocolName, metric.RateIn, metric.RateOut, time.Now().Unix())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stmt, err = db.Prepare("INSERT INTO protocolStatsTotals (hostId, protocolName, totalIn, totalOut, createdAt) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT protocolStatsTotals_unique DO UPDATE SET totalIn = $3, totalOut = $4;")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = stmt.Exec(r.HostID, protocolName, metric.TotalIn, metric.TotalOut, time.Now().Format("2006-01-02"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ProtocolStats) put(db *sql.DB) error {
|
||||
err := r.insertRate(db, "relay", r.Relay)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.insertRate(db, "store", r.Store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -22,6 +22,7 @@ func NewServer(db *sql.DB) *Server {
|
||||
DB: db,
|
||||
}
|
||||
|
||||
server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
|
||||
server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST")
|
||||
server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET")
|
||||
|
||||
@ -78,6 +79,42 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request)
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Server) createProtocolStats(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
var protocolStats ProtocolStats
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
if err := decoder.Decode(&protocolStats); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
err := respondWithError(w, http.StatusBadRequest, "Invalid request payload")
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if err := protocolStats.put(s.DB); err != nil {
|
||||
err := respondWithError(w, http.StatusInternalServerError, "Could not save protocol stats")
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err := respondWithJSON(w, http.StatusCreated, map[string]string{"error": ""})
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
|
||||
log.Printf(
|
||||
"%s\t%s\t%s",
|
||||
r.Method,
|
||||
r.RequestURI,
|
||||
time.Since(start),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *Server) Start(port int) {
|
||||
log.Printf("Starting server on port %d", port)
|
||||
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router))
|
||||
|
||||
19
telemetry/sql/000002_bandwidth_protocol.up.sql
Normal file
19
telemetry/sql/000002_bandwidth_protocol.up.sql
Normal file
@ -0,0 +1,19 @@
|
||||
CREATE TABLE IF NOT EXISTS protocolStatsRate (
|
||||
id SERIAL PRIMARY KEY,
|
||||
hostId VARCHAR(255) NOT NULL,
|
||||
protocolName VARCHAR(255) NOT NULL,
|
||||
rateIn DOUBLE PRECISION NOT NULL,
|
||||
rateOut DOUBLE PRECISION NOT NULL,
|
||||
createdAt INTEGER NOT NULL,
|
||||
constraint protocolStatsRate_unique unique(hostId, protocolName, createdAt)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS protocolStatsTotals (
|
||||
id SERIAL PRIMARY KEY,
|
||||
hostId VARCHAR(255) NOT NULL,
|
||||
protocolName VARCHAR(255) NOT NULL,
|
||||
totalIn INTEGER NOT NULL,
|
||||
totalOut INTEGER NOT NULL,
|
||||
createdAt DATE,
|
||||
constraint protocolStatsTotals_unique unique(hostId, protocolName, createdAt)
|
||||
);
|
||||
Loading…
x
Reference in New Issue
Block a user