719 lines
19 KiB
Go
719 lines
19 KiB
Go
package protocol
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"math"
|
|
"math/big"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
"github.com/pkg/errors"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/go-waku/waku/v2/dnsdisc"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/status-im/status-go/params"
|
|
"github.com/status-im/status-go/services/mailservers"
|
|
"github.com/status-im/status-go/signal"
|
|
)
|
|
|
|
const defaultBackoff = 30 * time.Second
|
|
|
|
type byRTTMs []*mailservers.PingResult
|
|
|
|
func (s byRTTMs) Len() int {
|
|
return len(s)
|
|
}
|
|
|
|
func (s byRTTMs) Swap(i, j int) {
|
|
s[i], s[j] = s[j], s[i]
|
|
}
|
|
|
|
func (s byRTTMs) Less(i, j int) bool {
|
|
return *s[i].RTTMs < *s[j].RTTMs
|
|
}
|
|
|
|
func (m *Messenger) StartMailserverCycle() error {
|
|
canUseMailservers, err := m.settings.CanUseMailservers()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !canUseMailservers {
|
|
return errors.New("mailserver use is not allowed")
|
|
}
|
|
|
|
m.logger.Debug("started mailserver cycle")
|
|
|
|
m.mailserverCycle.events = make(chan *p2p.PeerEvent, 20)
|
|
m.mailserverCycle.subscription = m.server.SubscribeEvents(m.mailserverCycle.events)
|
|
|
|
go m.checkMailserverConnection()
|
|
go m.updateWakuV1PeerStatus()
|
|
go m.updateWakuV2PeerStatus()
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) DisconnectActiveMailserver() {
|
|
m.mailserverCycle.Lock()
|
|
defer m.mailserverCycle.Unlock()
|
|
m.disconnectActiveMailserver()
|
|
}
|
|
|
|
func (m *Messenger) disconnectV1Mailserver() {
|
|
// TODO: remove this function once WakuV1 is deprecated
|
|
if m.mailserverCycle.activeMailserver == nil {
|
|
return
|
|
}
|
|
m.logger.Info("Disconnecting active mailserver", zap.Any("nodeID", m.mailserverCycle.activeMailserver.ID()))
|
|
pInfo, ok := m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()]
|
|
if ok {
|
|
pInfo.status = disconnected
|
|
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
|
|
m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] = pInfo
|
|
} else {
|
|
m.mailserverCycle.peers[m.mailserverCycle.activeMailserver.ID().String()] = peerStatus{
|
|
status: disconnected,
|
|
canConnectAfter: time.Now().Add(defaultBackoff),
|
|
}
|
|
}
|
|
|
|
m.server.RemovePeer(m.mailserverCycle.activeMailserver)
|
|
m.mailserverCycle.activeMailserver = nil
|
|
}
|
|
|
|
func (m *Messenger) disconnectStoreNode() {
|
|
if m.mailserverCycle.activeStoreNode == nil {
|
|
return
|
|
}
|
|
m.logger.Info("Disconnecting active storeNode", zap.Any("nodeID", m.mailserverCycle.activeStoreNode.Pretty()))
|
|
pInfo, ok := m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)]
|
|
if ok {
|
|
pInfo.status = disconnected
|
|
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
|
|
m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] = pInfo
|
|
} else {
|
|
m.mailserverCycle.peers[string(*m.mailserverCycle.activeStoreNode)] = peerStatus{
|
|
status: disconnected,
|
|
canConnectAfter: time.Now().Add(defaultBackoff),
|
|
}
|
|
}
|
|
|
|
err := m.transport.DropPeer(string(*m.mailserverCycle.activeStoreNode))
|
|
if err != nil {
|
|
m.logger.Warn("Could not drop peer")
|
|
}
|
|
|
|
m.mailserverCycle.activeStoreNode = nil
|
|
}
|
|
|
|
func (m *Messenger) disconnectActiveMailserver() {
|
|
switch m.transport.WakuVersion() {
|
|
case 1:
|
|
m.disconnectV1Mailserver()
|
|
case 2:
|
|
m.disconnectStoreNode()
|
|
}
|
|
signal.SendMailserverChanged("")
|
|
}
|
|
|
|
func (m *Messenger) cycleMailservers() {
|
|
m.mailserverCycle.Lock()
|
|
defer m.mailserverCycle.Unlock()
|
|
|
|
m.logger.Info("Automatically switching mailserver")
|
|
|
|
if m.mailserverCycle.activeMailserver != nil {
|
|
m.disconnectActiveMailserver()
|
|
}
|
|
|
|
err := m.findNewMailserver()
|
|
if err != nil {
|
|
m.logger.Error("Error getting new mailserver", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
func poolSize(fleetSize int) int {
|
|
return int(math.Ceil(float64(fleetSize) / 4))
|
|
}
|
|
|
|
func (m *Messenger) findNewMailserver() error {
|
|
switch m.transport.WakuVersion() {
|
|
case 1:
|
|
return m.findNewMailserverV1()
|
|
case 2:
|
|
return m.findStoreNode()
|
|
default:
|
|
return errors.New("waku version is not supported")
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) findStoreNode() error {
|
|
allMailservers := parseStoreNodeConfig(m.config.clusterConfig.StoreNodes)
|
|
|
|
// TODO: append user mailservers once that functionality is available for waku2
|
|
|
|
var mailserverList []multiaddr.Multiaddr
|
|
now := time.Now()
|
|
for _, node := range allMailservers {
|
|
pID, err := getPeerID(node)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
pInfo, ok := m.mailserverCycle.peers[string(pID)]
|
|
if !ok || pInfo.canConnectAfter.Before(now) {
|
|
mailserverList = append(mailserverList, node)
|
|
}
|
|
}
|
|
|
|
m.logger.Info("Finding a new store node...")
|
|
|
|
var mailserverStr []string
|
|
for _, m := range mailserverList {
|
|
mailserverStr = append(mailserverStr, m.String())
|
|
}
|
|
|
|
pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, mailservers.MultiAddressToAddress)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var availableMailservers []*mailservers.PingResult
|
|
for _, result := range pingResult {
|
|
if result.Err != nil {
|
|
continue // The results with error are ignored
|
|
}
|
|
availableMailservers = append(availableMailservers, result)
|
|
}
|
|
sort.Sort(byRTTMs(availableMailservers))
|
|
|
|
if len(availableMailservers) == 0 {
|
|
m.logger.Warn("No store nodes available") // Do nothing...
|
|
return nil
|
|
}
|
|
|
|
// Picks a random mailserver amongs the ones with the lowest latency
|
|
// The pool size is 1/4 of the mailservers were pinged successfully
|
|
pSize := poolSize(len(availableMailservers) - 1)
|
|
if pSize <= 0 {
|
|
m.logger.Warn("No store nodes available") // Do nothing...
|
|
return nil
|
|
}
|
|
|
|
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.connectToStoreNode(parseMultiaddresses([]string{availableMailservers[r.Int64()].Address})[0])
|
|
}
|
|
|
|
func (m *Messenger) getFleet() (string, error) {
|
|
var fleet string
|
|
dbFleet, err := m.settings.GetFleet()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if dbFleet != "" {
|
|
fleet = dbFleet
|
|
} else if m.config.clusterConfig.Fleet != "" {
|
|
fleet = m.config.clusterConfig.Fleet
|
|
} else {
|
|
fleet = params.FleetProd
|
|
}
|
|
return fleet, nil
|
|
}
|
|
|
|
func (m *Messenger) findNewMailserverV1() error {
|
|
// TODO: remove this function once WakuV1 is deprecated
|
|
|
|
allMailservers := parseNodes(m.config.clusterConfig.TrustedMailServers)
|
|
|
|
// Append user mailservers
|
|
fleet, err := m.getFleet()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
customMailservers, err := m.mailservers.Mailservers()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, c := range customMailservers {
|
|
if c.Fleet == fleet {
|
|
mNode, err := enode.ParseV4(c.Address)
|
|
if err != nil {
|
|
allMailservers = append(allMailservers, mNode)
|
|
}
|
|
}
|
|
}
|
|
|
|
var mailserverList []*enode.Node
|
|
now := time.Now()
|
|
for _, node := range allMailservers {
|
|
pInfo, ok := m.mailserverCycle.peers[node.ID().String()]
|
|
if !ok || pInfo.canConnectAfter.Before(now) {
|
|
mailserverList = append(mailserverList, node)
|
|
}
|
|
}
|
|
|
|
m.logger.Info("Finding a new mailserver...")
|
|
|
|
var mailserverStr []string
|
|
for _, m := range mailserverList {
|
|
mailserverStr = append(mailserverStr, m.String())
|
|
}
|
|
|
|
pingResult, err := mailservers.DoPing(context.Background(), mailserverStr, 500, mailservers.EnodeStringToAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var availableMailservers []*mailservers.PingResult
|
|
for _, result := range pingResult {
|
|
if result.Err != nil {
|
|
continue // The results with error are ignored
|
|
}
|
|
availableMailservers = append(availableMailservers, result)
|
|
}
|
|
sort.Sort(byRTTMs(availableMailservers))
|
|
|
|
if len(availableMailservers) == 0 {
|
|
m.logger.Warn("No mailservers available") // Do nothing...
|
|
return nil
|
|
}
|
|
|
|
// Picks a random mailserver amongs the ones with the lowest latency
|
|
// The pool size is 1/4 of the mailservers were pinged successfully
|
|
pSize := poolSize(len(availableMailservers) - 1)
|
|
r, err := rand.Int(rand.Reader, big.NewInt(int64(pSize)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return m.connectToMailserver(parseNodes([]string{availableMailservers[r.Int64()].Address})[0])
|
|
}
|
|
|
|
func (m *Messenger) activeMailserverStatus() (connStatus, error) {
|
|
var mailserverID string
|
|
switch m.transport.WakuVersion() {
|
|
case 1:
|
|
if m.mailserverCycle.activeMailserver == nil {
|
|
return disconnected, errors.New("Active mailserver is not set")
|
|
}
|
|
mailserverID = m.mailserverCycle.activeMailserver.ID().String()
|
|
case 2:
|
|
if m.mailserverCycle.activeStoreNode == nil {
|
|
return disconnected, errors.New("Active storenode is not set")
|
|
}
|
|
mailserverID = string(*m.mailserverCycle.activeStoreNode)
|
|
default:
|
|
return disconnected, errors.New("waku version is not supported")
|
|
}
|
|
|
|
return m.mailserverCycle.peers[mailserverID].status, nil
|
|
}
|
|
|
|
func (m *Messenger) connectToMailserver(node *enode.Node) error {
|
|
// TODO: remove this function once WakuV1 is deprecated
|
|
|
|
if m.transport.WakuVersion() != 1 {
|
|
return nil // This can only be used with wakuV1
|
|
}
|
|
|
|
m.logger.Info("Connecting to mailserver", zap.Any("peer", node.ID()))
|
|
nodeConnected := false
|
|
|
|
m.mailserverCycle.activeMailserver = node
|
|
signal.SendMailserverChanged(m.mailserverCycle.activeMailserver.String())
|
|
|
|
// Adding a peer and marking it as connected can't be executed sync in WakuV1, because
|
|
// There's a delay between requesting a peer being added, and a signal being
|
|
// received after the peer was added. So we first set the peer status as
|
|
// Connecting and once a peerConnected signal is received, we mark it as
|
|
// Connected
|
|
activeMailserverStatus, err := m.activeMailserverStatus()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if activeMailserverStatus == connected {
|
|
nodeConnected = true
|
|
} else {
|
|
// Attempt to connect to mailserver by adding it as a peer
|
|
m.SetMailserver(node.ID().Bytes())
|
|
m.server.AddPeer(node)
|
|
if err := m.peerStore.Update([]*enode.Node{node}); err != nil {
|
|
return err
|
|
}
|
|
|
|
pInfo, ok := m.mailserverCycle.peers[node.ID().String()]
|
|
if ok {
|
|
pInfo.status = connecting
|
|
pInfo.lastConnectionAttempt = time.Now()
|
|
m.mailserverCycle.peers[node.ID().String()] = pInfo
|
|
} else {
|
|
m.mailserverCycle.peers[node.ID().String()] = peerStatus{
|
|
status: connecting,
|
|
lastConnectionAttempt: time.Now(),
|
|
}
|
|
}
|
|
}
|
|
|
|
if nodeConnected {
|
|
m.logger.Info("Mailserver available")
|
|
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.String())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) connectToStoreNode(node multiaddr.Multiaddr) error {
|
|
if m.transport.WakuVersion() != 2 {
|
|
return nil // This can only be used with wakuV2
|
|
}
|
|
|
|
m.logger.Info("Connecting to storenode", zap.Any("peer", node))
|
|
|
|
nodeConnected := false
|
|
|
|
peerID, err := getPeerID(node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
m.mailserverCycle.activeStoreNode = &peerID
|
|
signal.SendMailserverChanged(m.mailserverCycle.activeStoreNode.Pretty())
|
|
|
|
// Adding a peer and marking it as connected can't be executed sync in WakuV1, because
|
|
// There's a delay between requesting a peer being added, and a signal being
|
|
// received after the peer was added. So we first set the peer status as
|
|
// Connecting and once a peerConnected signal is received, we mark it as
|
|
// Connected
|
|
activeMailserverStatus, err := m.activeMailserverStatus()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if activeMailserverStatus == connected {
|
|
nodeConnected = true
|
|
} else {
|
|
// Attempt to connect to mailserver by adding it as a peer
|
|
m.SetMailserver([]byte(peerID.Pretty()))
|
|
if err := m.transport.DialPeer(node.String()); err != nil {
|
|
return err
|
|
}
|
|
|
|
pInfo, ok := m.mailserverCycle.peers[string(peerID)]
|
|
if ok {
|
|
pInfo.status = connected
|
|
pInfo.lastConnectionAttempt = time.Now()
|
|
} else {
|
|
m.mailserverCycle.peers[string(peerID)] = peerStatus{
|
|
status: connected,
|
|
lastConnectionAttempt: time.Now(),
|
|
}
|
|
}
|
|
|
|
nodeConnected = true
|
|
}
|
|
|
|
if nodeConnected {
|
|
m.logger.Info("Storenode available")
|
|
signal.SendMailserverAvailable(m.mailserverCycle.activeStoreNode.Pretty())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Messenger) getActiveMailserver() *enode.Node {
|
|
m.mailserverCycle.RLock()
|
|
defer m.mailserverCycle.RUnlock()
|
|
return m.mailserverCycle.activeMailserver
|
|
}
|
|
|
|
func (m *Messenger) isActiveMailserverAvailable() bool {
|
|
m.mailserverCycle.RLock()
|
|
defer m.mailserverCycle.RUnlock()
|
|
|
|
mailserverStatus, err := m.activeMailserverStatus()
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
return mailserverStatus == connected
|
|
}
|
|
|
|
func (m *Messenger) updateWakuV2PeerStatus() {
|
|
if m.transport.WakuVersion() != 2 {
|
|
return // This can only be used with wakuV2
|
|
}
|
|
|
|
connSubscription, err := m.transport.SubscribeToConnStatusChanges()
|
|
if err != nil {
|
|
m.logger.Error("Could not subscribe to connection status changes", zap.Error(err))
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case status := <-connSubscription.C:
|
|
m.mailserverCycle.Lock()
|
|
|
|
for pID, pInfo := range m.mailserverCycle.peers {
|
|
if pInfo.status == disconnected {
|
|
continue
|
|
}
|
|
|
|
// Removing disconnected
|
|
|
|
found := false
|
|
for connectedPeer := range status.Peers {
|
|
peerID, err := peer.Decode(connectedPeer)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
if string(peerID) == pID {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found && pInfo.status == connected {
|
|
m.logger.Info("Peer disconnected", zap.String("peer", peer.ID(pID).Pretty()))
|
|
pInfo.status = disconnected
|
|
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
|
|
}
|
|
|
|
m.mailserverCycle.peers[pID] = pInfo
|
|
}
|
|
|
|
for connectedPeer := range status.Peers {
|
|
peerID, err := peer.Decode(connectedPeer)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
pInfo, ok := m.mailserverCycle.peers[string(peerID)]
|
|
if !ok || pInfo.status != connected {
|
|
m.logger.Info("Peer connected", zap.String("peer", connectedPeer))
|
|
pInfo.status = connected
|
|
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
|
|
m.mailserverCycle.peers[string(peerID)] = pInfo
|
|
}
|
|
}
|
|
m.mailserverCycle.Unlock()
|
|
|
|
case <-m.quit:
|
|
connSubscription.Unsubscribe()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) updateWakuV1PeerStatus() {
|
|
// TODO: remove this function once WakuV1 is deprecated
|
|
|
|
if m.transport.WakuVersion() != 1 {
|
|
return // This can only be used with wakuV1
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-m.mailserverCycle.events:
|
|
connectedPeers := m.server.PeersInfo()
|
|
m.mailserverCycle.Lock()
|
|
|
|
for pID, pInfo := range m.mailserverCycle.peers {
|
|
if pInfo.status == disconnected {
|
|
continue
|
|
}
|
|
|
|
// Removing disconnected
|
|
|
|
found := false
|
|
for _, connectedPeer := range connectedPeers {
|
|
if enode.HexID(connectedPeer.ID) == enode.HexID(pID) {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found && (pInfo.status == connected || (pInfo.status == connecting && pInfo.lastConnectionAttempt.Add(8*time.Second).Before(time.Now()))) {
|
|
m.logger.Info("Peer disconnected", zap.String("peer", enode.HexID(pID).String()))
|
|
pInfo.status = disconnected
|
|
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
|
|
}
|
|
|
|
m.mailserverCycle.peers[pID] = pInfo
|
|
}
|
|
|
|
for _, connectedPeer := range connectedPeers {
|
|
hexID := enode.HexID(connectedPeer.ID).String()
|
|
pInfo, ok := m.mailserverCycle.peers[hexID]
|
|
if !ok || pInfo.status != connected {
|
|
m.logger.Info("Peer connected", zap.String("peer", hexID))
|
|
pInfo.status = connected
|
|
pInfo.canConnectAfter = time.Now().Add(defaultBackoff)
|
|
if m.mailserverCycle.activeMailserver != nil && hexID == m.mailserverCycle.activeMailserver.ID().String() {
|
|
m.logger.Info("Mailserver available")
|
|
signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.String())
|
|
}
|
|
m.mailserverCycle.peers[hexID] = pInfo
|
|
}
|
|
}
|
|
m.mailserverCycle.Unlock()
|
|
case <-m.quit:
|
|
m.mailserverCycle.Lock()
|
|
defer m.mailserverCycle.Unlock()
|
|
close(m.mailserverCycle.events)
|
|
m.mailserverCycle.subscription.Unsubscribe()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Messenger) getPinnedMailserver() (string, error) {
|
|
// TODO: Pinned mailservers are ony available in V1 for now
|
|
if m.transport.WakuVersion() != 1 {
|
|
return "", nil
|
|
}
|
|
|
|
fleet, err := m.getFleet()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
pinnedMailservers, err := m.settings.GetPinnedMailservers()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
pinnedMailserver, ok := pinnedMailservers[fleet]
|
|
if !ok {
|
|
return "", nil
|
|
}
|
|
|
|
return pinnedMailserver, nil
|
|
}
|
|
|
|
func (m *Messenger) checkMailserverConnection() {
|
|
ticker := time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
m.logger.Info("Verifying mailserver connection state...")
|
|
|
|
pinnedMailserver, err := m.getPinnedMailserver()
|
|
if err != nil {
|
|
m.logger.Error("Could not obtain the pinned mailserver", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if pinnedMailserver != "" {
|
|
pinnedNode := parseNodes([]string{pinnedMailserver})[0]
|
|
|
|
activeMailserver := m.getActiveMailserver()
|
|
if activeMailserver == nil || activeMailserver.String() != pinnedMailserver {
|
|
m.logger.Info("New pinned mailserver", zap.Any("pinnedMailserver", pinnedMailserver))
|
|
err = m.connectToMailserver(pinnedNode)
|
|
if err != nil {
|
|
m.logger.Error("Could not connect to pinned mailserver", zap.Error(err))
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
// or setup a random mailserver:
|
|
if !m.isActiveMailserverAvailable() {
|
|
m.cycleMailservers()
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-m.quit:
|
|
return
|
|
case <-ticker.C:
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func parseNodes(enodes []string) []*enode.Node {
|
|
var nodes []*enode.Node
|
|
for _, item := range enodes {
|
|
parsedPeer, err := enode.ParseV4(item)
|
|
if err == nil {
|
|
nodes = append(nodes, parsedPeer)
|
|
}
|
|
}
|
|
return nodes
|
|
}
|
|
|
|
func parseMultiaddresses(addresses []string) []multiaddr.Multiaddr {
|
|
var result []multiaddr.Multiaddr
|
|
for _, item := range addresses {
|
|
ma, err := multiaddr.NewMultiaddr(item)
|
|
if err == nil {
|
|
result = append(result, ma)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func parseStoreNodeConfig(addresses []string) []multiaddr.Multiaddr {
|
|
// TODO: once a scoring/reputation mechanism is added to waku,
|
|
// this function can be modified to retrieve the storenodes
|
|
// from waku peerstore.
|
|
// We don't do that now because we can't trust any random storenode
|
|
// So we use only those specified in the cluster config
|
|
var result []multiaddr.Multiaddr
|
|
var dnsDiscWg sync.WaitGroup
|
|
|
|
maChan := make(chan multiaddr.Multiaddr, 1000)
|
|
|
|
for _, addrString := range addresses {
|
|
if strings.HasPrefix(addrString, "enrtree://") {
|
|
// Use DNS Discovery
|
|
dnsDiscWg.Add(1)
|
|
go func(addr string) {
|
|
defer dnsDiscWg.Done()
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
multiaddresses, err := dnsdisc.RetrieveNodes(ctx, addr)
|
|
if err == nil {
|
|
for _, ma := range multiaddresses {
|
|
maChan <- ma
|
|
}
|
|
}
|
|
}(addrString)
|
|
|
|
} else {
|
|
// It's a normal multiaddress
|
|
ma, err := multiaddr.NewMultiaddr(addrString)
|
|
if err == nil {
|
|
maChan <- ma
|
|
}
|
|
}
|
|
}
|
|
dnsDiscWg.Wait()
|
|
close(maChan)
|
|
for ma := range maChan {
|
|
result = append(result, ma)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func getPeerID(addr multiaddr.Multiaddr) (peer.ID, error) {
|
|
idStr, err := addr.ValueForProtocol(multiaddr.P_P2P)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return peer.Decode(idStr)
|
|
}
|