fix: make address observation types private

We don't use and/or expose these anyways. Making them private makes it easier to
reason about their state.
This commit is contained in:
Steven Allen 2020-06-03 17:30:48 -07:00
parent 67eb8b1e93
commit a0813d7292
3 changed files with 70 additions and 59 deletions

View File

@ -40,29 +40,29 @@ type observation struct {
connDirection network.Direction
}
// ObservedAddr is an entry for an address reported by our peers.
// observedAddr is an entry for an address reported by our peers.
// We only use addresses that:
// - have been observed at least 4 times in last 40 minutes. (counter symmetric nats)
// - have been observed at least once recently (10 minutes), because our position in the
// network, or network port mapppings, may have changed.
type ObservedAddr struct {
Addr ma.Multiaddr
SeenBy map[string]observation // peer(observer) address -> observation info
LastSeen time.Time
type observedAddr struct {
addr ma.Multiaddr
seenBy map[string]observation // peer(observer) address -> observation info
lastSeen time.Time
}
func (oa *ObservedAddr) activated() bool {
func (oa *observedAddr) activated() bool {
// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
return len(oa.seenBy) >= ActivationThresh
}
func (oa *ObservedAddr) numInbound() int {
func (oa *observedAddr) numInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
for obs := range oa.seenBy {
if oa.seenBy[obs].connDirection == network.DirInbound {
count++
}
}
@ -74,9 +74,9 @@ func (oa *ObservedAddr) numInbound() int {
// observed address's group is just the address with all ports set to 0. This
// means we can advertise the most commonly observed external ports without
// advertising _every_ observed port.
func (oa *ObservedAddr) GroupKey() string {
key := make([]byte, 0, len(oa.Addr.Bytes()))
ma.ForEach(oa.Addr, func(c ma.Component) bool {
func (oa *observedAddr) groupKey() string {
key := make([]byte, 0, len(oa.addr.Bytes()))
ma.ForEach(oa.addr, func(c ma.Component) bool {
switch proto := c.Protocol(); proto.Code {
case ma.P_TCP, ma.P_UDP:
key = append(key, proto.VCode...)
@ -107,7 +107,7 @@ type ObservedAddrManager struct {
mu sync.RWMutex
// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
addrs map[string][]*observedAddr
ttl time.Duration
refreshTimer *time.Timer
@ -119,7 +119,7 @@ type ObservedAddrManager struct {
// peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(ctx context.Context, host host.Host) *ObservedAddrManager {
oas := &ObservedAddrManager{
addrs: make(map[string][]*ObservedAddr),
addrs: make(map[string][]*observedAddr),
ttl: peerstore.OwnObservedAddrTTL,
wch: make(chan newObservation, observedAddrManagerWorkerChannelSize),
host: host,
@ -160,22 +160,22 @@ func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
return nil
}
var allObserved []*ObservedAddr
var allObserved []*observedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}
func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr)
func (oas *ObservedAddrManager) filter(observedAddrs []*observedAddr) []ma.Multiaddr {
pmap := make(map[string][]*observedAddr)
now := time.Now()
for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
if now.Sub(a.lastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
pat := a.groupKey()
pmap[pat] = append(pmap[pat], a)
}
@ -195,11 +195,11 @@ func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multi
return true
}
return len(first.SeenBy) > len(second.SeenBy)
return len(first.seenBy) > len(second.seenBy)
})
for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
addrs = append(addrs, s[i].addr)
}
}
@ -281,14 +281,14 @@ func (oas *ObservedAddrManager) gc() {
filteredAddrs := observedAddrs[:0]
for _, a := range observedAddrs {
// clean up SeenBy set
for k, ob := range a.SeenBy {
for k, ob := range a.seenBy {
if now.Sub(ob.seenTime) > oas.ttl*time.Duration(ActivationThresh) {
delete(a.SeenBy, k)
delete(a.seenBy, k)
}
}
// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
if now.Sub(a.lastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
}
}
@ -389,19 +389,19 @@ func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, obs
observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
if previousObserved.Addr.Equal(observed) {
observedAddrs[i].SeenBy[observerString] = ob
observedAddrs[i].LastSeen = now
if previousObserved.addr.Equal(observed) {
observedAddrs[i].seenBy[observerString] = ob
observedAddrs[i].lastSeen = now
return
}
}
// observed address not seen yet, append it
oas.addrs[localString] = append(oas.addrs[localString], &ObservedAddr{
Addr: observed,
SeenBy: map[string]observation{
oas.addrs[localString] = append(oas.addrs[localString], &observedAddr{
addr: observed,
seenBy: map[string]observation{
observerString: ob,
},
LastSeen: now,
lastSeen: now,
})
}

View File

@ -0,0 +1,38 @@
package identify
// This test lives in the identify package, not the identify_test package, so it
// can access internal types.
import (
"testing"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/tcp/2345")}
oa2 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
oa3 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.5/tcp/1231")}
oa4 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
oa5 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1531")}
oa6 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1531/quic")}
oa7 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1111/quic")}
oa8 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.5/udp/1111/quic")}
// different ports, same IP => same key
require.Equal(t, oa1.groupKey(), oa2.groupKey())
// different IPs => different key
require.NotEqual(t, oa2.groupKey(), oa3.groupKey())
// same port, different protos => different keys
require.NotEqual(t, oa3.groupKey(), oa4.groupKey())
// same port, same address, different protos => different keys
require.NotEqual(t, oa2.groupKey(), oa4.groupKey())
// udp works as well
require.Equal(t, oa4.groupKey(), oa5.groupKey())
// udp and quic are different
require.NotEqual(t, oa5.groupKey(), oa6.groupKey())
// quic works as well
require.Equal(t, oa6.groupKey(), oa7.groupKey())
require.NotEqual(t, oa7.groupKey(), oa8.groupKey())
}

View File

@ -312,30 +312,3 @@ func TestObservedAddrFiltering(t *testing.T) {
require.Contains(t, addrs, it7)
}
func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/2345")}
oa2 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
oa3 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/tcp/1231")}
oa4 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
oa5 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1531")}
oa6 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1531/quic")}
oa7 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.4/udp/1111/quic")}
oa8 := &identify.ObservedAddr{Addr: ma.StringCast("/ip4/1.2.3.5/udp/1111/quic")}
// different ports, same IP => same key
require.Equal(t, oa1.GroupKey(), oa2.GroupKey())
// different IPs => different key
require.NotEqual(t, oa2.GroupKey(), oa3.GroupKey())
// same port, different protos => different keys
require.NotEqual(t, oa3.GroupKey(), oa4.GroupKey())
// same port, same address, different protos => different keys
require.NotEqual(t, oa2.GroupKey(), oa4.GroupKey())
// udp works as well
require.Equal(t, oa4.GroupKey(), oa5.GroupKey())
// udp and quic are different
require.NotEqual(t, oa5.GroupKey(), oa6.GroupKey())
// quic works as well
require.Equal(t, oa6.GroupKey(), oa7.GroupKey())
require.NotEqual(t, oa7.GroupKey(), oa8.GroupKey())
}