From 955ae93a4b0c4c2f4efdee618eb990e93936c147 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 7 Mar 2016 22:43:13 -0800 Subject: [PATCH 1/2] switch to new version of go-stream-muxer --- p2p/net/swarm/swarm.go | 25 +++++++++++++++++++++---- p2p/net/swarm/swarm_conn.go | 2 +- p2p/net/swarm/swarm_listen.go | 6 +++--- p2p/net/swarm/swarm_stream.go | 2 +- p2p/net/swarm/swarm_test.go | 2 +- p2p/test/reconnects/reconnect_test.go | 26 +++++++++++++------------- package.json | 4 ++-- 7 files changed, 42 insertions(+), 25 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 8e4f32f2..0fd7a25e 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -4,6 +4,7 @@ package swarm import ( "fmt" + "io/ioutil" "sync" "time" @@ -16,11 +17,13 @@ import ( transport "github.com/ipfs/go-libp2p/p2p/net/transport" peer "github.com/ipfs/go-libp2p/p2p/peer" - ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context" - pst "gx/ipfs/QmTYr6RrJs8b63LTVwahmtytnuqzsLfNPBJp6EvmFWHbGh/go-stream-muxer" - psmss "gx/ipfs/QmTYr6RrJs8b63LTVwahmtytnuqzsLfNPBJp6EvmFWHbGh/go-stream-muxer/multistream" + pst "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer" + psmss "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/multistream" + spdy "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/spdystream" + yamux "gx/ipfs/QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn/go-stream-muxer/yamux" + ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" mafilter "gx/ipfs/QmcR6dLYF8Eozaae3wGd5wjq76bofzmmbvQmtwobxvfhEt/multiaddr-filter" @@ -32,7 +35,21 @@ var log = logging.Logger("swarm2") var PSTransport pst.Transport func init() { - PSTransport = psmss.NewTransport() + msstpt := psmss.NewBlankTransport() + + ymxtpt := &yamux.Transport{ + AcceptBacklog: 2048, + ConnectionWriteTimeout: time.Second * 10, + KeepAliveInterval: time.Second * 30, + EnableKeepAlive: true, + MaxStreamWindowSize: uint32(1024 * 256), + LogOutput: ioutil.Discard, + } + + msstpt.AddTransport("/yamux", ymxtpt) + msstpt.AddTransport("/spdystream", spdy.Transport) + + PSTransport = msstpt } // Swarm is a connection muxer, allowing connections to other peers to diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index a07818d4..04e9a403 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -8,7 +8,7 @@ import ( conn "github.com/ipfs/go-libp2p/p2p/net/conn" peer "github.com/ipfs/go-libp2p/p2p/peer" - ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" + ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" ) diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index 50550248..a4d5e943 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -9,10 +9,10 @@ import ( conn "github.com/ipfs/go-libp2p/p2p/net/conn" transport "github.com/ipfs/go-libp2p/p2p/net/transport" - ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" + ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" -) // Open listeners and reuse-dialers for the given addresses +) // Open listeners and reuse-dialers for the given addresses func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { errs := make([]error, len(addrs)) @@ -152,7 +152,7 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn { if err != nil { log.Debug(err) log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err)) - c.Close() // boom. close it. + c.Close() // boom. close it. return nil } diff --git a/p2p/net/swarm/swarm_stream.go b/p2p/net/swarm/swarm_stream.go index 3f9115fc..3fbcc5d3 100644 --- a/p2p/net/swarm/swarm_stream.go +++ b/p2p/net/swarm/swarm_stream.go @@ -3,7 +3,7 @@ package swarm import ( inet "github.com/ipfs/go-libp2p/p2p/net" - ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" + ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream" ) // a Stream is a wrapper around a ps.Stream that exposes a way to get diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index fe2ef2a2..c69c7db3 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -24,7 +24,7 @@ func EchoStreamHandler(stream inet.Stream) { // pull out the ipfs conn c := stream.Conn() - log.Errorf("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) + log.Infof("%s ponging to %s", c.LocalPeer(), c.RemotePeer()) buf := make([]byte, 4) diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go index 19b39e8e..df2d09a9 100644 --- a/p2p/test/reconnects/reconnect_test.go +++ b/p2p/test/reconnects/reconnect_test.go @@ -12,7 +12,7 @@ import ( swarm "github.com/ipfs/go-libp2p/p2p/net/swarm" protocol "github.com/ipfs/go-libp2p/p2p/protocol" testutil "github.com/ipfs/go-libp2p/p2p/test/util" - ps "gx/ipfs/QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK/go-peerstream" + ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream" u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" @@ -35,20 +35,20 @@ func EchoStreamHandler(stream inet.Stream) { } type sendChans struct { - send chan struct{} - sent chan struct{} - read chan struct{} - close_ chan struct{} - closed chan struct{} + send chan struct{} + sent chan struct{} + read chan struct{} + close_ chan struct{} + closed chan struct{} } func newSendChans() sendChans { return sendChans{ - send: make(chan struct{}), - sent: make(chan struct{}), - read: make(chan struct{}), - close_: make(chan struct{}), - closed: make(chan struct{}), + send: make(chan struct{}), + sent: make(chan struct{}), + read: make(chan struct{}), + close_: make(chan struct{}), + closed: make(chan struct{}), } } @@ -188,7 +188,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { go sF(s) log.Debugf("getting handle %d", j) - sc := <-ss // wait to get handle. + sc := <-ss // wait to get handle. log.Debugf("spawning worker %d", j) for k := 0; k < numMsgs; k++ { @@ -215,7 +215,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { for _, c := range cs { sc := c.(*swarm.Conn) if sc.LocalPeer() > sc.RemotePeer() { - continue // only close it on one side. + continue // only close it on one side. } log.Debugf("closing: %s", sc.RawConn()) diff --git a/package.json b/package.json index 24974775..a0f6932d 100644 --- a/package.json +++ b/package.json @@ -105,7 +105,7 @@ }, { "name": "go-stream-muxer", - "hash": "QmTYr6RrJs8b63LTVwahmtytnuqzsLfNPBJp6EvmFWHbGh", + "hash": "QmWSJzRkCMJFHYUQZxKwPX8WA7XipaPtfiwMPARP51ymfn", "version": "0.0.0" }, { @@ -125,7 +125,7 @@ }, { "name": "go-peerstream", - "hash": "QmQDPXRFzRcCGPbPViQCKjzbQBkZGpLV1f9KwXnksSNcTK", + "hash": "QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8", "version": "0.0.0" }, { From ec85e0bbdff2be839b34272d51cb911b5941c104 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 8 Mar 2016 15:47:25 -0800 Subject: [PATCH 2/2] bump yamux backlog setting way up --- p2p/net/swarm/swarm.go | 2 +- p2p/net/swarm/swarm_listen.go | 5 +++-- p2p/test/reconnects/reconnect_test.go | 24 ++++++++++++------------ 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0fd7a25e..fd66f730 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -38,7 +38,7 @@ func init() { msstpt := psmss.NewBlankTransport() ymxtpt := &yamux.Transport{ - AcceptBacklog: 2048, + AcceptBacklog: 8192, ConnectionWriteTimeout: time.Second * 10, KeepAliveInterval: time.Second * 30, EnableKeepAlive: true, diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go index a4d5e943..b83df455 100644 --- a/p2p/net/swarm/swarm_listen.go +++ b/p2p/net/swarm/swarm_listen.go @@ -12,8 +12,9 @@ import ( ps "gx/ipfs/QmZK81vcgMhpb2t7GNbozk7qzt6Rj4zFqitpvsWT9mduW8/go-peerstream" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ma "gx/ipfs/QmcobAGsCjYt5DXoq9et9L8yR8er7o7Cu3DTvpaq12jYSz/go-multiaddr" -) // Open listeners and reuse-dialers for the given addresses +) +// Open listeners and reuse-dialers for the given addresses func (s *Swarm) setupInterfaces(addrs []ma.Multiaddr) error { errs := make([]error, len(addrs)) var succeeded int @@ -152,7 +153,7 @@ func (s *Swarm) connHandler(c *ps.Conn) *Conn { if err != nil { log.Debug(err) log.Event(ctx, "newConnHandlerDisconnect", lgbl.NetConn(c.NetConn()), lgbl.Error(err)) - c.Close() // boom. close it. + c.Close() // boom. close it. return nil } diff --git a/p2p/test/reconnects/reconnect_test.go b/p2p/test/reconnects/reconnect_test.go index df2d09a9..71a57b6a 100644 --- a/p2p/test/reconnects/reconnect_test.go +++ b/p2p/test/reconnects/reconnect_test.go @@ -35,20 +35,20 @@ func EchoStreamHandler(stream inet.Stream) { } type sendChans struct { - send chan struct{} - sent chan struct{} - read chan struct{} - close_ chan struct{} - closed chan struct{} + send chan struct{} + sent chan struct{} + read chan struct{} + close_ chan struct{} + closed chan struct{} } func newSendChans() sendChans { return sendChans{ - send: make(chan struct{}), - sent: make(chan struct{}), - read: make(chan struct{}), - close_: make(chan struct{}), - closed: make(chan struct{}), + send: make(chan struct{}), + sent: make(chan struct{}), + read: make(chan struct{}), + close_: make(chan struct{}), + closed: make(chan struct{}), } } @@ -188,7 +188,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { go sF(s) log.Debugf("getting handle %d", j) - sc := <-ss // wait to get handle. + sc := <-ss // wait to get handle. log.Debugf("spawning worker %d", j) for k := 0; k < numMsgs; k++ { @@ -215,7 +215,7 @@ func SubtestConnSendDisc(t *testing.T, hosts []host.Host) { for _, c := range cs { sc := c.(*swarm.Conn) if sc.LocalPeer() > sc.RemotePeer() { - continue // only close it on one side. + continue // only close it on one side. } log.Debugf("closing: %s", sc.RawConn())