feat: add endpoint for batching different metrics in single request

This commit adds a new endpoint which expects the body to be a
JSON array of arbitrary metrics, allowing a single request to
include more than one metric of one or more types. Also adds an
optional status version field to tables.
This commit is contained in:
Arseniy Klempner 2024-05-23 22:50:25 -07:00
parent 6efd105d10
commit a19ec01621
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
7 changed files with 133 additions and 23 deletions

View File

@ -42,6 +42,11 @@ func dropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}
_, err = db.Exec("DROP TABLE IF EXISTS sentEnvelopes")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}
_, err = db.Exec("DROP TABLE IF EXISTS protocolStatsRate")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)

View File

@ -5,6 +5,7 @@
// 000003_index_truncate.up.sql (598B)
// 000004_envelope.table.up.sql (531B)
// 000005_pushed_envelope.up.sql (574B)
// 000006_status_version.up.sql (198B)
// doc.go (73B)
package telemetry
@ -168,11 +169,31 @@ func _000005_pushed_envelopeUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1717559658, 0)}
info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1717560336, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7d, 0xaf, 0x8a, 0xcb, 0x97, 0x1e, 0xc6, 0xf6, 0x86, 0xe4, 0x1b, 0x67, 0x10, 0x87, 0x8e, 0x80, 0x1d, 0x5a, 0x7d, 0x64, 0xd0, 0x89, 0x3f, 0x1e, 0x6f, 0x93, 0x87, 0x4a, 0xd7, 0x87, 0xb8, 0x5e}}
return a, nil
}
var __000006_status_versionUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\x28\x4a\x4d\x4e\xcd\x2c\x4b\x4d\xf1\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x56\x70\x74\x71\x51\x70\xf6\xf7\x09\xf5\xf5\x53\x28\x2e\x49\x2c\x29\x2d\x0e\x4b\x2d\x2a\xce\xcc\xcf\x53\x08\x73\x0c\x72\xf6\x70\x0c\xd2\x30\x36\xd4\xb4\xe6\xc2\x66\x84\x6b\x5e\x59\x6a\x4e\x7e\x01\x59\x66\x14\xa7\xe6\x95\x90\xa8\x1f\x10\x00\x00\xff\xff\xeb\x4e\x39\x66\xc6\x00\x00\x00")
func _000006_status_versionUpSqlBytes() ([]byte, error) {
return bindataRead(
__000006_status_versionUpSql,
"000006_status_version.up.sql",
)
}
func _000006_status_versionUpSql() (*asset, error) {
bytes, err := _000006_status_versionUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "000006_status_version.up.sql", size: 198, mode: os.FileMode(0644), modTime: time.Unix(1717560330, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2b, 0x11, 0xee, 0x9f, 0x4f, 0xf5, 0x0, 0x9a, 0x98, 0xe9, 0x44, 0x21, 0x2e, 0x57, 0xf7, 0xae, 0xf3, 0xb2, 0x3d, 0x94, 0x40, 0x69, 0xa7, 0x1d, 0x62, 0x57, 0x31, 0x9f, 0x60, 0x6, 0xed, 0x80}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -289,6 +310,7 @@ var _bindata = map[string]func() (*asset, error){
"000003_index_truncate.up.sql": _000003_index_truncateUpSql,
"000004_envelope.table.up.sql": _000004_envelopeTableUpSql,
"000005_pushed_envelope.up.sql": _000005_pushed_envelopeUpSql,
"000006_status_version.up.sql": _000006_status_versionUpSql,
"doc.go": docGo,
}
@ -343,6 +365,7 @@ var _bintree = &bintree{nil, map[string]*bintree{
"000003_index_truncate.up.sql": {_000003_index_truncateUpSql, map[string]*bintree{}},
"000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}},
"000005_pushed_envelope.up.sql": {_000005_pushed_envelopeUpSql, map[string]*bintree{}},
"000006_status_version.up.sql": {_000006_status_versionUpSql, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
}}

View File

@ -72,9 +72,7 @@ func createTables(db *sql.DB) error {
sentAt INTEGER NOT NULL,
topic VARCHAR(255) NOT NULL,
createdAt INTEGER NOT NULL,
constraint receivedMessages_unique unique(chatId, messageHash, receiverKeyUID, nodeName)
);`
_, err := db.Exec(sqlStmt)

View File

@ -16,13 +16,14 @@ type ReceivedEnvelope struct {
ReceiverKeyUID string `json:"receiverKeyUID"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
StatusVersion string `json:"statusVersion"`
}
func (r *ReceivedEnvelope) put(db *sql.DB) error {
r.CreatedAt = time.Now().Unix()
stmt, err := db.Prepare(`INSERT INTO receivedEnvelopes (messageHash, sentAt, createdAt, pubsubTopic,
topic, receiverKeyUID, nodeName, processingError)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
topic, receiverKeyUID, nodeName, processingError, statusVersion)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT ON CONSTRAINT receivedEnvelopes_unique DO NOTHING
RETURNING id;`)
if err != nil {
@ -30,7 +31,7 @@ func (r *ReceivedEnvelope) put(db *sql.DB) error {
}
lastInsertId := 0
err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError).Scan(&lastInsertId)
err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError, r.StatusVersion).Scan(&lastInsertId)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
@ -72,13 +73,14 @@ type SentEnvelope struct {
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
PublishMethod string `json:"publishMethod"`
StatusVersion string `json:"statusVersion"`
}
func (r *SentEnvelope) put(db *sql.DB) error {
r.CreatedAt = time.Now().Unix()
stmt, err := db.Prepare(`INSERT INTO sentEnvelopes (messageHash, sentAt, createdAt, pubsubTopic,
topic, senderKeyUID, nodeName, publishMethod)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
topic, senderKeyUID, nodeName, publishMethod, statusVersion)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT ON CONSTRAINT sentEnvelopes_unique DO NOTHING
RETURNING id;`)
if err != nil {
@ -86,16 +88,16 @@ func (r *SentEnvelope) put(db *sql.DB) error {
}
lastInsertId := int64(0)
res, err := stmt.Exec(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.SenderKeyUID, r.NodeName, r.PublishMethod)
lastInsertId, _ = res.LastInsertId()
res, err := stmt.Exec(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.SenderKeyUID, r.NodeName, r.PublishMethod, r.StatusVersion)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
} else {
return err
}
return err
}
lastInsertId, err = res.LastInsertId()
if err != nil {
return err
}
defer stmt.Close()
r.ID = int(lastInsertId)

View File

@ -27,6 +27,7 @@ type ReceivedMessage struct {
Topic string `json:"topic"`
PubsubTopic string `json:"pubsubTopic"`
CreatedAt int64 `json:"createdAt"`
StatusVersion string `json:"statusVersion"`
}
func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*ReceivedMessage, error) {
@ -88,14 +89,14 @@ func didReceivedMessageBeforeAndAfterInChat(db *sql.DB, receiverPublicKey string
}
func (r *ReceivedMessage) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id;")
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic, statusVersion) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING id;")
if err != nil {
return err
}
r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic).Scan(&lastInsertId)
err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic, r.StatusVersion).Scan(&lastInsertId)
if err != nil {
return err
}

