chore(nwaku)_: ping storenodes using AddrInfo (#6004)

richΛrd 2024-10-29 09:33:50 -04:00 committed by Richard Ramos
parent 948c6d3497
commit fd9d9f384f
21 changed files with 91 additions and 120 deletions

View File

@@ -63,11 +63,6 @@ func (w *GethWakuWrapper) StopDiscV5() error {
return errors.New("not available in WakuV1")
}
// PeerCount function only added for compatibility with waku V2
func (w *GethWakuWrapper) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
return "", errors.New("not available in WakuV1")
}
// SubscribeToPubsubTopic function only added for compatibility with waku V2
func (w *GethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
// not available in WakuV1

View File

@@ -206,10 +206,6 @@ func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error {
return w.waku.RemovePubsubTopicKey(topic)
}
func (w *gethWakuV2Wrapper) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
return w.waku.AddStorePeer(address)
}
func (w *gethWakuV2Wrapper) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
return w.waku.AddRelayPeer(address)
}

View File

@@ -134,8 +134,6 @@ type Waku interface {
RemovePubsubTopicKey(topic string) error
AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error)
AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error)
DialPeer(address multiaddr.Multiaddr) error

go.mod
View File

@@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a
github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

go.sum
View File

@@ -2140,8 +2140,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a h1:epN2bp1mPzdg3S7S2iR72GsUPix7irc3UgM4W9NZJpU=
github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057 h1:C/UCg3Z4avOxvZEvY0JzYmeAoqZUBnSE6PK/SaxfEAM=
github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@@ -833,18 +833,11 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
}
response := &MessengerResponse{}
storenodes, err := m.AllMailservers()
response.Mailservers, err = m.AllMailservers()
if err != nil {
return nil, err
}
err = m.setupStorenodes(storenodes)
if err != nil {
return nil, err
}
response.Mailservers = storenodes
m.transport.SetStorenodeConfigProvider(m)
if err := m.communityStorenodes.ReloadFromDB(); err != nil {

View File

@@ -13,6 +13,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/api/history"
gocommon "github.com/status-im/status-go/common"
rcommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
@@ -95,6 +96,8 @@ func (m *Messenger) performStorenodeTask(task func() (*MessengerResponse, error)
errCh := make(chan error)
go func() {
defer rcommon.LogOnPanic()
err := m.transport.PerformStorenodeTask(func() error {
r, err := task()
if err != nil {

View File

@@ -4,8 +4,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/status-im/status-go/common"
gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/mailservers"
@@ -39,28 +38,6 @@ func (m *Messenger) AllMailservers() ([]mailservers.Mailserver, error) {
return allMailservers, nil
}
func (m *Messenger) setupStorenodes(storenodes []mailservers.Mailserver) error {
if m.transport.WakuVersion() != 2 {
return nil
}
for _, storenode := range storenodes {
peerInfo, err := storenode.PeerInfo()
if err != nil {
return err
}
for _, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
_, err := m.transport.AddStorePeer(addr)
if err != nil {
return err
}
}
}
return nil
}
func (m *Messenger) getFleet() (string, error) {
var fleet string
dbFleet, err := m.settings.GetFleet()
@@ -93,69 +70,72 @@ func (m *Messenger) asyncRequestAllHistoricMessages() {
}()
}
func (m *Messenger) GetPinnedStorenode() (peer.ID, error) {
func (m *Messenger) GetPinnedStorenode() (peer.AddrInfo, error) {
fleet, err := m.getFleet()
if err != nil {
return "", err
return peer.AddrInfo{}, err
}
pinnedMailservers, err := m.settings.GetPinnedMailservers()
if err != nil {
return "", err
return peer.AddrInfo{}, err
}
pinnedMailserver, ok := pinnedMailservers[fleet]
if !ok {
return "", nil
return peer.AddrInfo{}, nil
}
fleetMailservers := mailservers.DefaultMailservers()
for _, c := range fleetMailservers {
if c.Fleet == fleet && c.ID == pinnedMailserver {
return c.PeerID()
return c.PeerInfo()
}
}
if m.mailserversDatabase != nil {
customMailservers, err := m.mailserversDatabase.Mailservers()
if err != nil {
return "", err
return peer.AddrInfo{}, err
}
for _, c := range customMailservers {
if c.Fleet == fleet && c.ID == pinnedMailserver {
return c.PeerID()
return c.PeerInfo()
}
}
}
return "", nil
return peer.AddrInfo{}, nil
}
func (m *Messenger) UseStorenodes() (bool, error) {
return m.settings.CanUseMailservers()
}
func (m *Messenger) Storenodes() ([]peer.ID, error) {
func (m *Messenger) Storenodes() ([]peer.AddrInfo, error) {
mailservers, err := m.AllMailservers()
if err != nil {
return nil, err
}
var result []peer.ID
var result []peer.AddrInfo
for _, m := range mailservers {
peerID, err := m.PeerID()
peerInfo, err := m.PeerInfo()
if err != nil {
return nil, err
}
result = append(result, peerID)
result = append(result, peerInfo)
}
return result, nil
}
func (m *Messenger) checkForStorenodeCycleSignals() {
defer common.LogOnPanic()
if m.transport.WakuVersion() != 2 {
return
}
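With the move from peer.ID to peer.AddrInfo, callers of GetPinnedStorenode detect "nothing pinned" by checking for a zero peer.AddrInfo (empty ID) rather than an empty string; the storenode cycle below does exactly this with pinnedStorenode.ID == "". A tiny self-contained sketch of that convention (the helper name is illustrative, not part of the repo):

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/peer"
)

// pinnedOrDefault shows the zero-value convention of the new API: an empty
// ID means no storenode is pinned, so fall back to the first known one.
func pinnedOrDefault(pinned peer.AddrInfo, all []peer.AddrInfo) (peer.AddrInfo, bool) {
	if pinned.ID != "" {
		return pinned, true
	}
	if len(all) > 0 {
		return all[0], true
	}
	return peer.AddrInfo{}, false
}

func main() {
	info, ok := pinnedOrDefault(peer.AddrInfo{}, nil)
	fmt.Println(info.ID, ok) // empty ID and false: nothing pinned, no storenodes known
}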

View File

@@ -11,10 +11,6 @@ import (
"github.com/status-im/status-go/eth-node/types"
)
func (m *Messenger) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
return m.transport.AddStorePeer(address)
}
func (m *Messenger) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
return m.transport.AddRelayPeer(address)
}

View File

@@ -516,10 +516,6 @@ func (t *Transport) ENR() (*enode.Node, error) {
return t.waku.ENR()
}
func (t *Transport) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
return t.waku.AddStorePeer(address)
}
func (t *Transport) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
return t.waku.AddRelayPeer(address)
}

View File

@@ -1478,14 +1478,6 @@ func (api *PublicAPI) StorePubsubTopicKey(topic string, privKey string) error {
return api.service.messenger.StorePubsubTopicKey(topic, p)
}
func (api *PublicAPI) AddStorePeer(address string) (peer.ID, error) {
maddr, err := multiaddr.NewMultiaddr(address)
if err != nil {
return "", err
}
return api.service.messenger.AddStorePeer(maddr)
}
func (api *PublicAPI) AddRelayPeer(address string) (peer.ID, error) {
maddr, err := multiaddr.NewMultiaddr(address)
if err != nil {

View File

@@ -49,13 +49,13 @@ type Mailserver struct {
FailedRequests uint `json:"-"`
}
func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) {
func (m Mailserver) PeerInfo() (peer.AddrInfo, error) {
var maddrs []multiaddr.Multiaddr
if m.ENR != nil {
addrInfo, err := enr.EnodeToPeerInfo(m.ENR)
if err != nil {
return nil, err
return peer.AddrInfo{}, err
}
addrInfo.Addrs = utils.EncapsulatePeerID(addrInfo.ID, addrInfo.Addrs...)
maddrs = append(maddrs, addrInfo.Addrs...)
@@ -67,14 +67,14 @@ func (m Mailserver) PeerInfo() (*peer.AddrInfo, error) {
p, err := peer.AddrInfosFromP2pAddrs(maddrs...)
if err != nil {
return nil, err
return peer.AddrInfo{}, err
}
if len(p) != 1 {
return nil, errors.New("invalid mailserver setup")
return peer.AddrInfo{}, errors.New("invalid mailserver setup")
}
return &p[0], nil
return p[0], nil
}
func (m Mailserver) PeerID() (peer.ID, error) {
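To make the new value-returning PeerInfo() flow concrete: each known multiaddr gets the mailserver's /p2p/<id> component attached (go-waku's utils.EncapsulatePeerID, as in the hunk above), and peer.AddrInfosFromP2pAddrs then folds them into a single peer.AddrInfo that is returned by value rather than as a pointer. A self-contained sketch with a local stand-in for the encapsulation helper; the generated key is only there so the snippet runs without a real mailserver:

package main

import (
	"errors"
	"fmt"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// encapsulatePeerID is a local stand-in for go-waku's utils.EncapsulatePeerID:
// it appends a /p2p/<id> component to every address that does not already carry one.
func encapsulatePeerID(id peer.ID, addrs ...multiaddr.Multiaddr) []multiaddr.Multiaddr {
	p2p, _ := multiaddr.NewMultiaddr("/p2p/" + id.String())
	out := make([]multiaddr.Multiaddr, 0, len(addrs))
	for _, a := range addrs {
		if _, err := a.ValueForProtocol(multiaddr.P_P2P); err == nil {
			out = append(out, a) // already has a peer ID component
			continue
		}
		out = append(out, a.Encapsulate(p2p))
	}
	return out
}

// peerInfoFromAddrs mirrors the value-returning contract of Mailserver.PeerInfo():
// a zero peer.AddrInfo plus an error instead of a nil pointer.
func peerInfoFromAddrs(id peer.ID, addrs ...multiaddr.Multiaddr) (peer.AddrInfo, error) {
	infos, err := peer.AddrInfosFromP2pAddrs(encapsulatePeerID(id, addrs...)...)
	if err != nil {
		return peer.AddrInfo{}, err
	}
	if len(infos) != 1 {
		return peer.AddrInfo{}, errors.New("expected addresses for exactly one peer")
	}
	return infos[0], nil
}

func main() {
	priv, _, _ := crypto.GenerateKeyPair(crypto.Ed25519, -1) // throwaway identity
	id, _ := peer.IDFromPrivateKey(priv)
	a1, _ := multiaddr.NewMultiaddr("/dns4/store.example.org/tcp/30303")
	a2, _ := multiaddr.NewMultiaddr("/ip4/192.0.2.10/tcp/30303")

	info, err := peerInfoFromAddrs(id, a1, a2)
	fmt.Println(info.ID, len(info.Addrs), err)
}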

third_party/nwaku vendored

@@ -1 +1 @@
Subproject commit de11e576f4b69b63b4135cfb9549ef15cdc1ad34
Subproject commit 3665991a655495a4e47c92596e7d9e156f4ed693

View File

@@ -6,11 +6,12 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)
type Pinger interface {
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error)
}
type defaultPingImpl struct {
@@ -23,8 +24,9 @@ func NewDefaultPinger(host host.Host) Pinger {
}
}
func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
pingResultCh := ping.Ping(ctx, d.host, peerID)
func (d *defaultPingImpl) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
d.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL)
pingResultCh := ping.Ping(ctx, d.host, peerInfo.ID)
select {
case <-ctx.Done():
return 0, ctx.Err()
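The AddrInfo-based Pinger lets the caller hand over everything needed to reach a peer that has never been dialled: the default implementation seeds the peerstore with the supplied addresses and only then runs the libp2p ping protocol against the ID, as the hunk above shows. A runnable sketch of the same pattern with two in-process hosts (helper and variable names are illustrative):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

// pingWithAddrInfo mirrors the AddrInfo-based contract: register the peer's
// addresses first, then ping it by ID.
func pingWithAddrInfo(ctx context.Context, h host.Host, info peer.AddrInfo) (time.Duration, error) {
	h.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.AddressTTL)
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	case res := <-ping.Ping(ctx, h, info.ID):
		return res.RTT, res.Error
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	a, err := libp2p.New() // default options listen locally and enable the ping service
	if err != nil {
		panic(err)
	}
	defer a.Close()
	b, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer b.Close()

	// a only learns about b through the AddrInfo passed to the pinger.
	rtt, err := pingWithAddrInfo(ctx, a, peer.AddrInfo{ID: b.ID(), Addrs: b.Addrs()})
	fmt.Println(rtt, err)
}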

View File

@@ -230,6 +230,7 @@ func (mgr *FilterManager) UnsubscribeFilter(filterID string) {
}
if len(af.sub.ContentFilter.ContentTopics) == 0 {
af.cancel()
delete(mgr.filterSubscriptions, filterConfig.ID)
} else {
go af.sub.Unsubscribe(filterConfig.contentFilter)
}

View File

@@ -45,8 +45,8 @@ type peerStatus struct {
type StorenodeConfigProvider interface {
UseStorenodes() (bool, error)
GetPinnedStorenode() (peer.ID, error)
Storenodes() ([]peer.ID, error)
GetPinnedStorenode() (peer.AddrInfo, error)
Storenodes() ([]peer.AddrInfo, error)
}
type StorenodeCycle struct {
@@ -104,7 +104,7 @@ func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error
}
// If no pinned storenode, no need to disconnect and wait for it to be available
if pinnedStorenode == "" {
if pinnedStorenode.ID == "" {
m.disconnectActiveStorenode(graylistBackoff)
}
@@ -180,21 +180,26 @@ func poolSize(fleetSize int) int {
return int(math.Ceil(float64(fleetSize) / 4))
}
func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID {
func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.AddrInfo) []peer.AddrInfo {
peerIDToInfo := make(map[peer.ID]peer.AddrInfo)
for _, p := range allStorenodes {
peerIDToInfo[p.ID] = p
}
availableStorenodes := make(map[peer.ID]time.Duration)
availableStorenodesMutex := sync.Mutex{}
availableStorenodesWg := sync.WaitGroup{}
for _, storenode := range allStorenodes {
availableStorenodesWg.Add(1)
go func(peerID peer.ID) {
go func(peerInfo peer.AddrInfo) {
defer availableStorenodesWg.Done()
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()
rtt, err := m.pinger.PingPeer(ctx, peerID)
rtt, err := m.pinger.PingPeer(ctx, peerInfo)
if err == nil { // pinging storenodes might fail, but we don't care
availableStorenodesMutex.Lock()
availableStorenodes[peerID] = rtt
availableStorenodes[peerInfo.ID] = rtt
availableStorenodesMutex.Unlock()
}
}(storenode)
@@ -209,7 +214,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
var sortedStorenodes []SortedStorenode
for storenodeID, rtt := range availableStorenodes {
sortedStorenode := SortedStorenode{
Storenode: storenodeID,
Storenode: peerIDToInfo[storenodeID],
RTT: rtt,
}
m.peersMutex.Lock()
@@ -222,7 +227,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
}
sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes))
result := make([]peer.ID, len(sortedStorenodes))
result := make([]peer.AddrInfo, len(sortedStorenodes))
for i, s := range sortedStorenodes {
result[i] = s.Storenode
}
@@ -252,8 +257,8 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
return err
}
if pinnedStorenode != "" {
return m.setActiveStorenode(pinnedStorenode)
if pinnedStorenode.ID != "" {
return m.setActiveStorenode(pinnedStorenode.ID)
}
m.logger.Info("Finding a new storenode..")
@@ -287,7 +292,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
}
ms := allStorenodes[r.Int64()]
return m.setActiveStorenode(ms)
return m.setActiveStorenode(ms.ID)
}
func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus {
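Since StorenodeConfigProvider now deals in peer.AddrInfo, an implementation has to supply full dialing information rather than bare peer IDs. In status-go the Messenger plays this role; what follows is only a minimal static sketch of the same three-method contract, with the interface redeclared locally so the snippet stands alone and a throwaway identity so it runs without a real storenode:

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// StorenodeConfigProvider repeats the interface from the hunk above so the
// sketch compiles on its own.
type StorenodeConfigProvider interface {
	UseStorenodes() (bool, error)
	GetPinnedStorenode() (peer.AddrInfo, error)
	Storenodes() ([]peer.AddrInfo, error)
}

// staticProvider is an illustrative implementation backed by a fixed list.
type staticProvider struct {
	nodes []peer.AddrInfo
}

func (s *staticProvider) UseStorenodes() (bool, error) { return true, nil }

// GetPinnedStorenode returns a zero AddrInfo, i.e. "nothing pinned".
func (s *staticProvider) GetPinnedStorenode() (peer.AddrInfo, error) { return peer.AddrInfo{}, nil }

func (s *staticProvider) Storenodes() ([]peer.AddrInfo, error) { return s.nodes, nil }

func main() {
	priv, _, _ := crypto.GenerateKeyPair(crypto.Ed25519, -1)
	id, _ := peer.IDFromPrivateKey(priv)
	addr, _ := multiaddr.NewMultiaddr("/dns4/store.example.org/tcp/30303")

	var p StorenodeConfigProvider = &staticProvider{
		nodes: []peer.AddrInfo{{ID: id, Addrs: []multiaddr.Multiaddr{addr}}},
	}

	nodes, _ := p.Storenodes()
	fmt.Println(len(nodes), nodes[0].ID)
}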

View File

@@ -7,7 +7,7 @@ import (
)
type SortedStorenode struct {
Storenode peer.ID
Storenode peer.AddrInfo
RTT time.Duration
CanConnectAfter time.Time
}

View File

@@ -101,7 +101,9 @@ const (
const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5
const maxConnsToPeerRatio = 3
const badPeersCleanupInterval = 1 * time.Minute
const maxDialFailures = 2
// 80% relay peers 20% service peers
func relayAndServicePeers(maxConnections int) (int, int) {
@@ -256,16 +258,32 @@ func (pm *PeerManager) Start(ctx context.Context) {
}
}
func (pm *PeerManager) removeBadPeers() {
if !pm.RelayEnabled {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}
}
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval)
t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop()
defer t1.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
case <-t1.C:
pm.removeBadPeers()
}
}
}
@@ -744,4 +762,9 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}

vendor/modules.txt vendored
View File

@@ -1031,7 +1031,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20241024184757-fdb3c3d0b35a
# github.com/waku-org/go-waku v0.8.1-0.20241028194639-dd82c24e0057
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests

View File

@@ -1864,14 +1864,6 @@ func (w *Waku) restartDiscV5(useOnlyDNSDiscCache bool) error {
return w.node.SetDiscV5Bootnodes(bootnodes)
}
func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300)
if err != nil {
return "", err
}
return peerID, nil
}
func (w *Waku) timestamp() int64 {
return w.timesource.Now().UnixNano()
}

View File

@@ -1383,7 +1383,12 @@ func (w *Waku) Start() error {
w.HistoryRetriever = history.NewHistoryRetriever(newStorenodeRequestor(w.wakuCtx, w.logger), NewHistoryProcessorWrapper(w), w.logger)
w.StorenodeCycle.Start(w.ctx)
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", w.PeerID()))
peerID, err := w.PeerID()
if err != nil {
return err
}
w.logger.Info("WakuV2 PeerID", zap.Stringer("id", peerID))
/* TODO-nwaku
w.discoverAndConnectPeers()
@@ -2142,17 +2147,6 @@ func (w *Waku) restartDiscV5(useOnlyDNSDiscCache bool) error {
}
*/
func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
// TODO-nwaku
/*
peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300)
if err != nil {
return "", err
}
return peerID, nil */
return "", nil
}
func (w *Waku) timestamp() int64 {
return w.timesource.Now().UnixNano()
}
@@ -3078,9 +3072,14 @@ func newPinger(wakuCtx unsafe.Pointer) commonapi.Pinger {
}
}
func (p *pinger) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
addrs := make([]string, len(peerInfo.Addrs))
for i, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
addrs[i] = addr.String()
}
var resp = C.allocResp()
var cPeerId = C.CString(peerID.String())
var cPeerId = C.CString(strings.Join(addrs, ","))
defer C.freeResp(resp)
defer C.free(unsafe.Pointer(cPeerId))
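Because the nwaku binding accepts a single C string, the ping path now serialises the whole AddrInfo: every address gets the /p2p/<id> component encapsulated and the results are joined with commas before crossing into C, as the hunk above shows. A plain-Go sketch of just that formatting step, without the cgo plumbing (the helper name is illustrative, and the throwaway key only exists so the example runs):

package main

import (
	"fmt"
	"strings"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
)

// addrInfoToCommaSeparated builds a comma-separated multiaddr list of the
// kind passed to the nwaku ping call above, with the peer ID encapsulated
// in every entry (the inputs here carry no /p2p/ component yet).
func addrInfoToCommaSeparated(info peer.AddrInfo) (string, error) {
	p2p, err := multiaddr.NewMultiaddr("/p2p/" + info.ID.String())
	if err != nil {
		return "", err
	}
	parts := make([]string, len(info.Addrs))
	for i, a := range info.Addrs {
		parts[i] = a.Encapsulate(p2p).String()
	}
	return strings.Join(parts, ","), nil
}

func main() {
	priv, _, _ := crypto.GenerateKeyPair(crypto.Ed25519, -1) // throwaway identity
	id, _ := peer.IDFromPrivateKey(priv)
	a1, _ := multiaddr.NewMultiaddr("/ip4/192.0.2.10/tcp/60000")
	a2, _ := multiaddr.NewMultiaddr("/dns4/store.example.org/tcp/30303")

	s, _ := addrInfoToCommaSeparated(peer.AddrInfo{ID: id, Addrs: []multiaddr.Multiaddr{a1, a2}})
	fmt.Println(s)
}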