feat: add table and endpoint for tracking waku message sequences

This commit is contained in:
Arseniy Klempner 2024-05-16 18:14:23 -07:00
parent 8fe36168a5
commit 0edd7772e0
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
7 changed files with 171 additions and 2 deletions

View File

@ -14,6 +14,16 @@ Then you can run the server with:
go run cmd/server/main.go -data-source-name postgres://telemetry:newPassword@127.0.0.1:5432/telemetry
```
If trying to run locally you receive the following error:
```
pq: SSL is not enabled on the server
```
Run this command instead:
```
go run cmd/server/main.go -data-source-name "postgres://telemetry:newPassword@127.0.0.1:5432/telemetry?sslmode=disable"
```
Finally, to run the test:
```
make test

2
go.mod
View File

@ -5,6 +5,7 @@ go 1.20
require (
github.com/go-auxiliaries/shrinking-map v0.3.0
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.0
github.com/lib/pq v1.10.3
github.com/robfig/cron/v3 v3.0.1
@ -15,6 +16,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect

4
go.sum
View File

@ -391,6 +391,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
@ -595,6 +597,8 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=

View File

@ -6,6 +6,7 @@
// 000004_envelope.table.up.sql (531B)
// 000005_pushed_envelope.up.sql (574B)
// 000006_status_version.up.sql (198B)
// 000007_waku_push_filter.up.sql (523B)
// doc.go (73B)
package telemetry
@ -169,7 +170,7 @@ func _000005_pushed_envelopeUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1717560336, 0)}
info := bindataFileInfo{name: "000005_pushed_envelope.up.sql", size: 574, mode: os.FileMode(0644), modTime: time.Unix(1719028717, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7d, 0xaf, 0x8a, 0xcb, 0x97, 0x1e, 0xc6, 0xf6, 0x86, 0xe4, 0x1b, 0x67, 0x10, 0x87, 0x8e, 0x80, 0x1d, 0x5a, 0x7d, 0x64, 0xd0, 0x89, 0x3f, 0x1e, 0x6f, 0x93, 0x87, 0x4a, 0xd7, 0x87, 0xb8, 0x5e}}
return a, nil
}
@ -189,11 +190,31 @@ func _000006_status_versionUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000006_status_version.up.sql", size: 198, mode: os.FileMode(0644), modTime: time.Unix(1717560330, 0)}
info := bindataFileInfo{name: "000006_status_version.up.sql", size: 198, mode: os.FileMode(0644), modTime: time.Unix(1719028717, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x2b, 0x11, 0xee, 0x9f, 0x4f, 0xf5, 0x0, 0x9a, 0x98, 0xe9, 0x44, 0x21, 0x2e, 0x57, 0xf7, 0xae, 0xf3, 0xb2, 0x3d, 0x94, 0x40, 0x69, 0xa7, 0x1d, 0x62, 0x57, 0x31, 0x9f, 0x60, 0x6, 0xed, 0x80}}
return a, nil
}
var __000007_waku_push_filterUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x91\xc1\x4a\x03\x31\x10\x86\xcf\xdd\xa7\x98\xe3\x2e\xec\x49\xe8\xc9\x53\x5c\x52\x1b\x5c\xd3\x92\x44\xb1\x27\x49\x37\x03\x0d\x6e\xb3\xdb\x64\x42\x7d\x7c\xa1\x82\x76\xc5\xb6\xa7\x61\xf8\x3f\xf8\x87\x6f\x1a\xc5\x99\xe1\x60\xd8\x43\xcb\x41\x2c\x40\xae\x0c\xf0\x37\xa1\x8d\x86\xa3\xfd\xc8\xeb\x9c\x76\x0b\xdf\x13\x46\x28\x8b\x99\x77\xa0\xb9\x12\xac\x85\xb5\x12\xcf\x4c\x6d\xe0\x89\x6f\xea\x62\x76\xb4\x7d\x8f\xc4\x9c\x8b\x98\x12\xbc\x32\xd5\x2c\x99\x2a\xef\xe6\xf3\xaa\x2e\x66\x23\x62\x14\x4e\x63\x70\x18\x27\xd9\xa9\x4b\xbe\xb4\xed\x0f\xa4\x70\x1c\x22\x5d\xc1\x12\x1e\x32\x86\x0e\x97\x36\xed\x6e\x42\x66\x20\xdb\xdf\xa4\x44\x70\xf8\x79\x91\xea\x86\x40\x18\xc8\x0c\xa3\xef\x2e\x1f\x9f\xb7\x29\x6f\xaf\x33\xe4\xf7\x98\xc8\xee\x47\x10\xd2\xf0\x47\xae\x26\x2d\x11\x2d\xa1\x63\xf4\x4f\x58\x00\x00\x34\x2b\xa9\x8d\x62\x42\x9a\x3f\x5f\x79\xcf\xc1\x1f\x32\xc2\xf7\x28\xcf\x5d\xd7\x30\x95\x5a\xc3\xb9\xbd\xdf\xed\x24\xa0\x2a\xaa\xfb\xaf\x00\x00\x00\xff\xff\x48\xf0\x3d\x30\x0b\x02\x00\x00")
func _000007_waku_push_filterUpSqlBytes() ([]byte, error) {
return bindataRead(
__000007_waku_push_filterUpSql,
"000007_waku_push_filter.up.sql",
)
}
func _000007_waku_push_filterUpSql() (*asset, error) {
bytes, err := _000007_waku_push_filterUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "000007_waku_push_filter.up.sql", size: 523, mode: os.FileMode(0644), modTime: time.Unix(1719271502, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x5d, 0xa, 0x2c, 0x93, 0xa, 0x1f, 0xeb, 0x49, 0x60, 0xe2, 0x8, 0x46, 0xb5, 0x16, 0xa4, 0xa9, 0x7f, 0xec, 0xfb, 0xe1, 0xdc, 0x12, 0x15, 0x17, 0x1, 0x28, 0xa3, 0xca, 0xeb, 0x45, 0x81, 0x31}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -311,6 +332,7 @@ var _bindata = map[string]func() (*asset, error){
"000004_envelope.table.up.sql": _000004_envelopeTableUpSql,
"000005_pushed_envelope.up.sql": _000005_pushed_envelopeUpSql,
"000006_status_version.up.sql": _000006_status_versionUpSql,
"000007_waku_push_filter.up.sql": _000007_waku_push_filterUpSql,
"doc.go": docGo,
}
@ -366,6 +388,7 @@ var _bintree = &bintree{nil, map[string]*bintree{
"000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}},
"000005_pushed_envelope.up.sql": {_000005_pushed_envelopeUpSql, map[string]*bintree{}},
"000006_status_version.up.sql": {_000006_status_versionUpSql, map[string]*bintree{}},
"000007_waku_push_filter.up.sql": {_000007_waku_push_filterUpSql, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
}}

View File

@ -41,6 +41,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server {
server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST")
server.Router.HandleFunc("/waku-metrics", server.createWakuTelemetry).Methods("POST")
server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST")
server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
@ -349,7 +350,69 @@ func (s *Server) rateLimit(next http.Handler) http.Handler {
})
}
type ErrorDetail struct {
Error string `json:"Error"`
}
func (s *Server) createWakuTelemetry(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var telemetryData []WakuTelemetryRequest
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&telemetryData); err != nil {
log.Println(err)
http.Error(w, "Failed to decode telemetry data", http.StatusBadRequest)
return
}
var errorDetails []map[string]ErrorDetail
for _, data := range telemetryData {
switch data.TelemetryType {
case LightPushFilter:
var pushFilter TelemetryPushFilter
if err := json.Unmarshal(*data.TelemetryData, &pushFilter); err != nil {
errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Error decoding lightpush/filter metric: %v", err)}})
continue
}
if err := pushFilter.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Error saving lightpush/filter metric: %v", err)}})
continue
}
default:
errorDetails = append(errorDetails, map[string]ErrorDetail{fmt.Sprintf("%d", data.Id): {Error: fmt.Sprintf("Unknown waku telemetry type: %s", data.TelemetryType)}})
}
}
if len(errorDetails) > 0 {
log.Printf("Errors encountered: %v", errorDetails)
errorDetailsJSON, err := json.Marshal(errorDetails)
if err != nil {
s.logger.Error("failed to marshal error details", zap.Error(err))
http.Error(w, "Failed to process error details", http.StatusInternalServerError)
return
}
err = respondWithError(w, http.StatusInternalServerError, string(errorDetailsJSON))
if err != nil {
s.logger.Error("failed to respond", zap.Error(err))
}
return
}
err := respondWithJSON(w, http.StatusCreated, errorDetails)
if err != nil {
log.Println(err)
}
log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}
func (s *Server) Start(port int) {
s.logger.Info("Starting server", zap.Int("port", port))
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router))
}

