2018-09-13 13:47:08 +00:00
|
|
|
package p2pd
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"time"
|
|
|
|
|
2019-05-26 22:58:53 +00:00
|
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
|
|
"github.com/libp2p/go-libp2p-core/peer"
|
|
|
|
"github.com/libp2p/go-libp2p-core/protocol"
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
pb "github.com/libp2p/go-libp2p-daemon/pb"
|
|
|
|
|
|
|
|
ggio "github.com/gogo/protobuf/io"
|
|
|
|
ma "github.com/multiformats/go-multiaddr"
|
|
|
|
)
|
|
|
|
|
|
|
|
const DefaultTimeout = 60 * time.Second
|
|
|
|
|
|
|
|
func (d *Daemon) handleConn(c net.Conn) {
|
|
|
|
defer c.Close()
|
|
|
|
|
2019-05-26 22:58:53 +00:00
|
|
|
r := ggio.NewDelimitedReader(c, network.MessageSizeMax)
|
2018-09-13 13:47:08 +00:00
|
|
|
w := ggio.NewDelimitedWriter(c)
|
|
|
|
|
|
|
|
for {
|
|
|
|
var req pb.Request
|
|
|
|
|
|
|
|
err := r.ReadMsg(&req)
|
|
|
|
if err != nil {
|
|
|
|
if err != io.EOF {
|
|
|
|
log.Debugf("Error reading message: %s", err.Error())
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-09-19 20:45:54 +00:00
|
|
|
log.Debugf("request: %d [%s]", *req.Type, req.Type.String())
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
switch *req.Type {
|
2018-09-15 07:24:13 +00:00
|
|
|
case pb.Request_IDENTIFY:
|
|
|
|
res := d.doIdentify(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
case pb.Request_CONNECT:
|
|
|
|
res := d.doConnect(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
case pb.Request_STREAM_OPEN:
|
|
|
|
res, s := d.doStreamOpen(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
if s != nil {
|
|
|
|
s.Reset()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if s != nil {
|
|
|
|
d.doStreamPipe(c, s)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
case pb.Request_STREAM_HANDLER:
|
|
|
|
res := d.doStreamHandler(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-09-30 08:01:00 +00:00
|
|
|
case pb.Request_DHT:
|
|
|
|
res, ch, cancel := d.doDHT(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
if ch != nil {
|
|
|
|
cancel()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if ch != nil {
|
|
|
|
for res := range ch {
|
|
|
|
err = w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
cancel()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2018-09-30 09:25:26 +00:00
|
|
|
|
2018-09-30 10:00:44 +00:00
|
|
|
err = w.WriteMsg(dhtResponseEnd())
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
2018-09-30 08:01:00 +00:00
|
|
|
}
|
|
|
|
|
2018-09-30 12:34:19 +00:00
|
|
|
case pb.Request_LIST_PEERS:
|
|
|
|
res := d.doListPeers(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-10-28 12:21:30 +00:00
|
|
|
case pb.Request_CONNMANAGER:
|
|
|
|
res := d.doConnManager(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-11-16 12:01:42 +00:00
|
|
|
case pb.Request_DISCONNECT:
|
|
|
|
res := d.doDisconnect(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-12-03 09:25:18 +00:00
|
|
|
case pb.Request_PUBSUB:
|
|
|
|
res, sub := d.doPubsub(&req)
|
|
|
|
err := w.WriteMsg(res)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error writing response: %s", err.Error())
|
|
|
|
if sub != nil {
|
|
|
|
sub.Cancel()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if sub != nil {
|
|
|
|
d.doPubsubPipe(sub, r, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
default:
|
2018-09-30 08:01:00 +00:00
|
|
|
log.Debugf("Unexpected request type: %d", *req.Type)
|
2018-09-13 13:47:08 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-15 07:24:13 +00:00
|
|
|
func (d *Daemon) doIdentify(req *pb.Request) *pb.Response {
|
2018-09-15 07:33:00 +00:00
|
|
|
id := []byte(d.ID())
|
|
|
|
addrs := d.Addrs()
|
2018-09-15 07:24:13 +00:00
|
|
|
baddrs := make([][]byte, len(addrs))
|
|
|
|
for x, addr := range addrs {
|
|
|
|
baddrs[x] = addr.Bytes()
|
|
|
|
}
|
|
|
|
|
|
|
|
res := okResponse()
|
|
|
|
res.Identify = &pb.IdentifyResponse{Id: id, Addrs: baddrs}
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
func (d *Daemon) doConnect(req *pb.Request) *pb.Response {
|
|
|
|
if req.Connect == nil {
|
2018-09-30 08:46:19 +00:00
|
|
|
return errorResponseString("Malformed request; missing parameters")
|
2018-09-13 13:47:08 +00:00
|
|
|
}
|
|
|
|
|
2018-11-23 07:48:36 +00:00
|
|
|
ctx, cancel := d.requestContext(req.Connect.GetTimeout())
|
|
|
|
defer cancel()
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
pid, err := peer.IDFromBytes(req.Connect.Peer)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error parsing peer ID: %s", err.Error())
|
|
|
|
return errorResponse(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var addrs []ma.Multiaddr
|
|
|
|
addrs = make([]ma.Multiaddr, len(req.Connect.Addrs))
|
|
|
|
for x, bs := range req.Connect.Addrs {
|
|
|
|
addr, err := ma.NewMultiaddrBytes(bs)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error parsing multiaddr: %s", err.Error())
|
|
|
|
return errorResponse(err)
|
|
|
|
}
|
|
|
|
addrs[x] = addr
|
|
|
|
}
|
|
|
|
|
2019-05-26 22:58:53 +00:00
|
|
|
pi := peer.AddrInfo{ID: pid, Addrs: addrs}
|
2018-09-13 13:47:08 +00:00
|
|
|
|
|
|
|
log.Debugf("connecting to %s", pid.Pretty())
|
|
|
|
err = d.host.Connect(ctx, pi)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("error opening connection to %s: %s", pid.Pretty(), err.Error())
|
|
|
|
return errorResponse(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return okResponse()
|
|
|
|
}
|
|
|
|
|
2018-11-16 12:01:42 +00:00
|
|
|
func (d *Daemon) doDisconnect(req *pb.Request) *pb.Response {
|
|
|
|
if req.Disconnect == nil {
|
|
|
|
return errorResponseString("Malformed request; missing parameters")
|
|
|
|
}
|
|
|
|
|
|
|
|
p, err := peer.IDFromBytes(req.Disconnect.GetPeer())
|
|
|
|
if err != nil {
|
|
|
|
return errorResponse(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = d.host.Network().ClosePeer(p)
|
|
|
|
if err != nil {
|
|
|
|
return errorResponse(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return okResponse()
|
|
|
|
}
|
|
|
|
|
2019-05-26 22:58:53 +00:00
|
|
|
func (d *Daemon) doStreamOpen(req *pb.Request) (*pb.Response, network.Stream) {
|
2018-09-13 13:47:08 +00:00
|
|
|
if req.StreamOpen == nil {
|
2018-09-30 08:46:19 +00:00
|
|
|
return errorResponseString("Malformed request; missing parameters"), nil
|
2018-09-13 13:47:08 +00:00
|
|
|
}
|
|
|
|
|
2018-11-23 07:48:36 +00:00
|
|
|
ctx, cancel := d.requestContext(req.StreamOpen.GetTimeout())
|
|
|
|
defer cancel()
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
pid, err := peer.IDFromBytes(req.StreamOpen.Peer)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error parsing peer ID: %s", err.Error())
|
|
|
|
return errorResponse(err), nil
|
|
|
|
}
|
|
|
|
|
2019-05-26 22:58:53 +00:00
|
|
|
protos := make([]protocol.ID, len(req.StreamOpen.Proto))
|
2018-09-13 13:47:08 +00:00
|
|
|
for x, str := range req.StreamOpen.Proto {
|
2019-05-26 22:58:53 +00:00
|
|
|
protos[x] = protocol.ID(str)
|
2018-09-13 13:47:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
log.Debugf("opening stream to %s", pid.Pretty())
|
|
|
|
s, err := d.host.NewStream(ctx, pid, protos...)
|
|
|
|
if err != nil {
|
|
|
|
log.Debugf("Error opening stream to %s: %s", pid.Pretty(), err.Error())
|
|
|
|
return errorResponse(err), nil
|
|
|
|
}
|
|
|
|
|
2018-09-13 14:26:58 +00:00
|
|
|
res := okResponse()
|
|
|
|
res.StreamInfo = makeStreamInfo(s)
|
|
|
|
return res, s
|
2018-09-13 13:47:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response {
|
|
|
|
if req.StreamHandler == nil {
|
2018-09-30 08:46:19 +00:00
|
|
|
return errorResponseString("Malformed request; missing parameters")
|
2018-09-13 13:47:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
d.mx.Lock()
|
|
|
|
defer d.mx.Unlock()
|
|
|
|
|
2018-12-06 22:16:50 +00:00
|
|
|
maddr, err := ma.NewMultiaddrBytes(req.StreamHandler.Addr)
|
|
|
|
if err != nil {
|
2018-12-12 17:02:11 +00:00
|
|
|
return errorResponse(err)
|
2018-12-06 22:16:50 +00:00
|
|
|
}
|
2018-09-19 20:45:54 +00:00
|
|
|
for _, sp := range req.StreamHandler.Proto {
|
2019-05-26 22:58:53 +00:00
|
|
|
p := protocol.ID(sp)
|
2018-09-13 14:07:01 +00:00
|
|
|
_, ok := d.handlers[p]
|
|
|
|
if !ok {
|
|
|
|
d.host.SetStreamHandler(p, d.handleStream)
|
|
|
|
}
|
2018-12-11 17:20:12 +00:00
|
|
|
log.Debugf("set stream handler: %s -> %s", sp, maddr.String())
|
2018-12-06 22:16:50 +00:00
|
|
|
d.handlers[p] = maddr
|
2018-09-13 13:47:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return okResponse()
|
|
|
|
}
|
|
|
|
|
2018-09-30 12:34:19 +00:00
|
|
|
func (d *Daemon) doListPeers(req *pb.Request) *pb.Response {
|
|
|
|
conns := d.host.Network().Conns()
|
|
|
|
peers := make([]*pb.PeerInfo, len(conns))
|
|
|
|
for x, conn := range conns {
|
|
|
|
peers[x] = &pb.PeerInfo{
|
|
|
|
Id: []byte(conn.RemotePeer()),
|
|
|
|
Addrs: [][]byte{conn.RemoteMultiaddr().Bytes()},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
res := okResponse()
|
|
|
|
res.Peers = peers
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
2018-11-23 07:48:36 +00:00
|
|
|
func (d *Daemon) requestContext(utime int64) (context.Context, func()) {
|
|
|
|
timeout := DefaultTimeout
|
|
|
|
if utime > 0 {
|
2018-11-29 08:28:52 +00:00
|
|
|
timeout = time.Duration(utime) * time.Second
|
2018-11-23 07:48:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return context.WithTimeout(d.ctx, timeout)
|
|
|
|
}
|
|
|
|
|
2018-09-13 13:47:08 +00:00
|
|
|
func okResponse() *pb.Response {
|
|
|
|
return &pb.Response{
|
|
|
|
Type: pb.Response_OK.Enum(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func errorResponse(err error) *pb.Response {
|
|
|
|
errstr := err.Error()
|
|
|
|
return &pb.Response{
|
|
|
|
Type: pb.Response_ERROR.Enum(),
|
|
|
|
Error: &pb.ErrorResponse{Msg: &errstr},
|
|
|
|
}
|
|
|
|
}
|
2018-09-13 14:26:58 +00:00
|
|
|
|
2018-09-30 08:46:19 +00:00
|
|
|
func errorResponseString(err string) *pb.Response {
|
|
|
|
return &pb.Response{
|
|
|
|
Type: pb.Response_ERROR.Enum(),
|
|
|
|
Error: &pb.ErrorResponse{Msg: &err},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-05-26 22:58:53 +00:00
|
|
|
func makeStreamInfo(s network.Stream) *pb.StreamInfo {
|
2018-09-13 14:26:58 +00:00
|
|
|
proto := string(s.Protocol())
|
|
|
|
return &pb.StreamInfo{
|
|
|
|
Peer: []byte(s.Conn().RemotePeer()),
|
|
|
|
Addr: s.Conn().RemoteMultiaddr().Bytes(),
|
|
|
|
Proto: &proto,
|
|
|
|
}
|
|
|
|
}
|