Rewrite to implement transport interface directly

This commit is contained in:
backkem 2019-01-22 12:02:08 +01:00 committed by Michiel De Backker
parent 1f5d2f86a3
commit 76a6d3027d
10 changed files with 784 additions and 390 deletions

View File

@ -1,8 +1,16 @@
Package ``go-libp2p-webrtc-direct`` is a Golang version of the webrtc-direct libp2p transport.
go-libp2p-webrtc-direct
===
Package ``go-libp2p-webrtc-direct`` aims to be a Go port of [js-libp2p-webrtc-direct](https://github.com/libp2p/js-libp2p-webrtc-direct).
Please refer to ``pions/webrtc`` for additional installation instructions.
The package also requires the following forks to be checked out under their original package name:
- ``backkem/go-multiaddr``
- ``backkem/mafmt``
## Dependencies
There is currently no dependency management technique implemented.
However, this package relies on some dependencies that have not been
upstreamed yet. Check out the following forks under their original package name:
- [backkem/go-multiaddr-dns](https://github.com/backkem/go-multiaddr-dns) under `multiformats/go-multiaddr`
- [backkem/mafmt](https://github.com/backkem/mafmt) under `whyrusleeping/mafmt`
The transport passes the ``SubtestStress1Conn1Stream1Msg`` test case but there is a long list of known limitations. Therefore, please don't rely on this package. It only serves as a proof of concept and as an experiment to gather some experience building tools on top of ``pions/webrtc``.
## Status
The package is in alpha status.
## License
MIT License - see [LICENSE](LICENSE) for full text

389
conn.go Normal file
View File

@ -0,0 +1,389 @@
package libp2pwebrtcdirect
import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"time"
ic "github.com/libp2p/go-libp2p-crypto"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/pions/datachannel"
"github.com/pions/webrtc"
)
type connConfig struct {
transport *Transport
maAddr ma.Multiaddr
addr net.Addr
isServer bool
}
func newConnConfig(transport *Transport, maAddr ma.Multiaddr, isServer bool) (*connConfig, error) {
httpMa := maAddr.Decapsulate(webrtcma)
tcpMa := httpMa.Decapsulate(httpma)
addr, err := manet.ToNetAddr(tcpMa)
if err != nil {
return nil, fmt.Errorf("failed to get net addr: %v", err)
}
return &connConfig{
transport: transport,
maAddr: maAddr,
addr: addr,
isServer: isServer,
}, nil
}
// Conn is a stream-multiplexing connection to a remote peer.
type Conn struct {
config *connConfig
peerConnection *webrtc.RTCPeerConnection
initChannel *datachannel.DataChannel
lock sync.RWMutex
accept chan chan detachResult
isMuxed bool
muxedConn smux.Conn
}
func newConn(config *connConfig, pc *webrtc.RTCPeerConnection, initChannel *datachannel.DataChannel) *Conn {
conn := &Conn{
config: config,
peerConnection: pc,
initChannel: initChannel,
accept: make(chan chan detachResult),
isMuxed: config.transport.muxer != nil,
}
pc.OnDataChannel(func(dc *webrtc.RTCDataChannel) {
// We have to detach in OnDataChannel
detachRes := detachChannel(dc)
conn.accept <- detachRes
})
return conn
}
func dial(ctx context.Context, config *connConfig) (*Conn, error) {
api := config.transport.api
pc, err := api.NewRTCPeerConnection(config.transport.webrtcOptions)
if err != nil {
return nil, err
}
dc, err := pc.CreateDataChannel("data", nil)
if err != nil {
return nil, err
}
detachRes := detachChannel(dc)
offer, err := pc.CreateOffer(nil)
if err != nil {
return nil, err
}
offerEnc, err := encodeSignal(offer)
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", "http://"+config.addr.String()+"/?signal="+offerEnc, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
var client = &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
answerEnc, err := ioutil.ReadAll(resp.Body)
if err != nil && err != io.EOF {
return nil, err
}
answer, err := decodeSignal(string(answerEnc))
if err != nil {
return nil, err
}
if err := pc.SetRemoteDescription(answer); err != nil {
return nil, err
}
select {
case res := <-detachRes:
if res.err != nil {
return nil, res.err
}
return newConn(config, pc, res.dc), nil
case <-ctx.Done():
return newConn(config, pc, nil), ctx.Err()
}
}
type detachResult struct {
dc *datachannel.DataChannel
err error
}
func detachChannel(dc *webrtc.RTCDataChannel) chan detachResult {
onOpenRes := make(chan detachResult)
dc.OnOpen(func() {
// Detach the data channel
raw, err := dc.Detach()
onOpenRes <- detachResult{raw, err}
})
return onOpenRes
}
// Close closes the stream muxer and the the underlying net.Conn.
func (c *Conn) Close() error {
c.lock.Lock()
defer c.lock.Unlock()
var err error
if c.peerConnection != nil {
err = c.peerConnection.Close()
}
c.peerConnection = nil
close(c.accept)
return err
}
// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
func (c *Conn) IsClosed() bool {
c.lock.RLock()
pc := c.peerConnection
c.lock.RUnlock()
return pc == nil
}
// OpenStream creates a new stream.
func (c *Conn) OpenStream() (smux.Stream, error) {
muxed, err := c.getMuxed()
if err != nil {
return nil, err
}
if muxed != nil {
return muxed.OpenStream()
}
rawDC := c.checkInitChannel()
if rawDC == nil {
pc, err := c.getPC()
if err != nil {
return nil, err
}
dc, err := pc.CreateDataChannel("data", nil)
if err != nil {
return nil, err
}
detachRes := detachChannel(dc)
res := <-detachRes
if res.err != nil {
return nil, res.err
}
rawDC = res.dc
}
return newStream(rawDC), nil
}
func (c *Conn) getPC() (*webrtc.RTCPeerConnection, error) {
c.lock.RLock()
pc := c.peerConnection
c.lock.RUnlock()
if pc == nil {
return nil, errors.New("Conn closed")
}
return pc, nil
}
func (c *Conn) getMuxed() (smux.Conn, error) {
c.lock.Lock()
defer c.lock.Unlock()
if !c.isMuxed {
return nil, nil
}
if c.muxedConn != nil {
return c.muxedConn, nil
}
rawDC := c.initChannel
if rawDC == nil {
var err error
rawDC, err = c.awaitAccept()
if err != nil {
return nil, err
}
}
err := c.useMuxer(&dcWrapper{rawDC, c.config.addr}, c.config.transport.muxer)
if err != nil {
return nil, err
}
return c.muxedConn, nil
}
// Note: caller should hold the conn lock.
func (c *Conn) useMuxer(conn net.Conn, muxer smux.Transport) error {
muxed, err := muxer.NewConn(conn, c.config.isServer)
if err != nil {
return err
}
c.muxedConn = muxed
return nil
}
func (c *Conn) checkInitChannel() *datachannel.DataChannel {
c.lock.Lock()
defer c.lock.Unlock()
// Since a WebRTC offer can't be empty the offering side will have
// an initial data channel opened. We return it here, the first time
// OpenStream is called.
if c.initChannel != nil {
ch := c.initChannel
c.initChannel = nil
return ch
}
return nil
}
// AcceptStream accepts a stream opened by the other side.
func (c *Conn) AcceptStream() (smux.Stream, error) {
muxed, err := c.getMuxed()
if err != nil {
return nil, err
}
if muxed != nil {
return muxed.AcceptStream()
}
rawDC := c.checkInitChannel()
if rawDC == nil {
rawDC, err = c.awaitAccept()
}
return newStream(rawDC), nil
}
func (c *Conn) awaitAccept() (*datachannel.DataChannel, error) {
detachRes, ok := <-c.accept
if !ok {
return nil, errors.New("Conn closed")
}
res := <-detachRes
return res.dc, res.err
}
// LocalPeer returns our peer ID
func (c *Conn) LocalPeer() peer.ID {
// TODO: How to form a peer ID?
return peer.ID("")
}
// LocalPrivateKey returns our private key
func (c *Conn) LocalPrivateKey() ic.PrivKey {
// TODO: Expose from pions/webrtc?
return nil
}
// RemotePeer returns the peer ID of the remote peer.
func (c *Conn) RemotePeer() peer.ID {
// TODO: How to form a peer ID?
return peer.ID("")
}
// RemotePublicKey returns the public key of the remote peer.
func (c *Conn) RemotePublicKey() ic.PubKey {
// TODO: Expose from pions/webrtc?
return nil
}
// LocalMultiaddr returns the local Multiaddr associated
// with this connection
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
return c.config.maAddr
}
// RemoteMultiaddr returns the remote Multiaddr associated
// with this connection
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
return c.config.maAddr
}
// Transport returns the transport to which this connection belongs.
func (c *Conn) Transport() tpt.Transport {
return c.config.transport
}
// dcWrapper wraps datachannel.DataChannel to form a net.Conn
type dcWrapper struct {
channel *datachannel.DataChannel
addr net.Addr
}
func (w *dcWrapper) Read(p []byte) (int, error) {
return w.channel.Read(p)
}
func (w *dcWrapper) Write(p []byte) (n int, err error) {
return w.channel.Write(p)
}
func (w *dcWrapper) Close() error {
return w.channel.Close()
}
func (w *dcWrapper) LocalAddr() net.Addr {
return w.addr
}
func (w *dcWrapper) RemoteAddr() net.Addr {
return w.addr
}
func (w *dcWrapper) SetDeadline(t time.Time) error {
return nil
}
func (w *dcWrapper) SetReadDeadline(t time.Time) error {
return nil
}
func (w *dcWrapper) SetWriteDeadline(t time.Time) error {
return nil
}

