package yamux

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"math"
	"net"
	"os"
	"runtime/debug"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	pool "github.com/libp2p/go-buffer-pool"
)

// The MemoryManager allows management of memory allocations.
// Memory is allocated:
// 1. When opening / accepting a new stream. This uses the highest priority.
// 2. When trying to increase the stream receive window. This uses a lower priority.
// This is a subset of the libp2p's resource manager ResourceScopeSpan interface.
type MemoryManager interface {
	ReserveMemory(size int, prio uint8) error

	// ReleaseMemory explicitly releases memory previously reserved with ReserveMemory
	ReleaseMemory(size int)

	// Done ends the span and releases associated resources.
	Done()
}

type nullMemoryManagerImpl struct{}

func (n nullMemoryManagerImpl) ReserveMemory(size int, prio uint8) error { return nil }
func (n nullMemoryManagerImpl) ReleaseMemory(size int)                   {}
func (n nullMemoryManagerImpl) Done()                                    {}

var nullMemoryManager = &nullMemoryManagerImpl{}

// Session is used to wrap a reliable ordered connection and to
// multiplex it into multiple streams.
type Session struct {
	rtt int64 // to be accessed atomically, in nanoseconds

	// remoteGoAway indicates the remote side does
	// not want futher connections. Must be first for alignment.
	remoteGoAway int32

	// localGoAway indicates that we should stop
	// accepting futher connections. Must be first for alignment.
	localGoAway int32

	// nextStreamID is the next stream we should
	// send. This depends if we are a client/server.
	nextStreamID uint32

	// config holds our configuration
	config *Config

	// logger is used for our logs
	logger *log.Logger

	// conn is the underlying connection
	conn net.Conn

	// reader is a buffered reader
	reader io.Reader

	newMemoryManager func() (MemoryManager, error)

	// pings is used to track inflight pings
	pingLock   sync.Mutex
	pingID     uint32
	activePing *ping

	// streams maps a stream id to a stream, and inflight has an entry
	// for any outgoing stream that has not yet been established. Both are
	// protected by streamLock.
	numIncomingStreams uint32
	streams            map[uint32]*Stream
	inflight           map[uint32]struct{}
	streamLock         sync.Mutex

	// synCh acts like a semaphore. It is sized to the AcceptBacklog which
	// is assumed to be symmetric between the client and server. This allows
	// the client to avoid exceeding the backlog and instead blocks the open.
	synCh chan struct{}

	// acceptCh is used to pass ready streams to the client
	acceptCh chan *Stream

	// sendCh is used to send messages
	sendCh chan []byte

	// pingCh and pingCh are used to send pings and pongs
	pongCh, pingCh chan uint32

	// recvDoneCh is closed when recv() exits to avoid a race
	// between stream registration and stream shutdown
	recvDoneCh chan struct{}

	// sendDoneCh is closed when send() exits to avoid a race
	// between returning from a Stream.Write and exiting from the send loop
	// (which may be reading a buffer on-load-from Stream.Write).
	sendDoneCh chan struct{}

	// client is true if we're the client and our stream IDs should be odd.
	client bool

	// shutdown is used to safely close a session
	shutdown     bool
	shutdownErr  error
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex

	// keepaliveTimer is a periodic timer for keepalive messages. It's nil
	// when keepalives are disabled.
	keepaliveLock   sync.Mutex
	keepaliveTimer  *time.Timer
	keepaliveActive bool
}

// newSession is used to construct a new session
func newSession(config *Config, conn net.Conn, client bool, readBuf int, newMemoryManager func() (MemoryManager, error)) *Session {
	var reader io.Reader = conn
	if readBuf > 0 {
		reader = bufio.NewReaderSize(reader, readBuf)
	}
	if newMemoryManager == nil {
		newMemoryManager = func() (MemoryManager, error) { return nullMemoryManager, nil }
	}
	s := &Session{
		config:           config,
		client:           client,
		logger:           log.New(config.LogOutput, "", log.LstdFlags),
		conn:             conn,
		reader:           reader,
		streams:          make(map[uint32]*Stream),
		inflight:         make(map[uint32]struct{}),
		synCh:            make(chan struct{}, config.AcceptBacklog),
		acceptCh:         make(chan *Stream, config.AcceptBacklog),
		sendCh:           make(chan []byte, 64),
		pongCh:           make(chan uint32, config.PingBacklog),
		pingCh:           make(chan uint32),
		recvDoneCh:       make(chan struct{}),
		sendDoneCh:       make(chan struct{}),
		shutdownCh:       make(chan struct{}),
		newMemoryManager: newMemoryManager,
	}
	if client {
		s.nextStreamID = 1
	} else {
		s.nextStreamID = 2
	}
	if config.EnableKeepAlive {
		s.startKeepalive()
	}
	go s.recv()
	go s.send()
	go s.startMeasureRTT()
	return s
}

