Initial POC

This commit is contained in:
backkem 2018-09-30 21:33:55 +02:00
commit cecfbdd6c7
8 changed files with 502 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
node_modules/
package-lock.json

8
README.md Normal file
View File

@ -0,0 +1,8 @@
Package ``go-libp2p-webrtc-direct`` is a Golang version of the webrtc-direct libp2p transport.
Please refer to ``pions/webrtc`` for additional installation instructions.
The package also requires the following forks to be checked out under their original package name:
- ``backkem/go-multiaddr``
- ``backkem/mafmt``
The transport passes the ``SubtestStress1Conn1Stream1Msg`` test case but there is a long list of known limitations. Therefore, please don't rely on this package. It only serves as a proof of concept and as an experiment to gather some experience building tools on top of ``pions/webrtc``.

27
integration/dial.js Normal file
View File

@ -0,0 +1,27 @@
const WebRTCDirect = require('libp2p-webrtc-direct')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct')
const direct = new WebRTCDirect()
direct.dial(mh, (err, conn) => {
if(err){
console.log(`Error: ${err}`)
}
console.log(`dial success`)
pull(
conn,
pull.collect((err, values) => {
if (!err) {
console.log(`Value: ${values.toString()}`)
} else {
console.log(`Error: ${err}`)
}
}),
)
})

19
integration/listen.js Normal file
View File

@ -0,0 +1,19 @@
const WebRTCDirect = require('libp2p-webrtc-direct')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/http/p2p-webrtc-direct')
const direct = new WebRTCDirect()
const listener = direct.createListener({config:{}},(socket) => {
console.log('new connection opened')
pull(
pull.values(['hello']),
socket
)
})
listener.listen(mh, () => {
console.log('listening')
})

71
net.go Normal file
View File

@ -0,0 +1,71 @@
package libp2pwebrtcdirect
import (
"fmt"
"net"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)
// laddr and raddr don't make much sense. Maybe they can be nil?
func wrapNetListener(listener net.Listener, laddr, raddr ma.Multiaddr) manet.Listener {
return &maListener{
Listener: listener,
laddr: laddr,
raddr: raddr,
}
}
// maListener implements Listener
type maListener struct {
net.Listener
laddr ma.Multiaddr
raddr ma.Multiaddr
}
// Accept waits for and returns the next connection to the listener.
// Returns a Multiaddr friendly Conn
func (l *maListener) Accept() (manet.Conn, error) {
nconn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
fmt.Println("Accept: wrapNetConn")
return wrapNetConn(nconn, l.laddr, l.raddr), nil
}
// Multiaddr returns the listener's (local) Multiaddr.
func (l *maListener) Multiaddr() ma.Multiaddr {
return l.laddr
}
// laddr and raddr don't make much sense. Maybe they can be nil?
func wrapNetConn(conn net.Conn, laddr, raddr ma.Multiaddr) manet.Conn {
endpts := maEndpoints{
laddr: laddr,
raddr: raddr,
}
return &struct {
net.Conn
maEndpoints
}{conn, endpts}
}
type maEndpoints struct {
laddr ma.Multiaddr
raddr ma.Multiaddr
}
// LocalMultiaddr returns the local address associated with
// this connection
func (c *maEndpoints) LocalMultiaddr() ma.Multiaddr {
return c.laddr
}
// RemoteMultiaddr returns the remote address associated with
// this connection
func (c *maEndpoints) RemoteMultiaddr() ma.Multiaddr {
return c.raddr
}

186
signaler.go Normal file
View File