149
listener.go Normal file
View File

@ -0,0 +1,149 @@
package libp2pwebrtcdirect
import (
"context"
"errors"
"fmt"
"net"
"net/http"
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// Listener is an interface closely resembling the net.Listener interface.
type Listener struct {
config *connConfig
accept chan *Conn
srv *http.Server
}
func newListener(config *connConfig) (*Listener, error) {
ln, err := net.Listen(config.addr.Network(), config.addr.String())
if err != nil {
return nil, fmt.Errorf("failed to listen: %v", err)
}
// Update the addr after listening
tcpMa, err := manet.FromNetAddr(ln.Addr())
if err != nil {
return nil, fmt.Errorf("failed create ma: %v", err)
}
httpMa := tcpMa.Encapsulate(httpma)
maAddr := httpMa.Encapsulate(webrtcma)
config.addr = ln.Addr()
config.maAddr = maAddr
l := &Listener{
config: config,
accept: make(chan *Conn),
}
mux := http.NewServeMux()
mux.HandleFunc("/", l.handler)
srv := &http.Server{
Handler: mux,
}
go func() {
srvErr := srv.Serve(ln)
if srvErr != nil {
log.Warningf("failed to start server: %v", srvErr)
}
}()
l.srv = srv
return l, nil
}
func (l *Listener) handler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
signals, ok := r.Form["signal"]
if !ok || len(signals) != 1 {
log.Warningf("failed to handle request: failed to parse signal")
return
}
answer, err := l.handleSignal(signals[0])
if err != nil {
log.Warningf("failed to handle request: failed to setup connection: %v", err)
return
}
_, err = fmt.Fprint(w, answer)
if err != nil {
log.Warningf("failed to handle request: failed to send answer: %v", err)
return
}
}
func (l *Listener) handleSignal(offerStr string) (string, error) {
offer, err := decodeSignal(offerStr)
if err != nil {
return "", fmt.Errorf("failed to decode offer: %v", err)
}
api := l.config.transport.api
pc, err := api.NewRTCPeerConnection(l.config.transport.webrtcOptions)
if err != nil {
return "", err
}
if err := pc.SetRemoteDescription(offer); err != nil {
return "", fmt.Errorf("failed to set remote description: %v", err)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
return "", fmt.Errorf("failed to create answer: %v", err)
}
answerEnc, err := encodeSignal(answer)
if err != nil {
return "", fmt.Errorf("failed to encode answer: %v", err)
}
c := newConn(l.config, pc, nil)
l.accept <- c
return answerEnc, nil
}
// Accept waits for and returns the next connection to the listener.
func (l *Listener) Accept() (tpt.Conn, error) {
conn, ok := <-l.accept
if !ok {
return nil, errors.New("Listener closed")
}
return conn, nil
}
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (l *Listener) Close() error {
err := l.srv.Shutdown(context.Background())
if err != nil {
return err
}
close(l.accept)
return nil
}
// Addr returns the listener's network address.
func (l *Listener) Addr() net.Addr {
return l.config.addr
}
// Multiaddr returns the listener's network Multi address.
func (l *Listener) Multiaddr() ma.Multiaddr {
return l.config.maAddr
}

