Close both sides of the stream when client is done with request (#1313)

Libp2p keeps stream open if EOF wasn't seen and we called Close method.
The most important change is that reader now uses FullClose util, that will
wait for EOF character before closing the stream.
This commit is contained in:
Dmitry Shulyak 2019-01-02 09:45:52 +02:00 committed by GitHub
parent fa97e6927d
commit f151be54c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 102 additions and 52 deletions

4
Gopkg.lock generated
View File

@ -822,7 +822,7 @@
[[projects]] [[projects]]
branch = "master" branch = "master"
digest = "1:5fb8fc6a365f62e1d771b8a33c808a111ace9ba3506089f00ec252ec1e257a83" digest = "1:7eae6e73aa5dd99a7ab975fbc4a39cd98cb72c5a5272970735676e099667cf8a"
name = "github.com/status-im/rendezvous" name = "github.com/status-im/rendezvous"
packages = [ packages = [
".", ".",
@ -830,7 +830,7 @@
"server", "server",
] ]
pruneopts = "NUT" pruneopts = "NUT"
revision = "fbcc46a78cd43fef95a110df664aab513116a850" revision = "9e20b11affd0bf0591126a518f3e7b8aa057f88f"
[[projects]] [[projects]]
digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210" digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210"

View File

@ -93,6 +93,9 @@ func (r *Rendezvous) Stop() error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.cancelRootCtx() r.cancelRootCtx()
if err := r.client.Close(); err != nil {
return err
}
r.client = nil r.client = nil
return nil return nil
} }

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"io"
"time" "time"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -13,6 +12,7 @@ import (
libp2p "github.com/libp2p/go-libp2p" libp2p "github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-crypto" crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host" host "github.com/libp2p/go-libp2p-host"
net "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer" peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
ethv4 "github.com/status-im/go-multiaddr-ethv4" ethv4 "github.com/status-im/go-multiaddr-ethv4"
@ -38,15 +38,17 @@ func New(identity crypto.PrivKey) (c Client, err error) {
return c, err return c, err
} }
return Client{ return Client{
identity: identity, h: h,
h: h, }, nil
}
func NewWithHost(h host.Host) (c Client, err error) {
return Client{
h: h,
}, nil }, nil
} }
type Client struct { type Client struct {
laddr ma.Multiaddr
identity crypto.PrivKey
h host.Host h host.Host
} }
@ -55,7 +57,7 @@ func (c Client) Register(ctx context.Context, srv ma.Multiaddr, topic string, re
if err != nil { if err != nil {
return err return err
} }
defer s.Close() defer net.FullClose(s)
if err = rlp.Encode(s, protocol.REGISTER); err != nil { if err = rlp.Encode(s, protocol.REGISTER); err != nil {
return err return err
} }
@ -86,7 +88,7 @@ func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, li
if err != nil { if err != nil {
return return
} }
defer s.Close() defer net.FullClose(s)
if err = rlp.Encode(s, protocol.DISCOVER); err != nil { if err = rlp.Encode(s, protocol.DISCOVER); err != nil {
return return
} }
@ -112,7 +114,7 @@ func (c Client) Discover(ctx context.Context, srv ma.Multiaddr, topic string, li
return val.Records, nil return val.Records, nil
} }
func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw io.ReadWriteCloser, err error) { func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw net.Stream, err error) {
pid, err := srv.ValueForProtocol(ethv4.P_ETHv4) pid, err := srv.ValueForProtocol(ethv4.P_ETHv4)
if err != nil { if err != nil {
return return
@ -132,5 +134,10 @@ func (c Client) newStream(ctx context.Context, srv ma.Multiaddr) (rw io.ReadWrit
if err != nil { if err != nil {
return nil, err return nil, err
} }
return InstrumenetedStream{s}, nil return &InstrumentedStream{s}, nil
}
// Close shutdowns the host and all open connections.
func (c Client) Close() error {
return c.h.Close()
} }

View File

@ -8,7 +8,6 @@ import (
type deadline struct { type deadline struct {
time time.Time time time.Time
refs int
} }
// definitely rename // definitely rename
@ -52,16 +51,12 @@ func (c *Cleaner) Pop() interface{} {
old := c.heap old := c.heap
n := len(old) n := len(old)
x := old[n-1] x := old[n-1]
c.heap = old[0 : n-1] c.heap = append([]string{}, old[0:n-1]...)
dl, exist := c.deadlines[x] _, exist := c.deadlines[x]
if !exist { if !exist {
return x return x
} }
dl.refs-- delete(c.deadlines, x)
c.deadlines[x] = dl
if dl.refs == 0 {
delete(c.deadlines, x)
}
return x return x
} }
@ -70,10 +65,15 @@ func (c *Cleaner) Add(deadlineTime time.Time, key string) {
defer c.mu.Unlock() defer c.mu.Unlock()
dl, exist := c.deadlines[key] dl, exist := c.deadlines[key]
if !exist { if !exist {
dl = deadline{time: deadlineTime, refs: 1} dl = deadline{time: deadlineTime}
} else { } else {
dl.time = deadlineTime dl.time = deadlineTime
dl.refs++ for i, n := range c.heap {
if n == key {
heap.Remove(c, i)
break
}
}
} }
c.deadlines[key] = dl c.deadlines[key] = dl
heap.Push(c, key) heap.Push(c, key)
@ -94,10 +94,6 @@ func (c *Cleaner) PopSince(now time.Time) (rst []string) {
if !exist { if !exist {
continue continue
} }
if dl.refs > 1 {
heap.Pop(c)
continue
}
if now.After(dl.time) { if now.After(dl.time) {
rst = append(rst, heap.Pop(c).(string)) rst = append(rst, heap.Pop(c).(string))
} else { } else {

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
"time" "time"
@ -118,27 +119,39 @@ func (srv *Server) startListener() error {
srv.h = h srv.h = h
srv.h.SetStreamHandler(protocol.VERSION, func(s net.Stream) { srv.h.SetStreamHandler(protocol.VERSION, func(s net.Stream) {
defer s.Close() defer s.Close()
rs := rlp.NewStream(s, 0) for {
s.SetReadDeadline(time.Now().Add(srv.readTimeout)) rs := rlp.NewStream(s, 0)
typ, err := rs.Uint() s.SetReadDeadline(time.Now().Add(srv.readTimeout))
if err != nil { typ, err := rs.Uint()
logger.Error("error reading message type", "error", err) if err == io.EOF {
return return
} }
s.SetReadDeadline(time.Now().Add(srv.readTimeout)) if err != nil {
resptype, resp, err := srv.msgParser(protocol.MessageType(typ), rs) logger.Debug("error reading message type", "error", err)
if err != nil { s.Reset()
logger.Error("error parsing message", "error", err) return
return }
} s.SetReadDeadline(time.Now().Add(srv.readTimeout))
s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) resptype, resp, err := srv.msgParser(protocol.MessageType(typ), rs)
if err = rlp.Encode(s, resptype); err != nil { if err == io.EOF {
logger.Error("error writing response", "type", resptype, "error", err) return
return }
} if err != nil {
s.SetWriteDeadline(time.Now().Add(srv.writeTimeout)) logger.Debug("error parsing message", "error", err)
if err = rlp.Encode(s, resp); err != nil { s.Reset()
logger.Error("error encoding response", "resp", resp, "error", err) return
}
s.SetWriteDeadline(time.Now().Add(srv.writeTimeout))
if err = rlp.Encode(s, resptype); err != nil {
logger.Debug("error writing response", "type", resptype, "error", err)
s.Reset()
return
}
s.SetWriteDeadline(time.Now().Add(srv.writeTimeout))
if err = rlp.Encode(s, resp); err != nil {
logger.Debug("error encoding response", "resp", resp, "error", err)
s.Reset()
}
} }
}) })
addr, err := ma.NewMultiaddr(fmt.Sprintf("/ethv4/%s", h.ID().Pretty())) addr, err := ma.NewMultiaddr(fmt.Sprintf("/ethv4/%s", h.ID().Pretty()))

View File

@ -1,8 +1,11 @@
package rendezvous package rendezvous
import ( import (
"time"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
inet "github.com/libp2p/go-libp2p-net" inet "github.com/libp2p/go-libp2p-net"
protocol "github.com/libp2p/go-libp2p-protocol"
) )
var ( var (
@ -10,23 +13,51 @@ var (
egressTrafficMeter = metrics.NewRegisteredMeter("rendezvous/OutboundTraffic", nil) egressTrafficMeter = metrics.NewRegisteredMeter("rendezvous/OutboundTraffic", nil)
) )
// InstrumenetedStream implements read writer interface and collects metrics. // InstrumentedStream implements read writer interface and collects metrics.
type InstrumenetedStream struct { type InstrumentedStream struct {
s inet.Stream s inet.Stream
} }
func (si InstrumenetedStream) Write(p []byte) (int, error) { func (si InstrumentedStream) Write(p []byte) (int, error) {
n, err := si.s.Write(p) n, err := si.s.Write(p)
egressTrafficMeter.Mark(int64(n)) egressTrafficMeter.Mark(int64(n))
return n, err return n, err
} }
func (si InstrumenetedStream) Read(p []byte) (int, error) { func (si InstrumentedStream) Read(p []byte) (int, error) {
n, err := si.s.Read(p) n, err := si.s.Read(p)
ingressTrafficMeter.Mark(int64(n)) ingressTrafficMeter.Mark(int64(n))
return n, err return n, err
} }
func (si InstrumenetedStream) Close() error { func (si InstrumentedStream) Close() error {
return si.s.Close() return si.s.Close()
} }
func (si InstrumentedStream) Reset() error {
return si.s.Reset()
}
func (si InstrumentedStream) SetDeadline(timeout time.Time) error {
return si.s.SetDeadline(timeout)
}
func (si InstrumentedStream) SetReadDeadline(timeout time.Time) error {
return si.s.SetReadDeadline(timeout)
}
func (si InstrumentedStream) SetWriteDeadline(timeout time.Time) error {
return si.s.SetWriteDeadline(timeout)
}
func (si InstrumentedStream) Protocol() protocol.ID {
return si.s.Protocol()
}
func (si InstrumentedStream) SetProtocol(pid protocol.ID) {
si.s.SetProtocol(pid)
}
func (si InstrumentedStream) Conn() inet.Conn {
return si.s.Conn()
}