relay: no more wrapping

We don't need the host wrappers, we can just replace the filter AddrsFactory.

Also, always filter out relay addresses.
This commit is contained in:
Steven Allen 2019-04-05 12:13:24 -07:00
parent 28eb467dc4
commit 80ada8a7d6
4 changed files with 115 additions and 137 deletions

View File

@ -109,18 +109,29 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
swrm.Filters = cfg.Filters
}
var h host.Host
h, err = bhost.NewHost(ctx, swrm, &bhost.HostOpts{
h, err := bhost.NewHost(ctx, swrm, &bhost.HostOpts{
ConnManager: cfg.ConnManager,
AddrsFactory: cfg.AddrsFactory,
NATManager: cfg.NATManager,
EnablePing: !cfg.DisablePing,
})
if err != nil {
swrm.Close()
return nil, err
}
if cfg.Relay {
// If we've enabled the relay, we should filter out relay
// addresses by default.
//
// TODO: We shouldn't be doing this here.
oldFactory := h.AddrsFactory
h.AddrsFactory = func(addrs []ma.Multiaddr) []ma.Multiaddr {
return relay.Filter(oldFactory(addrs))
}
}
upgrader := new(tptu.Upgrader)
upgrader.Protector = cfg.Protector
upgrader.Filters = swrm.Filters
@ -206,18 +217,24 @@ func (cfg *Config) NewNode(ctx context.Context) (host.Host, error) {
}
if hop {
h = relay.NewRelayHost(swrm.Context(), h.(*bhost.BasicHost), discovery)
// advertise ourselves
// TODO: Why do we only do this when EnableAutoRelay is
// set? This has absolutely _nothing_ to do with
// autorelay.
relay.Advertise(ctx, discovery)
} else {
h = relay.NewAutoRelayHost(swrm.Context(), h.(*bhost.BasicHost), discovery, router)
// TODO
// 1. Stop abusing contexts like this.
// 2. Introduce a service management system (e.g.,
// uber's fx) so we can actually manage the lifetime of
// this service.
_ = relay.NewAutoRelay(swrm.Context(), h, discovery, router)
}
}
if router != nil {
h = routed.Wrap(h, router)
return routed.Wrap(h, router), nil
}
// TODO: Bootstrapping.
return h, nil
}

View File

