diff --git a/Gopkg.lock b/Gopkg.lock index 16088f2db..52474ffe0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -822,7 +822,7 @@ [[projects]] branch = "master" - digest = "1:5fb8fc6a365f62e1d771b8a33c808a111ace9ba3506089f00ec252ec1e257a83" + digest = "1:7eae6e73aa5dd99a7ab975fbc4a39cd98cb72c5a5272970735676e099667cf8a" name = "github.com/status-im/rendezvous" packages = [ ".", @@ -830,7 +830,7 @@ "server", ] pruneopts = "NUT" - revision = "fbcc46a78cd43fef95a110df664aab513116a850" + revision = "9e20b11affd0bf0591126a518f3e7b8aa057f88f" [[projects]] digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210" diff --git a/discovery/rendezvous.go b/discovery/rendezvous.go index 7392626ce..3e4f74ea6 100644 --- a/discovery/rendezvous.go +++ b/discovery/rendezvous.go @@ -93,6 +93,9 @@ func (r *Rendezvous) Stop() error { r.mu.Lock() defer r.mu.Unlock() r.cancelRootCtx() + if err := r.client.Close(); err != nil { + return err + } r.client = nil return nil } diff --git a/vendor/github.com/status-im/rendezvous/client.go b/vendor/github.com/status-im/rendezvous/client.go index a4bffe380..fc8faece2 100644 --- a/vendor/github.com/status-im/rendezvous/client.go +++ b/vendor/github.com/status-im/rendezvous/client.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "fmt" - "io" "time" "github.com/ethereum/go-ethereum/log" @@ -13,6 +12,7 @@ import ( libp2p "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" + net "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" ethv4 "github.com/status-im/go-multiaddr-ethv4" @@ -38,15 +38,17 @@ func New(identity crypto.PrivKey) (c Client, err error) { return c, err } return Client{ - identity: identity, - h: h, + h: h, + }, nil +} + +func NewWithHost(h host.Host) (c Client, err error) { + return Client{ + h: h, }, nil } type Client struct { - laddr ma.Multiaddr - identity crypto.PrivKey - h host.Host } @@ -55,7 +57,7 @@ func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, re if err != nil { return err } - defer s.Close() + defer net.FullClose(s) if err = rlp.Encode(s, protocol.REGISTER); err != nil { return err } @@ -86,7 +88,7 @@ func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, li if err != nil { return } - defer s.Close() + defer net.FullClose(s) if err = rlp.Encode(s, protocol.DISCOVER); err != nil { return } @@ -112,7 +114,7 @@ func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, li return val.Records, nil } -func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw io.ReadWriteCloser, err error) { +func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw net.Stream, err error) { pid, err := srv.ValueForProtocol(ethv4.P_ETHv4) if err != nil { return @@ -132,5 +134,10 @@ func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw io.ReadWrit if err != nil { return nil, err } - return InstrumenetedStream{s}, nil + return &InstrumentedStream{s}, nil +} + +// Close shutdowns the host and all open connections. +func (c Client) Close() error { + return c.h.Close() } diff --git a/vendor/github.com/status-im/rendezvous/server/cleaner.go b/vendor/github.com/status-im/rendezvous/server/cleaner.go index 69d6a0494..94e4efde5 100644 --- a/vendor/github.com/status-im/rendezvous/server/cleaner.go +++ b/vendor/github.com/status-im/rendezvous/server/cleaner.go @@ -8,7 +8,6 @@ import ( type deadline struct { time time.Time - refs int } // definitely rename @@ -52,16 +51,12 @@ func (c *Cleaner) Pop() interface{} { old := c.heap n := len(old) x := old[n-1] - c.heap = old[0 : n-1] - dl, exist := c.deadlines[x] + c.heap = append([]string{}, old[0:n-1]...) + _, exist := c.deadlines[x] if !exist { return x } - dl.refs-- - c.deadlines[x] = dl - if dl.refs == 0 { - delete(c.deadlines, x) - } + delete(c.deadlines, x) return x } @@ -70,10 +65,15 @@ func (c *Cleaner) Add(deadlineTime time.Time, key string) { defer c.mu.Unlock() dl, exist := c.deadlines[key] if !exist { - dl = deadline{time: deadlineTime, refs: 1} + dl = deadline{time: deadlineTime} } else { dl.time = deadlineTime - dl.refs++ + for i, n := range c.heap { + if n == key { + heap.Remove(c, i) + break + } + } } c.deadlines[key] = dl heap.Push(c, key) @@ -94,10 +94,6 @@ func (c *Cleaner) PopSince(now time.Time) (rst []string) { if !exist { continue } - if dl.refs > 1 { - heap.Pop(c) - continue - } if now.After(dl.time) { rst = append(rst, heap.Pop(c).(string)) } else { diff --git a/vendor/github.com/status-im/rendezvous/server/server.go b/vendor/github.com/status-im/rendezvous/server/server.go index f257ede38..ac34d7dd4 100644 --- a/vendor/github.com/status-im/rendezvous/server/server.go +++ b/vendor/github.com/status-im/rendezvous/server/server.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" @@ -118,27 +119,39 @@ func (srv *Server) startListener() error { srv.h = h srv.h.SetStreamHandler(protocol.VERSION, func(s net.Stream) { defer s.Close() - rs := rlp.NewStream(s, 0) - s.SetReadDeadline(time.Now().Add(srv.readTimeout)) - typ, err := rs.Uint() - if err != nil { - logger.Error("error reading message type", "error", err) - return - } - s.SetReadDeadline(time.Now().Add(srv.readTimeout)) - resptype, resp, err := srv.msgParser(protocol.MessageType(typ), rs) - if err != nil { - logger.Error("error parsing message", "error", err) - return - } - s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) - if err = rlp.Encode(s, resptype); err != nil { - logger.Error("error writing response", "type", resptype, "error", err) - return - } - s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) - if err = rlp.Encode(s, resp); err != nil { - logger.Error("error encoding response", "resp", resp, "error", err) + for { + rs := rlp.NewStream(s, 0) + s.SetReadDeadline(time.Now().Add(srv.readTimeout)) + typ, err := rs.Uint() + if err == io.EOF { + return + } + if err != nil { + logger.Debug("error reading message type", "error", err) + s.Reset() + return + } + s.SetReadDeadline(time.Now().Add(srv.readTimeout)) + resptype, resp, err := srv.msgParser(protocol.MessageType(typ), rs) + if err == io.EOF { + return + } + if err != nil { + logger.Debug("error parsing message", "error", err) + s.Reset() + return + } + s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) + if err = rlp.Encode(s, resptype); err != nil { + logger.Debug("error writing response", "type", resptype, "error", err) + s.Reset() + return + } + s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) + if err = rlp.Encode(s, resp); err != nil { + logger.Debug("error encoding response", "resp", resp, "error", err) + s.Reset() + } } }) addr, err := ma.NewMultiaddr(fmt.Sprintf("/ethv4/%s", h.ID().Pretty())) diff --git a/vendor/github.com/status-im/rendezvous/stream.go b/vendor/github.com/status-im/rendezvous/stream.go index a14bed55e..5b29b7fbb 100644 --- a/vendor/github.com/status-im/rendezvous/stream.go +++ b/vendor/github.com/status-im/rendezvous/stream.go @@ -1,8 +1,11 @@ package rendezvous import ( + "time" + "github.com/ethereum/go-ethereum/metrics" inet "github.com/libp2p/go-libp2p-net" + protocol "github.com/libp2p/go-libp2p-protocol" ) var ( @@ -10,23 +13,51 @@ var ( egressTrafficMeter = metrics.NewRegisteredMeter("rendezvous/OutboundTraffic", nil) ) -// InstrumenetedStream implements read writer interface and collects metrics. -type InstrumenetedStream struct { +// InstrumentedStream implements read writer interface and collects metrics. +type InstrumentedStream struct { s inet.Stream } -func (si InstrumenetedStream) Write(p []byte) (int, error) { +func (si InstrumentedStream) Write(p []byte) (int, error) { n, err := si.s.Write(p) egressTrafficMeter.Mark(int64(n)) return n, err } -func (si InstrumenetedStream) Read(p []byte) (int, error) { +func (si InstrumentedStream) Read(p []byte) (int, error) { n, err := si.s.Read(p) ingressTrafficMeter.Mark(int64(n)) return n, err } -func (si InstrumenetedStream) Close() error { +func (si InstrumentedStream) Close() error { return si.s.Close() } + +func (si InstrumentedStream) Reset() error { + return si.s.Reset() +} + +func (si InstrumentedStream) SetDeadline(timeout time.Time) error { + return si.s.SetDeadline(timeout) +} + +func (si InstrumentedStream) SetReadDeadline(timeout time.Time) error { + return si.s.SetReadDeadline(timeout) +} + +func (si InstrumentedStream) SetWriteDeadline(timeout time.Time) error { + return si.s.SetWriteDeadline(timeout) +} + +func (si InstrumentedStream) Protocol() protocol.ID { + return si.s.Protocol() +} + +func (si InstrumentedStream) SetProtocol(pid protocol.ID) { + si.s.SetProtocol(pid) +} + +func (si InstrumentedStream) Conn() inet.Conn { + return si.s.Conn() +}