@ -0,0 +1,186 @@
package libp2pwebrtcdirect
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
multibase "github.com/multiformats/go-multibase"
"github.com/pions/dcnet"
"github.com/pions/webrtc"
)
func NewHTTPDirectSignaler(config webrtc.RTCConfiguration, address string) *HTTPDirectSignaler {
ctx, cancel := context.WithCancel(context.Background())
return &HTTPDirectSignaler{
config: webrtc.RTCConfiguration{},
address: address,
ctx: ctx,
cancel: cancel,
}
}
type HTTPDirectSignaler struct {
config webrtc.RTCConfiguration
address string
ctx context.Context
cancel func()
}
func (r *HTTPDirectSignaler) Dial() (*webrtc.RTCDataChannel, net.Addr, error) {
c, err := webrtc.New(r.config)
if err != nil {
return nil, nil, err
}
var dc *webrtc.RTCDataChannel
dc, err = c.CreateDataChannel("data", nil)
if err != nil {
return nil, nil, err
}
// TODO: migrate to OnNegotiationNeeded when available
offer, err := c.CreateOffer(nil)
if err != nil {
return nil, nil, err
}
offerEnc, err := Encode(offer)
if err != nil {
return nil, nil, err
}
resp, err := http.Get("http://" + r.address + "/?signal=" + offerEnc)
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
answerEnc, err := ioutil.ReadAll(resp.Body)
if err != nil && err != io.EOF {
return nil, nil, err
}
answer, err := Decode(string(answerEnc))
if err != nil {
return nil, nil, err
}
if err := c.SetRemoteDescription(answer); err != nil {
return nil, nil, err
}
return dc, &dcnet.NilAddr{}, nil
}
func (r *HTTPDirectSignaler) Accept() (*webrtc.RTCDataChannel, net.Addr, error) {
c, err := webrtc.New(r.config)
if err != nil {
return nil, nil, err
}
//c.OnICEConnectionStateChange = func(connectionState ice.ConnectionState) {
// fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
//}
var dc *webrtc.RTCDataChannel
res := make(chan *webrtc.RTCDataChannel)
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
signals, ok := r.Form["signal"]
if !ok || len(signals) != 1 {
fmt.Println("Failed get offer")
return
}
offer, err := Decode(signals[0])
if err != nil {
fmt.Println("Failed to decode offer:", err)
return
}
if err := c.SetRemoteDescription(offer); err != nil {
fmt.Println("Failed to set remote description:", err)
return
}
answer, err := c.CreateAnswer(nil)
if err != nil {
fmt.Println("Failed to create answer:", err)
return
}
answerEnc, err := Encode(answer)
if err != nil {
fmt.Println("Failed to encode answer:", err)
return
}
_, err = fmt.Fprint(w, answerEnc)
if err != nil {
fmt.Println("Failed to send answer:", err)
return
}
c.OnDataChannel = func(d *webrtc.RTCDataChannel) {
res <- d
}
})
srv := &http.Server{
Addr: r.address,
Handler: mux,
}
go srv.ListenAndServe()
select {
case dc = <-res:
case <-r.ctx.Done():
return nil, nil, errors.New("signaler closed")
}
return dc, &dcnet.NilAddr{}, nil
}
func Encode(desc webrtc.RTCSessionDescription) (string, error) {
descData, err := json.Marshal(desc)
if err != nil {
return "", fmt.Errorf("failed to marshal description: %v", err)
}
descEnc, err := multibase.Encode(multibase.Base58BTC, descData)
if err != nil {
return "", fmt.Errorf("failed to encode description: %v", err)
}
return descEnc, nil
}
func Decode(descEnc string) (webrtc.RTCSessionDescription, error) {
var desc webrtc.RTCSessionDescription
_, descData, err := multibase.Decode(descEnc)
if err != nil {
return desc, fmt.Errorf("failed to decode description: %v", err)
}
err = json.Unmarshal(descData, &desc)
if err != nil {
return desc, fmt.Errorf("failed to unmarshal description: %v", err)
}
return desc, nil
}
func (r *HTTPDirectSignaler) Close() error {
r.cancel()
return nil
}
func (r *HTTPDirectSignaler) Addr() net.Addr {
return &dcnet.NilAddr{}
}

99
webrtcdirect.go Normal file
View File

