package identify import ( "bytes" "context" "errors" "fmt" "io" "sync" "time" "golang.org/x/exp/slices" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-msgio/pbio" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" msmux "github.com/multiformats/go-multistream" "google.golang.org/protobuf/proto" ) //go:generate protoc --proto_path=$PWD:$PWD/../../.. --go_out=. --go_opt=Mpb/identify.proto=./pb pb/identify.proto var log = logging.Logger("net/identify") var Timeout = 30 * time.Second // timeout on all incoming Identify interactions const ( // ID is the protocol.ID of version 1.0.0 of the identify service. ID = "/ipfs/id/1.0.0" // IDPush is the protocol.ID of the Identify push protocol. // It sends full identify messages containing the current state of the peer. IDPush = "/ipfs/id/push/1.0.0" ServiceName = "libp2p.identify" legacyIDSize = 2 * 1024 signedIDSize = 8 * 1024 maxOwnIdentifyMsgSize = 4 * 1024 // smaller than what we accept. This is 4k to be compatible with rust-libp2p maxMessages = 10 maxPushConcurrency = 32 // number of addresses to keep for peers we have disconnected from for peerstore.RecentlyConnectedTTL time // This number can be small as we already filter peer addresses based on whether the peer is connected to us over // localhost, private IP or public IP address recentlyConnectedPeerMaxAddrs = 20 connectedPeerMaxAddrs = 500 ) var defaultUserAgent = "github.com/libp2p/go-libp2p" type identifySnapshot struct { seq uint64 protocols []protocol.ID addrs []ma.Multiaddr record *record.Envelope } // Equal says if two snapshots are identical. // It does NOT compare the sequence number. func (s identifySnapshot) Equal(other *identifySnapshot) bool { hasRecord := s.record != nil otherHasRecord := other.record != nil if hasRecord != otherHasRecord { return false } if hasRecord && !s.record.Equal(other.record) { return false } if !slices.Equal(s.protocols, other.protocols) { return false } if len(s.addrs) != len(other.addrs) { return false } for i, a := range s.addrs { if !a.Equal(other.addrs[i]) { return false } } return true } type IDService interface { // IdentifyConn synchronously triggers an identify request on the connection and // waits for it to complete. If the connection is being identified by another // caller, this call will wait. If the connection has already been identified, // it will return immediately. IdentifyConn(network.Conn) // IdentifyWait triggers an identify (if the connection has not already been // identified) and returns a channel that is closed when the identify protocol // completes. IdentifyWait(network.Conn) <-chan struct{} // OwnObservedAddrs returns the addresses peers have reported we've dialed from OwnObservedAddrs() []ma.Multiaddr // ObservedAddrsFor returns the addresses peers have reported we've dialed from, // for a specific local address. ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr Start() io.Closer } type identifyPushSupport uint8 const ( identifyPushSupportUnknown identifyPushSupport = iota identifyPushSupported identifyPushUnsupported ) type entry struct { // The IdentifyWaitChan is created when IdentifyWait is called for the first time. // IdentifyWait closes this channel when the Identify request completes, or when it fails. IdentifyWaitChan chan struct{} // PushSupport saves our knowledge about the peer's support of the Identify Push protocol. // Before the identify request returns, we don't know yet if the peer supports Identify Push. PushSupport identifyPushSupport // Sequence is the sequence number of the last snapshot we sent to this peer. Sequence uint64 } // idService is a structure that implements ProtocolIdentify. // It is a trivial service that gives the other peer some // useful information about the local peer. A sort of hello. // // The idService sends: // - Our libp2p Protocol Version // - Our libp2p Agent Version // - Our public Listen Addresses type idService struct { Host host.Host UserAgent string ProtocolVersion string metricsTracer MetricsTracer setupCompleted chan struct{} // is closed when Start has finished setting up ctx context.Context ctxCancel context.CancelFunc // track resources that need to be shut down before we shut down refCount sync.WaitGroup disableSignedPeerRecord bool connsMu sync.RWMutex // The conns map contains all connections we're currently handling. // Connections are inserted as soon as they're available in the swarm // Connections are removed from the map when the connection disconnects. conns map[network.Conn]entry addrMu sync.Mutex // our own observed addresses. observedAddrMgr *ObservedAddrManager disableObservedAddrManager bool emitters struct { evtPeerProtocolsUpdated event.Emitter evtPeerIdentificationCompleted event.Emitter evtPeerIdentificationFailed event.Emitter } currentSnapshot struct { sync.Mutex snapshot identifySnapshot } natEmitter *natEmitter } type normalizer interface { NormalizeMultiaddr(ma.Multiaddr) ma.Multiaddr } // NewIDService constructs a new *idService and activates it by // attaching its stream handler to the given host.Host. func NewIDService(h host.Host, opts ...Option) (*idService, error) { var cfg config for _, opt := range opts { opt(&cfg) } userAgent := defaultUserAgent if cfg.userAgent != "" { userAgent = cfg.userAgent } ctx, cancel := context.WithCancel(context.Background()) s := &idService{ Host: h, UserAgent: userAgent, ProtocolVersion: cfg.protocolVersion, ctx: ctx, ctxCancel: cancel, conns: make(map[network.Conn]entry), disableSignedPeerRecord: cfg.disableSignedPeerRecord, setupCompleted: make(chan struct{}), metricsTracer: cfg.metricsTracer, } var normalize func(ma.Multiaddr) ma.Multiaddr if hn, ok := h.(normalizer); ok { normalize = hn.NormalizeMultiaddr } var err error if cfg.disableObservedAddrManager { s.disableObservedAddrManager = true } else { observedAddrs, err := NewObservedAddrManager(h.Network().ListenAddresses, h.Addrs, h.Network().InterfaceListenAddresses, normalize) if err != nil { return nil, fmt.Errorf("failed to create observed address manager: %s", err) } natEmitter, err := newNATEmitter(h, observedAddrs, time.Minute) if err != nil { return nil, fmt.Errorf("failed to create nat emitter: %s", err) } s.natEmitter = natEmitter s.observedAddrMgr = observedAddrs } s.emitters.evtPeerProtocolsUpdated, err = h.EventBus().Emitter(&event.EvtPeerProtocolsUpdated{}) if err != nil { log.Warnf("identify service not emitting peer protocol updates; err: %s", err) } s.emitters.evtPeerIdentificationCompleted, err = h.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{}) if err != nil { log.Warnf("identify service not emitting identification completed events; err: %s", err) } s.emitters.evtPeerIdentificationFailed, err = h.EventBus().Emitter(&event.EvtPeerIdentificationFailed{}) if err != nil { log.Warnf("identify service not emitting identification failed events; err: %s", err) } return s, nil } func (ids *idService) Start() { ids.Host.Network().Notify((*netNotifiee)(ids)) ids.Host.SetStreamHandler(ID, ids.handleIdentifyRequest) ids.Host.SetStreamHandler(IDPush, ids.handlePush) ids.updateSnapshot() close(ids.setupCompleted) ids.refCount.Add(1) go ids.loop(ids.ctx) } func (ids *idService) loop(ctx context.Context) { defer ids.refCount.Done() sub, err := ids.Host.EventBus().Subscribe( []any{&event.EvtLocalProtocolsUpdated{}, &event.EvtLocalAddressesUpdated{}}, eventbus.BufSize(256), eventbus.Name("identify (loop)"), ) if err != nil { log.Errorf("failed to subscribe to events on the bus, err=%s", err) return } defer sub.Close() // Send pushes from a separate Go routine. // That way, we can end up with // * this Go routine busy looping over all peers in sendPushes // * another push being queued in the triggerPush channel triggerPush := make(chan struct{}, 1) ids.refCount.Add(1) go func() { defer ids.refCount.Done() for { select { case <-ctx.Done(): return case <-triggerPush: ids.sendPushes(ctx) } } }() for { select { case e, ok := <-sub.Out(): if !ok { return } if updated := ids.updateSnapshot(); !updated { continue } if ids.metricsTracer != nil { ids.metricsTracer.TriggeredPushes(e) } select { case triggerPush <- struct{}{}: default: // we already have one more push queued, no need to queue another one } case <-ctx.Done(): return } } } func (ids *idService) sendPushes(ctx context.Context) { ids.connsMu.RLock() conns := make([]network.Conn, 0, len(ids.conns)) for c, e := range ids.conns { // Push even if we don't know if push is supported. // This will be only the case while the IdentifyWaitChan call is in flight. if e.PushSupport == identifyPushSupported || e.PushSupport == identifyPushSupportUnknown { conns = append(conns, c) } } ids.connsMu.RUnlock() sem := make(chan struct{}, maxPushConcurrency) var wg sync.WaitGroup for _, c := range conns { // check if the connection is still alive ids.connsMu.RLock() e, ok := ids.conns[c] ids.connsMu.RUnlock() if !ok { continue } // check if we already sent the current snapshot to this peer ids.currentSnapshot.Lock() snapshot := ids.currentSnapshot.snapshot ids.currentSnapshot.Unlock() if e.Sequence >= snapshot.seq { log.Debugw("already sent this snapshot to peer", "peer", c.RemotePeer(), "seq", snapshot.seq) continue } // we haven't, send it now sem <- struct{}{} wg.Add(1) go func(c network.Conn) { defer wg.Done() defer func() { <-sem }() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() str, err := ids.Host.NewStream(ctx, c.RemotePeer(), IDPush) if err != nil { // connection might have been closed recently return } // TODO: find out if the peer supports push if we didn't have any information about push support if err := ids.sendIdentifyResp(str, true); err != nil { log.Debugw("failed to send identify push", "peer", c.RemotePeer(), "error", err) return } }(c) } wg.Wait() } // Close shuts down the idService func (ids *idService) Close() error { ids.ctxCancel() if !ids.disableObservedAddrManager { ids.observedAddrMgr.Close() ids.natEmitter.Close() } ids.refCount.Wait() return nil } func (ids *idService) OwnObservedAddrs() []ma.Multiaddr { if ids.disableObservedAddrManager { return nil } return ids.observedAddrMgr.Addrs() } func (ids *idService) ObservedAddrsFor(local ma.Multiaddr) []ma.Multiaddr { if ids.disableObservedAddrManager { return nil } return ids.observedAddrMgr.AddrsFor(local) } // IdentifyConn runs the Identify protocol on a connection. // It returns when we've received the peer's Identify message (or the request fails). // If successful, the peer store will contain the peer's addresses and supported protocols. func (ids *idService) IdentifyConn(c network.Conn) { <-ids.IdentifyWait(c) } // IdentifyWait runs the Identify protocol on a connection. // It doesn't block and returns a channel that is closed when we receive // the peer's Identify message (or the request fails). // If successful, the peer store will contain the peer's addresses and supported protocols. func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} { ids.connsMu.Lock() defer ids.connsMu.Unlock() e, found := ids.conns[c] if !found { // No entry found. We may have gotten an out of order notification. Check it we should have this conn (because we're still connected) // We hold the ids.connsMu lock so this is safe since a disconnect event will be processed later if we are connected. if c.IsClosed() { log.Debugw("connection not found in identify service", "peer", c.RemotePeer()) ch := make(chan struct{}) close(ch) return ch } else { ids.addConnWithLock(c) } } if e.IdentifyWaitChan != nil { return e.IdentifyWaitChan } // First call to IdentifyWait for this connection. Create the channel. e.IdentifyWaitChan = make(chan struct{}) ids.conns[c] = e // Spawn an identify. The connection may actually be closed // already, but that doesn't really matter. We'll fail to open a // stream then forget the connection. go func() { defer close(e.IdentifyWaitChan) if err := ids.identifyConn(c); err != nil { log.Warnf("failed to identify %s: %s", c.RemotePeer(), err) ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err}) return } }() return e.IdentifyWaitChan } func (ids *idService) identifyConn(c network.Conn) error { ctx, cancel := context.WithTimeout(context.Background(), Timeout) defer cancel() s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify")) if err != nil { log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err) return err } s.SetDeadline(time.Now().Add(Timeout)) if err := s.SetProtocol(ID); err != nil { log.Warnf("error setting identify protocol for stream: %s", err) s.Reset() } // ok give the response to our handler. if err := msmux.SelectProtoOrFail(ID, s); err != nil { log.Infow("failed negotiate identify protocol with peer", "peer", c.RemotePeer(), "error", err) s.Reset() return err } return ids.handleIdentifyResponse(s, false) } // handlePush handles incoming identify push streams func (ids *idService) handlePush(s network.Stream) { s.SetDeadline(time.Now().Add(Timeout)) ids.handleIdentifyResponse(s, true) } func (ids *idService) handleIdentifyRequest(s network.Stream) { _ = ids.sendIdentifyResp(s, false) } func (ids *idService) sendIdentifyResp(s network.Stream, isPush bool) error { if err := s.Scope().SetService(ServiceName); err != nil { s.Reset() return fmt.Errorf("failed to attaching stream to identify service: %w", err) } defer s.Close() ids.currentSnapshot.Lock() snapshot := ids.currentSnapshot.snapshot ids.currentSnapshot.Unlock() log.Debugw("sending snapshot", "seq", snapshot.seq, "protocols", snapshot.protocols, "addrs", snapshot.addrs) mes := ids.createBaseIdentifyResponse(s.Conn(), &snapshot) mes.SignedPeerRecord = ids.getSignedRecord(&snapshot) log.Debugf("%s sending message to %s %s", ID, s.Conn().RemotePeer(), s.Conn().RemoteMultiaddr()) if err := ids.writeChunkedIdentifyMsg(s, mes); err != nil { return err } if ids.metricsTracer != nil { ids.metricsTracer.IdentifySent(isPush, len(mes.Protocols), len(mes.ListenAddrs)) } ids.connsMu.Lock() defer ids.connsMu.Unlock() e, ok := ids.conns[s.Conn()] // The connection might already have been closed. // We *should* receive the Connected notification from the swarm before we're able to accept the peer's // Identify stream, but if that for some reason doesn't work, we also wouldn't have a map entry here. // The only consequence would be that we send a spurious Push to that peer later. if !ok { return nil } e.Sequence = snapshot.seq ids.conns[s.Conn()] = e return nil } func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) error { if err := s.Scope().SetService(ServiceName); err != nil { log.Warnf("error attaching stream to identify service: %s", err) s.Reset() return err } if err := s.Scope().ReserveMemory(signedIDSize, network.ReservationPriorityAlways); err != nil { log.Warnf("error reserving memory for identify stream: %s", err) s.Reset() return err } defer s.Scope().ReleaseMemory(signedIDSize) c := s.Conn() r := pbio.NewDelimitedReader(s, signedIDSize) mes := &pb.Identify{} if err := readAllIDMessages(r, mes); err != nil { log.Warn("error reading identify message: ", err) s.Reset() return err } defer s.Close() log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr()) ids.consumeMessage(mes, c, isPush) if ids.metricsTracer != nil { ids.metricsTracer.IdentifyReceived(isPush, len(mes.Protocols), len(mes.ListenAddrs)) } ids.connsMu.Lock() defer ids.connsMu.Unlock() e, ok := ids.conns[c] if !ok { // might already have disconnected return nil } sup, err := ids.Host.Peerstore().SupportsProtocols(c.RemotePeer(), IDPush) if supportsIdentifyPush := err == nil && len(sup) > 0; supportsIdentifyPush { e.PushSupport = identifyPushSupported } else { e.PushSupport = identifyPushUnsupported } if ids.metricsTracer != nil { ids.metricsTracer.ConnPushSupport(e.PushSupport) } ids.conns[c] = e return nil } func readAllIDMessages(r pbio.Reader, finalMsg proto.Message) error { mes := &pb.Identify{} for i := 0; i < maxMessages; i++ { switch err := r.ReadMsg(mes); err { case io.EOF: return nil case nil: proto.Merge(finalMsg, mes) default: return err } } return fmt.Errorf("too many parts") } func (ids *idService) updateSnapshot() (updated bool) { protos := ids.Host.Mux().Protocols() slices.Sort(protos) addrs := ids.Host.Addrs() slices.SortFunc(addrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) }) usedSpace := len(ids.ProtocolVersion) + len(ids.UserAgent) for i := 0; i < len(protos); i++ { usedSpace += len(protos[i]) } addrs = trimHostAddrList(addrs, maxOwnIdentifyMsgSize-usedSpace-256) // 256 bytes of buffer snapshot := identifySnapshot{ addrs: addrs, protocols: protos, } if !ids.disableSignedPeerRecord { if cab, ok := peerstore.GetCertifiedAddrBook(ids.Host.Peerstore()); ok { snapshot.record = cab.GetPeerRecord(ids.Host.ID()) } } ids.currentSnapshot.Lock() defer ids.currentSnapshot.Unlock() if ids.currentSnapshot.snapshot.Equal(&snapshot) { return false } snapshot.seq = ids.currentSnapshot.snapshot.seq + 1 ids.currentSnapshot.snapshot = snapshot log.Debugw("updating snapshot", "seq", snapshot.seq, "addrs", snapshot.addrs) return true } func (ids *idService) writeChunkedIdentifyMsg(s network.Stream, mes *pb.Identify) error { writer := pbio.NewDelimitedWriter(s) if mes.SignedPeerRecord == nil || proto.Size(mes) <= legacyIDSize { return writer.WriteMsg(mes) } sr := mes.SignedPeerRecord mes.SignedPeerRecord = nil if err := writer.WriteMsg(mes); err != nil { return err } // then write just the signed record return writer.WriteMsg(&pb.Identify{SignedPeerRecord: sr}) } func (ids *idService) createBaseIdentifyResponse(conn network.Conn, snapshot *identifySnapshot) *pb.Identify { mes := &pb.Identify{} remoteAddr := conn.RemoteMultiaddr() localAddr := conn.LocalMultiaddr() // set protocols this node is currently handling mes.Protocols = protocol.ConvertToStrings(snapshot.protocols) // observed address so other side is informed of their // "public" address, at least in relation to us. mes.ObservedAddr = remoteAddr.Bytes() // populate unsigned addresses. // peers that do not yet support signed addresses will need this. // Note: LocalMultiaddr is sometimes 0.0.0.0 viaLoopback := manet.IsIPLoopback(localAddr) || manet.IsIPLoopback(remoteAddr) mes.ListenAddrs = make([][]byte, 0, len(snapshot.addrs)) for _, addr := range snapshot.addrs { if !viaLoopback && manet.IsIPLoopback(addr) { continue } mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes()) } // set our public key ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID()) // check if we even have a public key. if ownKey == nil { // public key is nil. We are either using insecure transport or something erratic happened. // check if we're even operating in "secure mode" if ids.Host.Peerstore().PrivKey(ids.Host.ID()) != nil { // private key is present. But NO public key. Something bad happened. log.Errorf("did not have own public key in Peerstore") } // if neither of the key is present it is safe to assume that we are using an insecure transport. } else { // public key is present. Safe to proceed. if kb, err := crypto.MarshalPublicKey(ownKey); err != nil { log.Errorf("failed to convert key to bytes") } else { mes.PublicKey = kb } } // set protocol versions mes.ProtocolVersion = &ids.ProtocolVersion mes.AgentVersion = &ids.UserAgent return mes } func (ids *idService) getSignedRecord(snapshot *identifySnapshot) []byte { if ids.disableSignedPeerRecord || snapshot.record == nil { return nil } recBytes, err := snapshot.record.Marshal() if err != nil { log.Errorw("failed to marshal signed record", "err", err) return nil } return recBytes } // diff takes two slices of strings (a and b) and computes which elements were added and removed in b func diff(a, b []protocol.ID) (added, removed []protocol.ID) { // This is O(n^2), but it's fine because the slices are small. for _, x := range b { var found bool for _, y := range a { if x == y { found = true break } } if !found { added = append(added, x) } } for _, x := range a { var found bool for _, y := range b { if x == y { found = true break } } if !found { removed = append(removed, x) } } return } func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bool) { p := c.RemotePeer() supported, _ := ids.Host.Peerstore().GetProtocols(p) mesProtocols := protocol.ConvertFromStrings(mes.Protocols) added, removed := diff(supported, mesProtocols) ids.Host.Peerstore().SetProtocols(p, mesProtocols...) if isPush { ids.emitters.evtPeerProtocolsUpdated.Emit(event.EvtPeerProtocolsUpdated{ Peer: p, Added: added, Removed: removed, }) } obsAddr, err := ma.NewMultiaddrBytes(mes.GetObservedAddr()) if err != nil { log.Debugf("error parsing received observed addr for %s: %s", c, err) obsAddr = nil } if obsAddr != nil && !ids.disableObservedAddrManager { // TODO refactor this to use the emitted events instead of having this func call explicitly. ids.observedAddrMgr.Record(c, obsAddr) } // mes.ListenAddrs laddrs := mes.GetListenAddrs() lmaddrs := make([]ma.Multiaddr, 0, len(laddrs)) for _, addr := range laddrs { maddr, err := ma.NewMultiaddrBytes(addr) if err != nil { log.Debugf("%s failed to parse multiaddr from %s %s", ID, p, c.RemoteMultiaddr()) continue } lmaddrs = append(lmaddrs, maddr) } // NOTE: Do not add `c.RemoteMultiaddr()` to the peerstore if the remote // peer doesn't tell us to do so. Otherwise, we'll advertise it. // // This can cause an "addr-splosion" issue where the network will slowly // gossip and collect observed but unadvertised addresses. Given a NAT // that picks random source ports, this can cause DHT nodes to collect // many undialable addresses for other peers. // add certified addresses for the peer, if they sent us a signed peer record // otherwise use the unsigned addresses. signedPeerRecord, err := signedPeerRecordFromMessage(mes) if err != nil { log.Errorf("error getting peer record from Identify message: %v", err) } // Extend the TTLs on the known (probably) good addresses. // Taking the lock ensures that we don't concurrently process a disconnect. ids.addrMu.Lock() ttl := peerstore.RecentlyConnectedAddrTTL switch ids.Host.Network().Connectedness(p) { case network.Limited, network.Connected: ttl = peerstore.ConnectedAddrTTL } // Downgrade connected and recently connected addrs to a temporary TTL. for _, ttl := range []time.Duration{ peerstore.RecentlyConnectedAddrTTL, peerstore.ConnectedAddrTTL, } { ids.Host.Peerstore().UpdateAddrs(p, ttl, peerstore.TempAddrTTL) } var addrs []ma.Multiaddr if signedPeerRecord != nil { signedAddrs, err := ids.consumeSignedPeerRecord(c.RemotePeer(), signedPeerRecord) if err != nil { log.Debugf("failed to consume signed peer record: %s", err) signedPeerRecord = nil } else { addrs = signedAddrs } } else { addrs = lmaddrs } addrs = filterAddrs(addrs, c.RemoteMultiaddr()) if len(addrs) > connectedPeerMaxAddrs { addrs = addrs[:connectedPeerMaxAddrs] } ids.Host.Peerstore().AddAddrs(p, addrs, ttl) // Finally, expire all temporary addrs. ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0) ids.addrMu.Unlock() log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), addrs) // get protocol versions pv := mes.GetProtocolVersion() av := mes.GetAgentVersion() ids.Host.Peerstore().Put(p, "ProtocolVersion", pv) ids.Host.Peerstore().Put(p, "AgentVersion", av) // get the key from the other side. we may not have it (no-auth transport) ids.consumeReceivedPubKey(c, mes.PublicKey) ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{ Peer: c.RemotePeer(), Conn: c, ListenAddrs: lmaddrs, Protocols: mesProtocols, SignedPeerRecord: signedPeerRecord, ObservedAddr: obsAddr, ProtocolVersion: pv, AgentVersion: av, }) } func (ids *idService) consumeSignedPeerRecord(p peer.ID, signedPeerRecord *record.Envelope) ([]ma.Multiaddr, error) { if signedPeerRecord.PublicKey == nil { return nil, errors.New("missing pubkey") } id, err := peer.IDFromPublicKey(signedPeerRecord.PublicKey) if err != nil { return nil, fmt.Errorf("failed to derive peer ID: %s", err) } if id != p { return nil, fmt.Errorf("received signed peer record envelope for unexpected peer ID. expected %s, got %s", p, id) } r, err := signedPeerRecord.Record() if err != nil { return nil, fmt.Errorf("failed to obtain record: %w", err) } rec, ok := r.(*peer.PeerRecord) if !ok { return nil, errors.New("not a peer record") } if rec.PeerID != p { return nil, fmt.Errorf("received signed peer record for unexpected peer ID. expected %s, got %s", p, rec.PeerID) } // Don't put the signed peer record into the peer store. // They're not used anywhere. // All we care about are the addresses. return rec.Addrs, nil } func (ids *idService) consumeReceivedPubKey(c network.Conn, kb []byte) { lp := c.LocalPeer() rp := c.RemotePeer() if kb == nil { log.Debugf("%s did not receive public key for remote peer: %s", lp, rp) return } newKey, err := crypto.UnmarshalPublicKey(kb) if err != nil { log.Warnf("%s cannot unmarshal key from remote peer: %s, %s", lp, rp, err) return } // verify key matches peer.ID np, err := peer.IDFromPublicKey(newKey) if err != nil { log.Debugf("%s cannot get peer.ID from key of remote peer: %s, %s", lp, rp, err) return } if np != rp { // if the newKey's peer.ID does not match known peer.ID... if rp == "" && np != "" { // if local peerid is empty, then use the new, sent key. err := ids.Host.Peerstore().AddPubKey(rp, newKey) if err != nil { log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err) } } else { // we have a local peer.ID and it does not match the sent key... error. log.Errorf("%s received key for remote peer %s mismatch: %s", lp, rp, np) } return } currKey := ids.Host.Peerstore().PubKey(rp) if currKey == nil { // no key? no auth transport. set this one. err := ids.Host.Peerstore().AddPubKey(rp, newKey) if err != nil { log.Debugf("%s could not add key for %s to peerstore: %s", lp, rp, err) } return } // ok, we have a local key, we should verify they match. if currKey.Equals(newKey) { return // ok great. we're done. } // weird, got a different key... but the different key MATCHES the peer.ID. // this odd. let's log error and investigate. this should basically never happen // and it means we have something funky going on and possibly a bug. log.Errorf("%s identify got a different key for: %s", lp, rp) // okay... does ours NOT match the remote peer.ID? cp, err := peer.IDFromPublicKey(currKey) if err != nil { log.Errorf("%s cannot get peer.ID from local key of remote peer: %s, %s", lp, rp, err) return } if cp != rp { log.Errorf("%s local key for remote peer %s yields different peer.ID: %s", lp, rp, cp) return } // okay... curr key DOES NOT match new key. both match peer.ID. wat? log.Errorf("%s local key and received key for %s do not match, but match peer.ID", lp, rp) } // HasConsistentTransport returns true if the address 'a' shares a // protocol set with any address in the green set. This is used // to check if a given address might be one of the addresses a peer is // listening on. func HasConsistentTransport(a ma.Multiaddr, green []ma.Multiaddr) bool { protosMatch := func(a, b []ma.Protocol) bool { if len(a) != len(b) { return false } for i, p := range a { if b[i].Code != p.Code { return false } } return true } protos := a.Protocols() for _, ga := range green { if protosMatch(protos, ga.Protocols()) { return true } } return false } // addConnWithLock assuems caller holds the connsMu lock func (ids *idService) addConnWithLock(c network.Conn) { _, found := ids.conns[c] if !found { <-ids.setupCompleted ids.conns[c] = entry{} } } func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) { if msg.SignedPeerRecord == nil || len(msg.SignedPeerRecord) == 0 { return nil, nil } env, _, err := record.ConsumeEnvelope(msg.SignedPeerRecord, peer.PeerRecordEnvelopeDomain) return env, err } // netNotifiee defines methods to be used with the swarm type netNotifiee idService func (nn *netNotifiee) IDService() *idService { return (*idService)(nn) } func (nn *netNotifiee) Connected(_ network.Network, c network.Conn) { ids := nn.IDService() ids.connsMu.Lock() ids.addConnWithLock(c) ids.connsMu.Unlock() nn.IDService().IdentifyWait(c) } func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { ids := nn.IDService() // Stop tracking the connection. ids.connsMu.Lock() delete(ids.conns, c) ids.connsMu.Unlock() if !ids.disableObservedAddrManager { ids.observedAddrMgr.removeConn(c) } // Last disconnect. // Undo the setting of addresses to peer.ConnectedAddrTTL we did ids.addrMu.Lock() defer ids.addrMu.Unlock() // This check MUST happen after acquiring the Lock as identify on a different connection // might be trying to add addresses. switch ids.Host.Network().Connectedness(c.RemotePeer()) { case network.Connected, network.Limited: return } // peerstore returns the elements in a random order as it uses a map to store the addresses addrs := ids.Host.Peerstore().Addrs(c.RemotePeer()) n := len(addrs) if n > recentlyConnectedPeerMaxAddrs { // We want to always save the address we are connected to for i, a := range addrs { if a.Equal(c.RemoteMultiaddr()) { addrs[i], addrs[0] = addrs[0], addrs[i] } } n = recentlyConnectedPeerMaxAddrs } ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.TempAddrTTL) ids.Host.Peerstore().AddAddrs(c.RemotePeer(), addrs[:n], peerstore.RecentlyConnectedAddrTTL) ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.TempAddrTTL, 0) } func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {} // filterAddrs filters the address slice based on the remote multiaddr: // - if it's a localhost address, no filtering is applied // - if it's a private network address, all localhost addresses are filtered out // - if it's a public address, all non-public addresses are filtered out // - if none of the above, (e.g. discard prefix), no filtering is applied. // We can't do anything meaningful here so we do nothing. func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr { switch { case manet.IsIPLoopback(remote): return addrs case manet.IsPrivateAddr(remote): return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) }) case manet.IsPublicAddr(remote): return ma.FilterAddrs(addrs, manet.IsPublicAddr) default: return addrs } } func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr { totalSize := 0 for _, a := range addrs { totalSize += len(a.Bytes()) } if totalSize <= maxSize { return addrs } score := func(addr ma.Multiaddr) int { var res int if manet.IsPublicAddr(addr) { res |= 1 << 12 } else if !manet.IsIPLoopback(addr) { res |= 1 << 11 } var protocolWeight int ma.ForEach(addr, func(c ma.Component) bool { switch c.Protocol().Code { case ma.P_QUIC_V1: protocolWeight = 5 case ma.P_TCP: protocolWeight = 4 case ma.P_WSS: protocolWeight = 3 case ma.P_WEBTRANSPORT: protocolWeight = 2 case ma.P_WEBRTC_DIRECT: protocolWeight = 1 case ma.P_P2P: return false } return true }) res |= 1 << protocolWeight return res } slices.SortStableFunc(addrs, func(a, b ma.Multiaddr) int { return score(b) - score(a) // b-a for reverse order }) totalSize = 0 for i, a := range addrs { totalSize += len(a.Bytes()) if totalSize > maxSize { addrs = addrs[:i] break } } return addrs }