diff --git a/p2p/muxer/mplex/conn.go b/p2p/muxer/mplex/conn.go new file mode 100644 index 00000000..c8bcb478 --- /dev/null +++ b/p2p/muxer/mplex/conn.go @@ -0,0 +1,43 @@ +package peerstream_multiplex + +import ( + "context" + + "github.com/libp2p/go-libp2p-core/network" + + mp "github.com/libp2p/go-mplex" +) + +type conn mp.Multiplex + +var _ network.MuxedConn = &conn{} + +func (c *conn) Close() error { + return c.mplex().Close() +} + +func (c *conn) IsClosed() bool { + return c.mplex().IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { + s, err := c.mplex().NewStream(ctx) + if err != nil { + return nil, err + } + return (*stream)(s), nil +} + +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (network.MuxedStream, error) { + s, err := c.mplex().Accept() + if err != nil { + return nil, err + } + return (*stream)(s), nil +} + +func (c *conn) mplex() *mp.Multiplex { + return (*mp.Multiplex)(c) +} diff --git a/p2p/muxer/mplex/stream.go b/p2p/muxer/mplex/stream.go new file mode 100644 index 00000000..0a841824 --- /dev/null +++ b/p2p/muxer/mplex/stream.go @@ -0,0 +1,64 @@ +package peerstream_multiplex + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/network" + + mp "github.com/libp2p/go-mplex" +) + +// stream implements network.MuxedStream over mplex.Stream. +type stream mp.Stream + +var _ network.MuxedStream = &stream{} + +func (s *stream) Read(b []byte) (n int, err error) { + n, err = s.mplex().Read(b) + if err == mp.ErrStreamReset { + err = network.ErrReset + } + + return n, err +} + +func (s *stream) Write(b []byte) (n int, err error) { + n, err = s.mplex().Write(b) + if err == mp.ErrStreamReset { + err = network.ErrReset + } + + return n, err +} + +func (s *stream) Close() error { + return s.mplex().Close() +} + +func (s *stream) CloseWrite() error { + return s.mplex().CloseWrite() +} + +func (s *stream) CloseRead() error { + return s.mplex().CloseRead() +} + +func (s *stream) Reset() error { + return s.mplex().Reset() +} + +func (s *stream) SetDeadline(t time.Time) error { + return s.mplex().SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.mplex().SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.mplex().SetWriteDeadline(t) +} + +func (s *stream) mplex() *mp.Stream { + return (*mp.Stream)(s) +} diff --git a/p2p/muxer/mplex/transport.go b/p2p/muxer/mplex/transport.go new file mode 100644 index 00000000..8f9e33a9 --- /dev/null +++ b/p2p/muxer/mplex/transport.go @@ -0,0 +1,26 @@ +package peerstream_multiplex + +import ( + "net" + + "github.com/libp2p/go-libp2p-core/network" + + mp "github.com/libp2p/go-mplex" +) + +// DefaultTransport has default settings for Transport +var DefaultTransport = &Transport{} + +var _ network.Multiplexer = &Transport{} + +// Transport implements mux.Multiplexer that constructs +// mplex-backed muxed connections. +type Transport struct{} + +func (t *Transport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { + m, err := mp.NewMultiplex(nc, isServer, scope) + if err != nil { + return nil, err + } + return (*conn)(m), nil +} diff --git a/p2p/muxer/mplex/transport_test.go b/p2p/muxer/mplex/transport_test.go new file mode 100644 index 00000000..c224667c --- /dev/null +++ b/p2p/muxer/mplex/transport_test.go @@ -0,0 +1,52 @@ +package peerstream_multiplex + +import ( + "errors" + "net" + "testing" + + "github.com/libp2p/go-libp2p-core/network" + test "github.com/libp2p/go-libp2p-testing/suites/mux" +) + +func TestDefaultTransport(t *testing.T) { + test.SubtestAll(t, DefaultTransport) +} + +type memoryScope struct { + network.PeerScope + limit int + reserved int +} + +func (m *memoryScope) ReserveMemory(size int, prio uint8) error { + if m.reserved+size > m.limit { + return errors.New("too much") + } + m.reserved += size + return nil +} + +func (m *memoryScope) ReleaseMemory(size int) { + m.reserved -= size + if m.reserved < 0 { + panic("too much memory released") + } +} + +type memoryLimitedTransport struct { + Transport +} + +func (t *memoryLimitedTransport) NewConn(nc net.Conn, isServer bool, scope network.PeerScope) (network.MuxedConn, error) { + return t.Transport.NewConn(nc, isServer, &memoryScope{ + limit: 3 * 1 << 20, + PeerScope: scope, + }) +} + +func TestDefaultTransportWithMemoryLimit(t *testing.T) { + test.SubtestAll(t, &memoryLimitedTransport{ + Transport: *DefaultTransport, + }) +}