Merge branch 'master' into feat/autoshard-store-api

This commit is contained in:
Prem Chaitanya Prathi 2023-11-10 11:00:48 +05:30 committed by GitHub
commit ab8cce642a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 252 additions and 147 deletions

View File

@ -17,7 +17,7 @@ pipeline {
string(
name: 'IMAGE_TAG',
description: 'Docker image tag.',
defaultValue: getDefaultImageTag()
defaultValue: getDefaultImageTag(params.IMAGE_TAG)
)
string(
name: 'DOCKER_CRED',
@ -109,10 +109,11 @@ def discordNotify(Map args=[:]) {
}
}
def getDefaultImageTag() {
def getDefaultImageTag(currentValue) {
switch (env.JOB_BASE_NAME) {
case 'docker-latest': return 'latest'
case 'docker-release': return 'stable'
default: return ''
case 'docker-manual': return ''
default: return currentValue
}
}

View File

@ -330,12 +330,6 @@ var (
Destination: &options.Store.DatabaseURL,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_URL"},
})
StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-vacuum",
Usage: "Enable database vacuuming at start.",
Destination: &options.Store.Vacuum,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"},
})
StoreMessageDBMigration = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "store-message-db-migration",
Usage: "Enable database migration at start.",

View File

@ -65,7 +65,6 @@ func main() {
StoreMessageDBURL,
StoreMessageRetentionTime,
StoreMessageRetentionCapacity,
StoreMessageDBVacuum,
StoreMessageDBMigration,
FilterFlag,
FilterNode,

View File

@ -110,9 +110,7 @@ func Execute(options NodeOptions) error {
var db *sql.DB
var migrationFn func(*sql.DB) error
if requiresDB(options) && options.Store.Migration {
dbSettings := dbutils.DBSettings{
Vacuum: options.Store.Vacuum,
}
dbSettings := dbutils.DBSettings{}
db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger)
if err != nil {
return nonRecoverErrorMsg("could not connect to DB: %w", err)

View File

@ -81,7 +81,6 @@ type StoreOptions struct {
RetentionMaxMessages int
//ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool
}

View File

@ -13,7 +13,7 @@ import (
func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))

View File

@ -128,7 +128,7 @@ func NewNode(configJSON string) error {
if *config.EnableStore {
var db *sql.DB
var migrationFn func(*sql.DB) error
db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{Vacuum: true}, utils.Logger())
db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{}, utils.Logger())
if err != nil {
return err
}

View File

@ -0,0 +1,46 @@
package tests
import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"strconv"
"testing"
)
// TestStringGenerators exercises every random-string generator many times and
// logs one sample of each per iteration so the output can be eyeballed in the
// console. It only asserts that no generator returns an error.
func TestStringGenerators(t *testing.T) {
	logger := utils.Logger()

	// Keep the order and log labels identical to the original sequence.
	generators := []struct {
		label string
		gen   func() (string, error)
	}{
		{"Generated random ASCII string", func() (string, error) { return GenerateRandomASCIIString(1, 4097) }},
		{"Generated random UTF8 string", func() (string, error) { return GenerateRandomUTF8String(1, 4097, false) }},
		{"Generated uncommon UTF8 string", func() (string, error) { return GenerateRandomUTF8String(1, 4097, true) }},
		{"Generated random JSON string", GenerateRandomJSONString},
		{"Generated random Base64 string", func() (string, error) { return GenerateRandomBase64String(1025) }},
		{"Generated random URL encoded string", func() (string, error) { return GenerateRandomURLEncodedString(2049) }},
		{"Generated random SQL insert string", GenerateRandomSQLInsert},
	}

	for i := 0; i < 1000; i++ {
		for _, g := range generators {
			s, err := g.gen()
			require.NoError(t, err)
			logger.Info(g.label, zap.String(strconv.Itoa(i), s))
		}
	}
}

View File

@ -1,16 +1,23 @@
package tests
import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"math"
"math/big"
"net"
"net/url"
"strconv"
"strings"
"testing"
"unicode/utf8"
gcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@ -220,3 +227,165 @@ func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) {
Port: port,
}, nil
}
// RandomInt returns a uniformly distributed random integer in the inclusive
// range [min, max], using crypto/rand as the entropy source.
func RandomInt(min, max int) (int, error) {
	span := big.NewInt(int64(max - min + 1))
	offset, err := rand.Int(rand.Reader, span)
	if err != nil {
		return 0, err
	}
	return min + int(offset.Int64()), nil
}
// RandomBytes returns n cryptographically secure random bytes.
func RandomBytes(n int) ([]byte, error) {
	buf := make([]byte, n)
	if _, err := rand.Read(buf); err != nil {
		return nil, err
	}
	return buf, nil
}
// GenerateRandomASCIIString returns a random alphanumeric string whose length
// is uniformly chosen from the inclusive range [minLength, maxLength].
func GenerateRandomASCIIString(minLength int, maxLength int) (string, error) {
	const alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

	span, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength-minLength+1)))
	if err != nil {
		return "", err
	}
	n := minLength + int(span.Int64())

	var b strings.Builder
	b.Grow(n)
	for i := 0; i < n; i++ {
		idx, err := rand.Int(rand.Reader, big.NewInt(int64(len(alphabet))))
		if err != nil {
			return "", err
		}
		b.WriteByte(alphabet[idx.Int64()])
	}
	return b.String(), nil
}
// GenerateRandomUTF8String returns a random UTF-8 string whose rune count is
// uniformly chosen from the inclusive range [minLength, maxLength].
//
// When withUncommon is true the runes are drawn from the Unicode Private Use
// Area (U+E000–U+F8FF), which renders as uncommon/unprintable characters;
// otherwise they are drawn from printable ASCII (U+0020 space through
// U+007E tilde).
func GenerateRandomUTF8String(minLength int, maxLength int, withUncommon bool) (string, error) {
	span, err := rand.Int(rand.Reader, big.NewInt(int64(maxLength-minLength+1)))
	if err != nil {
		return "", err
	}
	count := minLength + int(span.Int64())

	var start, end int
	if withUncommon {
		// Unicode range for uncommon or unprintable characters:
		// the Private Use Area (U+E000–U+F8FF).
		start = 0xE000
		end = 0xF8FF
	} else {
		// Printable ASCII range.
		start = 0x0020 // space
		// BUGFIX: was 0x007F (DEL, unprintable); the intended upper bound,
		// per the original comment, is the tilde.
		end = 0x007E // tilde (~)
	}

	runes := make([]rune, 0, count)
	for i := 0; i < count; i++ {
		n, err := rand.Int(rand.Reader, big.NewInt(int64(end-start+1)))
		if err != nil {
			return "", err
		}
		r := rune(start + int(n.Int64()))
		// Defensive: both ranges above contain only valid runes, but skip
		// anything invalid rather than emit broken UTF-8.
		if !utf8.ValidRune(r) {
			continue
		}
		runes = append(runes, r)
	}
	return string(runes), nil
}
// GenerateRandomJSONString returns a JSON object literal containing five
// random ASCII key/value pairs. HTML escaping is disabled so the output
// reproduces the generated strings verbatim; note the encoder appends a
// trailing newline to the result.
func GenerateRandomJSONString() (string, error) {
	pairs := make(map[string]interface{})
	for i := 0; i < 5; i++ {
		key, err := GenerateRandomASCIIString(1, 20)
		if err != nil {
			return "", err
		}
		val, err := GenerateRandomASCIIString(1, 4097)
		if err != nil {
			return "", err
		}
		pairs[key] = val
	}

	// Marshal the map into a JSON string.
	var out bytes.Buffer
	enc := json.NewEncoder(&out)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(pairs); err != nil {
		return "", err
	}
	return out.String(), nil
}
// GenerateRandomBase64String returns the standard base64 encoding of
// `length` cryptographically random bytes.
func GenerateRandomBase64String(length int) (string, error) {
	raw, err := RandomBytes(length)
	if err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(raw), nil
}
// GenerateRandomURLEncodedString returns a URL-encoded random ASCII string.
// The underlying string's length is uniformly chosen from the inclusive
// range [1, length].
//
// BUGFIX: the length parameter was previously ignored and the generated
// string was always drawn from [1, 4097]; it is now honored.
func GenerateRandomURLEncodedString(length int) (string, error) {
	randomString, err := GenerateRandomASCIIString(1, length)
	if err != nil {
		return "", err
	}
	// URL-encode the random string so it is safe to embed in a query.
	return url.QueryEscape(randomString), nil
}
// GenerateRandomSQLInsert builds a syntactically valid INSERT statement with
// a random table name, a random number (3–6) of random column names, and one
// single-quoted random alphanumeric value per column.
func GenerateRandomSQLInsert() (string, error) {
	// Random table name.
	table, err := GenerateRandomASCIIString(1, 10)
	if err != nil {
		return "", err
	}

	// Random number of columns.
	columnCount, err := RandomInt(3, 6)
	if err != nil {
		return "", err
	}

	// Random column names and matching quoted values.
	names := make([]string, columnCount)
	vals := make([]string, columnCount)
	for i := 0; i < columnCount; i++ {
		name, err := GenerateRandomASCIIString(1, 20)
		if err != nil {
			return "", err
		}
		names[i] = name

		value, err := GenerateRandomASCIIString(1, 100)
		if err != nil {
			return "", err
		}
		vals[i] = "'" + value + "'"
	}

	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);",
		table,
		strings.Join(names, ", "),
		strings.Join(vals, ", ")), nil
}

