From 2cf1c457d5bae01f6b9832d21a1edd362be2105f Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Sun, 5 Sep 2021 18:18:59 +0100 Subject: [PATCH] don't use a context for closing the ObservedAddrManager --- p2p/protocol/identify/id.go | 8 ++--- p2p/protocol/identify/obsaddr.go | 42 +++++++++++++++------------ p2p/protocol/identify/obsaddr_test.go | 19 +++++------- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 024d0972..17801f12 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -124,25 +124,23 @@ func NewIDService(h host.Host, opts ...Option) (*IDService, error) { userAgent = cfg.userAgent } - hostCtx, cancel := context.WithCancel(context.Background()) s := &IDService{ Host: h, UserAgent: userAgent, - ctx: hostCtx, - ctxCancel: cancel, - conns: make(map[network.Conn]chan struct{}), + conns: make(map[network.Conn]chan struct{}), disableSignedPeerRecord: cfg.disableSignedPeerRecord, addPeerHandlerCh: make(chan addPeerHandlerReq), rmPeerHandlerCh: make(chan rmPeerHandlerReq), } + s.ctx, s.ctxCancel = context.WithCancel(context.Background()) // handle local protocol handler updates, and push deltas to peers. var err error - observedAddrs, err := NewObservedAddrManager(hostCtx, h) + observedAddrs, err := NewObservedAddrManager(h) if err != nil { return nil, fmt.Errorf("failed to create observed address manager: %s", err) } diff --git a/p2p/protocol/identify/obsaddr.go b/p2p/protocol/identify/obsaddr.go index ef6b118d..0ec6c265 100644 --- a/p2p/protocol/identify/obsaddr.go +++ b/p2p/protocol/identify/obsaddr.go @@ -98,6 +98,11 @@ type newObservation struct { type ObservedAddrManager struct { host host.Host + closeOnce sync.Once + refCount sync.WaitGroup + ctx context.Context // the context is canceled when Close is called + ctxCancel context.CancelFunc + // latest observation from active connections // we'll "re-observe" these when we gc activeConnsMu sync.Mutex @@ -123,7 +128,7 @@ type ObservedAddrManager struct { // NewObservedAddrManager returns a new address manager using // peerstore.OwnObservedAddressTTL as the TTL. -func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrManager, error) { +func NewObservedAddrManager(host host.Host) (*ObservedAddrManager, error) { oas := &ObservedAddrManager{ addrs: make(map[string][]*observedAddr), ttl: peerstore.OwnObservedAddrTTL, @@ -133,6 +138,7 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM // refresh every ttl/2 so we don't forget observations from connected peers refreshTimer: time.NewTimer(peerstore.OwnObservedAddrTTL / 2), } + oas.ctx, oas.ctxCancel = context.WithCancel(context.Background()) reachabilitySub, err := host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) if err != nil { @@ -147,7 +153,8 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM oas.emitNATDeviceTypeChanged = emitter oas.host.Network().Notify((*obsAddrNotifiee)(oas)) - go oas.worker(ctx) + oas.refCount.Add(1) + go oas.worker() return oas, nil } @@ -239,22 +246,12 @@ func (oas *ObservedAddrManager) Record(conn network.Conn, observed ma.Multiaddr) } } -func (oas *ObservedAddrManager) teardown() { - oas.host.Network().StopNotify((*obsAddrNotifiee)(oas)) - oas.reachabilitySub.Close() - - oas.mu.Lock() - oas.refreshTimer.Stop() - oas.mu.Unlock() -} - -func (oas *ObservedAddrManager) worker(ctx context.Context) { - defer oas.teardown() +func (oas *ObservedAddrManager) worker() { + defer oas.refCount.Done() ticker := time.NewTicker(GCInterval) defer ticker.Stop() - hostClosing := oas.host.Network().Process().Closing() subChan := oas.reachabilitySub.Out() for { select { @@ -265,17 +262,13 @@ func (oas *ObservedAddrManager) worker(ctx context.Context) { } ev := evt.(event.EvtLocalReachabilityChanged) oas.reachability = ev.Reachability - case obs := <-oas.wch: oas.maybeRecordObservation(obs.conn, obs.observed) - case <-ticker.C: oas.gc() case <-oas.refreshTimer.C: oas.refresh() - case <-hostClosing: - return - case <-ctx.Done(): + case <-oas.ctx.Done(): return } } @@ -534,6 +527,17 @@ func (oas *ObservedAddrManager) emitSpecificNATType(addrs []*observedAddr, proto return false, 0 } +func (oas *ObservedAddrManager) Close() error { + oas.closeOnce.Do(func() { + oas.ctxCancel() + oas.refCount.Wait() + oas.reachabilitySub.Close() + oas.refreshTimer.Stop() + oas.host.Network().StopNotify((*obsAddrNotifiee)(oas)) + }) + return nil +} + // observerGroup is a function that determines what part of // a multiaddr counts as a different observer. for example, // two ipfs nodes at the same IP/TCP transport would get diff --git a/p2p/protocol/identify/obsaddr_test.go b/p2p/protocol/identify/obsaddr_test.go index 4b3d0aca..2ce04009 100644 --- a/p2p/protocol/identify/obsaddr_test.go +++ b/p2p/protocol/identify/obsaddr_test.go @@ -85,18 +85,11 @@ func (h *harness) observeInbound(observed ma.Multiaddr, observer peer.ID) networ func newHarness(ctx context.Context, t *testing.T) harness { mn := mocknet.New(ctx) sk, err := p2putil.RandTestBogusPrivateKey() - if err != nil { - t.Fatal(err) - } - - h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086")) - if err != nil { - t.Fatal(err) - } - - oas, err := identify.NewObservedAddrManager(ctx, h) require.NoError(t, err) - + h, err := mn.AddPeer(sk, ma.StringCast("/ip4/127.0.0.1/tcp/10086")) + require.NoError(t, err) + oas, err := identify.NewObservedAddrManager(h) + require.NoError(t, err) return harness{ oas: oas, mocknet: mn, @@ -142,6 +135,7 @@ func TestObsAddrSet(t *testing.T) { defer cancel() harness := newHarness(ctx, t) + defer harness.oas.Close() if !addrsMatch(harness.oas.Addrs(), nil) { t.Error("addrs should be empty") @@ -243,6 +237,7 @@ func TestObservedAddrFiltering(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() harness := newHarness(ctx, t) + defer harness.oas.Close() require.Empty(t, harness.oas.Addrs()) // IP4/TCP @@ -344,6 +339,7 @@ func TestEmitNATDeviceTypeSymmetric(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() harness := newHarness(ctx, t) + defer harness.oas.Close() require.Empty(t, harness.oas.Addrs()) emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) require.NoError(t, err) @@ -390,6 +386,7 @@ func TestEmitNATDeviceTypeCone(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() harness := newHarness(ctx, t) + defer harness.oas.Close() require.Empty(t, harness.oas.Addrs()) emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) require.NoError(t, err)