package discv5 import ( "context" "crypto/ecdsa" "fmt" "math" "math/rand" "net" "sync" "time" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/nat" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/waku/v2/utils" ) var log = logging.Logger("waku_discv5") type DiscoveryV5 struct { sync.Mutex discovery.Discovery params *discV5Parameters host host.Host config discover.Config udpAddr *net.UDPAddr listener *discover.UDPv5 localnode *enode.LocalNode NAT nat.Interface quit chan struct{} wg *sync.WaitGroup peerCache peerCache } type peerCache struct { sync.RWMutex recs map[peer.ID]peerRecord rng *rand.Rand } type peerRecord struct { expire int64 peer peer.AddrInfo } type discV5Parameters struct { autoUpdate bool bootnodes []*enode.Node udpPort int tcpPort int advertiseAddr *net.IP } type DiscoveryV5Option func(*discV5Parameters) func WithAutoUpdate(autoUpdate bool) DiscoveryV5Option { return func(params *discV5Parameters) { params.autoUpdate = autoUpdate } } func WithBootnodes(bootnodes []*enode.Node) DiscoveryV5Option { return func(params *discV5Parameters) { params.bootnodes = bootnodes } } func WithAdvertiseAddr(addr net.IP) DiscoveryV5Option { return func(params *discV5Parameters) { params.advertiseAddr = &addr } } func WithUDPPort(port int) DiscoveryV5Option { return func(params *discV5Parameters) { params.udpPort = port } } func DefaultOptions() []DiscoveryV5Option { return []DiscoveryV5Option{ WithUDPPort(9000), } } func NewDiscoveryV5(host host.Host, ipAddr net.IP, tcpPort int, priv *ecdsa.PrivateKey, wakuFlags utils.WakuEnrBitfield, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { opt(params) } params.tcpPort = tcpPort localnode, err := newLocalnode(priv, ipAddr, params.udpPort, tcpPort, wakuFlags, params.advertiseAddr) if err != nil { return nil, err } var NAT nat.Interface = nil if params.advertiseAddr == nil { NAT = nat.Any() } return &DiscoveryV5{ host: host, params: params, NAT: NAT, wg: &sync.WaitGroup{}, peerCache: peerCache{ rng: rand.New(rand.NewSource(rand.Int63())), recs: make(map[peer.ID]peerRecord), }, localnode: localnode, config: discover.Config{ PrivateKey: priv, Bootnodes: params.bootnodes, }, udpAddr: &net.UDPAddr{ IP: net.IPv4zero, Port: params.udpPort, }, }, nil } func newLocalnode(priv *ecdsa.PrivateKey, ipAddr net.IP, udpPort int, tcpPort int, wakuFlags utils.WakuEnrBitfield, advertiseAddr *net.IP) (*enode.LocalNode, error) { db, err := enode.OpenDB("") if err != nil { return nil, err } localnode := enode.NewLocalNode(db, priv) localnode.SetFallbackIP(net.IP{127, 0, 0, 1}) localnode.SetFallbackUDP(udpPort) localnode.Set(enr.WithEntry(utils.WakuENRField, wakuFlags)) localnode.Set(enr.IP(ipAddr)) if udpPort > 0 && udpPort <= math.MaxUint16 { localnode.Set(enr.UDP(uint16(udpPort))) // lgtm [go/incorrect-integer-conversion] } else { log.Error("could not set udpPort ", udpPort) } if tcpPort > 0 && tcpPort <= math.MaxUint16 { localnode.Set(enr.TCP(uint16(tcpPort))) // lgtm [go/incorrect-integer-conversion] } else { log.Error("could not set tcpPort ", tcpPort) } if advertiseAddr != nil { localnode.SetStaticIP(*advertiseAddr) } return localnode, nil } func (d *DiscoveryV5) listen() error { conn, err := net.ListenUDP("udp", d.udpAddr) if err != nil { return err } d.udpAddr = conn.LocalAddr().(*net.UDPAddr) if d.NAT != nil && !d.udpAddr.IP.IsLoopback() { d.wg.Add(1) go func() { defer d.wg.Done() nat.Map(d.NAT, d.quit, "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery") }() } d.localnode.SetFallbackUDP(d.udpAddr.Port) listener, err := discover.ListenV5(conn, d.localnode, d.config) if err != nil { return err } d.listener = listener log.Info(fmt.Sprintf("Started Discovery V5 at %s:%d, advertising IP: %s:%d", d.udpAddr.IP, d.udpAddr.Port, d.localnode.Node().IP(), d.params.tcpPort)) log.Info("Discovery V5 ", d.localnode.Node()) return nil } func (d *DiscoveryV5) Start() error { d.Lock() defer d.Unlock() d.wg.Wait() // Waiting for other go routines to stop d.quit = make(chan struct{}, 1) err := d.listen() if err != nil { return err } return nil } func (d *DiscoveryV5) Stop() { d.Lock() defer d.Unlock() close(d.quit) d.listener.Close() d.listener = nil log.Info("Stopped Discovery V5") d.wg.Wait() } // IsPrivate reports whether ip is a private address, according to // RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses). // Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go // Copyright (c) The Go Authors. All rights reserved. // @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language func IsPrivate(ip net.IP) bool { if ip4 := ip.To4(); ip4 != nil { // Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says: // The Internet Assigned Numbers Authority (IANA) has reserved the // following three blocks of the IPv4 address space for private internets: // 10.0.0.0 - 10.255.255.255 (10/8 prefix) // 172.16.0.0 - 172.31.255.255 (172.16/12 prefix) // 192.168.0.0 - 192.168.255.255 (192.168/16 prefix) return ip4[0] == 10 || (ip4[0] == 172 && ip4[1]&0xf0 == 16) || (ip4[0] == 192 && ip4[1] == 168) } // Following RFC 4193, Section 3. Private Address Space which says: // The Internet Assigned Numbers Authority (IANA) has reserved the // following block of the IPv6 address space for local internets: // FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix) return len(ip) == net.IPv6len && ip[0]&0xfe == 0xfc } func (d *DiscoveryV5) UpdateAddr(addr net.IP) error { if !d.params.autoUpdate { return nil } d.Lock() defer d.Unlock() if addr.IsUnspecified() || d.localnode.Node().IP().Equal(addr) { return nil } // TODO: improve this logic to determine if an address should be replaced or not if !(d.localnode.Node().IP().IsLoopback() && IsPrivate(addr)) && !(IsPrivate(d.localnode.Node().IP()) && !addr.IsLoopback() && !IsPrivate(addr)) { return nil } d.localnode.Set(enr.IP(addr)) log.Info(fmt.Sprintf("Updated Discovery V5 node IP: %s", d.localnode.Node().IP())) log.Info("Discovery V5 ", d.localnode.Node()) return nil } func isWakuNode(node *enode.Node) bool { enrField := new(utils.WakuEnrBitfield) if err := node.Record().Load(enr.WithEntry(utils.WakuENRField, &enrField)); err != nil { if !enr.IsNotFound(err) { log.Error("could not retrieve port for enr ", node) } return false } if enrField != nil { return *enrField != uint8(0) } return false } func hasTCPPort(node *enode.Node) bool { enrTCP := new(enr.TCP) if err := node.Record().Load(enr.WithEntry(enrTCP.ENRKey(), enrTCP)); err != nil { if !enr.IsNotFound(err) { log.Error("could not retrieve port for enr ", node) } return false } return true } func (d *DiscoveryV5) evaluateNode(node *enode.Node) bool { if node == nil || node.IP() == nil { return false } if !isWakuNode(node) || !hasTCPPort(node) { return false } _, err := utils.EnodeToPeerInfo(node) if err != nil { log.Error("could not obtain peer info from enode:", err) return false } return true } func (c *DiscoveryV5) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) { // Get options var options discovery.Options err := options.Apply(opts...) if err != nil { return 0, err } // TODO: once discv5 spec introduces capability and topic discovery, implement this function return 20 * time.Minute, nil } func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limit int, doneCh chan struct{}) { defer d.wg.Done() for { if len(d.peerCache.recs) >= limit { break } if ctx.Err() != nil { break } exists := iterator.Next() if !exists { break } address, err := utils.EnodeToMultiAddr(iterator.Node()) if err != nil { log.Error(err) continue } peerInfo, err := peer.AddrInfoFromP2pAddr(address) if err != nil { log.Error(err) continue } d.peerCache.recs[peerInfo.ID] = peerRecord{ expire: time.Now().Unix() + 3600, // Expires in 1hr peer: *peerInfo, } } close(doneCh) } func (d *DiscoveryV5) removeExpiredPeers() int { // Remove all expired entries from cache currentTime := time.Now().Unix() newCacheSize := len(d.peerCache.recs) for p := range d.peerCache.recs { rec := d.peerCache.recs[p] if rec.expire < currentTime { newCacheSize-- delete(d.peerCache.recs, p) } } return newCacheSize } func (d *DiscoveryV5) FindPeers(ctx context.Context, topic string, opts ...discovery.Option) (<-chan peer.AddrInfo, error) { // Get options var options discovery.Options err := options.Apply(opts...) if err != nil { return nil, err } const maxLimit = 100 limit := options.Limit if limit == 0 || limit > maxLimit { limit = maxLimit } // We are ignoring the topic. Future versions might use a map[string]*peerCache instead where the string represents the pubsub topic d.peerCache.Lock() defer d.peerCache.Unlock() cacheSize := d.removeExpiredPeers() // Discover new records if we don't have enough if cacheSize < limit && d.listener != nil { d.Lock() iterator := d.listener.RandomNodes() iterator = enode.Filter(iterator, d.evaluateNode) defer iterator.Close() doneCh := make(chan struct{}) d.wg.Add(1) go d.iterate(ctx, iterator, limit, doneCh) select { case <-ctx.Done(): case <-doneCh: } d.Unlock() } // Randomize and fill channel with available records count := len(d.peerCache.recs) if limit < count { count = limit } chPeer := make(chan peer.AddrInfo, count) perm := d.peerCache.rng.Perm(len(d.peerCache.recs))[0:count] permSet := make(map[int]int) for i, v := range perm { permSet[v] = i } sendLst := make([]*peer.AddrInfo, count) iter := 0 for k := range d.peerCache.recs { if sendIndex, ok := permSet[iter]; ok { peerInfo := d.peerCache.recs[k].peer sendLst[sendIndex] = &peerInfo } iter++ } for _, send := range sendLst { chPeer <- *send } close(chPeer) return chPeer, err }