From 006488daf2c54a0e59d14855ad6f58a8127761fb Mon Sep 17 00:00:00 2001 From: backkem Date: Sun, 28 Jul 2019 18:02:37 +0200 Subject: [PATCH] Workaround for packet based DataChannel. --- conn.go | 35 +++++++++++++++++++++++++++++++++-- webrtcdirect_test.go | 2 +- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/conn.go b/conn.go index ef5891d..37aeb65 100644 --- a/conn.go +++ b/conn.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "net" "net/http" "sync" @@ -252,7 +253,7 @@ func (c *Conn) getMuxed() (smux.MuxedConn, error) { } } - err := c.useMuxer(&dcWrapper{rawDC, c.config.addr}, c.config.transport.muxer) + err := c.useMuxer(&dcWrapper{channel: rawDC, addr: c.config.addr, buf: make([]byte, dcWrapperBufSize)}, c.config.transport.muxer) if err != nil { return nil, err } @@ -356,17 +357,47 @@ func (c *Conn) Transport() tpt.Transport { return c.config.transport } +// Limit message size until we have a better +// packetizing strategy. +const dcWrapperBufSize = math.MaxUint16 + // dcWrapper wraps datachannel.ReadWriteCloser to form a net.Conn type dcWrapper struct { channel datachannel.ReadWriteCloser addr net.Addr + + buf []byte + bufStart int + bufEnd int } func (w *dcWrapper) Read(p []byte) (int, error) { - return w.channel.Read(p) + var err error + + if w.bufEnd == 0 { + n := 0 + n, err = w.channel.Read(w.buf) + w.bufEnd = n + } + + n := 0 + if w.bufEnd-w.bufStart > 0 { + n = copy(p, w.buf[w.bufStart:w.bufEnd]) + w.bufStart += n + + if w.bufStart >= w.bufEnd { + w.bufStart = 0 + w.bufEnd = 0 + } + } + + return n, err } func (w *dcWrapper) Write(p []byte) (n int, err error) { + if len(p) > dcWrapperBufSize { + return w.channel.Write(p[:dcWrapperBufSize]) + } return w.channel.Write(p) } diff --git a/webrtcdirect_test.go b/webrtcdirect_test.go index fd407d2..bbd39a6 100644 --- a/webrtcdirect_test.go +++ b/webrtcdirect_test.go @@ -24,7 +24,7 @@ func TestTransport(t *testing.T) { addr := "/ip4/127.0.0.1/tcp/0/http/p2p-webrtc-direct" - // TODO: Re-enable the entire test suite + // TODO: Re-enable normal test suite when not hitting CI limits when using race detector // utils.SubtestTransport(t, ta, tb, addr, "peerA") SubtestTransport(t, ta, tb, addr, "peerA") }