go-waku/waku/v2/rendezvous/rendezvous.go

194 lines
6.1 KiB
Go
Raw Normal View History

2023-03-09 11:48:25 -04:00
package rendezvous
import (
"context"
2023-06-23 11:50:32 -04:00
"fmt"
2023-03-09 18:42:50 -04:00
"math"
"time"
2023-03-09 11:48:25 -04:00
"github.com/libp2p/go-libp2p/core/host"
2023-06-01 13:30:45 -04:00
rvs "github.com/waku-org/go-libp2p-rendezvous"
"github.com/waku-org/go-waku/waku/v2/peerstore"
2023-06-23 11:50:32 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/service"
2023-03-09 11:48:25 -04:00
"go.uber.org/zap"
)
2023-07-31 14:58:50 -04:00
// RendezvousID is the current protocol ID used for Rendezvous
2023-03-09 11:48:25 -04:00
const RendezvousID = rvs.RendezvousProto
2023-07-31 14:58:50 -04:00
// RegisterDefaultTTL indicates the TTL used by default when registering a node in a rendezvous point
// TODO: Register* functions should allow setting up a custom TTL
2023-07-27 13:04:08 -04:00
const RegisterDefaultTTL = rvs.DefaultTTL * time.Second
2023-07-31 14:58:50 -04:00
// Rendezvous is the implementation containing the logic to registering a node and discovering new peers using rendezvous protocol
2023-03-09 11:48:25 -04:00
type Rendezvous struct {
2023-03-09 18:42:50 -04:00
host host.Host
2023-03-09 11:48:25 -04:00
db *DB
rendezvousSvc *rvs.RendezvousService
2023-07-27 13:04:08 -04:00
peerConnector PeerConnector
2023-03-09 18:42:50 -04:00
log *zap.Logger
*service.CommonDiscoveryService
2023-03-09 11:48:25 -04:00
}
2023-07-31 14:58:50 -04:00
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
2023-07-27 13:04:08 -04:00
type PeerConnector interface {
Subscribe(context.Context, <-chan service.PeerData)
2023-07-27 13:04:08 -04:00
}
2023-03-13 20:37:28 -04:00
2023-07-31 14:58:50 -04:00
// NewRendezvous creates an instance of Rendezvous struct
2023-07-27 13:04:08 -04:00
func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
2023-03-09 11:48:25 -04:00
return &Rendezvous{
db: db,
peerConnector: peerConnector,
log: logger,
CommonDiscoveryService: service.NewCommonDiscoveryService(),
2023-03-09 11:48:25 -04:00
}
}
2023-04-16 20:04:12 -04:00
// SetHost sets the host to be able to mount or consume a protocol.
// The host stored here is the one later used to create the rendezvous
// service and the rendezvous clients.
func (r *Rendezvous) SetHost(h host.Host) {
	r.host = h
}
2023-03-09 11:48:25 -04:00
// Start starts the rendezvous service by delegating to the embedded
// CommonDiscoveryService, passing r.start as the start function.
// It returns whatever error that call reports.
func (r *Rendezvous) Start(ctx context.Context) error {
	return r.CommonDiscoveryService.Start(ctx, r.start)
}
2023-03-09 18:42:50 -04:00
func (r *Rendezvous) start() error {
if r.db != nil {
if err := r.db.Start(r.Context()); err != nil {
return err
}
2023-03-09 18:42:50 -04:00
}
if r.peerConnector != nil {
r.peerConnector.Subscribe(r.Context(), r.GetListeningChan())
}
2023-07-27 13:04:08 -04:00
r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)
2023-03-09 11:48:25 -04:00
r.log.Info("rendezvous protocol started")
return nil
}
2023-03-09 18:42:50 -04:00
const (
	// registerBackoff is the base delay before retrying a failed registration;
	// callRegister multiplies it by 2^retries.
	registerBackoff = 200 * time.Millisecond

	// registerMaxRetries is the failure count at which a registration loop gives up.
	registerMaxRetries = 7
)
2023-07-31 14:58:50 -04:00
// Discover is used to find a number of peers that use the default pubsub topic
2023-07-27 13:04:08 -04:00
func (r *Rendezvous) Discover(ctx context.Context, rp *RendezvousPoint, numPeers int) {
r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic{}.String(), rp, numPeers)
2023-07-27 13:04:08 -04:00
}
2023-07-31 14:58:50 -04:00
// DiscoverShard is used to find a number of peers that support an specific cluster and shard index
2023-07-27 13:04:08 -04:00
func (r *Rendezvous) DiscoverShard(ctx context.Context, rp *RendezvousPoint, cluster uint16, shard uint16, numPeers int) {
namespace := ShardToNamespace(cluster, shard)
2023-07-31 14:58:50 -04:00
r.DiscoverWithNamespace(ctx, namespace, rp, numPeers)
2023-07-27 13:04:08 -04:00
}
// DiscoverWithNamespace is used to find a number of peers using a custom namespace (usually a pubsub topic)
2023-07-31 14:58:50 -04:00
func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string, rp *RendezvousPoint, numPeers int) {
2023-07-27 13:04:08 -04:00
rendezvousClient := rvs.NewRendezvousClient(r.host, rp.id)
2023-07-31 14:58:50 -04:00
addrInfo, cookie, err := rendezvousClient.Discover(ctx, namespace, numPeers, rp.cookie)
2023-07-27 13:04:08 -04:00
if err != nil {
r.log.Error("could not discover new peers", zap.Error(err))
rp.Delay()
return
}
2023-07-27 13:04:08 -04:00
if len(addrInfo) != 0 {
rp.SetSuccess(cookie)
2023-03-13 20:37:28 -04:00
2023-07-27 13:04:08 -04:00
for _, p := range addrInfo {
peer := service.PeerData{
Origin: peerstore.Rendezvous,
AddrInfo: p,
PubsubTopics: []string{namespace},
2023-03-13 20:37:28 -04:00
}
if !r.PushToChan(peer) {
r.log.Error("could push to closed channel/context completed")
2023-07-27 13:04:08 -04:00
return
2023-03-13 20:37:28 -04:00
}
}
2023-07-27 13:04:08 -04:00
} else {
rp.Delay()
2023-03-13 20:37:28 -04:00
}
2023-06-23 11:50:32 -04:00
}
2023-07-31 14:58:50 -04:00
func (r *Rendezvous) callRegister(ctx context.Context, namespace string, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) {
ttl, err := rendezvousClient.Register(ctx, namespace, rvs.DefaultTTL)
2023-03-09 18:42:50 -04:00
var t <-chan time.Time
if err != nil {
r.log.Error("registering rendezvous client", zap.Error(err))
backoff := registerBackoff * time.Duration(math.Exp2(float64(retries)))
t = time.After(backoff)
retries++
} else {
t = time.After(ttl)
}
return t, retries
}
2023-07-31 14:58:50 -04:00
// Register registers the node in the rendezvous points using the default pubsub topic as namespace
2023-07-27 13:04:08 -04:00
func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*RendezvousPoint) {
r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic{}.String(), rendezvousPoints)
2023-07-27 13:04:08 -04:00
}
2023-07-31 14:58:50 -04:00
// RegisterShard registers the node in the rendezvous points using a shard as namespace
2023-07-27 13:04:08 -04:00
func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) {
namespace := ShardToNamespace(cluster, shard)
2023-07-31 14:58:50 -04:00
r.RegisterWithNamespace(ctx, namespace, rendezvousPoints)
2023-07-27 13:04:08 -04:00
}
2023-07-31 14:58:50 -04:00
// RegisterRelayShards registers the node in the rendezvous point by specifying a RelayShards struct (more than one shard index can be registered)
2023-07-27 13:04:08 -04:00
func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards, rendezvousPoints []*RendezvousPoint) {
for _, idx := range rs.ShardIDs {
go r.RegisterShard(ctx, rs.ClusterID, idx, rendezvousPoints)
2023-07-27 13:04:08 -04:00
}
}
2023-07-31 14:58:50 -04:00
// RegisterWithNamespace registers the node in the rendezvous point by using an specific namespace (usually a pubsub topic)
func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string, rendezvousPoints []*RendezvousPoint) {
2023-07-27 13:04:08 -04:00
for _, m := range rendezvousPoints {
r.WaitGroup().Add(1)
2023-07-27 13:04:08 -04:00
go func(m *RendezvousPoint) {
r.WaitGroup().Done()
2023-03-09 18:42:50 -04:00
2023-03-13 20:37:28 -04:00
rendezvousClient := rvs.NewRendezvousClient(r.host, m.id)
2023-03-09 18:42:50 -04:00
retries := 0
var t <-chan time.Time
2023-07-31 14:58:50 -04:00
t, retries = r.callRegister(ctx, namespace, rendezvousClient, retries)
2023-03-09 18:42:50 -04:00
for {
select {
case <-ctx.Done():
return
case <-t:
2023-07-31 14:58:50 -04:00
t, retries = r.callRegister(ctx, namespace, rendezvousClient, retries)
2023-03-09 18:42:50 -04:00
if retries >= registerMaxRetries {
return
}
}
}
}(m)
}
}
2023-07-27 13:04:08 -04:00
func (r *Rendezvous) Stop() {
r.CommonDiscoveryService.Stop(func() {
r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil
})
2023-03-09 11:48:25 -04:00
}
2023-06-23 11:50:32 -04:00
2023-07-31 14:58:50 -04:00
// ShardToNamespace translates a cluster and shard index into a rendezvous namespace
2023-06-23 11:50:32 -04:00
func ShardToNamespace(cluster uint16, shard uint16) string {
	// Namespace layout is "rs/<cluster>/<shard>", both values rendered in decimal.
	const layout = "rs/%d/%d"

	return fmt.Sprintf(layout, cluster, shard)
}