diff --git a/p2p/net/conn/dial_test.go b/p2p/net/conn/dial_test.go index 2179cbd3..14e6c95d 100644 --- a/p2p/net/conn/dial_test.go +++ b/p2p/net/conn/dial_test.go @@ -5,7 +5,9 @@ import ( "fmt" "io" "net" + "runtime" "strings" + "sync" "testing" "time" @@ -14,11 +16,16 @@ import ( peer "github.com/ipfs/go-libp2p/p2p/peer" tu "github.com/ipfs/go-libp2p/testutil" + grc "gx/ipfs/QmTd4Jgb4nbJq5uR55KJgGLyHWmM3dovS21D1HcwRneSLu/gorocheck" msmux "gx/ipfs/QmUeEcYJrzAEKdQXjzTxCgNZgc9sRuwharsvzzm5Gd2oGB/go-multistream" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ) +func goroFilter(r *grc.Goroutine) bool { + return strings.Contains(r.Function, "go-log.") +} + func echoListen(ctx context.Context, listener Listener) { for { c, err := listener.Accept() @@ -397,3 +404,247 @@ func TestFailedAccept(t *testing.T) { c.Close() <-done } + +func TestHangingAccept(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1 := tu.RandPeerNetParamsOrFatal(t) + + l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey) + if err != nil { + t.Fatal(err) + } + + p1.Addr = l1.Multiaddr() // Addr has been determined by kernel. + + done := make(chan struct{}) + go func() { + defer close(done) + con, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + t.Error("first dial failed: ", err) + } + // hang this connection + defer con.Close() + + // ensure that the first conn hits first + time.Sleep(time.Millisecond * 50) + + con2, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + t.Error("second dial failed: ", err) + } + defer con2.Close() + + err = msmux.SelectProtoOrFail(SecioTag, con2) + if err != nil { + t.Error("msmux select failed: ", err) + } + + _, err = con2.Write([]byte("test")) + if err != nil { + t.Error("con write failed: ", err) + } + }() + + c, err := l1.Accept() + if err != nil { + t.Fatal("connections after a failed accept should still work: ", err) + } + + c.Close() + <-done +} + +// This test kicks off N (=300) concurrent dials, which wait d (=20ms) seconds before failing. +// That wait holds up the handshake (multistream AND crypto), which will happen BEFORE +// l1.Accept() returns a connection. This test checks that the handshakes all happen +// concurrently in the listener side, and not sequentially. This ensures that a hanging dial +// will not block the listener from accepting other dials concurrently. +func TestConcurrentAccept(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + p1 := tu.RandPeerNetParamsOrFatal(t) + + l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey) + if err != nil { + t.Fatal(err) + } + + n := 300 + delay := time.Millisecond * 20 + if runtime.GOOS == "darwin" { + n = 100 + } + + p1.Addr = l1.Multiaddr() // Addr has been determined by kernel. + + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + con, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + log.Error(err) + t.Error("first dial failed: ", err) + return + } + // hang this connection + defer con.Close() + + time.Sleep(delay) + err = msmux.SelectProtoOrFail(SecioTag, con) + if err != nil { + t.Error(err) + } + }() + } + + before := time.Now() + for i := 0; i < n; i++ { + c, err := l1.Accept() + if err != nil { + t.Fatal("connections after a failed accept should still work: ", err) + } + + c.Close() + } + + limit := delay * time.Duration(n) + took := time.Now().Sub(before) + if took > limit { + t.Fatal("took too long!") + } + log.Errorf("took: %s (less than %s)", took, limit) + l1.Close() + wg.Wait() + cancel() + + time.Sleep(time.Millisecond * 100) + + err = grc.CheckForLeaks(goroFilter) + if err != nil { + panic(err) + t.Fatal(err) + } +} + +func TestConnectionTimeouts(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + old := NegotiateReadTimeout + NegotiateReadTimeout = time.Second * 5 + defer func() { NegotiateReadTimeout = old }() + + p1 := tu.RandPeerNetParamsOrFatal(t) + + l1, err := Listen(ctx, p1.Addr, p1.ID, p1.PrivKey) + if err != nil { + t.Fatal(err) + } + + n := 100 + if runtime.GOOS == "darwin" { + n = 50 + } + + p1.Addr = l1.Multiaddr() // Addr has been determined by kernel. + + var wg sync.WaitGroup + for i := 0; i < n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + con, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + log.Error(err) + t.Error("first dial failed: ", err) + return + } + defer con.Close() + + // hang this connection until timeout + io.ReadFull(con, make([]byte, 1000)) + }() + } + + // wait to make sure the hanging dials have started + time.Sleep(time.Millisecond * 50) + + good_n := 20 + for i := 0; i < good_n; i++ { + wg.Add(1) + go func() { + defer wg.Done() + con, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + log.Error(err) + t.Error("first dial failed: ", err) + return + } + defer con.Close() + + // dial these ones through + err = msmux.SelectProtoOrFail(SecioTag, con) + if err != nil { + t.Error(err) + } + }() + } + + before := time.Now() + for i := 0; i < good_n; i++ { + c, err := l1.Accept() + if err != nil { + t.Fatal("connections during hung dials should still work: ", err) + } + + c.Close() + } + + took := time.Now().Sub(before) + + if took > time.Second*5 { + t.Fatal("hanging dials shouldnt block good dials") + } + + wg.Wait() + + go func() { + con, err := net.Dial("tcp", l1.Addr().String()) + if err != nil { + log.Error(err) + t.Error("first dial failed: ", err) + return + } + defer con.Close() + + // dial these ones through + err = msmux.SelectProtoOrFail(SecioTag, con) + if err != nil { + t.Error(err) + } + }() + + // make sure we can dial in still after a bunch of timeouts + con, err := l1.Accept() + if err != nil { + t.Fatal(err) + } + + con.Close() + l1.Close() + cancel() + + time.Sleep(time.Millisecond * 100) + + err = grc.CheckForLeaks(goroFilter) + if err != nil { + panic(err) + t.Fatal(err) + } +} diff --git a/p2p/net/conn/listen.go b/p2p/net/conn/listen.go index 3d81cbaf..eee1fc23 100644 --- a/p2p/net/conn/listen.go +++ b/p2p/net/conn/listen.go @@ -4,6 +4,8 @@ import ( "fmt" "io" "net" + "sync" + "time" ic "github.com/ipfs/go-libp2p/p2p/crypto" filter "github.com/ipfs/go-libp2p/p2p/net/filter" @@ -17,8 +19,15 @@ import ( ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ) -const SecioTag = "/secio/1.0.0" -const NoEncryptionTag = "/plaintext/1.0.0" +const ( + SecioTag = "/secio/1.0.0" + NoEncryptionTag = "/plaintext/1.0.0" +) + +var ( + connAcceptBuffer = 32 + NegotiateReadTimeout = time.Second * 60 +) // ConnWrapper is any function that wraps a raw multiaddr connection type ConnWrapper func(transport.Conn) transport.Conn @@ -33,10 +42,15 @@ type listener struct { filters *filter.Filters wrapper ConnWrapper + catcher tec.TempErrCatcher proc goprocess.Process mux *msmux.MultistreamMuxer + + incoming chan connErr + + ctx context.Context } func (l *listener) teardown() error { @@ -57,62 +71,23 @@ func (l *listener) SetAddrFilters(fs *filter.Filters) { l.filters = fs } +type connErr struct { + conn transport.Conn + err error +} + // Accept waits for and returns the next connection to the listener. // Note that unfortunately this func (l *listener) Accept() (net.Conn, error) { - - // listeners dont have contexts. given changes dont make sense here anymore - // note that the parent of listener will Close, which will interrupt all io. - // Contexts and io don't mix. - ctx := context.Background() - - var catcher tec.TempErrCatcher - - catcher.IsTemp = func(e error) bool { - // ignore connection breakages up to this point. but log them - if e == io.EOF { - log.Debugf("listener ignoring conn with EOF: %s", e) - return true + for con := range l.incoming { + if con.err != nil { + return nil, con.err } - te, ok := e.(tec.Temporary) - if ok { - log.Debugf("listener ignoring conn with temporary err: %s", e) - return te.Temporary() - } - return false - } - - for { - maconn, err := l.Listener.Accept() + c, err := newSingleConn(l.ctx, l.local, "", con.conn) if err != nil { - if catcher.IsTemporary(err) { - continue - } - return nil, err - } - - log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr()) - - if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) { - log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr()) - maconn.Close() - continue - } - // If we have a wrapper func, wrap this conn - if l.wrapper != nil { - maconn = l.wrapper(maconn) - } - - _, _, err = l.mux.Negotiate(maconn) - if err != nil { - log.Info("negotiation of crypto protocol failed: ", err) - continue - } - - c, err := newSingleConn(ctx, l.local, "", maconn) - if err != nil { - if catcher.IsTemporary(err) { + con.conn.Close() + if l.catcher.IsTemporary(err) { continue } return nil, err @@ -122,13 +97,15 @@ func (l *listener) Accept() (net.Conn, error) { log.Warning("listener %s listening INSECURELY!", l) return c, nil } - sc, err := newSecureConn(ctx, l.privk, c) + sc, err := newSecureConn(l.ctx, l.privk, c) if err != nil { + con.conn.Close() log.Infof("ignoring conn we failed to secure: %s %s", err, c) continue } return sc, nil } + return nil, fmt.Errorf("listener is closed") } func (l *listener) Addr() net.Addr { @@ -157,14 +134,82 @@ func (l *listener) Loggable() map[string]interface{} { } } +func (l *listener) handleIncoming() { + var wg sync.WaitGroup + defer func() { + wg.Wait() + close(l.incoming) + }() + + wg.Add(1) + defer wg.Done() + + for { + maconn, err := l.Listener.Accept() + if err != nil { + if l.catcher.IsTemporary(err) { + continue + } + + l.incoming <- connErr{err: err} + return + } + + log.Debugf("listener %s got connection: %s <---> %s", l, maconn.LocalMultiaddr(), maconn.RemoteMultiaddr()) + + if l.filters != nil && l.filters.AddrBlocked(maconn.RemoteMultiaddr()) { + log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr()) + maconn.Close() + continue + } + // If we have a wrapper func, wrap this conn + if l.wrapper != nil { + maconn = l.wrapper(maconn) + } + + wg.Add(1) + go func() { + defer wg.Done() + maconn.SetReadDeadline(time.Now().Add(NegotiateReadTimeout)) + _, _, err = l.mux.Negotiate(maconn) + if err != nil { + log.Info("incoming conn: negotiation of crypto protocol failed: ", err) + maconn.Close() + return + } + + // clear read readline + maconn.SetReadDeadline(time.Time{}) + + l.incoming <- connErr{conn: maconn} + }() + } +} + func WrapTransportListener(ctx context.Context, ml transport.Listener, local peer.ID, sk ic.PrivKey) (Listener, error) { l := &listener{ Listener: ml, local: local, privk: sk, mux: msmux.NewMultistreamMuxer(), + incoming: make(chan connErr, connAcceptBuffer), + ctx: ctx, } l.proc = goprocessctx.WithContextAndTeardown(ctx, l.teardown) + l.catcher.IsTemp = func(e error) bool { + // ignore connection breakages up to this point. but log them + if e == io.EOF { + log.Debugf("listener ignoring conn with EOF: %s", e) + return true + } + + te, ok := e.(tec.Temporary) + if ok { + log.Debugf("listener ignoring conn with temporary err: %s", e) + return te.Temporary() + } + return false + } if EncryptConnections { l.mux.AddHandler(SecioTag, nil) @@ -172,6 +217,8 @@ func WrapTransportListener(ctx context.Context, ml transport.Listener, local pee l.mux.AddHandler(NoEncryptionTag, nil) } + go l.handleIncoming() + log.Debugf("Conn Listener on %s", l.Multiaddr()) log.Event(ctx, "swarmListen", l) return l, nil diff --git a/package.json b/package.json index 23982952..c0b2f0e1 100644 --- a/package.json +++ b/package.json @@ -1,145 +1,153 @@ { - "name": "go-libp2p", "author": "whyrusleeping", - "version": "1.0.0", + "bugs": {}, + "gx": { + "dvcsimport": "github.com/ipfs/go-libp2p" + }, "gxDependencies": [ { - "name": "go-semver", "hash": "QmcrrEpx3VMUbrbgVroH3YiYyUS5c4YAykzyPJWKspUYLa", + "name": "go-semver", "version": "0.0.0" }, { - "name": "mdns", "hash": "QmSscYPCcE1H3UQr2tnsJ2a9dK9LsHTBGgP71VW6fz67e5", + "name": "mdns", "version": "0.0.0" }, { - "name": "go-msgio", "hash": "QmRQhVisS8dmPbjBUthVkenn81pBxrx1GxE281csJhm2vL", + "name": "go-msgio", "version": "0.0.0" }, { - "name": "go-ipfs-util", "hash": "QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1", + "name": "go-ipfs-util", "version": "1.0.0" }, { - "name": "go-keyspace", "hash": "QmUusaX99BZoELh7dmPgirqRQ1FAmMnmnBn3oiqDFGBUSc", + "name": "go-keyspace", "version": "1.0.0" }, { - "name": "go-multistream", "hash": "QmUeEcYJrzAEKdQXjzTxCgNZgc9sRuwharsvzzm5Gd2oGB", + "name": "go-multistream", "version": "0.0.0" }, { - "name": "go-nat", "hash": "QmNLvkCDV6ZjUJsEwGNporYBuZdhWT6q7TBVYQwwRv12HT", + "name": "go-nat", "version": "0.0.0" }, { - "name": "go-detect-race", "hash": "QmQHGMVmrsgmqUG8ih3puNXUJneSpi13dkcZpzLKkskUkH", + "name": "go-detect-race", "version": "0.0.0" }, { - "name": "goprocess", "hash": "QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn", + "name": "goprocess", "version": "0.0.0" }, { - "name": "go-log", "hash": "Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH", + "name": "go-log", "version": "0.0.0" }, { - "name": "go-multiaddr-net", "hash": "QmYVqhVfbK4BKvbW88Lhm26b3ud14sTBvcm1H7uWUx1Fkp", + "name": "go-multiaddr-net", "version": "0.0.0" }, { - "name": "go-multihash", "hash": "QmYf7ng2hG5XBtJA3tN34DQ2GUN5HNksEw1rLDkmr6vGku", + "name": "go-multihash", "version": "0.0.0" }, { - "name": "multiaddr-filter", "hash": "QmPwfFAHUmvWDucLHRS9Xz2Kb1TNX2cY4LJ7pQjg9kVcae", + "name": "multiaddr-filter", "version": "1.0.0" }, { - "name": "go-base58", "hash": "QmT8rehPR3F6bmwL6zjUN8XpiDBFFpMP2myPdC6ApsWfJf", + "name": "go-base58", "version": "0.0.0" }, { - "name": "go-crypto", "hash": "Qme1boxspcQWR8FBzMxeppqug2fYgYc15diNWmqgDVnvn2", + "name": "go-crypto", "version": "0.0.0" }, { - "name": "gogo-protobuf", "hash": "QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV", + "name": "gogo-protobuf", "version": "0.0.0" }, { - "name": "go-multiaddr", "hash": "QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz", + "name": "go-multiaddr", "version": "0.0.0" }, { - "name": "go-metrics", "hash": "QmeYJHEk8UjVVZ4XCRTZe6dFQrb8pGWD81LYCgeLp8CvMB", + "name": "go-metrics", "version": "0.0.0" }, { - "name": "randbo", "hash": "QmYvsG72GsfLgUeSojXArjnU6L4Wmwk7wuAxtNLuyXcc1T", + "name": "randbo", "version": "0.0.0" }, { - "name": "go-net", "hash": "QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt", + "name": "go-net", "version": "0.0.0" }, { - "name": "go-stream-muxer", "hash": "QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn", + "name": "go-stream-muxer", "version": "0.0.0" }, { - "name": "go-reuseport", "hash": "QmaaC9QMYTQHCbMq3Ebr3uMaAR2ev4AVqMmsJpgQijAZbJ", + "name": "go-reuseport", "version": "0.0.0" }, { - "name": "go-notifier", "hash": "QmbcS9XrwZkF1rZj8bBwwzoYhVuA2PCnPhFUL1pyWGgt2A", + "name": "go-notifier", "version": "0.0.0" }, { - "name": "go-temp-err-catcher", "hash": "QmWHgLqrghM9zw77nF6gdvT9ExQ2RB9pLxkd8sDHZf1rWb", + "name": "go-temp-err-catcher", "version": "0.0.0" }, { - "name": "go-peerstream", "hash": "QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8", + "name": "go-peerstream", "version": "0.0.0" }, { "author": "whyrusleeping", - "name": "mafmt", "hash": "QmWLfU4tstw2aNcTykDm44xbSTCYJ9pUJwfhQCKGwckcHx", + "name": "mafmt", + "version": "0.0.0" + }, + { + "author": "whyrusleeping", + "hash": "QmTd4Jgb4nbJq5uR55KJgGLyHWmM3dovS21D1HcwRneSLu", + "name": "gorocheck", "version": "0.0.0" } ], + "gxVersion": "0.4.0", + "gx_version": "0.4.0", + "issues_url": "", "language": "go", "license": "", - "bugs": "", - "gxVersion": "0.4.0", - "gx": { - "dvcsimport": "github.com/ipfs/go-libp2p" - } + "name": "go-libp2p", + "version": "1.0.0" } \ No newline at end of file