// Package publisher emits periodic notifications telling its consumer when a
// contact code should be published, and records per-identity ack times.
package publisher

import (
	"crypto/ecdsa"
	"errors"
	"time"

	"go.uber.org/zap"

	gocommon "github.com/status-im/status-go/common"
	"github.com/status-im/status-go/eth-node/crypto"
	"github.com/status-im/status-go/logutils"
)

const (
	// How often a ticker fires in seconds.
	tickerInterval = 120
	// How often we should publish a contact code in seconds.
	publishInterval = 21600
	// Cooldown period on acking messages when not targeting our device.
	deviceNotFoundAckInterval = 7200
)

var (
	// errNotEnoughTimePassed is returned by notify when fewer than
	// publishInterval seconds have elapsed since the last recorded publish.
	errNotEnoughTimePassed = errors.New("not enough time passed")
)

// Publisher periodically signals, through the channel returned by Start, that
// a contact code should be published. It also stores the last ack time per
// identity.
type Publisher struct {
	persistence *persistence
	logger      *zap.Logger
	// notifyCh receives one empty struct each time a publish is due.
	notifyCh chan struct{}
	// quit is closed by Stop to terminate the ticker goroutine.
	quit chan struct{}
}

// New returns a Publisher that logs through logger under the "Publisher"
// namespace. When logger is nil, logutils.ZapLogger() is used instead.
func New(logger *zap.Logger) *Publisher {
	if logger == nil {
		logger = logutils.ZapLogger()
	}

	return &Publisher{
		persistence: newPersistence(),
		logger:      logger.With(zap.Namespace("Publisher")),
	}
}

// Start creates the notification and quit channels, launches the ticker
// goroutine and returns the receive side of the notification channel. The
// notification channel is buffered with a capacity of 100.
func (p *Publisher) Start() <-chan struct{} {
	logger := p.logger.With(zap.String("site", "Start"))

	logger.Info("starting publisher")

	p.notifyCh = make(chan struct{}, 100)
	p.quit = make(chan struct{})

	// tickerLoop spawns its own goroutine and returns immediately, so there is
	// no need to wrap the call in another goroutine.
	p.tickerLoop()

	return p.notifyCh
}

// Stop closes the quit channel, which terminates the ticker goroutine. It does
// nothing when the publisher was never started or was already stopped.
//
// NOTE: the check-then-close below is not synchronized, so Stop must not be
// called concurrently from several goroutines.
func (p *Publisher) Stop() {
	// If hasn't started, ignore
	if p.quit == nil {
		return
	}

	// Nothing is ever sent on quit, so a successful receive can only mean the
	// channel has already been closed.
	select {
	case <-p.quit:
		// channel already closed
	default:
		close(p.quit)
	}
}

// tickerLoop starts a goroutine that calls notify every tickerInterval seconds
// until the quit channel is closed. It returns without waiting for that
// goroutine.
func (p *Publisher) tickerLoop() {
	ticker := time.NewTicker(tickerInterval * time.Second)

	go func() {
		defer gocommon.LogOnPanic()
		// Release the ticker on every exit path, not only on quit.
		defer ticker.Stop()

		logger := p.logger.With(zap.String("site", "tickerLoop"))

		for {
			select {
			case <-ticker.C:
				switch err := p.notify(); {
				case err == nil:
					// skip
				case errors.Is(err, errNotEnoughTimePassed):
					logger.Debug("not enough time passed")
				default:
					logger.Error("error while sending a contact code", zap.Error(err))
				}
			case <-p.quit:
				return
			}
		}
	}()
}

// notify signals on notifyCh that a publish is due.
//
// It returns errNotEnoughTimePassed when fewer than publishInterval seconds
// have elapsed since the last recorded publish. Otherwise it attempts a
// non-blocking send on notifyCh; when the channel buffer is full the
// notification is dropped and a warning is logged. In both of those cases the
// current time is recorded as the last publish time and nil is returned.
func (p *Publisher) notify() error {
	lastPublished := p.persistence.getLastPublished()

	now := time.Now().Unix()

	if now-lastPublished < publishInterval {
		return errNotEnoughTimePassed
	}

	select {
	case p.notifyCh <- struct{}{}:
	default:
		p.logger.Warn("publisher channel full, dropping message")
	}

	p.persistence.setLastPublished(now)
	return nil
}

// ShouldAdvertiseBundle reports whether fewer than deviceNotFoundAckInterval
// seconds separate now from the last ack stored for publicKey. The returned
// error is always nil.
//
// NOTE(review): for a cooldown one might expect the opposite comparison
// (advertise only once the interval has elapsed) — confirm against callers
// before changing.
func (p *Publisher) ShouldAdvertiseBundle(publicKey *ecdsa.PublicKey, now int64) (bool, error) {
	identity := crypto.CompressPubkey(publicKey)
	lastAcked := p.persistence.lastAck(identity)
	return now-lastAcked < deviceNotFoundAckInterval, nil
}

// SetLastAck stores now as the last ack time for the identity derived from
// publicKey (its compressed public key).
func (p *Publisher) SetLastAck(publicKey *ecdsa.PublicKey, now int64) {
	identity := crypto.CompressPubkey(publicKey)
	p.persistence.setLastAck(identity, now)
}