// IsClosed does a safe check to see if we have shutdown
func (s *Session) IsClosed() bool {
	select {
	case <-s.shutdownCh:
		return true
	default:
		return false
	}
}

// CloseChan returns a read-only channel which is closed as
// soon as the session is closed.
func (s *Session) CloseChan() <-chan struct{} {
	return s.shutdownCh
}

// NumStreams returns the number of currently open streams
func (s *Session) NumStreams() int {
	s.streamLock.Lock()
	num := len(s.streams)
	s.streamLock.Unlock()
	return num
}

// Open is used to create a new stream as a net.Conn
func (s *Session) Open(ctx context.Context) (net.Conn, error) {
	conn, err := s.OpenStream(ctx)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// OpenStream is used to create a new stream
func (s *Session) OpenStream(ctx context.Context) (*Stream, error) {
	if s.IsClosed() {
		return nil, s.shutdownErr
	}
	if atomic.LoadInt32(&s.remoteGoAway) == 1 {
		return nil, ErrRemoteGoAway
	}

	// Block if we have too many inflight SYNs
	select {
	case s.synCh <- struct{}{}:
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-s.shutdownCh:
		return nil, s.shutdownErr
	}

	span, err := s.newMemoryManager()
	if err != nil {
		return nil, fmt.Errorf("failed to create resource scope span: %w", err)
	}
	if err := span.ReserveMemory(initialStreamWindow, 255); err != nil {
		return nil, err
	}

GET_ID:
	// Get an ID, and check for stream exhaustion
	id := atomic.LoadUint32(&s.nextStreamID)
	if id >= math.MaxUint32-1 {
		span.Done()
		return nil, ErrStreamsExhausted
	}
	if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
		goto GET_ID
	}

	// Register the stream
	stream := newStream(s, id, streamInit, initialStreamWindow, span)
	s.streamLock.Lock()
	s.streams[id] = stream
	s.inflight[id] = struct{}{}
	s.streamLock.Unlock()

	// Send the window update to create
	if err := stream.sendWindowUpdate(ctx.Done()); err != nil {
		defer span.Done()
		select {
		case <-s.synCh:
		default:
			s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
		}
		return nil, err
	}
	return stream, nil
}

// Accept is used to block until the next available stream
// is ready to be accepted.
func (s *Session) Accept() (net.Conn, error) {
	conn, err := s.AcceptStream()
	if err != nil {
		return nil, err
	}
	return conn, err
}

// AcceptStream is used to block until the next available stream
// is ready to be accepted.
func (s *Session) AcceptStream() (*Stream, error) {
	for {
		select {
		case stream := <-s.acceptCh:
			if err := stream.sendWindowUpdate(nil); err != nil {
				// don't return accept errors.
				s.logger.Printf("[WARN] error sending window update before accepting: %s", err)
				continue
			}
			return stream, nil
		case <-s.shutdownCh:
			return nil, s.shutdownErr
		}
	}
}

// Close is used to close the session and all streams.
// Attempts to send a GoAway before closing the connection.
func (s *Session) Close() error {
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}
	s.shutdown = true
	if s.shutdownErr == nil {
		s.shutdownErr = ErrSessionShutdown
	}
	close(s.shutdownCh)
	s.conn.Close()
	s.stopKeepalive()
	<-s.recvDoneCh
	<-s.sendDoneCh

	s.streamLock.Lock()
	defer s.streamLock.Unlock()
	for id, stream := range s.streams {
		stream.forceClose()
		delete(s.streams, id)
		stream.memorySpan.Done()
	}
	return nil
}

