feat: use dns discovery if a enrtree is used in the DiscV5BootstrapNodes config (#2814)

This commit is contained in:
Richard Ramos 2022-09-15 10:32:54 -04:00 committed by GitHub
parent e49236a83c
commit 208f075b72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 76 additions and 40 deletions

View File

@ -95,7 +95,7 @@ type Waku struct {
node *node.WakuNode // reference to a libp2p waku node node *node.WakuNode // reference to a libp2p waku node
appDB *sql.DB appDB *sql.DB
dnsAddressCache map[string][]multiaddr.Multiaddr // Map to store the multiaddresses returned by dns discovery dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery
dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map
filters *common.Filters // Message filters installed with Subscribe function filters *common.Filters // Message filters installed with Subscribe function
@ -151,7 +151,7 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appDB *sql.DB) (*Waku,
sendQueue: make(chan *pb.WakuMessage, 1000), sendQueue: make(chan *pb.WakuMessage, 1000),
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
quit: make(chan struct{}), quit: make(chan struct{}),
dnsAddressCache: make(map[string][]multiaddr.Multiaddr), dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode),
dnsAddressCacheLock: &sync.RWMutex{}, dnsAddressCacheLock: &sync.RWMutex{},
storeMsgIDs: make(map[gethcommon.Hash]bool), storeMsgIDs: make(map[gethcommon.Hash]bool),
storeMsgIDsMu: sync.RWMutex{}, storeMsgIDsMu: sync.RWMutex{},
@ -233,14 +233,10 @@ func New(nodeKey string, cfg *Config, logger *zap.Logger, appDB *sql.DB) (*Waku,
} }
if cfg.EnableDiscV5 { if cfg.EnableDiscV5 {
var bootnodes []*enode.Node bootnodes, err := waku.getDiscV5BootstrapNodes(cfg.DiscV5BootstrapNodes)
for _, addr := range cfg.DiscV5BootstrapNodes {
bootnode, err := enode.Parse(enode.ValidSchemes, addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
bootnodes = append(bootnodes, bootnode)
}
opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit)))) opts = append(opts, node.WithDiscoveryV5(cfg.UDPPort, bootnodes, cfg.AutoUpdate, pubsub.WithDiscoveryOpts(discovery.Limit(cfg.DiscoveryLimit))))
} }
@ -322,7 +318,7 @@ func (w *Waku) SubscribeToConnStatusChanges() *types.ConnStatusSubscription {
return subscription return subscription
} }
type fnApplyToEachPeer func(ma multiaddr.Multiaddr, protocol libp2pproto.ID) type fnApplyToEachPeer func(d dnsdisc.DiscoveredNode, protocol libp2pproto.ID)
func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApplyToEachPeer) { func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApplyToEachPeer) {
for _, addrString := range addresses { for _, addrString := range addresses {
@ -340,22 +336,55 @@ func (w *Waku) addPeers(addresses []string, protocol libp2pproto.ID, apply fnApp
} }
} }
func (w *Waku) getDiscV5BootstrapNodes(addresses []string) ([]*enode.Node, error) {
wg := sync.WaitGroup{}
mu := sync.Mutex{}
var result []*enode.Node
retrieveENR := func(d dnsdisc.DiscoveredNode, protocol libp2pproto.ID) {
mu.Lock()
defer mu.Unlock()
result = append(result, d.ENR)
}
for _, addrString := range addresses {
if addrString == "" {
continue
}
if strings.HasPrefix(addrString, "enrtree://") {
// Use DNS Discovery
wg.Add(1)
go func(addr string) {
defer wg.Done()
w.dnsDiscover(addr, libp2pproto.ID(""), retrieveENR)
}(addrString)
} else {
// It's a normal enr
bootnode, err := enode.Parse(enode.ValidSchemes, addrString)
if err != nil {
return nil, err
}
result = append(result, bootnode)
}
}
wg.Wait()
return result, nil
}
func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply fnApplyToEachPeer) { func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply fnApplyToEachPeer) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
w.dnsAddressCacheLock.RLock() w.dnsAddressCacheLock.RLock()
multiaddresses, ok := w.dnsAddressCache[enrtreeAddress] discNodes, ok := w.dnsAddressCache[enrtreeAddress]
w.dnsAddressCacheLock.RUnlock() w.dnsAddressCacheLock.RUnlock()
if !ok { if !ok {
w.dnsAddressCacheLock.Lock() w.dnsAddressCacheLock.Lock()
var err error var err error
discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrtreeAddress) discoveredNodes, err := dnsdisc.RetrieveNodes(ctx, enrtreeAddress)
for _, d := range discoveredNodes { w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], discoveredNodes...)
w.dnsAddressCache[enrtreeAddress] = append(w.dnsAddressCache[enrtreeAddress], d.Addresses...)
}
w.dnsAddressCacheLock.Unlock() w.dnsAddressCacheLock.Unlock()
if err != nil { if err != nil {
log.Warn("dns discovery error ", err) log.Warn("dns discovery error ", err)
@ -363,8 +392,8 @@ func (w *Waku) dnsDiscover(enrtreeAddress string, protocol libp2pproto.ID, apply
} }
} }
for _, m := range multiaddresses { for _, d := range discNodes {
apply(m, protocol) apply(d, protocol)
} }
} }
@ -375,12 +404,16 @@ func (w *Waku) addPeerFromString(addrString string, protocol libp2pproto.ID, app
return return
} }
apply(addr, protocol) d := dnsdisc.DiscoveredNode{
Addresses: []multiaddr.Multiaddr{addr},
}
apply(d, protocol)
} }
func (w *Waku) addWakuV2Peers(cfg *Config) { func (w *Waku) addWakuV2Peers(cfg *Config) {
if !cfg.LightClient { if !cfg.LightClient {
addRelayPeer := func(m multiaddr.Multiaddr, protocol libp2pproto.ID) { addRelayPeer := func(d dnsdisc.DiscoveredNode, protocol libp2pproto.ID) {
for _, m := range d.Addresses {
go func(node multiaddr.Multiaddr) { go func(node multiaddr.Multiaddr) {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel() defer cancel()
@ -392,10 +425,12 @@ func (w *Waku) addWakuV2Peers(cfg *Config) {
} }
}(m) }(m)
} }
}
w.addPeers(cfg.RelayNodes, relay.WakuRelayID_v200, addRelayPeer) w.addPeers(cfg.RelayNodes, relay.WakuRelayID_v200, addRelayPeer)
} }
addToStore := func(m multiaddr.Multiaddr, protocol libp2pproto.ID) { addToPeerStore := func(d dnsdisc.DiscoveredNode, protocol libp2pproto.ID) {
for _, m := range d.Addresses {
peerID, err := w.node.AddPeer(m, string(protocol)) peerID, err := w.node.AddPeer(m, string(protocol))
if err != nil { if err != nil {
log.Warn("could not add peer", "multiaddr", m, "err", err) log.Warn("could not add peer", "multiaddr", m, "err", err)
@ -403,11 +438,12 @@ func (w *Waku) addWakuV2Peers(cfg *Config) {
} }
log.Info("peer added successfully", "peerId", peerID) log.Info("peer added successfully", "peerId", peerID)
} }
}
w.addPeers(cfg.StoreNodes, store.StoreID_v20beta4, addToStore) w.addPeers(cfg.StoreNodes, store.StoreID_v20beta4, addToPeerStore)
w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1, addToStore) w.addPeers(cfg.FilterNodes, filter.FilterID_v20beta1, addToPeerStore)
w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1, addToStore) w.addPeers(cfg.LightpushNodes, lightpush.LightPushID_v20beta1, addToPeerStore)
w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001, addToStore) w.addPeers(cfg.WakuRendezvousNodes, rendezvous.RendezvousID_v001, addToPeerStore)
} }
func (w *Waku) GetStats() types.StatsSummary { func (w *Waku) GetStats() types.StatsSummary {