Handle connection change

Currently messenger has no notion of being online.
This might cause a problem as we retry to register with a push
notification server even if not connected to any peer, which will
inevitably fail.
This commit adds a method `handleConnectionChange` that will be called
every time the connection change state.
This commit is contained in:
Andrea Maria Piana 2020-08-27 14:38:59 +02:00
parent 0ff2542939
commit 2ef1aa89f4
No known key found for this signature in database
GPG Key ID: AA6CCA6DE0E06424
5 changed files with 65 additions and 0 deletions

View File

@ -99,3 +99,7 @@ func (w *gethNodeWrapper) RemovePeer(url string) error {
return nil
}
func (w *gethNodeWrapper) PeersCount() int {
return len(w.stack.Server().Peers())
}

View File

@ -22,4 +22,5 @@ type Node interface {
GetWaku(ctx interface{}) (Waku, error)
AddPeer(url string) error
RemovePeer(url string) error
PeersCount() int
}

View File

@ -78,6 +78,7 @@ type Messenger struct {
installationID string
mailserver []byte
database *sql.DB
quit chan struct{}
mutex sync.Mutex
}
@ -249,6 +250,7 @@ func NewMessenger(
messagesPersistenceEnabled: c.messagesPersistenceEnabled,
verifyTransactionClient: c.verifyTransactionClient,
database: database,
quit: make(chan struct{}),
shutdownTasks: []func() error{
database.Close,
pushNotificationClient.Stop,
@ -301,9 +303,29 @@ func (m *Messenger) Start() error {
}
m.handleEncryptionLayerSubscriptions(subscriptions)
m.handleConnectionChange(m.online())
m.watchConnectionChange()
return nil
}
// handle connection change is called each time we go from offline/online or viceversa
func (m *Messenger) handleConnectionChange(online bool) {
if online {
if m.pushNotificationClient != nil {
m.pushNotificationClient.Online()
}
} else {
if m.pushNotificationClient != nil {
m.pushNotificationClient.Offline()
}
}
}
func (m *Messenger) online() bool {
return m.node.PeersCount() > 0
}
func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) {
if m.pushNotificationClient == nil || !m.pushNotificationClient.Enabled() {
return nil, nil
@ -406,6 +428,29 @@ func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption
}()
}
// watchConnectionChange checks the connection status and call handleConnectionChange when this changes
func (m *Messenger) watchConnectionChange() {
m.logger.Debug("watching connection changes")
state := m.online()
go func() {
for {
select {
case <-time.After(200 * time.Millisecond):
newState := m.online()
if state != newState {
state = newState
m.logger.Debug("connection changed", zap.Bool("online", state))
m.handleConnectionChange(state)
}
case <-m.quit:
return
}
}
}()
}
// handlePushNotificationClientRegistration handles registration events
func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) {
go func() {
@ -522,6 +567,7 @@ func (m *Messenger) Shutdown() (err error) {
}
}
}
close(m.quit)
return
}

View File

@ -91,6 +91,10 @@ func (n *testNode) GetWhisper(_ interface{}) (types.Whisper, error) {
return nil, nil
}
func (c *testNode) PeersCount() int {
return 1
}
func (s *MessengerSuite) SetupTest() {
s.logger = tt.MustCreateTestLogger()

View File

@ -210,6 +210,16 @@ func (c *Client) Start() error {
return nil
}
func (c *Client) Offline() {
c.stopRegistrationLoop()
c.stopResendingLoop()
}
func (c *Client) Online() {
c.startRegistrationLoop()
c.startResendingLoop()
}
func (c *Client) publishOnRegistrationSubscriptions() {
// Publish on channels, drop if buffer is full
for _, s := range c.registrationSubscriptions {