basic implementation: connections and streams

This commit is contained in:
vyzo 2018-09-13 16:47:08 +03:00
parent 5ba00f20b1
commit 7b98899214
6 changed files with 2331 additions and 0 deletions

178
conn.go Normal file
View File

@ -0,0 +1,178 @@
package p2pd
import (
"context"
"errors"
"io"
"net"
"time"
pb "github.com/libp2p/go-libp2p-daemon/pb"
ggio "github.com/gogo/protobuf/io"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
proto "github.com/libp2p/go-libp2p-protocol"
ma "github.com/multiformats/go-multiaddr"
)
const DefaultTimeout = 60 * time.Second
func (d *Daemon) handleConn(c net.Conn) {
defer c.Close()
r := ggio.NewDelimitedReader(c, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(c)
for {
var req pb.Request
err := r.ReadMsg(&req)
if err != nil {
if err != io.EOF {
log.Debugf("Error reading message: %s", err.Error())
}
return
}
switch *req.Type {
case pb.Request_CONNECT:
res := d.doConnect(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
return
}
case pb.Request_STREAM_OPEN:
res, s := d.doStreamOpen(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
if s != nil {
s.Reset()
}
return
}
if s != nil {
d.doStreamPipe(c, s)
return
}
case pb.Request_STREAM_HANDLER:
res := d.doStreamHandler(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
return
}
default:
log.Debugf("Unexpected request type: %s", req.Type)
return
}
}
}
func (d *Daemon) doConnect(req *pb.Request) *pb.Response {
ctx, cancel := context.WithTimeout(d.ctx, DefaultTimeout)
defer cancel()
if req.Connect == nil {
return errorResponse(errors.New("Malformed request; missing parameters"))
}
pid, err := peer.IDFromBytes(req.Connect.Peer)
if err != nil {
log.Debugf("Error parsing peer ID: %s", err.Error())
return errorResponse(err)
}
var addrs []ma.Multiaddr
addrs = make([]ma.Multiaddr, len(req.Connect.Addrs))
for x, bs := range req.Connect.Addrs {
addr, err := ma.NewMultiaddrBytes(bs)
if err != nil {
log.Debugf("Error parsing multiaddr: %s", err.Error())
return errorResponse(err)
}
addrs[x] = addr
}
pi := pstore.PeerInfo{ID: pid, Addrs: addrs}
log.Debugf("connecting to %s", pid.Pretty())
err = d.host.Connect(ctx, pi)
if err != nil {
log.Debugf("error opening connection to %s: %s", pid.Pretty(), err.Error())
return errorResponse(err)
}
return okResponse()
}
func (d *Daemon) doStreamOpen(req *pb.Request) (*pb.Response, inet.Stream) {
ctx, cancel := context.WithTimeout(d.ctx, DefaultTimeout)
defer cancel()
if req.StreamOpen == nil {
return errorResponse(errors.New("Malformed request; missing parameters")), nil
}
pid, err := peer.IDFromBytes(req.StreamOpen.Peer)
if err != nil {
log.Debugf("Error parsing peer ID: %s", err.Error())
return errorResponse(err), nil
}
protos := make([]proto.ID, len(req.StreamOpen.Proto))
for x, str := range req.StreamOpen.Proto {
protos[x] = proto.ID(str)
}
log.Debugf("opening stream to %s", pid.Pretty())
s, err := d.host.NewStream(ctx, pid, protos...)
if err != nil {
log.Debugf("Error opening stream to %s: %s", pid.Pretty(), err.Error())
return errorResponse(err), nil
}
return okResponse(), s
}
func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response {
if req.StreamHandler == nil {
return errorResponse(errors.New("Malformed request; missing parameters"))
}
p := proto.ID(*req.StreamHandler.Proto)
d.mx.Lock()
defer d.mx.Unlock()
log.Debugf("set stream handler: %s -> %s", p, *req.StreamHandler.Path)
_, ok := d.handlers[p]
if !ok {
d.host.SetStreamHandler(p, d.handleStream)
}
d.handlers[p] = *req.StreamHandler.Path
return okResponse()
}
func okResponse() *pb.Response {
return &pb.Response{
Type: pb.Response_OK.Enum(),
}
}
func errorResponse(err error) *pb.Response {
errstr := err.Error()
return &pb.Response{
Type: pb.Response_ERROR.Enum(),
Error: &pb.ErrorResponse{Msg: &errstr},
}
}

60
daemon.go Normal file
View File

@ -0,0 +1,60 @@
package p2pd
import (
"context"
"net"
"sync"
logging "github.com/ipfs/go-log"
libp2p "github.com/libp2p/go-libp2p"
host "github.com/libp2p/go-libp2p-host"
proto "github.com/libp2p/go-libp2p-protocol"
)
var log = logging.Logger("p2pd")
type Daemon struct {
ctx context.Context
host host.Host
listener net.Listener
mx sync.Mutex
// stream handlers: map of protocol.ID to unix socket path
handlers map[proto.ID]string
}
func NewDaemon(ctx context.Context, path string, opts ...libp2p.Option) (*Daemon, error) {
h, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}
l, err := net.Listen("unix", path)
if err != nil {
h.Close()
return nil, err
}
d := &Daemon{
ctx: ctx,
host: h,
listener: l,
handlers: make(map[proto.ID]string),
}
go d.listen()
return d, nil
}
func (d *Daemon) listen() {
for {
c, err := d.listener.Accept()
if err != nil {
log.Errorf("error accepting connection: %s", err.Error())
}
log.Debug("incoming connection")
go d.handleConn(c)
}
}

