frank 38308d48f2
feat_: log on panic (#5849)
* feat_: log error and stacktrace when panic in goroutine

* test_: add test TestSafeGo

* chore_: rename logAndCall to call

* chore_: rename SafeGo to Go

* chore_: make lint-fix

* chore_: use t.Cleanup

* chore_: Revert "chore_: use t.Cleanup"

This reverts commit 4eb420d179cc0e208e84c13cb941e6b3d1ed9819.

* chore_: Revert "chore_: make lint-fix"

This reverts commit fcc995f157e671a4229b47419c3a0e4004b5fdab.

* chore_: Revert "chore_: rename SafeGo to Go"

This reverts commit a6d73d6df583f313032d79aac62f66328039cb55.

* chore_: Revert "chore_: rename logAndCall to call"

This reverts commit 8fbe993bedb9fbba67349a44f151e2dd5e3bc4cc.

* chore_: Revert "test_: add test TestSafeGo"

This reverts commit a1fa91839f3960398980c6bf456e6462ec944819.

* chore_: Revert "feat_: log error and stacktrace when panic in goroutine"

This reverts commit f612dd828fa2ce410d0e806fe773ecbe3e86a68a.

* feat_: log error and stacktrace when panic in goroutine

* chore_: make lint-fix

* chore_: rename logAndCall to call

* chore_: renaming LogOnPanic

* chore_: update rest goroutine function calls

* chore_: make lint-fix
2024-09-27 06:37:32 +08:00

131 lines
2.6 KiB
Go

package publisher
import (
"crypto/ecdsa"
"errors"
"time"
"go.uber.org/zap"
gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/logutils"
)
const (
// How often a ticker fires in seconds.
tickerInterval = 120
// How often we should publish a contact code in seconds.
publishInterval = 21600
// Cooldown period on acking messages when not targeting our device.
deviceNotFoundAckInterval = 7200
)
var (
errNotEnoughTimePassed = errors.New("not enough time passed")
)
type Publisher struct {
persistence *persistence
logger *zap.Logger
notifyCh chan struct{}
quit chan struct{}
}
func New(logger *zap.Logger) *Publisher {
if logger == nil {
logger = logutils.ZapLogger()
}
return &Publisher{
persistence: newPersistence(),
logger: logger.With(zap.Namespace("Publisher")),
}
}
func (p *Publisher) Start() <-chan struct{} {
logger := p.logger.With(zap.String("site", "Start"))
logger.Info("starting publisher")
p.notifyCh = make(chan struct{}, 100)
p.quit = make(chan struct{})
go p.tickerLoop()
return p.notifyCh
}
func (p *Publisher) Stop() {
// If hasn't started, ignore
if p.quit == nil {
return
}
select {
case _, ok := <-p.quit:
if !ok {
// channel already closed
return
}
default:
close(p.quit)
}
}
func (p *Publisher) tickerLoop() {
ticker := time.NewTicker(tickerInterval * time.Second)
go func() {
defer gocommon.LogOnPanic()
logger := p.logger.With(zap.String("site", "tickerLoop"))
for {
select {
case <-ticker.C:
err := p.notify()
switch err {
case errNotEnoughTimePassed:
logger.Debug("not enough time passed")
case nil:
// skip
default:
logger.Error("error while sending a contact code", zap.Error(err))
}
case <-p.quit:
ticker.Stop()
return
}
}
}()
}
func (p *Publisher) notify() error {
lastPublished := p.persistence.getLastPublished()
now := time.Now().Unix()
if now-lastPublished < publishInterval {
return errNotEnoughTimePassed
}
select {
case p.notifyCh <- struct{}{}:
default:
p.logger.Warn("publisher channel full, dropping message")
}
p.persistence.setLastPublished(now)
return nil
}
func (p *Publisher) ShouldAdvertiseBundle(publicKey *ecdsa.PublicKey, now int64) (bool, error) {
identity := crypto.CompressPubkey(publicKey)
lastAcked := p.persistence.lastAck(identity)
return now-lastAcked < deviceNotFoundAckInterval, nil
}
func (p *Publisher) SetLastAck(publicKey *ecdsa.PublicKey, now int64) {
identity := crypto.CompressPubkey(publicKey)
p.persistence.setLastAck(identity, now)
}