Update rendezvous client to a latest version

This commit is contained in:
Dmitry 2018-09-12 10:23:54 +03:00 committed by Dmitry Shulyak
parent e8c6841f30
commit 6f937d4ddf
6 changed files with 93 additions and 9 deletions

4
Gopkg.lock generated
View File

@ -767,7 +767,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:0a0848f018c29e4db5d941c2a8dbbbcaf1ea74f294c4759b69e8664b0a242040" digest = "1:b9b4aaa95bb880c345f400d7de8d5023731f71a9dfd4ff62ec77a74acfda529e"
name = "github.com/status-im/rendezvous" name = "github.com/status-im/rendezvous"
packages = [ packages = [
".", ".",
@ -775,7 +775,7 @@
"server", "server",
] ]
pruneopts = "NUT" pruneopts = "NUT"
revision = "444e3eda4a281ca36d61df8a2a0832265d09511d" revision = "7fe5bc0fd1c58bb7f7bec64656d8b821bec8338f"
[[projects]] [[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -50,7 +50,7 @@ type Client struct {
h host.Host h host.Host
} }
func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, record enr.Record) error { func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, record enr.Record, ttl time.Duration) error {
s, err := c.newStream(ctx, srv) s, err := c.newStream(ctx, srv)
if err != nil { if err != nil {
return err return err
@ -59,7 +59,7 @@ func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, re
if err = rlp.Encode(s, protocol.REGISTER); err != nil { if err = rlp.Encode(s, protocol.REGISTER); err != nil {
return err return err
} }
if err = rlp.Encode(s, protocol.Register{Topic: topic, Record: record, TTL: uint64(5 * time.Second)}); err != nil { if err = rlp.Encode(s, protocol.Register{Topic: topic, Record: record, TTL: uint64(ttl)}); err != nil {
return err return err
} }
rs := rlp.NewStream(s, 0) rs := rlp.NewStream(s, 0)

View File

@ -59,12 +59,25 @@ func (c *Cleaner) Add(deadline time.Time, key string) {
heap.Push(c, key) heap.Push(c, key)
} }
func (c *Cleaner) Exist(key string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
_, exist := c.deadlines[key]
return exist
}
func (c *Cleaner) PopOneSince(now time.Time) (rst string) { func (c *Cleaner) PopOneSince(now time.Time) (rst string) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
if len(c.heap) == 0 { if len(c.heap) == 0 {
return return
} }
// same key can be inserted multiple times into the heap,
// deadline for a key is removed when it was popped the first time
if _, exist := c.deadlines[c.heap[0]]; !exist {
heap.Pop(c)
return
}
if now.After(c.deadlines[c.heap[0]]) { if now.After(c.deadlines[c.heap[0]]) {
return heap.Pop(c).(string) return heap.Pop(c).(string)
} }

View File

@ -0,0 +1,29 @@
package server
var (
metrics MetricsInterface = noopMetrics{}
)
func UseMetrics(m MetricsInterface) {
metrics = m
}
type MetricsInterface interface {
AddActiveRegistration(...string)
RemoveActiveRegistration(...string)
ObserveDiscoverSize(float64, ...string)
ObserveDiscoveryDuration(float64, ...string)
CountError(...string)
}
type noopMetrics struct{}
func (n noopMetrics) AddActiveRegistration(lvs ...string) {}
func (n noopMetrics) RemoveActiveRegistration(lvs ...string) {}
func (n noopMetrics) ObserveDiscoverSize(o float64, lvs ...string) {}
func (n noopMetrics) ObserveDiscoveryDuration(o float64, lvs ...string) {}
func (n noopMetrics) CountError(lvs ...string) {}

View File

@ -1,6 +1,7 @@
package server package server
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -21,6 +22,7 @@ var logger = log.New("package", "rendezvous/server")
const ( const (
longestTTL = 20 * time.Second longestTTL = 20 * time.Second
networkDelay = 500 * time.Millisecond
cleanerPeriod = 2 * time.Second cleanerPeriod = 2 * time.Second
maxLimit uint = 10 maxLimit uint = 10
maxTopicLength = 50 maxTopicLength = 50
@ -36,6 +38,7 @@ func NewServer(laddr ma.Multiaddr, identity crypto.PrivKey, s Storage) *Server {
writeTimeout: 10 * time.Second, writeTimeout: 10 * time.Second,
readTimeout: 10 * time.Second, readTimeout: 10 * time.Second,
cleanerPeriod: cleanerPeriod, cleanerPeriod: cleanerPeriod,
networkDelay: networkDelay,
} }
return &srv return &srv
} }
@ -51,6 +54,7 @@ type Server struct {
storage Storage storage Storage
cleaner *Cleaner cleaner *Cleaner
cleanerPeriod time.Duration cleanerPeriod time.Duration
networkDelay time.Duration
h host.Host h host.Host
addr ma.Multiaddr addr ma.Multiaddr
@ -74,6 +78,11 @@ func (srv *Server) Start() error {
} }
// once server is restarted all cleaner info is lost. so we need to rebuild it // once server is restarted all cleaner info is lost. so we need to rebuild it
return srv.storage.IterateAllKeys(func(key RecordsKey, ttl time.Time) error { return srv.storage.IterateAllKeys(func(key RecordsKey, ttl time.Time) error {
if !srv.cleaner.Exist(key.String()) {
topic := TopicPart(key)
log.Debug("active registration with", "topic", string(topic))
metrics.AddActiveRegistration(string(topic))
}
srv.cleaner.Add(ttl, key.String()) srv.cleaner.Add(ttl, key.String())
return nil return nil
}) })
@ -162,6 +171,9 @@ func (srv *Server) purgeOutdated() {
if len(key) == 0 { if len(key) == 0 {
return return
} }
topic := TopicPart([]byte(key))
log.Debug("Removing record with", "topic", string(topic))
metrics.RemoveActiveRegistration(string(topic))
if err := srv.storage.RemoveByKey(key); err != nil { if err := srv.storage.RemoveByKey(key); err != nil {
logger.Error("error removing key from storage", "key", key, "error", err) logger.Error("error removing key from storage", "key", key, "error", err)
} }
@ -178,6 +190,7 @@ func (srv *Server) msgParser(typ protocol.MessageType, d Decoder) (resptype prot
var msg protocol.Register var msg protocol.Register
resptype = protocol.REGISTER_RESPONSE resptype = protocol.REGISTER_RESPONSE
if err = d.Decode(&msg); err != nil { if err = d.Decode(&msg); err != nil {
metrics.CountError("register")
return resptype, protocol.RegisterResponse{Status: protocol.E_INVALID_CONTENT}, nil return resptype, protocol.RegisterResponse{Status: protocol.E_INVALID_CONTENT}, nil
} }
resp, err = srv.register(msg) resp, err = srv.register(msg)
@ -186,18 +199,24 @@ func (srv *Server) msgParser(typ protocol.MessageType, d Decoder) (resptype prot
var msg protocol.Discover var msg protocol.Discover
resptype = protocol.DISCOVER_RESPONSE resptype = protocol.DISCOVER_RESPONSE
if err = d.Decode(&msg); err != nil { if err = d.Decode(&msg); err != nil {
metrics.CountError("discover")
return resptype, protocol.DiscoverResponse{Status: protocol.E_INVALID_CONTENT}, nil return resptype, protocol.DiscoverResponse{Status: protocol.E_INVALID_CONTENT}, nil
} }
limit := msg.Limit limit := msg.Limit
if msg.Limit > maxLimit { if msg.Limit > maxLimit {
limit = maxLimit limit = maxLimit
} }
start := time.Now()
records, err := srv.storage.GetRandom(msg.Topic, limit) records, err := srv.storage.GetRandom(msg.Topic, limit)
if err != nil { if err != nil {
metrics.CountError("discover")
return resptype, protocol.DiscoverResponse{Status: protocol.E_INTERNAL_ERROR}, err return resptype, protocol.DiscoverResponse{Status: protocol.E_INTERNAL_ERROR}, err
} }
metrics.ObserveDiscoveryDuration(time.Since(start).Seconds(), msg.Topic)
metrics.ObserveDiscoverSize(float64(len(records)), msg.Topic)
return resptype, protocol.DiscoverResponse{Status: protocol.OK, Records: records}, nil return resptype, protocol.DiscoverResponse{Status: protocol.OK, Records: records}, nil
default: default:
metrics.CountError("unknown")
// don't send the response // don't send the response
return 0, nil, errors.New("unknown request type") return 0, nil, errors.New("unknown request type")
} }
@ -210,14 +229,22 @@ func (srv *Server) register(msg protocol.Register) (protocol.RegisterResponse, e
if time.Duration(msg.TTL) > longestTTL { if time.Duration(msg.TTL) > longestTTL {
return protocol.RegisterResponse{Status: protocol.E_INVALID_TTL}, nil return protocol.RegisterResponse{Status: protocol.E_INVALID_TTL}, nil
} }
if bytes.IndexByte([]byte(msg.Topic), TopicBodyDelimiter) != -1 {
return protocol.RegisterResponse{Status: protocol.E_INVALID_NAMESPACE}, nil
}
if !msg.Record.Signed() { if !msg.Record.Signed() {
return protocol.RegisterResponse{Status: protocol.E_INVALID_ENR}, nil return protocol.RegisterResponse{Status: protocol.E_INVALID_ENR}, nil
} }
ttl := time.Now().Add(time.Duration(msg.TTL)) deadline := time.Now().Add(time.Duration(msg.TTL)).Add(srv.networkDelay)
key, err := srv.storage.Add(msg.Topic, msg.Record, ttl) key, err := srv.storage.Add(msg.Topic, msg.Record, deadline)
if err != nil { if err != nil {
return protocol.RegisterResponse{Status: protocol.E_INTERNAL_ERROR}, err return protocol.RegisterResponse{Status: protocol.E_INTERNAL_ERROR}, err
} }
srv.cleaner.Add(ttl, key) if !srv.cleaner.Exist(key) {
log.Debug("active registration with", "topic", msg.Topic)
metrics.AddActiveRegistration(msg.Topic)
}
log.Debug("updating record in the cleaner", "deadline", deadline, "topic", msg.Topic)
srv.cleaner.Add(deadline, key)
return protocol.RegisterResponse{Status: protocol.OK}, nil return protocol.RegisterResponse{Status: protocol.OK}, nil
} }

View File

@ -13,6 +13,8 @@ import (
const ( const (
RecordsPrefix byte = 1 + iota RecordsPrefix byte = 1 + iota
TopicBodyDelimiter = 0xff
) )
type StorageRecord struct { type StorageRecord struct {
@ -20,13 +22,24 @@ type StorageRecord struct {
Time time.Time Time time.Time
} }
// TopicPart looks for TopicBodyDelimiter and returns topic prefix from the same key.
// It doesn't allocate memory for topic prefix.
func TopicPart(key []byte) []byte {
idx := bytes.IndexByte(key, TopicBodyDelimiter)
if idx == -1 {
return nil
}
return key[1:idx] // first byte is RecordsPrefix
}
type RecordsKey []byte type RecordsKey []byte
func NewRecordsKey(topic string, record enr.Record) RecordsKey { func NewRecordsKey(topic string, record enr.Record) RecordsKey {
key := make(RecordsKey, 1+len([]byte(topic))+len(record.NodeAddr())) key := make(RecordsKey, 2+len([]byte(topic))+len(record.NodeAddr()))
key[0] = RecordsPrefix key[0] = RecordsPrefix
copy(key[1:], []byte(topic)) copy(key[1:], []byte(topic))
copy(key[1+len([]byte(topic)):], record.NodeAddr()) key[1+len([]byte(topic))] = TopicBodyDelimiter
copy(key[2+len([]byte(topic)):], record.NodeAddr())
return key return key
} }
@ -88,6 +101,8 @@ func (s *Storage) GetRandom(topic string, limit uint) (rst []enr.Record, err err
key := make(RecordsKey, prefixlen+32) key := make(RecordsKey, prefixlen+32)
key[0] = RecordsPrefix key[0] = RecordsPrefix
copy(key[1:], []byte(topic)) copy(key[1:], []byte(topic))
key[prefixlen] = TopicBodyDelimiter
prefixlen++
iter := s.db.NewIterator(util.BytesPrefix(key[:prefixlen]), nil) iter := s.db.NewIterator(util.BytesPrefix(key[:prefixlen]), nil)
defer iter.Release() defer iter.Release()