From 76a6d3027d138f3c5c83a79e82e1ca05ca1c4d80 Mon Sep 17 00:00:00 2001 From: backkem Date: Tue, 22 Jan 2019 12:02:08 +0100 Subject: [PATCH] Rewrite to implement transport interface directly --- README.md | 20 ++- conn.go | 389 +++++++++++++++++++++++++++++++++++++++++++ listener.go | 149 +++++++++++++++++ net.go | 71 -------- signaler.go | 186 --------------------- signaling.go | 38 +++++ stream.go | 62 +++++++ transport.go | 92 ++++++++++ webrtcdirect.go | 92 +--------- webrtcdirect_test.go | 75 ++++----- 10 files changed, 784 insertions(+), 390 deletions(-) create mode 100644 conn.go create mode 100644 listener.go delete mode 100644 net.go delete mode 100644 signaler.go create mode 100644 signaling.go create mode 100644 stream.go create mode 100644 transport.go diff --git a/README.md b/README.md index 42a9ec8..7bcfbd8 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,16 @@ -Package ``go-libp2p-webrtc-direct`` is a Golang version of the webrtc-direct libp2p transport. +go-libp2p-webrtc-direct +=== +Package ``go-libp2p-webrtc-direct`` aims to be a Go port of [js-libp2p-webrtc-direct](https://github.com/libp2p/js-libp2p-webrtc-direct). -Please refer to ``pions/webrtc`` for additional installation instructions. -The package also requires the following forks to be checked out under their original package name: -- ``backkem/go-multiaddr`` -- ``backkem/mafmt`` +## Dependencies +There is currently no dependency management technique implemented. +However, this package relies on some dependencies that have not been +upstreamed yet. Check out the following forks under their original package name: +- [backkem/go-multiaddr-dns](https://github.com/backkem/go-multiaddr-dns) under `multiformats/go-multiaddr` +- [backkem/mafmt](https://github.com/backkem/mafmt) under `whyrusleeping/mafmt` -The transport passes the ``SubtestStress1Conn1Stream1Msg`` test case but there is a long list of known limitations. Therefore, please don't rely on this package. It only serves as a proof of concept and as an experiment to gather some experience building tools on top of ``pions/webrtc``. \ No newline at end of file +## Status +The package is in alpha status. + +## License +MIT License - see [LICENSE](LICENSE) for full text \ No newline at end of file diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..aabfca1 --- /dev/null +++ b/conn.go @@ -0,0 +1,389 @@ +package libp2pwebrtcdirect + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "sync" + "time" + + ic "github.com/libp2p/go-libp2p-crypto" + peer "github.com/libp2p/go-libp2p-peer" + tpt "github.com/libp2p/go-libp2p-transport" + smux "github.com/libp2p/go-stream-muxer" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" + "github.com/pions/datachannel" + "github.com/pions/webrtc" +) + +type connConfig struct { + transport *Transport + maAddr ma.Multiaddr + addr net.Addr + isServer bool +} + +func newConnConfig(transport *Transport, maAddr ma.Multiaddr, isServer bool) (*connConfig, error) { + httpMa := maAddr.Decapsulate(webrtcma) + + tcpMa := httpMa.Decapsulate(httpma) + addr, err := manet.ToNetAddr(tcpMa) + if err != nil { + return nil, fmt.Errorf("failed to get net addr: %v", err) + } + + return &connConfig{ + transport: transport, + maAddr: maAddr, + addr: addr, + isServer: isServer, + }, nil +} + +// Conn is a stream-multiplexing connection to a remote peer. +type Conn struct { + config *connConfig + + peerConnection *webrtc.RTCPeerConnection + initChannel *datachannel.DataChannel + + lock sync.RWMutex + accept chan chan detachResult + isMuxed bool + muxedConn smux.Conn +} + +func newConn(config *connConfig, pc *webrtc.RTCPeerConnection, initChannel *datachannel.DataChannel) *Conn { + conn := &Conn{ + config: config, + peerConnection: pc, + initChannel: initChannel, + accept: make(chan chan detachResult), + isMuxed: config.transport.muxer != nil, + } + + pc.OnDataChannel(func(dc *webrtc.RTCDataChannel) { + // We have to detach in OnDataChannel + detachRes := detachChannel(dc) + conn.accept <- detachRes + }) + + return conn +} + +func dial(ctx context.Context, config *connConfig) (*Conn, error) { + api := config.transport.api + pc, err := api.NewRTCPeerConnection(config.transport.webrtcOptions) + if err != nil { + return nil, err + } + + dc, err := pc.CreateDataChannel("data", nil) + if err != nil { + return nil, err + } + + detachRes := detachChannel(dc) + + offer, err := pc.CreateOffer(nil) + if err != nil { + return nil, err + } + + offerEnc, err := encodeSignal(offer) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", "http://"+config.addr.String()+"/?signal="+offerEnc, nil) + if err != nil { + return nil, err + } + + req = req.WithContext(ctx) + + var client = &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + answerEnc, err := ioutil.ReadAll(resp.Body) + if err != nil && err != io.EOF { + return nil, err + } + + answer, err := decodeSignal(string(answerEnc)) + if err != nil { + return nil, err + } + + if err := pc.SetRemoteDescription(answer); err != nil { + return nil, err + } + + select { + case res := <-detachRes: + if res.err != nil { + return nil, res.err + } + return newConn(config, pc, res.dc), nil + + case <-ctx.Done(): + return newConn(config, pc, nil), ctx.Err() + } +} + +type detachResult struct { + dc *datachannel.DataChannel + err error +} + +func detachChannel(dc *webrtc.RTCDataChannel) chan detachResult { + onOpenRes := make(chan detachResult) + dc.OnOpen(func() { + // Detach the data channel + raw, err := dc.Detach() + onOpenRes <- detachResult{raw, err} + }) + + return onOpenRes +} + +// Close closes the stream muxer and the the underlying net.Conn. +func (c *Conn) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + var err error + if c.peerConnection != nil { + err = c.peerConnection.Close() + } + c.peerConnection = nil + + close(c.accept) + + return err +} + +// IsClosed returns whether a connection is fully closed, so it can +// be garbage collected. +func (c *Conn) IsClosed() bool { + c.lock.RLock() + pc := c.peerConnection + c.lock.RUnlock() + return pc == nil +} + +// OpenStream creates a new stream. +func (c *Conn) OpenStream() (smux.Stream, error) { + muxed, err := c.getMuxed() + if err != nil { + return nil, err + } + if muxed != nil { + return muxed.OpenStream() + } + + rawDC := c.checkInitChannel() + if rawDC == nil { + pc, err := c.getPC() + if err != nil { + return nil, err + } + dc, err := pc.CreateDataChannel("data", nil) + if err != nil { + return nil, err + } + + detachRes := detachChannel(dc) + + res := <-detachRes + if res.err != nil { + return nil, res.err + } + rawDC = res.dc + } + + return newStream(rawDC), nil +} + +func (c *Conn) getPC() (*webrtc.RTCPeerConnection, error) { + c.lock.RLock() + pc := c.peerConnection + c.lock.RUnlock() + + if pc == nil { + return nil, errors.New("Conn closed") + } + + return pc, nil +} + +func (c *Conn) getMuxed() (smux.Conn, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if !c.isMuxed { + return nil, nil + } + + if c.muxedConn != nil { + return c.muxedConn, nil + } + + rawDC := c.initChannel + if rawDC == nil { + var err error + rawDC, err = c.awaitAccept() + if err != nil { + return nil, err + } + } + + err := c.useMuxer(&dcWrapper{rawDC, c.config.addr}, c.config.transport.muxer) + if err != nil { + return nil, err + } + + return c.muxedConn, nil +} + +// Note: caller should hold the conn lock. +func (c *Conn) useMuxer(conn net.Conn, muxer smux.Transport) error { + muxed, err := muxer.NewConn(conn, c.config.isServer) + if err != nil { + return err + } + c.muxedConn = muxed + + return nil +} + +func (c *Conn) checkInitChannel() *datachannel.DataChannel { + c.lock.Lock() + defer c.lock.Unlock() + // Since a WebRTC offer can't be empty the offering side will have + // an initial data channel opened. We return it here, the first time + // OpenStream is called. + if c.initChannel != nil { + ch := c.initChannel + c.initChannel = nil + return ch + } + + return nil +} + +// AcceptStream accepts a stream opened by the other side. +func (c *Conn) AcceptStream() (smux.Stream, error) { + muxed, err := c.getMuxed() + if err != nil { + return nil, err + } + if muxed != nil { + return muxed.AcceptStream() + } + + rawDC := c.checkInitChannel() + if rawDC == nil { + rawDC, err = c.awaitAccept() + } + + return newStream(rawDC), nil +} + +func (c *Conn) awaitAccept() (*datachannel.DataChannel, error) { + detachRes, ok := <-c.accept + if !ok { + return nil, errors.New("Conn closed") + } + + res := <-detachRes + return res.dc, res.err +} + +// LocalPeer returns our peer ID +func (c *Conn) LocalPeer() peer.ID { + // TODO: How to form a peer ID? + return peer.ID("") +} + +// LocalPrivateKey returns our private key +func (c *Conn) LocalPrivateKey() ic.PrivKey { + // TODO: Expose from pions/webrtc? + return nil + +} + +// RemotePeer returns the peer ID of the remote peer. +func (c *Conn) RemotePeer() peer.ID { + // TODO: How to form a peer ID? + return peer.ID("") +} + +// RemotePublicKey returns the public key of the remote peer. +func (c *Conn) RemotePublicKey() ic.PubKey { + // TODO: Expose from pions/webrtc? + return nil +} + +// LocalMultiaddr returns the local Multiaddr associated +// with this connection +func (c *Conn) LocalMultiaddr() ma.Multiaddr { + return c.config.maAddr +} + +// RemoteMultiaddr returns the remote Multiaddr associated +// with this connection +func (c *Conn) RemoteMultiaddr() ma.Multiaddr { + return c.config.maAddr +} + +// Transport returns the transport to which this connection belongs. +func (c *Conn) Transport() tpt.Transport { + return c.config.transport +} + +// dcWrapper wraps datachannel.DataChannel to form a net.Conn +type dcWrapper struct { + channel *datachannel.DataChannel + addr net.Addr +} + +func (w *dcWrapper) Read(p []byte) (int, error) { + return w.channel.Read(p) +} + +func (w *dcWrapper) Write(p []byte) (n int, err error) { + return w.channel.Write(p) +} + +func (w *dcWrapper) Close() error { + return w.channel.Close() +} + +func (w *dcWrapper) LocalAddr() net.Addr { + return w.addr +} + +func (w *dcWrapper) RemoteAddr() net.Addr { + return w.addr +} + +func (w *dcWrapper) SetDeadline(t time.Time) error { + return nil +} + +func (w *dcWrapper) SetReadDeadline(t time.Time) error { + return nil +} + +func (w *dcWrapper) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..80d0bc3 --- /dev/null +++ b/listener.go @@ -0,0 +1,149 @@ +package libp2pwebrtcdirect + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + + tpt "github.com/libp2p/go-libp2p-transport" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" +) + +// Listener is an interface closely resembling the net.Listener interface. +type Listener struct { + config *connConfig + accept chan *Conn + + srv *http.Server +} + +func newListener(config *connConfig) (*Listener, error) { + + ln, err := net.Listen(config.addr.Network(), config.addr.String()) + if err != nil { + return nil, fmt.Errorf("failed to listen: %v", err) + } + + // Update the addr after listening + tcpMa, err := manet.FromNetAddr(ln.Addr()) + if err != nil { + return nil, fmt.Errorf("failed create ma: %v", err) + } + + httpMa := tcpMa.Encapsulate(httpma) + maAddr := httpMa.Encapsulate(webrtcma) + + config.addr = ln.Addr() + config.maAddr = maAddr + + l := &Listener{ + config: config, + accept: make(chan *Conn), + } + + mux := http.NewServeMux() + mux.HandleFunc("/", l.handler) + + srv := &http.Server{ + Handler: mux, + } + + go func() { + srvErr := srv.Serve(ln) + if srvErr != nil { + log.Warningf("failed to start server: %v", srvErr) + } + }() + + l.srv = srv + return l, nil +} + +func (l *Listener) handler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + signals, ok := r.Form["signal"] + if !ok || len(signals) != 1 { + log.Warningf("failed to handle request: failed to parse signal") + return + } + + answer, err := l.handleSignal(signals[0]) + if err != nil { + log.Warningf("failed to handle request: failed to setup connection: %v", err) + return + } + + _, err = fmt.Fprint(w, answer) + if err != nil { + log.Warningf("failed to handle request: failed to send answer: %v", err) + return + } +} + +func (l *Listener) handleSignal(offerStr string) (string, error) { + offer, err := decodeSignal(offerStr) + if err != nil { + return "", fmt.Errorf("failed to decode offer: %v", err) + } + + api := l.config.transport.api + pc, err := api.NewRTCPeerConnection(l.config.transport.webrtcOptions) + if err != nil { + return "", err + } + + if err := pc.SetRemoteDescription(offer); err != nil { + return "", fmt.Errorf("failed to set remote description: %v", err) + } + + answer, err := pc.CreateAnswer(nil) + if err != nil { + return "", fmt.Errorf("failed to create answer: %v", err) + } + + answerEnc, err := encodeSignal(answer) + if err != nil { + return "", fmt.Errorf("failed to encode answer: %v", err) + } + + c := newConn(l.config, pc, nil) + l.accept <- c + + return answerEnc, nil +} + +// Accept waits for and returns the next connection to the listener. +func (l *Listener) Accept() (tpt.Conn, error) { + conn, ok := <-l.accept + if !ok { + return nil, errors.New("Listener closed") + } + + return conn, nil +} + +// Close closes the listener. +// Any blocked Accept operations will be unblocked and return errors. +func (l *Listener) Close() error { + err := l.srv.Shutdown(context.Background()) + if err != nil { + return err + } + + close(l.accept) + + return nil +} + +// Addr returns the listener's network address. +func (l *Listener) Addr() net.Addr { + return l.config.addr +} + +// Multiaddr returns the listener's network Multi address. +func (l *Listener) Multiaddr() ma.Multiaddr { + return l.config.maAddr +} diff --git a/net.go b/net.go deleted file mode 100644 index ec3e31f..0000000 --- a/net.go +++ /dev/null @@ -1,71 +0,0 @@ -package libp2pwebrtcdirect - -import ( - "fmt" - "net" - - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" -) - -// laddr and raddr don't make much sense. Maybe they can be nil? -func wrapNetListener(listener net.Listener, laddr, raddr ma.Multiaddr) manet.Listener { - return &maListener{ - Listener: listener, - laddr: laddr, - raddr: raddr, - } -} - -// maListener implements Listener -type maListener struct { - net.Listener - laddr ma.Multiaddr - raddr ma.Multiaddr -} - -// Accept waits for and returns the next connection to the listener. -// Returns a Multiaddr friendly Conn -func (l *maListener) Accept() (manet.Conn, error) { - nconn, err := l.Listener.Accept() - if err != nil { - return nil, err - } - fmt.Println("Accept: wrapNetConn") - return wrapNetConn(nconn, l.laddr, l.raddr), nil -} - -// Multiaddr returns the listener's (local) Multiaddr. -func (l *maListener) Multiaddr() ma.Multiaddr { - return l.laddr -} - -// laddr and raddr don't make much sense. Maybe they can be nil? -func wrapNetConn(conn net.Conn, laddr, raddr ma.Multiaddr) manet.Conn { - endpts := maEndpoints{ - laddr: laddr, - raddr: raddr, - } - - return &struct { - net.Conn - maEndpoints - }{conn, endpts} -} - -type maEndpoints struct { - laddr ma.Multiaddr - raddr ma.Multiaddr -} - -// LocalMultiaddr returns the local address associated with -// this connection -func (c *maEndpoints) LocalMultiaddr() ma.Multiaddr { - return c.laddr -} - -// RemoteMultiaddr returns the remote address associated with -// this connection -func (c *maEndpoints) RemoteMultiaddr() ma.Multiaddr { - return c.raddr -} diff --git a/signaler.go b/signaler.go deleted file mode 100644 index 5b169e8..0000000 --- a/signaler.go +++ /dev/null @@ -1,186 +0,0 @@ -package libp2pwebrtcdirect - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "net" - "net/http" - - multibase "github.com/multiformats/go-multibase" - "github.com/pions/dcnet" - "github.com/pions/webrtc" -) - -func NewHTTPDirectSignaler(config webrtc.RTCConfiguration, address string) *HTTPDirectSignaler { - ctx, cancel := context.WithCancel(context.Background()) - return &HTTPDirectSignaler{ - config: webrtc.RTCConfiguration{}, - address: address, - ctx: ctx, - cancel: cancel, - } -} - -type HTTPDirectSignaler struct { - config webrtc.RTCConfiguration - address string - ctx context.Context - cancel func() -} - -func (r *HTTPDirectSignaler) Dial() (*webrtc.RTCDataChannel, net.Addr, error) { - c, err := webrtc.New(r.config) - if err != nil { - return nil, nil, err - } - - var dc *webrtc.RTCDataChannel - dc, err = c.CreateDataChannel("data", nil) - if err != nil { - return nil, nil, err - } - - // TODO: migrate to OnNegotiationNeeded when available - offer, err := c.CreateOffer(nil) - if err != nil { - return nil, nil, err - } - - offerEnc, err := Encode(offer) - if err != nil { - return nil, nil, err - } - - resp, err := http.Get("http://" + r.address + "/?signal=" + offerEnc) - if err != nil { - return nil, nil, err - } - defer resp.Body.Close() - - answerEnc, err := ioutil.ReadAll(resp.Body) - if err != nil && err != io.EOF { - return nil, nil, err - } - - answer, err := Decode(string(answerEnc)) - if err != nil { - return nil, nil, err - } - - if err := c.SetRemoteDescription(answer); err != nil { - return nil, nil, err - } - return dc, &dcnet.NilAddr{}, nil -} - -func (r *HTTPDirectSignaler) Accept() (*webrtc.RTCDataChannel, net.Addr, error) { - c, err := webrtc.New(r.config) - if err != nil { - return nil, nil, err - } - //c.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { - // fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) - //} - - var dc *webrtc.RTCDataChannel - res := make(chan *webrtc.RTCDataChannel) - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - r.ParseForm() - signals, ok := r.Form["signal"] - if !ok || len(signals) != 1 { - fmt.Println("Failed get offer") - return - } - - offer, err := Decode(signals[0]) - if err != nil { - fmt.Println("Failed to decode offer:", err) - return - } - - if err := c.SetRemoteDescription(offer); err != nil { - fmt.Println("Failed to set remote description:", err) - return - } - - answer, err := c.CreateAnswer(nil) - if err != nil { - fmt.Println("Failed to create answer:", err) - return - } - - answerEnc, err := Encode(answer) - if err != nil { - fmt.Println("Failed to encode answer:", err) - return - } - - _, err = fmt.Fprint(w, answerEnc) - if err != nil { - fmt.Println("Failed to send answer:", err) - return - } - - c.OnDataChannel = func(d *webrtc.RTCDataChannel) { - res <- d - } - - }) - - srv := &http.Server{ - Addr: r.address, - Handler: mux, - } - - go srv.ListenAndServe() - - select { - case dc = <-res: - case <-r.ctx.Done(): - return nil, nil, errors.New("signaler closed") - } - return dc, &dcnet.NilAddr{}, nil -} - -func Encode(desc webrtc.RTCSessionDescription) (string, error) { - descData, err := json.Marshal(desc) - if err != nil { - return "", fmt.Errorf("failed to marshal description: %v", err) - } - - descEnc, err := multibase.Encode(multibase.Base58BTC, descData) - if err != nil { - return "", fmt.Errorf("failed to encode description: %v", err) - } - return descEnc, nil -} - -func Decode(descEnc string) (webrtc.RTCSessionDescription, error) { - var desc webrtc.RTCSessionDescription - - _, descData, err := multibase.Decode(descEnc) - if err != nil { - return desc, fmt.Errorf("failed to decode description: %v", err) - } - - err = json.Unmarshal(descData, &desc) - if err != nil { - return desc, fmt.Errorf("failed to unmarshal description: %v", err) - } - - return desc, nil -} - -func (r *HTTPDirectSignaler) Close() error { - r.cancel() - return nil -} - -func (r *HTTPDirectSignaler) Addr() net.Addr { - return &dcnet.NilAddr{} -} diff --git a/signaling.go b/signaling.go new file mode 100644 index 0000000..15071fd --- /dev/null +++ b/signaling.go @@ -0,0 +1,38 @@ +package libp2pwebrtcdirect + +import ( + "encoding/json" + "fmt" + + multibase "github.com/multiformats/go-multibase" + "github.com/pions/webrtc" +) + +func encodeSignal(desc webrtc.RTCSessionDescription) (string, error) { + descData, err := json.Marshal(desc) + if err != nil { + return "", fmt.Errorf("failed to marshal description: %v", err) + } + + descEnc, err := multibase.Encode(multibase.Base58BTC, descData) + if err != nil { + return "", fmt.Errorf("failed to encode description: %v", err) + } + return descEnc, nil +} + +func decodeSignal(descEnc string) (webrtc.RTCSessionDescription, error) { + var desc webrtc.RTCSessionDescription + + _, descData, err := multibase.Decode(descEnc) + if err != nil { + return desc, fmt.Errorf("failed to decode description: %v", err) + } + + err = json.Unmarshal(descData, &desc) + if err != nil { + return desc, fmt.Errorf("failed to unmarshal description: %v", err) + } + + return desc, nil +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..42891e0 --- /dev/null +++ b/stream.go @@ -0,0 +1,62 @@ +package libp2pwebrtcdirect + +import ( + "io" + "time" + + "github.com/pions/datachannel" +) + +// Stream is a bidirectional io pipe within a connection. +type Stream struct { + channel *datachannel.DataChannel +} + +func newStream(channel *datachannel.DataChannel) *Stream { + return &Stream{channel: channel} +} + +// Read implements the io.Reader. +func (s *Stream) Read(p []byte) (int, error) { + i, err := s.channel.Read(p) + if err != nil { + // pions/datachannel retuns an error when the underlying transport + // is closed. Here we turn this into EOF. + return i, io.EOF + } + return i, nil +} + +// Write implements the io.Writer. +func (s *Stream) Write(p []byte) (int, error) { + return s.channel.Write(p) +} + +// Close closes the stream for writing. Reading will still work (that +// is, the remote side can still write). +func (s *Stream) Close() error { + // TODO: figure out close vs reset + return nil +} + +// Reset closes both ends of the stream. Use this to tell the remote +// side to hang up and go away. +func (s *Stream) Reset() error { + // TODO: figure out close vs reset + return s.channel.Close() +} + +// SetDeadline is a stub +func (s *Stream) SetDeadline(t time.Time) error { + return nil +} + +// SetReadDeadline is a stub +func (s *Stream) SetReadDeadline(t time.Time) error { + return nil +} + +// SetWriteDeadline is a stub +func (s *Stream) SetWriteDeadline(t time.Time) error { + return nil +} diff --git a/transport.go b/transport.go new file mode 100644 index 0000000..67ea5a1 --- /dev/null +++ b/transport.go @@ -0,0 +1,92 @@ +package libp2pwebrtcdirect + +import ( + "context" + "fmt" + + peer "github.com/libp2p/go-libp2p-peer" + tpt "github.com/libp2p/go-libp2p-transport" + smux "github.com/libp2p/go-stream-muxer" + ma "github.com/multiformats/go-multiaddr" + "github.com/pions/webrtc" + mafmt "github.com/whyrusleeping/mafmt" +) + +// Transport is the WebRTC transport. +type Transport struct { + webrtcOptions webrtc.RTCConfiguration + muxer smux.Transport + api *webrtc.API +} + +// NewTransport creates a WebRTC transport that signals over a direct HTTP connection. +// It is currently required to provide a muxer. +func NewTransport(webrtcOptions webrtc.RTCConfiguration, muxer smux.Transport) *Transport { + s := webrtc.SettingEngine{} + // Use Detach data channels mode + s.DetachDataChannels() + api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) + return &Transport{ + webrtcOptions: webrtcOptions, + muxer: muxer, // TODO: Make the muxer optional + api: api, + } +} + +// CanDial returns true if this transport believes it can dial the given +// multiaddr. +func (t *Transport) CanDial(addr ma.Multiaddr) bool { + return mafmt.WebRTCDirect.Matches(addr) +} + +// Dial dials the peer at the remote address. +func (t *Transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) { + if !t.CanDial(raddr) { + return nil, fmt.Errorf("can't dial address %s", raddr) + } + + cfg, err := newConnConfig(t, raddr, false) + if err != nil { + return nil, fmt.Errorf("failed to get dial args: %v", err) + } + + conn, err := dial(ctx, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create connection: %v", err) + } + + return conn, nil +} + +// Listen listens on the given multiaddr. +func (t *Transport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { + if !t.CanDial(laddr) { + return nil, fmt.Errorf("can't listen on address %s", laddr) + } + + cfg, err := newConnConfig(t, laddr, true) + if err != nil { + return nil, fmt.Errorf("failed to get dial args: %v", err) + } + + l, err := newListener(cfg) + if err != nil { + return nil, fmt.Errorf("failed to listen: %v", err) + } + + return l, nil +} + +// Protocols returns the list of terminal protocols this transport can dial. +func (t *Transport) Protocols() []int { + return []int{ma.P_P2P_WEBRTC_DIRECT} +} + +// Proxy always returns false for the TCP transport. +func (t *Transport) Proxy() bool { + return false +} + +func (t *Transport) String() string { + return "p2p-webrtc-direct" +} diff --git a/webrtcdirect.go b/webrtcdirect.go index a1db933..92bf88f 100644 --- a/webrtcdirect.go +++ b/webrtcdirect.go @@ -1,99 +1,19 @@ package libp2pwebrtcdirect import ( - "context" - "fmt" - logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-peer" tpt "github.com/libp2p/go-libp2p-transport" - tptu "github.com/libp2p/go-libp2p-transport-upgrader" + smux "github.com/libp2p/go-stream-muxer" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" - "github.com/pions/dcnet" - "github.com/pions/webrtc" - mafmt "github.com/whyrusleeping/mafmt" ) var log = logging.Logger("webrtcdirect-tpt") var webrtcma, _ = ma.NewMultiaddr("/p2p-webrtc-direct") +var httpma, _ = ma.NewMultiaddr("/http") -// WebRTCDirectTransport is the TCP transport. -type WebRTCDirectTransport struct { - webrtcOptions webrtc.RTCConfiguration - // Connection upgrader for upgrading insecure stream connections to - // secure multiplex connections. - Upgrader *tptu.Upgrader -} +var _ tpt.Transport = &Transport{} +var _ tpt.Conn = &Conn{} +var _ tpt.Listener = &Listener{} -var _ tpt.Transport = &WebRTCDirectTransport{} - -// NewWebRTCDirectTransport creates a WebRTC transport that signals over a direct HTTP connection. -func NewWebRTCDirectTransport(webrtcOptions webrtc.RTCConfiguration, upgrader *tptu.Upgrader) *WebRTCDirectTransport { - return &WebRTCDirectTransport{ - webrtcOptions: webrtcOptions, - Upgrader: upgrader, - } -} - -// CanDial returns true if this transport believes it can dial the given -// multiaddr. -func (t *WebRTCDirectTransport) CanDial(addr ma.Multiaddr) bool { - return mafmt.WebRTCDirect.Matches(addr) -} - -// Dial dials the peer at the remote address. -func (t *WebRTCDirectTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) { - if !t.CanDial(raddr) { - return nil, fmt.Errorf("can't dial address %s", raddr) - } - httpMa := raddr.Decapsulate(webrtcma) - _, httpAddr, err := manet.DialArgs(httpMa) - if err != nil { - return nil, fmt.Errorf("failed to get dial args: %v", err) - } - - signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr) - conn, err := dcnet.Dial(signaler) - if err != nil { - return nil, fmt.Errorf("failed to dial: %v", err) - } - - wrappedConn := wrapNetConn(conn, raddr, raddr) - - return t.Upgrader.UpgradeOutbound(ctx, t, wrappedConn, p) -} - -// Listen listens on the given multiaddr. -func (t *WebRTCDirectTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { - if !t.CanDial(laddr) { - return nil, fmt.Errorf("can't listen on address %s", laddr) - } - httpMa := laddr.Decapsulate(webrtcma) - _, httpAddr, err := manet.DialArgs(httpMa) - if err != nil { - return nil, fmt.Errorf("failed to get dial args: %v", err) - } - - signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr) - listener := dcnet.NewListener(signaler) - - wrappedListener := wrapNetListener(listener, laddr, laddr) - - return t.Upgrader.UpgradeListener(t, wrappedListener), nil -} - -// Protocols returns the list of terminal protocols this transport can dial. -func (t *WebRTCDirectTransport) Protocols() []int { - return []int{ma.P_P2P_WEBRTC_DIRECT} -} - -// Proxy always returns false for the TCP transport. -func (t *WebRTCDirectTransport) Proxy() bool { - return false -} - -func (t *WebRTCDirectTransport) String() string { - return "p2p-webrtc-direct" -} +var _ smux.Stream = &Stream{} diff --git a/webrtcdirect_test.go b/webrtcdirect_test.go index a0686dd..e4aac6b 100644 --- a/webrtcdirect_test.go +++ b/webrtcdirect_test.go @@ -5,10 +5,10 @@ import ( "runtime" "testing" - "github.com/libp2p/go-conn-security/insecure" + logging "github.com/ipfs/go-log" + peer "github.com/libp2p/go-libp2p-peer" tpt "github.com/libp2p/go-libp2p-transport" - tptu "github.com/libp2p/go-libp2p-transport-upgrader" utils "github.com/libp2p/go-libp2p-transport/test" ma "github.com/multiformats/go-multiaddr" "github.com/pions/webrtc" @@ -16,20 +16,37 @@ import ( ) var Subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ - // utils.SubtestProtocols, - // utils.SubtestBasic, - // utils.SubtestCancel, - // utils.SubtestPingPong, + utils.SubtestProtocols, + utils.SubtestBasic, + + utils.SubtestCancel, + utils.SubtestPingPong, // Stolen from the stream muxer test suite. utils.SubtestStress1Conn1Stream1Msg, - // utils.SubtestStress1Conn1Stream100Msg, - // utils.SubtestStress1Conn100Stream100Msg, - // utils.SubtestStress50Conn10Stream50Msg, - // utils.SubtestStress1Conn1000Stream10Msg, - // utils.SubtestStress1Conn100Stream100Msg10MB, - // utils.SubtestStreamOpenStress, - // utils.SubtestStreamReset, + // utils.SubtestStress1Conn1Stream100Msg, // Flaky (WIP on SCTP issues) + // utils.SubtestStress1Conn100Stream100Msg, // Flaky (WIP on SCTP issues) + // utils.SubtestStress50Conn10Stream50Msg, // TODO + // utils.SubtestStress1Conn1000Stream10Msg, // TODO + // utils.SubtestStress1Conn100Stream100Msg10MB, // TODO + // utils.SubtestStreamOpenStress, // Passes with higher timeout + utils.SubtestStreamReset, +} + +func TestTransport(t *testing.T) { + logging.SetLogLevel("*", "warning") + + ta := NewTransport( + webrtc.RTCConfiguration{}, + new(mplex.Transport), + ) + tb := NewTransport( + webrtc.RTCConfiguration{}, + new(mplex.Transport), + ) + + addr := "/ip4/127.0.0.1/tcp/0/http/p2p-webrtc-direct" + SubtestTransport(t, ta, tb, addr, "peerA") } func getFunctionName(i interface{}) string { @@ -47,39 +64,15 @@ func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA pee }) } } - -func TestWebRTCDirectTransport(t *testing.T) { - ta := NewWebRTCDirectTransport( - webrtc.RTCConfiguration{}, - &tptu.Upgrader{ - Secure: insecure.New("peerA"), - Muxer: new(mplex.Transport), - }, - ) - tb := NewWebRTCDirectTransport( - webrtc.RTCConfiguration{}, - &tptu.Upgrader{ - Secure: insecure.New("peerB"), - Muxer: new(mplex.Transport), - }, - ) - - addr := "/ip4/127.0.0.1/tcp/50000/http/p2p-webrtc-direct" - SubtestTransport(t, ta, tb, addr, "peerA") -} - -func TestWebRTCDirectTransportCantListenUtp(t *testing.T) { - utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001") +func TestTransportCantListenUtp(t *testing.T) { + utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/50000") if err != nil { t.Fatal(err) } - tpt := NewWebRTCDirectTransport( + tpt := NewTransport( webrtc.RTCConfiguration{}, - &tptu.Upgrader{ - Secure: insecure.New("peerB"), - Muxer: new(mplex.Transport), - }, + new(mplex.Transport), ) _, err = tpt.Listen(utpa)