// exitErr is used to handle an error that is causing the
// session to terminate.
func (s *Session) exitErr(err error) {
	s.shutdownLock.Lock()
	if s.shutdownErr == nil {
		s.shutdownErr = err
	}
	s.shutdownLock.Unlock()
	s.Close()
}

// GoAway can be used to prevent accepting further
// connections. It does not close the underlying conn.
func (s *Session) GoAway() error {
	return s.sendMsg(s.goAway(goAwayNormal), nil, nil)
}

// goAway is used to send a goAway message
func (s *Session) goAway(reason uint32) header {
	atomic.SwapInt32(&s.localGoAway, 1)
	hdr := encode(typeGoAway, 0, 0, reason)
	return hdr
}

func (s *Session) measureRTT() {
	rtt, err := s.Ping()
	if err != nil {
		return
	}
	if !atomic.CompareAndSwapInt64(&s.rtt, 0, rtt.Nanoseconds()) {
		prev := atomic.LoadInt64(&s.rtt)
		smoothedRTT := prev/2 + rtt.Nanoseconds()/2
		atomic.StoreInt64(&s.rtt, smoothedRTT)
	}
}

func (s *Session) startMeasureRTT() {
	s.measureRTT()
	t := time.NewTicker(s.config.MeasureRTTInterval)
	defer t.Stop()
	for {
		select {
		case <-s.CloseChan():
			return
		case <-t.C:
			s.measureRTT()
		}
	}
}

// 0 if we don't yet have a measurement
func (s *Session) getRTT() time.Duration {
	return time.Duration(atomic.LoadInt64(&s.rtt))
}

// Ping is used to measure the RTT response time
func (s *Session) Ping() (dur time.Duration, err error) {
	// Prepare a ping.
	s.pingLock.Lock()
	// If there's an active ping, jump on the bandwagon.
	if activePing := s.activePing; activePing != nil {
		s.pingLock.Unlock()
		return activePing.wait()
	}

	// Ok, our job to send the ping.
	activePing := newPing(s.pingID)
	s.pingID++
	s.activePing = activePing
	s.pingLock.Unlock()

	defer func() {
		// complete ping promise
		activePing.finish(dur, err)

		// Unset it.
		s.pingLock.Lock()
		s.activePing = nil
		s.pingLock.Unlock()
	}()

	// Send the ping request, waiting at most one connection write timeout
	// to flush it.
	timer := time.NewTimer(s.config.ConnectionWriteTimeout)
	defer timer.Stop()
	select {
	case s.pingCh <- activePing.id:
	case <-timer.C:
		return 0, ErrTimeout
	case <-s.shutdownCh:
		return 0, s.shutdownErr
	}

	// The "time" starts once we've actually sent the ping. Otherwise, we'll
	// measure the time it takes to flush the queue as well.
	start := time.Now()

	// Wait for a response, again waiting at most one write timeout.
	if !timer.Stop() {
		<-timer.C
	}
	timer.Reset(s.config.ConnectionWriteTimeout)
	select {
	case <-activePing.pingResponse:
	case <-timer.C:
		return 0, ErrTimeout
	case <-s.shutdownCh:
		return 0, s.shutdownErr
	}

	// Compute the RTT
	return time.Since(start), nil
}

// startKeepalive starts the keepalive process.
func (s *Session) startKeepalive() {
	s.keepaliveLock.Lock()
	defer s.keepaliveLock.Unlock()
	s.keepaliveTimer = time.AfterFunc(s.config.KeepAliveInterval, func() {
		s.keepaliveLock.Lock()
		if s.keepaliveTimer == nil || s.keepaliveActive {
			// keepalives have been stopped or a keepalive is active.
			s.keepaliveLock.Unlock()
			return
		}
		s.keepaliveActive = true
		s.keepaliveLock.Unlock()

		_, err := s.Ping()

		s.keepaliveLock.Lock()
		s.keepaliveActive = false
		if s.keepaliveTimer != nil {
			s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
		}
		s.keepaliveLock.Unlock()

		if err != nil {
			s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
			s.exitErr(ErrKeepAliveTimeout)
		}
	})
}

// stopKeepalive stops the keepalive process.
func (s *Session) stopKeepalive() {
	s.keepaliveLock.Lock()
	defer s.keepaliveLock.Unlock()
	if s.keepaliveTimer != nil {
		s.keepaliveTimer.Stop()
		s.keepaliveTimer = nil
	}
}