View File

@ -1,10 +1,8 @@
package postgres
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/pgx"
@ -15,55 +13,13 @@ import (
"go.uber.org/zap"
)
func executeVacuum(db *sql.DB, logger *zap.Logger) error {
logger.Info("starting PostgreSQL database vacuuming")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error)
go func() {
defer cancel()
_, err := db.Exec("VACUUM FULL")
if err != nil {
errCh <- err
}
}()
t := time.NewTicker(2 * time.Minute)
defer t.Stop()
loop:
for {
select {
case <-ctx.Done():
break loop
case err := <-errCh:
return err
case <-t.C:
logger.Info("still vacuuming...")
}
}
logger.Info("finished PostgreSQL database vacuuming")
return nil
}
// NewDB connects to postgres DB in the specified path
func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, error) {
func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) {
db, err := sql.Open("pgx", dburl)
if err != nil {
return nil, err
}
if shouldVacuum {
err := executeVacuum(db, logger)
if err != nil {
return nil, err
}
}
return db, nil
}

View File

@ -1,11 +1,9 @@
package sqlite
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
@ -32,43 +30,8 @@ func addSqliteURLDefaults(dburl string) string {
return dburl
}
func executeVacuum(db *sql.DB, logger *zap.Logger) error {
logger.Info("starting sqlite database vacuuming")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error)
go func() {
defer cancel()
_, err := db.Exec("VACUUM")
if err != nil {
errCh <- err
}
}()
t := time.NewTicker(2 * time.Minute)
defer t.Stop()
loop:
for {
select {
case <-ctx.Done():
break loop
case err := <-errCh:
return err
case <-t.C:
logger.Info("still vacuuming...")
}
}
logger.Info("finished sqlite database vacuuming")
return nil
}
// NewDB creates a sqlite3 DB in the specified path
func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, error) {
func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) {
db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl))
if err != nil {
return nil, err
@ -77,13 +40,6 @@ func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, error)
// Disable concurrent access as not supported by the driver
db.SetMaxOpenConns(1)
if shouldVacuum {
err := executeVacuum(db, logger)
if err != nil {
return nil, err
}
}
return db, nil
}

