From 6f937d4ddf2af4d5515efa81f6808579f0212ed9 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 12 Sep 2018 10:23:54 +0300 Subject: [PATCH] Update rendezvous client to a latest version --- Gopkg.lock | 4 +-- .../github.com/status-im/rendezvous/client.go | 4 +-- .../status-im/rendezvous/server/cleaner.go | 13 ++++++++ .../status-im/rendezvous/server/metrics.go | 29 ++++++++++++++++ .../status-im/rendezvous/server/server.go | 33 +++++++++++++++++-- .../status-im/rendezvous/server/storage.go | 19 +++++++++-- 6 files changed, 93 insertions(+), 9 deletions(-) create mode 100644 vendor/github.com/status-im/rendezvous/server/metrics.go diff --git a/Gopkg.lock b/Gopkg.lock index 64b257834..47c2355ed 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -767,7 +767,7 @@ [[projects]] branch = "master" - digest = "1:0a0848f018c29e4db5d941c2a8dbbbcaf1ea74f294c4759b69e8664b0a242040" + digest = "1:b9b4aaa95bb880c345f400d7de8d5023731f71a9dfd4ff62ec77a74acfda529e" name = "github.com/status-im/rendezvous" packages = [ ".", @@ -775,7 +775,7 @@ "server", ] pruneopts = "NUT" - revision = "444e3eda4a281ca36d61df8a2a0832265d09511d" + revision = "7fe5bc0fd1c58bb7f7bec64656d8b821bec8338f" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/vendor/github.com/status-im/rendezvous/client.go b/vendor/github.com/status-im/rendezvous/client.go index 272ed15be..aaba4d8e9 100644 --- a/vendor/github.com/status-im/rendezvous/client.go +++ b/vendor/github.com/status-im/rendezvous/client.go @@ -50,7 +50,7 @@ type Client struct { h host.Host } -func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, record enr.Record) error { +func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, record enr.Record, ttl time.Duration) error { s, err := c.newStream(ctx, srv) if err != nil { return err @@ -59,7 +59,7 @@ func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, re if err = rlp.Encode(s, protocol.REGISTER); err != nil { return err } - if err = rlp.Encode(s, protocol.Register{Topic: topic, Record: record, TTL: uint64(5 * time.Second)}); err != nil { + if err = rlp.Encode(s, protocol.Register{Topic: topic, Record: record, TTL: uint64(ttl)}); err != nil { return err } rs := rlp.NewStream(s, 0) diff --git a/vendor/github.com/status-im/rendezvous/server/cleaner.go b/vendor/github.com/status-im/rendezvous/server/cleaner.go index 9dd6e0b43..b6e413bd2 100644 --- a/vendor/github.com/status-im/rendezvous/server/cleaner.go +++ b/vendor/github.com/status-im/rendezvous/server/cleaner.go @@ -59,12 +59,25 @@ func (c *Cleaner) Add(deadline time.Time, key string) { heap.Push(c, key) } +func (c *Cleaner) Exist(key string) bool { + c.mu.RLock() + defer c.mu.RUnlock() + _, exist := c.deadlines[key] + return exist +} + func (c *Cleaner) PopOneSince(now time.Time) (rst string) { c.mu.Lock() defer c.mu.Unlock() if len(c.heap) == 0 { return } + // same key can be inserted multiple times into the heap, + // deadline for a key is removed when it was popped the first time + if _, exist := c.deadlines[c.heap[0]]; !exist { + heap.Pop(c) + return + } if now.After(c.deadlines[c.heap[0]]) { return heap.Pop(c).(string) } diff --git a/vendor/github.com/status-im/rendezvous/server/metrics.go b/vendor/github.com/status-im/rendezvous/server/metrics.go new file mode 100644 index 000000000..75a39294c --- /dev/null +++ b/vendor/github.com/status-im/rendezvous/server/metrics.go @@ -0,0 +1,29 @@ +package server + +var ( + metrics MetricsInterface = noopMetrics{} +) + +func UseMetrics(m MetricsInterface) { + metrics = m +} + +type MetricsInterface interface { + AddActiveRegistration(...string) + RemoveActiveRegistration(...string) + ObserveDiscoverSize(float64, ...string) + ObserveDiscoveryDuration(float64, ...string) + CountError(...string) +} + +type noopMetrics struct{} + +func (n noopMetrics) AddActiveRegistration(lvs ...string) {} + +func (n noopMetrics) RemoveActiveRegistration(lvs ...string) {} + +func (n noopMetrics) ObserveDiscoverSize(o float64, lvs ...string) {} + +func (n noopMetrics) ObserveDiscoveryDuration(o float64, lvs ...string) {} + +func (n noopMetrics) CountError(lvs ...string) {} diff --git a/vendor/github.com/status-im/rendezvous/server/server.go b/vendor/github.com/status-im/rendezvous/server/server.go index e3213ed22..69e8293c7 100644 --- a/vendor/github.com/status-im/rendezvous/server/server.go +++ b/vendor/github.com/status-im/rendezvous/server/server.go @@ -1,6 +1,7 @@ package server import ( + "bytes" "context" "errors" "fmt" @@ -21,6 +22,7 @@ var logger = log.New("package", "rendezvous/server") const ( longestTTL = 20 * time.Second + networkDelay = 500 * time.Millisecond cleanerPeriod = 2 * time.Second maxLimit uint = 10 maxTopicLength = 50 @@ -36,6 +38,7 @@ func NewServer(laddr ma.Multiaddr, identity crypto.PrivKey, s Storage) *Server { writeTimeout: 10 * time.Second, readTimeout: 10 * time.Second, cleanerPeriod: cleanerPeriod, + networkDelay: networkDelay, } return &srv } @@ -51,6 +54,7 @@ type Server struct { storage Storage cleaner *Cleaner cleanerPeriod time.Duration + networkDelay time.Duration h host.Host addr ma.Multiaddr @@ -74,6 +78,11 @@ func (srv *Server) Start() error { } // once server is restarted all cleaner info is lost. so we need to rebuild it return srv.storage.IterateAllKeys(func(key RecordsKey, ttl time.Time) error { + if !srv.cleaner.Exist(key.String()) { + topic := TopicPart(key) + log.Debug("active registration with", "topic", string(topic)) + metrics.AddActiveRegistration(string(topic)) + } srv.cleaner.Add(ttl, key.String()) return nil }) @@ -162,6 +171,9 @@ func (srv *Server) purgeOutdated() { if len(key) == 0 { return } + topic := TopicPart([]byte(key)) + log.Debug("Removing record with", "topic", string(topic)) + metrics.RemoveActiveRegistration(string(topic)) if err := srv.storage.RemoveByKey(key); err != nil { logger.Error("error removing key from storage", "key", key, "error", err) } @@ -178,6 +190,7 @@ func (srv *Server) msgParser(typ protocol.MessageType, d Decoder) (resptype prot var msg protocol.Register resptype = protocol.REGISTER_RESPONSE if err = d.Decode(&msg); err != nil { + metrics.CountError("register") return resptype, protocol.RegisterResponse{Status: protocol.E_INVALID_CONTENT}, nil } resp, err = srv.register(msg) @@ -186,18 +199,24 @@ func (srv *Server) msgParser(typ protocol.MessageType, d Decoder) (resptype prot var msg protocol.Discover resptype = protocol.DISCOVER_RESPONSE if err = d.Decode(&msg); err != nil { + metrics.CountError("discover") return resptype, protocol.DiscoverResponse{Status: protocol.E_INVALID_CONTENT}, nil } limit := msg.Limit if msg.Limit > maxLimit { limit = maxLimit } + start := time.Now() records, err := srv.storage.GetRandom(msg.Topic, limit) if err != nil { + metrics.CountError("discover") return resptype, protocol.DiscoverResponse{Status: protocol.E_INTERNAL_ERROR}, err } + metrics.ObserveDiscoveryDuration(time.Since(start).Seconds(), msg.Topic) + metrics.ObserveDiscoverSize(float64(len(records)), msg.Topic) return resptype, protocol.DiscoverResponse{Status: protocol.OK, Records: records}, nil default: + metrics.CountError("unknown") // don't send the response return 0, nil, errors.New("unknown request type") } @@ -210,14 +229,22 @@ func (srv *Server) register(msg protocol.Register) (protocol.RegisterResponse, e if time.Duration(msg.TTL) > longestTTL { return protocol.RegisterResponse{Status: protocol.E_INVALID_TTL}, nil } + if bytes.IndexByte([]byte(msg.Topic), TopicBodyDelimiter) != -1 { + return protocol.RegisterResponse{Status: protocol.E_INVALID_NAMESPACE}, nil + } if !msg.Record.Signed() { return protocol.RegisterResponse{Status: protocol.E_INVALID_ENR}, nil } - ttl := time.Now().Add(time.Duration(msg.TTL)) - key, err := srv.storage.Add(msg.Topic, msg.Record, ttl) + deadline := time.Now().Add(time.Duration(msg.TTL)).Add(srv.networkDelay) + key, err := srv.storage.Add(msg.Topic, msg.Record, deadline) if err != nil { return protocol.RegisterResponse{Status: protocol.E_INTERNAL_ERROR}, err } - srv.cleaner.Add(ttl, key) + if !srv.cleaner.Exist(key) { + log.Debug("active registration with", "topic", msg.Topic) + metrics.AddActiveRegistration(msg.Topic) + } + log.Debug("updating record in the cleaner", "deadline", deadline, "topic", msg.Topic) + srv.cleaner.Add(deadline, key) return protocol.RegisterResponse{Status: protocol.OK}, nil } diff --git a/vendor/github.com/status-im/rendezvous/server/storage.go b/vendor/github.com/status-im/rendezvous/server/storage.go index ea3f30ce4..0e490d0f4 100644 --- a/vendor/github.com/status-im/rendezvous/server/storage.go +++ b/vendor/github.com/status-im/rendezvous/server/storage.go @@ -13,6 +13,8 @@ import ( const ( RecordsPrefix byte = 1 + iota + + TopicBodyDelimiter = 0xff ) type StorageRecord struct { @@ -20,13 +22,24 @@ type StorageRecord struct { Time time.Time } +// TopicPart looks for TopicBodyDelimiter and returns topic prefix from the same key. +// It doesn't allocate memory for topic prefix. +func TopicPart(key []byte) []byte { + idx := bytes.IndexByte(key, TopicBodyDelimiter) + if idx == -1 { + return nil + } + return key[1:idx] // first byte is RecordsPrefix +} + type RecordsKey []byte func NewRecordsKey(topic string, record enr.Record) RecordsKey { - key := make(RecordsKey, 1+len([]byte(topic))+len(record.NodeAddr())) + key := make(RecordsKey, 2+len([]byte(topic))+len(record.NodeAddr())) key[0] = RecordsPrefix copy(key[1:], []byte(topic)) - copy(key[1+len([]byte(topic)):], record.NodeAddr()) + key[1+len([]byte(topic))] = TopicBodyDelimiter + copy(key[2+len([]byte(topic)):], record.NodeAddr()) return key } @@ -88,6 +101,8 @@ func (s *Storage) GetRandom(topic string, limit uint) (rst []enr.Record, err err key := make(RecordsKey, prefixlen+32) key[0] = RecordsPrefix copy(key[1:], []byte(topic)) + key[prefixlen] = TopicBodyDelimiter + prefixlen++ iter := s.db.NewIterator(util.BytesPrefix(key[:prefixlen]), nil) defer iter.Release()