mirror of https://github.com/status-im/go-waku.git
refactor: extract ping interface
This commit is contained in:
parent
38be0dc169
commit
6bdf125dd1
|
@ -0,0 +1,37 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
)
|
||||
|
||||
type Pinger interface {
|
||||
PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error)
|
||||
}
|
||||
|
||||
type defaultPingImpl struct {
|
||||
host host.Host
|
||||
}
|
||||
|
||||
func NewDefaultPinger(host host.Host) Pinger {
|
||||
return &defaultPingImpl{
|
||||
host: host,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
|
||||
pingResultCh := ping.Ping(ctx, d.host, peerID)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case r := <-pingResultCh:
|
||||
if r.Error != nil {
|
||||
return 0, r.Error
|
||||
}
|
||||
return r.RTT, nil
|
||||
}
|
||||
}
|
|
@ -14,9 +14,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -55,9 +54,8 @@ type StorenodeCycle struct {
|
|||
|
||||
logger *zap.Logger
|
||||
|
||||
host host.Host
|
||||
|
||||
storenodeConfigProvider StorenodeConfigProvider
|
||||
pinger common.Pinger
|
||||
|
||||
StorenodeAvailableOneshotEmitter *OneShotEmitter[struct{}]
|
||||
StorenodeChangedEmitter *Emitter[peer.ID]
|
||||
|
@ -71,7 +69,7 @@ type StorenodeCycle struct {
|
|||
peers map[peer.ID]peerStatus
|
||||
}
|
||||
|
||||
func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
|
||||
func NewStorenodeCycle(logger *zap.Logger, pinger common.Pinger) *StorenodeCycle {
|
||||
return &StorenodeCycle{
|
||||
StorenodeAvailableOneshotEmitter: NewOneshotEmitter[struct{}](),
|
||||
StorenodeChangedEmitter: NewEmitter[peer.ID](),
|
||||
|
@ -81,9 +79,8 @@ func NewStorenodeCycle(logger *zap.Logger) *StorenodeCycle {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) Start(ctx context.Context, h host.Host) {
|
||||
func (m *StorenodeCycle) Start(ctx context.Context) {
|
||||
m.logger.Debug("starting storenode cycle")
|
||||
m.host = h
|
||||
m.failedRequests = make(map[peer.ID]uint)
|
||||
m.peers = make(map[peer.ID]peerStatus)
|
||||
|
||||
|
@ -194,7 +191,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
|
|||
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
|
||||
defer cancel()
|
||||
|
||||
rtt, err := m.pingPeer(ctx, peerID)
|
||||
rtt, err := m.pinger.PingPeer(ctx, peerID)
|
||||
if err == nil { // pinging storenodes might fail, but we don't care
|
||||
availableStorenodesMutex.Lock()
|
||||
availableStorenodes[peerID] = rtt
|
||||
|
@ -233,19 +230,6 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context,
|
|||
return result
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) pingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
|
||||
pingResultCh := ping.Ping(ctx, m.host, peerID)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
case r := <-pingResultCh:
|
||||
if r.Error != nil {
|
||||
return 0, r.Error
|
||||
}
|
||||
return r.RTT, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error {
|
||||
// we have to override DNS manually because of https://github.com/status-im/status-mobile/issues/19581
|
||||
if overrideDNS {
|
||||
|
|
Loading…
Reference in New Issue