71
net.go
View File

@ -1,71 +0,0 @@
package libp2pwebrtcdirect
import (
"fmt"
"net"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// laddr and raddr don't make much sense. Maybe they can be nil?
func wrapNetListener(listener net.Listener, laddr, raddr ma.Multiaddr) manet.Listener {
return &maListener{
Listener: listener,
laddr: laddr,
raddr: raddr,
}
}
// maListener implements Listener
type maListener struct {
net.Listener
laddr ma.Multiaddr
raddr ma.Multiaddr
}
// Accept waits for and returns the next connection to the listener.
// Returns a Multiaddr friendly Conn
func (l *maListener) Accept() (manet.Conn, error) {
nconn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
fmt.Println("Accept: wrapNetConn")
return wrapNetConn(nconn, l.laddr, l.raddr), nil
}
// Multiaddr returns the listener's (local) Multiaddr.
func (l *maListener) Multiaddr() ma.Multiaddr {
return l.laddr
}
// laddr and raddr don't make much sense. Maybe they can be nil?
func wrapNetConn(conn net.Conn, laddr, raddr ma.Multiaddr) manet.Conn {
endpts := maEndpoints{
laddr: laddr,
raddr: raddr,
}
return &struct {
net.Conn
maEndpoints
}{conn, endpts}
}
type maEndpoints struct {
laddr ma.Multiaddr
raddr ma.Multiaddr
}
// LocalMultiaddr returns the local address associated with
// this connection
func (c *maEndpoints) LocalMultiaddr() ma.Multiaddr {
return c.laddr
}
// RemoteMultiaddr returns the remote address associated with
// this connection
func (c *maEndpoints) RemoteMultiaddr() ma.Multiaddr {
return c.raddr
}

View File

@ -1,186 +0,0 @@
package libp2pwebrtcdirect
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
multibase "github.com/multiformats/go-multibase"
"github.com/pions/dcnet"
"github.com/pions/webrtc"
)
func NewHTTPDirectSignaler(config webrtc.RTCConfiguration, address string) *HTTPDirectSignaler {
ctx, cancel := context.WithCancel(context.Background())
return &HTTPDirectSignaler{
config: webrtc.RTCConfiguration{},
address: address,
ctx: ctx,
cancel: cancel,
}
}
type HTTPDirectSignaler struct {
config webrtc.RTCConfiguration
address string
ctx context.Context
cancel func()
}
func (r *HTTPDirectSignaler) Dial() (*webrtc.RTCDataChannel, net.Addr, error) {
c, err := webrtc.New(r.config)
if err != nil {
return nil, nil, err
}
var dc *webrtc.RTCDataChannel
dc, err = c.CreateDataChannel("data", nil)
if err != nil {
return nil, nil, err
}
// TODO: migrate to OnNegotiationNeeded when available
offer, err := c.CreateOffer(nil)
if err != nil {
return nil, nil, err
}
offerEnc, err := Encode(offer)
if err != nil {
return nil, nil, err
}
resp, err := http.Get("http://" + r.address + "/?signal=" + offerEnc)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
answerEnc, err := ioutil.ReadAll(resp.Body)
if err != nil && err != io.EOF {
return nil, nil, err
}
answer, err := Decode(string(answerEnc))
if err != nil {
return nil, nil, err
}
if err := c.SetRemoteDescription(answer); err != nil {
return nil, nil, err
}
return dc, &dcnet.NilAddr{}, nil
}
func (r *HTTPDirectSignaler) Accept() (*webrtc.RTCDataChannel, net.Addr, error) {
c, err := webrtc.New(r.config)
if err != nil {
return nil, nil, err
}
//c.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) {
// fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
//}
var dc *webrtc.RTCDataChannel
res := make(chan *webrtc.RTCDataChannel)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
signals, ok := r.Form["signal"]
if !ok || len(signals) != 1 {
fmt.Println("Failed get offer")
return
}
offer, err := Decode(signals[0])
if err != nil {
fmt.Println("Failed to decode offer:", err)
return
}
if err := c.SetRemoteDescription(offer); err != nil {
fmt.Println("Failed to set remote description:", err)
return
}
answer, err := c.CreateAnswer(nil)
if err != nil {
fmt.Println("Failed to create answer:", err)
return
}
answerEnc, err := Encode(answer)
if err != nil {
fmt.Println("Failed to encode answer:", err)
return
}
_, err = fmt.Fprint(w, answerEnc)
if err != nil {
fmt.Println("Failed to send answer:", err)
return
}
c.OnDataChannel = func(d *webrtc.RTCDataChannel) {
res <- d
}
})
srv := &http.Server{
Addr: r.address,
Handler: mux,
}
go srv.ListenAndServe()
select {
case dc = <-res:
case <-r.ctx.Done():
return nil, nil, errors.New("signaler closed")
}
return dc, &dcnet.NilAddr{}, nil
}
func Encode(desc webrtc.RTCSessionDescription) (string, error) {
descData, err := json.Marshal(desc)
if err != nil {
return "", fmt.Errorf("failed to marshal description: %v", err)
}
descEnc, err := multibase.Encode(multibase.Base58BTC, descData)
if err != nil {
return "", fmt.Errorf("failed to encode description: %v", err)
}
return descEnc, nil
}
func Decode(descEnc string) (webrtc.RTCSessionDescription, error) {
var desc webrtc.RTCSessionDescription
_, descData, err := multibase.Decode(descEnc)
if err != nil {
return desc, fmt.Errorf("failed to decode description: %v", err)
}
err = json.Unmarshal(descData, &desc)
if err != nil {
return desc, fmt.Errorf("failed to unmarshal description: %v", err)
}
return desc, nil
}
func (r *HTTPDirectSignaler) Close() error {
r.cancel()
return nil
}
func (r *HTTPDirectSignaler) Addr() net.Addr {
return &dcnet.NilAddr{}
}

