go-waku/waku/v2/utils/peer.go
2022-10-19 15:54:02 -04:00

117 lines
2.9 KiB
Go

package utils
import (
"context"
"errors"
"math/rand"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/status-im/go-waku/logging"
"go.uber.org/zap"
)
// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")
// SelectPeer is used to return a random peer that supports a given protocol.
func SelectPeer(host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
// - default store peer?
var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer))
return nil, err
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
if len(peers) >= 1 {
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
return &peers[rand.Intn(len(peers))], nil // nolint: gosec
}
return nil, ErrNoPeersAvailable
}
type pingResult struct {
p peer.ID
rtt time.Duration
}
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) {
var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
log.Error("error obtaining the protocols supported by peers", zap.Error(err))
return nil, err
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
wg := sync.WaitGroup{}
waitCh := make(chan struct{})
pingCh := make(chan pingResult, 1000)
wg.Add(len(peers))
go func() {
for _, p := range peers {
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
result := <-ping.Ping(ctx, host, p)
if result.Error == nil {
pingCh <- pingResult{
p: p,
rtt: result.RTT,
}
}
}(p)
}
wg.Wait()
close(waitCh)
close(pingCh)
}()
select {
case <-waitCh:
var min *pingResult
for p := range pingCh {
if min == nil {
min = &p
} else {
if p.rtt < min.rtt {
min = &p
}
}
}
if min == nil {
return nil, ErrNoPeersAvailable
}
return &min.p, nil
case <-ctx.Done():
return nil, ErrNoPeersAvailable
}
}