Merge pull request #12 from status-im/feat/receive-envelopes

feat: Add received envelopes to database
This commit is contained in:
Michał Iskierko 2023-11-07 18:16:33 +01:00 committed by GitHub
commit 9097a5bc63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 248 additions and 26 deletions

View File

@ -41,6 +41,11 @@ func dropTables(db *sql.DB) {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}
_, err = db.Exec("DROP TABLE IF EXISTS receivedEnvelopes")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}
db.Close()
}

View File

@ -3,6 +3,7 @@
// 000001_message_type.up.sql (66B)
// 000002_bandwidth_protocol.up.sql (719B)
// 000003_index_truncate.up.sql (598B)
// 000004_envelope.table.up.sql (531B)
// doc.go (73B)
package telemetry
@ -13,7 +14,6 @@ import (
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
@ -23,7 +23,7 @@ import (
func bindataRead(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
return nil, fmt.Errorf("read %q: %w", name, err)
}
var buf bytes.Buffer
@ -31,7 +31,7 @@ func bindataRead(data []byte, name string) ([]byte, error) {
clErr := gz.Close()
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
return nil, fmt.Errorf("read %q: %w", name, err)
}
if clErr != nil {
return nil, err
@ -87,7 +87,7 @@ func _000001_message_typeUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000001_message_type.up.sql", size: 66, mode: os.FileMode(0664), modTime: time.Unix(1675197752, 0)}
info := bindataFileInfo{name: "000001_message_type.up.sql", size: 66, mode: os.FileMode(0644), modTime: time.Unix(1697552711, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe2, 0x43, 0xcc, 0xef, 0xad, 0x5f, 0x44, 0x58, 0x8d, 0x47, 0x70, 0x5d, 0x23, 0x30, 0xe2, 0x1f, 0xdb, 0x4d, 0xad, 0x6e, 0xd9, 0xe7, 0x50, 0x19, 0x43, 0x1c, 0x37, 0x57, 0xea, 0xc6, 0x57, 0xab}}
return a, nil
}
@ -107,7 +107,7 @@ func _000002_bandwidth_protocolUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000002_bandwidth_protocol.up.sql", size: 719, mode: os.FileMode(0664), modTime: time.Unix(1675687684, 0)}
info := bindataFileInfo{name: "000002_bandwidth_protocol.up.sql", size: 719, mode: os.FileMode(0644), modTime: time.Unix(1697552711, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xfe, 0x83, 0x69, 0xab, 0x3e, 0xf5, 0x8d, 0x44, 0xb2, 0x6e, 0x52, 0x8d, 0x27, 0xe8, 0x95, 0x28, 0x3c, 0xea, 0x29, 0x93, 0x6d, 0xa3, 0x10, 0xde, 0x9b, 0xc8, 0xa6, 0xb9, 0x80, 0xa1, 0x3, 0x6f}}
return a, nil
}
@ -127,11 +127,31 @@ func _000003_index_truncateUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000003_index_truncate.up.sql", size: 598, mode: os.FileMode(0664), modTime: time.Unix(1675687705, 0)}
info := bindataFileInfo{name: "000003_index_truncate.up.sql", size: 598, mode: os.FileMode(0644), modTime: time.Unix(1698325445, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xcf, 0x8, 0x4, 0x47, 0xc8, 0x65, 0x38, 0x79, 0x3e, 0x37, 0xec, 0x4e, 0x1a, 0x24, 0x50, 0x3c, 0x1c, 0x75, 0xe8, 0x3b, 0x2, 0x62, 0x2, 0x52, 0x50, 0xff, 0x4a, 0x8f, 0x9d, 0x71, 0x79, 0xf6}}
return a, nil
}
var __000004_envelopeTableUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x91\xb1\x6e\x83\x30\x10\x86\x77\x9e\xe2\x46\x90\x98\x2a\x65\xca\x74\x85\x6b\x63\xc5\x98\xca\x98\xaa\x99\x2a\x02\xa7\x14\xa9\xb1\xa9\x0d\x91\xfa\xf6\x95\x42\x5a\x55\xa9\x42\x27\x0f\xff\xe7\xdf\xe7\xef\x32\x4d\x68\x08\x0c\xde\x4b\x02\xf1\x00\xaa\x34\x40\x2f\xa2\x32\x15\x78\x6e\xb9\x3f\x71\x47\xf6\xc4\xef\x6e\xe0\x00\x71\x04\x00\xd0\x77\x50\x91\x16\x28\xe1\x49\x8b\x02\xf5\x0e\xb6\xb4\x4b\xcf\xd1\x91\x43\x68\x0e\xbc\x69\xc2\x1b\x3c\xa3\xce\x36\xa8\xe3\xbb\xd5\x2a\x39\xd7\xaa\x5a\xca\x19\x0b\x6c\x47\x1c\x41\x28\x43\x8f\xa4\xaf\xc2\xd6\x73\x33\x72\x77\x33\x1f\xdd\xd0\xb7\x4b\xed\xc3\xb4\xaf\xa6\xbd\xf9\x0f\xbb\x7c\xcf\x6f\xf9\xb3\x16\xf9\x12\x69\x5d\xc7\xaa\x39\xf2\xe2\xa3\xde\xb5\x1c\x42\x6f\x0f\xe4\xbd\xf3\x4b\x68\x56\xaa\xca\x68\x14\xca\xfc\x55\xfc\x3a\xd9\xfe\x63\x62\x98\x8f\x78\x16\x95\xfe\xf6\x9a\x5e\x0d\x9e\xfe\x8c\x97\x44\xc9\x3a\x8a\x50\x1a\xd2\x97\x7d\x7e\xd7\x17\xf3\xf5\x00\x98\xe7\x90\x95\xb2\x2e\xd4\x4d\x4b\xeb\xe8\x2b\x00\x00\xff\xff\x9d\x3f\xc2\xc6\x13\x02\x00\x00")
func _000004_envelopeTableUpSqlBytes() ([]byte, error) {
return bindataRead(
__000004_envelopeTableUpSql,
"000004_envelope.table.up.sql",
)
}
func _000004_envelopeTableUpSql() (*asset, error) {
bytes, err := _000004_envelopeTableUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "000004_envelope.table.up.sql", size: 531, mode: os.FileMode(0644), modTime: time.Unix(1698655313, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x32, 0xee, 0x49, 0xa0, 0x48, 0x2b, 0x8b, 0xe8, 0xd3, 0x6a, 0xae, 0x7f, 0x62, 0x65, 0x8a, 0x45, 0xbb, 0x8a, 0xee, 0xcd, 0x13, 0xde, 0xd6, 0x33, 0xe2, 0x3f, 0x32, 0xff, 0xfe, 0xf4, 0xda, 0xe7}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\x31\x12\x84\x20\x0c\x05\xd0\x9e\x53\xfc\x0b\x90\xf4\x7b\x9b\xac\xfe\xc9\x38\x20\x41\x4c\xe3\xed\x6d\xac\xdf\xb4\xad\x99\x13\xf7\xd5\x4b\x51\xf5\xf8\x39\x07\x97\x25\xe1\x51\xff\xc7\xd8\x2d\x0d\x75\x36\x47\xb2\xf3\x64\xae\x07\x35\x20\xa2\x1f\x8a\x07\x44\xcb\x1b\x00\x00\xff\xff\xb6\x03\x50\xe0\x49\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -147,7 +167,7 @@ func docGo() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 73, mode: os.FileMode(0664), modTime: time.Unix(1675197752, 0)}
info := bindataFileInfo{name: "doc.go", size: 73, mode: os.FileMode(0644), modTime: time.Unix(1697552711, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xae, 0x4f, 0xb8, 0x11, 0x84, 0x79, 0xbb, 0x6c, 0xf, 0xed, 0xc, 0xfc, 0x18, 0x32, 0x9d, 0xf1, 0x7, 0x2c, 0x20, 0xde, 0xe9, 0x97, 0x0, 0x62, 0x9f, 0x5e, 0x24, 0xfc, 0x8e, 0xc2, 0xd9, 0x2d}}
return a, nil
}
@ -243,24 +263,27 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"000001_message_type.up.sql": _000001_message_typeUpSql,
"000001_message_type.up.sql": _000001_message_typeUpSql,
"000002_bandwidth_protocol.up.sql": _000002_bandwidth_protocolUpSql,
"000003_index_truncate.up.sql": _000003_index_truncateUpSql,
"doc.go": docGo,
"000003_index_truncate.up.sql": _000003_index_truncateUpSql,
"000004_envelope.table.up.sql": _000004_envelopeTableUpSql,
"doc.go": docGo,
}
// AssetDebug is true if the assets were built with the debug flag enabled.
const AssetDebug = false
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
// data/
// foo.txt
// img/
// a.png
// b.png
//
// data/
// foo.txt
// img/
// a.png
// b.png
//
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
@ -293,10 +316,11 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"000001_message_type.up.sql": &bintree{_000001_message_typeUpSql, map[string]*bintree{}},
"000002_bandwidth_protocol.up.sql": &bintree{_000002_bandwidth_protocolUpSql, map[string]*bintree{}},
"000003_index_truncate.up.sql": &bintree{_000003_index_truncateUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
"000001_message_type.up.sql": {_000001_message_typeUpSql, map[string]*bintree{}},
"000002_bandwidth_protocol.up.sql": {_000002_bandwidth_protocolUpSql, map[string]*bintree{}},
"000003_index_truncate.up.sql": {_000003_index_truncateUpSql, map[string]*bintree{}},
"000004_envelope.table.up.sql": {_000004_envelopeTableUpSql, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.
@ -313,7 +337,7 @@ func RestoreAsset(dir, name string) error {
if err != nil {
return err
}
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
err = os.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}

View File

@ -0,0 +1,46 @@
package telemetry
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestEnvelopesUpdate(t *testing.T) {
db := NewMock()
defer dropTables(db)
firstEnvelope := &ReceivedEnvelope{
MessageHash: "1",
ReceiverKeyUID: "1",
NodeName: "status",
SentAt: time.Now().Unix(),
Topic: "1",
PubsubTopic: "1",
}
err := firstEnvelope.put(db)
require.NoError(t, err)
envelopeToUpdate := &ReceivedEnvelope{
MessageHash: "1",
ReceiverKeyUID: "1",
NodeName: "status",
SentAt: time.Now().Unix(),
Topic: "1",
PubsubTopic: "1",
ProcessingError: "MyError",
}
err = envelopeToUpdate.updateProcessingError(db)
require.NoError(t, err)
rows, err := db.Query("SELECT processingerror FROM receivedEnvelopes WHERE messagehash = '1';")
require.NoError(t, err)
defer rows.Close()
rows.Next()
var procError = ""
err = rows.Scan(&procError)
require.NoError(t, err)
require.Equal(t, "MyError", procError)
}

View File

@ -0,0 +1,62 @@
package telemetry
import (
"database/sql"
"errors"
"time"
)
type ReceivedEnvelope struct {
ID int `json:"id"`
MessageHash string `json:"messageHash"`
SentAt int64 `json:"sentAt"`
CreatedAt int64 `json:"createdAt"`
PubsubTopic string `json:"pubsubTopic"`
Topic string `json:"topic"`
ReceiverKeyUID string `json:"receiverKeyUID"`
NodeName string `json:"nodeName"`
ProcessingError string `json:"processingError"`
}
func (r *ReceivedEnvelope) put(db *sql.DB) error {
r.CreatedAt = time.Now().Unix()
stmt, err := db.Prepare(`INSERT INTO receivedEnvelopes (messageHash, sentAt, createdAt, pubsubTopic,
topic, receiverKeyUID, nodeName, processingError)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT ON CONSTRAINT receivedEnvelopes_unique DO NOTHING
RETURNING id;`)
if err != nil {
return err
}
lastInsertId := 0
err = stmt.QueryRow(r.MessageHash, r.SentAt, r.CreatedAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName, r.ProcessingError).Scan(&lastInsertId)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
} else {
return err
}
}
r.ID = lastInsertId
return nil
}
func (r *ReceivedEnvelope) updateProcessingError(db *sql.DB) error {
r.CreatedAt = time.Now().Unix()
stmt, err := db.Prepare(`UPDATE receivedEnvelopes SET processingError=$1 WHERE
messageHash = $2 AND sentAt = $3 AND
pubsubTopic = $4 AND topic = $5 AND
receiverKeyUID = $6 AND nodeName = $7;`)
if err != nil {
return err
}
_, err = stmt.Exec(r.ProcessingError, r.MessageHash, r.SentAt, r.PubsubTopic, r.Topic, r.ReceiverKeyUID, r.NodeName)
if err != nil {
return err
}
return nil
}

View File

@ -25,11 +25,12 @@ type ReceivedMessage struct {
NodeName string `json:"nodeName"`
SentAt int64 `json:"sentAt"`
Topic string `json:"topic"`
PubsubTopic string `json:"pubsubTopic"`
CreatedAt int64 `json:"createdAt"`
}
func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Time) ([]*ReceivedMessage, error) {
rows, err := db.Query(fmt.Sprintf("SELECT id, chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt FROM receivedMessages WHERE sentAt BETWEEN %d and %d", startsAt.Unix(), endsAt.Unix()))
rows, err := db.Query(fmt.Sprintf("SELECT id, chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic FROM receivedMessages WHERE sentAt BETWEEN %d and %d", startsAt.Unix(), endsAt.Unix()))
if err != nil {
return nil, err
}
@ -50,6 +51,7 @@ func queryReceivedMessagesBetween(db *sql.DB, startsAt time.Time, endsAt time.Ti
&receivedMessage.MessageType,
&receivedMessage.MessageSize,
&receivedMessage.CreatedAt,
&receivedMessage.PubsubTopic,
)
if err != nil {
return nil, err
@ -86,14 +88,14 @@ func didReceivedMessageBeforeAndAfterInChat(db *sql.DB, receiverPublicKey string
}
func (r *ReceivedMessage) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) RETURNING id;")
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, messageId, receiverKeyUID, nodeName, sentAt, topic, messageType, messageSize, createdAt, pubSubTopic) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) RETURNING id;")
if err != nil {
return err
}
r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt).Scan(&lastInsertId)
err = stmt.QueryRow(r.ChatID, r.MessageHash, r.MessageID, r.ReceiverKeyUID, r.NodeName, r.SentAt, r.Topic, r.MessageType, r.MessageSize, r.CreatedAt, r.PubsubTopic).Scan(&lastInsertId)
if err != nil {
return err
}

