feat: swith db to postgres

This commit is contained in:
Anthony Laibe 2021-11-08 10:33:13 +01:00
parent 8dc19b6299
commit 3917099be2
4 changed files with 59 additions and 25 deletions

View File

@ -1 +1,18 @@
# telemetry # telemetry
## Dev setup
You need to setup a postgres db as such:
1) Create a telemetry user with password newPassword
2) Create a db telemetry
3) Create a db telemetry_test
Then you can run the server with:
```
go run cmd/server/main.go -data-source-name postgres://telemetry:newPassword@127.0.0.1:5432/telemetry
```
Finally, to run the test:
```
make test
```

View File

@ -12,7 +12,7 @@ import (
) )
func NewMock() *sql.DB { func NewMock() *sql.DB {
db, err := sql.Open("sqlite3", ":memory:") db, err := sql.Open("postgres", "postgres://telemetry:newPassword@127.0.0.1:5432/telemetry_test")
if err != nil { if err != nil {
log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) log.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
} }
@ -26,6 +26,20 @@ func NewMock() *sql.DB {
return db return db
} }
func dropTables(db *sql.DB) {
_, err := db.Exec("DROP TABLE IF EXISTS receivedMessages")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}
_, err = db.Exec("DROP TABLE IF EXISTS receivedMessageAggregated")
if err != nil {
log.Fatalf("an error '%s' was not expected when dropping the table", err)
}
db.Close()
}
func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) { func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) {
rows, err := db.Query("SELECT * FROM receivedMessageAggregated") rows, err := db.Query("SELECT * FROM receivedMessageAggregated")
if err != nil { if err != nil {
@ -53,6 +67,7 @@ func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) {
func TestRunAggregatorSimple(t *testing.T) { func TestRunAggregatorSimple(t *testing.T) {
db := NewMock() db := NewMock()
defer dropTables(db)
m := &ReceivedMessage{ m := &ReceivedMessage{
ChatID: "1", ChatID: "1",
@ -61,7 +76,8 @@ func TestRunAggregatorSimple(t *testing.T) {
SentAt: time.Now().Unix(), SentAt: time.Now().Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err := m.put(db)
require.NoError(t, err)
oneHourAndHalf := time.Hour + time.Minute*30 oneHourAndHalf := time.Hour + time.Minute*30
m = &ReceivedMessage{ m = &ReceivedMessage{
@ -71,7 +87,8 @@ func TestRunAggregatorSimple(t *testing.T) {
SentAt: time.Now().Add(-oneHourAndHalf).Unix(), SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err = m.put(db)
require.NoError(t, err)
agg := NewAggregator(db) agg := NewAggregator(db)
@ -88,6 +105,7 @@ func TestRunAggregatorSimple(t *testing.T) {
func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
db := NewMock() db := NewMock()
defer dropTables(db)
m := &ReceivedMessage{ m := &ReceivedMessage{
ChatID: "1", ChatID: "1",
@ -96,7 +114,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
SentAt: time.Now().Unix(), SentAt: time.Now().Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err := m.put(db)
require.NoError(t, err)
oneHourAndHalf := time.Hour + time.Minute*30 oneHourAndHalf := time.Hour + time.Minute*30
m = &ReceivedMessage{ m = &ReceivedMessage{
@ -106,7 +125,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
SentAt: time.Now().Add(-oneHourAndHalf).Unix(), SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err = m.put(db)
require.NoError(t, err)
m = &ReceivedMessage{ m = &ReceivedMessage{
ChatID: "3", ChatID: "3",
@ -115,7 +135,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
SentAt: time.Now().Add(-oneHourAndHalf).Unix(), SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err = m.put(db)
require.NoError(t, err)
m = &ReceivedMessage{ m = &ReceivedMessage{
ChatID: "1", ChatID: "1",
@ -124,7 +145,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
SentAt: time.Now().Unix(), SentAt: time.Now().Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err = m.put(db)
require.NoError(t, err)
m = &ReceivedMessage{ m = &ReceivedMessage{
ChatID: "3", ChatID: "3",
@ -133,7 +155,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) {
SentAt: time.Now().Add(-oneHourAndHalf).Unix(), SentAt: time.Now().Add(-oneHourAndHalf).Unix(),
Topic: "1", Topic: "1",
} }
m.put(db) err = m.put(db)
require.NoError(t, err)
agg := NewAggregator(db) agg := NewAggregator(db)

View File

@ -28,7 +28,7 @@ func OpenDb(dataSourceName string) *sql.DB {
func createTables(db *sql.DB) error { func createTables(db *sql.DB) error {
sqlStmt := `CREATE TABLE IF NOT EXISTS receivedMessages ( sqlStmt := `CREATE TABLE IF NOT EXISTS receivedMessages (
id INTEGER PRIMARY KEY, id SERIAL PRIMARY KEY,
chatId VARCHAR(255) NOT NULL, chatId VARCHAR(255) NOT NULL,
messageHash VARCHAR(255) NOT NULL, messageHash VARCHAR(255) NOT NULL,
receiverKeyUID VARCHAR(255) NOT NULL, receiverKeyUID VARCHAR(255) NOT NULL,
@ -43,7 +43,7 @@ func createTables(db *sql.DB) error {
} }
sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated ( sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated (
id INTEGER PRIMARY KEY, id SERIAL PRIMARY KEY,
durationInSeconds INTEGER NOT NULL, durationInSeconds INTEGER NOT NULL,
chatId VARCHAR(255) NOT NULL, chatId VARCHAR(255) NOT NULL,
value DECIMAL NOT NULL, value DECIMAL NOT NULL,

View File

@ -53,40 +53,34 @@ func didReceivedMessageAfter(db *sql.DB, receiverPublicKey string, after time.Ti
} }
func (r *ReceivedMessage) put(db *sql.DB) error { func (r *ReceivedMessage) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, receiverKeyUID, sentAt, topic, createdAt) VALUES (?, ?, ?, ?, ?, ?)") stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, receiverKeyUID, sentAt, topic, createdAt) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;")
if err != nil { if err != nil {
return err return err
} }
r.CreatedAt = time.Now().Unix() r.CreatedAt = time.Now().Unix()
res, err := stmt.Exec(r.ChatID, r.MessageHash, r.ReceiverKeyUID, r.SentAt, r.Topic, r.CreatedAt) lastInsertId := 0
if err != nil { err = stmt.QueryRow(r.ChatID, r.MessageHash, r.ReceiverKeyUID, r.SentAt, r.Topic, r.CreatedAt).Scan(&lastInsertId)
return err
}
id, err := res.LastInsertId()
if err != nil { if err != nil {
return err return err
} }
r.ID = lastInsertId
r.ID = int(id)
return nil return nil
} }
func (r *ReceivedMessageAggregated) put(db *sql.DB) error { func (r *ReceivedMessageAggregated) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO receivedMessageAggregated (chatId, durationInSeconds, value, runAt) VALUES (?, ?, ?, ?)") stmt, err := db.Prepare("INSERT INTO receivedMessageAggregated (chatId, durationInSeconds, value, runAt) VALUES ($1, $2, $3, $4) RETURNING id;")
if err != nil { if err != nil {
return err return err
} }
res, err := stmt.Exec(r.ChatID, r.DurationInSeconds, r.Value, r.RunAt) lastInsertId := 0
if err != nil { err = stmt.QueryRow(r.ChatID, r.DurationInSeconds, r.Value, r.RunAt).Scan(&lastInsertId)
return err
}
id, err := res.LastInsertId()
if err != nil { if err != nil {
return err return err
} }
r.ID = lastInsertId
r.ID = int(id)
return nil return nil
} }