exchange signed routing records in identify (#747)
* Exchange signed routing records in identify Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
This commit is contained in:
parent
c833e2c9fc
commit
077a81814f
|
@ -2,27 +2,30 @@ package basichost
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/connmgr"
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
|
||||
"github.com/libp2p/go-eventbus"
|
||||
inat "github.com/libp2p/go-libp2p-nat"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
|
@ -93,6 +96,9 @@ type BasicHost struct {
|
|||
}
|
||||
|
||||
addrChangeChan chan struct{}
|
||||
|
||||
signKey crypto.PrivKey
|
||||
caBook peerstore.CertifiedAddrBook
|
||||
}
|
||||
|
||||
var _ host.Host = (*BasicHost)(nil)
|
||||
|
@ -150,10 +156,21 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
|
|||
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
|
||||
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cab, ok := peerstore.GetCertifiedAddrBook(net.Peerstore())
|
||||
if !ok {
|
||||
return nil, errors.New("peerstore should also be a certified address book")
|
||||
}
|
||||
h.caBook = cab
|
||||
|
||||
h.signKey = h.Peerstore().PrivKey(h.ID())
|
||||
if h.signKey == nil {
|
||||
return nil, errors.New("unable to access host key")
|
||||
}
|
||||
|
||||
if opts.MultistreamMuxer != nil {
|
||||
h.mux = opts.MultistreamMuxer
|
||||
}
|
||||
|
@ -221,12 +238,12 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
|
|||
}
|
||||
|
||||
h, err := NewHost(context.Background(), net, hostopts)
|
||||
h.Start()
|
||||
if err != nil {
|
||||
// this cannot happen with legacy options
|
||||
// plus we want to keep the (deprecated) legacy interface unchanged
|
||||
panic(err)
|
||||
}
|
||||
h.Start()
|
||||
|
||||
return h
|
||||
}
|
||||
|
@ -327,39 +344,68 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
|
|||
return &evt
|
||||
}
|
||||
|
||||
func (h *BasicHost) makeSignedPeerRecord(evt *event.EvtLocalAddressesUpdated) (*record.Envelope, error) {
|
||||
current := make([]multiaddr.Multiaddr, 0, len(evt.Current))
|
||||
for _, a := range evt.Current {
|
||||
current = append(current, a.Address)
|
||||
}
|
||||
|
||||
rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{h.ID(), current})
|
||||
return record.Seal(rec, h.signKey)
|
||||
}
|
||||
|
||||
func (h *BasicHost) background() {
|
||||
defer h.refCount.Done()
|
||||
var lastAddrs []ma.Multiaddr
|
||||
|
||||
emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
|
||||
// nothing to do if both are nil..defensive check
|
||||
if currentAddrs == nil && lastAddrs == nil {
|
||||
return
|
||||
}
|
||||
|
||||
changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)
|
||||
|
||||
if changeEvt == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// add signed peer record to the event
|
||||
sr, err := h.makeSignedPeerRecord(changeEvt)
|
||||
if err != nil {
|
||||
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
|
||||
return
|
||||
}
|
||||
changeEvt.SignedPeerRecord = *sr
|
||||
|
||||
// persist the signed record to the peerstore
|
||||
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
|
||||
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
|
||||
return
|
||||
}
|
||||
|
||||
// emit addr change event on the bus
|
||||
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
|
||||
log.Warnf("error emitting event for updated addrs: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// periodically schedules an IdentifyPush to update our peers for changes
|
||||
// in our address set (if needed)
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
// initialize lastAddrs
|
||||
lastAddrs := h.Addrs()
|
||||
|
||||
for {
|
||||
curr := h.Addrs()
|
||||
emitAddrChange(curr, lastAddrs)
|
||||
lastAddrs = curr
|
||||
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-h.addrChangeChan:
|
||||
case <-h.ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
// emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed.
|
||||
addrs := h.Addrs()
|
||||
changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs)
|
||||
if changeEvt != nil {
|
||||
lastAddrs = addrs
|
||||
}
|
||||
|
||||
if changeEvt != nil {
|
||||
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
|
||||
if err != nil {
|
||||
log.Warnf("error emitting event for updated addrs: %s", err)
|
||||
}
|
||||
h.ids.Push()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,21 +5,24 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-eventbus"
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
"github.com/libp2p/go-libp2p-core/helpers"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
"github.com/libp2p/go-libp2p-core/test"
|
||||
|
||||
"github.com/libp2p/go-eventbus"
|
||||
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
madns "github.com/multiformats/go-multiaddr-dns"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -102,16 +105,35 @@ func TestProtocolHandlerEvents(t *testing.T) {
|
|||
}
|
||||
defer sub.Close()
|
||||
|
||||
assert := func(added, removed []protocol.ID) {
|
||||
var next event.EvtLocalProtocolsUpdated
|
||||
select {
|
||||
case evt := <-sub.Out():
|
||||
next = evt.(event.EvtLocalProtocolsUpdated)
|
||||
break
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("event not received in 5 seconds")
|
||||
// the identify service adds new protocol handlers shortly after the host
|
||||
// starts. this helps us filter those events out, since they're unrelated
|
||||
// to the test.
|
||||
isIdentify := func(evt event.EvtLocalProtocolsUpdated) bool {
|
||||
for _, p := range evt.Added {
|
||||
if p == identify.ID || p == identify.IDPush {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
nextEvent := func() event.EvtLocalProtocolsUpdated {
|
||||
for {
|
||||
select {
|
||||
case evt := <-sub.Out():
|
||||
next := evt.(event.EvtLocalProtocolsUpdated)
|
||||
if isIdentify(next) {
|
||||
continue
|
||||
}
|
||||
return next
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("event not received in 5 seconds")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert := func(added, removed []protocol.ID) {
|
||||
next := nextEvent()
|
||||
if !reflect.DeepEqual(added, next.Added) {
|
||||
t.Errorf("expected added: %v; received: %v", added, next.Added)
|
||||
}
|
||||
|
@ -460,11 +482,10 @@ func TestAddrResolution(t *testing.T) {
|
|||
_ = h.Connect(tctx, *pi)
|
||||
|
||||
addrs := h.Peerstore().Addrs(pi.ID)
|
||||
sort.Sort(sortedMultiaddrs(addrs))
|
||||
|
||||
if len(addrs) != 2 || !addrs[0].Equal(addr1) || !addrs[1].Equal(addr2) {
|
||||
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs)
|
||||
}
|
||||
require.Len(t, addrs, 2)
|
||||
require.Contains(t, addrs, addr1)
|
||||
require.Contains(t, addrs, addr2)
|
||||
}
|
||||
|
||||
func TestAddrResolutionRecursive(t *testing.T) {
|
||||
|
@ -515,11 +536,9 @@ func TestAddrResolutionRecursive(t *testing.T) {
|
|||
_ = h.Connect(tctx, *pi1)
|
||||
|
||||
addrs1 := h.Peerstore().Addrs(pi1.ID)
|
||||
sort.Sort(sortedMultiaddrs(addrs1))
|
||||
|
||||
if len(addrs1) != 2 || !addrs1[0].Equal(addr1) || !addrs1[1].Equal(addr2) {
|
||||
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs1)
|
||||
}
|
||||
require.Len(t, addrs1, 2)
|
||||
require.Contains(t, addrs1, addr1)
|
||||
require.Contains(t, addrs1, addr2)
|
||||
|
||||
pi2, err := peer.AddrInfoFromP2pAddr(p2paddr2)
|
||||
if err != nil {
|
||||
|
@ -529,11 +548,49 @@ func TestAddrResolutionRecursive(t *testing.T) {
|
|||
_ = h.Connect(tctx, *pi2)
|
||||
|
||||
addrs2 := h.Peerstore().Addrs(pi2.ID)
|
||||
sort.Sort(sortedMultiaddrs(addrs2))
|
||||
require.Len(t, addrs2, 1)
|
||||
require.Contains(t, addrs2, addr1)
|
||||
}
|
||||
|
||||
if len(addrs2) != 1 || !addrs2[0].Equal(addr1) {
|
||||
t.Fatalf("expected [%s], got %+v", addr1, addrs2)
|
||||
func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")}
|
||||
|
||||
h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
|
||||
return taddrs
|
||||
}))
|
||||
defer h.Close()
|
||||
|
||||
sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
defer sub.Close()
|
||||
// wait for the host background thread to start
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
expected := event.EvtLocalAddressesUpdated{
|
||||
Diffs: true,
|
||||
Current: []event.UpdatedAddress{
|
||||
{Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
|
||||
},
|
||||
Removed: []event.UpdatedAddress{}}
|
||||
|
||||
// assert we get expected event
|
||||
evt := waitForAddrChangeEvent(ctx, sub, t)
|
||||
if !updatedAddrEventsEqual(expected, evt) {
|
||||
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, evt)
|
||||
}
|
||||
|
||||
// assert it's on the signed record
|
||||
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
|
||||
require.Equal(t, taddrs, rc.Addrs)
|
||||
|
||||
// assert it's in the peerstore
|
||||
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
|
||||
require.NotNil(t, ev)
|
||||
rc = peerRecordFromEnvelope(t, *ev)
|
||||
require.Equal(t, taddrs, rc.Addrs)
|
||||
}
|
||||
|
||||
func TestHostAddrChangeDetection(t *testing.T) {
|
||||
|
@ -611,9 +668,18 @@ func TestHostAddrChangeDetection(t *testing.T) {
|
|||
h.SignalAddressChange()
|
||||
evt := waitForAddrChangeEvent(ctx, sub, t)
|
||||
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
|
||||
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt)
|
||||
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i-1], evt)
|
||||
}
|
||||
|
||||
// assert it's on the signed record
|
||||
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
|
||||
require.Equal(t, addrSets[i], rc.Addrs)
|
||||
|
||||
// assert it's in the peerstore
|
||||
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
|
||||
require.NotNil(t, ev)
|
||||
rc = peerRecordFromEnvelope(t, *ev)
|
||||
require.Equal(t, addrSets[i], rc.Addrs)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -672,10 +738,17 @@ func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool {
|
|||
updatedAddrsEqual(a.Removed, b.Removed)
|
||||
}
|
||||
|
||||
type sortedMultiaddrs []ma.Multiaddr
|
||||
|
||||
func (sma sortedMultiaddrs) Len() int { return len(sma) }
|
||||
func (sma sortedMultiaddrs) Swap(i, j int) { sma[i], sma[j] = sma[j], sma[i] }
|
||||
func (sma sortedMultiaddrs) Less(i, j int) bool {
|
||||
return bytes.Compare(sma[i].Bytes(), sma[j].Bytes()) == 1
|
||||
func peerRecordFromEnvelope(t *testing.T, ev record.Envelope) *peer.PeerRecord {
|
||||
t.Helper()
|
||||
rec, err := ev.Record()
|
||||
if err != nil {
|
||||
t.Fatalf("error getting PeerRecord from event: %v", err)
|
||||
return nil
|
||||
}
|
||||
peerRec, ok := rec.(*peer.PeerRecord)
|
||||
if !ok {
|
||||
t.Fatalf("wrong type for peer record")
|
||||
return nil
|
||||
}
|
||||
return peerRec
|
||||
}
|
||||
|
|
|
@ -15,13 +15,13 @@ import (
|
|||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
|
||||
"github.com/libp2p/go-eventbus"
|
||||
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
|
||||
|
||||
ggio "github.com/gogo/protobuf/io"
|
||||
logging "github.com/ipfs/go-log"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
manet "github.com/multiformats/go-multiaddr-net"
|
||||
msmux "github.com/multiformats/go-multistream"
|
||||
|
@ -30,7 +30,11 @@ import (
|
|||
var log = logging.Logger("net/identify")
|
||||
|
||||
// ID is the protocol.ID of the Identify Service.
|
||||
const ID = "/ipfs/id/1.0.0"
|
||||
const ID = "/p2p/id/1.1.0"
|
||||
|
||||
// LegacyID is the protocol.ID of version 1.0.0 of the identify
|
||||
// service, which does not support signed peer records.
|
||||
const LegacyID = "/ipfs/id/1.0.0"
|
||||
|
||||
// LibP2PVersion holds the current protocol version for a client running this code
|
||||
// TODO(jbenet): fix the versioning mess.
|
||||
|
@ -84,11 +88,13 @@ type IDService struct {
|
|||
|
||||
addrMu sync.Mutex
|
||||
|
||||
peerrec *record.Envelope
|
||||
peerrecMu sync.RWMutex
|
||||
|
||||
// our own observed addresses.
|
||||
observedAddrs *ObservedAddrManager
|
||||
|
||||
subscription event.Subscription
|
||||
emitters struct {
|
||||
emitters struct {
|
||||
evtPeerProtocolsUpdated event.Emitter
|
||||
evtPeerIdentificationCompleted event.Emitter
|
||||
evtPeerIdentificationFailed event.Emitter
|
||||
|
@ -121,34 +127,67 @@ func NewIDService(h host.Host, opts ...Option) *IDService {
|
|||
|
||||
// handle local protocol handler updates, and push deltas to peers.
|
||||
var err error
|
||||
s.subscription, err = h.EventBus().Subscribe(&event.EvtLocalProtocolsUpdated{}, eventbus.BufSize(128))
|
||||
if err != nil {
|
||||
log.Warningf("identify service not subscribed to local protocol handlers updates; err: %s", err)
|
||||
} else {
|
||||
s.refCount.Add(1)
|
||||
go s.handleEvents()
|
||||
}
|
||||
|
||||
s.refCount.Add(1)
|
||||
go s.handleEvents()
|
||||
|
||||
s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{})
|
||||
if err != nil {
|
||||
log.Warningf("identify service not emitting peer protocol updates; err: %s", err)
|
||||
log.Warnf("identify service not emitting peer protocol updates; err: %s", err)
|
||||
}
|
||||
s.emitters.evtPeerIdentificationCompleted, err = h.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{})
|
||||
if err != nil {
|
||||
log.Warningf("identify service not emitting identification completed events; err: %s", err)
|
||||
log.Warnf("identify service not emitting identification completed events; err: %s", err)
|
||||
}
|
||||
s.emitters.evtPeerIdentificationFailed, err = h.EventBus().Emitter(&event.EvtPeerIdentificationFailed{})
|
||||
if err != nil {
|
||||
log.Warningf("identify service not emitting identification failed events; err: %s", err)
|
||||
log.Warnf("identify service not emitting identification failed events; err: %s", err)
|
||||
}
|
||||
|
||||
// register protocols that do not depend on peer records.
|
||||
h.SetStreamHandler(IDDelta, s.deltaHandler)
|
||||
h.SetStreamHandler(LegacyID, s.requestHandler)
|
||||
h.SetStreamHandler(LegacyIDPush, s.pushHandler)
|
||||
|
||||
// register protocols that depend on peer records.
|
||||
h.SetStreamHandler(ID, s.requestHandler)
|
||||
h.SetStreamHandler(IDPush, s.pushHandler)
|
||||
h.SetStreamHandler(IDDelta, s.deltaHandler)
|
||||
|
||||
h.Network().Notify((*netNotifiee)(s))
|
||||
return s
|
||||
}
|
||||
|
||||
func (ids *IDService) handleEvents() {
|
||||
defer ids.refCount.Done()
|
||||
|
||||
sub, err := ids.Host.EventBus().Subscribe([]interface{}{&event.EvtLocalProtocolsUpdated{},
|
||||
&event.EvtLocalAddressesUpdated{}}, eventbus.BufSize(256))
|
||||
if err != nil {
|
||||
log.Errorf("failed to subscribe to events on the bus, err=%s", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer sub.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case e, more := <-sub.Out():
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
switch evt := e.(type) {
|
||||
case event.EvtLocalAddressesUpdated:
|
||||
ids.handleLocalAddrsUpdated(evt)
|
||||
case event.EvtLocalProtocolsUpdated:
|
||||
ids.handleProtosChanged(evt)
|
||||
}
|
||||
|
||||
case <-ids.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close shuts down the IDService
|
||||
func (ids *IDService) Close() error {
|
||||
ids.closeSync.Do(func() {
|
||||
|
@ -158,22 +197,18 @@ func (ids *IDService) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (ids *IDService) handleEvents() {
|
||||
sub := ids.subscription
|
||||
defer ids.refCount.Done()
|
||||
defer sub.Close()
|
||||
func (ids *IDService) handleProtosChanged(evt event.EvtLocalProtocolsUpdated) {
|
||||
ids.fireProtocolDelta(evt)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, more := <-sub.Out():
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
ids.fireProtocolDelta(evt.(event.EvtLocalProtocolsUpdated))
|
||||
case <-ids.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
func (ids *IDService) handleLocalAddrsUpdated(evt event.EvtLocalAddressesUpdated) {
|
||||
ids.peerrecMu.Lock()
|
||||
rec := evt.SignedPeerRecord
|
||||
ids.peerrec = &rec
|
||||
ids.peerrecMu.Unlock()
|
||||
|
||||
log.Debug("triggering push based on updated local PeerRecord")
|
||||
ids.Push()
|
||||
}
|
||||
|
||||
// OwnObservedAddrs returns the addresses peers have reported we've dialed from
|
||||
|
@ -259,25 +294,29 @@ func (ids *IDService) identifyConn(c network.Conn, signal chan struct{}) {
|
|||
return
|
||||
}
|
||||
|
||||
s.SetProtocol(ID)
|
||||
|
||||
protocolIDs := []string{ID, LegacyID}
|
||||
// ok give the response to our handler.
|
||||
if err = msmux.SelectProtoOrFail(ID, s); err != nil {
|
||||
var selectedProto string
|
||||
if selectedProto, err = msmux.SelectOneOf(protocolIDs, s); err != nil {
|
||||
log.Event(context.TODO(), "IdentifyOpenFailed", c.RemotePeer(), logging.Metadata{"error": err})
|
||||
s.Reset()
|
||||
return
|
||||
}
|
||||
|
||||
s.SetProtocol(protocol.ID(selectedProto))
|
||||
ids.responseHandler(s)
|
||||
}
|
||||
|
||||
func protoSupportsPeerRecords(proto protocol.ID) bool {
|
||||
return proto == ID || proto == IDPush
|
||||
}
|
||||
|
||||
func (ids *IDService) requestHandler(s network.Stream) {
|
||||
defer helpers.FullClose(s)
|
||||
c := s.Conn()
|
||||
|
||||
w := ggio.NewDelimitedWriter(s)
|
||||
mes := pb.Identify{}
|
||||
ids.populateMessage(&mes, s.Conn())
|
||||
ids.populateMessage(&mes, s.Conn(), protoSupportsPeerRecords(s.Protocol()))
|
||||
w.WriteMsg(&mes)
|
||||
|
||||
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
|
||||
|
@ -297,14 +336,15 @@ func (ids *IDService) responseHandler(s network.Stream) {
|
|||
defer func() { go helpers.FullClose(s) }()
|
||||
|
||||
log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())
|
||||
ids.consumeMessage(&mes, c)
|
||||
ids.consumeMessage(&mes, c, protoSupportsPeerRecords(s.Protocol()))
|
||||
}
|
||||
|
||||
func (ids *IDService) broadcast(proto protocol.ID, payloadWriter func(s network.Stream)) {
|
||||
func (ids *IDService) broadcast(protos []protocol.ID, payloadWriter func(s network.Stream)) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
protoStrs := protocol.ConvertToStrings(protos)
|
||||
ctx, cancel := context.WithTimeout(ids.ctx, 30*time.Second)
|
||||
ctx = network.WithNoDial(ctx, string(proto))
|
||||
ctx = network.WithNoDial(ctx, protoStrs[0])
|
||||
|
||||
pstore := ids.Host.Peerstore()
|
||||
for _, p := range ids.Host.Network().Peers() {
|
||||
|
@ -324,13 +364,13 @@ func (ids *IDService) broadcast(proto protocol.ID, payloadWriter func(s network.
|
|||
}
|
||||
|
||||
// avoid the unnecessary stream if the peer does not support the protocol.
|
||||
if sup, err := pstore.SupportsProtocols(p, string(proto)); err != nil && len(sup) == 0 {
|
||||
if sup, err := pstore.SupportsProtocols(p, protoStrs...); err != nil && len(sup) == 0 {
|
||||
// the peer does not support the required protocol.
|
||||
return
|
||||
}
|
||||
// if the peerstore query errors, we go ahead anyway.
|
||||
|
||||
s, err := ids.Host.NewStream(ctx, p, proto)
|
||||
s, err := ids.Host.NewStream(ctx, p, protos...)
|
||||
if err != nil {
|
||||
log.Debugf("error opening push stream to %s: %s", p, err.Error())
|
||||
return
|
||||
|
@ -358,7 +398,7 @@ func (ids *IDService) broadcast(proto protocol.ID, payloadWriter func(s network.
|
|||
}()
|
||||
}
|
||||
|
||||
func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn) {
|
||||
func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn, usePeerRecords bool) {
|
||||
// set protocols this node is currently handling
|
||||
protos := ids.Host.Mux().Protocols()
|
||||
mes.Protocols = make([]string, len(protos))
|
||||
|
@ -370,18 +410,35 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn) {
|
|||
// "public" address, at least in relation to us.
|
||||
mes.ObservedAddr = c.RemoteMultiaddr().Bytes()
|
||||
|
||||
// set listen addrs, get our latest addrs from Host.
|
||||
laddrs := ids.Host.Addrs()
|
||||
// Note: LocalMultiaddr is sometimes 0.0.0.0
|
||||
viaLoopback := manet.IsIPLoopback(c.LocalMultiaddr()) || manet.IsIPLoopback(c.RemoteMultiaddr())
|
||||
mes.ListenAddrs = make([][]byte, 0, len(laddrs))
|
||||
for _, addr := range laddrs {
|
||||
if !viaLoopback && manet.IsIPLoopback(addr) {
|
||||
continue
|
||||
if usePeerRecords {
|
||||
ids.peerrecMu.RLock()
|
||||
rec := ids.peerrec
|
||||
ids.peerrecMu.RUnlock()
|
||||
|
||||
if rec == nil {
|
||||
log.Errorf("latest peer record does not exist. identify message incomplete!")
|
||||
} else {
|
||||
recBytes, err := rec.Marshal()
|
||||
if err != nil {
|
||||
log.Errorf("error marshaling peer record: %v", err)
|
||||
} else {
|
||||
mes.SignedPeerRecord = recBytes
|
||||
log.Debugf("%s sent peer record to %s", c.LocalPeer(), c.RemotePeer())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// set listen addrs, get our latest addrs from Host.
|
||||
laddrs := ids.Host.Addrs()
|
||||
// Note: LocalMultiaddr is sometimes 0.0.0.0
|
||||
viaLoopback := manet.IsIPLoopback(c.LocalMultiaddr()) || manet.IsIPLoopback(c.RemoteMultiaddr())
|
||||
mes.ListenAddrs = make([][]byte, 0, len(laddrs))
|
||||
for _, addr := range laddrs {
|
||||
if !viaLoopback && manet.IsIPLoopback(addr) {
|
||||
continue
|
||||
}
|
||||
mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
|
||||
}
|
||||
mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
|
||||
}
|
||||
log.Debugf("%s sent listen addrs to %s: %s", c.LocalPeer(), c.RemotePeer(), laddrs)
|
||||
|
||||
// set our public key
|
||||
ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID())
|
||||
|
@ -411,7 +468,7 @@ func (ids *IDService) populateMessage(mes *pb.Identify, c network.Conn) {
|
|||
mes.AgentVersion = &av
|
||||
}
|
||||
|
||||
func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
|
||||
func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn, usePeerRecords bool) {
|
||||
p := c.RemotePeer()
|
||||
|
||||
// mes.Protocols
|
||||
|
@ -441,18 +498,37 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
|
|||
// that picks random source ports, this can cause DHT nodes to collect
|
||||
// many undialable addresses for other peers.
|
||||
|
||||
// add certified addresses for the peer, if they sent us a signed peer record
|
||||
var signedPeerRecord *record.Envelope
|
||||
if usePeerRecords {
|
||||
var err error
|
||||
signedPeerRecord, err = signedPeerRecordFromMessage(mes)
|
||||
if err != nil {
|
||||
log.Errorf("error getting peer record from Identify message: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Extend the TTLs on the known (probably) good addresses.
|
||||
// Taking the lock ensures that we don't concurrently process a disconnect.
|
||||
ids.addrMu.Lock()
|
||||
switch ids.Host.Network().Connectedness(p) {
|
||||
case network.Connected:
|
||||
// invalidate previous addrs -- we use a transient ttl instead of 0 to ensure there
|
||||
// is no period of having no good addrs whatsoever
|
||||
ids.Host.Peerstore().UpdateAddrs(p, peerstore.ConnectedAddrTTL, transientTTL)
|
||||
ids.Host.Peerstore().AddAddrs(p, lmaddrs, peerstore.ConnectedAddrTTL)
|
||||
default:
|
||||
ids.Host.Peerstore().UpdateAddrs(p, peerstore.ConnectedAddrTTL, transientTTL)
|
||||
ids.Host.Peerstore().AddAddrs(p, lmaddrs, peerstore.RecentlyConnectedAddrTTL)
|
||||
ttl := peerstore.RecentlyConnectedAddrTTL
|
||||
if ids.Host.Network().Connectedness(p) == network.Connected {
|
||||
ttl = peerstore.ConnectedAddrTTL
|
||||
}
|
||||
|
||||
// invalidate previous addrs -- we use a transient ttl instead of 0 to ensure there
|
||||
// is no period of having no good addrs whatsoever
|
||||
ids.Host.Peerstore().UpdateAddrs(p, peerstore.ConnectedAddrTTL, transientTTL)
|
||||
|
||||
// add signed addrs if we have them and the peerstore supports them
|
||||
cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore())
|
||||
if ok && signedPeerRecord != nil {
|
||||
_, addErr := cab.ConsumePeerRecord(signedPeerRecord, ttl)
|
||||
if addErr != nil {
|
||||
log.Debugf("error adding signed addrs to peerstore: %v", addErr)
|
||||
}
|
||||
} else {
|
||||
ids.Host.Peerstore().AddAddrs(p, lmaddrs, ttl)
|
||||
}
|
||||
ids.addrMu.Unlock()
|
||||
|
||||
|
@ -595,6 +671,14 @@ func addrInAddrs(a ma.Multiaddr, as []ma.Multiaddr) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) {
|
||||
if msg.SignedPeerRecord == nil || len(msg.SignedPeerRecord) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
env, _, err := record.ConsumeEnvelope(msg.SignedPeerRecord, peer.PeerRecordEnvelopeDomain)
|
||||
return env, err
|
||||
}
|
||||
|
||||
// netNotifiee defines methods to be used with the IpfsDHT
|
||||
type netNotifiee IDService
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ func (ids *IDService) fireProtocolDelta(evt event.EvtLocalProtocolsUpdated) {
|
|||
}
|
||||
log.Debugf("%s sent delta update to %s: %s", IDDelta, c.RemotePeer(), c.RemoteMultiaddr())
|
||||
}
|
||||
ids.broadcast(IDDelta, deltaWriter)
|
||||
ids.broadcast([]protocol.ID{IDDelta}, deltaWriter)
|
||||
}
|
||||
|
||||
// consumeDelta processes an incoming delta from a peer, updating the peerstore
|
||||
|
|
|
@ -1,17 +1,26 @@
|
|||
package identify
|
||||
|
||||
import "github.com/libp2p/go-libp2p-core/network"
|
||||
import (
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
)
|
||||
|
||||
// IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing
|
||||
// the current state of the peer.
|
||||
//
|
||||
// It is in the process of being replaced by identify delta, which sends only diffs for better
|
||||
// resource utilisation.
|
||||
const IDPush = "/ipfs/id/push/1.0.0"
|
||||
const IDPush = "/p2p/id/push/1.1.0"
|
||||
|
||||
// LegacyIDPush is the protocol.ID of the previous version of the Identify push protocol,
|
||||
// which does not support exchanging signed addresses in PeerRecords.
|
||||
// It is still supported for backwards compatibility if a remote peer does not support
|
||||
// the current version.
|
||||
const LegacyIDPush = "/ipfs/id/push/1.0.0"
|
||||
|
||||
// Push pushes a full identify message to all peers containing the current state.
|
||||
func (ids *IDService) Push() {
|
||||
ids.broadcast(IDPush, ids.requestHandler)
|
||||
ids.broadcast([]protocol.ID{IDPush, LegacyIDPush}, ids.requestHandler)
|
||||
}
|
||||
|
||||
// pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.
|
||||
|
|
|
@ -2,6 +2,8 @@ package identify_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/libp2p/go-libp2p-core/record"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
@ -34,6 +36,8 @@ func subtestIDService(t *testing.T) {
|
|||
|
||||
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
generatePeerRecord(t, h1)
|
||||
generatePeerRecord(t, h2)
|
||||
|
||||
h1p := h1.ID()
|
||||
h2p := h2.ID()
|
||||
|
@ -46,6 +50,9 @@ func subtestIDService(t *testing.T) {
|
|||
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{}) // nothing
|
||||
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{}) // nothing
|
||||
|
||||
// the forgetMe addr represents an address for h1 that h2 has learned out of band
|
||||
// (not via identify protocol). Shortly after the identify exchange, it will be
|
||||
// forgotten and replaced by the addrs h1 sends during identify
|
||||
forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
|
||||
|
||||
h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)
|
||||
|
@ -71,7 +78,8 @@ func subtestIDService(t *testing.T) {
|
|||
// the IDService should be opened automatically, by the network.
|
||||
// what we should see now is that both peers know about each others listen addresses.
|
||||
t.Log("test peer1 has peer2 addrs correctly")
|
||||
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
|
||||
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
|
||||
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // should have signed addrs also
|
||||
testHasProtocolVersions(t, h1, h2p)
|
||||
testHasPublicKey(t, h1, h2p, h2.Peerstore().PubKey(h2p)) // h1 should have h2's public key
|
||||
|
||||
|
@ -89,6 +97,7 @@ func subtestIDService(t *testing.T) {
|
|||
// and the protocol versions.
|
||||
t.Log("test peer2 has peer1 addrs correctly")
|
||||
testKnowsAddrs(t, h2, h1p, addrs) // has them
|
||||
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))
|
||||
testHasProtocolVersions(t, h2, h1p)
|
||||
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key
|
||||
|
||||
|
@ -99,19 +108,21 @@ func subtestIDService(t *testing.T) {
|
|||
t.Fatal("should have no connections")
|
||||
}
|
||||
|
||||
t.Log("testing addrs just after disconnect")
|
||||
// addresses don't immediately expire on disconnect, so we should still have them
|
||||
testKnowsAddrs(t, h2, h1p, addrs)
|
||||
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
|
||||
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
|
||||
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Forget the first one.
|
||||
testKnowsAddrs(t, h2, h1p, addrs[:len(addrs)-1])
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Forget the rest.
|
||||
// the addrs had their TTLs reduced on disconnect, and
|
||||
// will be forgotten soon after
|
||||
t.Log("testing addrs after TTL expiration")
|
||||
time.Sleep(2 * time.Second)
|
||||
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
|
||||
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
|
||||
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
|
||||
testHasCertifiedAddrs(t, h2, h1p, []ma.Multiaddr{})
|
||||
|
||||
// test that we received the "identify completed" event.
|
||||
select {
|
||||
|
@ -125,7 +136,36 @@ func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiadd
|
|||
t.Helper()
|
||||
|
||||
actual := h.Peerstore().Addrs(p)
|
||||
checkAddrs(t, expected, actual, fmt.Sprintf("%s did not have addr for %s", h.ID(), p))
|
||||
}
|
||||
|
||||
func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
|
||||
t.Helper()
|
||||
cab, ok := peerstore.GetCertifiedAddrBook(h.Peerstore())
|
||||
if !ok {
|
||||
t.Error("expected peerstore to implement CertifiedAddrBook")
|
||||
}
|
||||
recordEnvelope := cab.GetPeerRecord(p)
|
||||
if recordEnvelope == nil {
|
||||
if len(expected) == 0 {
|
||||
return
|
||||
}
|
||||
t.Fatalf("peerstore has no signed record for peer %s", p)
|
||||
}
|
||||
r, err := recordEnvelope.Record()
|
||||
if err != nil {
|
||||
t.Error("Error unwrapping signed PeerRecord from envelope", err)
|
||||
}
|
||||
rec, ok := r.(*peer.PeerRecord)
|
||||
if !ok {
|
||||
t.Error("unexpected record type")
|
||||
}
|
||||
|
||||
checkAddrs(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p))
|
||||
}
|
||||
|
||||
func checkAddrs(t *testing.T, expected, actual []ma.Multiaddr, msg string) {
|
||||
t.Helper()
|
||||
if len(actual) != len(expected) {
|
||||
t.Errorf("expected: %s", expected)
|
||||
t.Errorf("actual: %s", actual)
|
||||
|
@ -138,7 +178,7 @@ func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiadd
|
|||
}
|
||||
for _, addr := range expected {
|
||||
if _, found := have[addr.String()]; !found {
|
||||
t.Errorf("%s did not have addr for %s: %s", h.ID(), p, addr)
|
||||
t.Errorf("%s: %s", msg, addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -177,6 +217,36 @@ func testHasPublicKey(t *testing.T, h host.Host, p peer.ID, shouldBe ic.PubKey)
|
|||
}
|
||||
}
|
||||
|
||||
// we're using BlankHost in our tests, which doesn't automatically generate peer records
|
||||
// like BasicHost. This generates a record and puts it on the host's event bus, which
|
||||
// will cause the identify service to start supporting new protocol versions that
|
||||
// depend on peer records being available.
|
||||
func generatePeerRecord(t *testing.T, h host.Host) {
|
||||
t.Helper()
|
||||
|
||||
key := h.Peerstore().PrivKey(h.ID())
|
||||
if key == nil {
|
||||
t.Fatal("no private key for host")
|
||||
}
|
||||
|
||||
rec := peer.NewPeerRecord()
|
||||
rec.PeerID = h.ID()
|
||||
rec.Addrs = h.Addrs()
|
||||
signed, err := record.Seal(rec, key)
|
||||
if err != nil {
|
||||
t.Fatalf("error generating peer record: %s", err)
|
||||
}
|
||||
evt := event.EvtLocalAddressesUpdated{SignedPeerRecord: *signed}
|
||||
emitter, err := h.EventBus().Emitter(new(event.EvtLocalAddressesUpdated), eventbus.Stateful)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = emitter.Emit(evt)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIDServiceWait gives the ID service 1s to finish after dialing
|
||||
// this is because it used to be concurrent. Now, Dial wait till the
|
||||
// id service is done.
|
||||
|
@ -423,11 +493,14 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) {
|
|||
// replace the original identify handler by one that blocks until we close the block channel.
|
||||
// this allows us to control how long identify runs.
|
||||
block := make(chan struct{})
|
||||
h1.RemoveStreamHandler(identify.ID)
|
||||
h1.SetStreamHandler(identify.ID, func(s network.Stream) {
|
||||
handler := func(s network.Stream) {
|
||||
<-block
|
||||
go helpers.FullClose(s)
|
||||
})
|
||||
}
|
||||
h1.RemoveStreamHandler(identify.ID)
|
||||
h1.RemoveStreamHandler(identify.LegacyID)
|
||||
h1.SetStreamHandler(identify.ID, handler)
|
||||
h1.SetStreamHandler(identify.LegacyID, handler)
|
||||
|
||||
// from h2 connect to h1.
|
||||
if err := h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}); err != nil {
|
||||
|
@ -507,3 +580,51 @@ func TestUserAgent(t *testing.T) {
|
|||
t.Errorf("expected agent version %q, got %q", "bar", av)
|
||||
}
|
||||
}
|
||||
|
||||
// make sure that we still support older peers using "legacy" versions of identify
|
||||
func TestCompatibilityWithPeersThatDoNotSupportSignedAddrs(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
|
||||
defer h2.Close()
|
||||
defer h1.Close()
|
||||
|
||||
ids := identify.NewIDService(h1)
|
||||
ids2 := identify.NewIDService(h2)
|
||||
|
||||
defer ids.Close()
|
||||
defer ids2.Close()
|
||||
|
||||
// generate initial peer record only for h1. this will cause h1 to enable
|
||||
// the new protocols, but h2 will still use legacy protos
|
||||
generatePeerRecord(t, h1)
|
||||
|
||||
h2p := h2.ID()
|
||||
h2pi := h2.Peerstore().PeerInfo(h2p)
|
||||
if err := h1.Connect(ctx, h2pi); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
h1t2c := h1.Network().ConnsToPeer(h2p)
|
||||
if len(h1t2c) == 0 {
|
||||
t.Fatal("should have a conn here")
|
||||
}
|
||||
|
||||
ids.IdentifyConn(h1t2c[0])
|
||||
// the IDService should be opened automatically, by the network.
|
||||
// what we should see now is that both peers know about each others listen addresses.
|
||||
t.Log("test peer1 has peer2 addrs correctly")
|
||||
testKnowsAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p)) // has them
|
||||
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{}) // should not have signed addrs
|
||||
|
||||
// double check that it works when both peers support the new protos
|
||||
// enable new protos for h2 by generating a peer record
|
||||
generatePeerRecord(t, h2)
|
||||
|
||||
// if we re-identify, h1 should now have certified addrs for h2
|
||||
ids.IdentifyConn(h1t2c[0])
|
||||
t.Log("test peer1 has peer2 certified addrs correctly")
|
||||
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
|
||||
}
|
||||
|
|
|
@ -98,7 +98,13 @@ type Identify struct {
|
|||
// protocols are the services this node is running
|
||||
Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"`
|
||||
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
|
||||
Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"`
|
||||
Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"`
|
||||
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
|
||||
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
|
||||
// in a form that lets us share authenticated addrs with other peers.
|
||||
// see github.com/libp2p/go-libp2p-core/record/pb/envelope.proto and
|
||||
// github.com/libp2p/go-libp2p-core/peer/pb/peer_record.proto for message definitions.
|
||||
SignedPeerRecord []byte `protobuf:"bytes,8,opt,name=signedPeerRecord" json:"signedPeerRecord,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
XXX_sizecache int32 `json:"-"`
|
||||
|
@ -186,6 +192,13 @@ func (m *Identify) GetDelta() *Delta {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Identify) GetSignedPeerRecord() []byte {
|
||||
if m != nil {
|
||||
return m.SignedPeerRecord
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*Delta)(nil), "identify.pb.Delta")
|
||||
proto.RegisterType((*Identify)(nil), "identify.pb.Identify")
|
||||
|
@ -194,23 +207,24 @@ func init() {
|
|||
func init() { proto.RegisterFile("identify.proto", fileDescriptor_83f1e7e6b485409f) }
|
||||
|
||||
var fileDescriptor_83f1e7e6b485409f = []byte{
|
||||
// 251 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x5c, 0x90, 0xb1, 0x4e, 0xc3, 0x30,
|
||||
0x14, 0x45, 0xe5, 0x96, 0x02, 0x79, 0xb1, 0x5a, 0xe9, 0x4d, 0x1e, 0x50, 0x64, 0xb2, 0xe0, 0x29,
|
||||
0x03, 0x7f, 0x00, 0x62, 0x41, 0x2c, 0xc8, 0x48, 0xac, 0x28, 0xa9, 0x1f, 0xc8, 0x52, 0x1a, 0x57,
|
||||
0x8e, 0x41, 0xea, 0xce, 0xc7, 0x31, 0xf2, 0x09, 0x28, 0x5f, 0x82, 0xe2, 0x92, 0x26, 0x65, 0xf4,
|
||||
0xd1, 0x91, 0xef, 0xbb, 0x17, 0x96, 0xd6, 0x50, 0x13, 0xec, 0xeb, 0xae, 0xd8, 0x7a, 0x17, 0x1c,
|
||||
0xa6, 0xe3, 0xbb, 0xca, 0x9f, 0x60, 0x71, 0x47, 0x75, 0x28, 0xf1, 0x0a, 0x56, 0xa5, 0x31, 0x64,
|
||||
0x5e, 0xa2, 0xb4, 0x76, 0x75, 0x2b, 0x98, 0x9c, 0xab, 0x44, 0x2f, 0x23, 0x7e, 0x1c, 0x28, 0x5e,
|
||||
0x02, 0xf7, 0x9b, 0x89, 0x35, 0x8b, 0x56, 0xea, 0x37, 0x07, 0x25, 0xff, 0x9c, 0xc1, 0xf9, 0xfd,
|
||||
0x5f, 0x08, 0x2a, 0x58, 0x0d, 0xf2, 0x33, 0xf9, 0xd6, 0xba, 0x46, 0x2c, 0x24, 0x53, 0x89, 0xfe,
|
||||
0x8f, 0x31, 0x07, 0x5e, 0xbe, 0x51, 0x13, 0x06, 0xed, 0x34, 0x6a, 0x47, 0x0c, 0x2f, 0x20, 0xd9,
|
||||
0xbe, 0x57, 0xb5, 0x5d, 0x3f, 0xd0, 0x4e, 0x30, 0xc9, 0x14, 0xd7, 0x23, 0x40, 0x09, 0x69, 0x6d,
|
||||
0xdb, 0x40, 0xcd, 0x8d, 0x31, 0x7e, 0x7f, 0x1a, 0xd7, 0x53, 0xd4, 0x67, 0xb8, 0xaa, 0x25, 0xff,
|
||||
0x41, 0xa6, 0x07, 0xe2, 0x24, 0x7e, 0x71, 0xc4, 0x62, 0xc6, 0xa1, 0xde, 0x3c, 0xd6, 0x1b, 0x01,
|
||||
0x2a, 0x58, 0x98, 0x7e, 0x31, 0x71, 0x26, 0x99, 0x4a, 0xaf, 0xb1, 0x98, 0xcc, 0x59, 0xc4, 0x2d,
|
||||
0xf5, 0x5e, 0xb8, 0xe5, 0x5f, 0x5d, 0xc6, 0xbe, 0xbb, 0x8c, 0xfd, 0x74, 0x19, 0xfb, 0x0d, 0x00,
|
||||
0x00, 0xff, 0xff, 0xa1, 0x77, 0x03, 0x42, 0x87, 0x01, 0x00, 0x00,
|
||||
// 272 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4a, 0xc3, 0x40,
|
||||
0x14, 0x86, 0x99, 0xd6, 0x6a, 0xf3, 0x12, 0x5a, 0x99, 0xd5, 0x2c, 0x24, 0xc4, 0x6c, 0x1c, 0x5c,
|
||||
0x64, 0xe1, 0x0d, 0x14, 0x37, 0xe2, 0xa6, 0x8c, 0xe0, 0x56, 0x92, 0xbc, 0x67, 0x19, 0x48, 0x33,
|
||||
0x65, 0x32, 0x0a, 0xbd, 0x95, 0xc7, 0x70, 0xe9, 0x11, 0x24, 0x27, 0x91, 0x4c, 0x4d, 0x93, 0xea,
|
||||
0x72, 0x3e, 0x3e, 0xe6, 0x7f, 0xff, 0x0f, 0x0b, 0x8d, 0x54, 0x3b, 0xfd, 0xba, 0xcb, 0xb6, 0xd6,
|
||||
0x38, 0xc3, 0xc3, 0xe1, 0x5d, 0xa4, 0x4f, 0x30, 0xbb, 0xa7, 0xca, 0xe5, 0xfc, 0x0a, 0x96, 0x39,
|
||||
0x22, 0xe1, 0x8b, 0x97, 0x4a, 0x53, 0x35, 0x82, 0x25, 0x53, 0x19, 0xa8, 0x85, 0xc7, 0xab, 0x9e,
|
||||
0xf2, 0x4b, 0x88, 0xec, 0x66, 0x64, 0x4d, 0xbc, 0x15, 0xda, 0xcd, 0x41, 0x49, 0x3f, 0x26, 0x30,
|
||||
0x7f, 0xf8, 0x0d, 0xe1, 0x12, 0x96, 0xbd, 0xfc, 0x4c, 0xb6, 0xd1, 0xa6, 0x16, 0xb3, 0x84, 0xc9,
|
||||
0x40, 0xfd, 0xc5, 0x3c, 0x85, 0x28, 0x5f, 0x53, 0xed, 0x7a, 0xed, 0xd4, 0x6b, 0x47, 0x8c, 0x5f,
|
||||
0x40, 0xb0, 0x7d, 0x2b, 0x2a, 0x5d, 0x3e, 0xd2, 0x4e, 0xb0, 0x84, 0xc9, 0x48, 0x0d, 0x80, 0x27,
|
||||
0x10, 0x56, 0xba, 0x71, 0x54, 0xdf, 0x22, 0xda, 0xfd, 0x69, 0x91, 0x1a, 0xa3, 0x2e, 0xc3, 0x14,
|
||||
0x0d, 0xd9, 0x77, 0xc2, 0x0e, 0x88, 0x13, 0xff, 0xc5, 0x11, 0xf3, 0x19, 0x87, 0x7a, 0x53, 0x5f,
|
||||
0x6f, 0x00, 0x5c, 0xc2, 0x0c, 0xbb, 0xc5, 0xc4, 0x59, 0xc2, 0x64, 0x78, 0xc3, 0xb3, 0xd1, 0x9c,
|
||||
0x99, 0xdf, 0x52, 0xed, 0x05, 0x7e, 0x0d, 0xe7, 0x8d, 0x5e, 0xd7, 0x84, 0x2b, 0x22, 0xab, 0xa8,
|
||||
0x34, 0x16, 0xc5, 0xdc, 0xe7, 0xfd, 0xe3, 0x77, 0xd1, 0x67, 0x1b, 0xb3, 0xaf, 0x36, 0x66, 0xdf,
|
||||
0x6d, 0xcc, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc0, 0x03, 0xc8, 0x41, 0xb3, 0x01, 0x00, 0x00,
|
||||
}
|
||||
|
||||
func (m *Delta) Marshal() (dAtA []byte, err error) {
|
||||
|
@ -282,6 +296,13 @@ func (m *Identify) MarshalToSizedBuffer(dAtA []byte) (int, error) {
|
|||
i -= len(m.XXX_unrecognized)
|
||||
copy(dAtA[i:], m.XXX_unrecognized)
|
||||
}
|
||||
if m.SignedPeerRecord != nil {
|
||||
i -= len(m.SignedPeerRecord)
|
||||
copy(dAtA[i:], m.SignedPeerRecord)
|
||||
i = encodeVarintIdentify(dAtA, i, uint64(len(m.SignedPeerRecord)))
|
||||
i--
|
||||
dAtA[i] = 0x42
|
||||
}
|
||||
if m.Delta != nil {
|
||||
{
|
||||
size, err := m.Delta.MarshalToSizedBuffer(dAtA[:i])
|
||||
|
@ -416,6 +437,10 @@ func (m *Identify) Size() (n int) {
|
|||
l = m.Delta.Size()
|
||||
n += 1 + l + sovIdentify(uint64(l))
|
||||
}
|
||||
if m.SignedPeerRecord != nil {
|
||||
l = len(m.SignedPeerRecord)
|
||||
n += 1 + l + sovIdentify(uint64(l))
|
||||
}
|
||||
if m.XXX_unrecognized != nil {
|
||||
n += len(m.XXX_unrecognized)
|
||||
}
|
||||
|
@ -809,6 +834,40 @@ func (m *Identify) Unmarshal(dAtA []byte) error {
|
|||
return err
|
||||
}
|
||||
iNdEx = postIndex
|
||||
case 8:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field SignedPeerRecord", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowIdentify
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= int(b&0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthIdentify
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex < 0 {
|
||||
return ErrInvalidLengthIdentify
|
||||
}
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.SignedPeerRecord = append(m.SignedPeerRecord[:0], dAtA[iNdEx:postIndex]...)
|
||||
if m.SignedPeerRecord == nil {
|
||||
m.SignedPeerRecord = []byte{}
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipIdentify(dAtA[iNdEx:])
|
||||
|
@ -891,9 +950,6 @@ func skipIdentify(dAtA []byte) (n int, err error) {
|
|||
return 0, ErrInvalidLengthIdentify
|
||||
}
|
||||
iNdEx += length
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthIdentify
|
||||
}
|
||||
case 3:
|
||||
depth++
|
||||
case 4:
|
||||
|
@ -906,6 +962,9 @@ func skipIdentify(dAtA []byte) (n int, err error) {
|
|||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
if iNdEx < 0 {
|
||||
return 0, ErrInvalidLengthIdentify
|
||||
}
|
||||
if depth == 0 {
|
||||
return iNdEx, nil
|
||||
}
|
||||
|
|
|
@ -36,4 +36,11 @@ message Identify {
|
|||
|
||||
// a delta update is incompatible with everything else. If this field is included, none of the others can appear.
|
||||
optional Delta delta = 7;
|
||||
|
||||
// signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord,
|
||||
// signed by the sending node. It contains the same addresses as the listenAddrs field, but
|
||||
// in a form that lets us share authenticated addrs with other peers.
|
||||
// see github.com/libp2p/go-libp2p-core/record/pb/envelope.proto and
|
||||
// github.com/libp2p/go-libp2p-core/peer/pb/peer_record.proto for message definitions.
|
||||
optional bytes signedPeerRecord = 8;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue