refactor_: remove status-im/rendezvous (#5685)

This commit is contained in:
richΛrd 2024-08-22 13:12:37 -04:00 committed by GitHub
parent e63a7bbbee
commit 8cf4feec1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 14 additions and 1617 deletions

View File

@ -299,7 +299,6 @@ func DefaultNodeConfig(installationID string, request *requests.CreateAccount, o
}
nodeConfig.Name = DefaultNodeName
nodeConfig.Rendezvous = false
nodeConfig.NoDiscovery = true
nodeConfig.MaxPeers = DefaultMaxPeers
nodeConfig.MaxPendingPeers = DefaultMaxPendingPeers

View File

@ -129,7 +129,6 @@ func randomNodeConfig() *params.NodeConfig {
KeyStoreDir: randomString(),
NodeKey: randomString(),
NoDiscovery: randomBool(),
Rendezvous: randomBool(),
ListenAddr: randomString(),
AdvertiseAddr: randomString(),
Name: randomString(),

View File

@ -399,7 +399,6 @@ func defaultNodeConfig(installationID string) (*params.NodeConfig, error) {
}
nodeConfig.Name = "StatusIM"
nodeConfig.Rendezvous = false
clusterConfig, err := params.LoadClusterConfigFromFleet("eth.prod")
if err != nil {
return nil, err

View File

@ -308,7 +308,6 @@ func defaultNodeConfig(installationID string) (*params.NodeConfig, error) {
}
nodeConfig.Name = "StatusIM"
nodeConfig.Rendezvous = false
clusterConfig, err := params.LoadClusterConfigFromFleet("status.prod")
if err != nil {
return nil, err

View File

@ -9,8 +9,6 @@ import (
const (
// EthereumV5 is kademlia-based discovery from go-ethereum repository.
EthereumV5 = "ethv5"
// RendezvousV1 is req/rep based discovery that uses ENR for records.
RendezvousV1 = "ethvousv1"
)
// Discovery is an abstract interface for using different discovery providers.

View File

@ -1,303 +0,0 @@
package discovery
import (
"context"
"crypto/ecdsa"
"errors"
"math/rand"
"net"
"sync"
"time"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/rendezvous"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
)
const (
registrationPeriod = 10 * time.Second
requestTimeout = 5 * time.Second
bucketSize = 10
)
var (
errNodeIsNil = errors.New("node cannot be nil")
errIdentityIsNil = errors.New("identity cannot be nil")
errDiscoveryIsStopped = errors.New("discovery is stopped")
)
func NewRendezvous(servers []ma.Multiaddr, identity *ecdsa.PrivateKey, node *enode.Node) (*Rendezvous, error) {
r := new(Rendezvous)
r.node = node
r.identity = identity
r.servers = servers
r.registrationPeriod = registrationPeriod
r.bucketSize = bucketSize
return r, nil
}
func NewRendezvousWithENR(servers []ma.Multiaddr, record enr.Record) *Rendezvous {
r := new(Rendezvous)
r.servers = servers
r.registrationPeriod = registrationPeriod
r.bucketSize = bucketSize
r.record = &record
return r
}
// Rendezvous is an implementation of discovery interface that uses
// rendezvous client.
type Rendezvous struct {
mu sync.RWMutex
client *rendezvous.Client
// Root context is used to cancel running requests
// when Rendezvous is stopped.
rootCtx context.Context
cancelRootCtx context.CancelFunc
servers []ma.Multiaddr
registrationPeriod time.Duration
bucketSize int
node *enode.Node
identity *ecdsa.PrivateKey
recordMu sync.Mutex
record *enr.Record // record is set directly if rendezvous is used in proxy mode
}
func (r *Rendezvous) Running() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.client != nil
}
// Start creates client with ephemeral identity.
func (r *Rendezvous) Start() error {
r.mu.Lock()
defer r.mu.Unlock()
client, err := rendezvous.NewEphemeral()
if err != nil {
return err
}
r.client = &client
r.rootCtx, r.cancelRootCtx = context.WithCancel(context.Background())
return nil
}
// Stop removes client reference.
func (r *Rendezvous) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.client == nil {
return nil
}
r.cancelRootCtx()
if err := r.client.Close(); err != nil {
return err
}
r.client = nil
return nil
}
func (r *Rendezvous) MakeRecord(srv *ma.Multiaddr) (record enr.Record, err error) {
r.recordMu.Lock()
defer r.recordMu.Unlock()
if r.record != nil {
return *r.record, nil
}
if r.node == nil {
return record, errNodeIsNil
}
if r.identity == nil {
return record, errIdentityIsNil
}
ip := r.node.IP()
if srv != nil && (ip.IsLoopback() || IsPrivate(ip)) { // If AdvertiseAddr is not specified, 127.0.0.1 might be returned
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
defer cancel()
ipAddr, err := r.client.RemoteIp(ctx, *srv)
if err != nil {
log.Error("could not obtain the external ip address", "err", err)
} else {
parsedIP := net.ParseIP(ipAddr)
if parsedIP != nil {
ip = parsedIP
log.Info("node's external IP address", "ipAddr", ipAddr)
} else {
log.Error("invalid ip address obtained from rendezvous server", "ipaddr", ipAddr, "err", err)
}
}
}
record.Set(enr.IP(ip))
record.Set(enr.TCP(r.node.TCP()))
record.Set(enr.UDP(r.node.UDP()))
// public key is added to ENR when ENR is signed
if err := enode.SignV4(&record, r.identity); err != nil {
return record, err
}
r.record = &record
return record, nil
}
func (r *Rendezvous) register(srv ma.Multiaddr, topic string, record enr.Record) error {
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
defer cancel()
r.mu.RLock()
defer r.mu.RUnlock()
if r.client == nil {
return errDiscoveryIsStopped
}
err := r.client.Register(ctx, srv, topic, record, r.registrationPeriod)
if err != nil {
log.Error("error registering", "topic", topic, "rendezvous server", srv, "err", err)
}
return err
}
// Register renews registration in the specified server.
func (r *Rendezvous) Register(topic string, stop chan struct{}) error {
srv := r.getRandomServer()
record, err := r.MakeRecord(&srv)
if err != nil {
return err
}
// sending registration more often than the whole registraton period
// will ensure that it won't be accidentally removed
ticker := time.NewTicker(r.registrationPeriod / 2)
defer ticker.Stop()
if err := r.register(srv, topic, record); err == context.Canceled {
return err
}
for {
select {
case <-stop:
return nil
case <-ticker.C:
if err := r.register(r.getRandomServer(), topic, record); err == context.Canceled {
return err
} else if err == errDiscoveryIsStopped {
return nil
}
}
}
}
func (r *Rendezvous) discoverRequest(srv ma.Multiaddr, topic string) ([]enr.Record, error) {
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
defer cancel()
r.mu.RLock()
defer r.mu.RUnlock()
if r.client == nil {
return nil, errDiscoveryIsStopped
}
return r.client.Discover(ctx, srv, topic, r.bucketSize)
}
// Discover will search for new records every time period fetched from period channel.
func (r *Rendezvous) Discover(
topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool,
) error {
ticker := time.NewTicker(<-period)
for {
select {
case newPeriod, ok := <-period:
ticker.Stop()
if !ok {
return nil
}
ticker = time.NewTicker(newPeriod)
case <-ticker.C:
srv := r.servers[rand.Intn(len(r.servers))] // nolint: gosec
records, err := r.discoverRequest(srv, topic)
if err == context.Canceled {
return err
} else if err == errDiscoveryIsStopped {
return nil
} else if err != nil {
log.Debug("error fetching records", "topic", topic, "rendezvous server", srv, "err", err)
} else {
for i := range records {
n, err := enrToNode(records[i])
if err != nil {
log.Warn("error converting enr record to node", "err", err)
} else {
log.Debug("converted enr to", "ENODE", n.String())
select {
case found <- n:
case newPeriod, ok := <-period:
// closing a period channel is a signal to producer that consumer exited
ticker.Stop()
if !ok {
return nil
}
ticker = time.NewTicker(newPeriod)
}
}
}
}
}
}
}
func (r *Rendezvous) getRandomServer() ma.Multiaddr {
return r.servers[rand.Intn(len(r.servers))] // nolint: gosec
}
func enrToNode(record enr.Record) (*discv5.Node, error) {
var (
key enode.Secp256k1
ip enr.IPv4
tport enr.TCP
uport enr.UDP
nodeID discv5.NodeID
)
if err := record.Load(&key); err != nil {
return nil, err
}
ecdsaKey := ecdsa.PublicKey(key)
nodeID = discv5.PubkeyID(&ecdsaKey)
if err := record.Load(&ip); err != nil {
return nil, err
}
if err := record.Load(&tport); err != nil {
return nil, err
}
// ignore absence of udp port, as it is optional
_ = record.Load(&uport)
return discv5.NewNode(nodeID, net.IP(ip), uint16(uport), uint16(tport)), nil
}
// IsPrivate reports whether ip is a private address, according to
// RFC 1918 (IPv4 addresses) and RFC 4193 (IPv6 addresses).
// Copied/Adapted from https://go-review.googlesource.com/c/go/+/272668/11/src/net/ip.go
// Copyright (c) The Go Authors. All rights reserved.
// @TODO: once Go 1.17 is released in Q42021, remove this function as it will become part of the language
func IsPrivate(ip net.IP) bool {
if ip4 := ip.To4(); ip4 != nil {
// Following RFC 4193, Section 3. Local IPv6 Unicast Addresses which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following three blocks of the IPv4 address space for private internets:
// 10.0.0.0 - 10.255.255.255 (10/8 prefix)
// 172.16.0.0 - 172.31.255.255 (172.16/12 prefix)
// 192.168.0.0 - 192.168.255.255 (192.168/16 prefix)
return ip4[0] == 10 ||
(ip4[0] == 172 && ip4[1]&0xf0 == 16) ||
(ip4[0] == 192 && ip4[1] == 168)
}
// Following RFC 4193, Section 3. Private Address Space which says:
// The Internet Assigned Numbers Authority (IANA) has reserved the
// following block of the IPv6 address space for local internets:
// FC00:: - FDFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF (FC00::/7 prefix)
return len(ip) == net.IPv6len && ip[0]&0xfe == 0xfc
}

View File

@ -1,106 +0,0 @@
package discovery
import (
"net"
"testing"
"time"
lcrypto "github.com/libp2p/go-libp2p/core/crypto"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/rendezvous/server"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
)
func makeTestRendezvousServer(t *testing.T, addr string) *server.Server {
priv, _, err := lcrypto.GenerateKeyPair(lcrypto.Secp256k1, 0)
require.NoError(t, err)
laddr, err := ma.NewMultiaddr(addr)
require.NoError(t, err)
db, err := leveldb.Open(storage.NewMemStorage(), nil)
require.NoError(t, err)
srv := server.NewServer(laddr, priv, server.NewStorage(db))
require.NoError(t, srv.Start())
return srv
}
func TestRendezvousDiscovery(t *testing.T) {
srv := makeTestRendezvousServer(t, "/ip4/127.0.0.1/tcp/7777")
defer srv.Stop()
identity, err := crypto.GenerateKey()
require.NoError(t, err)
node := enode.NewV4(&identity.PublicKey, net.IP{10, 10, 10, 10}, 10, 20)
c, err := NewRendezvous([]ma.Multiaddr{srv.Addr()}, identity, node)
require.NoError(t, err)
require.NoError(t, c.Start())
require.True(t, c.Running())
topic := "test"
stop := make(chan struct{})
go func() { assert.NoError(t, c.Register(topic, stop)) }()
period := make(chan time.Duration, 1)
period <- 100 * time.Millisecond
found := make(chan *discv5.Node, 1)
lookup := make(chan bool)
go func() { assert.NoError(t, c.Discover(topic, period, found, lookup)) }()
select {
case n := <-found:
assert.Equal(t, discv5.PubkeyID(&identity.PublicKey), n.ID)
case <-time.After(10 * time.Second):
assert.Fail(t, "failed waiting to discover a node")
}
close(stop)
close(period)
}
func TestMakeRecordReturnsCachedRecord(t *testing.T) {
identity, err := crypto.GenerateKey()
require.NoError(t, err)
record := enr.Record{}
require.NoError(t, enode.SignV4(&record, identity))
c := NewRendezvousWithENR(nil, record)
rst, err := c.MakeRecord(nil)
require.NoError(t, err)
require.NotNil(t, enode.V4ID{}.NodeAddr(&rst))
require.Equal(t, enode.V4ID{}.NodeAddr(&record), enode.V4ID{}.NodeAddr(&rst))
}
func TestRendezvousRegisterAndDiscoverExitGracefully(t *testing.T) {
r, err := NewRendezvous(make([]ma.Multiaddr, 1), nil, nil)
require.NoError(t, err)
require.NoError(t, r.Start())
require.NoError(t, r.Stop())
require.EqualError(t, errDiscoveryIsStopped, r.register(r.getRandomServer(), "", enr.Record{}).Error())
_, err = r.discoverRequest(nil, "")
require.EqualError(t, errDiscoveryIsStopped, err.Error())
}
func TestStopStoppedNode(t *testing.T) {
r, err := NewRendezvous(make([]ma.Multiaddr, 1), nil, nil)
require.NoError(t, err)
require.NoError(t, r.Stop())
}
func BenchmarkRendezvousStart(b *testing.B) {
identity, err := crypto.GenerateKey()
require.NoError(b, err)
addr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/7777")
require.NoError(b, err)
node := enode.NewV4(&identity.PublicKey, net.IP{10, 10, 10, 10}, 10, 20)
b.ResetTimer()
for n := 0; n < b.N; n++ {
c, err := NewRendezvous([]ma.Multiaddr{addr}, identity, node)
require.NoError(b, err)
require.NoError(b, c.Start())
}
}

2
go.mod
View File

@ -56,7 +56,6 @@ require (
github.com/status-im/markdown v0.0.0-20240404192634-b7e33c6ac3d4
github.com/status-im/migrate/v4 v4.6.2-status.3
github.com/status-im/mvds v0.0.27-0.20240729032523-f6fba962c2b1
github.com/status-im/rendezvous v1.3.8-0.20240110194857-cc5be22bf83e
github.com/status-im/status-go/extkeys v1.1.2
github.com/status-im/tcp-shaker v1.1.1-status
github.com/status-im/zxcvbn-go v0.0.0-20220311183720-5e8676676857
@ -257,7 +256,6 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/status-im/go-multiaddr-ethv4 v1.2.5 // indirect
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.6 // indirect

6
go.sum
View File

@ -1093,8 +1093,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpg
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/gyuho/goraph v0.0.0-20220410190906-ad625acf7ae3 h1:sqdhbHgf04uwTLE03/FdSoaQbSy2z/hmimOAR/3OmcM=
github.com/gyuho/goraph v0.0.0-20220410190906-ad625acf7ae3/go.mod h1:NtSxZCD+s3sZFwbW6WceOcUD83HM9XD5OE2r4c0P8eg=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
@ -2023,8 +2021,6 @@ github.com/status-im/doubleratchet v3.0.0+incompatible h1:aJ1ejcSERpSzmWZBgtfYti
github.com/status-im/doubleratchet v3.0.0+incompatible/go.mod h1:1sqR0+yhiM/bd+wrdX79AOt2csZuJOni0nUDzKNuqOU=
github.com/status-im/go-ethereum v1.10.25-status.16 h1:6CjK8qdlUc/7n42UJ743rf13x/ICSwxrh/NlDGyvmOk=
github.com/status-im/go-ethereum v1.10.25-status.16/go.mod h1:Dt4K5JYMhJRdtXJwBEyGZLZn9iz/chSOZyjVmt5ZhwQ=
github.com/status-im/go-multiaddr-ethv4 v1.2.5 h1:pN+ey6wYKbvNNu5/xq9+VL0N8Yq0pZUTbZp0URg+Yn4=
github.com/status-im/go-multiaddr-ethv4 v1.2.5/go.mod h1:Fhe/18yWU5QwlAYiOO3Bb1BLe0bn5YobcNBHsjRr4kk=
github.com/status-im/go-sqlcipher/v4 v4.5.4-status.2 h1:Oi9JTAI2DZEe5UKlpUcvKBCCSn3ULsLIrix7jPnEoPE=
github.com/status-im/go-sqlcipher/v4 v4.5.4-status.2/go.mod h1:mF2UmIpBnzFeBdu/ypTDb/LdbS0nk0dfSN1WUsWTjMA=
github.com/status-im/gomoji v1.1.3-0.20220213022530-e5ac4a8732d4 h1:CtobZoiNdHpx+xurFxnuJ1xsGm3oKMfcZkB3vmomJmA=
@ -2040,8 +2036,6 @@ github.com/status-im/mvds v0.0.27-0.20240729032523-f6fba962c2b1 h1:4idVpLp9lqLpX
github.com/status-im/mvds v0.0.27-0.20240729032523-f6fba962c2b1/go.mod h1:2fiAx0q9XYIPKYRq2B1oiO9zZESy/n4D32gWw6lMDsE=
github.com/status-im/notify v1.0.2-status h1:x8wev0Sh8H8KAf4bVcv+L0dVHldBESOKUlqRqRY7uL8=
github.com/status-im/notify v1.0.2-status/go.mod h1:gF3zSOrafR9DQEWSE8TjfI9NkooDxbyT4UgRGKZA0lc=
github.com/status-im/rendezvous v1.3.8-0.20240110194857-cc5be22bf83e h1:pCOHeAYmYttXQBCn+6u01bs5d/W3XslxmplFhru4X1Y=
github.com/status-im/rendezvous v1.3.8-0.20240110194857-cc5be22bf83e/go.mod h1:LEPENTHDBGCxXVZx6FEKNKN+tfPaIK+lmiGv1DxkJW4=
github.com/status-im/resize v0.0.0-20201215164250-7c6d9f0d3088 h1:ClCAP2FPCvl8hGMhbUx/tq/sOu2wibztAa5jAvQEe4Q=
github.com/status-im/resize v0.0.0-20201215164250-7c6d9f0d3088/go.mod h1:+92j1tN27DypDeBFxkg0uzkqfh1bNHTZe3Bv2PjvxpM=
github.com/status-im/status-go/extkeys v1.1.2 h1:FSjARgDathJ3rIapJt851LsIXP9Oyuu2M2jPJKuzloU=

View File

@ -10,7 +10,6 @@ import (
"reflect"
"sync"
ma "github.com/multiformats/go-multiaddr"
"github.com/syndtr/goleveldb/leveldb"
"github.com/ethereum/go-ethereum/accounts"
@ -336,7 +335,7 @@ func (n *StatusNode) setupRPCClient() (err error) {
}
func (n *StatusNode) discoveryEnabled() bool {
return n.config != nil && (!n.config.NoDiscovery || n.config.Rendezvous) && n.config.ClusterConfig.Enabled
return n.config != nil && (!n.config.NoDiscovery) && n.config.ClusterConfig.Enabled
}
func (n *StatusNode) discoverNode() (*enode.Node, error) {
@ -361,29 +360,6 @@ func (n *StatusNode) discoverNode() (*enode.Node, error) {
return enode.New(enode.ValidSchemes[r.IdentityScheme()], r)
}
func (n *StatusNode) startRendezvous() (discovery.Discovery, error) {
if !n.config.Rendezvous {
return nil, errors.New("rendezvous is not enabled")
}
if len(n.config.ClusterConfig.RendezvousNodes) == 0 {
return nil, errors.New("rendezvous node must be provided if rendezvous discovery is enabled")
}
maddrs := make([]ma.Multiaddr, len(n.config.ClusterConfig.RendezvousNodes))
for i, addr := range n.config.ClusterConfig.RendezvousNodes {
var err error
maddrs[i], err = ma.NewMultiaddr(addr)
if err != nil {
return nil, fmt.Errorf("failed to parse rendezvous node %s: %v", n.config.ClusterConfig.RendezvousNodes[0], err)
}
}
node, err := n.discoverNode()
if err != nil {
return nil, fmt.Errorf("failed to get a discover node: %v", err)
}
return discovery.NewRendezvous(maddrs, n.gethNode.Server().PrivateKey, node)
}
// StartDiscovery starts the peers discovery protocols depending on the node config.
func (n *StatusNode) StartDiscovery() error {
n.mu.Lock()
@ -408,13 +384,7 @@ func (n *StatusNode) startDiscovery() error {
n.config.ListenAddr,
parseNodesV5(n.config.ClusterConfig.BootNodes)))
}
if n.config.Rendezvous {
d, err := n.startRendezvous()
if err != nil {
return err
}
discoveries = append(discoveries, d)
}
if len(discoveries) == 0 {
return errors.New("wasn't able to register any discovery")
} else if len(discoveries) > 1 {

View File

@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/status-im/status-go/discovery"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
@ -148,77 +147,6 @@ func TestStatusNodeAddPeer(t *testing.T) {
require.Equal(t, 1, n.PeerCount())
}
func TestStatusNodeRendezvousDiscovery(t *testing.T) {
config := params.NodeConfig{
Rendezvous: true,
NoDiscovery: true,
ClusterConfig: params.ClusterConfig{
Enabled: true,
// not necessarily with id, just valid multiaddr
RendezvousNodes: []string{"/ip4/127.0.0.1/tcp/34012", "/ip4/127.0.0.1/tcp/34011"},
},
// use custom address to test the all possibilities
AdvertiseAddr: "127.0.0.1",
}
n, stop1, stop2, err := createStatusNode()
defer func() {
err := stop1()
if err != nil {
n.log.Error("stopping db", err)
}
}()
defer func() {
err := stop2()
if err != nil {
n.log.Error("stopping multiaccount db", err)
}
}()
require.NoError(t, err)
require.NoError(t, n.Start(&config, nil))
require.NotNil(t, n.discovery)
require.True(t, n.discovery.Running())
require.IsType(t, &discovery.Rendezvous{}, n.discovery)
}
func TestStatusNodeStartDiscoveryManual(t *testing.T) {
config := params.NodeConfig{
Rendezvous: true,
NoDiscovery: true,
ClusterConfig: params.ClusterConfig{
Enabled: true,
// not necessarily with id, just valid multiaddr
RendezvousNodes: []string{"/ip4/127.0.0.1/tcp/34012", "/ip4/127.0.0.1/tcp/34011"},
},
// use custom address to test the all possibilities
AdvertiseAddr: "127.0.0.1",
}
n, stop1, stop2, err := createStatusNode()
defer func() {
err := stop1()
if err != nil {
n.log.Error("stopping db", err)
}
}()
defer func() {
err := stop2()
if err != nil {
n.log.Error("stopping multiaccount db", err)
}
}()
require.NoError(t, err)
require.NoError(t, n.StartWithOptions(&config, StartOptions{}))
require.Nil(t, n.discovery)
// start discovery manually
require.NoError(t, n.StartDiscovery())
require.NotNil(t, n.discovery)
require.True(t, n.discovery.Running())
require.IsType(t, &discovery.Rendezvous{}, n.discovery)
}
func TestStatusNodeDiscoverNode(t *testing.T) {
config := params.NodeConfig{
NoDiscovery: true,

View File

@ -323,7 +323,6 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
Host: nodeConfig.WakuV2Config.Host,
Port: nodeConfig.WakuV2Config.Port,
LightClient: nodeConfig.WakuV2Config.LightClient,
Rendezvous: nodeConfig.Rendezvous,
WakuNodes: nodeConfig.ClusterConfig.WakuNodes,
EnableStore: nodeConfig.WakuV2Config.EnableStore,
StoreCapacity: nodeConfig.WakuV2Config.StoreCapacity,

View File

@ -34,7 +34,7 @@ type insertFn func(tx *sql.Tx, c *params.NodeConfig) error
func insertNodeConfig(tx *sql.Tx, c *params.NodeConfig) error {
_, err := tx.Exec(`
INSERT OR REPLACE INTO node_config (
network_id, data_dir, keystore_dir, node_key, no_discovery, rendezvous,
network_id, data_dir, keystore_dir, node_key, no_discovery,
listen_addr, advertise_addr, name, version, api_modules, tls_enabled,
max_peers, max_pending_peers, enable_status_service, enable_ntp_sync,
bridge_enabled, wallet_enabled, local_notifications_enabled,
@ -44,9 +44,9 @@ func insertNodeConfig(tx *sql.Tx, c *params.NodeConfig) error {
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, 'id'
?, ?, ?, ?, ?, 'id'
)`,
c.NetworkID, c.DataDir, c.KeyStoreDir, c.NodeKey, c.NoDiscovery, c.Rendezvous,
c.NetworkID, c.DataDir, c.KeyStoreDir, c.NodeKey, c.NoDiscovery,
c.ListenAddr, c.AdvertiseAddr, c.Name, c.Version, c.APIModules,
c.TLSEnabled, c.MaxPeers, c.MaxPendingPeers,
c.EnableStatusService, true,
@ -313,7 +313,6 @@ func insertClusterConfigNodes(tx *sql.Tx, c *params.NodeConfig) error {
nodeMap[BootNodes] = c.ClusterConfig.BootNodes
nodeMap[TrustedMailServers] = c.ClusterConfig.TrustedMailServers
nodeMap[PushNotificationsServers] = c.ClusterConfig.PushNotificationsServers
nodeMap[RendezvousNodes] = c.ClusterConfig.RendezvousNodes
nodeMap[DiscV5BootstrapNodes] = c.ClusterConfig.DiscV5BootstrapNodes
nodeMap[WakuNodes] = c.ClusterConfig.WakuNodes
@ -441,14 +440,14 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) {
err := tx.QueryRow(`
SELECT
network_id, data_dir, keystore_dir, node_key, no_discovery, rendezvous,
network_id, data_dir, keystore_dir, node_key, no_discovery,
listen_addr, advertise_addr, name, version, api_modules, tls_enabled, max_peers, max_pending_peers,
enable_status_service, bridge_enabled, wallet_enabled, local_notifications_enabled,
browser_enabled, permissions_enabled, mailservers_enabled, swarm_enabled,
mailserver_registry_address, web3provider_enabled, connector_enabled FROM node_config
WHERE synthetic_id = 'id'
`).Scan(
&nodecfg.NetworkID, &nodecfg.DataDir, &nodecfg.KeyStoreDir, &nodecfg.NodeKey, &nodecfg.NoDiscovery, &nodecfg.Rendezvous,
&nodecfg.NetworkID, &nodecfg.DataDir, &nodecfg.KeyStoreDir, &nodecfg.NodeKey, &nodecfg.NoDiscovery,
&nodecfg.ListenAddr, &nodecfg.AdvertiseAddr, &nodecfg.Name, &nodecfg.Version, &nodecfg.APIModules, &nodecfg.TLSEnabled, &nodecfg.MaxPeers, &nodecfg.MaxPendingPeers,
&nodecfg.EnableStatusService, &nodecfg.BridgeConfig.Enabled, &nodecfg.WalletConfig.Enabled, &nodecfg.LocalNotificationsConfig.Enabled,
&nodecfg.BrowsersConfig.Enabled, &nodecfg.PermissionsConfig.Enabled, &nodecfg.MailserversConfig.Enabled, &nodecfg.SwarmConfig.Enabled,
@ -537,7 +536,6 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) {
nodeMap[BootNodes] = &nodecfg.ClusterConfig.BootNodes
nodeMap[TrustedMailServers] = &nodecfg.ClusterConfig.TrustedMailServers
nodeMap[PushNotificationsServers] = &nodecfg.ClusterConfig.PushNotificationsServers
nodeMap[RendezvousNodes] = &nodecfg.ClusterConfig.RendezvousNodes
nodeMap[WakuNodes] = &nodecfg.ClusterConfig.WakuNodes
nodeMap[DiscV5BootstrapNodes] = &nodecfg.ClusterConfig.DiscV5BootstrapNodes
rows, err = tx.Query(`SELECT node, type FROM cluster_nodes WHERE synthetic_id = 'id' ORDER BY node ASC`)

View File

@ -12,10 +12,9 @@ const (
// Cluster defines a list of Ethereum nodes.
type Cluster struct {
StaticNodes []string `json:"staticnodes"`
BootNodes []string `json:"bootnodes"`
MailServers []string `json:"mailservers"` // list of trusted mail servers
RendezvousNodes []string `json:"rendezvousnodes"`
StaticNodes []string `json:"staticnodes"`
BootNodes []string `json:"bootnodes"`
MailServers []string `json:"mailservers"` // list of trusted mail servers
}
type FleetName = string

View File

@ -264,9 +264,6 @@ type ClusterConfig struct {
// PushNotificationsServers is a list of default push notification servers.
PushNotificationsServers []string
// RendezvousNodes is a list rendezvous discovery nodes.
RendezvousNodes []string
// WakuNodes is a list of waku2 multiaddresses
WakuNodes []string
@ -357,10 +354,6 @@ type NodeConfig struct {
// Deprecated: won't be used at all in wakuv2
NoDiscovery bool
// Rendezvous enables discovery protocol.
// Deprecated: won't be used at all in wakuv2
Rendezvous bool
// ListenAddr is an IP address and port of this node (e.g. 127.0.0.1:30303).
ListenAddr string
@ -917,7 +910,7 @@ func NewNodeConfigWithDefaultsAndFiles(
// updatePeerLimits will set default peer limits expectations based on enabled services.
func (c *NodeConfig) updatePeerLimits() {
if c.NoDiscovery && !c.Rendezvous {
if c.NoDiscovery {
return
}
if c.LightEthConfig.Enabled {
@ -1097,10 +1090,6 @@ func (c *NodeConfig) Validate() error {
return fmt.Errorf("PFSEnabled is true, but InstallationID is empty")
}
if len(c.ClusterConfig.RendezvousNodes) == 0 && c.Rendezvous {
return fmt.Errorf("Rendezvous is enabled, but ClusterConfig.RendezvousNodes is empty")
}
return nil
}

View File

@ -219,18 +219,6 @@ func TestNodeConfigValidate(t *testing.T) {
}`,
Error: "NoDiscovery is false, but ClusterConfig.BootNodes is empty",
},
{
Name: "Validate that ClusterConfig.RendezvousNodes is verified to be empty if Rendezvous is disabled",
Config: `{
"NetworkId": 1,
"DataDir": "/some/dir",
"KeyStoreDir": "/some/dir",
"KeycardPairingDataFile": "/some/dir/keycard/pairings.json",
"NoDiscovery": true,
"Rendezvous": true
}`,
Error: "Rendezvous is enabled, but ClusterConfig.RendezvousNodes is empty",
},
{
Name: "Validate that PFSEnabled & InstallationID are checked for validity",
Config: `{

View File

@ -17,8 +17,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/rendezvous/server"
"github.com/status-im/status-go/discovery"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/signal"
@ -27,10 +25,9 @@ import (
type PeerPoolSimulationSuite struct {
suite.Suite
bootnode *p2p.Server
peers []*p2p.Server
discovery []discovery.Discovery
rendezvousServer *server.Server
bootnode *p2p.Server
peers []*p2p.Server
discovery []discovery.Discovery
}
func TestPeerPoolSimulationSuite(t *testing.T) {
@ -91,9 +88,6 @@ func (s *PeerPoolSimulationSuite) TearDown() {
s.peers[i].Stop()
s.NoError(s.discovery[i].Stop())
}
if s.rendezvousServer != nil {
s.rendezvousServer.Stop()
}
}
func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent, etype p2p.PeerEventType) (nodeID enode.ID) {

View File

@ -1 +0,0 @@
vendor/

View File

@ -1,15 +0,0 @@
ETHv4 multiaddr library
=======================
ETHv4 mutliaddr format adds a spec that is compatible with enode definition, for example:
peer with identity 436cc6f674928fdc9a9f7990f2944002b685d1c37f025c1be425185b5b1f0900feaf1ccc2a6130268f9901be4a7d252f37302c8335a2c1a62736e9232691cc3a ip 174.138.105.243 and tcp port 30404 can be defined as enode:
```
enode://436cc6f674928fdc9a9f7990f2944002b685d1c37f025c1be425185b5b1f0900feaf1ccc2a6130268f9901be4a7d252f37302c8335a2c1a62736e9232691cc3a@174.138.105.243:30404
```
or as multiaddr
```
/ip4/174.138.105.243/tcp/30404/ethv4/436cc6f674928fdc9a9f7990f2944002b685d1c37f025c1be425185b5b1f0900feaf1ccc2a6130268f9901be4a7d252f37302c8335a2c1a62736e9232691cc3a
```

View File

@ -1,67 +0,0 @@
package ethv4
import (
"errors"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
mh "github.com/multiformats/go-multihash"
btcec "github.com/btcsuite/btcd/btcec/v2"
)
const (
P_ETHv4 = 0x01EA
)
func init() {
if err := ma.AddProtocol(
ma.Protocol{
Name: "ethv4",
Code: P_ETHv4,
VCode: ma.CodeToVarint(P_ETHv4),
Size: 312,
Path: false,
Transcoder: TranscoderETHv4,
}); err != nil {
panic(err)
}
}
var TranscoderETHv4 = ma.NewTranscoderFromFunctions(ethv4StB, ethv4BtS, func([]byte) error { return nil })
func ethv4StB(s string) ([]byte, error) {
id, err := mh.FromB58String(s)
if err != nil {
return nil, err
}
return id, err
}
func ethv4BtS(b []byte) (string, error) {
id, err := mh.Cast(b)
if err != nil {
return "", err
}
return id.B58String(), err
}
// PeerIDToNodeID casts peer.ID (b58 encoded string) to discover.NodeID
func PeerIDToNodeID(pid string) (n enode.ID, err error) {
nodeid, err := peer.Decode(pid)
if err != nil {
return n, err
}
pubkey, err := nodeid.ExtractPublicKey()
if err != nil {
return n, err
}
seckey, ok := pubkey.(*crypto.Secp256k1PublicKey)
if !ok {
return n, errors.New("public key is not on the secp256k1 curve")
}
return enode.PubkeyToIDV4((*btcec.PublicKey)(seckey).ToECDSA()), nil
}

View File

@ -1 +0,0 @@
vendor/

View File

@ -1,15 +0,0 @@
notifications:
email: false
language: go
go:
- 1.11.x
- 1.12.x
install: true
env:
- GO111MODULE=on
sudo: false
jobs:
include:
- stage: Unit tests
script:
- make test

View File

@ -1,13 +0,0 @@
FROM golang:1.17.0-alpine as builder
RUN apk add --no-cache gcc musl-dev linux-headers
RUN mkdir -p /go/src/github.com/status-im/rendezvous
ADD . /go/src/github.com/status-im/rendezvous
RUN cd /go/src/github.com/status-im/rendezvous && go build -o rendezvous ./cmd/server/
FROM alpine:latest
RUN apk add --no-cache ca-certificates bash
COPY --from=builder /go/src/github.com/status-im/rendezvous/rendezvous /usr/local/bin/rendezvous

View File

@ -1,16 +0,0 @@
test:
go test ./...
image: AUTHOR = $(shell echo $$USER)
image: GIT_COMMIT = $(shell tag=`git describe --exact-match --tag 2>/dev/null`; \
if [ $$? -eq 0 ]; then echo $$tag | sed 's/^v\(.*\)$$/\1/'; \
else git rev-parse --short HEAD; fi)
image:
docker build . \
--label "commit=$(GIT_COMMIT)" \
--label "author=$(AUTHOR)" \
-t statusteam/rendezvous:$(GIT_COMMIT) \
-t statusteam/rendezvous:latest
push:
docker push statusteam/rendezvous:latest

View File

@ -1,48 +0,0 @@
Rendezvous server
=================
In order to build a docker image, run:
```bash
make image
```
Server usage:
```
-a, --address string listener ip address (default "0.0.0.0")
-d, --data string path where ENR infos will be stored. (default "/tmp/rendevouz")
-g, --generate dump private key and exit.
-h, --keyhex string private key hex
-k, --keypath string path to load private key
-p, --port int listener port (default 9090)
-v, --verbosity string verbosity level, options: crit, error, warning, info, debug (default "info")
```
Option `-g` can be used to generate hex of the private key for convenience.
Option `-h` should be used only in tests.
The only mandatory parameter is keypath `-k`, and not mandatory but i suggest to change data path `-d` not to a temporary
directory.
# Differences with original rendezvous
Original rendezvous description by members of libp2p team - [rendezvous](https://github.com/libp2p/specs/pull/56).
We are using current implementation for a similar purposes, but mainly as a light-peer discovery protocol for mobile
devices. Discovery v5 that depends on the kademlia implementation was too slow for mobile and consumed noticeable amount
of traffic to find peers.
Some differences with original implementation:
1. We are using ENR ([Ethereum Node Records](https://eips.ethereum.org/EIPS/eip-778)) for encoding information
about peers. ENR must be signed.
2. We are using RLP instead of protobuf. Mainly for convenience, because ENR already had util for rlp serialization.
3. Smaller liveness TTL for records. At the time of writing liveness TTL is set to be 20s.
This way we want to provide minimal guarantees that peer is online and dialable.
4. ENRs are fetched from storage randomly. And we don't provide a way to fetch "new" records.
It was done as a naive measure against spamming rendezvous servers with invalid records.
And at the same time spread load of new peers between multiple servers.
5. We don't use UNREGISTER request, since we assume that TTL is very low.
Those are mostly implementation details while idea is pretty much the same, but it is important to note that this implementation
is not compatible with one from libp2p team.

View File

@ -1,178 +0,0 @@
package rendezvous
import (
"context"
"crypto/rand"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
libp2p "github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
ethv4 "github.com/status-im/go-multiaddr-ethv4"
"github.com/status-im/rendezvous/protocol"
)
var logger = log.New("package", "rendezvous/client")
func NewEphemeral() (c Client, err error) {
priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Secp256k1, 0, rand.Reader) // bits are ignored with edwards or secp251k1
if err != nil {
return Client{}, err
}
return New(priv)
}
func New(identity crypto.PrivKey) (c Client, err error) {
opts := []libp2p.Option{
libp2p.Identity(identity),
}
h, err := libp2p.New(opts...)
if err != nil {
return c, err
}
return Client{
h: h,
}, nil
}
func NewWithHost(h host.Host) (c Client, err error) {
return Client{
h: h,
}, nil
}
type Client struct {
h host.Host
}
func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, record enr.Record, ttl time.Duration) error {
s, err := c.newStream(ctx, srv)
if err != nil {
return err
}
defer s.Close()
if err = rlp.Encode(s, protocol.REGISTER); err != nil {
return err
}
if err = rlp.Encode(s, protocol.Register{Topic: topic, Record: record, TTL: uint64(ttl)}); err != nil {
return err
}
rs := rlp.NewStream(s, 0)
typ, err := rs.Uint()
if err != nil {
return err
}
if protocol.MessageType(typ) != protocol.REGISTER_RESPONSE {
return fmt.Errorf("expected %v as response, but got %v", protocol.REGISTER_RESPONSE, typ)
}
var val protocol.RegisterResponse
if err = rs.Decode(&val); err != nil {
return err
}
logger.Debug("received response to register", "status", val.Status, "message", val.Message)
if val.Status != protocol.OK {
return fmt.Errorf("register failed. status code %v", val.Status)
}
return nil
}
func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, limit int) (rst []enr.Record, err error) {
s, err := c.newStream(ctx, srv)
if err != nil {
return
}
defer s.Close()
if err = rlp.Encode(s, protocol.DISCOVER); err != nil {
return
}
if err = rlp.Encode(s, protocol.Discover{Topic: topic, Limit: uint(limit)}); err != nil {
return
}
rs := rlp.NewStream(s, 0)
typ, err := rs.Uint()
if err != nil {
return nil, err
}
if protocol.MessageType(typ) != protocol.DISCOVER_RESPONSE {
return nil, fmt.Errorf("expected %v as response, but got %v", protocol.REGISTER_RESPONSE, typ)
}
var val protocol.DiscoverResponse
if err = rs.Decode(&val); err != nil {
return
}
if val.Status != protocol.OK {
return nil, fmt.Errorf("discover request failed. status code %v", val.Status)
}
logger.Debug("received response to discover request", "status", val.Status, "records lth", len(val.Records))
return val.Records, nil
}
func (c Client) RemoteIp(ctx context.Context, srv ma.Multiaddr) (value string, err error) {
s, err := c.newStream(ctx, srv)
if err != nil {
return
}
defer s.Close()
if err = rlp.Encode(s, protocol.REMOTEIP); err != nil {
return
}
rs := rlp.NewStream(s, 0)
typ, err := rs.Uint()
if err != nil {
return
}
if protocol.MessageType(typ) != protocol.REMOTEIP_RESPONSE {
err = fmt.Errorf("expected %v as response, but got %v", protocol.REMOTEIP_RESPONSE, typ)
return
}
var val protocol.RemoteIpResponse
if err = rs.Decode(&val); err != nil {
return
}
if val.Status != protocol.OK {
err = fmt.Errorf("remoteip request failed. status code %v", val.Status)
return
}
logger.Debug("received response to remoteip request", "status", val.Status, "ip", val.IP)
value = val.IP
return
}
func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw network.Stream, err error) {
pid, err := srv.ValueForProtocol(ethv4.P_ETHv4)
if err != nil {
return
}
peerid, err := peer.Decode(pid)
if err != nil {
return
}
// TODO there must be a better interface
targetPeerAddr, err := ma.NewMultiaddr(fmt.Sprintf("/ethv4/%s", pid))
if err != nil {
return
}
targetAddr := srv.Decapsulate(targetPeerAddr)
c.h.Peerstore().AddAddr(peerid, targetAddr, 5*time.Second)
s, err := c.h.NewStream(ctx, peerid, "/rend/0.1.0")
if err != nil {
return nil, err
}
return &InstrumentedStream{s}, nil
}
// Close shutdowns the host and all open connections.
func (c Client) Close() error {
return c.h.Close()
}

View File

@ -1,58 +0,0 @@
package protocol
import (
"github.com/ethereum/go-ethereum/p2p/enr"
)
type ResponseStatus uint
type MessageType uint64
const (
VERSION = "/rend/0.1.0"
REGISTER MessageType = iota
REGISTER_RESPONSE
DISCOVER
DISCOVER_RESPONSE
REMOTEIP
REMOTEIP_RESPONSE
OK ResponseStatus = 0
E_INVALID_NAMESPACE ResponseStatus = 100
E_INVALID_ENR ResponseStatus = 101
E_INVALID_TTL ResponseStatus = 102
E_INVALID_LIMIT ResponseStatus = 103
E_INVALID_CONTENT ResponseStatus = 104
E_NOT_AUTHORIZED ResponseStatus = 200
E_INTERNAL_ERROR ResponseStatus = 300
)
type Register struct {
Topic string
Record enr.Record
TTL uint64
}
type RegisterResponse struct {
Status ResponseStatus
Message string
}
type Discover struct {
Limit uint
Topic string
}
type DiscoverResponse struct {
Status ResponseStatus
Message string
Records []enr.Record
}
type RemoteIp struct {
}
type RemoteIpResponse struct {
Status ResponseStatus
IP string
}

View File

@ -1,104 +0,0 @@
package server
import (
"container/heap"
"sync"
"time"
)
type deadline struct {
time time.Time
}
// definitely rename
// Rewrite cleaner to operate on a leveldb directly
// if it is impossible to query on topic+timestamp(big endian) for purging
// store an additional key
func NewCleaner() *Cleaner {
return &Cleaner{
heap: []string{},
deadlines: map[string]deadline{},
}
}
type Cleaner struct {
mu sync.RWMutex
heap []string
deadlines map[string]deadline
}
func (c *Cleaner) Id(index int) string {
return c.heap[index]
}
func (c *Cleaner) Len() int {
return len(c.heap)
}
func (c *Cleaner) Less(i, j int) bool {
return c.deadlines[c.Id(i)].time.Before(c.deadlines[c.Id(j)].time)
}
func (c *Cleaner) Swap(i, j int) {
c.heap[i], c.heap[j] = c.heap[j], c.heap[i]
}
func (c *Cleaner) Push(record interface{}) {
c.heap = append(c.heap, record.(string))
}
func (c *Cleaner) Pop() interface{} {
old := c.heap
n := len(old)
x := old[n-1]
c.heap = append([]string{}, old[0:n-1]...)
_, exist := c.deadlines[x]
if !exist {
return x
}
delete(c.deadlines, x)
return x
}
func (c *Cleaner) Add(deadlineTime time.Time, key string) {
c.mu.Lock()
defer c.mu.Unlock()
dl, exist := c.deadlines[key]
if !exist {
dl = deadline{time: deadlineTime}
} else {
dl.time = deadlineTime
for i, n := range c.heap {
if n == key {
heap.Remove(c, i)
break
}
}
}
c.deadlines[key] = dl
heap.Push(c, key)
}
func (c *Cleaner) Exist(key string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
_, exist := c.deadlines[key]
return exist
}
func (c *Cleaner) PopSince(now time.Time) (rst []string) {
c.mu.Lock()
defer c.mu.Unlock()
for len(c.heap) != 0 {
dl, exist := c.deadlines[c.heap[0]]
if !exist {
continue
}
if now.After(dl.time) {
rst = append(rst, heap.Pop(c).(string))
} else {
return rst
}
}
return rst
}

View File

@ -1,29 +0,0 @@
package server
var (
metrics MetricsInterface = noopMetrics{}
)
func UseMetrics(m MetricsInterface) {
metrics = m
}
type MetricsInterface interface {
AddActiveRegistration(...string)
RemoveActiveRegistration(...string)
ObserveDiscoverSize(float64, ...string)
ObserveDiscoveryDuration(float64, ...string)
CountError(...string)
}
type noopMetrics struct{}
func (n noopMetrics) AddActiveRegistration(lvs ...string) {}
func (n noopMetrics) RemoveActiveRegistration(lvs ...string) {}
func (n noopMetrics) ObserveDiscoverSize(o float64, lvs ...string) {}
func (n noopMetrics) ObserveDiscoveryDuration(o float64, lvs ...string) {}
func (n noopMetrics) CountError(lvs ...string) {}

View File

@ -1,274 +0,0 @@
package server
import (
"bytes"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/rendezvous/protocol"
)
var logger = log.New("package", "rendezvous/server")
const (
longestTTL = 20 * time.Second
networkDelay = 500 * time.Millisecond
cleanerPeriod = 2 * time.Second
maxLimit uint = 10
maxTopicLength = 50
)
// NewServer creates instance of the server.
func NewServer(laddr ma.Multiaddr, identity crypto.PrivKey, s Storage) *Server {
srv := Server{
laddr: laddr,
identity: identity,
storage: s,
cleaner: NewCleaner(),
writeTimeout: 10 * time.Second,
readTimeout: 10 * time.Second,
cleanerPeriod: cleanerPeriod,
networkDelay: networkDelay,
}
return &srv
}
// Server provides rendezbous service over libp2p stream.
type Server struct {
laddr ma.Multiaddr
identity crypto.PrivKey
writeTimeout time.Duration
readTimeout time.Duration
storage Storage
cleaner *Cleaner
cleanerPeriod time.Duration
networkDelay time.Duration
h host.Host
addr ma.Multiaddr
wg sync.WaitGroup
quit chan struct{}
}
// Addr returns full server multiaddr (identity included).
func (srv *Server) Addr() ma.Multiaddr {
return srv.addr
}
// Start creates listener.
func (srv *Server) Start() error {
if err := srv.startListener(); err != nil {
return err
}
if err := srv.startCleaner(); err != nil {
return err
}
// once server is restarted all cleaner info is lost. so we need to rebuild it
return srv.storage.IterateAllKeys(func(key RecordsKey, ttl time.Time) error {
if !srv.cleaner.Exist(key.String()) {
topic := TopicPart(key)
log.Debug("active registration with", "topic", string(topic))
metrics.AddActiveRegistration(string(topic))
}
srv.cleaner.Add(ttl, key.String())
return nil
})
}
func (srv *Server) startCleaner() error {
srv.quit = make(chan struct{})
srv.wg.Add(1)
go func() {
for {
select {
case <-time.After(srv.cleanerPeriod):
srv.purgeOutdated()
case <-srv.quit:
srv.wg.Done()
return
}
}
}()
return nil
}
func (srv *Server) startListener() error {
opts := []libp2p.Option{
libp2p.ListenAddrStrings(srv.laddr.String()),
libp2p.Identity(srv.identity),
}
h, err := libp2p.New(opts...)
if err != nil {
return err
}
srv.h = h
srv.h.SetStreamHandler(protocol.VERSION, func(s network.Stream) {
defer s.Close()
for {
rs := rlp.NewStream(s, 0)
s.SetReadDeadline(time.Now().Add(srv.readTimeout))
typ, err := rs.Uint()
if err == io.EOF {
return
}
if err != nil {
logger.Debug("error reading message type", "error", err)
s.Reset()
return
}
s.SetReadDeadline(time.Now().Add(srv.readTimeout))
resptype, resp, err := srv.msgParser(s, protocol.MessageType(typ), rs)
if err == io.EOF {
return
}
if err != nil {
logger.Debug("error parsing message", "error", err)
s.Reset()
return
}
s.SetWriteDeadline(time.Now().Add(srv.writeTimeout))
if err = rlp.Encode(s, resptype); err != nil {
logger.Debug("error writing response", "type", resptype, "error", err)
s.Reset()
return
}
s.SetWriteDeadline(time.Now().Add(srv.writeTimeout))
if err = rlp.Encode(s, resp); err != nil {
logger.Debug("error encoding response", "resp", resp, "error", err)
s.Reset()
}
}
})
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ethv4/%s", h.ID()))
if err != nil {
return err
}
srv.addr = srv.laddr.Encapsulate(addr)
logger.Info("server started", "address", srv.laddr.Encapsulate(addr))
return nil
}
// Stop closes listener and waits till all helper goroutines are stopped.
func (srv *Server) Stop() {
if srv.quit == nil {
return
}
select {
case <-srv.quit:
return
default:
}
close(srv.quit)
srv.wg.Wait()
if srv.h != nil {
srv.h.Close()
}
}
func (srv *Server) purgeOutdated() {
keys := srv.cleaner.PopSince(time.Now())
log.Info("removed records from cleaner", "deadlines", len(srv.cleaner.deadlines), "heap", len(srv.cleaner.heap), "lth", len(keys))
for _, key := range keys {
topic := TopicPart([]byte(key))
log.Debug("Removing record with", "topic", string(topic))
metrics.RemoveActiveRegistration(string(topic))
if err := srv.storage.RemoveByKey(key); err != nil {
logger.Error("error removing key from storage", "key", key, "error", err)
}
}
}
// Decoder is a decoder!
type Decoder interface {
Decode(val interface{}) error
}
func (srv *Server) msgParser(s network.Stream, typ protocol.MessageType, d Decoder) (resptype protocol.MessageType, resp interface{}, err error) {
switch typ {
case protocol.REGISTER:
var msg protocol.Register
resptype = protocol.REGISTER_RESPONSE
if err = d.Decode(&msg); err != nil {
metrics.CountError("register")
return resptype, protocol.RegisterResponse{Status: protocol.E_INVALID_CONTENT}, nil
}
resp, err = srv.register(msg)
return resptype, resp, err
case protocol.DISCOVER:
var msg protocol.Discover
resptype = protocol.DISCOVER_RESPONSE
if err = d.Decode(&msg); err != nil {
metrics.CountError("discover")
return resptype, protocol.DiscoverResponse{Status: protocol.E_INVALID_CONTENT}, nil
}
limit := msg.Limit
if msg.Limit > maxLimit {
limit = maxLimit
}
start := time.Now()
records, err := srv.storage.GetRandom(msg.Topic, limit)
if err != nil {
metrics.CountError("discover")
return resptype, protocol.DiscoverResponse{Status: protocol.E_INTERNAL_ERROR}, err
}
metrics.ObserveDiscoveryDuration(time.Since(start).Seconds(), msg.Topic)
metrics.ObserveDiscoverSize(float64(len(records)), msg.Topic)
return resptype, protocol.DiscoverResponse{Status: protocol.OK, Records: records}, nil
case protocol.REMOTEIP:
resptype = protocol.REMOTEIP_RESPONSE
ip, err := s.Conn().RemoteMultiaddr().ValueForProtocol(multiaddr.P_IP4)
if err != nil {
metrics.CountError("remoteip")
return resptype, protocol.RemoteIpResponse{Status: protocol.E_INTERNAL_ERROR}, err
}
return resptype, protocol.RemoteIpResponse{Status: protocol.OK, IP: ip}, nil
default:
metrics.CountError("unknown")
// don't send the response
return 0, nil, errors.New("unknown request type")
}
}
func (srv *Server) register(msg protocol.Register) (protocol.RegisterResponse, error) {
if len(msg.Topic) == 0 || len(msg.Topic) > maxTopicLength {
return protocol.RegisterResponse{Status: protocol.E_INVALID_NAMESPACE}, nil
}
if time.Duration(msg.TTL) > longestTTL {
return protocol.RegisterResponse{Status: protocol.E_INVALID_TTL}, nil
}
if bytes.IndexByte([]byte(msg.Topic), TopicBodyDelimiter) != -1 {
return protocol.RegisterResponse{Status: protocol.E_INVALID_NAMESPACE}, nil
}
if err := msg.Record.VerifySignature(enode.ValidSchemes); err != nil {
logger.Error("error verify signature message", "error", err)
return protocol.RegisterResponse{Status: protocol.E_INVALID_ENR}, nil
}
deadline := time.Now().Add(time.Duration(msg.TTL)).Add(srv.networkDelay)
key, err := srv.storage.Add(msg.Topic, msg.Record, deadline)
if err != nil {
return protocol.RegisterResponse{Status: protocol.E_INTERNAL_ERROR}, err
}
if !srv.cleaner.Exist(key) {
log.Debug("active registration with", "topic", msg.Topic)
metrics.AddActiveRegistration(msg.Topic)
}
log.Debug("updating record in the cleaner", "deadline", deadline, "topic", msg.Topic)
srv.cleaner.Add(deadline, key)
return protocol.RegisterResponse{Status: protocol.OK}, nil
}

View File

@ -1,134 +0,0 @@
package server
import (
"bytes"
"crypto/rand"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/rlp"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/ethereum/go-ethereum/p2p/enode"
)
const (
RecordsPrefix byte = 1 + iota
TopicBodyDelimiter = 0xff
)
type StorageRecord struct {
ENR enr.Record
Time time.Time
}
// TopicPart looks for TopicBodyDelimiter and returns topic prefix from the same key.
// It doesn't allocate memory for topic prefix.
func TopicPart(key []byte) []byte {
idx := bytes.IndexByte(key, TopicBodyDelimiter)
if idx == -1 {
return nil
}
return key[1:idx] // first byte is RecordsPrefix
}
type RecordsKey []byte
func NewRecordsKey(topic string, record enr.Record) RecordsKey {
key := make(RecordsKey, 2+len([]byte(topic))+len(enode.ValidSchemes.NodeAddr(&record)))
key[0] = RecordsPrefix
copy(key[1:], []byte(topic))
key[1+len([]byte(topic))] = TopicBodyDelimiter
copy(key[2+len([]byte(topic)):], enode.ValidSchemes.NodeAddr(&record))
return key
}
func (k RecordsKey) SamePrefix(prefix []byte) bool {
return bytes.Equal(k[:len(prefix)], prefix)
}
func (k RecordsKey) String() string {
return string(k)
}
// NewStorage creates instance of the storage.
func NewStorage(db *leveldb.DB) Storage {
return Storage{db: db}
}
// Storage manages records.
type Storage struct {
db *leveldb.DB
}
// Add stores record using specified topic.
func (s Storage) Add(topic string, record enr.Record, t time.Time) (string, error) {
key := NewRecordsKey(topic, record)
stored := StorageRecord{
ENR: record,
Time: t,
}
data, err := rlp.EncodeToBytes(stored)
if err != nil {
return "", err
}
return key.String(), s.db.Put(key, data, nil)
}
// RemoveBykey removes record from storage.
func (s *Storage) RemoveByKey(key string) error {
return s.db.Delete([]byte(key), nil)
}
func (s *Storage) IterateAllKeys(iterator func(key RecordsKey, ttl time.Time) error) error {
iter := s.db.NewIterator(util.BytesPrefix([]byte{RecordsPrefix}), nil)
defer iter.Release()
for iter.Next() {
var stored StorageRecord
if err := rlp.DecodeBytes(iter.Value(), &stored); err != nil {
return err
}
if err := iterator(RecordsKey(iter.Key()), stored.Time); err != nil {
return err
}
}
return nil
}
// GetRandom reads random records for specified topic up to specified limit.
func (s *Storage) GetRandom(topic string, limit uint) (rst []enr.Record, err error) {
prefixlen := 1 + len([]byte(topic))
key := make(RecordsKey, prefixlen+32)
key[0] = RecordsPrefix
copy(key[1:], []byte(topic))
key[prefixlen] = TopicBodyDelimiter
prefixlen++
iter := s.db.NewIterator(util.BytesPrefix(key[:prefixlen]), nil)
defer iter.Release()
uids := map[string]struct{}{}
// it might be too much cause we do crypto/rand.Read. requires profiling
for i := uint(0); i < limit*limit && len(rst) < int(limit); i++ {
if _, err := rand.Read(key[prefixlen:]); err != nil {
return nil, err
}
iter.Seek(key)
for _, f := range []func() bool{iter.Prev, iter.Next} {
if f() && key.SamePrefix(iter.Key()[:prefixlen]) {
var stored StorageRecord
if err = rlp.DecodeBytes(iter.Value(), &stored); err != nil {
return nil, err
}
k := iter.Key()
if _, exist := uids[string(k)]; exist {
continue
}
uids[string(k)] = struct{}{}
rst = append(rst, stored.ENR)
break
}
}
}
return rst, nil
}

View File

@ -1,83 +0,0 @@
package rendezvous
import (
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)
var (
ingressTrafficMeter = metrics.NewRegisteredMeter("rendezvous/InboundTraffic", nil)
egressTrafficMeter = metrics.NewRegisteredMeter("rendezvous/OutboundTraffic", nil)
)
// InstrumentedStream implements read writer interface and collects metrics.
type InstrumentedStream struct {
s network.Stream
}
func (si InstrumentedStream) CloseWrite() error {
return si.s.CloseWrite()
}
func (si InstrumentedStream) CloseRead() error {
return si.s.CloseRead()
}
func (si InstrumentedStream) ID() string {
return si.s.ID()
}
func (si InstrumentedStream) Write(p []byte) (int, error) {
n, err := si.s.Write(p)
egressTrafficMeter.Mark(int64(n))
return n, err
}
func (si InstrumentedStream) Read(p []byte) (int, error) {
n, err := si.s.Read(p)
ingressTrafficMeter.Mark(int64(n))
return n, err
}
func (si InstrumentedStream) Close() error {
return si.s.Close()
}
func (si InstrumentedStream) Reset() error {
return si.s.Reset()
}
func (si InstrumentedStream) SetDeadline(timeout time.Time) error {
return si.s.SetDeadline(timeout)
}
func (si InstrumentedStream) SetReadDeadline(timeout time.Time) error {
return si.s.SetReadDeadline(timeout)
}
func (si InstrumentedStream) SetWriteDeadline(timeout time.Time) error {
return si.s.SetWriteDeadline(timeout)
}
func (si InstrumentedStream) Protocol() protocol.ID {
return si.s.Protocol()
}
func (si InstrumentedStream) SetProtocol(pid protocol.ID) error {
return si.s.SetProtocol(pid)
}
func (si InstrumentedStream) Conn() network.Conn {
return si.s.Conn()
}
func (si InstrumentedStream) Stat() network.Stats {
return si.s.Stat()
}
func (si InstrumentedStream) Scope() network.StreamScope {
return si.s.Scope()
}

8
vendor/modules.txt vendored
View File

@ -916,9 +916,6 @@ github.com/spaolacci/murmur3
# github.com/status-im/doubleratchet v3.0.0+incompatible
## explicit
github.com/status-im/doubleratchet
# github.com/status-im/go-multiaddr-ethv4 v1.2.5
## explicit; go 1.19
github.com/status-im/go-multiaddr-ethv4
# github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969
## explicit
github.com/status-im/keycard-go/derivationpath
@ -947,11 +944,6 @@ github.com/status-im/mvds/state/migrations
github.com/status-im/mvds/store
github.com/status-im/mvds/store/migrations
github.com/status-im/mvds/transport
# github.com/status-im/rendezvous v1.3.8-0.20240110194857-cc5be22bf83e
## explicit; go 1.18
github.com/status-im/rendezvous
github.com/status-im/rendezvous/protocol
github.com/status-im/rendezvous/server
# github.com/status-im/status-go/extkeys v1.1.2
## explicit; go 1.13
github.com/status-im/status-go/extkeys

View File

@ -46,7 +46,6 @@ type Config struct {
MinPeersForFilter int `toml:",omitempty"` // Indicates the minimum number of peers required for using Filter Protocol
LightClient bool `toml:",omitempty"` // Indicates if the node is a light client
WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"`
DiscV5BootstrapNodes []string `toml:",omitempty"`
Nameserver string `toml:",omitempty"` // Optional nameserver to use for dns discovery
Resolver ethdisc.Resolver `toml:",omitempty"` // Optional resolver to use for dns discovery