From 3917099be296c247650fed8caa052c3388b05b7c Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Mon, 8 Nov 2021 10:33:13 +0100 Subject: [PATCH] feat: swith db to postgres --- README.md | 19 +++++++++++++++++- telemetry/aggregator_test.go | 39 ++++++++++++++++++++++++++++-------- telemetry/db.go | 4 ++-- telemetry/receivedmessage.go | 22 ++++++++------------ 4 files changed, 59 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 23dd52b..04aacc5 100644 --- a/README.md +++ b/README.md @@ -1 +1,18 @@ -# telemetry \ No newline at end of file +# telemetry + +## Dev setup + +You need to setup a postgres db as such: +1) Create a telemetry user with password newPassword +2) Create a db telemetry +3) Create a db telemetry_test + +Then you can run the server with: +``` +go run cmd/server/main.go -data-source-name postgres://telemetry:newPassword@127.0.0.1:5432/telemetry +``` + +Finally, to run the test: +``` +make test +``` diff --git a/telemetry/aggregator_test.go b/telemetry/aggregator_test.go index 3a16ca2..245f233 100644 --- a/telemetry/aggregator_test.go +++ b/telemetry/aggregator_test.go @@ -12,7 +12,7 @@ import ( ) func NewMock() *sql.DB { - db, err := sql.Open("sqlite3", ":memory:") + db, err := sql.Open("postgres", "postgres://telemetry:newPassword@127.0.0.1:5432/telemetry_test") if err != nil { log.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } @@ -26,6 +26,20 @@ func NewMock() *sql.DB { return db } +func dropTables(db *sql.DB) { + _, err := db.Exec("DROP TABLE IF EXISTS receivedMessages") + if err != nil { + log.Fatalf("an error '%s' was not expected when dropping the table", err) + } + + _, err = db.Exec("DROP TABLE IF EXISTS receivedMessageAggregated") + if err != nil { + log.Fatalf("an error '%s' was not expected when dropping the table", err) + } + + db.Close() +} + func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) { rows, err := db.Query("SELECT * FROM receivedMessageAggregated") if err != nil { @@ -53,6 +67,7 @@ func queryAggregatedMessage(db *sql.DB) ([]*ReceivedMessageAggregated, error) { func TestRunAggregatorSimple(t *testing.T) { db := NewMock() + defer dropTables(db) m := &ReceivedMessage{ ChatID: "1", @@ -61,7 +76,8 @@ func TestRunAggregatorSimple(t *testing.T) { SentAt: time.Now().Unix(), Topic: "1", } - m.put(db) + err := m.put(db) + require.NoError(t, err) oneHourAndHalf := time.Hour + time.Minute*30 m = &ReceivedMessage{ @@ -71,7 +87,8 @@ func TestRunAggregatorSimple(t *testing.T) { SentAt: time.Now().Add(-oneHourAndHalf).Unix(), Topic: "1", } - m.put(db) + err = m.put(db) + require.NoError(t, err) agg := NewAggregator(db) @@ -88,6 +105,7 @@ func TestRunAggregatorSimple(t *testing.T) { func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { db := NewMock() + defer dropTables(db) m := &ReceivedMessage{ ChatID: "1", @@ -96,7 +114,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { SentAt: time.Now().Unix(), Topic: "1", } - m.put(db) + err := m.put(db) + require.NoError(t, err) oneHourAndHalf := time.Hour + time.Minute*30 m = &ReceivedMessage{ @@ -106,7 +125,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { SentAt: time.Now().Add(-oneHourAndHalf).Unix(), Topic: "1", } - m.put(db) + err = m.put(db) + require.NoError(t, err) m = &ReceivedMessage{ ChatID: "3", @@ -115,7 +135,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { SentAt: time.Now().Add(-oneHourAndHalf).Unix(), Topic: "1", } - m.put(db) + err = m.put(db) + require.NoError(t, err) m = &ReceivedMessage{ ChatID: "1", @@ -124,7 +145,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { SentAt: time.Now().Unix(), Topic: "1", } - m.put(db) + err = m.put(db) + require.NoError(t, err) m = &ReceivedMessage{ ChatID: "3", @@ -133,7 +155,8 @@ func TestRunAggregatorSimpleWithMessageMissing(t *testing.T) { SentAt: time.Now().Add(-oneHourAndHalf).Unix(), Topic: "1", } - m.put(db) + err = m.put(db) + require.NoError(t, err) agg := NewAggregator(db) diff --git a/telemetry/db.go b/telemetry/db.go index efce152..1f02c30 100644 --- a/telemetry/db.go +++ b/telemetry/db.go @@ -28,7 +28,7 @@ func OpenDb(dataSourceName string) *sql.DB { func createTables(db *sql.DB) error { sqlStmt := `CREATE TABLE IF NOT EXISTS receivedMessages ( - id INTEGER PRIMARY KEY, + id SERIAL PRIMARY KEY, chatId VARCHAR(255) NOT NULL, messageHash VARCHAR(255) NOT NULL, receiverKeyUID VARCHAR(255) NOT NULL, @@ -43,7 +43,7 @@ func createTables(db *sql.DB) error { } sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated ( - id INTEGER PRIMARY KEY, + id SERIAL PRIMARY KEY, durationInSeconds INTEGER NOT NULL, chatId VARCHAR(255) NOT NULL, value DECIMAL NOT NULL, diff --git a/telemetry/receivedmessage.go b/telemetry/receivedmessage.go index e461644..c3cf0e0 100644 --- a/telemetry/receivedmessage.go +++ b/telemetry/receivedmessage.go @@ -53,40 +53,34 @@ func didReceivedMessageAfter(db *sql.DB, receiverPublicKey string, after time.Ti } func (r *ReceivedMessage) put(db *sql.DB) error { - stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, receiverKeyUID, sentAt, topic, createdAt) VALUES (?, ?, ?, ?, ?, ?)") + stmt, err := db.Prepare("INSERT INTO receivedMessages (chatId, messageHash, receiverKeyUID, sentAt, topic, createdAt) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id;") if err != nil { return err } r.CreatedAt = time.Now().Unix() - res, err := stmt.Exec(r.ChatID, r.MessageHash, r.ReceiverKeyUID, r.SentAt, r.Topic, r.CreatedAt) - if err != nil { - return err - } - id, err := res.LastInsertId() + lastInsertId := 0 + err = stmt.QueryRow(r.ChatID, r.MessageHash, r.ReceiverKeyUID, r.SentAt, r.Topic, r.CreatedAt).Scan(&lastInsertId) if err != nil { return err } + r.ID = lastInsertId - r.ID = int(id) return nil } func (r *ReceivedMessageAggregated) put(db *sql.DB) error { - stmt, err := db.Prepare("INSERT INTO receivedMessageAggregated (chatId, durationInSeconds, value, runAt) VALUES (?, ?, ?, ?)") + stmt, err := db.Prepare("INSERT INTO receivedMessageAggregated (chatId, durationInSeconds, value, runAt) VALUES ($1, $2, $3, $4) RETURNING id;") if err != nil { return err } - res, err := stmt.Exec(r.ChatID, r.DurationInSeconds, r.Value, r.RunAt) - if err != nil { - return err - } - id, err := res.LastInsertId() + lastInsertId := 0 + err = stmt.QueryRow(r.ChatID, r.DurationInSeconds, r.Value, r.RunAt).Scan(&lastInsertId) if err != nil { return err } + r.ID = lastInsertId - r.ID = int(id) return nil }