From cecfbdd6c744b564e1f2c98ef4f505ca56d99b7c Mon Sep 17 00:00:00 2001 From: backkem Date: Sun, 30 Sep 2018 21:33:55 +0200 Subject: [PATCH] Initial POC --- .gitignore | 2 + README.md | 8 ++ integration/dial.js | 27 ++++++ integration/listen.js | 19 +++++ net.go | 71 ++++++++++++++++ signaler.go | 186 ++++++++++++++++++++++++++++++++++++++++++ webrtcdirect.go | 99 ++++++++++++++++++++++ webrtcdirect_test.go | 90 ++++++++++++++++++++ 8 files changed, 502 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 integration/dial.js create mode 100644 integration/listen.js create mode 100644 net.go create mode 100644 signaler.go create mode 100644 webrtcdirect.go create mode 100644 webrtcdirect_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ccb2c80 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules/ +package-lock.json \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..42a9ec8 --- /dev/null +++ b/README.md @@ -0,0 +1,8 @@ +Package ``go-libp2p-webrtc-direct`` is a Golang version of the webrtc-direct libp2p transport. + +Please refer to ``pions/webrtc`` for additional installation instructions. +The package also requires the following forks to be checked out under their original package name: +- ``backkem/go-multiaddr`` +- ``backkem/mafmt`` + +The transport passes the ``SubtestStress1Conn1Stream1Msg`` test case but there is a long list of known limitations. Therefore, please don't rely on this package. It only serves as a proof of concept and as an experiment to gather some experience building tools on top of ``pions/webrtc``. \ No newline at end of file diff --git a/integration/dial.js b/integration/dial.js new file mode 100644 index 0000000..b16effc --- /dev/null +++ b/integration/dial.js @@ -0,0 +1,27 @@ + +const WebRTCDirect = require('libp2p-webrtc-direct') +const multiaddr = require('multiaddr') +const pull = require('pull-stream') + +const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') + +const direct = new WebRTCDirect() + + +direct.dial(mh, (err, conn) => { + if(err){ + console.log(`Error: ${err}`) + } + + console.log(`dial success`) + pull( + conn, + pull.collect((err, values) => { + if (!err) { + console.log(`Value: ${values.toString()}`) + } else { + console.log(`Error: ${err}`) + } + }), + ) + }) \ No newline at end of file diff --git a/integration/listen.js b/integration/listen.js new file mode 100644 index 0000000..9068077 --- /dev/null +++ b/integration/listen.js @@ -0,0 +1,19 @@ +const WebRTCDirect = require('libp2p-webrtc-direct') +const multiaddr = require('multiaddr') +const pull = require('pull-stream') + +const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct') + +const direct = new WebRTCDirect() + +const listener = direct.createListener({config:{}},(socket) => { + console.log('new connection opened') + pull( + pull.values(['hello']), + socket + ) +}) + +listener.listen(mh, () => { + console.log('listening') +}) \ No newline at end of file diff --git a/net.go b/net.go new file mode 100644 index 0000000..ec3e31f --- /dev/null +++ b/net.go @@ -0,0 +1,71 @@ +package libp2pwebrtcdirect + +import ( + "fmt" + "net" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" +) + +// laddr and raddr don't make much sense. Maybe they can be nil? +func wrapNetListener(listener net.Listener, laddr, raddr ma.Multiaddr) manet.Listener { + return &maListener{ + Listener: listener, + laddr: laddr, + raddr: raddr, + } +} + +// maListener implements Listener +type maListener struct { + net.Listener + laddr ma.Multiaddr + raddr ma.Multiaddr +} + +// Accept waits for and returns the next connection to the listener. +// Returns a Multiaddr friendly Conn +func (l *maListener) Accept() (manet.Conn, error) { + nconn, err := l.Listener.Accept() + if err != nil { + return nil, err + } + fmt.Println("Accept: wrapNetConn") + return wrapNetConn(nconn, l.laddr, l.raddr), nil +} + +// Multiaddr returns the listener's (local) Multiaddr. +func (l *maListener) Multiaddr() ma.Multiaddr { + return l.laddr +} + +// laddr and raddr don't make much sense. Maybe they can be nil? +func wrapNetConn(conn net.Conn, laddr, raddr ma.Multiaddr) manet.Conn { + endpts := maEndpoints{ + laddr: laddr, + raddr: raddr, + } + + return &struct { + net.Conn + maEndpoints + }{conn, endpts} +} + +type maEndpoints struct { + laddr ma.Multiaddr + raddr ma.Multiaddr +} + +// LocalMultiaddr returns the local address associated with +// this connection +func (c *maEndpoints) LocalMultiaddr() ma.Multiaddr { + return c.laddr +} + +// RemoteMultiaddr returns the remote address associated with +// this connection +func (c *maEndpoints) RemoteMultiaddr() ma.Multiaddr { + return c.raddr +} diff --git a/signaler.go b/signaler.go new file mode 100644 index 0000000..5b169e8 --- /dev/null +++ b/signaler.go @@ -0,0 +1,186 @@ +package libp2pwebrtcdirect + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + + multibase "github.com/multiformats/go-multibase" + "github.com/pions/dcnet" + "github.com/pions/webrtc" +) + +func NewHTTPDirectSignaler(config webrtc.RTCConfiguration, address string) *HTTPDirectSignaler { + ctx, cancel := context.WithCancel(context.Background()) + return &HTTPDirectSignaler{ + config: webrtc.RTCConfiguration{}, + address: address, + ctx: ctx, + cancel: cancel, + } +} + +type HTTPDirectSignaler struct { + config webrtc.RTCConfiguration + address string + ctx context.Context + cancel func() +} + +func (r *HTTPDirectSignaler) Dial() (*webrtc.RTCDataChannel, net.Addr, error) { + c, err := webrtc.New(r.config) + if err != nil { + return nil, nil, err + } + + var dc *webrtc.RTCDataChannel + dc, err = c.CreateDataChannel("data", nil) + if err != nil { + return nil, nil, err + } + + // TODO: migrate to OnNegotiationNeeded when available + offer, err := c.CreateOffer(nil) + if err != nil { + return nil, nil, err + } + + offerEnc, err := Encode(offer) + if err != nil { + return nil, nil, err + } + + resp, err := http.Get("http://" + r.address + "/?signal=" + offerEnc) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + answerEnc, err := ioutil.ReadAll(resp.Body) + if err != nil && err != io.EOF { + return nil, nil, err + } + + answer, err := Decode(string(answerEnc)) + if err != nil { + return nil, nil, err + } + + if err := c.SetRemoteDescription(answer); err != nil { + return nil, nil, err + } + return dc, &dcnet.NilAddr{}, nil +} + +func (r *HTTPDirectSignaler) Accept() (*webrtc.RTCDataChannel, net.Addr, error) { + c, err := webrtc.New(r.config) + if err != nil { + return nil, nil, err + } + //c.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) { + // fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String()) + //} + + var dc *webrtc.RTCDataChannel + res := make(chan *webrtc.RTCDataChannel) + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + signals, ok := r.Form["signal"] + if !ok || len(signals) != 1 { + fmt.Println("Failed get offer") + return + } + + offer, err := Decode(signals[0]) + if err != nil { + fmt.Println("Failed to decode offer:", err) + return + } + + if err := c.SetRemoteDescription(offer); err != nil { + fmt.Println("Failed to set remote description:", err) + return + } + + answer, err := c.CreateAnswer(nil) + if err != nil { + fmt.Println("Failed to create answer:", err) + return + } + + answerEnc, err := Encode(answer) + if err != nil { + fmt.Println("Failed to encode answer:", err) + return + } + + _, err = fmt.Fprint(w, answerEnc) + if err != nil { + fmt.Println("Failed to send answer:", err) + return + } + + c.OnDataChannel = func(d *webrtc.RTCDataChannel) { + res <- d + } + + }) + + srv := &http.Server{ + Addr: r.address, + Handler: mux, + } + + go srv.ListenAndServe() + + select { + case dc = <-res: + case <-r.ctx.Done(): + return nil, nil, errors.New("signaler closed") + } + return dc, &dcnet.NilAddr{}, nil +} + +func Encode(desc webrtc.RTCSessionDescription) (string, error) { + descData, err := json.Marshal(desc) + if err != nil { + return "", fmt.Errorf("failed to marshal description: %v", err) + } + + descEnc, err := multibase.Encode(multibase.Base58BTC, descData) + if err != nil { + return "", fmt.Errorf("failed to encode description: %v", err) + } + return descEnc, nil +} + +func Decode(descEnc string) (webrtc.RTCSessionDescription, error) { + var desc webrtc.RTCSessionDescription + + _, descData, err := multibase.Decode(descEnc) + if err != nil { + return desc, fmt.Errorf("failed to decode description: %v", err) + } + + err = json.Unmarshal(descData, &desc) + if err != nil { + return desc, fmt.Errorf("failed to unmarshal description: %v", err) + } + + return desc, nil +} + +func (r *HTTPDirectSignaler) Close() error { + r.cancel() + return nil +} + +func (r *HTTPDirectSignaler) Addr() net.Addr { + return &dcnet.NilAddr{} +} diff --git a/webrtcdirect.go b/webrtcdirect.go new file mode 100644 index 0000000..a1db933 --- /dev/null +++ b/webrtcdirect.go @@ -0,0 +1,99 @@ +package libp2pwebrtcdirect + +import ( + "context" + "fmt" + + logging "github.com/ipfs/go-log" + peer "github.com/libp2p/go-libp2p-peer" + tpt "github.com/libp2p/go-libp2p-transport" + tptu "github.com/libp2p/go-libp2p-transport-upgrader" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" + "github.com/pions/dcnet" + "github.com/pions/webrtc" + mafmt "github.com/whyrusleeping/mafmt" +) + +var log = logging.Logger("webrtcdirect-tpt") + +var webrtcma, _ = ma.NewMultiaddr("/p2p-webrtc-direct") + +// WebRTCDirectTransport is the TCP transport. +type WebRTCDirectTransport struct { + webrtcOptions webrtc.RTCConfiguration + // Connection upgrader for upgrading insecure stream connections to + // secure multiplex connections. + Upgrader *tptu.Upgrader +} + +var _ tpt.Transport = &WebRTCDirectTransport{} + +// NewWebRTCDirectTransport creates a WebRTC transport that signals over a direct HTTP connection. +func NewWebRTCDirectTransport(webrtcOptions webrtc.RTCConfiguration, upgrader *tptu.Upgrader) *WebRTCDirectTransport { + return &WebRTCDirectTransport{ + webrtcOptions: webrtcOptions, + Upgrader: upgrader, + } +} + +// CanDial returns true if this transport believes it can dial the given +// multiaddr. +func (t *WebRTCDirectTransport) CanDial(addr ma.Multiaddr) bool { + return mafmt.WebRTCDirect.Matches(addr) +} + +// Dial dials the peer at the remote address. +func (t *WebRTCDirectTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) { + if !t.CanDial(raddr) { + return nil, fmt.Errorf("can't dial address %s", raddr) + } + httpMa := raddr.Decapsulate(webrtcma) + _, httpAddr, err := manet.DialArgs(httpMa) + if err != nil { + return nil, fmt.Errorf("failed to get dial args: %v", err) + } + + signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr) + conn, err := dcnet.Dial(signaler) + if err != nil { + return nil, fmt.Errorf("failed to dial: %v", err) + } + + wrappedConn := wrapNetConn(conn, raddr, raddr) + + return t.Upgrader.UpgradeOutbound(ctx, t, wrappedConn, p) +} + +// Listen listens on the given multiaddr. +func (t *WebRTCDirectTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) { + if !t.CanDial(laddr) { + return nil, fmt.Errorf("can't listen on address %s", laddr) + } + httpMa := laddr.Decapsulate(webrtcma) + _, httpAddr, err := manet.DialArgs(httpMa) + if err != nil { + return nil, fmt.Errorf("failed to get dial args: %v", err) + } + + signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr) + listener := dcnet.NewListener(signaler) + + wrappedListener := wrapNetListener(listener, laddr, laddr) + + return t.Upgrader.UpgradeListener(t, wrappedListener), nil +} + +// Protocols returns the list of terminal protocols this transport can dial. +func (t *WebRTCDirectTransport) Protocols() []int { + return []int{ma.P_P2P_WEBRTC_DIRECT} +} + +// Proxy always returns false for the TCP transport. +func (t *WebRTCDirectTransport) Proxy() bool { + return false +} + +func (t *WebRTCDirectTransport) String() string { + return "p2p-webrtc-direct" +} diff --git a/webrtcdirect_test.go b/webrtcdirect_test.go new file mode 100644 index 0000000..a0686dd --- /dev/null +++ b/webrtcdirect_test.go @@ -0,0 +1,90 @@ +package libp2pwebrtcdirect + +import ( + "reflect" + "runtime" + "testing" + + "github.com/libp2p/go-conn-security/insecure" + peer "github.com/libp2p/go-libp2p-peer" + tpt "github.com/libp2p/go-libp2p-transport" + tptu "github.com/libp2p/go-libp2p-transport-upgrader" + utils "github.com/libp2p/go-libp2p-transport/test" + ma "github.com/multiformats/go-multiaddr" + "github.com/pions/webrtc" + mplex "github.com/whyrusleeping/go-smux-multiplex" +) + +var Subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){ + // utils.SubtestProtocols, + // utils.SubtestBasic, + // utils.SubtestCancel, + // utils.SubtestPingPong, + + // Stolen from the stream muxer test suite. + utils.SubtestStress1Conn1Stream1Msg, + // utils.SubtestStress1Conn1Stream100Msg, + // utils.SubtestStress1Conn100Stream100Msg, + // utils.SubtestStress50Conn10Stream50Msg, + // utils.SubtestStress1Conn1000Stream10Msg, + // utils.SubtestStress1Conn100Stream100Msg10MB, + // utils.SubtestStreamOpenStress, + // utils.SubtestStreamReset, +} + +func getFunctionName(i interface{}) string { + return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name() +} + +func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA peer.ID) { + maddr, err := ma.NewMultiaddr(addr) + if err != nil { + t.Fatal(err) + } + for _, f := range Subtests { + t.Run(getFunctionName(f), func(t *testing.T) { + f(t, ta, tb, maddr, peerA) + }) + } +} + +func TestWebRTCDirectTransport(t *testing.T) { + ta := NewWebRTCDirectTransport( + webrtc.RTCConfiguration{}, + &tptu.Upgrader{ + Secure: insecure.New("peerA"), + Muxer: new(mplex.Transport), + }, + ) + tb := NewWebRTCDirectTransport( + webrtc.RTCConfiguration{}, + &tptu.Upgrader{ + Secure: insecure.New("peerB"), + Muxer: new(mplex.Transport), + }, + ) + + addr := "/ip4/127.0.0.1/tcp/50000/http/p2p-webrtc-direct" + SubtestTransport(t, ta, tb, addr, "peerA") +} + +func TestWebRTCDirectTransportCantListenUtp(t *testing.T) { + utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001") + if err != nil { + t.Fatal(err) + } + + tpt := NewWebRTCDirectTransport( + webrtc.RTCConfiguration{}, + &tptu.Upgrader{ + Secure: insecure.New("peerB"), + Muxer: new(mplex.Transport), + }, + ) + + _, err = tpt.Listen(utpa) + if err == nil { + t.Fatal("shouldnt be able to listen on utp addr with tcp transport") + } + +}