38
signaling.go Normal file
View File

@ -0,0 +1,38 @@
package libp2pwebrtcdirect
import (
"encoding/json"
"fmt"
multibase "github.com/multiformats/go-multibase"
"github.com/pions/webrtc"
)
func encodeSignal(desc webrtc.RTCSessionDescription) (string, error) {
descData, err := json.Marshal(desc)
if err != nil {
return "", fmt.Errorf("failed to marshal description: %v", err)
}
descEnc, err := multibase.Encode(multibase.Base58BTC, descData)
if err != nil {
return "", fmt.Errorf("failed to encode description: %v", err)
}
return descEnc, nil
}
func decodeSignal(descEnc string) (webrtc.RTCSessionDescription, error) {
var desc webrtc.RTCSessionDescription
_, descData, err := multibase.Decode(descEnc)
if err != nil {
return desc, fmt.Errorf("failed to decode description: %v", err)
}
err = json.Unmarshal(descData, &desc)
if err != nil {
return desc, fmt.Errorf("failed to unmarshal description: %v", err)
}
return desc, nil
}

62
stream.go Normal file
View File

@ -0,0 +1,62 @@
package libp2pwebrtcdirect
import (
"io"
"time"
"github.com/pions/datachannel"
)
// Stream is a bidirectional io pipe within a connection.
type Stream struct {
channel *datachannel.DataChannel
}
func newStream(channel *datachannel.DataChannel) *Stream {
return &Stream{channel: channel}
}
// Read implements the io.Reader.
func (s *Stream) Read(p []byte) (int, error) {
i, err := s.channel.Read(p)
if err != nil {
// pions/datachannel retuns an error when the underlying transport
// is closed. Here we turn this into EOF.
return i, io.EOF
}
return i, nil
}
// Write implements the io.Writer.
func (s *Stream) Write(p []byte) (int, error) {
return s.channel.Write(p)
}
// Close closes the stream for writing. Reading will still work (that
// is, the remote side can still write).
func (s *Stream) Close() error {
// TODO: figure out close vs reset
return nil
}
// Reset closes both ends of the stream. Use this to tell the remote
// side to hang up and go away.
func (s *Stream) Reset() error {
// TODO: figure out close vs reset
return s.channel.Close()
}
// SetDeadline is a stub
func (s *Stream) SetDeadline(t time.Time) error {
return nil
}
// SetReadDeadline is a stub
func (s *Stream) SetReadDeadline(t time.Time) error {
return nil
}
// SetWriteDeadline is a stub
func (s *Stream) SetWriteDeadline(t time.Time) error {
return nil
}

