status-go/mailserver/mailserver_db_postgres.go

313 lines
7.1 KiB
Go
Raw Permalink Normal View History

package mailserver
import (
"database/sql"
"errors"
"fmt"
"time"
"github.com/lib/pq"
"go.uber.org/zap"
// Import postgres driver
_ "github.com/lib/pq"
2019-06-09 09:24:20 +02:00
"github.com/status-im/migrate/v4"
"github.com/status-im/migrate/v4/database/postgres"
2019-10-28 14:50:33 +01:00
bindata "github.com/status-im/migrate/v4/source/go_bindata"
2020-01-02 10:10:19 +01:00
"github.com/status-im/status-go/common"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/mailserver/migrations"
"github.com/ethereum/go-ethereum/rlp"
2020-01-02 10:10:19 +01:00
"github.com/status-im/status-go/eth-node/types"
wakuv1common "github.com/status-im/status-go/wakuv1/common"
)
type PostgresDB struct {
db *sql.DB
name string
done chan struct{}
}
2020-01-08 12:12:23 +01:00
func NewPostgresDB(uri string) (*PostgresDB, error) {
db, err := sql.Open("postgres", uri)
if err != nil {
return nil, err
}
instance := &PostgresDB{
db: db,
done: make(chan struct{}),
}
if err := instance.setup(); err != nil {
return nil, err
}
// name is used for metrics labels
if name, err := instance.getDBName(uri); err == nil {
instance.name = name
}
// initialize the metric value
instance.updateArchivedEnvelopesCount()
// checking count on every insert is inefficient
go func() {
defer common.LogOnPanic()
for {
select {
case <-instance.done:
return
case <-time.After(time.Second * envelopeCountCheckInterval):
instance.updateArchivedEnvelopesCount()
}
}
}()
return instance, nil
}
type postgresIterator struct {
*sql.Rows
}
func (i *PostgresDB) getDBName(uri string) (string, error) {
query := "SELECT current_database()"
var dbName string
return dbName, i.db.QueryRow(query).Scan(&dbName)
}
func (i *PostgresDB) envelopesCount() (int, error) {
query := "SELECT count(*) FROM envelopes"
var count int
return count, i.db.QueryRow(query).Scan(&count)
}
func (i *PostgresDB) updateArchivedEnvelopesCount() {
if count, err := i.envelopesCount(); err != nil {
logutils.ZapLogger().Warn("db query for envelopes count failed", zap.Error(err))
} else {
archivedEnvelopesGauge.WithLabelValues(i.name).Set(float64(count))
}
}
func (i *postgresIterator) DBKey() (*DBKey, error) {
var value []byte
var id []byte
if err := i.Scan(&id, &value); err != nil {
return nil, err
}
return &DBKey{raw: id}, nil
}
func (i *postgresIterator) Error() error {
return i.Err()
}
func (i *postgresIterator) Release() error {
return i.Close()
}
func (i *postgresIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
var value []byte
var id []byte
if err := i.Scan(&id, &value); err != nil {
return nil, err
}
return value, nil
}
func (i *postgresIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
var value []byte
var id []byte
if err := i.Scan(&id, &value); err != nil {
return nil, err
}
return value, nil
}
func (i *PostgresDB) BuildIterator(query CursorQuery) (Iterator, error) {
var args []interface{}
stmtString := "SELECT id, data FROM envelopes"
var historyRange string
if len(query.cursor) > 0 {
args = append(args, query.start, query.cursor)
// If we have a cursor, we don't want to include that envelope in the result set
stmtString += " " + "WHERE id >= $1 AND id < $2"
historyRange = "partial" //nolint: goconst
} else {
args = append(args, query.start, query.end)
stmtString += " " + "WHERE id >= $1 AND id <= $2"
historyRange = "full" //nolint: goconst
}
var filterRange string
if len(query.topics) > 0 {
args = append(args, pq.Array(query.topics))
stmtString += " " + "AND topic = any($3)"
filterRange = "partial" //nolint: goconst
} else {
stmtString += " " + fmt.Sprintf("AND bloom & b'%s'::bit(512) = bloom", toBitString(query.bloom))
filterRange = "full" //nolint: goconst
}
// Positional argument depends on the fact whether the query uses topics or bloom filter.
// If topic is used, the list of topics is passed as an argument to the query.
// If bloom filter is used, it is included into the query statement.
args = append(args, query.limit)
stmtString += " " + fmt.Sprintf("ORDER BY ID DESC LIMIT $%d", len(args))
stmt, err := i.db.Prepare(stmtString)
if err != nil {
return nil, err
}
envelopeQueriesCounter.WithLabelValues(filterRange, historyRange).Inc()
rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}
return &postgresIterator{rows}, nil
}
func (i *PostgresDB) setup() error {
resources := bindata.Resource(
migrations.AssetNames(),
Anon Metrics Broadcast (#2198) * Protobufs and adapters * Added basic anon metric service and config init * Added fibonacci interval incrementer * Added basic Client.Start func and integrated interval incrementer * Added new processed field to app metrics table * Added id column to app metrics table * Added migration clean up * Added appmetrics GetUnprocessed and SetToProcessedByIDs and tests There was a wierd bug where metrics in the db that did not explicitly insert a value would be NULL, so could not be found by . In addition I've added a new primary id field to the app_metrics table so that updates could be done against very specific metric rows. * Updated adaptors and db to handle proto_id I need a way to distinguish individual metric items from each other so that I can ignore the ones that have been seen before. * Moved incrementer into dedicated file * Resolve incrementer test fail * Finalised the main loop functionality * Implemented delete loop framework * Updated adaptors file name * Added delete loop delay and quit, and tweak on RawMessage gen * Completed delete loop logic * Added DBLock to prevent deletion during mainLoop * Added postgres DB connection, integrated into anonmetrics.Server * Removed proto_id from SQL migration and model * Integrated postgres with Server and updated adaptors * Function name update * Added sample config files for client and server * Fixes and testing for low level e2e * make generate * Fix lint * Fix for receiving an anonMetricBatch not in server mode * Postgres test fixes * Tidy up, make vendor and make generate * delinting * Fixing database tests * Attempted fix of does: cannot open `does' (No such file or directory) not: cannot open `not' (No such file or directory) exist: cannot open `exist' (No such file or directory) error on sql resource loas * Moved all anon metric postgres migration logic and sources into a the protocol/anonmetrics package or sub packages. I don't know if this will fix the does: cannot open `does' (No such file or directory) not: cannot open `not' (No such file or directory) exist: cannot open `exist' (No such file or directory) error that happens in Jenkins but this could work * Lint for the lint god * Why doesn't the linter list all its problems at once? * test tweaks * Fix for wakuV2 change * DB reset change * Fix for postgres db migrations fails * More robust implementation of postgres test setup and teardown * Added block for anon metrics functionality * Version Bump to 0.84.0 * Added test to check anon metrics broadcast is deactivated * Protobufs and adapters * Added basic anon metric service and config init * Added new processed field to app metrics table * Added id column to app metrics table * Added migration clean up * Added appmetrics GetUnprocessed and SetToProcessedByIDs and tests There was a wierd bug where metrics in the db that did not explicitly insert a value would be NULL, so could not be found by . In addition I've added a new primary id field to the app_metrics table so that updates could be done against very specific metric rows. * Updated adaptors and db to handle proto_id I need a way to distinguish individual metric items from each other so that I can ignore the ones that have been seen before. * Added postgres DB connection, integrated into anonmetrics.Server * Removed proto_id from SQL migration and model * Integrated postgres with Server and updated adaptors * Added sample config files for client and server * Fix lint * Fix for receiving an anonMetricBatch not in server mode * Postgres test fixes * Tidy up, make vendor and make generate * Moved all anon metric postgres migration logic and sources into a the protocol/anonmetrics package or sub packages. I don't know if this will fix the does: cannot open `does' (No such file or directory) not: cannot open `not' (No such file or directory) exist: cannot open `exist' (No such file or directory) error that happens in Jenkins but this could work
2021-09-01 13:02:18 +01:00
migrations.Asset,
)
source, err := bindata.WithInstance(resources)
if err != nil {
return err
}
driver, err := postgres.WithInstance(i.db, &postgres.Config{})
if err != nil {
return err
}
m, err := migrate.NewWithInstance(
"go-bindata",
source,
"postgres",
driver)
if err != nil {
return err
}
if err = m.Up(); err != migrate.ErrNoChange {
return err
}
return nil
}
func (i *PostgresDB) Close() error {
select {
case <-i.done:
default:
close(i.done)
}
return i.db.Close()
}
func (i *PostgresDB) GetEnvelope(key *DBKey) ([]byte, error) {
statement := `SELECT data FROM envelopes WHERE id = $1`
stmt, err := i.db.Prepare(statement)
if err != nil {
return nil, err
}
defer stmt.Close()
var envelope []byte
if err = stmt.QueryRow(key.Bytes()).Scan(&envelope); err != nil {
return nil, err
}
return envelope, nil
}
func (i *PostgresDB) Prune(t time.Time, batch int) (int, error) {
var zero types.Hash
var emptyTopic types.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
statement := "DELETE FROM envelopes WHERE id BETWEEN $1 AND $2"
stmt, err := i.db.Prepare(statement)
if err != nil {
return 0, err
}
defer stmt.Close()
result, err := stmt.Exec(kl.Bytes(), ku.Bytes())
if err != nil {
return 0, err
}
rows, err := result.RowsAffected()
if err != nil {
return 0, err
}
return int(rows), nil
}
2020-01-08 12:12:23 +01:00
func (i *PostgresDB) SaveEnvelope(env types.Envelope) error {
topic := env.Topic()
key := NewDBKey(env.Expiry()-env.TTL(), topic, env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil {
logutils.ZapLogger().Error("rlp.EncodeToBytes failed", zap.Error(err))
archivedErrorsCounter.WithLabelValues(i.name).Inc()
return err
}
if rawEnvelope == nil {
archivedErrorsCounter.WithLabelValues(i.name).Inc()
return errors.New("failed to encode envelope to bytes")
}
statement := "INSERT INTO envelopes (id, data, topic, bloom) VALUES ($1, $2, $3, B'"
statement += toBitString(env.Bloom())
statement += "'::bit(512)) ON CONFLICT (id) DO NOTHING;"
stmt, err := i.db.Prepare(statement)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(
key.Bytes(),
rawEnvelope,
2019-10-28 14:50:33 +01:00
topicToByte(topic),
)
if err != nil {
archivedErrorsCounter.WithLabelValues(i.name).Inc()
return err
}
archivedEnvelopesGauge.WithLabelValues(i.name).Inc()
archivedEnvelopeSizeMeter.WithLabelValues(i.name).Observe(
float64(wakuv1common.EnvelopeHeaderLength + env.Size()))
return nil
}
func topicToByte(t types.TopicType) []byte {
return []byte{t[0], t[1], t[2], t[3]}
}
func toBitString(bloom []byte) string {
val := ""
for _, n := range bloom {
val += fmt.Sprintf("%08b", n)
}
return val
}