refactor: it's not necessary for storenodes to be connected peers

This commit is contained in:
Richard Ramos 2023-11-24 16:32:02 -04:00 committed by richΛrd
parent 82185b54b5
commit ee8d8473e2
2 changed files with 72 additions and 65 deletions

View File

@ -77,7 +77,13 @@ func (m *Messenger) StartMailserverCycle(mailservers []mailservers.Mailserver) e
go m.updateWakuV1PeerStatus()
case 2:
go m.updateWakuV2PeerStatus()
for _, storenode := range mailservers {
_, err := m.transport.AddStorePeer(storenode.Address)
if err != nil {
return err
}
}
go m.verifyStorenodeStatus()
default:
return fmt.Errorf("unsupported waku version: %d", version)
@ -118,19 +124,9 @@ func (m *Messenger) disconnectMailserver() error {
}
m.mailPeersMutex.Unlock()
if m.mailserverCycle.activeMailserver.Version == 2 {
peerID, err := m.mailserverCycle.activeMailserver.PeerID()
if err != nil {
return err
}
// WakuV2 does not keep an active storenode connection
err = m.transport.DropPeer(peerID.String())
if err != nil {
m.logger.Warn("could not drop peer")
return err
}
} else {
if m.mailserverCycle.activeMailserver.Version == 1 {
node, err := m.mailserverCycle.activeMailserver.Enode()
if err != nil {
return err
@ -267,7 +263,7 @@ func (m *Messenger) findNewMailserver() error {
var availableMailservers []*mailservers.PingResult
for _, result := range pingResult {
if result.Err != nil {
m.logger.Info("connecting error", zap.String("eerr", *result.Err))
m.logger.Info("connecting error", zap.String("err", *result.Err))
continue // The results with error are ignored
}
availableMailservers = append(availableMailservers, result)
@ -294,7 +290,9 @@ func (m *Messenger) findNewMailserver() error {
pInfo, ok := m.mailserverCycle.peers[ms.ID]
m.mailPeersMutex.Unlock()
if ok {
sortedMailserver.CanConnectAfter = pInfo.canConnectAfter
if time.Now().Before(pInfo.canConnectAfter) {
continue // We can't connect to this node yet
}
}
sortedMailservers = append(sortedMailservers, sortedMailserver)
@ -307,6 +305,10 @@ func (m *Messenger) findNewMailserver() error {
pSize := poolSize(len(sortedMailservers) - 1)
if pSize <= 0 {
pSize = len(sortedMailservers)
if pSize <= 0 {
m.logger.Warn("No mailservers available") // Do nothing...
return nil
}
}
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
@ -357,14 +359,10 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
}
if activeMailserverStatus != connected {
// Attempt to connect to mailserver by adding it as a peer
// WakuV2 does not require having the peer connected to query the peer
if ms.Version == 2 {
if err := m.transport.DialPeer(ms.Address); err != nil {
m.logger.Error("failed to dial", zap.Error(err))
return err
}
} else {
// Attempt to connect to mailserver by adding it as a peer
if ms.Version == 1 {
node, err := ms.Enode()
if err != nil {
return err
@ -375,21 +373,34 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error {
}
}
connectionStatus := connecting
if ms.Version == 2 {
connectionStatus = connected
}
m.mailPeersMutex.Lock()
pInfo, ok := m.mailserverCycle.peers[ms.ID]
if ok {
pInfo.status = connecting
pInfo.lastConnectionAttempt = time.Now()
pInfo.mailserver = ms
m.mailserverCycle.peers[ms.ID] = pInfo
} else {
m.mailserverCycle.peers[ms.ID] = peerStatus{
status: connecting,
mailserver: ms,
lastConnectionAttempt: time.Now(),
}
m.mailserverCycle.peers[ms.ID] = peerStatus{
status: connectionStatus,
lastConnectionAttempt: time.Now(),
canConnectAfter: time.Now().Add(defaultBackoff),
mailserver: ms,
}
m.mailPeersMutex.Unlock()
if ms.Version == 2 {
m.mailserverCycle.activeMailserver.FailedRequests = 0
m.logger.Info("mailserver available", zap.String("address", m.mailserverCycle.activeMailserver.UniqueID()))
m.EmitMailserverAvailable()
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID)
// Query mailserver
go func() {
_, err := m.performMailserverRequest(func() (*MessengerResponse, error) { return m.RequestAllHistoricMessages(false) })
if err != nil {
m.logger.Error("could not perform mailserver request", zap.Error(err))
}
}()
}
}
return nil
}
@ -434,17 +445,6 @@ func (m *Messenger) mailserverPeersInfo() []ConnectedPeer {
return connectedPeers
}
func (m *Messenger) storenodesPeersInfo() []ConnectedPeer {
var connectedPeers []ConnectedPeer
for k := range m.transport.Peers() {
connectedPeers = append(connectedPeers, ConnectedPeer{
UniqueID: k,
})
}
return connectedPeers
}
func (m *Messenger) penalizeMailserver(id string) {
m.mailPeersMutex.Lock()
defer m.mailPeersMutex.Unlock()
@ -614,37 +614,20 @@ func (m *Messenger) updateWakuV1PeerStatus() {
}
}
func (m *Messenger) updateWakuV2PeerStatus() {
connSubscription, err := m.transport.SubscribeToConnStatusChanges()
if err != nil {
m.logger.Error("Could not subscribe to connection status changes", zap.Error(err))
}
func (m *Messenger) verifyStorenodeStatus() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case status := <-connSubscription.C:
var connectedPeers []ConnectedPeer
for id := range status.Peers {
connectedPeers = append(connectedPeers, ConnectedPeer{UniqueID: id})
}
err := m.handleMailserverCycleEvent(connectedPeers)
if err != nil {
m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
return
}
case <-ticker.C:
err := m.handleMailserverCycleEvent(m.storenodesPeersInfo())
err := m.disconnectStorenodeIfRequired()
if err != nil {
m.logger.Error("failed to handle mailserver cycle event", zap.Error(err))
continue
}
case <-m.quit:
connSubscription.Unsubscribe()
return
}
}
@ -705,3 +688,24 @@ func (m *Messenger) SubscribeMailserverAvailable() chan struct{} {
m.mailserverCycle.availabilitySubscriptions = append(m.mailserverCycle.availabilitySubscriptions, c)
return c
}
func (m *Messenger) disconnectStorenodeIfRequired() error {
m.logger.Debug("wakuV2 storenode status verification")
if m.mailserverCycle.activeMailserver == nil {
// No active storenode, find a new one
m.cycleMailservers()
return nil
}
// Check whether we want to disconnect the active storenode
if m.mailserverCycle.activeMailserver.FailedRequests >= mailserverMaxFailedRequests {
m.penalizeMailserver(m.mailserverCycle.activeMailserver.ID)
signal.SendMailserverNotWorking()
m.logger.Info("too many failed requests", zap.String("storenode", m.mailserverCycle.activeMailserver.UniqueID()))
m.mailserverCycle.activeMailserver.FailedRequests = 0
return m.connectToNewMailserverAndWait()
}
return nil
}

View File

@ -102,10 +102,13 @@ func (s *MessengerStoreNodeRequestSuite) SetupTest() {
bobLogger := s.logger.With(zap.String("name", "bob"))
s.bob = s.newMessenger(s.bobWaku, bobLogger, storeNodeAddress)
s.bob.StartRetrieveMessagesLoop(time.Second, nil)
// Connect owner to storenode so message is stored
err := s.ownerWaku.DialPeer(storeNodeAddress)
s.Require().NoError(err)
}
func (s *MessengerStoreNodeRequestSuite) TestRequestCommunityInfo() {
WaitForAvailableStoreNode(&s.Suite, s.owner, time.Second)
createCommunityRequest := &requests.CreateCommunity{