View File

@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS wakuPushFilter (
id SERIAL PRIMARY KEY,
walletAddress VARCHAR(255),
peerIdSender VARCHAR(255) NOT NULL,
peerIdReporter VARCHAR(255) NOT NULL,
sequenceHash VARCHAR(255) NOT NULL,
sequenceTotal VARCHAR(255) NOT NULL,
sequenceIndex VARCHAR(255) NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
timestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
CONSTRAINT wakuPushFilter_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceIndex)
);

52
telemetry/waku_metrics.go Normal file
View File

@ -0,0 +1,52 @@
package telemetry
import (
"database/sql"
"encoding/json"
"time"
)
type WakuTelemetryType string
const (
LightPushFilter WakuTelemetryType = "LightPushFilter"
)
type WakuTelemetryRequest struct {
Id int `json:"id"`
TelemetryType WakuTelemetryType `json:"telemetryType"`
TelemetryData *json.RawMessage `json:"telemetryData"`
}
type TelemetryPushFilter struct {
ID int `json:"id"`
WalletAddress string `json:"walletAddress"`
PeerIDSender string `json:"peerIdSender"`
PeerIDReporter string `json:"peerIdReporter"`
SequenceHash string `json:"sequenceHash"`
SequenceTotal uint64 `json:"sequenceTotal"`
SequenceIndex uint64 `json:"sequenceIndex"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
Timestamp int64 `json:"timestamp"`
CreatedAt int64 `json:"createdAt"`
}
func (r *TelemetryPushFilter) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO wakuPushFilter (peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id;")
if err != nil {
return err
}
defer stmt.Close()
r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = lastInsertId
return nil
}