View File

@ -26,6 +26,8 @@ func NewServer(db *sql.DB) *Server {
server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST")
server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
server.Router.HandleFunc("/health", handleHealthCheck).Methods("GET")
return server
@ -81,6 +83,73 @@ func (s *Server) createReceivedMessages(w http.ResponseWriter, r *http.Request)
)
}
func (s *Server) createReceivedEnvelope(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var receivedEnvelope ReceivedEnvelope
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&receivedEnvelope); err != nil {
log.Println(err)
err := respondWithError(w, http.StatusBadRequest, "Invalid request payload")
if err != nil {
log.Println(err)
}
return
}
defer r.Body.Close()
err := receivedEnvelope.put(s.DB)
if err != nil {
log.Println("could not save envelope", err, receivedEnvelope)
}
err = respondWithJSON(w, http.StatusCreated, receivedEnvelope)
if err != nil {
log.Println(err)
}
log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}
func (s *Server) updateEnvelope(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var receivedEnvelope ReceivedEnvelope
decoder := json.NewDecoder(r.Body)
log.Println("Update envelope")
if err := decoder.Decode(&receivedEnvelope); err != nil {
log.Println(err)
err := respondWithError(w, http.StatusBadRequest, "Invalid request payload")
if err != nil {
log.Println(err)
}
return
}
defer r.Body.Close()
err := receivedEnvelope.updateProcessingError(s.DB)
if err != nil {
log.Println("could not update envelope", err, receivedEnvelope)
}
err = respondWithJSON(w, http.StatusCreated, receivedEnvelope)
if err != nil {
log.Println(err)
}
log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}
func (s *Server) createProtocolStats(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var protocolStats ProtocolStats

View File

@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS receivedEnvelopes (
id SERIAL PRIMARY KEY,
messageHash VARCHAR(255) NOT NULL,
sentAt INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
topic VARCHAR(255) NOT NULL,
pubSubTopic VARCHAR(255) NOT NULL,
receiverKeyUID VARCHAR(255) NOT NULL,
nodeName VARCHAR(255) NOT NULL,
processingError VARCHAR(255) NOT NULL,
CONSTRAINT receivedEnvelopes_unique unique(sentAt, messageHash, receiverKeyUID, nodeName)
);
ALTER TABLE receivedMessages ADD COLUMN pubSubTopic VARCHAR(255);