69
package.json Normal file
View File

@ -0,0 +1,69 @@
{
"author": "vyzo",
"bugs": {},
"gx": {
"dvcsimport": "github.com/libp2p/go-libp2p-daemon"
},
"gxDependencies": [
{
"author": "whyrusleeping",
"hash": "QmUEqyXr97aUbNmQADHYNknjwjjdVpJXEt1UZXmSG81EV4",
"name": "go-libp2p",
"version": "6.0.12"
},
{
"author": "whyrusleeping",
"hash": "QmeMYW7Nj8jnnEfs9qhm7SxKkoDPUWXu3MsxX6BFwz34tf",
"name": "go-libp2p-host",
"version": "3.0.9"
},
{
"author": "whyrusleeping",
"hash": "QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN",
"name": "go-libp2p-protocol",
"version": "1.0.0"
},
{
"hash": "Qmbq7kGxgcpALGLPaWDyTa6KUq5kBUKdEvkvPZcBkJoLex",
"name": "go-log",
"version": "1.5.6"
},
{
"author": "whyrusleeping",
"hash": "QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W",
"name": "go-libp2p-peer",
"version": "2.3.7"
},
{
"author": "whyrusleeping",
"hash": "QmZNJyx9GGCX4GeuHnLB8fxaxMLs4MjTjHokxfQcCd6Nve",
"name": "go-libp2p-net",
"version": "3.0.9"
},
{
"author": "multiformats",
"hash": "QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7",
"name": "go-multiaddr",
"version": "1.3.0"
},
{
"author": "whyrusleeping",
"hash": "QmdxUuburamoF6zF9qjeQC4WYcWGbWuRmdLacMEsW8ioD8",
"name": "gogo-protobuf",
"version": "0.0.0"
},
{
"author": "whyrusleeping",
"hash": "Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6",
"name": "go-libp2p-peerstore",
"version": "2.0.0"
}
],
"gxVersion": "0.12.1",
"language": "go",
"license": "",
"name": "go-libp2p-daemon",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.0.0"
}

1909
pb/p2pd.pb.go Normal file

File diff suppressed because it is too large Load Diff

49
pb/p2pd.proto Normal file
View File

@ -0,0 +1,49 @@
package p2pd.pb;
message Request {
enum Type {
CONNECT = 1;
STREAM_OPEN = 2;
STREAM_HANDLER = 3;
}
required Type type = 1;
optional ConnectRequest connect = 2;
optional StreamOpenRequest streamOpen = 3;
optional StreamHandlerRequest streamHandler = 4;
}
message Response {
enum Type {
OK = 1;
ERROR = 2;
}
required Type type = 1;
optional ErrorResponse error = 2;
}
message ConnectRequest {
required bytes peer = 1;
repeated bytes addrs = 2;
}
message StreamOpenRequest {
required bytes peer = 1;
repeated string proto = 2;
}
message StreamHandlerRequest {
required string proto = 1;
required string path = 2;
}
message ErrorResponse {
required string msg = 1;
}
message StreamAccept {
required bytes peer = 1;
required bytes addr = 2;
}

66
stream.go Normal file
View File

@ -0,0 +1,66 @@
package p2pd
import (
"io"
"net"
"sync"
pb "github.com/libp2p/go-libp2p-daemon/pb"
ggio "github.com/gogo/protobuf/io"
inet "github.com/libp2p/go-libp2p-net"
)
func (d *Daemon) doStreamPipe(c net.Conn, s inet.Stream) {
var wg sync.WaitGroup
wg.Add(2)
pipe := func(dst io.Writer, src io.Reader) {
_, err := io.Copy(dst, src)
if err != nil && err != io.EOF {
log.Debugf("stream error: %s", err.Error())
}
wg.Done()
}
go pipe(c, s)
go pipe(s, c)
wg.Wait()
s.Close()
}
func (d *Daemon) handleStream(s inet.Stream) {
defer s.Close()
p := s.Protocol()
d.mx.Lock()
path, ok := d.handlers[p]
d.mx.Unlock()
if !ok {
log.Debugf("unexpected stream: %s", p)
return
}
c, err := net.Dial("unix", path)
if err != nil {
log.Debugf("error dialing handler at %s: %s", path, err.Error())
return
}
defer c.Close()
w := ggio.NewDelimitedWriter(c)
msg := pb.StreamAccept{
Peer: []byte(s.Conn().RemotePeer()),
Addr: s.Conn().RemoteMultiaddr().Bytes(),
}
err = w.WriteMsg(&msg)
if err != nil {
log.Debugf("error accepting stream: %s", err.Error())
return
}
d.doStreamPipe(c, s)
}