Workaround for packet based DataChannel.
parent 0d8b72ff46
commit 006488daf2

conn.go | 35
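Context for the change: a WebRTC DataChannel is message-oriented rather than a byte stream, so a single Read hands back (at most) one whole message and a single Write sends one message. The wrapper changed below therefore reads each incoming message into a buffer of dcWrapperBufSize (math.MaxUint16) and drains it across as many small Reads as the caller makes, and it caps each outgoing Write at the same size. The following is a minimal, self-contained sketch of that buffer-and-drain idea; the messageReader type and the sizes are illustrative only and are not part of this commit or of the pion API.

package main

import (
	"bytes"
	"fmt"
)

// messageReader stands in for a packet-based DataChannel: each Read hands
// back one whole message. (Illustrative stand-in, not the real channel type.)
type messageReader struct{ msgs [][]byte }

func (m *messageReader) Read(p []byte) (int, error) {
	if len(m.msgs) == 0 {
		return 0, fmt.Errorf("no more messages")
	}
	n := copy(p, m.msgs[0])
	m.msgs = m.msgs[1:]
	return n, nil
}

func main() {
	src := &messageReader{msgs: [][]byte{[]byte("hello, packet world")}}

	// Read the whole message once into a large buffer...
	buf := make([]byte, 65535)
	end, _ := src.Read(buf)

	// ...then serve it out in small, stream-style chunks, which is what
	// dcWrapper.Read below does with bufStart/bufEnd bookkeeping.
	var out bytes.Buffer
	for start := 0; start < end; {
		p := make([]byte, 4) // caller's small read buffer
		n := copy(p, buf[start:end])
		start += n
		out.Write(p[:n])
	}
	fmt.Println(out.String()) // prints the original message, reassembled
}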
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"math"
 	"net"
 	"net/http"
 	"sync"
@ -252,7 +253,7 @@ func (c *Conn) getMuxed() (smux.MuxedConn, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.useMuxer(&dcWrapper{rawDC, c.config.addr}, c.config.transport.muxer)
|
err := c.useMuxer(&dcWrapper{channel: rawDC, addr: c.config.addr, buf: make([]byte, dcWrapperBufSize)}, c.config.transport.muxer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -356,17 +357,47 @@ func (c *Conn) Transport() tpt.Transport {
|
||||||
return c.config.transport
|
return c.config.transport
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Limit message size until we have a better
|
||||||
|
// packetizing strategy.
|
||||||
|
const dcWrapperBufSize = math.MaxUint16
|
||||||
|
|
||||||
// dcWrapper wraps datachannel.ReadWriteCloser to form a net.Conn
|
// dcWrapper wraps datachannel.ReadWriteCloser to form a net.Conn
|
||||||
type dcWrapper struct {
|
type dcWrapper struct {
|
||||||
channel datachannel.ReadWriteCloser
|
channel datachannel.ReadWriteCloser
|
||||||
addr net.Addr
|
addr net.Addr
|
||||||
|
|
||||||
|
buf []byte
|
||||||
|
bufStart int
|
||||||
|
bufEnd int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *dcWrapper) Read(p []byte) (int, error) {
|
func (w *dcWrapper) Read(p []byte) (int, error) {
|
||||||
return w.channel.Read(p)
|
var err error
|
||||||
|
|
||||||
|
if w.bufEnd == 0 {
|
||||||
|
n := 0
|
||||||
|
n, err = w.channel.Read(w.buf)
|
||||||
|
w.bufEnd = n
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 0
|
||||||
|
if w.bufEnd-w.bufStart > 0 {
|
||||||
|
n = copy(p, w.buf[w.bufStart:w.bufEnd])
|
||||||
|
w.bufStart += n
|
||||||
|
|
||||||
|
if w.bufStart >= w.bufEnd {
|
||||||
|
w.bufStart = 0
|
||||||
|
w.bufEnd = 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *dcWrapper) Write(p []byte) (n int, err error) {
|
func (w *dcWrapper) Write(p []byte) (n int, err error) {
|
||||||
|
if len(p) > dcWrapperBufSize {
|
||||||
|
return w.channel.Write(p[:dcWrapperBufSize])
|
||||||
|
}
|
||||||
return w.channel.Write(p)
|
return w.channel.Write(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@@ -24,7 +24,7 @@ func TestTransport(t *testing.T) {
 
 	addr := "/ip4/127.0.0.1/tcp/0/http/p2p-webrtc-direct"
 
-	// TODO: Re-enable the entire test suite
+	// TODO: Re-enable normal test suite when not hitting CI limits when using race detector
 	// utils.SubtestTransport(t, ta, tb, addr, "peerA")
 	SubtestTransport(t, ta, tb, addr, "peerA")
 }
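On the write side the wrapper only truncates: a payload larger than dcWrapperBufSize is cut to that size and the short byte count is returned, presumably leaving it to the caller (here, the stream muxer) to send the remainder. A hedged sketch of how a caller can absorb such short writes follows; writeAll is a hypothetical helper and is not part of this commit.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// writeAll keeps writing the unsent tail until the whole payload has been
// accepted, which is one way to cope with writes capped at a maximum size.
func writeAll(w io.Writer, p []byte) error {
	for len(p) > 0 {
		n, err := w.Write(p)
		if err != nil {
			return err
		}
		p = p[n:]
	}
	return nil
}

func main() {
	var sink bytes.Buffer
	if err := writeAll(&sink, []byte("a payload that may span several messages")); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("wrote", sink.Len(), "bytes")
}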