func (s *Session) extendKeepalive() {
	s.keepaliveLock.Lock()
	if s.keepaliveTimer != nil && !s.keepaliveActive {
		// Don't stop the timer and drain the channel. This is an
		// AfterFunc, not a normal timer, and any attempts to drain the
		// channel will block forever.
		//
		// Go will stop the timer for us internally anyways. The docs
		// say one must stop the timer before calling reset but that's
		// to ensure that the timer doesn't end up firing immediately
		// after calling Reset.
		s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
	}
	s.keepaliveLock.Unlock()
}

// send sends the header and body.
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}) error {
	select {
	case <-s.shutdownCh:
		return s.shutdownErr
	default:
	}

	// duplicate as we're sending this async.
	buf := pool.Get(headerSize + len(body))
	copy(buf[:headerSize], hdr[:])
	copy(buf[headerSize:], body)

	select {
	case <-s.shutdownCh:
		pool.Put(buf)
		return s.shutdownErr
	case s.sendCh <- buf:
		return nil
	case <-deadline:
		pool.Put(buf)
		return ErrTimeout
	}
}

// send is a long running goroutine that sends data
func (s *Session) send() {
	if err := s.sendLoop(); err != nil {
		s.exitErr(err)
	}
}

func (s *Session) sendLoop() (err error) {
	defer func() {
		if rerr := recover(); rerr != nil {
			fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack())
			err = fmt.Errorf("panic in yamux send loop: %s", rerr)
		}
	}()

	defer close(s.sendDoneCh)

	// Extend the write deadline if we've passed the halfway point. This can
	// be expensive so this ensures we only have to do this once every
	// ConnectionWriteTimeout/2 (usually 5s).
	var lastWriteDeadline time.Time
	extendWriteDeadline := func() error {
		now := time.Now()
		// If over half of the deadline has elapsed, extend it.
		if now.Add(s.config.ConnectionWriteTimeout / 2).After(lastWriteDeadline) {
			lastWriteDeadline = now.Add(s.config.ConnectionWriteTimeout)
			return s.conn.SetWriteDeadline(lastWriteDeadline)
		}
		return nil
	}

	writer := s.conn

	// FIXME: https://github.com/libp2p/go-libp2p/issues/644
	// Write coalescing is disabled for now.

	// writer := pool.Writer{W: s.conn}

	// var writeTimeout *time.Timer
	// var writeTimeoutCh <-chan time.Time
	// if s.config.WriteCoalesceDelay > 0 {
	//	writeTimeout = time.NewTimer(s.config.WriteCoalesceDelay)
	//	defer writeTimeout.Stop()

	//	writeTimeoutCh = writeTimeout.C
	// } else {
	//	ch := make(chan time.Time)
	//	close(ch)
	//	writeTimeoutCh = ch
	// }

	for {
		// yield after processing the last message, if we've shutdown.
		// s.sendCh is a buffered channel and Go doesn't guarantee select order.
		select {
		case <-s.shutdownCh:
			return nil
		default:
		}

		var buf []byte
		// Make sure to send any pings & pongs first so they don't get stuck behind writes.
		select {
		case pingID := <-s.pingCh:
			buf = pool.Get(headerSize)
			hdr := encode(typePing, flagSYN, 0, pingID)
			copy(buf, hdr[:])
		case pingID := <-s.pongCh:
			buf = pool.Get(headerSize)
			hdr := encode(typePing, flagACK, 0, pingID)
			copy(buf, hdr[:])
		default:
			// Then send normal data.
			select {
			case buf = <-s.sendCh:
			case pingID := <-s.pingCh:
				buf = pool.Get(headerSize)
				hdr := encode(typePing, flagSYN, 0, pingID)
				copy(buf, hdr[:])
			case pingID := <-s.pongCh:
				buf = pool.Get(headerSize)
				hdr := encode(typePing, flagACK, 0, pingID)
				copy(buf, hdr[:])
			case <-s.shutdownCh:
				return nil
				// default:
				//	select {
				//	case buf = <-s.sendCh:
				//	case <-s.shutdownCh:
				//		return nil
				//	case <-writeTimeoutCh:
				//		if err := writer.Flush(); err != nil {
				//			if os.IsTimeout(err) {
				//				err = ErrConnectionWriteTimeout
				//			}
				//			return err
				//		}

				//		select {
				//		case buf = <-s.sendCh:
				//		case <-s.shutdownCh:
				//			return nil
				//		}

				//		if writeTimeout != nil {
				//			writeTimeout.Reset(s.config.WriteCoalesceDelay)
				//		}
				//	}
			}
		}

		if err := extendWriteDeadline(); err != nil {
			pool.Put(buf)
			return err
		}

		_, err := writer.Write(buf)
		pool.Put(buf)

		if err != nil {
			if os.IsTimeout(err) {
				err = ErrConnectionWriteTimeout
			}
			return err
		}
	}
}

