status-go/protocol/encryption/publisher/publisher.go

129 lines
2.5 KiB
Go
Raw Permalink Normal View History

2019-07-17 22:25:42 +00:00
package publisher
import (
"crypto/ecdsa"
"errors"
"time"
"go.uber.org/zap"
2020-01-02 09:10:19 +00:00
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/logutils"
2019-07-17 22:25:42 +00:00
)
// Timing parameters for the publisher. All values are plain second counts
// kept as untyped constants, so they can be multiplied by time.Second or
// compared against integer timestamps.
const (
	// tickerInterval is the period, in seconds, of the ticker that drives
	// the publish check (2 minutes).
	tickerInterval = 2 * 60
	// publishInterval is the minimum number of seconds that must separate
	// two contact code publications (6 hours).
	publishInterval = 6 * 60 * 60
	// deviceNotFoundAckInterval is the cooldown, in seconds, on acking
	// messages that do not target our device (2 hours).
	deviceNotFoundAckInterval = 2 * 60 * 60
)
// errNotEnoughTimePassed is returned by notify when the publish interval has
// not yet elapsed since the last recorded publication.
var errNotEnoughTimePassed = errors.New("not enough time passed")
type Publisher struct {
persistence *persistence
2019-07-17 22:25:42 +00:00
logger *zap.Logger
notifyCh chan struct{}
quit chan struct{}
}
func New(logger *zap.Logger) *Publisher {
2019-07-17 22:25:42 +00:00
if logger == nil {
logger = logutils.ZapLogger()
2019-07-17 22:25:42 +00:00
}
return &Publisher{
persistence: newPersistence(),
2019-07-17 22:25:42 +00:00
logger: logger.With(zap.Namespace("Publisher")),
}
}
func (p *Publisher) Start() <-chan struct{} {
logger := p.logger.With(zap.String("site", "Start"))
logger.Info("starting publisher")
2020-07-31 12:22:05 +00:00
p.notifyCh = make(chan struct{}, 100)
2019-07-17 22:25:42 +00:00
p.quit = make(chan struct{})
go p.tickerLoop()
return p.notifyCh
}
func (p *Publisher) Stop() {
2020-07-31 09:08:09 +00:00
// If hasn't started, ignore
if p.quit == nil {
return
}
2019-07-17 22:25:42 +00:00
select {
case _, ok := <-p.quit:
if !ok {
// channel already closed
return
}
default:
close(p.quit)
}
}
// tickerLoop creates a ticker firing every tickerInterval seconds and spawns
// a goroutine that calls notify on each tick. The goroutine stops the ticker
// and exits once the quit channel is closed. tickerLoop itself returns
// immediately after spawning the goroutine.
func (p *Publisher) tickerLoop() {
	ticker := time.NewTicker(tickerInterval * time.Second)

	go func() {
		// Release the ticker whenever the goroutine exits.
		defer ticker.Stop()

		lg := p.logger.With(zap.String("site", "tickerLoop"))
		for {
			// Wait for either the next tick or a shutdown request.
			select {
			case <-p.quit:
				return
			case <-ticker.C:
			}

			// A tick fired: try to notify and report the outcome.
			if err := p.notify(); err == errNotEnoughTimePassed {
				lg.Debug("not enough time passed")
			} else if err != nil {
				lg.Error("error while sending a contact code", zap.Error(err))
			}
		}
	}()
}
func (p *Publisher) notify() error {
lastPublished := p.persistence.getLastPublished()
2019-07-17 22:25:42 +00:00
now := time.Now().Unix()
if now-lastPublished < publishInterval {
return errNotEnoughTimePassed
}
2020-07-31 12:22:05 +00:00
select {
case p.notifyCh <- struct{}{}:
default:
p.logger.Warn("publisher channel full, dropping message")
}
2019-07-17 22:25:42 +00:00
p.persistence.setLastPublished(now)
return nil
2019-07-17 22:25:42 +00:00
}
func (p *Publisher) ShouldAdvertiseBundle(publicKey *ecdsa.PublicKey, now int64) (bool, error) {
identity := crypto.CompressPubkey(publicKey)
lastAcked := p.persistence.lastAck(identity)
2019-07-17 22:25:42 +00:00
return now-lastAcked < deviceNotFoundAckInterval, nil
}
// SetLastAck records now as the last ack time for the identity derived from
// the compressed form of publicKey.
func (p *Publisher) SetLastAck(publicKey *ecdsa.PublicKey, now int64) {
	p.persistence.setLastAck(crypto.CompressPubkey(publicKey), now)
}