refactor_: expire envelope cache (#5061)

This commit is contained in:
richΛrd 2024-04-17 08:19:03 -04:00 committed by GitHub
parent d246699c5e
commit 3de2756660
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 61 additions and 37 deletions

2
go.mod
View File

@ -82,7 +82,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.1 github.com/cenkalti/backoff/v4 v4.2.1
github.com/gorilla/sessions v1.2.1 github.com/gorilla/sessions v1.2.1
github.com/ipfs/go-log/v2 v2.5.1 github.com/ipfs/go-log/v2 v2.5.1
github.com/jellydator/ttlcache/v3 v3.1.0 github.com/jellydator/ttlcache/v3 v3.2.0
github.com/jmoiron/sqlx v1.3.5 github.com/jmoiron/sqlx v1.3.5
github.com/ladydascalie/currency v1.6.0 github.com/ladydascalie/currency v1.6.0
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8

4
go.sum
View File

@ -1222,8 +1222,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.1.0 h1:0gPFG0IHHP6xyUyXq+JaD8fwkDCqgqwohXNJBcYE71g= github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.1.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=

View File

@ -148,6 +148,10 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] {
c.evict(EvictionReasonCapacityReached, c.items.lru.Back()) c.evict(EvictionReasonCapacityReached, c.items.lru.Back())
} }
if ttl == PreviousOrDefaultTTL {
ttl = c.options.ttl
}
// create a new item // create a new item
item := newItem(key, value, ttl, c.options.enableVersionTracking) item := newItem(key, value, ttl, c.options.enableVersionTracking)
elem = c.items.lru.PushFront(item) elem = c.items.lru.PushFront(item)
@ -478,6 +482,13 @@ func (c *Cache[K, V]) Items() map[K]*Item[K, V] {
// Range stops the iteration. // Range stops the iteration.
func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) {
c.items.mu.RLock() c.items.mu.RLock()
// Check if cache is empty
if c.items.lru.Len() == 0 {
c.items.mu.RUnlock()
return
}
for item := c.items.lru.Front(); item != c.items.lru.Back().Next(); item = item.Next() { for item := c.items.lru.Front(); item != c.items.lru.Back().Next(); item = item.Next() {
i := item.Value.(*Item[K, V]) i := item.Value.(*Item[K, V])
c.items.mu.RUnlock() c.items.mu.RUnlock()

View File

@ -9,6 +9,10 @@ const (
// NoTTL indicates that an item should never expire. // NoTTL indicates that an item should never expire.
NoTTL time.Duration = -1 NoTTL time.Duration = -1
// PreviousOrDefaultTTL indicates that existing TTL of item should be used
// default TTL will be used as fallback if item doesn't exist
PreviousOrDefaultTTL time.Duration = -2
// DefaultTTL indicates that the default TTL value of the cache // DefaultTTL indicates that the default TTL value of the cache
// instance should be used. // instance should be used.
DefaultTTL time.Duration = 0 DefaultTTL time.Duration = 0
@ -58,17 +62,23 @@ func (item *Item[K, V]) update(value V, ttl time.Duration) {
defer item.mu.Unlock() defer item.mu.Unlock()
item.value = value item.value = value
// update version if enabled
if item.version > -1 {
item.version++
}
// no need to update ttl or expiry in this case
if ttl == PreviousOrDefaultTTL {
return
}
item.ttl = ttl item.ttl = ttl
// reset expiration timestamp because the new TTL may be // reset expiration timestamp because the new TTL may be
// 0 or below // 0 or below
item.expiresAt = time.Time{} item.expiresAt = time.Time{}
item.touchUnsafe() item.touchUnsafe()
// update version if enabled
if item.version > -1 {
item.version++
}
} }
// touch updates the item's expiration timestamp. // touch updates the item's expiration timestamp.

2
vendor/modules.txt vendored
View File

@ -436,7 +436,7 @@ github.com/jackpal/go-nat-pmp
# github.com/jbenet/go-temp-err-catcher v0.1.0 # github.com/jbenet/go-temp-err-catcher v0.1.0
## explicit; go 1.13 ## explicit; go 1.13
github.com/jbenet/go-temp-err-catcher github.com/jbenet/go-temp-err-catcher
# github.com/jellydator/ttlcache/v3 v3.1.0 # github.com/jellydator/ttlcache/v3 v3.2.0
## explicit; go 1.18 ## explicit; go 1.18
github.com/jellydator/ttlcache/v3 github.com/jellydator/ttlcache/v3
# github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a # github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a

View File

@ -32,6 +32,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/jellydator/ttlcache/v3"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/identify" "github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
@ -78,6 +79,7 @@ const messageQueueLimit = 1024
const requestTimeout = 30 * time.Second const requestTimeout = 30 * time.Second
const bootnodesQueryBackoffMs = 200 const bootnodesQueryBackoffMs = 200
const bootnodesMaxRetries = 7 const bootnodesMaxRetries = 7
const cacheTTL = 20 * time.Minute
type ITelemetryClient interface { type ITelemetryClient interface {
PushReceivedEnvelope(*protocol.Envelope) PushReceivedEnvelope(*protocol.Envelope)
@ -101,7 +103,7 @@ type Waku struct {
symKeys map[string][]byte // Symmetric key storage symKeys map[string][]byte // Symmetric key storage
keyMu sync.RWMutex // Mutex associated with key stores keyMu sync.RWMutex // Mutex associated with key stores
envelopes map[gethcommon.Hash]*common.ReceivedMessage // Pool of envelopes currently tracked by this node envelopeCache *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] // Pool of envelopes currently tracked by this node
expirations map[uint32]mapset.Set // Message expiration pool expirations map[uint32]mapset.Set // Message expiration pool
poolMu sync.RWMutex // Mutex to sync the message and expiration pools poolMu sync.RWMutex // Mutex to sync the message and expiration pools
@ -155,6 +157,12 @@ func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) {
w.statusTelemetryClient = client w.statusTelemetryClient = client
} }
func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] {
cache := ttlcache.New[gethcommon.Hash, *common.ReceivedMessage](ttlcache.WithTTL[gethcommon.Hash, *common.ReceivedMessage](cacheTTL))
go cache.Start()
return cache
}
// New creates a WakuV2 client ready to communicate through the LibP2P network. // New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) { func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *sql.DB, ts *timesource.NTPTimeSource, onHistoricMessagesRequestFailed func([]byte, peer.ID, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
var err error var err error
@ -183,7 +191,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
cfg: cfg, cfg: cfg,
privateKeys: make(map[string]*ecdsa.PrivateKey), privateKeys: make(map[string]*ecdsa.PrivateKey),
symKeys: make(map[string][]byte), symKeys: make(map[string][]byte),
envelopes: make(map[gethcommon.Hash]*common.ReceivedMessage), envelopeCache: newTTLCache(),
expirations: make(map[uint32]mapset.Set), expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *protocol.Envelope, 1000), sendQueue: make(chan *protocol.Envelope, 1000),
@ -1035,7 +1043,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
w.sendQueue <- envelope w.sendQueue <- envelope
w.poolMu.Lock() w.poolMu.Lock()
_, alreadyCached := w.envelopes[gethcommon.BytesToHash(envelope.Hash())] alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash()))
w.poolMu.Unlock() w.poolMu.Unlock()
if !alreadyCached { if !alreadyCached {
recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType) recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType)
@ -1280,6 +1288,8 @@ func (w *Waku) setupRelaySubscriptions() error {
func (w *Waku) Stop() error { func (w *Waku) Stop() error {
w.cancel() w.cancel()
w.envelopeCache.Stop()
err := w.identifyService.Close() err := w.identifyService.Close()
if err != nil { if err != nil {
return err return err
@ -1344,21 +1354,19 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
// addEnvelope adds an envelope to the envelope map, used for sending // addEnvelope adds an envelope to the envelope map, used for sending
func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) { func (w *Waku) addEnvelope(envelope *common.ReceivedMessage) {
hash := envelope.Hash()
w.poolMu.Lock() w.poolMu.Lock()
w.envelopes[hash] = envelope w.envelopeCache.Set(envelope.Hash(), envelope, ttlcache.DefaultTTL)
w.poolMu.Unlock() w.poolMu.Unlock()
} }
func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) (bool, error) { func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) (bool, error) {
common.EnvelopesReceivedCounter.Inc() common.EnvelopesReceivedCounter.Inc()
hash := recvMessage.Hash()
w.poolMu.Lock() w.poolMu.Lock()
envelope, alreadyCached := w.envelopes[hash] envelope := w.envelopeCache.Get(recvMessage.Hash())
alreadyCached := envelope != nil
w.poolMu.Unlock() w.poolMu.Unlock()
if !alreadyCached { if !alreadyCached {
recvMessage.Processed.Store(false) recvMessage.Processed.Store(false)
w.addEnvelope(recvMessage) w.addEnvelope(recvMessage)
@ -1375,7 +1383,7 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool)
common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload))) common.EnvelopesSizeMeter.Observe(float64(len(recvMessage.Envelope.Message().Payload)))
} }
if !alreadyCached || !envelope.Processed.Load() { if !alreadyCached || !envelope.Value().Processed.Load() {
if processImmediately { if processImmediately {
logger.Debug("immediately processing envelope") logger.Debug("immediately processing envelope")
w.processReceivedMessage(recvMessage) w.processReceivedMessage(recvMessage)
@ -1444,24 +1452,18 @@ func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) {
}) })
} }
// Envelopes retrieves all the messages currently pooled by the node.
func (w *Waku) Envelopes() []*common.ReceivedMessage {
w.poolMu.RLock()
defer w.poolMu.RUnlock()
all := make([]*common.ReceivedMessage, 0, len(w.envelopes))
for _, envelope := range w.envelopes {
all = append(all, envelope)
}
return all
}
// GetEnvelope retrieves an envelope from the message queue by its hash. // GetEnvelope retrieves an envelope from the message queue by its hash.
// It returns nil if the envelope can not be found. // It returns nil if the envelope can not be found.
func (w *Waku) GetEnvelope(hash gethcommon.Hash) *common.ReceivedMessage { func (w *Waku) GetEnvelope(hash gethcommon.Hash) *common.ReceivedMessage {
w.poolMu.RLock() w.poolMu.RLock()
defer w.poolMu.RUnlock() defer w.poolMu.RUnlock()
return w.envelopes[hash]
envelope := w.envelopeCache.Get(hash)
if envelope == nil {
return nil
}
return envelope.Value()
} }
// isEnvelopeCached checks if envelope with specific hash has already been received and cached. // isEnvelopeCached checks if envelope with specific hash has already been received and cached.
@ -1469,14 +1471,15 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool {
w.poolMu.Lock() w.poolMu.Lock()
defer w.poolMu.Unlock() defer w.poolMu.Unlock()
_, exist := w.envelopes[hash] return w.envelopeCache.Has(hash)
return exist
} }
func (w *Waku) ClearEnvelopesCache() { func (w *Waku) ClearEnvelopesCache() {
w.poolMu.Lock() w.poolMu.Lock()
defer w.poolMu.Unlock() defer w.poolMu.Unlock()
w.envelopes = make(map[gethcommon.Hash]*common.ReceivedMessage)
w.envelopeCache.Stop()
w.envelopeCache = newTTLCache()
} }
func (w *Waku) PeerCount() int { func (w *Waku) PeerCount() int {