fix: count observations with at least one inbound connection as inbound

We prefer addresses from inbound connections. We don't want outbound connections
to hide these perfectly good addresses.
This commit is contained in:
Steven Allen 2020-06-03 17:50:31 -07:00
parent 6ca83309d2
commit 1dd7a8c4d4
2 changed files with 96 additions and 11 deletions

View File

@ -36,8 +36,8 @@ var observedAddrManagerWorkerChannelSize = 16
var maxObservedAddrsPerIPAndTransport = 2
type observation struct {
seenTime time.Time
connDirection network.Direction
seenTime time.Time
inbound bool
}
// observedAddr is an entry for an address reported by our peers.
@ -62,7 +62,7 @@ func (oa *observedAddr) activated() bool {
func (oa *observedAddr) numInbound() int {
count := 0
for obs := range oa.seenBy {
if oa.seenBy[obs].connDirection == network.DirInbound {
if oa.seenBy[obs].inbound {
count++
}
}
@ -382,16 +382,19 @@ func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, obs
observerString := observerGroup(conn.RemoteMultiaddr())
localString := string(conn.LocalMultiaddr().Bytes())
ob := observation{
seenTime: now,
connDirection: conn.Stat().Direction,
seenTime: now,
inbound: conn.Stat().Direction == network.DirInbound,
}
observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
if previousObserved.addr.Equal(observed) {
observedAddrs[i].seenBy[observerString] = ob
observedAddrs[i].lastSeen = now
for _, observedAddr := range oas.addrs[localString] {
if observedAddr.addr.Equal(observed) {
// Don't trump an outbound observation with an inbound
// one.
ob.inbound = ob.inbound || observedAddr.seenBy[observerString].inbound
observedAddr.seenBy[observerString] = ob
observedAddr.lastSeen = now
return
}
}

View File

@ -49,6 +49,22 @@ func (h *harness) conn(observer peer.ID) network.Conn {
if err != nil {
h.t.Fatal(err)
}
if c.Stat().Direction != network.DirOutbound {
h.t.Fatal("expected conn direction to be outbound")
}
return c
}
func (h *harness) connInbound(observer peer.ID) network.Conn {
c, err := h.mocknet.ConnectPeers(observer, h.host.ID())
if err != nil {
h.t.Fatal(err)
}
c = mocknet.ConnComplement(c)
if c.Stat().Direction != network.DirInbound {
h.t.Fatal("expected conn direction to be inbound")
}
return c
}
@ -59,6 +75,13 @@ func (h *harness) observe(observed ma.Multiaddr, observer peer.ID) network.Conn
return c
}
func (h *harness) observeInbound(observed ma.Multiaddr, observer peer.ID) network.Conn {
c := h.connInbound(observer)
h.oas.Record(c, observed)
time.Sleep(50 * time.Millisecond) // let the worker run
return c
}
func newHarness(ctx context.Context, t *testing.T) harness {
mn := mocknet.New(ctx)
sk, err := p2putil.RandTestBogusPrivateKey()
@ -75,6 +98,7 @@ func newHarness(ctx context.Context, t *testing.T) harness {
oas: identify.NewObservedAddrManager(ctx, h),
mocknet: mn,
host: h,
t: t,
}
}
@ -292,7 +316,28 @@ func TestObservedAddrFiltering(t *testing.T) {
b4 := ma.StringCast("/ip4/1.2.3.9/tcp/1237")
b5 := ma.StringCast("/ip4/1.2.3.10/tcp/1237")
peers := []peer.ID{harness.add(b1), harness.add(b2), harness.add(b3), harness.add(b4), harness.add(b5)}
b6 := ma.StringCast("/ip4/1.2.3.11/tcp/1237")
b7 := ma.StringCast("/ip4/1.2.3.12/tcp/1237")
// These are all observers in the same group.
b8 := ma.StringCast("/ip4/1.2.3.13/tcp/1237")
b9 := ma.StringCast("/ip4/1.2.3.13/tcp/1238")
b10 := ma.StringCast("/ip4/1.2.3.13/tcp/1239")
peers := []peer.ID{
harness.add(b1),
harness.add(b2),
harness.add(b3),
harness.add(b4),
harness.add(b5),
harness.add(b6),
harness.add(b7),
harness.add(b8),
harness.add(b9),
harness.add(b10),
}
for i := 0; i < 4; i++ {
harness.observe(it1, peers[i])
harness.observe(it2, peers[i])
@ -311,4 +356,41 @@ func TestObservedAddrFiltering(t *testing.T) {
require.Contains(t, addrs, it1)
require.Contains(t, addrs, it7)
// Bump the number of observations so 1 & 7 have 7 observations.
harness.observe(it1, peers[5])
harness.observe(it1, peers[6])
harness.observe(it7, peers[5])
harness.observe(it7, peers[6])
// Add an observation from IP 1.2.3.13
// 2 & 3 now have 5 observations
harness.observe(it2, peers[7])
harness.observe(it3, peers[7])
addrs = harness.oas.Addrs()
require.Len(t, addrs, 2)
require.Contains(t, addrs, it1)
require.Contains(t, addrs, it7)
// Add an inbound observation from IP 1.2.3.13, it should override the
// existing observation and it should make these addresses win even
// though we have fewer observations.
//
// 2 & 3 now have 6 observations.
harness.observeInbound(it2, peers[8])
harness.observeInbound(it3, peers[8])
addrs = harness.oas.Addrs()
require.Len(t, addrs, 2)
require.Contains(t, addrs, it2)
require.Contains(t, addrs, it3)
// Adding an outbound observation shouldn't "downgrade" it.
//
// 2 & 3 now have 7 observations.
harness.observe(it2, peers[9])
harness.observe(it3, peers[9])
addrs = harness.oas.Addrs()
require.Len(t, addrs, 2)
require.Contains(t, addrs, it2)
require.Contains(t, addrs, it3)
}