@ -0,0 +1,99 @@
package libp2pwebrtcdirect
import (
"context"
"fmt"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"github.com/pions/dcnet"
"github.com/pions/webrtc"
mafmt "github.com/whyrusleeping/mafmt"
)
var log = logging.Logger("webrtcdirect-tpt")
var webrtcma, _ = ma.NewMultiaddr("/p2p-webrtc-direct")
// WebRTCDirectTransport is the TCP transport.
type WebRTCDirectTransport struct {
webrtcOptions webrtc.RTCConfiguration
// Connection upgrader for upgrading insecure stream connections to
// secure multiplex connections.
Upgrader *tptu.Upgrader
}
var _ tpt.Transport = &WebRTCDirectTransport{}
// NewWebRTCDirectTransport creates a WebRTC transport that signals over a direct HTTP connection.
func NewWebRTCDirectTransport(webrtcOptions webrtc.RTCConfiguration, upgrader *tptu.Upgrader) *WebRTCDirectTransport {
return &WebRTCDirectTransport{
webrtcOptions: webrtcOptions,
Upgrader: upgrader,
}
}
// CanDial returns true if this transport believes it can dial the given
// multiaddr.
func (t *WebRTCDirectTransport) CanDial(addr ma.Multiaddr) bool {
return mafmt.WebRTCDirect.Matches(addr)
}
// Dial dials the peer at the remote address.
func (t *WebRTCDirectTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
if !t.CanDial(raddr) {
return nil, fmt.Errorf("can't dial address %s", raddr)
}
httpMa := raddr.Decapsulate(webrtcma)
_, httpAddr, err := manet.DialArgs(httpMa)
if err != nil {
return nil, fmt.Errorf("failed to get dial args: %v", err)
}
signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr)
conn, err := dcnet.Dial(signaler)
if err != nil {
return nil, fmt.Errorf("failed to dial: %v", err)
}
wrappedConn := wrapNetConn(conn, raddr, raddr)
return t.Upgrader.UpgradeOutbound(ctx, t, wrappedConn, p)
}
// Listen listens on the given multiaddr.
func (t *WebRTCDirectTransport) Listen(laddr ma.Multiaddr) (tpt.Listener, error) {
if !t.CanDial(laddr) {
return nil, fmt.Errorf("can't listen on address %s", laddr)
}
httpMa := laddr.Decapsulate(webrtcma)
_, httpAddr, err := manet.DialArgs(httpMa)
if err != nil {
return nil, fmt.Errorf("failed to get dial args: %v", err)
}
signaler := NewHTTPDirectSignaler(t.webrtcOptions, httpAddr)
listener := dcnet.NewListener(signaler)
wrappedListener := wrapNetListener(listener, laddr, laddr)
return t.Upgrader.UpgradeListener(t, wrappedListener), nil
}
// Protocols returns the list of terminal protocols this transport can dial.
func (t *WebRTCDirectTransport) Protocols() []int {
return []int{ma.P_P2P_WEBRTC_DIRECT}
}
// Proxy always returns false for the TCP transport.
func (t *WebRTCDirectTransport) Proxy() bool {
return false
}
func (t *WebRTCDirectTransport) String() string {
return "p2p-webrtc-direct"
}

90
webrtcdirect_test.go Normal file
View File

@ -0,0 +1,90 @@
package libp2pwebrtcdirect
import (
"reflect"
"runtime"
"testing"
"github.com/libp2p/go-conn-security/insecure"
peer "github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
tptu "github.com/libp2p/go-libp2p-transport-upgrader"
utils "github.com/libp2p/go-libp2p-transport/test"
ma "github.com/multiformats/go-multiaddr"
"github.com/pions/webrtc"
mplex "github.com/whyrusleeping/go-smux-multiplex"
)
var Subtests = []func(t *testing.T, ta, tb tpt.Transport, maddr ma.Multiaddr, peerA peer.ID){
// utils.SubtestProtocols,
// utils.SubtestBasic,
// utils.SubtestCancel,
// utils.SubtestPingPong,
// Stolen from the stream muxer test suite.
utils.SubtestStress1Conn1Stream1Msg,
// utils.SubtestStress1Conn1Stream100Msg,
// utils.SubtestStress1Conn100Stream100Msg,
// utils.SubtestStress50Conn10Stream50Msg,
// utils.SubtestStress1Conn1000Stream10Msg,
// utils.SubtestStress1Conn100Stream100Msg10MB,
// utils.SubtestStreamOpenStress,
// utils.SubtestStreamReset,
}
func getFunctionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}
func SubtestTransport(t *testing.T, ta, tb tpt.Transport, addr string, peerA peer.ID) {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
t.Fatal(err)
}
for _, f := range Subtests {
t.Run(getFunctionName(f), func(t *testing.T) {
f(t, ta, tb, maddr, peerA)
})
}
}
func TestWebRTCDirectTransport(t *testing.T) {
ta := NewWebRTCDirectTransport(
webrtc.RTCConfiguration{},
&tptu.Upgrader{
Secure: insecure.New("peerA"),
Muxer: new(mplex.Transport),
},
)
tb := NewWebRTCDirectTransport(
webrtc.RTCConfiguration{},
&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
},
)
addr := "/ip4/127.0.0.1/tcp/50000/http/p2p-webrtc-direct"
SubtestTransport(t, ta, tb, addr, "peerA")
}
func TestWebRTCDirectTransportCantListenUtp(t *testing.T) {
utpa, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5001")
if err != nil {
t.Fatal(err)
}
tpt := NewWebRTCDirectTransport(
webrtc.RTCConfiguration{},
&tptu.Upgrader{
Secure: insecure.New("peerB"),
Muxer: new(mplex.Transport),
},
)
_, err = tpt.Listen(utpa)
if err == nil {
t.Fatal("shouldnt be able to listen on utp addr with tcp transport")
}
}