diff --git a/conn.go b/conn.go index c0c58c1..fb7b0e3 100644 --- a/conn.go +++ b/conn.go @@ -130,6 +130,22 @@ func (d *Daemon) handleConn(c net.Conn) { return } + case pb.Request_PUBSUB: + res, sub := d.doPubsub(&req) + err := w.WriteMsg(res) + if err != nil { + log.Debugf("Error writing response: %s", err.Error()) + if sub != nil { + sub.Cancel() + } + return + } + + if sub != nil { + d.doPubsubPipe(sub, r, w) + return + } + default: log.Debugf("Unexpected request type: %d", *req.Type) return diff --git a/daemon.go b/daemon.go index 6e95e1c..2728054 100644 --- a/daemon.go +++ b/daemon.go @@ -12,6 +12,7 @@ import ( dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts" peer "github.com/libp2p/go-libp2p-peer" proto "github.com/libp2p/go-libp2p-protocol" + ps "github.com/libp2p/go-libp2p-pubsub" rhost "github.com/libp2p/go-libp2p/p2p/host/routed" ma "github.com/multiformats/go-multiaddr" ) @@ -23,7 +24,8 @@ type Daemon struct { host host.Host listener net.Listener - dht *dht.IpfsDHT + dht *dht.IpfsDHT + pubsub *ps.PubSub mx sync.Mutex // stream handlers: map of protocol.ID to unix socket path diff --git a/pubsub.go b/pubsub.go new file mode 100644 index 0000000..8aaafb2 --- /dev/null +++ b/pubsub.go @@ -0,0 +1,56 @@ +package p2pd + +import ( + pb "github.com/libp2p/go-libp2p-daemon/pb" + + ggio "github.com/gogo/protobuf/io" + ps "github.com/libp2p/go-libp2p-pubsub" +) + +func (d *Daemon) doPubsub(req *pb.Request) (*pb.Response, *ps.Subscription) { + if d.pubsub == nil { + return errorResponseString("PubSub not enabled"), nil + } + + if req.Pubsub == nil { + return errorResponseString("Malformed request; missing parameters"), nil + } + + switch *req.Pubsub.Type { + case pb.PSRequest_GET_TOPICS: + return d.doPubsubGetTopics(req.Pubsub) + + case pb.PSRequest_LIST_PEERS: + return d.doPubsubListPeers(req.Pubsub) + + case pb.PSRequest_PUBLISH: + return d.doPubsubPublish(req.Pubsub) + + case pb.PSRequest_SUBSCRIBE: + return d.doPubsubSubscribe(req.Pubsub) + + default: + log.Debugf("Unexpected pubsub request type: %d", *req.Pubsub.Type) + return errorResponseString("Unexpected request"), nil + } +} + +func (d *Daemon) doPubsubGetTopics(req *pb.PSRequest) (*pb.Response, *ps.Subscription) { + return nil, nil +} + +func (d *Daemon) doPubsubListPeers(req *pb.PSRequest) (*pb.Response, *ps.Subscription) { + return nil, nil +} + +func (d *Daemon) doPubsubPublish(req *pb.PSRequest) (*pb.Response, *ps.Subscription) { + return nil, nil +} + +func (d *Daemon) doPubsubSubscribe(req *pb.PSRequest) (*pb.Response, *ps.Subscription) { + return nil, nil +} + +func (d *Daemon) doPubsubPipe(sub *ps.Subscription, r ggio.ReadCloser, w ggio.WriteCloser) { + +}