View File

@ -22,7 +22,7 @@ func validateDBUrl(val string) error {
// DBSettings hold db specific configuration settings required during the db initialization
type DBSettings struct {
Vacuum bool
// TODO: add any DB specific setting here
}
// ExtractDBAndMigration will return a database connection, and migration function that should be used depending on a database connection string
@ -50,10 +50,10 @@ func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *za
dbParams := dbURLParts[1]
switch dbEngine {
case "sqlite3":
db, err = sqlite.NewDB(dbParams, dbSettings.Vacuum, logger)
db, err = sqlite.NewDB(dbParams, logger)
migrationFn = sqlite.Migrations
case "postgresql":
db, err = postgres.NewDB(dbURL, dbSettings.Vacuum, logger)
db, err = postgres.NewDB(dbURL, logger)
migrationFn = postgres.Migrations
default:
err = errors.New("unsupported database engine")

View File

@ -70,7 +70,7 @@ func TestConnectionStatusChanges(t *testing.T) {
err = node2.Start(ctx)
require.NoError(t, err)
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))
require.NoError(t, err)

View File

@ -242,7 +242,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
defer subs[0].Unsubscribe()
// NODE2: Filter Client/Store
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))
require.NoError(t, err)

View File

@ -244,10 +244,7 @@ func (gm *DynamicGroupManager) InsertMembers(toInsert *om.OrderedMap) error {
gm.metrics.RecordRegisteredMembership(gm.rln.LeavesSet())
_, err = gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64))
if err != nil {
return err
}
gm.rootTracker.UpdateLatestRoot(pair.Key.(uint64))
}
return nil
}

View File

