mirror of https://github.com/logos-messaging/go-libp2p-pubsub.git (synced 2026-01-02 12:53:09 +00:00)
feat: expire messages from the cache based on last seen time (#513)

* feat: expire messages from the cache based on last seen time
* chore: minor renaming
* fix: messages should not be found after expiration
* chore: editorial
* fix: use new time cache strategy consistently
* fix: default to old time cache and add todo for background gc

parent 3d2eab3572, commit 973fef56e1
blacklist.go (12 lines changed)

@@ -1,11 +1,11 @@
 package pubsub

 import (
-    "sync"
     "time"

     "github.com/libp2p/go-libp2p/core/peer"
-    "github.com/whyrusleeping/timecache"
+
+    "github.com/libp2p/go-libp2p-pubsub/timecache"
 )

 // Blacklist is an interface for peer blacklisting.
@@ -34,8 +34,7 @@ func (b MapBlacklist) Contains(p peer.ID) bool {

 // TimeCachedBlacklist is a blacklist implementation using a time cache
 type TimeCachedBlacklist struct {
-    sync.RWMutex
-    tc *timecache.TimeCache
+    tc timecache.TimeCache
 }

 // NewTimeCachedBlacklist creates a new TimeCachedBlacklist with the given expiry duration
@@ -46,8 +45,6 @@ func NewTimeCachedBlacklist(expiry time.Duration) (Blacklist, error) {

 // Add returns a bool saying whether Add of peer was successful
 func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
-    b.Lock()
-    defer b.Unlock()
     s := p.String()
     if b.tc.Has(s) {
         return false
@@ -57,8 +54,5 @@ func (b *TimeCachedBlacklist) Add(p peer.ID) bool {
 }

 func (b *TimeCachedBlacklist) Contains(p peer.ID) bool {
-    b.RLock()
-    defer b.RUnlock()
-
     return b.tc.Has(p.String())
 }
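With this change TimeCachedBlacklist delegates thread safety to the new timecache package, so its own RWMutex and the Lock/RLock calls are dropped. A minimal sketch of wiring a time-cached blacklist into a pubsub instance; only NewTimeCachedBlacklist comes from this diff, while WithBlacklist, NewGossipSub and the throwaway default host are illustrative wiring from the wider library:

package main

import (
    "context"
    "log"
    "time"

    "github.com/libp2p/go-libp2p"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
)

func main() {
    ctx := context.Background()

    // Illustrative setup: a default libp2p host.
    h, err := libp2p.New()
    if err != nil {
        log.Fatal(err)
    }
    defer h.Close()

    // Blacklisted peers expire from the underlying time cache after one hour.
    bl, err := pubsub.NewTimeCachedBlacklist(time.Hour)
    if err != nil {
        log.Fatal(err)
    }

    // Install the blacklist on the pubsub instance.
    ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithBlacklist(bl))
    if err != nil {
        log.Fatal(err)
    }
    _ = ps
}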
go.mod (6 lines changed)

@@ -4,13 +4,15 @@ go 1.17

 require (
     github.com/benbjohnson/clock v1.3.0
+    github.com/emirpasic/gods v1.18.1
     github.com/gogo/protobuf v1.3.2
     github.com/ipfs/go-log v1.0.5
+    github.com/libp2p/go-buffer-pool v0.1.0
     github.com/libp2p/go-libp2p v0.22.0
     github.com/libp2p/go-libp2p-testing v0.12.0
     github.com/libp2p/go-msgio v0.2.0
     github.com/multiformats/go-multiaddr v0.6.0
-    github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
+    github.com/multiformats/go-varint v0.0.6
 )

 require (
@@ -36,7 +38,6 @@ require (
     github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
     github.com/klauspost/compress v1.15.1 // indirect
     github.com/klauspost/cpuid/v2 v2.1.0 // indirect
-    github.com/libp2p/go-buffer-pool v0.1.0 // indirect
     github.com/libp2p/go-flow-metrics v0.1.0 // indirect
     github.com/libp2p/go-netroute v0.2.0 // indirect
     github.com/libp2p/go-openssl v0.1.0 // indirect
@@ -62,7 +63,6 @@ require (
     github.com/multiformats/go-multicodec v0.5.0 // indirect
     github.com/multiformats/go-multihash v0.2.1 // indirect
     github.com/multiformats/go-multistream v0.3.3 // indirect
-    github.com/multiformats/go-varint v0.0.6 // indirect
     github.com/nxadm/tail v1.4.8 // indirect
     github.com/onsi/ginkgo v1.16.5 // indirect
     github.com/opencontainers/runtime-spec v1.0.2 // indirect
go.sum (4 lines changed)

@@ -125,6 +125,8 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
 github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
 github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4=
 github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs=
+github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
+github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -544,8 +546,6 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
 github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
 github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
 github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
-github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee h1:lYbXeSvJi5zk5GLKVuid9TVjS9a0OmLIDKTfoZBL6Ow=
-github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee/go.mod h1:m2aV4LZI4Aez7dP5PMyVKEHhUyEJ/RjmPEDOpDvudHg=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
 github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
pubsub.go (24 lines changed)

@@ -11,6 +11,7 @@ import (
     "time"

     pb "github.com/libp2p/go-libp2p-pubsub/pb"
+    "github.com/libp2p/go-libp2p-pubsub/timecache"

     "github.com/libp2p/go-libp2p/core/crypto"
     "github.com/libp2p/go-libp2p/core/discovery"
@@ -20,7 +21,6 @@ import (
     "github.com/libp2p/go-libp2p/core/protocol"

     logging "github.com/ipfs/go-log"
-    "github.com/whyrusleeping/timecache"
 )

 // DefaultMaximumMessageSize is 1mb.
@@ -31,6 +31,10 @@ var (
     // Use WithSeenMessagesTTL to configure this per pubsub instance, instead of overriding the global default.
     TimeCacheDuration = 120 * time.Second

+    // TimeCacheStrategy specifies which type of lookup/cleanup strategy is used by the seen messages cache.
+    // Use WithSeenMessagesStrategy to configure this per pubsub instance, instead of overriding the global default.
+    TimeCacheStrategy = timecache.Strategy_FirstSeen
+
     // ErrSubscriptionCancelled may be returned when a subscription Next() is called after the
     // subscription has been cancelled.
     ErrSubscriptionCancelled = errors.New("subscription cancelled")
@@ -148,9 +152,10 @@ type PubSub struct {
     inboundStreamsMx sync.Mutex
     inboundStreams   map[peer.ID]network.Stream

-    seenMessagesMx sync.Mutex
-    seenMessages   *timecache.TimeCache
-    seenMsgTTL     time.Duration
+    seenMessagesMx  sync.Mutex
+    seenMessages    timecache.TimeCache
+    seenMsgTTL      time.Duration
+    seenMsgStrategy timecache.Strategy

     // generator used to compute the ID for a message
     idGen *msgIDGenerator
@@ -286,6 +291,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
         blacklist:       NewMapBlacklist(),
         blacklistPeer:   make(chan peer.ID),
         seenMsgTTL:      TimeCacheDuration,
+        seenMsgStrategy: TimeCacheStrategy,
         idGen:           newMsgIdGenerator(),
         counter:         uint64(time.Now().UnixNano()),
     }
@@ -307,7 +313,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
         }
     }

-    ps.seenMessages = timecache.NewTimeCache(ps.seenMsgTTL)
+    ps.seenMessages = timecache.NewTimeCacheWithStrategy(ps.seenMsgStrategy, ps.seenMsgTTL)

     if err := ps.disc.Start(ps); err != nil {
         return nil, err
@@ -533,6 +539,14 @@ func WithSeenMessagesTTL(ttl time.Duration) Option {
     }
 }

+// WithSeenMessagesStrategy configures which type of lookup/cleanup strategy is used by the seen messages cache
+func WithSeenMessagesStrategy(strategy timecache.Strategy) Option {
+    return func(ps *PubSub) error {
+        ps.seenMsgStrategy = strategy
+        return nil
+    }
+}
+
 // WithAppSpecificRpcInspector sets a hook that inspect incomings RPCs prior to
 // processing them. The inspector is invoked on an accepted RPC just before it
 // is handled. If inspector's error is nil, the RPC is handled. Otherwise, it
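Taken together, the pubsub.go changes expose the new expiry behaviour through per-instance options. A minimal sketch of opting into the last-seen strategy; WithSeenMessagesStrategy, WithSeenMessagesTTL and Strategy_LastSeen come from this diff, while NewGossipSub and the host type are assumed from the wider library:

package main

import (
    "context"
    "time"

    pubsub "github.com/libp2p/go-libp2p-pubsub"
    "github.com/libp2p/go-libp2p-pubsub/timecache"
    "github.com/libp2p/go-libp2p/core/host"
)

// newLastSeenPubSub is a hypothetical helper: it selects the last-seen expiry
// strategy for the seen-messages cache and makes the 2 minute TTL explicit.
func newLastSeenPubSub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
    return pubsub.NewGossipSub(ctx, h,
        pubsub.WithSeenMessagesStrategy(timecache.Strategy_LastSeen),
        pubsub.WithSeenMessagesTTL(2*time.Minute),
    )
}

Without either option, instances keep the global defaults (TimeCacheDuration and the first-seen TimeCacheStrategy), so existing callers see no behaviour change.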
timecache/first_seen_cache.go (new file, 71 lines)

package timecache

import (
    "container/list"
    "sync"
    "time"
)

// FirstSeenCache is a thread-safe copy of https://github.com/whyrusleeping/timecache.
type FirstSeenCache struct {
    q     *list.List
    m     map[string]time.Time
    span  time.Duration
    guard *sync.RWMutex
}

func newFirstSeenCache(span time.Duration) TimeCache {
    return &FirstSeenCache{
        q:     list.New(),
        m:     make(map[string]time.Time),
        span:  span,
        guard: new(sync.RWMutex),
    }
}

func (tc FirstSeenCache) Add(s string) {
    tc.guard.Lock()
    defer tc.guard.Unlock()

    _, ok := tc.m[s]
    if ok {
        panic("putting the same entry twice not supported")
    }

    // TODO(#515): Do GC in the background
    tc.sweep()

    tc.m[s] = time.Now()
    tc.q.PushFront(s)
}

func (tc FirstSeenCache) sweep() {
    for {
        back := tc.q.Back()
        if back == nil {
            return
        }

        v := back.Value.(string)
        t, ok := tc.m[v]
        if !ok {
            panic("inconsistent cache state")
        }

        if time.Since(t) > tc.span {
            tc.q.Remove(back)
            delete(tc.m, v)
        } else {
            return
        }
    }
}

func (tc FirstSeenCache) Has(s string) bool {
    tc.guard.RLock()
    defer tc.guard.RUnlock()

    ts, ok := tc.m[s]
    // Only consider the entry found if it was present in the cache AND hadn't already expired.
    return ok && time.Since(ts) <= tc.span
}
timecache/first_seen_cache_test.go (new file, 39 lines)

package timecache

import (
    "fmt"
    "testing"
    "time"
)

func TestFirstSeenCacheFound(t *testing.T) {
    tc := newFirstSeenCache(time.Minute)

    tc.Add("test")

    if !tc.Has("test") {
        t.Fatal("should have this key")
    }
}

func TestFirstSeenCacheExpire(t *testing.T) {
    tc := newFirstSeenCache(time.Second)
    for i := 0; i < 11; i++ {
        tc.Add(fmt.Sprint(i))
        time.Sleep(time.Millisecond * 100)
    }

    if tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have dropped this from the cache already")
    }
}

func TestFirstSeenCacheNotFoundAfterExpire(t *testing.T) {
    tc := newFirstSeenCache(time.Second)
    tc.Add(fmt.Sprint(0))
    time.Sleep(1100 * time.Millisecond)

    if tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have dropped this from the cache already")
    }
}
timecache/last_seen_cache.go (new file, 84 lines)

package timecache

import (
    "sync"
    "time"

    "github.com/emirpasic/gods/maps/linkedhashmap"
)

// LastSeenCache is a LRU cache that keeps entries for up to a specified time duration. After this duration has elapsed,
// "old" entries will be purged from the cache.
//
// It's also a "sliding window" cache. Every time an unexpired entry is seen again, its timestamp slides forward. This
// keeps frequently occurring entries cached and prevents them from being propagated, especially because of network
// issues that might increase the number of duplicate messages in the network.
//
// Garbage collection of expired entries is event-driven, i.e. it only happens when there is a new entry added to the
// cache. This should be ok - if existing entries are being looked up then the cache is not growing, and when a new one
// appears that would grow the cache, garbage collection will attempt to reduce the pressure on the cache.
//
// This implementation is heavily inspired by https://github.com/whyrusleeping/timecache.
type LastSeenCache struct {
    m     *linkedhashmap.Map
    span  time.Duration
    guard *sync.Mutex
}

func newLastSeenCache(span time.Duration) TimeCache {
    return &LastSeenCache{
        m:     linkedhashmap.New(),
        span:  span,
        guard: new(sync.Mutex),
    }
}

func (tc *LastSeenCache) Add(s string) {
    tc.guard.Lock()
    defer tc.guard.Unlock()

    tc.add(s)

    // Garbage collect expired entries
    // TODO(#515): Do GC in the background
    tc.gc()
}

func (tc *LastSeenCache) add(s string) {
    // We don't need a lock here because this function is always called with the lock already acquired.

    // If an entry already exists, remove it and add a new one to the back of the list to maintain temporal ordering and
    // an accurate sliding window.
    tc.m.Remove(s)
    now := time.Now()
    tc.m.Put(s, &now)
}

func (tc *LastSeenCache) gc() {
    // We don't need a lock here because this function is always called with the lock already acquired.
    iter := tc.m.Iterator()
    for iter.Next() {
        key := iter.Key()
        ts := iter.Value().(*time.Time)
        // Exit if we've found an entry with an unexpired timestamp. Since we're iterating in order of insertion, all
        // entries hereafter will be unexpired.
        if time.Since(*ts) <= tc.span {
            return
        }
        tc.m.Remove(key)
    }
}

func (tc *LastSeenCache) Has(s string) bool {
    tc.guard.Lock()
    defer tc.guard.Unlock()

    // If the entry exists and has not already expired, slide it forward.
    if ts, found := tc.m.Get(s); found {
        if t := ts.(*time.Time); time.Since(*t) <= tc.span {
            tc.add(s)
            return true
        }
    }
    return false
}
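A short illustrative sketch of the sliding-window behaviour described in the doc comment above, using the package-level NewTimeCacheWithStrategy constructor introduced in timecache/time_cache.go further down; the timings are illustrative only:

package main

import (
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p-pubsub/timecache"
)

func main() {
    // One-second window with last-seen expiry.
    tc := timecache.NewTimeCacheWithStrategy(timecache.Strategy_LastSeen, time.Second)

    tc.Add("msg-id")
    time.Sleep(800 * time.Millisecond)

    // The lookup succeeds and slides the entry's timestamp forward to "now".
    fmt.Println(tc.Has("msg-id")) // true

    time.Sleep(800 * time.Millisecond)
    // Still within one second of the *last* sighting, so it is still cached;
    // a first-seen cache with the same span would already report it missing here.
    fmt.Println(tc.Has("msg-id")) // true

    time.Sleep(1100 * time.Millisecond)
    fmt.Println(tc.Has("msg-id")) // false: no sighting within the last second
}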
timecache/last_seen_cache_test.go (new file, 84 lines)

package timecache

import (
    "fmt"
    "testing"
    "time"
)

func TestLastSeenCacheFound(t *testing.T) {
    tc := newLastSeenCache(time.Minute)

    tc.Add("test")

    if !tc.Has("test") {
        t.Fatal("should have this key")
    }
}

func TestLastSeenCacheExpire(t *testing.T) {
    tc := newLastSeenCache(time.Second)
    for i := 0; i < 11; i++ {
        tc.Add(fmt.Sprint(i))
        time.Sleep(time.Millisecond * 100)
    }

    if tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have dropped this from the cache already")
    }
}

func TestLastSeenCacheSlideForward(t *testing.T) {
    tc := newLastSeenCache(time.Second)
    i := 0

    // T0ms: Add 8 entries with a 100ms sleep after each
    for i < 8 {
        tc.Add(fmt.Sprint(i))
        time.Sleep(time.Millisecond * 100)
        i++
    }

    // T800ms: Lookup the first entry - this should slide the entry forward so that its expiration is a full second
    // later.
    if !tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have this key")
    }

    // T800ms: Wait till after the first and second entries would have normally expired (had we not looked the first
    // entry up).
    time.Sleep(time.Millisecond * 400)

    // T1200ms: The first entry should still be present in the cache - this will also slide the entry forward.
    if !tc.Has(fmt.Sprint(0)) {
        t.Fatal("should still have this key")
    }

    // T1200ms: The second entry should have expired
    if tc.Has(fmt.Sprint(1)) {
        t.Fatal("should have dropped this from the cache already")
    }

    // T1200ms: Sleep till the first entry actually expires
    time.Sleep(time.Millisecond * 1100)

    // T2300ms: Now the first entry should have expired
    if tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have dropped this from the cache already")
    }

    // And it should not have been added back
    if tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have dropped this from the cache already")
    }
}

func TestLastSeenCacheNotFoundAfterExpire(t *testing.T) {
    tc := newLastSeenCache(time.Second)
    tc.Add(fmt.Sprint(0))
    time.Sleep(1100 * time.Millisecond)

    if tc.Has(fmt.Sprint(0)) {
        t.Fatal("should have dropped this from the cache already")
    }
}
timecache/time_cache.go (new file, 32 lines)

package timecache

import "time"

type Strategy uint8

const (
    Strategy_FirstSeen Strategy = iota
    Strategy_LastSeen
)

type TimeCache interface {
    Add(string)
    Has(string) bool
}

// NewTimeCache defaults to the original ("first seen") cache implementation
func NewTimeCache(span time.Duration) TimeCache {
    return NewTimeCacheWithStrategy(Strategy_FirstSeen, span)
}

func NewTimeCacheWithStrategy(strategy Strategy, span time.Duration) TimeCache {
    switch strategy {
    case Strategy_FirstSeen:
        return newFirstSeenCache(span)
    case Strategy_LastSeen:
        return newLastSeenCache(span)
    default:
        // Default to the original time cache implementation
        return newFirstSeenCache(span)
    }
}
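For reference, a small sketch of the default path through this factory: NewTimeCache keeps the original first-seen semantics, so callers that do not opt in see no behaviour change. As the FirstSeenCache code above shows, adding the same key twice panics, so a caller would typically guard Add with Has, as TimeCachedBlacklist.Add in blacklist.go does; the key name below is hypothetical:

package main

import (
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p-pubsub/timecache"
)

func main() {
    // Defaults to Strategy_FirstSeen under the hood.
    tc := timecache.NewTimeCache(2 * time.Minute)

    id := "some-message-id"
    if !tc.Has(id) {
        // First sighting: record it. FirstSeenCache panics on duplicate Adds,
        // so always check Has before Add.
        tc.Add(id)
    }

    fmt.Println(tc.Has(id)) // true until two minutes after the first Add
}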