// recv is a long running goroutine that accepts new data
func (s *Session) recv() {
	if err := s.recvLoop(); err != nil {
		s.exitErr(err)
	}
}

// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
var (
	handlers = []func(*Session, header) error{
		typeData:         (*Session).handleStreamMessage,
		typeWindowUpdate: (*Session).handleStreamMessage,
		typePing:         (*Session).handlePing,
		typeGoAway:       (*Session).handleGoAway,
	}
)

// recvLoop continues to receive data until a fatal error is encountered
func (s *Session) recvLoop() (err error) {
	defer func() {
		if rerr := recover(); rerr != nil {
			fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack())
			err = fmt.Errorf("panic in yamux receive loop: %s", rerr)
		}
	}()
	defer close(s.recvDoneCh)
	var hdr header
	for {
		// fmt.Printf("ReadFull from %#v\n", s.reader)
		// Read the header
		if _, err := io.ReadFull(s.reader, hdr[:]); err != nil {
			if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
				s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
			}
			return err
		}

		// Reset the keepalive timer every time we receive data.
		// There's no reason to keepalive if we're active. Worse, if the
		// peer is busy sending us stuff, the pong might get stuck
		// behind a bunch of data.
		s.extendKeepalive()

		// Verify the version
		if hdr.Version() != protoVersion {
			s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
			return ErrInvalidVersion
		}

		mt := hdr.MsgType()
		if mt < typeData || mt > typeGoAway {
			return ErrInvalidMsgType
		}

		if err := handlers[mt](s, hdr); err != nil {
			return err
		}
	}
}

// handleStreamMessage handles either a data or window update frame
func (s *Session) handleStreamMessage(hdr header) error {
	// Check for a new stream creation
	id := hdr.StreamID()
	flags := hdr.Flags()
	if flags&flagSYN == flagSYN {
		if err := s.incomingStream(id); err != nil {
			return err
		}
	}

	// Get the stream
	s.streamLock.Lock()
	stream := s.streams[id]
	s.streamLock.Unlock()

	// If we do not have a stream, likely we sent a RST
	if stream == nil {
		// Drain any data on the wire
		if hdr.MsgType() == typeData && hdr.Length() > 0 {
			s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
			if _, err := io.CopyN(io.Discard, s.reader, int64(hdr.Length())); err != nil {
				s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
				return nil
			}
		} else {
			s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
		}
		return nil
	}

	// Check if this is a window update
	if hdr.MsgType() == typeWindowUpdate {
		stream.incrSendWindow(hdr, flags)
		return nil
	}

	// Read the new data
	if err := stream.readData(hdr, flags, s.reader); err != nil {
		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
		}
		return err
	}
	return nil
}

// handlePing is invoked for a typePing frame
func (s *Session) handlePing(hdr header) error {
	flags := hdr.Flags()
	pingID := hdr.Length()

	// Check if this is a query, respond back in a separate context so we
	// don't interfere with the receiving thread blocking for the write.
	if flags&flagSYN == flagSYN {
		select {
		case s.pongCh <- pingID:
		default:
			s.logger.Printf("[WARN] yamux: dropped ping reply")
		}
		return nil
	}

	// Handle a response
	s.pingLock.Lock()
	// If we have an active ping, and this is a response to that active
	// ping, complete the ping.
	if s.activePing != nil && s.activePing.id == pingID {
		// Don't assume that the peer won't send multiple responses for
		// the same ping.
		select {
		case s.activePing.pingResponse <- struct{}{}:
		default:
		}
	}
	s.pingLock.Unlock()
	return nil
}

