mirror of
https://github.com/logos-messaging/logos-delivery-go.git
synced 2026-06-28 17:19:30 +00:00
Merge 9f8361930df963ddee5be027ad0104b84055af3c into f3fc70002fd329b01abc194e4b31d829440253fc
This commit is contained in:
commit
3995c03636
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@ -57,6 +58,9 @@ type Sub struct {
|
||||
resubscribeInProgress bool
|
||||
id string
|
||||
errcnt int
|
||||
// backgroundMode suppresses subscription renewal when the app UI is not visible.
|
||||
// Toggled via SetBackgroundMode; read from subscriptionLoop goroutine.
|
||||
backgroundMode atomic.Bool
|
||||
// rateLimitedUntil is set when subscribe() observes a *SubscribeError whose
|
||||
// FailedPeers contain at least one HTTP 429. While time.Now().Before(rateLimitedUntil),
|
||||
// subscriptionLoop suppresses retry triggers (ticker push and checkAndResubscribe).
|
||||
@ -139,6 +143,11 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
apiSub.errcnt = 0 //reset errorCount
|
||||
if apiSub.backgroundMode.Load() {
|
||||
// In background: skip health check to avoid waking the LTE radio.
|
||||
// SetBackgroundMode(false) triggers resubscription on foreground.
|
||||
continue
|
||||
}
|
||||
if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) {
|
||||
apiSub.log.Debug("ticker push suppressed by rate-limit backoff",
|
||||
zap.Time("rate-limited-until", apiSub.rateLimitedUntil),
|
||||
@ -154,6 +163,14 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
|
||||
apiSub.cleanup()
|
||||
return
|
||||
case subId := <-apiSub.closing:
|
||||
if apiSub.backgroundMode.Load() {
|
||||
// In background: subscription expired but don't resubscribe now.
|
||||
// SetBackgroundMode(false) will trigger resubscription on foreground.
|
||||
apiSub.log.Debug("resubscribe suppressed: app in background",
|
||||
zap.String("sub-id", subId),
|
||||
)
|
||||
continue
|
||||
}
|
||||
if shouldHonourRateLimitBackoff(apiSub.rateLimitedUntil, time.Now()) {
|
||||
apiSub.log.Debug("checkAndResubscribe suppressed by rate-limit backoff",
|
||||
zap.Time("rate-limited-until", apiSub.rateLimitedUntil),
|
||||
@ -174,6 +191,23 @@ func (apiSub *Sub) subscriptionLoop(loopInterval time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
// SetBackgroundMode controls whether this subscription suppresses renewal.
|
||||
// Call with background=true when the app UI is not visible (screen locked).
|
||||
// Call with background=false when returning to foreground; this triggers an
|
||||
// immediate resubscription attempt for any subscriptions that expired while
|
||||
// backgrounded.
|
||||
func (apiSub *Sub) SetBackgroundMode(background bool) {
|
||||
apiSub.backgroundMode.Store(background)
|
||||
if !background {
|
||||
// Returning to foreground: prod the loop to resubscribe.
|
||||
select {
|
||||
case apiSub.closing <- "":
|
||||
default:
|
||||
// A resubscription is already queued.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (apiSub *Sub) checkAndResubscribe(subId string) {
|
||||
var failedPeer peer.ID
|
||||
if subId != "" {
|
||||
|
||||
@ -47,9 +47,9 @@ type FilterManager struct {
|
||||
// a deadlock where SubscribeFilter would block on a full channel while still
|
||||
// holding mgr.Lock(), preventing the only drainer (checkAndProcessQueue, also
|
||||
// invoked under the same lock) from running.
|
||||
waitingToSubQueue []filterConfig
|
||||
envProcessor EnevelopeProcessor
|
||||
networkConnType byte
|
||||
waitingToSubQueue []filterConfig
|
||||
envProcessor EnevelopeProcessor
|
||||
networkConnType byte
|
||||
}
|
||||
|
||||
type SubDetails struct {
|
||||
@ -189,6 +189,22 @@ func (mgr *FilterManager) NetworkChange() {
|
||||
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
|
||||
}
|
||||
|
||||
// SetBackgroundMode notifies all active subscriptions of the app's visibility
|
||||
// state. When background=true, subscriptions suppress renewal keepalives to
|
||||
// avoid waking the LTE radio while the screen is locked. When background=false
|
||||
// (returning to foreground), each subscription immediately attempts to
|
||||
// resubscribe if its filter has expired.
|
||||
func (mgr *FilterManager) SetBackgroundMode(background bool) {
|
||||
mgr.Lock()
|
||||
defer mgr.Unlock()
|
||||
// Gate subscription renewal (api/filter layer)
|
||||
for _, subDetails := range mgr.filterSubscriptions {
|
||||
subDetails.sub.SetBackgroundMode(background)
|
||||
}
|
||||
// Gate health-check pings (protocol/filter layer) — the dominant LTE radio wakeup source
|
||||
mgr.node.SetBackgroundMode(background)
|
||||
}
|
||||
|
||||
// checkAndProcessQueue drains the offline-pending filter queue. For each batch
|
||||
// that matches the given pubsubTopic (or always, when pubsubTopic == ""), a
|
||||
// subscribe goroutine is spawned; non-matching batches are retained for a
|
||||
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
@ -57,6 +58,7 @@ type WakuFilterLightNode struct {
|
||||
pm *peermanager.PeerManager
|
||||
limiter *utils.RateLimiter
|
||||
peerPingInterval time.Duration
|
||||
backgroundMode atomic.Bool // true when app UI is not visible; suppresses health-check pings
|
||||
}
|
||||
|
||||
type WakuFilterPushError struct {
|
||||
|
||||
@ -42,6 +42,13 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
|
||||
}
|
||||
}
|
||||
|
||||
// SetBackgroundMode suppresses (background=true) or re-enables (background=false)
|
||||
// the periodic health-check pings sent to filter peers. Call with background=true
|
||||
// when the app UI is not visible to avoid waking the LTE radio during Doze windows.
|
||||
func (wf *WakuFilterLightNode) SetBackgroundMode(background bool) {
|
||||
wf.backgroundMode.Store(background)
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
|
||||
defer utils.LogOnPanic()
|
||||
defer wf.WaitGroup().Done()
|
||||
@ -50,6 +57,11 @@ func (wf *WakuFilterLightNode) FilterHealthCheckLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if wf.backgroundMode.Load() {
|
||||
// In background: skip health-check ping to avoid waking the LTE radio.
|
||||
// SetBackgroundMode(false) will resume pings on foreground return.
|
||||
continue
|
||||
}
|
||||
if wf.onlineChecker.IsOnline() {
|
||||
wf.PingPeers()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user