92
transport.go Normal file
View File

@ -0,0 +1,92 @@
package libp2pwebrtcdirect
import (
"context"
"fmt"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pions/webrtc"
mafmt "github.com/whyrusleeping/mafmt"
)
// Transport is the WebRTC transport.
type Transport struct {
webrtcOptions webrtc.RTCConfiguration
muxer smux.Transport
api *webrtc.API
}
// NewTransport creates a WebRTC transport that signals over a direct HTTP connection.
// It is currently required to provide a muxer.
func NewTransport(webrtcOptions webrtc.RTCConfiguration, muxer smux.Transport) *Transport {
s := webrtc.SettingEngine{}
// Use Detach data channels mode
s.DetachDataChannels()
api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
return &Transport{
webrtcOptions: webrtcOptions,
muxer: muxer, // TODO: Make the muxer optional
api: api,
}
}
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *Transport) CanDial(addr ma.Multiaddr) bool {
return mafmt.WebRTCDirect.Matches(addr)
}
// Dial dials the peer at the remote address.
func (t *Transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
if !t.CanDial(raddr) {
return nil, fmt.Errorf("can't dial address %s", raddr)
}
cfg, err := newConnConfig(t, raddr, false)
if err != nil {
return nil, fmt.Errorf("failed to get dial args: %v", err)
}
conn, err := dial(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create connection: %v", err)
}
return conn, nil
}
// Listen listens on the given multiaddr.
func (t *Transport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
if !t.CanDial(laddr) {
return nil, fmt.Errorf("can't listen on address %s", laddr)
}
cfg, err := newConnConfig(t, laddr, true)
if err != nil {
return nil, fmt.Errorf("failed to get dial args: %v", err)
}
l, err := newListener(cfg)
if err != nil {
return nil, fmt.Errorf("failed to listen: %v", err)
}
return l, nil
}
// Protocols returns the list of terminal protocols this transport can dial.
func (t *Transport) Protocols() []int {
return []int{ma.P_P2P_WEBRTC_DIRECT}
}
// Proxy always returns false for the TCP transport.
func (t *Transport) Proxy() bool {
return false
}
func (t *Transport) String() string {
return "p2p-webrtc-direct"
}

