Merge pull request #109 from libp2p/clean-shutdown

remove context from constructor, implement a proper Close method
This commit is contained in:
Marten Seemann 2021-08-30 10:39:42 +01:00 committed by GitHub
commit 8cc74d3aa8
9 changed files with 63 additions and 1097 deletions

View File

@ -22,11 +22,14 @@ var log = logging.Logger("autonat")
// AmbientAutoNAT is the implementation of ambient NAT autodiscovery // AmbientAutoNAT is the implementation of ambient NAT autodiscovery
type AmbientAutoNAT struct { type AmbientAutoNAT struct {
ctx context.Context
host host.Host host host.Host
*config *config
ctx context.Context
ctxCancel context.CancelFunc // is closed when Close is called
backgroundRunning chan struct{} // is closed when the background go routine exits
inboundConn chan network.Conn inboundConn chan network.Conn
observations chan autoNATResult observations chan autoNATResult
// status is an autoNATResult reflecting current status. // status is an autoNATResult reflecting current status.
@ -50,7 +53,6 @@ type AmbientAutoNAT struct {
// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired. // StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired.
type StaticAutoNAT struct { type StaticAutoNAT struct {
ctx context.Context
host host.Host host host.Host
reachability network.Reachability reachability network.Reachability
service *autoNATService service *autoNATService
@ -62,7 +64,7 @@ type autoNATResult struct {
} }
// New creates a new NAT autodiscovery system attached to a host // New creates a new NAT autodiscovery system attached to a host
func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) { func New(h host.Host, options ...Option) (AutoNAT, error) {
var err error var err error
conf := new(config) conf := new(config)
conf.host = h conf.host = h
@ -84,7 +86,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
var service *autoNATService var service *autoNATService
if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil { if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil {
service, err = newAutoNATService(ctx, conf) service, err = newAutoNATService(conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -95,19 +97,21 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability}) emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability})
return &StaticAutoNAT{ return &StaticAutoNAT{
ctx: ctx,
host: h, host: h,
reachability: conf.reachability, reachability: conf.reachability,
service: service, service: service,
}, nil }, nil
} }
ctx, cancel := context.WithCancel(context.Background())
as := &AmbientAutoNAT{ as := &AmbientAutoNAT{
ctx: ctx, ctx: ctx,
host: h, ctxCancel: cancel,
config: conf, backgroundRunning: make(chan struct{}),
inboundConn: make(chan network.Conn, 5), host: h,
observations: make(chan autoNATResult, 1), config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),
emitReachabilityChanged: emitReachabilityChanged, emitReachabilityChanged: emitReachabilityChanged,
service: service, service: service,
@ -159,6 +163,7 @@ func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool {
} }
func (as *AmbientAutoNAT) background() { func (as *AmbientAutoNAT) background() {
defer close(as.backgroundRunning)
// wait a bit for the node to come online and establish some connections // wait a bit for the node to come online and establish some connections
// before starting autodetection // before starting autodetection
delay := as.config.bootDelay delay := as.config.bootDelay
@ -426,6 +431,15 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID {
return candidates[0] return candidates[0]
} }
func (as *AmbientAutoNAT) Close() error {
as.ctxCancel()
if as.service != nil {
as.service.Disable()
}
<-as.backgroundRunning
return nil
}
func shufflePeers(peers []peer.ID) { func shufflePeers(peers []peer.ID) {
for i := range peers { for i := range peers {
j := rand.Intn(i + 1) j := rand.Intn(i + 1)
@ -445,3 +459,10 @@ func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
} }
return nil, errors.New("no available address") return nil, errors.New("no available address")
} }
func (s *StaticAutoNAT) Close() error {
if s.service != nil {
s.service.Disable()
}
return nil
}

View File

@ -55,7 +55,7 @@ func makeAutoNAT(ctx context.Context, t *testing.T, ash host.Host) (host.Host, A
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute) h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute)
h.Peerstore().AddProtocols(ash.ID(), AutoNATProto) h.Peerstore().AddProtocols(ash.ID(), AutoNATProto)
a, _ := New(ctx, h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay()) a, _ := New(h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay())
a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond
return h, a return h, a
@ -279,7 +279,7 @@ func TestStaticNat(t *testing.T) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})
nat, err := New(ctx, h, WithReachability(network.ReachabilityPrivate)) nat, err := New(h, WithReachability(network.ReachabilityPrivate))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -2,6 +2,7 @@ package autonat
import ( import (
"context" "context"
"io"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -16,6 +17,7 @@ type AutoNAT interface {
// PublicAddr returns the public dial address when NAT status is public and an // PublicAddr returns the public dial address when NAT status is public and an
// error otherwise // error otherwise
PublicAddr() (ma.Multiaddr, error) PublicAddr() (ma.Multiaddr, error)
io.Closer
} }
// Client is a stateless client interface to AutoNAT peers // Client is a stateless client interface to AutoNAT peers

View File

@ -20,9 +20,9 @@ var streamTimeout = 60 * time.Second
// AutoNATService provides NAT autodetection services to other peers // AutoNATService provides NAT autodetection services to other peers
type autoNATService struct { type autoNATService struct {
ctx context.Context instanceLock sync.Mutex
instance context.CancelFunc instance context.CancelFunc
instanceLock sync.Mutex backgroundRunning chan struct{} // closed when background exits
config *config config *config
@ -33,18 +33,14 @@ type autoNATService struct {
} }
// NewAutoNATService creates a new AutoNATService instance attached to a host // NewAutoNATService creates a new AutoNATService instance attached to a host
func newAutoNATService(ctx context.Context, c *config) (*autoNATService, error) { func newAutoNATService(c *config) (*autoNATService, error) {
if c.dialer == nil { if c.dialer == nil {
return nil, errors.New("cannot create NAT service without a network") return nil, errors.New("cannot create NAT service without a network")
} }
return &autoNATService{
as := &autoNATService{
ctx: ctx,
config: c, config: c,
reqs: make(map[peer.ID]int), reqs: make(map[peer.ID]int),
} }, nil
return as, nil
} }
func (as *autoNATService) handleStream(s network.Stream) { func (as *autoNATService) handleStream(s network.Stream) {
@ -190,7 +186,7 @@ func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
as.globalReqs++ as.globalReqs++
as.mx.Unlock() as.mx.Unlock()
ctx, cancel := context.WithTimeout(as.ctx, as.config.dialTimeout) ctx, cancel := context.WithTimeout(context.Background(), as.config.dialTimeout)
defer cancel() defer cancel()
as.config.dialer.Peerstore().ClearAddrs(pi.ID) as.config.dialer.Peerstore().ClearAddrs(pi.ID)
@ -217,10 +213,11 @@ func (as *autoNATService) Enable() {
if as.instance != nil { if as.instance != nil {
return return
} }
inst, cncl := context.WithCancel(as.ctx) ctx, cancel := context.WithCancel(context.Background())
as.instance = cncl as.instance = cancel
as.backgroundRunning = make(chan struct{})
go as.background(inst) go as.background(ctx)
} }
// Disable the autoNAT service if it is running. // Disable the autoNAT service if it is running.
@ -230,10 +227,12 @@ func (as *autoNATService) Disable() {
if as.instance != nil { if as.instance != nil {
as.instance() as.instance()
as.instance = nil as.instance = nil
<-as.backgroundRunning
} }
} }
func (as *autoNATService) background(ctx context.Context) { func (as *autoNATService) background(ctx context.Context) {
defer close(as.backgroundRunning)
as.config.host.SetStreamHandler(AutoNATProto, as.handleStream) as.config.host.SetStreamHandler(AutoNATProto, as.handleStream)
timer := time.NewTimer(as.config.throttleResetPeriod) timer := time.NewTimer(as.config.throttleResetPeriod)

View File

@ -24,8 +24,8 @@ func makeAutoNATConfig(ctx context.Context, t *testing.T) *config {
return &c return &c
} }
func makeAutoNATService(ctx context.Context, t *testing.T, c *config) *autoNATService { func makeAutoNATService(t *testing.T, c *config) *autoNATService {
as, err := newAutoNATService(ctx, c) as, err := newAutoNATService(c)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -48,7 +48,7 @@ func TestAutoNATServiceDialError(t *testing.T) {
c := makeAutoNATConfig(ctx, t) c := makeAutoNATConfig(ctx, t)
c.dialTimeout = 1 * time.Second c.dialTimeout = 1 * time.Second
c.dialPolicy.allowSelfDials = false c.dialPolicy.allowSelfDials = false
_ = makeAutoNATService(ctx, t, c) _ = makeAutoNATService(t, c)
hc, ac := makeAutoNATClient(ctx, t) hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc) connect(t, c.host, hc)
@ -67,7 +67,7 @@ func TestAutoNATServiceDialSuccess(t *testing.T) {
defer cancel() defer cancel()
c := makeAutoNATConfig(ctx, t) c := makeAutoNATConfig(ctx, t)
_ = makeAutoNATService(ctx, t, c) _ = makeAutoNATService(t, c)
hc, ac := makeAutoNATClient(ctx, t) hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc) connect(t, c.host, hc)
@ -87,7 +87,7 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
c.throttleResetPeriod = time.Second c.throttleResetPeriod = time.Second
c.throttleResetJitter = 0 c.throttleResetJitter = 0
c.throttlePeerMax = 1 c.throttlePeerMax = 1
_ = makeAutoNATService(ctx, t, c) _ = makeAutoNATService(t, c)
hc, ac := makeAutoNATClient(ctx, t) hc, ac := makeAutoNATClient(ctx, t)
connect(t, c.host, hc) connect(t, c.host, hc)
@ -124,7 +124,7 @@ func TestAutoNATServiceGlobalLimiter(t *testing.T) {
c.throttleResetJitter = 0 c.throttleResetJitter = 0
c.throttlePeerMax = 1 c.throttlePeerMax = 1
c.throttleGlobalMax = 5 c.throttleGlobalMax = 5
_ = makeAutoNATService(ctx, t, c) _ = makeAutoNATService(t, c)
hs := c.host hs := c.host
@ -157,7 +157,7 @@ func TestAutoNATServiceRateLimitJitter(t *testing.T) {
c.throttleResetPeriod = 100 * time.Millisecond c.throttleResetPeriod = 100 * time.Millisecond
c.throttleResetJitter = 100 * time.Millisecond c.throttleResetJitter = 100 * time.Millisecond
c.throttleGlobalMax = 1 c.throttleGlobalMax = 1
svc := makeAutoNATService(ctx, t, c) svc := makeAutoNATService(t, c)
svc.mx.Lock() svc.mx.Lock()
svc.globalReqs = 1 svc.globalReqs = 1
svc.mx.Unlock() svc.mx.Unlock()
@ -178,7 +178,7 @@ func TestAutoNATServiceStartup(t *testing.T) {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
an, err := New(ctx, h, EnableService(dh.Network())) an, err := New(h, EnableService(dh.Network()))
an.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true an.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -1,3 +1,6 @@
//go:build ignore
// +build ignore
// This separate testing package helps to resolve a circular dependency potentially // This separate testing package helps to resolve a circular dependency potentially
// being created between libp2p and libp2p-autonat // being created between libp2p and libp2p-autonat
package autonat_test package autonat_test
@ -31,7 +34,7 @@ func TestAutonatRoundtrip(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if _, err := autonat.New(ctx, service, autonat.EnableService(dialback.Network())); err != nil { if _, err := autonat.New(service, autonat.EnableService(dialback.Network())); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -0,0 +1,3 @@
package autonat_test
// needed so that go test ./... doesn't error

View File

@ -2,10 +2,4 @@ module github.com/libp2p/go-libp2p-autonat/test
go 1.16 go 1.16
require (
github.com/libp2p/go-libp2p v0.14.4
github.com/libp2p/go-libp2p-autonat v0.4.2
github.com/libp2p/go-libp2p-core v0.8.6
)
replace github.com/libp2p/go-libp2p-autonat => ../ replace github.com/libp2p/go-libp2p-autonat => ../

File diff suppressed because it is too large Load Diff