feat_: integrate on-demand DNS discovery and implement discoverAndConnectPeers (#6017)

This commit is contained in:
gabrielmer 2024-10-31 16:40:13 +02:00 committed by GitHub
parent 6ac236d380
commit d17610d280
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 102 additions and 20 deletions

View File

@ -292,6 +292,20 @@ package wakuv2
resp));
}
static void cGoWakuDnsDiscovery(void* wakuCtx,
const char* entTreeUrl,
const char* nameDnsServer,
int timeoutMs,
void* resp) {
WAKU_CALL (waku_dns_discovery(wakuCtx,
entTreeUrl,
nameDnsServer,
timeoutMs,
(WakuCallBack) callback,
resp));
}
*/
import "C"
@ -837,24 +851,30 @@ func (w *Waku) retryDnsDiscoveryWithBackoff(ctx context.Context, addr string, su
}
}
*/
func (w *Waku) discoverAndConnectPeers() {
fnApply := func(d dnsdisc.DiscoveredNode, wg *sync.WaitGroup) {
defer wg.Done()
if len(d.PeerInfo.Addrs) != 0 {
go w.connect(d.PeerInfo, d.ENR, wps.DNSDiscovery)
}
var addrsToConnect []multiaddr.Multiaddr
nameserver := w.cfg.Nameserver
if nameserver == "" {
nameserver = "8.8.8.8"
}
timeout := int(requestTimeout / time.Millisecond)
for _, addrString := range w.cfg.WakuNodes {
addrString := addrString
if strings.HasPrefix(addrString, "enrtree://") {
// Use DNS Discovery
go func() {
defer gocommon.LogOnPanic()
if err := w.dnsDiscover(w.ctx, addrString, fnApply, false); err != nil {
w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString))
}
}()
res, err := w.WakuDnsDiscovery(addrString, nameserver, timeout)
if err != nil {
w.logger.Error("could not obtain dns discovery peers for ClusterConfig.WakuNodes", zap.Error(err), zap.String("dnsDiscURL", addrString))
continue
}
for _, ma := range res {
addrsToConnect = append(addrsToConnect, ma)
}
} else {
// It is a normal multiaddress
addr, err := multiaddr.NewMultiaddr(addrString)
@ -862,17 +882,16 @@ func (w *Waku) discoverAndConnectPeers() {
w.logger.Warn("invalid peer multiaddress", zap.String("ma", addrString), zap.Error(err))
continue
}
peerInfo, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
w.logger.Warn("invalid peer multiaddress", zap.Stringer("addr", addr), zap.Error(err))
continue
}
go w.connect(*peerInfo, nil, wps.Static)
addrsToConnect = append(addrsToConnect, addr)
}
}
} */
// Now connect to all the Multiaddresses
for _, ma := range addrsToConnect {
w.WakuConnect(ma.String(), timeout)
}
}
func (w *Waku) connect(peerInfo peer.AddrInfo, enr *enode.Node, origin wps.Origin) {
defer gocommon.LogOnPanic()
@ -2726,6 +2745,38 @@ func (self *Waku) WakuDialPeerById(peerId peer.ID, protocol string, timeoutMs in
return errors.New(errMsg)
}
func (self *Waku) WakuDnsDiscovery(entTreeUrl string, nameDnsServer string, timeoutMs int) ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
var cEnrTree = C.CString(entTreeUrl)
var cDnsServer = C.CString(nameDnsServer)
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cEnrTree))
defer C.free(unsafe.Pointer(cDnsServer))
C.cGoWakuDnsDiscovery(self.wakuCtx, cEnrTree, cDnsServer, C.int(timeoutMs), resp)
if C.getRet(resp) == C.RET_OK {
var addrsRet []multiaddr.Multiaddr
nodeAddresses := C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
addrss := strings.Split(nodeAddresses, ",")
for _, addr := range addrss {
addr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
addrsRet = append(addrsRet, addr)
}
return addrsRet, nil
}
errMsg := "error WakuDnsDiscovery: " +
C.GoStringN(C.getMyCharPtr(resp), C.int(C.getMyCharLen(resp)))
return nil, errors.New(errMsg)
}
func (self *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) {
var resp = C.allocResp()
defer C.freeResp(resp)

View File

@ -633,6 +633,37 @@ func TestDial(t *testing.T) {
}
func TestDnsDiscover(t *testing.T) {
logger, err := zap.NewDevelopment()
require.NoError(t, err)
nodeConfig := Config{
UseThrottledPublish: true,
ClusterID: 16,
Nameserver: "8.8.8.8",
}
nodeWakuConfig := WakuConfig{
EnableRelay: true,
LogLevel: "DEBUG",
ClusterID: 16,
Shards: []uint16{64},
Discv5UdpPort: 9040,
TcpPort: 60040,
}
node, err := New(nil, "", &nodeConfig, &nodeWakuConfig, logger.Named("node"), nil, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, node.Start())
time.Sleep(1 * time.Second)
sampleEnrTree := "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"
res, err := node.WakuDnsDiscovery(sampleEnrTree, nodeConfig.Nameserver, int(requestTimeout/time.Millisecond))
require.NoError(t, err)
require.True(t, len(res) > 1, "multiple nodes should be returned from the DNS Discovery query")
// Stop nodes
require.NoError(t, node.Stop())
}
/*
func TestWakuV2Filter(t *testing.T) {