feat: switch fleet

This commit is contained in:
Richard Ramos 2022-03-17 14:06:02 -04:00
parent bf8e71cfa9
commit de2b8df033
4 changed files with 69 additions and 5 deletions

View File

@ -1240,3 +1240,27 @@ func (b *GethStatusBackend) SignHash(hexEncodedHash string) (string, error) {
hexEncodedSignature := types.EncodeHex(signature)
return hexEncodedSignature, nil
}
func (b *GethStatusBackend) SwitchFleet(fleet string, conf *params.NodeConfig) error {
if b.appDB == nil {
return ErrDBNotAvailable
}
accountDB := accounts.NewDB(b.appDB)
err := accountDB.SaveSetting("fleet", fleet)
if err != nil {
return err
}
err = nodecfg.SaveNodeConfig(b.appDB, conf)
if err != nil {
return err
}
waku2 := b.statusNode.WakuV2Service()
if waku2 != nil {
return waku2.ClearPeerCache()
}
return nil
}

View File

@ -777,3 +777,19 @@ func GetPasswordStrength(paramsJSON string) string {
}
return string(data)
}
func SwitchFleet(fleet string, configJSON string) string {
var conf params.NodeConfig
if configJSON != "" {
err := json.Unmarshal([]byte(configJSON), &conf)
if err != nil {
return makeJSONResponse(err)
}
}
conf.ClusterConfig.Fleet = fleet
err := statusBackend.SwitchFleet(fleet, &conf)
return makeJSONResponse(err)
}

View File

@ -91,3 +91,12 @@ func CreateTable(db *sql.DB, tableName string) error {
}
return nil
}
func Clean(db *sql.DB, tableName string) error {
sqlStmt := fmt.Sprintf("DELETE FROM %s;", tableName)
_, err := db.Exec(sqlStmt)
if err != nil {
return err
}
return nil
}

View File

@ -79,17 +79,21 @@ import (
const messageQueueLimit = 1024
const requestTimeout = 5 * time.Second
const PeerStoreTable = "peerstore"
type settings struct {
LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol instead of Lightpush
MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations
PersistPeers bool // Indicates if the node will persist peers
}
// Waku represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer.
type Waku struct {
node *node.WakuNode // reference to a libp2p waku node
node *node.WakuNode // reference to a libp2p waku node
appDB *sql.DB
dnsAddressCache map[string][]multiaddr.Multiaddr // Map to store the multiaddresses returned by dns discovery
dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map
@ -128,7 +132,7 @@ type Waku struct {
}
// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku, error) {
func New(nodeKey string, cfg *Config, logger *zap.Logger, appDB *sql.DB) (*Waku, error) {
if logger == nil {
logger = zap.NewNop()
}
@ -138,6 +142,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
logger.Debug("starting wakuv2 with config", zap.Any("config", cfg))
waku := &Waku{
appDB: appDB,
privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte),
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage),
@ -158,6 +163,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
MaxMsgSize: cfg.MaxMessageSize,
LightClient: cfg.LightClient,
MinPeersForRelay: cfg.MinPeersForRelay,
PersistPeers: cfg.PersistPeers,
}
waku.filters = common.NewFilters()
@ -194,16 +200,17 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appdb *sql.DB) (*Waku,
libp2pOpts = append(libp2pOpts, libp2p.BandwidthReporter(waku.bandwidthCounter))
if cfg.PersistPeers {
if appdb == nil {
if appDB == nil {
return nil, fmt.Errorf("a db connection must be provided in order to persist the peers")
}
// Create persistent peerstore
queries, err := persistence.NewQueries("peerstore", appdb)
queries, err := persistence.NewQueries(PeerStoreTable, appDB)
if err != nil {
return nil, fmt.Errorf("failed to setup peerstore table: %v", err)
}
datastore := dssql.NewDatastore(appdb, queries)
datastore := dssql.NewDatastore(appDB, queries)
opts := pstoreds.DefaultOpts()
peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts)
if err != nil {
@ -1014,6 +1021,14 @@ func (w *Waku) processQueue() {
}
}
func (w *Waku) ClearPeerCache() error {
if !w.settings.PersistPeers {
return nil
}
return persistence.Clean(w.appDB, PeerStoreTable)
}
// Envelopes retrieves all the messages currently pooled by the node.
func (w *Waku) Envelopes() []*common.ReceivedMessage {
w.poolMu.RLock()