128 lines
2.9 KiB
Go

package telemetry
import (
"database/sql"
"log"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/postgres"
bindata "github.com/golang-migrate/migrate/v4/source/go_bindata"
_ "github.com/lib/pq"
)
// Migrate applies migrations.
func Migrate(db *sql.DB, driver database.Driver) error {
return migrateDB(db, bindata.Resource(
AssetNames(),
Asset,
), driver)
}
// Migrate database using provided resources.
func migrateDB(db *sql.DB, resources *bindata.AssetSource, driver database.Driver) error {
source, err := bindata.WithInstance(resources)
if err != nil {
return err
}
m, err := migrate.NewWithInstance(
"go-bindata",
source,
"telemetrydb",
driver)
if err != nil {
return err
}
if err = m.Up(); err != migrate.ErrNoChange {
return err
}
return nil
}
func OpenDb(dataSourceName string) *sql.DB {
db, err := sql.Open("postgres", dataSourceName)
if err != nil {
log.Fatalf("could not connect to database: %v", err)
}
if err := db.Ping(); err != nil {
log.Fatalf("unable to reach database: %v", err)
}
log.Println("Connected to database")
if err := createTables(db); err != nil {
log.Fatalf("unable to create the table: %v", err)
}
log.Println("DB initialized")
return db
}
func createTables(db *sql.DB) error {
sqlStmt := `CREATE TABLE IF NOT EXISTS receivedMessages (
id SERIAL PRIMARY KEY,
chatId VARCHAR(255) NOT NULL,
messageHash VARCHAR(255) NOT NULL,
messageId VARCHAR(255) NOT NULL,
receiverKeyUID VARCHAR(255) NOT NULL,
nodeName VARCHAR(255) NOT NULL,
sentAt INTEGER NOT NULL,
topic VARCHAR(255) NOT NULL,
createdAt INTEGER NOT NULL,
constraint receivedMessages_unique unique(chatId, messageHash, receiverKeyUID, nodeName)
);`
_, err := db.Exec(sqlStmt)
if err != nil {
return err
}
sqlStmt = `CREATE TABLE IF NOT EXISTS wakuMessages (
id SERIAL PRIMARY KEY,
walletAddress VARCHAR(255),
peerIdSender VARCHAR(255) NOT NULL,
peerIdReporter VARCHAR(255) NOT NULL,
sequenceHash VARCHAR(255) NOT NULL,
sequenceTotal VARCHAR(255) NOT NULL,
sequenceIndex VARCHAR(255) NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
timestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,
constraint wakuMessages_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic)
);`
_, err = db.Exec(sqlStmt)
if err != nil {
return err
}
sqlStmt = `CREATE TABLE IF NOT EXISTS receivedMessageAggregated (
id SERIAL PRIMARY KEY,
durationInSeconds INTEGER NOT NULL,
chatId VARCHAR(255) NOT NULL,
value DECIMAL NOT NULL,
runAt INTEGER NOT NULL
);`
_, err = db.Exec(sqlStmt)
if err != nil {
return err
}
dbDriver, err := postgres.WithInstance(db, &postgres.Config{
MigrationsTable: postgres.DefaultMigrationsTable,
})
if err != nil {
return err
}
return Migrate(db, dbDriver)
}