feat: verify if mailserver is pinned in cycle (#2489)
This commit is contained in:
parent
1a439baa5c
commit
52b2c5db7b
|
@ -702,6 +702,21 @@ func (db *Database) GetDappsAddress() (rst types.Address, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db *Database) GetPinnedMailservers() (rst map[string]string, err error) {
|
||||||
|
rst = make(map[string]string)
|
||||||
|
var pinnedMailservers string
|
||||||
|
err = db.db.QueryRow("SELECT COALESCE(pinned_mailservers, '') FROM settings WHERE synthetic_id = 'id'").Scan(&pinnedMailservers)
|
||||||
|
if err == sql.ErrNoRows || pinnedMailservers == "" {
|
||||||
|
return rst, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal([]byte(pinnedMailservers), &rst)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (db *Database) CanUseMailservers() (bool, error) {
|
func (db *Database) CanUseMailservers() (bool, error) {
|
||||||
var result bool
|
var result bool
|
||||||
err := db.db.QueryRow("SELECT use_mailservers FROM settings WHERE synthetic_id = 'id'").Scan(&result)
|
err := db.db.QueryRow("SELECT use_mailservers FROM settings WHERE synthetic_id = 'id'").Scan(&result)
|
||||||
|
|
|
@ -215,16 +215,11 @@ func (m *Messenger) findStoreNode() error {
|
||||||
return m.connectToStoreNode(parseMultiaddresses([]string{availableMailservers[r.Int64()].Address})[0])
|
return m.connectToStoreNode(parseMultiaddresses([]string{availableMailservers[r.Int64()].Address})[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Messenger) findNewMailserverV1() error {
|
func (m *Messenger) getFleet() (string, error) {
|
||||||
// TODO: remove this function once WakuV1 is deprecated
|
|
||||||
|
|
||||||
allMailservers := parseNodes(m.config.clusterConfig.TrustedMailServers)
|
|
||||||
|
|
||||||
// Append user mailservers
|
|
||||||
var fleet string
|
var fleet string
|
||||||
dbFleet, err := m.settings.GetFleet()
|
dbFleet, err := m.settings.GetFleet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return "", err
|
||||||
}
|
}
|
||||||
if dbFleet != "" {
|
if dbFleet != "" {
|
||||||
fleet = dbFleet
|
fleet = dbFleet
|
||||||
|
@ -233,6 +228,19 @@ func (m *Messenger) findNewMailserverV1() error {
|
||||||
} else {
|
} else {
|
||||||
fleet = params.FleetProd
|
fleet = params.FleetProd
|
||||||
}
|
}
|
||||||
|
return fleet, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Messenger) findNewMailserverV1() error {
|
||||||
|
// TODO: remove this function once WakuV1 is deprecated
|
||||||
|
|
||||||
|
allMailservers := parseNodes(m.config.clusterConfig.TrustedMailServers)
|
||||||
|
|
||||||
|
// Append user mailservers
|
||||||
|
fleet, err := m.getFleet()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
customMailservers, err := m.mailservers.Mailservers()
|
customMailservers, err := m.mailservers.Mailservers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -425,6 +433,12 @@ func (m *Messenger) connectToStoreNode(node multiaddr.Multiaddr) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Messenger) getActiveMailserver() *enode.Node {
|
||||||
|
m.mailserverCycle.RLock()
|
||||||
|
defer m.mailserverCycle.RUnlock()
|
||||||
|
return m.mailserverCycle.activeMailserver
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Messenger) isActiveMailserverAvailable() bool {
|
func (m *Messenger) isActiveMailserverAvailable() bool {
|
||||||
m.mailserverCycle.RLock()
|
m.mailserverCycle.RLock()
|
||||||
defer m.mailserverCycle.RUnlock()
|
defer m.mailserverCycle.RUnlock()
|
||||||
|
@ -564,23 +578,61 @@ func (m *Messenger) updateWakuV1PeerStatus() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Messenger) getPinnedMailserver() (string, error) {
|
||||||
|
// TODO: Pinned mailservers are ony available in V1 for now
|
||||||
|
if m.transport.WakuVersion() != 1 {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fleet, err := m.getFleet()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
pinnedMailservers, err := m.settings.GetPinnedMailservers()
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
pinnedMailserver, ok := pinnedMailservers[fleet]
|
||||||
|
if !ok {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return pinnedMailserver, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Messenger) checkMailserverConnection() {
|
func (m *Messenger) checkMailserverConnection() {
|
||||||
ticker := time.NewTicker(10 * time.Second)
|
ticker := time.NewTicker(10 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
m.logger.Info("Verifying mailserver connection state...")
|
m.logger.Info("Verifying mailserver connection state...")
|
||||||
// m.settings.GetPinnedMailserver
|
|
||||||
//if pinnedMailserver != "" && self.activeMailserver != pinnedMailserver {
|
pinnedMailserver, err := m.getPinnedMailserver()
|
||||||
// connect to current mailserver from the settings
|
if err != nil {
|
||||||
// self.mailservers = pinnedMailserver
|
m.logger.Error("Could not obtain the pinned mailserver", zap.Error(err))
|
||||||
// self.connect(pinnedMailserver)
|
continue
|
||||||
//} else {
|
}
|
||||||
// or setup a random mailserver:
|
|
||||||
if !m.isActiveMailserverAvailable() {
|
if pinnedMailserver != "" {
|
||||||
m.cycleMailservers()
|
pinnedNode := parseNodes([]string{pinnedMailserver})[0]
|
||||||
|
|
||||||
|
activeMailserver := m.getActiveMailserver()
|
||||||
|
if activeMailserver == nil || activeMailserver.String() != pinnedMailserver {
|
||||||
|
m.logger.Info("New pinned mailserver", zap.Any("pinnedMailserver", pinnedMailserver))
|
||||||
|
err = m.connectToMailserver(pinnedNode)
|
||||||
|
if err != nil {
|
||||||
|
m.logger.Error("Could not connect to pinned mailserver", zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// or setup a random mailserver:
|
||||||
|
if !m.isActiveMailserverAvailable() {
|
||||||
|
m.cycleMailservers()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-m.quit:
|
case <-m.quit:
|
||||||
|
|
Loading…
Reference in New Issue