@ -12,7 +12,6 @@ import (
autonat "github.com/libp2p/go-libp2p-autonat"
_ "github.com/libp2p/go-libp2p-circuit"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
@ -29,13 +28,11 @@ var (
DesiredRelays = 3
BootDelay = 20 * time.Second
unspecificRelay = ma.StringCast("/p2p-circuit")
)
// AutoRelayHost is a Host that uses relays for connectivity when a NAT is detected.
type AutoRelayHost struct {
*basic.BasicHost
// AutoRelay is a Host that uses relays for connectivity when a NAT is detected.
type AutoRelay struct {
host *basic.BasicHost
discover discovery.Discoverer
router routing.PeerRouting
autonat autonat.AutoNAT
@ -48,37 +45,37 @@ type AutoRelayHost struct {
addrs []ma.Multiaddr
}
func NewAutoRelayHost(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer, router routing.PeerRouting) *AutoRelayHost {
h := &AutoRelayHost{
BasicHost: bhost,
func NewAutoRelay(ctx context.Context, bhost *basic.BasicHost, discover discovery.Discoverer, router routing.PeerRouting) *AutoRelay {
ar := &AutoRelay{
host: bhost,
discover: discover,
router: router,
addrsF: bhost.AddrsFactory,
relays: make(map[peer.ID]pstore.PeerInfo),
disconnect: make(chan struct{}, 1),
}
h.autonat = autonat.NewAutoNAT(ctx, bhost, h.baseAddrs)
bhost.AddrsFactory = h.hostAddrs
bhost.Network().Notify(h)
go h.background(ctx)
return h
ar.autonat = autonat.NewAutoNAT(ctx, bhost, ar.baseAddrs)
bhost.AddrsFactory = ar.hostAddrs
bhost.Network().Notify(ar)
go ar.background(ctx)
return ar
}
func (h *AutoRelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
h.mx.Lock()
defer h.mx.Unlock()
if h.addrs != nil && h.autonat.Status() == autonat.NATStatusPrivate {
return h.addrs
func (ar *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
ar.mx.Lock()
defer ar.mx.Unlock()
if ar.addrs != nil && ar.autonat.Status() == autonat.NATStatusPrivate {
return ar.addrs
} else {
return filterUnspecificRelay(h.addrsF(addrs))
return ar.addrsF(addrs)
}
}
func (h *AutoRelayHost) baseAddrs() []ma.Multiaddr {
return filterUnspecificRelay(h.addrsF(h.AllAddrs()))
func (ar *AutoRelay) baseAddrs() []ma.Multiaddr {
return ar.addrsF(ar.host.AllAddrs())
}
func (h *AutoRelayHost) background(ctx context.Context) {
func (ar *AutoRelay) background(ctx context.Context) {
select {
case <-time.After(autonat.AutoNATBootDelay + BootDelay):
case <-ctx.Done():
@ -90,39 +87,39 @@ func (h *AutoRelayHost) background(ctx context.Context) {
for {
wait := autonat.AutoNATRefreshInterval
switch h.autonat.Status() {
switch ar.autonat.Status() {
case autonat.NATStatusUnknown:
wait = autonat.AutoNATRetryInterval
case autonat.NATStatusPublic:
// invalidate addrs
h.mx.Lock()
if h.addrs != nil {
h.addrs = nil
ar.mx.Lock()
if ar.addrs != nil {
ar.addrs = nil
push = true
}
h.mx.Unlock()
ar.mx.Unlock()
// if we had previously announced relay addrs, push our public addrs
if push {
push = false
h.PushIdentify()
ar.host.PushIdentify()
}
case autonat.NATStatusPrivate:
push = false // clear, findRelays pushes as needed
h.findRelays(ctx)
ar.findRelays(ctx)
}
select {
case <-h.disconnect:
case <-ar.disconnect:
// invalidate addrs
h.mx.Lock()
if h.addrs != nil {
h.addrs = nil
ar.mx.Lock()
if ar.addrs != nil {
ar.addrs = nil
push = true
}
h.mx.Unlock()
ar.mx.Unlock()
case <-time.After(wait):
case <-ctx.Done():
return
@ -130,14 +127,14 @@ func (h *AutoRelayHost) background(ctx context.Context) {
}
}
func (h *AutoRelayHost) findRelays(ctx context.Context) {
h.mx.Lock()
if len(h.relays) >= DesiredRelays {
h.mx.Unlock()
func (ar *AutoRelay) findRelays(ctx context.Context) {
ar.mx.Lock()
if len(ar.relays) >= DesiredRelays {
ar.mx.Unlock()
return
}
need := DesiredRelays - len(h.relays)
h.mx.Unlock()
need := DesiredRelays - len(ar.relays)
ar.mx.Unlock()
limit := 20
if need > limit/2 {
@ -145,29 +142,29 @@ func (h *AutoRelayHost) findRelays(ctx context.Context) {
}
dctx, cancel := context.WithTimeout(ctx, 30*time.Second)
pis, err := discovery.FindPeers(dctx, h.discover, RelayRendezvous, limit)
pis, err := discovery.FindPeers(dctx, ar.discover, RelayRendezvous, limit)
cancel()
if err != nil {
log.Debugf("error discovering relays: %s", err.Error())
return
}
pis = h.selectRelays(pis)
pis = ar.selectRelays(pis)
update := 0
for _, pi := range pis {
h.mx.Lock()
if _, ok := h.relays[pi.ID]; ok {
h.mx.Unlock()
ar.mx.Lock()
if _, ok := ar.relays[pi.ID]; ok {
ar.mx.Unlock()
continue
}
h.mx.Unlock()
ar.mx.Unlock()
cctx, cancel := context.WithTimeout(ctx, 60*time.Second)
if len(pi.Addrs) == 0 {
pi, err = h.router.FindPeer(cctx, pi.ID)
pi, err = ar.router.FindPeer(cctx, pi.ID)
if err != nil {
log.Debugf("error finding relay peer %s: %s", pi.ID, err.Error())
cancel()
@ -175,7 +172,7 @@ func (h *AutoRelayHost) findRelays(ctx context.Context) {
}
}
err = h.Connect(cctx, pi)
err = ar.host.Connect(cctx, pi)
cancel()
if err != nil {
log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error())
@ -183,12 +180,12 @@ func (h *AutoRelayHost) findRelays(ctx context.Context) {
}
log.Debugf("connected to relay %s", pi.ID)
h.mx.Lock()
h.relays[pi.ID] = pi
h.mx.Unlock()
ar.mx.Lock()
ar.relays[pi.ID] = pi
ar.mx.Unlock()
// tag the connection as very important
h.ConnManager().TagPeer(pi.ID, "relay", 42)
ar.host.ConnManager().TagPeer(pi.ID, "relay", 42)
update++
need--
@ -197,24 +194,24 @@ func (h *AutoRelayHost) findRelays(ctx context.Context) {
}
}
if update > 0 || h.addrs == nil {
h.updateAddrs()
if update > 0 || ar.addrs == nil {
ar.updateAddrs()
}
}
func (h *AutoRelayHost) selectRelays(pis []pstore.PeerInfo) []pstore.PeerInfo {
func (ar *AutoRelay) selectRelays(pis []pstore.PeerInfo) []pstore.PeerInfo {
// TODO better relay selection strategy; this just selects random relays
// but we should probably use ping latency as the selection metric
shuffleRelays(pis)
return pis
}
func (h *AutoRelayHost) updateAddrs() {
h.doUpdateAddrs()
h.PushIdentify()
func (ar *AutoRelay) updateAddrs() {
ar.doUpdateAddrs()
ar.host.PushIdentify()
}
// This function updates our NATed advertised addrs (h.addrs)
// This function updates our NATed advertised addrs (ar.addrs)
// The public addrs are rewritten so that they only retain the public IP part; they
// become undialable but are useful as a hint to the dialer to determine whether or not
// to dial private addrs.
@ -223,12 +220,12 @@ func (h *AutoRelayHost) updateAddrs() {
// On top of those, we add the relay-specific addrs for the relays to which we are
// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr
// through which we can be dialed.
func (h *AutoRelayHost) doUpdateAddrs() {
h.mx.Lock()
defer h.mx.Unlock()
func (ar *AutoRelay) doUpdateAddrs() {
ar.mx.Lock()
defer ar.mx.Unlock()
addrs := h.baseAddrs()
raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(h.relays))
addrs := ar.baseAddrs()
raddrs := make([]ma.Multiaddr, 0, len(addrs)+len(ar.relays))
// remove our public addresses from the list
for _, addr := range addrs {
@ -239,7 +236,7 @@ func (h *AutoRelayHost) doUpdateAddrs() {
}
// add relay specific addrs to the list
for _, pi := range h.relays {
for _, pi := range ar.relays {
circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", pi.ID.Pretty()))
if err != nil {
panic(err)
@ -253,18 +250,7 @@ func (h *AutoRelayHost) doUpdateAddrs() {
}
}
h.addrs = raddrs
}
func filterUnspecificRelay(addrs []ma.Multiaddr) []ma.Multiaddr {
res := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
if addr.Equal(unspecificRelay) {
continue
}
res = append(res, addr)
}
return res
ar.addrs = raddrs
}
func shuffleRelays(pis []pstore.PeerInfo) {
@ -274,34 +260,23 @@ func shuffleRelays(pis []pstore.PeerInfo) {
}
}
func containsAddr(lst []ma.Multiaddr, addr ma.Multiaddr) bool {
for _, xaddr := range lst {
if xaddr.Equal(addr) {
return true
}
}
return false
}
func (ar *AutoRelay) Listen(inet.Network, ma.Multiaddr) {}
func (ar *AutoRelay) ListenClose(inet.Network, ma.Multiaddr) {}
func (ar *AutoRelay) Connected(inet.Network, inet.Conn) {}
// notify
func (h *AutoRelayHost) Listen(inet.Network, ma.Multiaddr) {}
func (h *AutoRelayHost) ListenClose(inet.Network, ma.Multiaddr) {}
func (h *AutoRelayHost) Connected(inet.Network, inet.Conn) {}
func (h *AutoRelayHost) Disconnected(_ inet.Network, c inet.Conn) {
func (ar *AutoRelay) Disconnected(net inet.Network, c inet.Conn) {
p := c.RemotePeer()
h.mx.Lock()
defer h.mx.Unlock()
if _, ok := h.relays[p]; ok {
delete(h.relays, p)
ar.mx.Lock()
defer ar.mx.Unlock()
if _, ok := ar.relays[p]; ok {
delete(ar.relays, p)
select {
case h.disconnect <- struct{}{}:
case ar.disconnect <- struct{}{}:
default:
}
}
}
func (h *AutoRelayHost) OpenedStream(inet.Network, inet.Stream) {}
func (h *AutoRelayHost) ClosedStream(inet.Network, inet.Stream) {}
var _ host.Host = (*AutoRelayHost)(nil)
func (ar *AutoRelay) OpenedStream(inet.Network, inet.Stream) {}
func (ar *AutoRelay) ClosedStream(inet.Network, inet.Stream) {}

View File

@ -14,11 +14,6 @@ How it works:
- `AutoNATService` instances are instantiated in the
bootstrappers (or other well known publicly reachable hosts)
- `RelayHost`s are constructed with
`libp2p.New(libp2p.EnableRelay(circuit.OptHop), libp2p.Routing(makeDHT))`.
They provide Relay Hop services, and advertise through the DHT
in the `/libp2p/relay` namespace
- `AutoRelayHost`s are constructed with `libp2p.New(libp2p.Routing(makeDHT))`
They passively discover autonat service instances and test dialability of
their listen address set through them. When the presence of NAT is detected,

View File

@ -4,10 +4,8 @@ import (
"context"
"time"
basic "github.com/libp2p/go-libp2p/p2p/host/basic"
circuit "github.com/libp2p/go-libp2p-circuit"
discovery "github.com/libp2p/go-libp2p-discovery"
host "github.com/libp2p/go-libp2p-host"
ma "github.com/multiformats/go-multiaddr"
)
@ -15,21 +13,8 @@ var (
AdvertiseBootDelay = 30 * time.Second
)
// RelayHost is a Host that provides Relay services.
type RelayHost struct {
*basic.BasicHost
advertise discovery.Advertiser
addrsF basic.AddrsFactory
}
// New constructs a new RelayHost
func NewRelayHost(ctx context.Context, bhost *basic.BasicHost, advertise discovery.Advertiser) *RelayHost {
h := &RelayHost{
BasicHost: bhost,
addrsF: bhost.AddrsFactory,
advertise: advertise,
}
bhost.AddrsFactory = h.hostAddrs
// Advertise advertises this node as a libp2p relay.
func Advertise(ctx context.Context, advertise discovery.Advertiser) {
go func() {
select {
case <-time.After(AdvertiseBootDelay):
@ -37,11 +22,17 @@ func NewRelayHost(ctx context.Context, bhost *basic.BasicHost, advertise discove
case <-ctx.Done():
}
}()
return h
}
func (h *RelayHost) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
return filterUnspecificRelay(h.addrsF(addrs))
// Filter filters out all relay addresses.
func Filter(addrs []ma.Multiaddr) []ma.Multiaddr {
raddrs := make([]ma.Multiaddr, 0, len(addrs))
for _, addr := range addrs {
_, err := addr.ValueForProtocol(circuit.P_CIRCUIT)
if err == nil {
continue
}
raddrs = append(raddrs, addr)
}
return raddrs
}
var _ host.Host = (*RelayHost)(nil)