Merge pull request #35 from ipfs/fix/listener-hang

Handle incoming conns in their own goroutines
Jeromy Johnson 2016-04-11 11:16:46 -07:00
commit b12b73d0cc
3 changed files with 392 additions and 86 deletions
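
Before this change the listener ran the multistream selection and crypto negotiation inline in Accept(), so a client that connected and then went silent stalled every later caller. The commit moves that work into a goroutine per incoming connection, bounds it with a read deadline, and hands finished connections to Accept() over a buffered channel. The standalone program below is a minimal sketch of that shape, not the library's code: handleIncoming, acceptedConn, handshakeTimeout, and the single-read "handshake" are illustrative stand-ins.

package main

import (
	"fmt"
	"net"
	"time"
)

// handshakeTimeout bounds how long a single client may stall its handshake
// (the change introduces NegotiateReadTimeout for the same purpose).
const handshakeTimeout = 5 * time.Second

// acceptedConn carries either a negotiated connection or a fatal accept error.
type acceptedConn struct {
	conn net.Conn
	err  error
}

// handleIncoming accepts raw connections and runs each "handshake" in its own
// goroutine, so a client that never speaks cannot block later clients.
func handleIncoming(l net.Listener, out chan<- acceptedConn) {
	for {
		raw, err := l.Accept()
		if err != nil {
			out <- acceptedConn{err: err}
			return
		}
		go func(c net.Conn) {
			// Bound the handshake with a read deadline, then clear it.
			c.SetReadDeadline(time.Now().Add(handshakeTimeout))
			buf := make([]byte, 4)
			if _, err := c.Read(buf); err != nil { // stand-in for the real negotiation
				c.Close()
				return
			}
			c.SetReadDeadline(time.Time{})
			out <- acceptedConn{conn: c}
		}(raw)
	}
}

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	incoming := make(chan acceptedConn, 32) // small buffer, like connAcceptBuffer
	go handleIncoming(l, incoming)

	// A client that connects but never writes: it stalls only its own goroutine.
	go func() { net.Dial("tcp", l.Addr().String()) }()

	// A well-behaved client still gets through immediately.
	go func() {
		c, err := net.Dial("tcp", l.Addr().String())
		if err != nil {
			return
		}
		defer c.Close()
		c.Write([]byte("ping"))
	}()

	ready := <-incoming
	if ready.err != nil {
		panic(ready.err)
	}
	fmt.Println("accepted a negotiated connection from", ready.conn.RemoteAddr())
	ready.conn.Close()
	l.Close()
}

The small channel buffer (32 here, mirroring connAcceptBuffer in the change) lets a few negotiated connections queue while the caller is busy, without ever letting one slow handshake hold up the accept loop.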

View File

@@ -5,7 +5,9 @@ import (
"fmt"
"io"
"net"
"runtime"
"strings"
"sync"
"testing"
"time"
@@ -14,11 +16,16 @@ import (
peer "github.com/ipfs/go-libp2p/p2p/peer"
tu "github.com/ipfs/go-libp2p/testutil"
grc "gx/ipfs/QmTd4Jgb4nbJq5uR55KJgGLyHWmM3dovS21D1HcwRneSLu/gorocheck"
msmux "gx/ipfs/QmUeEcYJrzAEKdQXjzTxCgNZgc9sRuwharsvzzm5Gd2oGB/go-multistream"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)
func goroFilter(r *grc.Goroutine) bool {
return strings.Contains(r.Function, "go-log.")
}
func echoListen(ctx context.Context, listener Listener) {
for {
c, err := listener.Accept()
@@ -397,3 +404,247 @@ func TestFailedAccept(t *testing.T) {
c.Close()
<-done
}
func TestHangingAccept(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
done := make(chan struct{})
go func() {
defer close(done)
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("first dial failed: ", err)
}
// hang this connection
defer con.Close()
// ensure that the first conn hits first
time.Sleep(time.Millisecond * 50)
con2, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
t.Error("second dial failed: ", err)
}
defer con2.Close()
err = msmux.SelectProtoOrFail(SecioTag, con2)
if err != nil {
t.Error("msmux select failed: ", err)
}
_, err = con2.Write([]byte("test"))
if err != nil {
t.Error("con write failed: ", err)
}
}()
c, err := l1.Accept()
if err != nil {
t.Fatal("connections after a failed accept should still work: ", err)
}
c.Close()
<-done
}
// This test kicks off N (=300) concurrent dials, each of which waits d (=20ms) before
// driving its handshake. That wait holds up the handshake (multistream AND crypto), which
// happens BEFORE l1.Accept() returns a connection. The test checks that the handshakes all
// happen concurrently on the listener side, not sequentially, ensuring that a hanging dial
// will not block the listener from accepting other dials. (A short timing sketch follows
// this test.)
func TestConcurrentAccept(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
n := 300
delay := time.Millisecond * 20
if runtime.GOOS == "darwin" {
n = 100
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
// hang this connection
defer con.Close()
time.Sleep(delay)
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error(err)
}
}()
}
before := time.Now()
for i := 0; i < n; i++ {
c, err := l1.Accept()
if err != nil {
t.Fatal("connections after a failed accept should still work: ", err)
}
c.Close()
}
limit := delay * time.Duration(n)
took := time.Now().Sub(before)
if took > limit {
t.Fatal("took too long!")
}
log.Errorf("took: %s (less than %s)", took, limit)
l1.Close()
wg.Wait()
cancel()
time.Sleep(time.Millisecond * 100)
err = grc.CheckForLeaks(goroFilter)
if err != nil {
t.Fatal(err)
}
}
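
The doc comment on TestConcurrentAccept above rests on a timing argument: negotiating the n delayed dials one at a time would take roughly n*delay (300 × 20ms = 6s), while negotiating them concurrently keeps the total close to a single delay, which is why delay * time.Duration(n) is a safe upper bound for the pass/fail check. The toy program below, with a smaller made-up n and a sleep standing in for the handshake, illustrates that arithmetic.

package main

import (
	"fmt"
	"sync"
	"time"
)

// simulate runs n fake handshakes, each taking delay, either back to back or
// all at once, and reports how long draining them takes.
func simulate(n int, delay time.Duration, concurrent bool) time.Duration {
	start := time.Now()
	if concurrent {
		var wg sync.WaitGroup
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				time.Sleep(delay) // per-connection negotiation stand-in
			}()
		}
		wg.Wait()
	} else {
		for i := 0; i < n; i++ {
			time.Sleep(delay) // handshakes performed inline, one after another
		}
	}
	return time.Since(start)
}

func main() {
	n, delay := 50, 20*time.Millisecond // smaller than the test's 300, same idea
	fmt.Println("sequential:", simulate(n, delay, false)) // roughly n*delay (~1s)
	fmt.Println("concurrent:", simulate(n, delay, true))  // roughly delay (~20ms)
}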
func TestConnectionTimeouts(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
old := NegotiateReadTimeout
NegotiateReadTimeout = time.Second * 5
defer func() { NegotiateReadTimeout = old }()
p1 := tu.RandPeerNetParamsOrFatal(t)
l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey)
if err != nil {
t.Fatal(err)
}
n := 100
if runtime.GOOS == "darwin" {
n = 50
}
p1.Addr = l1.Multiaddr() // Addr has been determined by kernel.
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
defer con.Close()
// hang this connection until timeout
io.ReadFull(con, make([]byte, 1000))
}()
}
// wait to make sure the hanging dials have started
time.Sleep(time.Millisecond * 50)
good_n := 20
for i := 0; i < good_n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
defer con.Close()
// dial these ones through
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error(err)
}
}()
}
before := time.Now()
for i := 0; i < good_n; i++ {
c, err := l1.Accept()
if err != nil {
t.Fatal("connections during hung dials should still work: ", err)
}
c.Close()
}
took := time.Now().Sub(before)
if took > time.Second*5 {
t.Fatal("hanging dials shouldnt block good dials")
}
wg.Wait()
go func() {
con, err := net.Dial("tcp", l1.Addr().String())
if err != nil {
log.Error(err)
t.Error("first dial failed: ", err)
return
}
defer con.Close()
// dial these ones through
err = msmux.SelectProtoOrFail(SecioTag, con)
if err != nil {
t.Error(err)
}
}()
// make sure we can dial in still after a bunch of timeouts
con, err := l1.Accept()
if err != nil {
t.Fatal(err)
}
con.Close()
l1.Close()
cancel()
time.Sleep(time.Millisecond * 100)
err = grc.CheckForLeaks(goroFilter)
if err != nil {
t.Fatal(err)
}
}

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"io"
"net"
"sync"
"time"
ic "github.com/ipfs/go-libp2p/p2p/crypto"
filter "github.com/ipfs/go-libp2p/p2p/net/filter"
@@ -17,8 +19,15 @@ import (
ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr"
)
const SecioTag = "/secio/1.0.0"
const NoEncryptionTag = "/plaintext/1.0.0"
const (
SecioTag = "/secio/1.0.0"
NoEncryptionTag = "/plaintext/1.0.0"
)
var (
connAcceptBuffer = 32
NegotiateReadTimeout = time.Second * 60
)
// ConnWrapper is any function that wraps a raw multiaddr connection
type ConnWrapper func(transport.Conn) transport.Conn
@@ -33,10 +42,15 @@ type listener struct {
filters *filter.Filters
wrapper ConnWrapper
catcher tec.TempErrCatcher
proc goprocess.Process
mux *msmux.MultistreamMuxer
incoming chan connErr
ctx context.Context
}
func (l *listener) teardown() error {
@@ -57,62 +71,23 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) {
l.filters = fs
}
type connErr struct {
conn transport.Conn
err error
}
// Accept waits for and returns the next connection to the listener.
// Note that unfortunately this
func (l *listener) Accept() (net.Conn, error) {
// listeners dont have contexts. given changes dont make sense here anymore
// note that the parent of listener will Close, which will interrupt all io.
// Contexts and io don't mix.
ctx := context.Background()
var catcher tec.TempErrCatcher
catcher.IsTemp = func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
for con := range l.incoming {
if con.err != nil {
return nil, con.err
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
}
for {
maconn, err := l.Listener.Accept()
c, err := newSingleConn(l.ctx, l.local, "", con.conn)
if err != nil {
if catcher.IsTemporary(err) {
continue
}
return nil, err
}
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}
// If we have a wrapper func, wrap this conn
if l.wrapper != nil {
maconn = l.wrapper(maconn)
}
_, _, err = l.mux.Negotiate(maconn)
if err != nil {
log.Info("negotiation of crypto protocol failed: ", err)
continue
}
c, err := newSingleConn(ctx, l.local, "", maconn)
if err != nil {
if catcher.IsTemporary(err) {
con.conn.Close()
if l.catcher.IsTemporary(err) {
continue
}
return nil, err
@@ -122,13 +97,15 @@ func (l *listener) Accept() (net.Conn, error) {
log.Warning("listener %s listening INSECURELY!", l)
return c, nil
}
sc, err := newSecureConn(ctx, l.privk, c)
sc, err := newSecureConn(l.ctx, l.privk, c)
if err != nil {
con.conn.Close()
log.Infof("ignoring conn we failed to secure: %s %s", err, c)
continue
}
return sc, nil
}
return nil, fmt.Errorf("listener is closed")
}
func (l *listener) Addr() net.Addr {
@@ -157,14 +134,82 @@ func (l *listener) Loggable() map[string]interface{} {
}
}
func (l *listener) handleIncoming() {
var wg sync.WaitGroup
defer func() {
wg.Wait()
close(l.incoming)
}()
wg.Add(1)
defer wg.Done()
for {
maconn, err := l.Listener.Accept()
if err != nil {
if l.catcher.IsTemporary(err) {
continue
}
l.incoming <- connErr{err: err}
return
}
log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())
if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
continue
}
// If we have a wrapper func, wrap this conn
if l.wrapper != nil {
maconn = l.wrapper(maconn)
}
wg.Add(1)
go func() {
defer wg.Done()
maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout))
_, _, err = l.mux.Negotiate(maconn)
if err != nil {
log.Info("incoming conn: negotiation of crypto protocol failed: ", err)
maconn.Close()
return
}
// clear the read deadline
maconn.SetReadDeadline(time.Time{})
l.incoming <- connErr{conn: maconn}
}()
}
}
func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) {
l := &listener{
Listener: ml,
local: local,
privk: sk,
mux: msmux.NewMultistreamMuxer(),
incoming: make(chan connErr, connAcceptBuffer),
ctx: ctx,
}
l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown)
l.catcher.IsTemp = func(e error) bool {
// ignore connection breakages up to this point. but log them
if e == io.EOF {
log.Debugf("listener ignoring conn with EOF: %s", e)
return true
}
te, ok := e.(tec.Temporary)
if ok {
log.Debugf("listener ignoring conn with temporary err: %s", e)
return te.Temporary()
}
return false
}
if EncryptConnections {
l.mux.AddHandler(SecioTag, nil)
@@ -172,6 +217,8 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee
l.mux.AddHandler(NoEncryptionTag, nil)
}
go l.handleIncoming()
log.Debugf("Conn Listener on %s", l.Multiaddr())
log.Event(ctx, "swarmListen", l)
return l, nil

View File

@@ -1,145 +1,153 @@
{
"name": "go-libp2p",
"author": "whyrusleeping",
"version": "1.0.0",
"bugs": {},
"gx": {
"dvcsimport": "github.com/ipfs/go-libp2p"
},
"gxDependencies": [
{
"name": "go-semver",
"hash": "QmcrrEpx3VMUbrbgVroH3YiYyUS5c4YAykzyPJWKspUYLa",
"name": "go-semver",
"version": "0.0.0"
},
{
"name": "mdns",
"hash": "QmSscYPCcE1H3UQr2tnsJ2a9dK9LsHTBGgP71VW6fz67e5",
"name": "mdns",
"version": "0.0.0"
},
{
"name": "go-msgio",
"hash": "QmRQhVisS8dmPbjBUthVkenn81pBxrx1GxE281csJhm2vL",
"name": "go-msgio",
"version": "0.0.0"
},
{
"name": "go-ipfs-util",
"hash": "QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1",
"name": "go-ipfs-util",
"version": "1.0.0"
},
{
"name": "go-keyspace",
"hash": "QmUusaX99BZoELh7dmPgirqRQ1FAmMnmnBn3oiqDFGBUSc",
"name": "go-keyspace",
"version": "1.0.0"
},
{
"name": "go-multistream",
"hash": "QmUeEcYJrzAEKdQXjzTxCgNZgc9sRuwharsvzzm5Gd2oGB",
"name": "go-multistream",
"version": "0.0.0"
},
{
"name": "go-nat",
"hash": "QmNLvkCDV6ZjUJsEwGNporYBuZdhWT6q7TBVYQwwRv12HT",
"name": "go-nat",
"version": "0.0.0"
},
{
"name": "go-detect-race",
"hash": "QmQHGMVmrsgmqUG8ih3puNXUJneSpi13dkcZpzLKkskUkH",
"name": "go-detect-race",
"version": "0.0.0"
},
{
"name": "goprocess",
"hash": "QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn",
"name": "goprocess",
"version": "0.0.0"
},
{
"name": "go-log",
"hash": "Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH",
"name": "go-log",
"version": "0.0.0"
},
{
"name": "go-multiaddr-net",
"hash": "QmYVqhVfbK4BKvbW88Lhm26b3ud14sTBvcm1H7uWUx1Fkp",
"name": "go-multiaddr-net",
"version": "0.0.0"
},
{
"name": "go-multihash",
"hash": "QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku",
"name": "go-multihash",
"version": "0.0.0"
},
{
"name": "multiaddr-filter",
"hash": "QmPwfFAHUmvWDucLHRS9Xz2Kb1TNX2cY4LJ7pQjg9kVcae",
"name": "multiaddr-filter",
"version": "1.0.0"
},
{
"name": "go-base58",
"hash": "QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf",
"name": "go-base58",
"version": "0.0.0"
},
{
"name": "go-crypto",
"hash": "Qme1boxspcQWR8FBzMxeppqug2fYgYc15diNWmqgDVnvn2",
"name": "go-crypto",
"version": "0.0.0"
},
{
"name": "gogo-protobuf",
"hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV",
"name": "gogo-protobuf",
"version": "0.0.0"
},
{
"name": "go-multiaddr",
"hash": "QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz",
"name": "go-multiaddr",
"version": "0.0.0"
},
{
"name": "go-metrics",
"hash": "QmeYJHEk8UjVVZ4XCRTZe6dFQrb8pGWD81LYCgeLp8CvMB",
"name": "go-metrics",
"version": "0.0.0"
},
{
"name": "randbo",
"hash": "QmYvsG72GsfLgUeSojXArjnU6L4Wmwk7wuAxtNLuyXcc1T",
"name": "randbo",
"version": "0.0.0"
},
{
"name": "go-net",
"hash": "QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt",
"name": "go-net",
"version": "0.0.0"
},
{
"name": "go-stream-muxer",
"hash": "QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn",
"name": "go-stream-muxer",
"version": "0.0.0"
},
{
"name": "go-reuseport",
"hash": "QmaaC9QMYTQHCbMq3Ebr3uMaAR2ev4AVqMmsJpgQijAZbJ",
"name": "go-reuseport",
"version": "0.0.0"
},
{
"name": "go-notifier",
"hash": "QmbcS9XrwZkF1rZj8bBwwzoYhVuA2PCnPhFUL1pyWGgt2A",
"name": "go-notifier",
"version": "0.0.0"
},
{
"name": "go-temp-err-catcher",
"hash": "QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb",
"name": "go-temp-err-catcher",
"version": "0.0.0"
},
{
"name": "go-peerstream",
"hash": "QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8",
"name": "go-peerstream",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"name": "mafmt",
"hash": "QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx",
"name": "mafmt",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "QmTd4Jgb4nbJq5uR55KJgGLyHWmM3dovS21D1HcwRneSLu",
"name": "gorocheck",
"version": "0.0.0"
}
],
"gxVersion": "0.4.0",
"gx_version": "0.4.0",
"issues_url": "",
"language": "go",
"license": "",
"bugs": "",
"gxVersion": "0.4.0",
"gx": {
"dvcsimport": "github.com/ipfs/go-libp2p"
}
"name": "go-libp2p",
"version": "1.0.0"
}