diff --git a/go.mod b/go.mod index fed40f9e..49d4fae8 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/libp2p/go-libp2p-testing v0.9.2 github.com/libp2p/go-libp2p-tls v0.4.1 github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 + github.com/libp2p/go-mplex v0.7.0 github.com/libp2p/go-msgio v0.2.0 github.com/libp2p/go-netroute v0.2.0 github.com/libp2p/go-reuseport v0.1.0 diff --git a/go.sum b/go.sum index d47bff04..3de3c758 100644 --- a/go.sum +++ b/go.sum @@ -486,8 +486,9 @@ github.com/libp2p/go-libp2p-yamux v0.9.1 h1:oplewiRix8s45SOrI30rCPZG5mM087YZp+VY github.com/libp2p/go-libp2p-yamux v0.9.1/go.mod h1:wRc6wvyxQINFcKe7daL4BeQ02Iyp+wxyC8WCNfngBrA= github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU= github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ= -github.com/libp2p/go-mplex v0.4.0 h1:Ukkez9/4EOX5rTw4sHefNJp10dksftAA05ZgyjplUbM= github.com/libp2p/go-mplex v0.4.0/go.mod h1:y26Lx+wNVtMYMaPu300Cbot5LkEZ4tJaNYeHeT9dh6E= +github.com/libp2p/go-mplex v0.7.0 h1:BDhFZdlk5tbr0oyFq/xv/NPGfjbnrsDam1EvutpBDbY= +github.com/libp2p/go-mplex v0.7.0/go.mod h1:rW8ThnRcYWft/Jb2jeORBmPd6xuG3dGxWN/W168L9EU= github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.6/go.mod h1:4ecVB6d9f4BDSL5fqvPiC4A3KivjWn+Venn/1ALLMWA= github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU= diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go new file mode 100644 index 00000000..9c681d16 --- /dev/null +++ b/p2p/muxer/mplex/conn.go @@ -0,0 +1,43 @@ +package mplex + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/network" + + mp "github.com/libp2p/go-mplex" +) + +type conn mp.Multiplex + +var _ network.MuxedConn = &conn{} + +func (c *conn) Close() error { + return c.mplex().Close() +} + +func (c *conn) IsClosed() bool { + return c.mplex().IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { + s, err := c.mplex().NewStream(ctx) + if err != nil { + return nil, err + } + return (*stream)(s), nil +} + +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (network.MuxedStream, error) { + s, err := c.mplex().Accept() + if err != nil { + return nil, err + } + return (*stream)(s), nil +} + +func (c *conn) mplex() *mp.Multiplex { + return (*mp.Multiplex)(c) +} diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go new file mode 100644 index 00000000..64b70fbd --- /dev/null +++ b/p2p/muxer/mplex/stream.go @@ -0,0 +1,64 @@ +package mplex + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/network" + + mp "github.com/libp2p/go-mplex" +) + +// stream implements network.MuxedStream over mplex.Stream. +type stream mp.Stream + +var _ network.MuxedStream = &stream{} + +func (s *stream) Read(b []byte) (n int, err error) { + n, err = s.mplex().Read(b) + if err == mp.ErrStreamReset { + err = network.ErrReset + } + + return n, err +} + +func (s *stream) Write(b []byte) (n int, err error) { + n, err = s.mplex().Write(b) + if err == mp.ErrStreamReset { + err = network.ErrReset + } + + return n, err +} + +func (s *stream) Close() error { + return s.mplex().Close() +} + +func (s *stream) CloseWrite() error { + return s.mplex().CloseWrite() +} + +func (s *stream) CloseRead() error { + return s.mplex().CloseRead() +} + +func (s *stream) Reset() error { + return s.mplex().Reset() +} + +func (s *stream) SetDeadline(t time.Time) error { + return s.mplex().SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.mplex().SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.mplex().SetWriteDeadline(t) +} + +func (s *stream) mplex() *mp.Stream { + return (*mp.Stream)(s) +} diff --git a/p2p/muxer/mplex/transport.go b/p2p/muxer/mplex/transport.go new file mode 100644 index 00000000..f4f0b219 --- /dev/null +++ b/p2p/muxer/mplex/transport.go @@ -0,0 +1,26 @@ +package mplex + +import ( + "net" + + "github.com/libp2p/go-libp2p-core/network" + + mp "github.com/libp2p/go-mplex" +) + +// DefaultTransport has default settings for Transport +var DefaultTransport = &Transport{} + +var _ network.Multiplexer = &Transport{} + +// Transport implements mux.Multiplexer that constructs +// mplex-backed muxed connections. +type Transport struct{} + +func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { + m, err := mp.NewMultiplex(nc, isServer, scope) + if err != nil { + return nil, err + } + return (*conn)(m), nil +} diff --git a/p2p/muxer/mplex/transport_test.go b/p2p/muxer/mplex/transport_test.go new file mode 100644 index 00000000..d4474526 --- /dev/null +++ b/p2p/muxer/mplex/transport_test.go @@ -0,0 +1,52 @@ +package mplex + +import ( + "errors" + "net" + "testing" + + "github.com/libp2p/go-libp2p-core/network" + test "github.com/libp2p/go-libp2p-testing/suites/mux" +) + +func TestDefaultTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} + +type memoryScope struct { + network.PeerScope + limit int + reserved int +} + +func (m *memoryScope) ReserveMemory(size int, prio uint8) error { + if m.reserved+size > m.limit { + return errors.New("too much") + } + m.reserved += size + return nil +} + +func (m *memoryScope) ReleaseMemory(size int) { + m.reserved -= size + if m.reserved < 0 { + panic("too much memory released") + } +} + +type memoryLimitedTransport struct { + Transport +} + +func (t *memoryLimitedTransport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { + return t.Transport.NewConn(nc, isServer, &memoryScope{ + limit: 3 * 1 << 20, + PeerScope: scope, + }) +} + +func TestDefaultTransportWithMemoryLimit(t *testing.T) { + test.SubtestAll(t, &memoryLimitedTransport{ + Transport: *DefaultTransport, + }) +}