feat: add table and endpoint for tracking waku message sequences

This commit is contained in:
Arseniy Klempner 2024-05-16 18:14:23 -07:00
parent 9097a5bc63
commit 1a43c03319
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
5 changed files with 142 additions and 3 deletions

2
go.mod
View File

@ -4,9 +4,9 @@ go 1.15
require (
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/gorilla/handlers v1.5.2 // indirect
github.com/gorilla/mux v1.8.0
github.com/lib/pq v1.10.3
github.com/mattn/go-sqlite3 v1.14.10
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.7.0
)

5
go.sum
View File

@ -392,6 +392,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
@ -595,6 +597,8 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
@ -792,7 +796,6 @@ github.com/mattn/go-shellwords v1.0.6/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vq
github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.10 h1:MLn+5bFRlWMGoSRmJour3CL1w/qL96mvipqpwQW/Sfk=
github.com/mattn/go-sqlite3 v1.14.10/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=

View File

@ -81,6 +81,28 @@ func createTables(db *sql.DB) error {
return err
}
sqlStmt = `CREATE TABLE IF NOT EXISTS wakuMessages (
id SERIAL PRIMARY KEY,
walletAddress VARCHAR(255),
peerIdSender VARCHAR(255) NOT NULL,
peerIdReporter VARCHAR(255) NOT NULL,
sequenceHash VARCHAR(255) NOT NULL,
sequenceTotal VARCHAR(255) NOT NULL,
sequenceIndex VARCHAR(255) NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
timestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
constraint wakuMessages_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic)
);`
_, err = db.Exec(sqlStmt)
if err != nil {
return err
}
sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated (
id SERIAL PRIMARY KEY,
durationInSeconds INTEGER NOT NULL,

View File

@ -10,6 +10,7 @@ import (
"net/http"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
)
@ -26,6 +27,7 @@ func NewServer(db *sql.DB) *Server {
server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST")
server.Router.HandleFunc("/waku-messages", server.createWakuMessages).Methods("POST")
server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET")
@ -83,6 +85,51 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request)
)
}
func (s *Server) createWakuMessages(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var wakuMessages []WakuMessage
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&wakuMessages); err != nil {
log.Println(err)
err := respondWithError(w, http.StatusBadRequest, "Invalid request payload")
if err != nil {
log.Println(err)
}
return
}
defer r.Body.Close()
var ids []int
for _, wakuMessage := range wakuMessages {
if err := wakuMessage.put(s.DB); err != nil {
log.Println("could not save message", err, wakuMessage)
continue
}
ids = append(ids, wakuMessage.ID)
}
if len(ids) != len(wakuMessages) {
err := respondWithError(w, http.StatusInternalServerError, "Could not save all record")
if err != nil {
log.Println(err)
}
return
}
err := respondWithJSON(w, http.StatusCreated, wakuMessages)
if err != nil {
log.Println(err)
}
log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}
func (s *Server) createReceivedEnvelope(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var receivedEnvelope ReceivedEnvelope
@ -190,6 +237,10 @@ func (s *Server) createProtocolStats(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) Start(port int) {
headersOk := handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"})
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "OPTIONS"})
log.Printf("Starting server on port %d", port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router))
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), handlers.CORS(originsOk, headersOk, methodsOk)(s.Router)))
}

63
telemetry/wakumessage.go Normal file
View File

@ -0,0 +1,63 @@
package telemetry
import (
"database/sql"
"time"
)
type WakuMessage struct {
ID int `json:"id"`
WalletAddress string `json:"walletAddress"`
PeerIDSender string `json:"peerIdSender"`
PeerIDReporter string `json:"peerIdReporter"`
SequenceHash string `json:"sequenceHash"`
SequenceTotal uint64 `json:"sequenceTotal"`
SequenceIndex uint64 `json:"sequenceIndex"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
Timestamp int64 `json:"timestamp"`
CreatedAt int64 `json:"createdAt"`
}
// func queryWakuMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*WakuMessage, error) {
// rows, err := db.Query(fmt.Sprintf("SELECT id, sequenceHash, sequenceNumber, contentTopic, pubsubTopic, createdAt FROM wakuMessages WHERE createdAt BETWEEN %d and %d", startsAt.Unix(), endsAt.Unix()))
// if err != nil {
// return nil, err
// }
// defer rows.Close()
// var wakuMessages []*WakuMessage
// for rows.Next() {
// var wakuMessage WakuMessage
// err = rows.Scan(
// &wakuMessage.ID,
// &wakuMessage.SequenceHash,
// &wakuMessage.SequenceNumber,
// &wakuMessage.ContentTopic,
// &wakuMessage.PubsubTopic,
// &wakuMessage.CreatedAt,
// )
// if err != nil {
// return nil, err
// }
// wakuMessages = append(wakuMessages, &wakuMessage)
// }
// return wakuMessages, nil
// }
func (r *WakuMessage) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO wakuMessages (walletAddress, peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id;")
if err != nil {
return err
}
r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.WalletAddress, r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = lastInsertId
return nil
}