// handleGoAway is invokde for a typeGoAway frame
func (s *Session) handleGoAway(hdr header) error {
	code := hdr.Length()
	switch code {
	case goAwayNormal:
		atomic.SwapInt32(&s.remoteGoAway, 1)
	case goAwayProtoErr:
		s.logger.Printf("[ERR] yamux: received protocol error go away")
		return fmt.Errorf("yamux protocol error")
	case goAwayInternalErr:
		s.logger.Printf("[ERR] yamux: received internal error go away")
		return fmt.Errorf("remote yamux internal error")
	default:
		s.logger.Printf("[ERR] yamux: received unexpected go away")
		return fmt.Errorf("unexpected go away received")
	}
	return nil
}

// incomingStream is used to create a new incoming stream
func (s *Session) incomingStream(id uint32) error {
	if s.client != (id%2 == 0) {
		s.logger.Printf("[ERR] yamux: both endpoints are clients")
		return fmt.Errorf("both yamux endpoints are clients")
	}
	// Reject immediately if we are doing a go away
	if atomic.LoadInt32(&s.localGoAway) == 1 {
		hdr := encode(typeWindowUpdate, flagRST, id, 0)
		return s.sendMsg(hdr, nil, nil)
	}

	// Allocate a new stream
	span, err := s.newMemoryManager()
	if err != nil {
		return fmt.Errorf("failed to create resource span: %w", err)
	}
	if err := span.ReserveMemory(initialStreamWindow, 255); err != nil {
		return err
	}
	stream := newStream(s, id, streamSYNReceived, initialStreamWindow, span)

	s.streamLock.Lock()
	defer s.streamLock.Unlock()

	// Check if stream already exists
	if _, ok := s.streams[id]; ok {
		s.logger.Printf("[ERR] yamux: duplicate stream declared")
		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil); sendErr != nil {
			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
		}
		span.Done()
		return ErrDuplicateStream
	}

	if s.numIncomingStreams >= s.config.MaxIncomingStreams {
		// too many active streams at the same time
		s.logger.Printf("[WARN] yamux: MaxIncomingStreams exceeded, forcing stream reset")
		defer span.Done()
		hdr := encode(typeWindowUpdate, flagRST, id, 0)
		return s.sendMsg(hdr, nil, nil)
	}

	s.numIncomingStreams++
	// Register the stream
	s.streams[id] = stream

	// Check if we've exceeded the backlog
	select {
	case s.acceptCh <- stream:
		return nil
	default:
		// Backlog exceeded! RST the stream
		defer span.Done()
		s.logger.Printf("[WARN] yamux: backlog exceeded, forcing stream reset")
		s.deleteStream(id)
		hdr := encode(typeWindowUpdate, flagRST, id, 0)
		return s.sendMsg(hdr, nil, nil)
	}
}

// closeStream is used to close a stream once both sides have
// issued a close. If there was an in-flight SYN and the stream
// was not yet established, then this will give the credit back.
func (s *Session) closeStream(id uint32) {
	s.streamLock.Lock()
	defer s.streamLock.Unlock()
	if _, ok := s.inflight[id]; ok {
		select {
		case <-s.synCh:
		default:
			s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
		}
		delete(s.inflight, id)
	}
	s.deleteStream(id)
}

func (s *Session) deleteStream(id uint32) {
	str, ok := s.streams[id]
	if !ok {
		return
	}
	if s.client == (id%2 == 0) {
		if s.numIncomingStreams == 0 {
			s.logger.Printf("[ERR] yamux: numIncomingStreams underflow")
			// prevent the creation of any new streams
			s.numIncomingStreams = math.MaxUint32
		} else {
			s.numIncomingStreams--
		}
	}
	delete(s.streams, id)
	str.memorySpan.Done()
}

// establishStream is used to mark a stream that was in the
// SYN Sent state as established.
func (s *Session) establishStream(id uint32) {
	s.streamLock.Lock()
	if _, ok := s.inflight[id]; ok {
		delete(s.inflight, id)
	} else {
		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
	}
	select {
	case <-s.synCh:
	default:
		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
	}
	s.streamLock.Unlock()
}