Proxy records found in ethereum discovery v5 (#1165)
This commit is contained in:
parent
d8b5ba7fbf
commit
6858662d59
15
Makefile
15
Makefile
|
@ -41,6 +41,7 @@ gotest_extraflags =
|
|||
|
||||
DOCKER_IMAGE_NAME ?= statusteam/status-go
|
||||
BOOTNODE_IMAGE_NAME ?= statusteam/bootnode
|
||||
PROXY_IMAGE_NAME ?= statusteam/discovery-proxy
|
||||
STATUSD_PRUNE_IMAGE_NAME ?= statusteam/statusd-prune
|
||||
|
||||
DOCKER_IMAGE_CUSTOM_TAG ?= $(shell BUILD_TAGS="$(BUILD_TAGS)" ./_assets/ci/get-docker-image-tag.sh)
|
||||
|
@ -87,6 +88,10 @@ bootnode: ##@build Build discovery v5 bootnode using status-go deps
|
|||
go build -i -o $(GOBIN)/bootnode -v -tags '$(BUILD_TAGS)' $(BUILD_FLAGS) ./cmd/bootnode/
|
||||
@echo "Compilation done."
|
||||
|
||||
proxy: ##@build Build proxy for rendezvous servers using status-go deps
|
||||
go build -i -o $(GOBIN)/proxy -v -tags '$(BUILD_TAGS)' $(BUILD_FLAGS) ./cmd/proxy/
|
||||
@echo "Compilation done."
|
||||
|
||||
mailserver-canary: ##@build Build mailserver canary using status-go deps
|
||||
go build -i -o $(GOBIN)/mailserver-canary -v -tags '$(BUILD_TAGS)' $(BUILD_FLAGS) ./cmd/mailserver-canary/
|
||||
@echo "Compilation done."
|
||||
|
@ -137,9 +142,19 @@ docker-image: ##@docker Build docker image (use DOCKER_IMAGE_NAME to set the ima
|
|||
bootnode-image:
|
||||
@echo "Building docker image for bootnode..."
|
||||
docker build --file _assets/build/Dockerfile-bootnode . \
|
||||
--build-arg "build_tags=$(BUILD_TAGS)" \
|
||||
--build-arg "build_flags=$(BUILD_FLAGS)" \
|
||||
-t $(BOOTNODE_IMAGE_NAME):$(DOCKER_IMAGE_CUSTOM_TAG) \
|
||||
-t $(BOOTNODE_IMAGE_NAME):latest
|
||||
|
||||
proxy-image:
|
||||
@echo "Building docker image for proxy..."
|
||||
docker build --file _assets/build/Dockerfile-proxy . \
|
||||
--build-arg "build_tags=$(BUILD_TAGS)" \
|
||||
--build-arg "build_flags=$(BUILD_FLAGS)" \
|
||||
-t $(PROXY_IMAGE_NAME):$(DOCKER_IMAGE_CUSTOM_TAG) \
|
||||
-t $(PROXY_IMAGE_NAME):latest
|
||||
|
||||
push-docker-images: docker-image bootnode-image
|
||||
docker push $(BOOTNODE_IMAGE_NAME):$(DOCKER_IMAGE_CUSTOM_TAG)
|
||||
docker push $(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_CUSTOM_TAG)
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
FROM golang:1.10-alpine as builder
|
||||
|
||||
ARG build_tags
|
||||
|
||||
RUN apk add --no-cache make gcc musl-dev linux-headers
|
||||
|
||||
RUN mkdir -p /go/src/github.com/status-im/status-go
|
||||
ADD . /go/src/github.com/status-im/status-go
|
||||
RUN cd /go/src/github.com/status-im/status-go && make proxy BUILD_TAGS="$build_tags"
|
||||
|
||||
FROM alpine:latest
|
||||
|
||||
RUN apk add --no-cache ca-certificates bash
|
||||
|
||||
COPY --from=builder /go/src/github.com/status-im/status-go/build/bin/proxy /usr/local/bin/
|
|
@ -0,0 +1,44 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// ErrorEmpty returned when value is empty.
|
||||
var ErrorEmpty = errors.New("empty value not allowed")
|
||||
|
||||
// StringSlice is a type of flag that allows setting multiple string values.
|
||||
type StringSlice []string
|
||||
|
||||
func (s *StringSlice) String() string {
|
||||
return "string slice"
|
||||
}
|
||||
|
||||
// Set trims space from string and stores it.
|
||||
func (s *StringSlice) Set(value string) error {
|
||||
trimmed := strings.TrimSpace(value)
|
||||
if len(trimmed) == 0 {
|
||||
return ErrorEmpty
|
||||
}
|
||||
*s = append(*s, trimmed)
|
||||
return nil
|
||||
}
|
||||
|
||||
// IntSlice is a type of flag that allows setting multiple int values.
|
||||
type IntSlice []int
|
||||
|
||||
func (s *IntSlice) String() string {
|
||||
return "int slice"
|
||||
}
|
||||
|
||||
// Set trims space from string and stores it.
|
||||
func (s *IntSlice) Set(value string) error {
|
||||
val, err := strconv.Atoi(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*s = append(*s, val)
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/status-im/status-go/discovery"
|
||||
sparams "github.com/status-im/status-go/params"
|
||||
)
|
||||
|
||||
var (
|
||||
laddr = flag.String("laddr", "0.0.0.0:31143", "Listening address for discovery v5.")
|
||||
verbosity = flag.String("v", "info", "Logger verbosity")
|
||||
rendezvousNodes = StringSlice{}
|
||||
bootnodes = StringSlice{}
|
||||
topics = StringSlice{}
|
||||
les = IntSlice{}
|
||||
useEthereum = flag.Bool("use-ethereum-boot", false, "If true ethereum bootnodes will be used")
|
||||
)
|
||||
|
||||
func main() {
|
||||
flag.Var(&rendezvousNodes, "rendezvous-node", "Rendezvous server.")
|
||||
flag.Var(&bootnodes, "bootnode", "Discovery v5 node.")
|
||||
flag.Var(&les, "les", "Proxy les topic for a given network.")
|
||||
flag.Var(&topics, "topic", "Topic that will be proxied")
|
||||
flag.Parse()
|
||||
|
||||
level, err := log.LvlFromString(strings.ToLower(*verbosity))
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("unable to get logger level from string %s: %v", *verbosity, err))
|
||||
}
|
||||
filteredHandler := log.LvlFilterHandler(level, log.StderrHandler)
|
||||
log.Root().SetHandler(filteredHandler)
|
||||
|
||||
for _, net := range les {
|
||||
if t := sparams.LesTopic(net); len(t) != 0 {
|
||||
topics = append(topics, t)
|
||||
}
|
||||
}
|
||||
key, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
log.Crit("unable to generate a key", "error", err)
|
||||
}
|
||||
rst := []string(bootnodes)
|
||||
if *useEthereum {
|
||||
rst = append(rst, params.DiscoveryV5Bootnodes...)
|
||||
}
|
||||
v5 := discovery.NewDiscV5(key, *laddr, parseNodesV5(rst))
|
||||
if err := v5.Start(); err != nil {
|
||||
log.Crit("unable to start discovery v5", "address", *laddr, "error", err)
|
||||
}
|
||||
rendezvousServers := parseMultiaddrs(rendezvousNodes)
|
||||
var wg sync.WaitGroup
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
for _, t := range topics {
|
||||
log.Info("proxying records for", "topic", t, "bootnodes", rst, "rendezvous servers", rendezvousNodes)
|
||||
t := t
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if err := discovery.ProxyToRendezvous(v5, rendezvousServers, t, stop); err != nil {
|
||||
log.Error("proxying to rendezvous servers failed", "servers", rendezvousNodes, "topic", t, "error", err)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func parseMultiaddrs(nodes []string) []ma.Multiaddr {
|
||||
var (
|
||||
rst = make([]ma.Multiaddr, len(nodes))
|
||||
err error
|
||||
)
|
||||
for i := range nodes {
|
||||
rst[i], err = ma.NewMultiaddr(nodes[i])
|
||||
if err != nil {
|
||||
log.Crit("unable to parse mutliaddr", "source", nodes[i], "error", err)
|
||||
}
|
||||
}
|
||||
return rst
|
||||
}
|
||||
|
||||
func parseNodesV5(nodes []string) []*discv5.Node {
|
||||
var (
|
||||
rst = make([]*discv5.Node, len(nodes))
|
||||
err error
|
||||
)
|
||||
for i := range nodes {
|
||||
rst[i], err = discv5.ParseNode(nodes[i])
|
||||
if err != nil {
|
||||
log.Crit("Failed to parse enode", "source", nodes[i], "err", err)
|
||||
}
|
||||
}
|
||||
return rst
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
// ProxyToRendezvous proxies records discovered using original to rendezvous servers for specified topic.
|
||||
func ProxyToRendezvous(original Discovery, servers []ma.Multiaddr, topic string, stop chan struct{}) error {
|
||||
var (
|
||||
identities = map[discv5.NodeID]*Rendezvous{}
|
||||
period = make(chan time.Duration, 1)
|
||||
found = make(chan *discv5.Node, 10)
|
||||
lookup = make(chan bool)
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
period <- 1 * time.Second
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if err := original.Discover(topic, period, found, lookup); err != nil {
|
||||
log.Error("discover request failed", "topic", topic, "error", err)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
close(period)
|
||||
wg.Wait()
|
||||
return nil
|
||||
case <-lookup:
|
||||
case n := <-found:
|
||||
if _, exist := identities[n.ID]; exist {
|
||||
continue
|
||||
}
|
||||
log.Info("proxying new record", "topic", topic, "identity", n.String())
|
||||
record, err := makeProxiedENR(n)
|
||||
if err != nil {
|
||||
log.Error("error converting discovered node to ENR", "node", n.String(), "error", err)
|
||||
}
|
||||
r := NewRendezvousWithENR(servers, record)
|
||||
identities[n.ID] = r
|
||||
if err := r.Start(); err != nil {
|
||||
log.Error("unable to start rendezvous proxying", "servers", servers, "error", err)
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
if err := r.Register(topic, stop); err != nil {
|
||||
log.Error("register error", "topic", topic, "error", err)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func makeProxiedENR(n *discv5.Node) (enr.Record, error) {
|
||||
record := enr.Record{}
|
||||
record.Set(enr.IP(n.IP))
|
||||
record.Set(enr.TCP(n.TCP))
|
||||
record.Set(enr.UDP(n.UDP))
|
||||
record.Set(Proxied(n.ID))
|
||||
key, err := crypto.GenerateKey() // we need separate key for each identity, records are stored based on it
|
||||
if err != nil {
|
||||
return record, fmt.Errorf("unable to generate private key. error : %v", err)
|
||||
}
|
||||
if err := enr.SignV4(&record, key); err != nil {
|
||||
return record, fmt.Errorf("unable to sign enr record. error: %v", err)
|
||||
}
|
||||
return record, nil
|
||||
}
|
||||
|
||||
// Proxied is an Entry for ENR
|
||||
type Proxied discv5.NodeID
|
||||
|
||||
// ENRKey returns unique key that is used by ENR.
|
||||
func (p Proxied) ENRKey() string {
|
||||
return "proxied"
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"github.com/status-im/rendezvous"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestProxyToRendezvous(t *testing.T) {
|
||||
var (
|
||||
topic = "test"
|
||||
id = 101
|
||||
reg = newRegistry()
|
||||
original = &fake{id: 110, registry: reg, started: true}
|
||||
srv = makeTestRendezvousServer(t, "/ip4/127.0.0.1/tcp/7788")
|
||||
stop = make(chan struct{})
|
||||
wg sync.WaitGroup
|
||||
)
|
||||
client, err := rendezvous.NewTemporary()
|
||||
require.NoError(t, err)
|
||||
reg.Add(topic, id)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, ProxyToRendezvous(original, []ma.Multiaddr{srv.Addr()}, topic, stop))
|
||||
}()
|
||||
timer := time.After(3 * time.Second)
|
||||
ticker := time.Tick(100 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case <-timer:
|
||||
close(stop)
|
||||
wg.Wait()
|
||||
require.FailNow(t, "failed waiting for record to be proxied")
|
||||
case <-ticker:
|
||||
records, err := client.Discover(context.TODO(), srv.Addr(), topic, 10)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if len(records) != 1 {
|
||||
continue
|
||||
}
|
||||
var proxied Proxied
|
||||
require.NoError(t, records[0].Load(&proxied))
|
||||
require.Equal(t, proxied[0], byte(id))
|
||||
close(stop)
|
||||
wg.Wait()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -39,6 +39,15 @@ func NewRendezvous(servers []ma.Multiaddr, identity *ecdsa.PrivateKey, node *dis
|
|||
return r, nil
|
||||
}
|
||||
|
||||
func NewRendezvousWithENR(servers []ma.Multiaddr, record enr.Record) *Rendezvous {
|
||||
r := new(Rendezvous)
|
||||
r.servers = servers
|
||||
r.registrationPeriod = registrationPeriod
|
||||
r.bucketSize = bucketSize
|
||||
r.record = record
|
||||
return r
|
||||
}
|
||||
|
||||
// Rendezvous is an implementation of discovery interface that uses
|
||||
// rendezvous client.
|
||||
type Rendezvous struct {
|
||||
|
@ -148,6 +157,7 @@ func (r *Rendezvous) Discover(
|
|||
}
|
||||
for i := range records {
|
||||
n, err := enrToNode(records[i])
|
||||
log.Debug("converted enr to", "ENODE", n.String())
|
||||
if err != nil {
|
||||
log.Warn("error converting enr record to node", "err", err)
|
||||
}
|
||||
|
@ -163,10 +173,18 @@ func enrToNode(record enr.Record) (*discv5.Node, error) {
|
|||
ip enr.IP
|
||||
tport enr.TCP
|
||||
uport enr.UDP
|
||||
proxied Proxied
|
||||
nodeID discv5.NodeID
|
||||
)
|
||||
if err := record.Load(&proxied); err == nil {
|
||||
nodeID = discv5.NodeID(proxied)
|
||||
} else {
|
||||
if err := record.Load(&key); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ecdsaKey := ecdsa.PublicKey(key)
|
||||
nodeID = discv5.PubkeyID(&ecdsaKey)
|
||||
}
|
||||
if err := record.Load(&ip); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -175,6 +193,5 @@ func enrToNode(record enr.Record) (*discv5.Node, error) {
|
|||
}
|
||||
// ignore absence of udp port, as it is optional
|
||||
_ = record.Load(&uport)
|
||||
ecdsaKey := ecdsa.PublicKey(key)
|
||||
return discv5.NewNode(discv5.PubkeyID(&ecdsaKey), net.IP(ip), uint16(uport), uint16(tport)), nil
|
||||
return discv5.NewNode(nodeID, net.IP(ip), uint16(uport), uint16(tport)), nil
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package discovery
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -18,17 +17,21 @@ import (
|
|||
"github.com/syndtr/goleveldb/leveldb/storage"
|
||||
)
|
||||
|
||||
func TestRendezvousDiscovery(t *testing.T) {
|
||||
func makeTestRendezvousServer(t *testing.T, addr string) *server.Server {
|
||||
priv, _, err := lcrypto.GenerateKeyPair(lcrypto.Secp256k1, 0)
|
||||
require.NoError(t, err)
|
||||
laddr, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/7777"))
|
||||
laddr, err := ma.NewMultiaddr(addr)
|
||||
require.NoError(t, err)
|
||||
db, err := leveldb.Open(storage.NewMemStorage(), nil)
|
||||
require.NoError(t, err)
|
||||
srv := server.NewServer(laddr, priv, server.NewStorage(db))
|
||||
require.NoError(t, srv.Start())
|
||||
defer srv.Stop()
|
||||
return srv
|
||||
}
|
||||
|
||||
func TestRendezvousDiscovery(t *testing.T) {
|
||||
srv := makeTestRendezvousServer(t, "/ip4/127.0.0.1/tcp/7777")
|
||||
defer srv.Stop()
|
||||
identity, err := crypto.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
node := discover.NewNode(discover.PubkeyID(&identity.PublicKey), net.IP{10, 10, 10, 10}, 10, 20)
|
||||
|
|
|
@ -13,10 +13,12 @@ import (
|
|||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
)
|
||||
|
||||
// errors
|
||||
|
@ -687,7 +689,7 @@ func (c *NodeConfig) updateRelativeDirsConfig() error {
|
|||
|
||||
// updatePeerLimits will set default peer limits expectations based on enabled services.
|
||||
func (c *NodeConfig) updatePeerLimits() {
|
||||
if c.NoDiscovery {
|
||||
if c.NoDiscovery && !c.Rendezvous {
|
||||
return
|
||||
}
|
||||
if c.WhisperConfig.Enabled {
|
||||
|
@ -722,6 +724,18 @@ func GetStatusHome() string {
|
|||
if gopath == "" {
|
||||
gopath = build.Default.GOPATH
|
||||
}
|
||||
|
||||
return path.Join(gopath, "/src/github.com/status-im/status-go/")
|
||||
}
|
||||
|
||||
// LesTopic returns discovery v5 topic derived from genesis of the provided network.
|
||||
// 1 - mainnet, 4 - ropsten
|
||||
func LesTopic(netid int) string {
|
||||
switch netid {
|
||||
case 1:
|
||||
return "LES2@" + common.Bytes2Hex(params.MainnetGenesisHash.Bytes()[:8])
|
||||
case 3:
|
||||
return "LES2@" + common.Bytes2Hex(params.TestnetGenesisHash.Bytes()[:8])
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue