pubsub scaffolding

This commit is contained in:
vyzo 2018-12-03 11:25:18 +02:00
parent 3f5df08262
commit ea65999040
3 changed files with 75 additions and 1 deletions

16
conn.go
View File

@ -130,6 +130,22 @@ func (d *Daemon) handleConn(c net.Conn) {
return
}
case pb.Request_PUBSUB:
res, sub := d.doPubsub(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugf("Error writing response: %s", err.Error())
if sub != nil {
sub.Cancel()
}
return
}
if sub != nil {
d.doPubsubPipe(sub, r, w)
return
}
default:
log.Debugf("Unexpected request type: %d", *req.Type)
return

View File

@ -12,6 +12,7 @@ import (
dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
peer "github.com/libp2p/go-libp2p-peer"
proto "github.com/libp2p/go-libp2p-protocol"
ps "github.com/libp2p/go-libp2p-pubsub"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
)
@ -23,7 +24,8 @@ type Daemon struct {
host host.Host
listener net.Listener
dht *dht.IpfsDHT
dht *dht.IpfsDHT
pubsub *ps.PubSub
mx sync.Mutex
// stream handlers: map of protocol.ID to unix socket path

56
pubsub.go Normal file
View File

@ -0,0 +1,56 @@
package p2pd
import (
pb "github.com/libp2p/go-libp2p-daemon/pb"
ggio "github.com/gogo/protobuf/io"
ps "github.com/libp2p/go-libp2p-pubsub"
)
func (d *Daemon) doPubsub(req *pb.Request) (*pb.Response, *ps.Subscription) {
if d.pubsub == nil {
return errorResponseString("PubSub not enabled"), nil
}
if req.Pubsub == nil {
return errorResponseString("Malformed request; missing parameters"), nil
}
switch *req.Pubsub.Type {
case pb.PSRequest_GET_TOPICS:
return d.doPubsubGetTopics(req.Pubsub)
case pb.PSRequest_LIST_PEERS:
return d.doPubsubListPeers(req.Pubsub)
case pb.PSRequest_PUBLISH:
return d.doPubsubPublish(req.Pubsub)
case pb.PSRequest_SUBSCRIBE:
return d.doPubsubSubscribe(req.Pubsub)
default:
log.Debugf("Unexpected pubsub request type: %d", *req.Pubsub.Type)
return errorResponseString("Unexpected request"), nil
}
}
func (d *Daemon) doPubsubGetTopics(req *pb.PSRequest) (*pb.Response, *ps.Subscription) {
return nil, nil
}
func (d *Daemon) doPubsubListPeers(req *pb.PSRequest) (*pb.Response, *ps.Subscription) {
return nil, nil
}
func (d *Daemon) doPubsubPublish(req *pb.PSRequest) (*pb.Response, *ps.Subscription) {
return nil, nil
}
func (d *Daemon) doPubsubSubscribe(req *pb.PSRequest) (*pb.Response, *ps.Subscription) {
return nil, nil
}
func (d *Daemon) doPubsubPipe(sub *ps.Subscription, r ggio.ReadCloser, w ggio.WriteCloser) {
}