View File

@ -45,6 +45,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server {
server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET")
server.Router.HandleFunc("/record-metrics", server.createTelemetryData).Methods("POST")
server.Router.Use(server.rateLimit)
return server
@ -55,6 +56,88 @@ func handleHealthCheck(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK")
}
type TelemetryType string
const (
ProtocolStatsMetric TelemetryType = "ProtocolStats"
ReceivedEnvelopeMetric TelemetryType = "ReceivedEnvelope"
SentEnvelopeMetric TelemetryType = "SentEnvelope"
UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope"
ReceivedMessagesMetric TelemetryType = "ReceivedMessages"
)
type TelemetryRequest struct {
Id int `json:"id"`
TelemetryType TelemetryType `json:"telemetry_type"`
TelemetryData *json.RawMessage `json:"telemetry_data"`
}
func (s *Server) createTelemetryData(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var telemetryData []TelemetryRequest
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&telemetryData); err != nil {
log.Println(err)
http.Error(w, "Failed to decode telemetry data", http.StatusBadRequest)
return
}
var errorDetails []map[string]interface{}
for _, data := range telemetryData {
switch data.TelemetryType {
case ProtocolStatsMetric:
var stats ProtocolStats
if err := json.Unmarshal(*data.TelemetryData, &stats); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding protocol stats: %v", err)})
continue
}
if err := stats.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving protocol stats: %v", err)})
continue
}
case ReceivedEnvelopeMetric:
var envelope ReceivedEnvelope
if err := json.Unmarshal(*data.TelemetryData, &envelope); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding received envelope: %v", err)})
continue
}
if err := envelope.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving received envelope: %v", err)})
continue
}
case SentEnvelopeMetric:
var envelope SentEnvelope
if err := json.Unmarshal(*data.TelemetryData, &envelope); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding sent envelope: %v", err)})
continue
}
if err := envelope.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving sent envelope: %v", err)})
continue
}
default:
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Unknown telemetry type: %s", data.TelemetryType)})
}
}
if len(errorDetails) > 0 {
log.Printf("Errors encountered: %v", errorDetails)
}
err := respondWithJSON(w, http.StatusCreated, errorDetails)
if err != nil {
log.Println(err)
}
log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}
func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var receivedMessages []ReceivedMessage
@ -198,11 +281,6 @@ func (s *Server) createSentEnvelope(w http.ResponseWriter, r *http.Request) {
err := sentEnvelope.put(s.DB)
if err != nil {
log.Println("could not save envelope", err, sentEnvelope)
err := respondWithError(w, http.StatusBadRequest, "could not save envelope")
if err != nil {
log.Println(err)
}
return
}
err = respondWithJSON(w, http.StatusCreated, sentEnvelope)

View File

@ -0,0 +1,3 @@
ALTER TABLE receivedMessages ADD COLUMN statusVersion VARCHAR(31);
ALTER TABLE receivedEnvelopes ADD COLUMN statusVersion VARCHAR(31);
ALTER TABLE sentEnvelopes ADD COLUMN statusVersion VARCHAR(31);