View File

@ -1,99 +1,19 @@
package libp2pwebrtcdirect
import (
"context"
"fmt"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
smux "github.com/libp2p/go-stream-muxer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/pions/dcnet"
"github.com/pions/webrtc"
mafmt "github.com/whyrusleeping/mafmt"
)
var log = logging.Logger("webrtcdirect-tpt")
var webrtcma, _ = ma.NewMultiaddr("/p2p-webrtc-direct")
var httpma, _ = ma.NewMultiaddr("/http")
// WebRTCDirectTransport is the TCP transport.
type WebRTCDirectTransport struct {
webrtcOptions webrtc.RTCConfiguration
// Connection upgrader for upgrading insecure stream connections to
// secure multiplex connections.
Upgrader *tptu.Upgrader
}
var _ tpt.Transport = &Transport{}
var _ tpt.Conn = &Conn{}
var _ tpt.Listener = &Listener{}
var _ tpt.Transport = &WebRTCDirectTransport{}
// NewWebRTCDirectTransport creates a WebRTC transport that signals over a direct HTTP connection.
func NewWebRTCDirectTransport(webrtcOptions webrtc.RTCConfiguration, upgrader *tptu.Upgrader) *WebRTCDirectTransport {
return &WebRTCDirectTransport{
webrtcOptions: webrtcOptions,
Upgrader: upgrader,
}
}
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *WebRTCDirectTransport) CanDial(addr ma.Multiaddr) bool {
return mafmt.WebRTCDirect.Matches(addr)
}
// Dial dials the peer at the remote address.
func (t *WebRTCDirectTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
if !t.CanDial(raddr) {
return nil, fmt.Errorf("can't dial address %s", raddr)
}
httpMa := raddr.Decapsulate(webrtcma)
_, httpAddr, err := manet.DialArgs(httpMa)
if err != nil {
return nil, fmt.Errorf("failed to get dial args: %v", err)
}
signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr)
conn, err := dcnet.Dial(signaler)
if err != nil {
return nil, fmt.Errorf("failed to dial: %v", err)
}
wrappedConn := wrapNetConn(conn, raddr, raddr)
return t.Upgrader.UpgradeOutbound(ctx, t, wrappedConn, p)
}
// Listen listens on the given multiaddr.
func (t *WebRTCDirectTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
if !t.CanDial(laddr) {
return nil, fmt.Errorf("can't listen on address %s", laddr)
}
httpMa := laddr.Decapsulate(webrtcma)
_, httpAddr, err := manet.DialArgs(httpMa)
if err != nil {
return nil, fmt.Errorf("failed to get dial args: %v", err)
}
signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr)
listener := dcnet.NewListener(signaler)
wrappedListener := wrapNetListener(listener, laddr, laddr)
return t.Upgrader.UpgradeListener(t, wrappedListener), nil
}
// Protocols returns the list of terminal protocols this transport can dial.
func (t *WebRTCDirectTransport) Protocols() []int {
return []int{ma.P_P2P_WEBRTC_DIRECT}
}
// Proxy always returns false for the TCP transport.
func (t *WebRTCDirectTransport) Proxy() bool {
return false
}
func (t *WebRTCDirectTransport) String() string {
return "p2p-webrtc-direct"
}
var _ smux.Stream = &Stream{}