@ -31,8 +31,7 @@ func TestHandler(t *testing.T) {
rlnInstance, err := rln.NewRLN()
require.NoError(t, err)
rootTracker, err := group_manager.NewMerkleRootTracker(5, rlnInstance)
require.NoError(t, err)
rootTracker := group_manager.NewMerkleRootTracker(5, rlnInstance)
_, cancel := context.WithCancel(context.TODO())
defer cancel()

View File

@ -22,9 +22,9 @@ func TestFetchingLogic(t *testing.T) {
require.NoError(t, err)
rlnInstance, err := rln.NewRLN()
require.NoError(t, err)
rootTracker, err := group_manager.NewMerkleRootTracker(1, rlnInstance)
require.NoError(t, err)
//
rootTracker := group_manager.NewMerkleRootTracker(1, rlnInstance)
mf := MembershipFetcher{
web3Config: &web3.Config{
RLNContract: web3.RLNContract{

View File

@ -4,7 +4,9 @@ import (
"bytes"
"sync"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
)
// RootsPerBlock stores the merkle root generated at N block number
@ -27,18 +29,15 @@ type MerkleRootTracker struct {
const maxBufferSize = 20
// NewMerkleRootTracker creates an instance of MerkleRootTracker
func NewMerkleRootTracker(acceptableRootWindowSize int, rlnInstance *rln.RLN) (*MerkleRootTracker, error) {
func NewMerkleRootTracker(acceptableRootWindowSize int, rlnInstance *rln.RLN) *MerkleRootTracker {
result := &MerkleRootTracker{
acceptableRootWindowSize: acceptableRootWindowSize,
rln: rlnInstance,
}
_, err := result.UpdateLatestRoot(0)
if err != nil {
return nil, err
}
result.UpdateLatestRoot(0)
return result, nil
return result
}
// Backfill is used to pop merkle roots when there is a chain fork
@ -102,18 +101,18 @@ func (m *MerkleRootTracker) IndexOf(root [32]byte) int {
// UpdateLatestRoot should be called when a block containing a new
// IDCommitment is received so we can keep track of the merkle root change
func (m *MerkleRootTracker) UpdateLatestRoot(blockNumber uint64) (rln.MerkleNode, error) {
func (m *MerkleRootTracker) UpdateLatestRoot(blockNumber uint64) rln.MerkleNode {
m.Lock()
defer m.Unlock()
root, err := m.rln.GetMerkleRoot()
if err != nil {
return [32]byte{}, err
utils.Logger().Named("root-tracker").Panic("could not retrieve merkle root", zap.Error(err))
}
m.pushRoot(blockNumber, root)
return root, nil
return root
}
func (m *MerkleRootTracker) pushRoot(blockNumber uint64, root [32]byte) {

View File

@ -68,10 +68,7 @@ func (gm *StaticGroupManager) insertMembers(idCommitments []rln.IDCommitment) er
latestIndex := gm.nextIndex + uint64(len(idCommitments))
_, err = gm.rootTracker.UpdateLatestRoot(latestIndex)
if err != nil {
return err
}
gm.rootTracker.UpdateLatestRoot(latestIndex)
gm.nextIndex = latestIndex + 1

View File

@ -140,8 +140,7 @@ func (s *WakuRLNRelayDynamicSuite) TestDynamicGroupManagement() {
rlnInstance, err := rln.NewRLN()
s.Require().NoError(err)
rt, err := group_manager.NewMerkleRootTracker(5, rlnInstance)
s.Require().NoError(err)
rt := group_manager.NewMerkleRootTracker(5, rlnInstance)
u1Credentials := s.generateCredentials(rlnInstance)
appKeystore, err := keystore.New(s.tmpKeystorePath(), dynamic.RLNAppInfo, utils.Logger())

View File

@ -88,8 +88,7 @@ func (s *WakuRLNRelaySuite) TestUpdateLogAndHasDuplicate() {
rlnInstance, err := r.NewRLN()
s.Require().NoError(err)
rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
s.Require().NoError(err)
rootTracker := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
rlnRelay := &WakuRLNRelay{
nullifierLog: NewNullifierLog(context.TODO(), utils.Logger()),
@ -184,10 +183,9 @@ func (s *WakuRLNRelaySuite) TestValidateMessage() {
// Create a RLN instance
rlnInstance, err := r.NewRLN()
s.Require().NoError(err)
//
rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
s.Require().NoError(err)
//
rootTracker := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
idCredential := groupKeyPairs[index]
groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, rlnInstance, rootTracker, utils.Logger())
s.Require().NoError(err)

View File

@ -47,10 +47,8 @@ func GetRLNInstanceAndRootTracker(treePath string) (*rln.RLN, *group_manager.Mer
return nil, nil, err
}
rootTracker, err := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
if err != nil {
return nil, nil, err
}
rootTracker := group_manager.NewMerkleRootTracker(acceptableRootWindowSize, rlnInstance)
return rlnInstance, rootTracker, nil
}
func New(

View File

@ -13,7 +13,7 @@ import (
func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))

View File

@ -45,7 +45,7 @@ func TestRendezvous(t *testing.T) {
host1, err := tests.MakeHost(ctx, port1, rand.Reader)
require.NoError(t, err)
db, err := sqlite.NewDB(":memory:", false, utils.Logger())
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
err = sqlite.Migrations(db)