// go-libp2p-daemon/daemon.go

package p2pd
import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/libp2p/go-libp2p-daemon/config"

	multierror "github.com/hashicorp/go-multierror"
	logging "github.com/ipfs/go-log"
	autonat "github.com/libp2p/go-libp2p-autonat-svc"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
	ps "github.com/libp2p/go-libp2p-pubsub"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr-net"
)
// log is the package-level logger, created under the name "p2pd".
var log = logging.Logger("p2pd")
type Daemon struct {
ctx context.Context
host host.Host
listener manet.Listener
2019-02-07 09:55:20 +00:00
dht *dht.IpfsDHT
pubsub *ps.PubSub
autonat *autonat.AutoNATService
2018-09-30 08:01:00 +00:00
mx sync.Mutex
// stream handlers: map of protocol.ID to multi-address
2019-05-26 22:58:53 +00:00
handlers map[protocol.ID]ma.Multiaddr
2019-05-03 09:05:43 +00:00
// closed is set when the daemon is shutting down
closed bool
}
func NewDaemon(ctx context.Context, maddr ma.Multiaddr, dhtMode string, opts ...libp2p.Option) (*Daemon, error) {
d := &Daemon{
2019-02-04 23:31:47 +00:00
ctx: ctx,
2019-05-26 22:58:53 +00:00
handlers: make(map[protocol.ID]ma.Multiaddr),
}
if dhtMode != "" {
var dhtOpts []dhtopts.Option
if dhtMode == config.DHTClientMode {
dhtOpts = append(dhtOpts, dhtopts.Client(true))
}
2019-01-04 19:58:59 +00:00
opts = append(opts, libp2p.Routing(d.DHTRoutingFactory(dhtOpts)))
}
h, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}
d.host = h
l, err := manet.Listen(maddr)
if err != nil {
h.Close()
return nil, err
}
2019-01-04 19:54:42 +00:00
d.listener = l
go d.listen()
2019-02-14 16:26:05 +00:00
go d.trapSignals()
return d, nil
}
2019-02-07 22:51:27 +00:00
func (d *Daemon) Listener() manet.Listener {
return d.listener
}
func (d *Daemon) DHTRoutingFactory(opts []dhtopts.Option) func(host.Host) (routing.PeerRouting, error) {
makeRouting := func(h host.Host) (routing.PeerRouting, error) {
dhtInst, err := dht.New(d.ctx, h, opts...)
if err != nil {
return nil, err
}
d.dht = dhtInst
return dhtInst, nil
2018-09-30 12:13:42 +00:00
}
return makeRouting
2018-09-30 12:13:42 +00:00
}
2018-12-03 13:03:59 +00:00
func (d *Daemon) EnablePubsub(router string, sign, strict bool) error {
var opts []ps.Option
2019-05-02 17:23:38 +00:00
if !sign {
opts = append(opts, ps.WithMessageSigning(false))
} else if !strict {
opts = append(opts, ps.WithStrictSignatureVerification(false))
2018-12-03 13:03:59 +00:00
}
switch router {
case "floodsub":
pubsub, err := ps.NewFloodSub(d.ctx, d.host, opts...)
if err != nil {
return err
}
d.pubsub = pubsub
return nil
case "gossipsub":
pubsub, err := ps.NewGossipSub(d.ctx, d.host, opts...)
if err != nil {
return err
}
d.pubsub = pubsub
return nil
default:
return fmt.Errorf("unknown pubsub router: %s", router)
}
}
2019-02-07 09:55:20 +00:00
func (d *Daemon) EnableAutoNAT(opts ...libp2p.Option) error {
svc, err := autonat.NewAutoNATService(d.ctx, d.host, opts...)
d.autonat = svc
return err
}
// ID reports the peer ID of the daemon's libp2p host.
func (d *Daemon) ID() peer.ID {
	h := d.host
	return h.ID()
}
// Addrs reports the multiaddrs returned by the daemon's libp2p host.
func (d *Daemon) Addrs() []ma.Multiaddr {
	h := d.host
	return h.Addrs()
}
func (d *Daemon) listen() {
for {
2019-05-03 09:05:43 +00:00
if d.isClosed() {
return
}
c, err := d.listener.Accept()
if err != nil {
log.Errorf("error accepting connection: %s", err.Error())
2019-05-03 09:05:43 +00:00
continue
}
log.Debug("incoming connection")
go d.handleConn(c)
}
}
2019-04-23 22:34:22 +00:00
2019-05-03 09:05:43 +00:00
func (d *Daemon) isClosed() bool {
d.mx.Lock()
defer d.mx.Unlock()
return d.closed
}
2019-04-24 14:21:02 +00:00
func clearUnixSockets(path ma.Multiaddr) error {
2019-04-24 14:28:44 +00:00
c, _ := ma.SplitFirst(path)
if c.Protocol().Code != ma.P_UNIX {
return nil
}
if err := os.Remove(c.Value()); err != nil {
return err
2019-04-23 22:45:07 +00:00
}
2019-04-24 16:27:56 +00:00
2019-04-23 22:45:07 +00:00
return nil
}
2019-04-23 22:34:22 +00:00
func (d *Daemon) Close() error {
2019-05-03 09:05:43 +00:00
d.mx.Lock()
d.closed = true
d.mx.Unlock()
2019-04-24 16:32:39 +00:00
var merr *multierror.Error
2019-04-23 22:34:22 +00:00
if err := d.host.Close(); err != nil {
2019-04-24 16:32:39 +00:00
merr = multierror.Append(err)
2019-04-23 22:34:22 +00:00
}
2019-04-24 14:21:02 +00:00
listenAddr := d.listener.Multiaddr()
2019-04-23 22:34:22 +00:00
if err := d.listener.Close(); err != nil {
2019-04-24 16:32:39 +00:00
merr = multierror.Append(merr, err)
2019-04-23 22:34:22 +00:00
}
2019-04-24 14:21:02 +00:00
if err := clearUnixSockets(listenAddr); err != nil {
2019-04-24 16:32:39 +00:00
merr = multierror.Append(merr, err)
2019-04-23 22:45:07 +00:00
}
2019-04-24 16:57:00 +00:00
return merr.ErrorOrNil()
2019-04-23 22:34:22 +00:00
}