diff --git a/p2p/net/connmgr/decay.go b/p2p/net/connmgr/decay.go index fd523bf3..62029a53 100644 --- a/p2p/net/connmgr/decay.go +++ b/p2p/net/connmgr/decay.go @@ -22,6 +22,12 @@ type bumpCmd struct { delta int } +// removeCmd represents a tag removal command. +type removeCmd struct { + peer peer.ID + tag *decayingTag +} + // decayer tracks and manages all decaying tags and their values. type decayer struct { cfg *DecayerCfg @@ -34,8 +40,10 @@ type decayer struct { // lastTick stores the last time the decayer ticked. Guarded by atomic. lastTick atomic.Value - // bumpCh queues bump commands to be processed by the loop. - bumpCh chan bumpCmd + // bumpTagCh queues bump commands to be processed by the loop. + bumpTagCh chan bumpCmd + removeTagCh chan removeCmd + closeTagCh chan *decayingTag // closure thingies. closeCh chan struct{} @@ -70,13 +78,15 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) { } d := &decayer{ - cfg: cfg, - mgr: mgr, - clock: cfg.Clock, - knownTags: make(map[string]*decayingTag), - bumpCh: make(chan bumpCmd, 128), - closeCh: make(chan struct{}), - doneCh: make(chan struct{}), + cfg: cfg, + mgr: mgr, + clock: cfg.Clock, + knownTags: make(map[string]*decayingTag), + bumpTagCh: make(chan bumpCmd, 128), + removeTagCh: make(chan removeCmd, 128), + closeTagCh: make(chan *decayingTag, 128), + closeCh: make(chan struct{}), + doneCh: make(chan struct{}), } d.lastTick.Store(d.clock.Now()) @@ -203,7 +213,7 @@ func (d *decayer) process() { delete(visit, tag) } - case bmp = <-d.bumpCh: + case bmp = <-d.bumpTagCh: var ( now = d.clock.Now() peer, tag = bmp.peer, bmp.tag @@ -231,9 +241,42 @@ func (d *decayer) process() { s.Unlock() + case rm := <-d.removeTagCh: + s := d.mgr.segments.get(rm.peer) + s.Lock() + + p := s.tagInfoFor(rm.peer) + v, ok := p.decaying[rm.tag] + if !ok { + s.Unlock() + continue + } + p.value -= v.Value + delete(p.decaying, rm.tag) + s.Unlock() + + case t := <-d.closeTagCh: + // Stop tracking the tag. + d.tagsMu.Lock() + delete(d.knownTags, t.name) + d.tagsMu.Unlock() + + // Remove the tag from all peers that had it in the connmgr. + for _, s := range d.mgr.segments { + // visit all segments, and attempt to remove the tag from all the peers it stores. + s.Lock() + for _, p := range s.peers { + if dt, ok := p.decaying[t]; ok { + // decrease the value of the tagInfo, and delete the tag. + p.value -= dt.Value + delete(p.decaying, t) + } + } + s.Unlock() + } + case <-d.closeCh: return - } } } @@ -247,6 +290,10 @@ type decayingTag struct { nextTick time.Time decayFn connmgr.DecayFn bumpFn connmgr.BumpFn + + // closed marks this tag as closed, so that if it's bumped after being + // closed, we can return an error. 0 = false; 1 = true; guarded by atomic. + closed int32 } var _ connmgr.DecayingTag = (*decayingTag)(nil) @@ -261,18 +308,49 @@ func (t *decayingTag) Interval() time.Duration { // Bump bumps a tag for this peer. func (t *decayingTag) Bump(p peer.ID, delta int) error { + if atomic.LoadInt32(&t.closed) == 1 { + return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name) + } + bmp := bumpCmd{peer: p, tag: t, delta: delta} select { - case t.trkr.bumpCh <- bmp: + case t.trkr.bumpTagCh <- bmp: return nil - default: return fmt.Errorf( "unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)", - p.Pretty(), - t.name, - delta, - len(t.trkr.bumpCh)) + p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh)) + } +} + +func (t *decayingTag) Remove(p peer.ID) error { + if atomic.LoadInt32(&t.closed) == 1 { + return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name) + } + + rm := removeCmd{peer: p, tag: t} + + select { + case t.trkr.removeTagCh <- rm: + return nil + default: + return fmt.Errorf( + "unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)", + p.Pretty(), t.name, len(t.trkr.removeTagCh)) + } +} + +func (t *decayingTag) Close() error { + if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) { + log.Warnf("duplicate decaying tag closure: %s; skipping", t.name) + return nil + } + + select { + case t.trkr.closeTagCh <- t: + return nil + default: + return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh)) } } diff --git a/p2p/net/connmgr/decay_test.go b/p2p/net/connmgr/decay_test.go index fad5e0d8..c75efce1 100644 --- a/p2p/net/connmgr/decay_test.go +++ b/p2p/net/connmgr/decay_test.go @@ -283,7 +283,7 @@ func TestResolutionMisaligned(t *testing.T) { // allow the background goroutine to process bumps. <-time.After(500 * time.Millisecond) - // nothing has happened. + // first tick. mockClock.Add(TestResolution) require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"]) require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"]) @@ -301,6 +301,104 @@ func TestResolutionMisaligned(t *testing.T) { require.Equal(1997, mgr.GetTagInfo(id).Value) } +func TestTagRemoval(t *testing.T) { + var ( + id1, id2 = tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t) + mgr, decay, mockClock = testDecayTracker(t) + require = require.New(t) + ) + + tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + // id1 has both tags; id2 only has the first tag. + _ = tag1.Bump(id1, 1000) + _ = tag2.Bump(id1, 1000) + _ = tag1.Bump(id2, 1000) + + // allow the background goroutine to process bumps. + <-time.After(500 * time.Millisecond) + + // first tick. + mockClock.Add(TestResolution) + require.Equal(999, mgr.GetTagInfo(id1).Tags["beep"]) + require.Equal(999, mgr.GetTagInfo(id1).Tags["bop"]) + require.Equal(999, mgr.GetTagInfo(id2).Tags["beep"]) + + require.Equal(999*2, mgr.GetTagInfo(id1).Value) + require.Equal(999, mgr.GetTagInfo(id2).Value) + + // remove tag1 from p1. + err = tag1.Remove(id1) + + // allow the background goroutine to process the removal. + <-time.After(500 * time.Millisecond) + require.NoError(err) + + // next tick. both peers only have 1 tag, both at 998 value. + mockClock.Add(TestResolution) + require.Zero(mgr.GetTagInfo(id1).Tags["beep"]) + require.Equal(998, mgr.GetTagInfo(id1).Tags["bop"]) + require.Equal(998, mgr.GetTagInfo(id2).Tags["beep"]) + + require.Equal(998, mgr.GetTagInfo(id1).Value) + require.Equal(998, mgr.GetTagInfo(id2).Value) + + // remove tag1 from p1 again; no error. + err = tag1.Remove(id1) + require.NoError(err) +} + +func TestTagClosure(t *testing.T) { + var ( + id = tu.RandPeerIDFatal(t) + mgr, decay, mockClock = testDecayTracker(t) + require = require.New(t) + ) + + tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + _ = tag1.Bump(id, 1000) + _ = tag2.Bump(id, 1000) + // allow the background goroutine to process bumps. + <-time.After(500 * time.Millisecond) + + // nothing has happened. + mockClock.Add(TestResolution) + require.Equal(999, mgr.GetTagInfo(id).Tags["beep"]) + require.Equal(999, mgr.GetTagInfo(id).Tags["bop"]) + require.Equal(999*2, mgr.GetTagInfo(id).Value) + + // next tick; tag1 would've ticked. + mockClock.Add(TestResolution) + require.Equal(998, mgr.GetTagInfo(id).Tags["beep"]) + require.Equal(998, mgr.GetTagInfo(id).Tags["bop"]) + require.Equal(998*2, mgr.GetTagInfo(id).Value) + + // close the tag. + err = tag1.Close() + require.NoError(err) + + // allow the background goroutine to process the closure. + <-time.After(500 * time.Millisecond) + require.Equal(998, mgr.GetTagInfo(id).Value) + + // a second closure should not error. + err = tag1.Close() + require.NoError(err) + + // bumping a tag after it's been closed should error. + err = tag1.Bump(id, 5) + require.Error(err) +} + func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) { mockClock := clock.NewMock() cfg := &DecayerCfg{