decaying tags: support removal and closure. (#72)

This commit is contained in:
Raúl Kripalani 2020-05-15 09:27:44 +01:00 committed by GitHub
parent bce720e935
commit 9fba5602d0
2 changed files with 194 additions and 18 deletions

View File

@ -22,6 +22,12 @@ type bumpCmd struct {
delta int
}
// removeCmd represents a tag removal command.
type removeCmd struct {
peer peer.ID
tag *decayingTag
}
// decayer tracks and manages all decaying tags and their values.
type decayer struct {
cfg *DecayerCfg
@ -34,8 +40,10 @@ type decayer struct {
// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value
// bumpCh queues bump commands to be processed by the loop.
bumpCh chan bumpCmd
// bumpTagCh queues bump commands to be processed by the loop.
bumpTagCh chan bumpCmd
removeTagCh chan removeCmd
closeTagCh chan *decayingTag
// closure thingies.
closeCh chan struct{}
@ -70,13 +78,15 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
}
d := &decayer{
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpCh: make(chan bumpCmd, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpTagCh: make(chan bumpCmd, 128),
removeTagCh: make(chan removeCmd, 128),
closeTagCh: make(chan *decayingTag, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}
d.lastTick.Store(d.clock.Now())
@ -203,7 +213,7 @@ func (d *decayer) process() {
delete(visit, tag)
}
case bmp = <-d.bumpCh:
case bmp = <-d.bumpTagCh:
var (
now = d.clock.Now()
peer, tag = bmp.peer, bmp.tag
@ -231,9 +241,42 @@ func (d *decayer) process() {
s.Unlock()
case rm := <-d.removeTagCh:
s := d.mgr.segments.get(rm.peer)
s.Lock()
p := s.tagInfoFor(rm.peer)
v, ok := p.decaying[rm.tag]
if !ok {
s.Unlock()
continue
}
p.value -= v.Value
delete(p.decaying, rm.tag)
s.Unlock()
case t := <-d.closeTagCh:
// Stop tracking the tag.
d.tagsMu.Lock()
delete(d.knownTags, t.name)
d.tagsMu.Unlock()
// Remove the tag from all peers that had it in the connmgr.
for _, s := range d.mgr.segments {
// visit all segments, and attempt to remove the tag from all the peers it stores.
s.Lock()
for _, p := range s.peers {
if dt, ok := p.decaying[t]; ok {
// decrease the value of the tagInfo, and delete the tag.
p.value -= dt.Value
delete(p.decaying, t)
}
}
s.Unlock()
}
case <-d.closeCh:
return
}
}
}
@ -247,6 +290,10 @@ type decayingTag struct {
nextTick time.Time
decayFn connmgr.DecayFn
bumpFn connmgr.BumpFn
// closed marks this tag as closed, so that if it's bumped after being
// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
closed int32
}
var _ connmgr.DecayingTag = (*decayingTag)(nil)
@ -261,18 +308,49 @@ func (t *decayingTag) Interval() time.Duration {
// Bump bumps a tag for this peer.
func (t *decayingTag) Bump(p peer.ID, delta int) error {
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
}
bmp := bumpCmd{peer: p, tag: t, delta: delta}
select {
case t.trkr.bumpCh <- bmp:
case t.trkr.bumpTagCh <- bmp:
return nil
default:
return fmt.Errorf(
"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
p.Pretty(),
t.name,
delta,
len(t.trkr.bumpCh))
p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
}
}
func (t *decayingTag) Remove(p peer.ID) error {
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
}
rm := removeCmd{peer: p, tag: t}
select {
case t.trkr.removeTagCh <- rm:
return nil
default:
return fmt.Errorf(
"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
p.Pretty(), t.name, len(t.trkr.removeTagCh))
}
}
func (t *decayingTag) Close() error {
if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
return nil
}
select {
case t.trkr.closeTagCh <- t:
return nil
default:
return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
}
}

View File

@ -283,7 +283,7 @@ func TestResolutionMisaligned(t *testing.T) {
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
// nothing has happened.
// first tick.
mockClock.Add(TestResolution)
require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])
@ -301,6 +301,104 @@ func TestResolutionMisaligned(t *testing.T) {
require.Equal(1997, mgr.GetTagInfo(id).Value)
}
func TestTagRemoval(t *testing.T) {
var (
id1, id2 = tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)
tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)
tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)
// id1 has both tags; id2 only has the first tag.
_ = tag1.Bump(id1, 1000)
_ = tag2.Bump(id1, 1000)
_ = tag1.Bump(id2, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
// first tick.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id1).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id1).Tags["bop"])
require.Equal(999, mgr.GetTagInfo(id2).Tags["beep"])
require.Equal(999*2, mgr.GetTagInfo(id1).Value)
require.Equal(999, mgr.GetTagInfo(id2).Value)
// remove tag1 from p1.
err = tag1.Remove(id1)
// allow the background goroutine to process the removal.
<-time.After(500 * time.Millisecond)
require.NoError(err)
// next tick. both peers only have 1 tag, both at 998 value.
mockClock.Add(TestResolution)
require.Zero(mgr.GetTagInfo(id1).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id1).Tags["bop"])
require.Equal(998, mgr.GetTagInfo(id2).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id1).Value)
require.Equal(998, mgr.GetTagInfo(id2).Value)
// remove tag1 from p1 again; no error.
err = tag1.Remove(id1)
require.NoError(err)
}
func TestTagClosure(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)
tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)
tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)
_ = tag1.Bump(id, 1000)
_ = tag2.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)
// nothing has happened.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(999*2, mgr.GetTagInfo(id).Value)
// next tick; tag1 would've ticked.
mockClock.Add(TestResolution)
require.Equal(998, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(998*2, mgr.GetTagInfo(id).Value)
// close the tag.
err = tag1.Close()
require.NoError(err)
// allow the background goroutine to process the closure.
<-time.After(500 * time.Millisecond)
require.Equal(998, mgr.GetTagInfo(id).Value)
// a second closure should not error.
err = tag1.Close()
require.NoError(err)
// bumping a tag after it's been closed should error.
err = tag1.Bump(id, 5)
require.Error(err)
}
func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) {
mockClock := clock.NewMock()
cfg := &DecayerCfg{