mirror of
https://github.com/status-im/status-go.git
synced 2025-01-27 06:56:29 +00:00
442a12e996
It was going to be used as proxy from discv5 to rendezvous for les nodes. We are not using it at the moment, and not clear when we will be using it and if we are going to use it all. Additionally it has some memory leaks that need to be fixed, so it is better to remove it for now and restore once/if we will need it again.
256 lines
6.2 KiB
Go
256 lines
6.2 KiB
Go
package discovery
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"math/rand"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/p2p/discv5"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
"github.com/status-im/rendezvous"
|
|
)
|
|
|
|
const (
|
|
registrationPeriod = 10 * time.Second
|
|
requestTimeout = 5 * time.Second
|
|
bucketSize = 10
|
|
)
|
|
|
|
var (
|
|
errNodeIsNil = errors.New("node cannot be nil")
|
|
errIdentityIsNil = errors.New("identity cannot be nil")
|
|
errDiscoveryIsStopped = errors.New("discovery is stopped")
|
|
)
|
|
|
|
func NewRendezvous(servers []ma.Multiaddr, identity *ecdsa.PrivateKey, node *enode.Node) (*Rendezvous, error) {
|
|
r := new(Rendezvous)
|
|
r.node = node
|
|
r.identity = identity
|
|
r.servers = servers
|
|
r.registrationPeriod = registrationPeriod
|
|
r.bucketSize = bucketSize
|
|
return r, nil
|
|
}
|
|
|
|
func NewRendezvousWithENR(servers []ma.Multiaddr, record enr.Record) *Rendezvous {
|
|
r := new(Rendezvous)
|
|
r.servers = servers
|
|
r.registrationPeriod = registrationPeriod
|
|
r.bucketSize = bucketSize
|
|
r.record = &record
|
|
return r
|
|
}
|
|
|
|
// Rendezvous is an implementation of discovery interface that uses
|
|
// rendezvous client.
|
|
type Rendezvous struct {
|
|
mu sync.RWMutex
|
|
client *rendezvous.Client
|
|
|
|
// Root context is used to cancel running requests
|
|
// when Rendezvous is stopped.
|
|
rootCtx context.Context
|
|
cancelRootCtx context.CancelFunc
|
|
|
|
servers []ma.Multiaddr
|
|
registrationPeriod time.Duration
|
|
bucketSize int
|
|
node *enode.Node
|
|
identity *ecdsa.PrivateKey
|
|
|
|
recordMu sync.Mutex
|
|
record *enr.Record // record is set directly if rendezvous is used in proxy mode
|
|
}
|
|
|
|
func (r *Rendezvous) Running() bool {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
return r.client != nil
|
|
}
|
|
|
|
// Start creates client with ephemeral identity.
|
|
func (r *Rendezvous) Start() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
client, err := rendezvous.NewEphemeral()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.client = &client
|
|
r.rootCtx, r.cancelRootCtx = context.WithCancel(context.Background())
|
|
return nil
|
|
}
|
|
|
|
// Stop removes client reference.
|
|
func (r *Rendezvous) Stop() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
if r.client == nil {
|
|
return nil
|
|
}
|
|
r.cancelRootCtx()
|
|
if err := r.client.Close(); err != nil {
|
|
return err
|
|
}
|
|
r.client = nil
|
|
return nil
|
|
}
|
|
|
|
func (r *Rendezvous) MakeRecord() (record enr.Record, err error) {
|
|
r.recordMu.Lock()
|
|
defer r.recordMu.Unlock()
|
|
if r.record != nil {
|
|
return *r.record, nil
|
|
}
|
|
if r.node == nil {
|
|
return record, errNodeIsNil
|
|
}
|
|
if r.identity == nil {
|
|
return record, errIdentityIsNil
|
|
}
|
|
record.Set(enr.IP(r.node.IP()))
|
|
record.Set(enr.TCP(r.node.TCP()))
|
|
record.Set(enr.UDP(r.node.UDP()))
|
|
// public key is added to ENR when ENR is signed
|
|
if err := enode.SignV4(&record, r.identity); err != nil {
|
|
return record, err
|
|
}
|
|
r.record = &record
|
|
return record, nil
|
|
}
|
|
|
|
func (r *Rendezvous) register(topic string, record enr.Record) error {
|
|
srv := r.servers[rand.Intn(len(r.servers))]
|
|
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
|
|
defer cancel()
|
|
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.client == nil {
|
|
return errDiscoveryIsStopped
|
|
}
|
|
err := r.client.Register(ctx, srv, topic, record, r.registrationPeriod)
|
|
if err != nil {
|
|
log.Error("error registering", "topic", topic, "rendezvous server", srv, "err", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Register renews registration in the specified server.
|
|
func (r *Rendezvous) Register(topic string, stop chan struct{}) error {
|
|
record, err := r.MakeRecord()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// sending registration more often than the whole registraton period
|
|
// will ensure that it won't be accidentally removed
|
|
ticker := time.NewTicker(r.registrationPeriod / 2)
|
|
defer ticker.Stop()
|
|
|
|
if err := r.register(topic, record); err == context.Canceled {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
return nil
|
|
case <-ticker.C:
|
|
if err := r.register(topic, record); err == context.Canceled {
|
|
return err
|
|
} else if err == errDiscoveryIsStopped {
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *Rendezvous) discoverRequest(srv ma.Multiaddr, topic string) ([]enr.Record, error) {
|
|
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
|
|
defer cancel()
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.client == nil {
|
|
return nil, errDiscoveryIsStopped
|
|
}
|
|
return r.client.Discover(ctx, srv, topic, r.bucketSize)
|
|
}
|
|
|
|
// Discover will search for new records every time period fetched from period channel.
|
|
func (r *Rendezvous) Discover(
|
|
topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool,
|
|
) error {
|
|
ticker := time.NewTicker(<-period)
|
|
for {
|
|
select {
|
|
case newPeriod, ok := <-period:
|
|
ticker.Stop()
|
|
if !ok {
|
|
return nil
|
|
}
|
|
ticker = time.NewTicker(newPeriod)
|
|
case <-ticker.C:
|
|
srv := r.servers[rand.Intn(len(r.servers))]
|
|
records, err := r.discoverRequest(srv, topic)
|
|
if err == context.Canceled {
|
|
return err
|
|
} else if err == errDiscoveryIsStopped {
|
|
return nil
|
|
} else if err != nil {
|
|
log.Debug("error fetching records", "topic", topic, "rendezvous server", srv, "err", err)
|
|
} else {
|
|
for i := range records {
|
|
n, err := enrToNode(records[i])
|
|
log.Debug("converted enr to", "ENODE", n.String())
|
|
if err != nil {
|
|
log.Warn("error converting enr record to node", "err", err)
|
|
|
|
} else {
|
|
select {
|
|
case found <- n:
|
|
case newPeriod, ok := <-period:
|
|
// closing a period channel is a signal to producer that consumer exited
|
|
ticker.Stop()
|
|
if !ok {
|
|
return nil
|
|
}
|
|
ticker = time.NewTicker(newPeriod)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func enrToNode(record enr.Record) (*discv5.Node, error) {
|
|
var (
|
|
key enode.Secp256k1
|
|
ip enr.IP
|
|
tport enr.TCP
|
|
uport enr.UDP
|
|
nodeID discv5.NodeID
|
|
)
|
|
if err := record.Load(&key); err != nil {
|
|
return nil, err
|
|
}
|
|
ecdsaKey := ecdsa.PublicKey(key)
|
|
nodeID = discv5.PubkeyID(&ecdsaKey)
|
|
if err := record.Load(&ip); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := record.Load(&tport); err != nil {
|
|
return nil, err
|
|
}
|
|
// ignore absence of udp port, as it is optional
|
|
_ = record.Load(&uport)
|
|
return discv5.NewNode(nodeID, net.IP(ip), uint16(uport), uint16(tport)), nil
|
|
}
|