From f151be54c6b5c862342ee5ce5ed8198e104fc097 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Wed, 2 Jan 2019 09:45:52 +0200 Subject: [PATCH] Close both sides of the stream when client is done with request (#1313) Libp2p keeps stream open if EOF wasn't seen and we called Close method. The most important change is that reader now uses FullClose util, that will wait for EOF character before closing the stream. --- Gopkg.lock | 4 +- discovery/rendezvous.go | 3 + .../github.com/status-im/rendezvous/client.go | 27 +++++---- .../status-im/rendezvous/server/cleaner.go | 24 ++++---- .../status-im/rendezvous/server/server.go | 55 ++++++++++++------- .../github.com/status-im/rendezvous/stream.go | 41 ++++++++++++-- 6 files changed, 102 insertions(+), 52 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 16088f2db..52474ffe0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -822,7 +822,7 @@ [[projects]] branch = "master" - digest = "1:5fb8fc6a365f62e1d771b8a33c808a111ace9ba3506089f00ec252ec1e257a83" + digest = "1:7eae6e73aa5dd99a7ab975fbc4a39cd98cb72c5a5272970735676e099667cf8a" name = "github.com/status-im/rendezvous" packages = [ ".", @@ -830,7 +830,7 @@ "server", ] pruneopts = "NUT" - revision = "fbcc46a78cd43fef95a110df664aab513116a850" + revision = "9e20b11affd0bf0591126a518f3e7b8aa057f88f" [[projects]] digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210" diff --git a/discovery/rendezvous.go b/discovery/rendezvous.go index 7392626ce..3e4f74ea6 100644 --- a/discovery/rendezvous.go +++ b/discovery/rendezvous.go @@ -93,6 +93,9 @@ func (r *Rendezvous) Stop() error { r.mu.Lock() defer r.mu.Unlock() r.cancelRootCtx() + if err := r.client.Close(); err != nil { + return err + } r.client = nil return nil } diff --git a/vendor/github.com/status-im/rendezvous/client.go b/vendor/github.com/status-im/rendezvous/client.go index a4bffe380..fc8faece2 100644 --- a/vendor/github.com/status-im/rendezvous/client.go +++ b/vendor/github.com/status-im/rendezvous/client.go @@ -4,7 +4,6 @@ import ( "context" "crypto/rand" "fmt" - "io" "time" "github.com/ethereum/go-ethereum/log" @@ -13,6 +12,7 @@ import ( libp2p "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" + net "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" ma "github.com/multiformats/go-multiaddr" ethv4 "github.com/status-im/go-multiaddr-ethv4" @@ -38,15 +38,17 @@ func New(identity crypto.PrivKey) (c Client, err error) { return c, err } return Client{ - identity: identity, - h: h, + h: h, + }, nil +} + +func NewWithHost(h host.Host) (c Client, err error) { + return Client{ + h: h, }, nil } type Client struct { - laddr ma.Multiaddr - identity crypto.PrivKey - h host.Host } @@ -55,7 +57,7 @@ func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, re if err != nil { return err } - defer s.Close() + defer net.FullClose(s) if err = rlp.Encode(s, protocol.REGISTER); err != nil { return err } @@ -86,7 +88,7 @@ func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, li if err != nil { return } - defer s.Close() + defer net.FullClose(s) if err = rlp.Encode(s, protocol.DISCOVER); err != nil { return } @@ -112,7 +114,7 @@ func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, li return val.Records, nil } -func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw io.ReadWriteCloser, err error) { +func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw net.Stream, err error) { pid, err := srv.ValueForProtocol(ethv4.P_ETHv4) if err != nil { return @@ -132,5 +134,10 @@ func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw io.ReadWrit if err != nil { return nil, err } - return InstrumenetedStream{s}, nil + return &InstrumentedStream{s}, nil +} + +// Close shutdowns the host and all open connections. +func (c Client) Close() error { + return c.h.Close() } diff --git a/vendor/github.com/status-im/rendezvous/server/cleaner.go b/vendor/github.com/status-im/rendezvous/server/cleaner.go index 69d6a0494..94e4efde5 100644 --- a/vendor/github.com/status-im/rendezvous/server/cleaner.go +++ b/vendor/github.com/status-im/rendezvous/server/cleaner.go @@ -8,7 +8,6 @@ import ( type deadline struct { time time.Time - refs int } // definitely rename @@ -52,16 +51,12 @@ func (c *Cleaner) Pop() interface{} { old := c.heap n := len(old) x := old[n-1] - c.heap = old[0 : n-1] - dl, exist := c.deadlines[x] + c.heap = append([]string{}, old[0:n-1]...) + _, exist := c.deadlines[x] if !exist { return x } - dl.refs-- - c.deadlines[x] = dl - if dl.refs == 0 { - delete(c.deadlines, x) - } + delete(c.deadlines, x) return x } @@ -70,10 +65,15 @@ func (c *Cleaner) Add(deadlineTime time.Time, key string) { defer c.mu.Unlock() dl, exist := c.deadlines[key] if !exist { - dl = deadline{time: deadlineTime, refs: 1} + dl = deadline{time: deadlineTime} } else { dl.time = deadlineTime - dl.refs++ + for i, n := range c.heap { + if n == key { + heap.Remove(c, i) + break + } + } } c.deadlines[key] = dl heap.Push(c, key) @@ -94,10 +94,6 @@ func (c *Cleaner) PopSince(now time.Time) (rst []string) { if !exist { continue } - if dl.refs > 1 { - heap.Pop(c) - continue - } if now.After(dl.time) { rst = append(rst, heap.Pop(c).(string)) } else { diff --git a/vendor/github.com/status-im/rendezvous/server/server.go b/vendor/github.com/status-im/rendezvous/server/server.go index f257ede38..ac34d7dd4 100644 --- a/vendor/github.com/status-im/rendezvous/server/server.go +++ b/vendor/github.com/status-im/rendezvous/server/server.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" @@ -118,27 +119,39 @@ func (srv *Server) startListener() error { srv.h = h srv.h.SetStreamHandler(protocol.VERSION, func(s net.Stream) { defer s.Close() - rs := rlp.NewStream(s, 0) - s.SetReadDeadline(time.Now().Add(srv.readTimeout)) - typ, err := rs.Uint() - if err != nil { - logger.Error("error reading message type", "error", err) - return - } - s.SetReadDeadline(time.Now().Add(srv.readTimeout)) - resptype, resp, err := srv.msgParser(protocol.MessageType(typ), rs) - if err != nil { - logger.Error("error parsing message", "error", err) - return - } - s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) - if err = rlp.Encode(s, resptype); err != nil { - logger.Error("error writing response", "type", resptype, "error", err) - return - } - s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) - if err = rlp.Encode(s, resp); err != nil { - logger.Error("error encoding response", "resp", resp, "error", err) + for { + rs := rlp.NewStream(s, 0) + s.SetReadDeadline(time.Now().Add(srv.readTimeout)) + typ, err := rs.Uint() + if err == io.EOF { + return + } + if err != nil { + logger.Debug("error reading message type", "error", err) + s.Reset() + return + } + s.SetReadDeadline(time.Now().Add(srv.readTimeout)) + resptype, resp, err := srv.msgParser(protocol.MessageType(typ), rs) + if err == io.EOF { + return + } + if err != nil { + logger.Debug("error parsing message", "error", err) + s.Reset() + return + } + s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) + if err = rlp.Encode(s, resptype); err != nil { + logger.Debug("error writing response", "type", resptype, "error", err) + s.Reset() + return + } + s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) + if err = rlp.Encode(s, resp); err != nil { + logger.Debug("error encoding response", "resp", resp, "error", err) + s.Reset() + } } }) addr, err := ma.NewMultiaddr(fmt.Sprintf("/ethv4/%s", h.ID().Pretty())) diff --git a/vendor/github.com/status-im/rendezvous/stream.go b/vendor/github.com/status-im/rendezvous/stream.go index a14bed55e..5b29b7fbb 100644 --- a/vendor/github.com/status-im/rendezvous/stream.go +++ b/vendor/github.com/status-im/rendezvous/stream.go @@ -1,8 +1,11 @@ package rendezvous import ( + "time" + "github.com/ethereum/go-ethereum/metrics" inet "github.com/libp2p/go-libp2p-net" + protocol "github.com/libp2p/go-libp2p-protocol" ) var ( @@ -10,23 +13,51 @@ var ( egressTrafficMeter = metrics.NewRegisteredMeter("rendezvous/OutboundTraffic", nil) ) -// InstrumenetedStream implements read writer interface and collects metrics. -type InstrumenetedStream struct { +// InstrumentedStream implements read writer interface and collects metrics. +type InstrumentedStream struct { s inet.Stream } -func (si InstrumenetedStream) Write(p []byte) (int, error) { +func (si InstrumentedStream) Write(p []byte) (int, error) { n, err := si.s.Write(p) egressTrafficMeter.Mark(int64(n)) return n, err } -func (si InstrumenetedStream) Read(p []byte) (int, error) { +func (si InstrumentedStream) Read(p []byte) (int, error) { n, err := si.s.Read(p) ingressTrafficMeter.Mark(int64(n)) return n, err } -func (si InstrumenetedStream) Close() error { +func (si InstrumentedStream) Close() error { return si.s.Close() } + +func (si InstrumentedStream) Reset() error { + return si.s.Reset() +} + +func (si InstrumentedStream) SetDeadline(timeout time.Time) error { + return si.s.SetDeadline(timeout) +} + +func (si InstrumentedStream) SetReadDeadline(timeout time.Time) error { + return si.s.SetReadDeadline(timeout) +} + +func (si InstrumentedStream) SetWriteDeadline(timeout time.Time) error { + return si.s.SetWriteDeadline(timeout) +} + +func (si InstrumentedStream) Protocol() protocol.ID { + return si.s.Protocol() +} + +func (si InstrumentedStream) SetProtocol(pid protocol.ID) { + si.s.SetProtocol(pid) +} + +func (si InstrumentedStream) Conn() inet.Conn { + return si.s.Conn() +}