mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-08 17:03:09 +00:00
fix: code review
This commit is contained in:
parent
76961f2bd8
commit
8b3f42310d
@ -326,15 +326,7 @@ func Execute(options Options) {
|
||||
|
||||
if len(options.Rendezvous.Nodes) != 0 {
|
||||
// Register the node in rendezvous point
|
||||
var rp []peer.ID
|
||||
for _, n := range options.Rendezvous.Nodes {
|
||||
peerID, err := utils.GetPeerID(n)
|
||||
if err != nil {
|
||||
failOnErr(err, "registering rendezvous nodes")
|
||||
}
|
||||
rp = append(rp, peerID)
|
||||
}
|
||||
iter := rendezvous.NewRendezvousPointIterator(rp)
|
||||
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
|
||||
|
||||
wg.Add(1)
|
||||
go func(nodeTopic string) {
|
||||
@ -348,7 +340,7 @@ func Execute(options Options) {
|
||||
return
|
||||
case <-t.C:
|
||||
// Register in rendezvous points periodically
|
||||
wakuNode.Rendezvous().RegisterWithTopic(ctx, nodeTopic, iter.RendezvousPoints())
|
||||
wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints())
|
||||
}
|
||||
}
|
||||
}(nodeTopic)
|
||||
@ -375,7 +367,7 @@ func Execute(options Options) {
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
|
||||
wakuNode.Rendezvous().DiscoverWithTopic(ctx, nodeTopic, rp, peersToFind)
|
||||
wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ import (
|
||||
|
||||
var ErrNoDiscV5Listener = errors.New("no discv5 listener")
|
||||
|
||||
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
|
||||
type PeerConnector interface {
|
||||
Subscribe(context.Context, <-chan v2.PeerData)
|
||||
}
|
||||
|
||||
@ -457,7 +457,7 @@ func (w *WakuNode) Start(ctx context.Context) error {
|
||||
}
|
||||
|
||||
w.rendezvous.SetHost(host)
|
||||
if w.opts.enableRendezvous {
|
||||
if w.opts.enableRendezvousPoint {
|
||||
err := w.rendezvous.Start(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -80,8 +80,8 @@ type WakuNodeParameters struct {
|
||||
enableStore bool
|
||||
messageProvider store.MessageProvider
|
||||
|
||||
enableRendezvous bool
|
||||
rendezvousDB *rendezvous.DB
|
||||
enableRendezvousPoint bool
|
||||
rendezvousDB *rendezvous.DB
|
||||
|
||||
discoveryMinPeers int
|
||||
|
||||
@ -472,7 +472,7 @@ func WithWebsockets(address string, port int) WakuNodeOption {
|
||||
// point, using an specific storage for the peer information
|
||||
func WithRendezvous(db *rendezvous.DB) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enableRendezvous = true
|
||||
params.enableRendezvousPoint = true
|
||||
params.rendezvousDB = db
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -31,6 +31,7 @@ var (
|
||||
ErrInvalidId = errors.New("invalid request id")
|
||||
)
|
||||
|
||||
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
|
||||
type PeerConnector interface {
|
||||
Subscribe(context.Context, <-chan v2.PeerData)
|
||||
}
|
||||
|
||||
@ -6,17 +6,22 @@ import (
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
type RendezvousPointIterator struct {
|
||||
rendezvousPoints []*RendezvousPoint
|
||||
}
|
||||
|
||||
func NewRendezvousPointIterator(rendezvousPoints []peer.ID) *RendezvousPointIterator {
|
||||
// NewRendezvousPointIterator creates an iterator with a backoff mechanism to use random rendezvous points taking into account successful/unsuccesful connection attempts
|
||||
func NewRendezvousPointIterator(rendezvousPoints []multiaddr.Multiaddr) *RendezvousPointIterator {
|
||||
var rendevousPoints []*RendezvousPoint
|
||||
for _, rp := range rendezvousPoints {
|
||||
rendevousPoints = append(rendevousPoints, NewRendezvousPoint(rp))
|
||||
peerID, err := utils.GetPeerID(rp)
|
||||
if err == nil {
|
||||
rendevousPoints = append(rendevousPoints, NewRendezvousPoint(peerID))
|
||||
}
|
||||
}
|
||||
|
||||
return &RendezvousPointIterator{
|
||||
@ -24,10 +29,12 @@ func NewRendezvousPointIterator(rendezvousPoints []peer.ID) *RendezvousPointIter
|
||||
}
|
||||
}
|
||||
|
||||
// RendezvousPoints returns the list of rendezvous points registered in this iterator
|
||||
func (r *RendezvousPointIterator) RendezvousPoints() []*RendezvousPoint {
|
||||
return r.rendezvousPoints
|
||||
}
|
||||
|
||||
// Next will return a channel that will be triggered as soon as the next rendevous point is available to be used (depending on backoff time)
|
||||
func (r *RendezvousPointIterator) Next(ctx context.Context) <-chan *RendezvousPoint {
|
||||
var dialableRP []*RendezvousPoint
|
||||
now := time.Now()
|
||||
|
||||
@ -16,9 +16,14 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// RendezvousID is the current protocol ID used for Rendezvous
|
||||
const RendezvousID = rvs.RendezvousProto
|
||||
|
||||
// RegisterDefaultTTL indicates the TTL used by default when registering a node in a rendezvous point
|
||||
// TODO: Register* functions should allow setting up a custom TTL
|
||||
const RegisterDefaultTTL = rvs.DefaultTTL * time.Second
|
||||
|
||||
// Rendezvous is the implementation containing the logic to registering a node and discovering new peers using rendezvous protocol
|
||||
type Rendezvous struct {
|
||||
host host.Host
|
||||
|
||||
@ -32,10 +37,12 @@ type Rendezvous struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
|
||||
type PeerConnector interface {
|
||||
Subscribe(context.Context, <-chan v2.PeerData)
|
||||
}
|
||||
|
||||
// NewRendezvous creates an instance of Rendezvous struct
|
||||
func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
|
||||
logger := log.Named("rendezvous")
|
||||
return &Rendezvous{
|
||||
@ -73,19 +80,22 @@ func (r *Rendezvous) Start(ctx context.Context) error {
|
||||
const registerBackoff = 200 * time.Millisecond
|
||||
const registerMaxRetries = 7
|
||||
|
||||
// Discover is used to find a number of peers that use the default pubsub topic
|
||||
func (r *Rendezvous) Discover(ctx context.Context, rp *RendezvousPoint, numPeers int) {
|
||||
r.DiscoverWithTopic(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers)
|
||||
r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers)
|
||||
}
|
||||
|
||||
// DiscoverShard is used to find a number of peers that support an specific cluster and shard index
|
||||
func (r *Rendezvous) DiscoverShard(ctx context.Context, rp *RendezvousPoint, cluster uint16, shard uint16, numPeers int) {
|
||||
namespace := ShardToNamespace(cluster, shard)
|
||||
r.DiscoverWithTopic(ctx, namespace, rp, numPeers)
|
||||
r.DiscoverWithNamespace(ctx, namespace, rp, numPeers)
|
||||
}
|
||||
|
||||
func (r *Rendezvous) DiscoverWithTopic(ctx context.Context, topic string, rp *RendezvousPoint, numPeers int) {
|
||||
// DiscoverWithNamespace is uded to find a number of peers using a custom namespace (usually a pubsub topic)
|
||||
func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string, rp *RendezvousPoint, numPeers int) {
|
||||
rendezvousClient := rvs.NewRendezvousClient(r.host, rp.id)
|
||||
|
||||
addrInfo, cookie, err := rendezvousClient.Discover(ctx, topic, numPeers, rp.cookie)
|
||||
addrInfo, cookie, err := rendezvousClient.Discover(ctx, namespace, numPeers, rp.cookie)
|
||||
if err != nil {
|
||||
r.log.Error("could not discover new peers", zap.Error(err))
|
||||
rp.Delay()
|
||||
@ -115,8 +125,8 @@ func (r *Rendezvous) DiscoverWithTopic(ctx context.Context, topic string, rp *Re
|
||||
|
||||
}
|
||||
|
||||
func (r *Rendezvous) callRegister(ctx context.Context, topic string, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) {
|
||||
ttl, err := rendezvousClient.Register(ctx, topic, rvs.DefaultTTL)
|
||||
func (r *Rendezvous) callRegister(ctx context.Context, namespace string, rendezvousClient rvs.RendezvousClient, retries int) (<-chan time.Time, int) {
|
||||
ttl, err := rendezvousClient.Register(ctx, namespace, rvs.DefaultTTL)
|
||||
var t <-chan time.Time
|
||||
if err != nil {
|
||||
r.log.Error("registering rendezvous client", zap.Error(err))
|
||||
@ -130,22 +140,26 @@ func (r *Rendezvous) callRegister(ctx context.Context, topic string, rendezvousC
|
||||
return t, retries
|
||||
}
|
||||
|
||||
// Register registers the node in the rendezvous points using the default pubsub topic as namespace
|
||||
func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*RendezvousPoint) {
|
||||
r.RegisterWithTopic(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints)
|
||||
r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints)
|
||||
}
|
||||
|
||||
// RegisterShard registers the node in the rendezvous points using a shard as namespace
|
||||
func (r *Rendezvous) RegisterShard(ctx context.Context, cluster uint16, shard uint16, rendezvousPoints []*RendezvousPoint) {
|
||||
namespace := ShardToNamespace(cluster, shard)
|
||||
r.RegisterWithTopic(ctx, namespace, rendezvousPoints)
|
||||
r.RegisterWithNamespace(ctx, namespace, rendezvousPoints)
|
||||
}
|
||||
|
||||
// RegisterRelayShards registers the node in the rendezvous point by specifying a RelayShards struct (more than one shard index can be registered)
|
||||
func (r *Rendezvous) RegisterRelayShards(ctx context.Context, rs protocol.RelayShards, rendezvousPoints []*RendezvousPoint) {
|
||||
for _, idx := range rs.Indices {
|
||||
go r.RegisterShard(ctx, rs.Cluster, idx, rendezvousPoints)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Rendezvous) RegisterWithTopic(ctx context.Context, topic string, rendezvousPoints []*RendezvousPoint) {
|
||||
// RegisterWithNamespace registers the node in the rendezvous point by using an specific namespace (usually a pubsub topic)
|
||||
func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string, rendezvousPoints []*RendezvousPoint) {
|
||||
for _, m := range rendezvousPoints {
|
||||
r.wg.Add(1)
|
||||
go func(m *RendezvousPoint) {
|
||||
@ -155,13 +169,13 @@ func (r *Rendezvous) RegisterWithTopic(ctx context.Context, topic string, rendez
|
||||
retries := 0
|
||||
var t <-chan time.Time
|
||||
|
||||
t, retries = r.callRegister(ctx, topic, rendezvousClient, retries)
|
||||
t, retries = r.callRegister(ctx, namespace, rendezvousClient, retries)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t:
|
||||
t, retries = r.callRegister(ctx, topic, rendezvousClient, retries)
|
||||
t, retries = r.callRegister(ctx, namespace, rendezvousClient, retries)
|
||||
if retries >= registerMaxRetries {
|
||||
return
|
||||
}
|
||||
@ -182,6 +196,7 @@ func (r *Rendezvous) Stop() {
|
||||
r.rendezvousSvc = nil
|
||||
}
|
||||
|
||||
// ShardToNamespace translates a cluster and shard index into a rendezvous namespace
|
||||
func ShardToNamespace(cluster uint16, shard uint16) string {
|
||||
return fmt.Sprintf("rs/%d/%d", cluster, shard)
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
|
||||
)
|
||||
|
||||
// RendezvousPoint is a structure that represent a node that can be used to discover new peers
|
||||
type RendezvousPoint struct {
|
||||
sync.RWMutex
|
||||
|
||||
@ -19,6 +20,7 @@ type RendezvousPoint struct {
|
||||
nextTry time.Time
|
||||
}
|
||||
|
||||
// NewRendezvousPoint is used to create a RendezvousPoint
|
||||
func NewRendezvousPoint(peerID peer.ID) *RendezvousPoint {
|
||||
rngSrc := rand.NewSource(rand.Int63())
|
||||
minBackoff, maxBackoff := time.Second*30, time.Hour
|
||||
@ -35,6 +37,7 @@ func NewRendezvousPoint(peerID peer.ID) *RendezvousPoint {
|
||||
return rp
|
||||
}
|
||||
|
||||
// Delay is used to indicate that the connection to a rendezvous point failed
|
||||
func (rp *RendezvousPoint) Delay() {
|
||||
rp.Lock()
|
||||
defer rp.Unlock()
|
||||
@ -42,14 +45,17 @@ func (rp *RendezvousPoint) Delay() {
|
||||
rp.nextTry = time.Now().Add(rp.bkf.Delay())
|
||||
}
|
||||
|
||||
// SetSuccess is used to indicate that a connection to a rendezvous point was succesful
|
||||
func (rp *RendezvousPoint) SetSuccess(cookie []byte) {
|
||||
rp.Lock()
|
||||
defer rp.Unlock()
|
||||
|
||||
rp.bkf.Reset()
|
||||
rp.nextTry = time.Now()
|
||||
rp.cookie = cookie
|
||||
}
|
||||
|
||||
// NextTry returns when can a rendezvous point be used again
|
||||
func (rp *RendezvousPoint) NextTry() time.Time {
|
||||
rp.RLock()
|
||||
defer rp.RUnlock()
|
||||
|
||||
@ -76,7 +76,7 @@ func TestRendezvous(t *testing.T) {
|
||||
rendezvousClient1 := NewRendezvous(nil, nil, utils.Logger())
|
||||
rendezvousClient1.SetHost(host2)
|
||||
|
||||
rendezvousClient1.RegisterWithTopic(context.Background(), testTopic, []*RendezvousPoint{host1RP})
|
||||
rendezvousClient1.RegisterWithNamespace(context.Background(), testTopic, []*RendezvousPoint{host1RP})
|
||||
|
||||
port3, err := tests.FindFreePort(t, "", 5)
|
||||
require.NoError(t, err)
|
||||
@ -95,7 +95,7 @@ func TestRendezvous(t *testing.T) {
|
||||
timedCtx, cancel := context.WithTimeout(ctx, 4*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go rendezvousClient2.DiscoverWithTopic(timedCtx, testTopic, host1RP, 1)
|
||||
go rendezvousClient2.DiscoverWithNamespace(timedCtx, testTopic, host1RP, 1)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
timer := time.After(3 * time.Second)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user