View File

@ -5,10 +5,10 @@ import (
"runtime"
"testing"
"github.com/libp2p/go-conn-security/insecure"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
utils "github.com/libp2p/go-libp2p-transport/test"
ma "github.com/multiformats/go-multiaddr"
"github.com/pions/webrtc"
@ -16,20 +16,37 @@ import (
)
var Subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){
// utils.SubtestProtocols,
// utils.SubtestBasic,
// utils.SubtestCancel,
// utils.SubtestPingPong,
utils.SubtestProtocols,
utils.SubtestBasic,
utils.SubtestCancel,
utils.SubtestPingPong,
// Stolen from the stream muxer test suite.
utils.SubtestStress1Conn1Stream1Msg,
// utils.SubtestStress1Conn1Stream100Msg,
// utils.SubtestStress1Conn100Stream100Msg,
// utils.SubtestStress50Conn10Stream50Msg,
// utils.SubtestStress1Conn1000Stream10Msg,
// utils.SubtestStress1Conn100Stream100Msg10MB,
// utils.SubtestStreamOpenStress,
// utils.SubtestStreamReset,
// utils.SubtestStress1Conn1Stream100Msg, // Flaky (WIP on SCTP issues)
// utils.SubtestStress1Conn100Stream100Msg, // Flaky (WIP on SCTP issues)
// utils.SubtestStress50Conn10Stream50Msg, // TODO
// utils.SubtestStress1Conn1000Stream10Msg, // TODO
// utils.SubtestStress1Conn100Stream100Msg10MB, // TODO
// utils.SubtestStreamOpenStress, // Passes with higher timeout
utils.SubtestStreamReset,
}
func TestTransport(t *testing.T) {
logging.SetLogLevel("*", "warning")
ta := NewTransport(
webrtc.RTCConfiguration{},
new(mplex.Transport),
)
tb := NewTransport(
webrtc.RTCConfiguration{},
new(mplex.Transport),
)
addr := "/ip4/127.0.0.1/tcp/0/http/p2p-webrtc-direct"
SubtestTransport(t, ta, tb, addr, "peerA")
}
func getFunctionName(i interface{}) string {
@ -47,39 +64,15 @@ func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA pee
})
}
}
func TestWebRTCDirectTransport(t *testing.T) {
ta := NewWebRTCDirectTransport(
webrtc.RTCConfiguration{},
&tptu.Upgrader{
Secure: insecure.New("peerA"),
Muxer: new(mplex.Transport),
},
)
tb := NewWebRTCDirectTransport(
webrtc.RTCConfiguration{},
&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
},
)
addr := "/ip4/127.0.0.1/tcp/50000/http/p2p-webrtc-direct"
SubtestTransport(t, ta, tb, addr, "peerA")
}
func TestWebRTCDirectTransportCantListenUtp(t *testing.T) {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
func TestTransportCantListenUtp(t *testing.T) {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/50000")
if err != nil {
t.Fatal(err)
}
tpt := NewWebRTCDirectTransport(
tpt := NewTransport(
webrtc.RTCConfiguration{},
&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
},
new(mplex.Transport),
)
_, err = tpt.Listen(utpa)