From 2ef1aa89f422b71e320b4d2a1a1b3b6a38735895 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Thu, 27 Aug 2020 14:38:59 +0200 Subject: [PATCH] Handle connection change Currently messenger has no notion of being online. This might cause a problem as we retry to register with a push notification server even if not connected to any peer, which will inevitably fail. This commit adds a method `handleConnectionChange` that will be called every time the connection change state. --- eth-node/bridge/geth/node.go | 4 ++ eth-node/types/node.go | 1 + protocol/messenger.go | 46 +++++++++++++++++++++++ protocol/messenger_test.go | 4 ++ protocol/pushnotificationclient/client.go | 10 +++++ 5 files changed, 65 insertions(+) diff --git a/eth-node/bridge/geth/node.go b/eth-node/bridge/geth/node.go index a629ee835..fe89734eb 100644 --- a/eth-node/bridge/geth/node.go +++ b/eth-node/bridge/geth/node.go @@ -99,3 +99,7 @@ func (w *gethNodeWrapper) RemovePeer(url string) error { return nil } + +func (w *gethNodeWrapper) PeersCount() int { + return len(w.stack.Server().Peers()) +} diff --git a/eth-node/types/node.go b/eth-node/types/node.go index 4c80c2cb8..03b8085a1 100644 --- a/eth-node/types/node.go +++ b/eth-node/types/node.go @@ -22,4 +22,5 @@ type Node interface { GetWaku(ctx interface{}) (Waku, error) AddPeer(url string) error RemovePeer(url string) error + PeersCount() int } diff --git a/protocol/messenger.go b/protocol/messenger.go index 15037f958..b98983c2e 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -78,6 +78,7 @@ type Messenger struct { installationID string mailserver []byte database *sql.DB + quit chan struct{} mutex sync.Mutex } @@ -249,6 +250,7 @@ func NewMessenger( messagesPersistenceEnabled: c.messagesPersistenceEnabled, verifyTransactionClient: c.verifyTransactionClient, database: database, + quit: make(chan struct{}), shutdownTasks: []func() error{ database.Close, pushNotificationClient.Stop, @@ -301,9 +303,29 @@ func (m *Messenger) Start() error { } m.handleEncryptionLayerSubscriptions(subscriptions) + m.handleConnectionChange(m.online()) + m.watchConnectionChange() return nil } +// handle connection change is called each time we go from offline/online or viceversa +func (m *Messenger) handleConnectionChange(online bool) { + if online { + if m.pushNotificationClient != nil { + m.pushNotificationClient.Online() + } + + } else { + if m.pushNotificationClient != nil { + m.pushNotificationClient.Offline() + } + } +} + +func (m *Messenger) online() bool { + return m.node.PeersCount() > 0 +} + func (m *Messenger) buildContactCodeAdvertisement() (*protobuf.ContactCodeAdvertisement, error) { if m.pushNotificationClient == nil || !m.pushNotificationClient.Enabled() { return nil, nil @@ -406,6 +428,29 @@ func (m *Messenger) handleEncryptionLayerSubscriptions(subscriptions *encryption }() } +// watchConnectionChange checks the connection status and call handleConnectionChange when this changes +func (m *Messenger) watchConnectionChange() { + m.logger.Debug("watching connection changes") + state := m.online() + go func() { + for { + select { + case <-time.After(200 * time.Millisecond): + newState := m.online() + if state != newState { + state = newState + m.logger.Debug("connection changed", zap.Bool("online", state)) + m.handleConnectionChange(state) + } + case <-m.quit: + return + } + + } + + }() +} + // handlePushNotificationClientRegistration handles registration events func (m *Messenger) handlePushNotificationClientRegistrations(c chan struct{}) { go func() { @@ -522,6 +567,7 @@ func (m *Messenger) Shutdown() (err error) { } } } + close(m.quit) return } diff --git a/protocol/messenger_test.go b/protocol/messenger_test.go index a22e19397..c37ce6278 100644 --- a/protocol/messenger_test.go +++ b/protocol/messenger_test.go @@ -91,6 +91,10 @@ func (n *testNode) GetWhisper(_ interface{}) (types.Whisper, error) { return nil, nil } +func (c *testNode) PeersCount() int { + return 1 +} + func (s *MessengerSuite) SetupTest() { s.logger = tt.MustCreateTestLogger() diff --git a/protocol/pushnotificationclient/client.go b/protocol/pushnotificationclient/client.go index d645051d7..16f0c3c03 100644 --- a/protocol/pushnotificationclient/client.go +++ b/protocol/pushnotificationclient/client.go @@ -210,6 +210,16 @@ func (c *Client) Start() error { return nil } +func (c *Client) Offline() { + c.stopRegistrationLoop() + c.stopResendingLoop() +} + +func (c *Client) Online() { + c.startRegistrationLoop() + c.startResendingLoop() +} + func (c *Client) publishOnRegistrationSubscriptions() { // Publish on channels, drop if buffer is full for _, s := range c.registrationSubscriptions {