From 57e36021bb9aabb1e76aa89fb903d3ce9fc87352 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sun, 18 Apr 2021 19:41:42 -0400 Subject: [PATCH] Use options pattern for starting a waku node --- waku/node.go | 24 +++-- waku/v2/node/wakunode2.go | 111 +++++++++++----------- waku/v2/node/wakuoptions.go | 99 +++++++++++++++++++ waku/v2/protocol/waku_store/waku_store.go | 13 ++- 4 files changed, 177 insertions(+), 70 deletions(-) create mode 100644 waku/v2/node/wakuoptions.go diff --git a/waku/node.go b/waku/node.go index abcc88c5..96d4d3fb 100644 --- a/waku/node.go +++ b/waku/node.go @@ -88,23 +88,27 @@ var rootCmd = &cobra.Command{ peerStore, err := pstoreds.NewPeerstore(ctx, datastore, opts) checkError(err, "Peerstore") - wakuNode, err := node.New(ctx, prvKey, []net.Addr{hostAddr}, libp2p.Peerstore(peerStore)) - checkError(err, "Wakunode") + nodeOpts := []node.WakuNodeOption{ + node.WithPrivateKey(prvKey), + node.WithHostAddress([]net.Addr{hostAddr}), + node.WithLibP2POptions(append(node.DefaultLibP2POptions, libp2p.Peerstore(peerStore))...), + } if relay { - wakuNode.MountRelay() + nodeOpts = append(nodeOpts, node.WithWakuRelay()) } if store { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) checkError(err, "DBStore") - - err = wakuNode.MountStore(true, dbStore) - checkError(err, "Error mounting store") - - wakuNode.StartStore() + nodeOpts = append(nodeOpts, node.WithWakuStore(true)) + nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } + wakuNode, err := node.New(ctx, nodeOpts...) + + checkError(err, "Wakunode") + for _, t := range topics { nodeTopic := node.Topic(t) _, err := wakuNode.Subscribe(&nodeTopic) @@ -115,7 +119,9 @@ var rootCmd = &cobra.Command{ checkError(errors.New("Store protocol was not started"), "") } else { if storenode != "" { - wakuNode.AddStorePeer(storenode) + _, err = wakuNode.AddStorePeer(storenode) + checkError(err, "Error adding store peer") + } } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index cab6f410..ca2f27ff 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -2,10 +2,8 @@ package node import ( "context" - "crypto/ecdsa" "errors" "fmt" - "net" "sync" "time" @@ -13,12 +11,9 @@ import ( proto "github.com/golang/protobuf/proto" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" - connmgr "github.com/libp2p/go-libp2p-connmgr" - "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr-net" common "github.com/status-im/go-waku/waku/common" "github.com/status-im/go-waku/waku/v2/protocol" store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" @@ -38,11 +33,9 @@ type Message []byte type WakuNode struct { host host.Host + opts *WakuNodeParameters pubsub *wakurelay.PubSub - store *store.WakuStore - isStore bool - topics map[Topic]bool topicsMutex sync.Mutex wakuRelayTopics map[Topic]*wakurelay.Topic @@ -53,41 +46,35 @@ type WakuNode struct { bcaster Broadcaster relaySubs map[Topic]*wakurelay.Subscription - ctx context.Context - cancel context.CancelFunc - privKey crypto.PrivKey + ctx context.Context + cancel context.CancelFunc } -func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, opts ...libp2p.Option) (*WakuNode, error) { - // Creates a Waku Node. - if hostAddr == nil { - return nil, errors.New("host address cannot be null") - } - - var multiAddresses []ma.Multiaddr - for _, addr := range hostAddr { - hostAddrMA, err := manet.FromNetAddr(addr) - if err != nil { - return nil, err - } - multiAddresses = append(multiAddresses, hostAddrMA) - } - - nodeKey := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) - - opts = append(opts, - libp2p.ListenAddrs(multiAddresses...), - libp2p.Identity(nodeKey), - libp2p.DefaultTransports, - libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. - libp2p.EnableNATService(), // TODO: is this needed?) - libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)), // ? - ) +func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { + params := new(WakuNodeParameters) ctx, cancel := context.WithCancel(ctx) _ = cancel - host, err := libp2p.New(ctx, opts...) + params.ctx = ctx + params.libP2POpts = DefaultLibP2POptions + + for _, opt := range opts { + err := opt(params) + if err != nil { + return nil, err + } + } + + if len(params.multiAddr) > 0 { + params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...)) + } + + if params.privKey != nil { + params.libP2POpts = append(params.libP2POpts, libp2p.Identity(*params.privKey)) + } + + host, err := libp2p.New(ctx, params.libP2POpts...) if err != nil { return nil, err } @@ -97,12 +84,26 @@ func New(ctx context.Context, privKey *ecdsa.PrivateKey, hostAddr []net.Addr, op w.pubsub = nil w.host = host w.cancel = cancel - w.privKey = nodeKey w.ctx = ctx w.topics = make(map[Topic]bool) w.wakuRelayTopics = make(map[Topic]*wakurelay.Topic) w.relaySubs = make(map[Topic]*wakurelay.Subscription) w.subscriptions = make(map[Topic][]*Subscription) + w.opts = params + + if params.enableRelay { + err := w.mountRelay(params.wOpts...) + if err != nil { + return nil, err + } + } + + if params.enableStore { + err := w.startStore() + if err != nil { + return nil, err + } + } for _, addr := range w.ListenAddresses() { log.Info("Listening on ", addr) @@ -150,7 +151,7 @@ func (w *WakuNode) SetPubSub(pubSub *wakurelay.PubSub) { w.pubsub = pubSub } -func (w *WakuNode) MountRelay(opts ...wakurelay.Option) error { +func (w *WakuNode) mountRelay(opts ...wakurelay.Option) error { ps, err := wakurelay.NewWakuRelaySub(w.ctx, w.host, opts...) if err != nil { return err @@ -160,31 +161,23 @@ func (w *WakuNode) MountRelay(opts ...wakurelay.Option) error { // TODO: filters // TODO: rlnRelay + log.Info("Relay protocol started") + return nil } -func (w *WakuNode) MountStore(isStore bool, s store.MessageProvider) error { - w.store = store.NewWakuStore(w.ctx, w.host, s) - w.isStore = isStore - return nil -} - -func (w *WakuNode) StartStore() error { - if w.store == nil { - return errors.New("WakuStore is not set") - } - +func (w *WakuNode) startStore() error { _, err := w.Subscribe(nil) if err != nil { return err } - w.store.Start() + w.opts.store.Start(w.host) return nil } func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { - if w.store == nil { + if w.opts.store == nil { return nil, errors.New("WakuStore is not set") } @@ -199,11 +192,11 @@ func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { return nil, err } - return &info.ID, w.store.AddPeer(info.ID, info.Addrs) + return &info.ID, w.opts.store.AddPeer(info.ID, info.Addrs) } func (w *WakuNode) Query(contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*protocol.HistoryResponse, error) { - if w.store == nil { + if w.opts.store == nil { return nil, errors.New("WakuStore is not set") } @@ -212,7 +205,7 @@ func (w *WakuNode) Query(contentTopics []string, startTime float64, endTime floa query.StartTime = startTime query.EndTime = endTime query.PagingInfo = new(protocol.PagingInfo) - result, err := w.store.Query(query, opts...) + result, err := w.opts.store.Query(query, opts...) if err != nil { return nil, err } @@ -324,9 +317,11 @@ func (node *WakuNode) upsertSubscription(topic Topic) (*wakurelay.Subscription, } node.relaySubs[topic] = sub - if node.store != nil && node.isStore { - log.Info("Subscribing store to ", topic) - node.bcaster.Register(node.store.MsgC) + log.Info("Subscribing to topic ", topic) + + if node.opts.store != nil && node.opts.storeMsgs { + log.Info("Subscribing store to topic ", topic) + node.bcaster.Register(node.opts.store.MsgC) } } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go new file mode 100644 index 00000000..6dc00bd4 --- /dev/null +++ b/waku/v2/node/wakuoptions.go @@ -0,0 +1,99 @@ +package node + +import ( + "context" + "crypto/ecdsa" + "net" + + "github.com/libp2p/go-libp2p" + connmgr "github.com/libp2p/go-libp2p-connmgr" + "github.com/libp2p/go-libp2p-core/crypto" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr-net" + store "github.com/status-im/go-waku/waku/v2/protocol/waku_store" + wakurelay "github.com/status-im/go-wakurelay-pubsub" +) + +type WakuNodeParameters struct { + multiAddr []ma.Multiaddr + privKey *crypto.PrivKey + libP2POpts []libp2p.Option + + enableRelay bool + wOpts []wakurelay.Option + + enableStore bool + storeMsgs bool + store *store.WakuStore + + ctx context.Context +} + +type WakuNodeOption func(*WakuNodeParameters) error + +func WithHostAddress(hostAddr []net.Addr) WakuNodeOption { + return func(params *WakuNodeParameters) error { + var multiAddresses []ma.Multiaddr + for _, addr := range hostAddr { + hostAddrMA, err := manet.FromNetAddr(addr) + if err != nil { + return err + } + multiAddresses = append(multiAddresses, hostAddrMA) + } + + params.multiAddr = multiAddresses + + return nil + } +} + +func WithPrivateKey(privKey *ecdsa.PrivateKey) WakuNodeOption { + return func(params *WakuNodeParameters) error { + privk := crypto.PrivKey((*crypto.Secp256k1PrivateKey)(privKey)) + params.privKey = &privk + return nil + } +} + +func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.libP2POpts = opts + return nil + } +} + +func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableRelay = true + params.wOpts = opts + return nil + } +} + +func WithWakuStore(shouldStoreMessages bool) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.enableStore = true + params.storeMsgs = shouldStoreMessages + params.store = store.NewWakuStore(params.ctx, nil) + return nil + } +} + +func WithMessageProvider(s store.MessageProvider) WakuNodeOption { + return func(params *WakuNodeParameters) error { + if params.store != nil { + params.store.SetMsgProvider(s) + } else { + params.store = store.NewWakuStore(params.ctx, s) + } + return nil + } +} + +var DefaultLibP2POptions = []libp2p.Option{ + libp2p.DefaultTransports, + libp2p.NATPortMap(), // Attempt to open ports using uPNP for NATed hosts. + libp2p.EnableNATService(), // TODO: is this needed?) + libp2p.ConnectionManager(connmgr.NewConnManager(200, 300, 0)), +} diff --git a/waku/v2/protocol/waku_store/waku_store.go b/waku/v2/protocol/waku_store/waku_store.go index 82f2e505..2f188196 100644 --- a/waku/v2/protocol/waku_store/waku_store.go +++ b/waku/v2/protocol/waku_store/waku_store.go @@ -187,17 +187,22 @@ type WakuStore struct { ctx context.Context } -func NewWakuStore(ctx context.Context, h host.Host, p MessageProvider) *WakuStore { +func NewWakuStore(ctx context.Context, p MessageProvider) *WakuStore { wakuStore := new(WakuStore) wakuStore.MsgC = make(chan *common.Envelope) wakuStore.msgProvider = p - wakuStore.h = h wakuStore.ctx = ctx return wakuStore } -func (store *WakuStore) Start() { +func (store *WakuStore) SetMsgProvider(p MessageProvider) { + store.msgProvider = p +} + +func (store *WakuStore) Start(h host.Host) { + store.h = h + if store.msgProvider == nil { return } @@ -219,6 +224,8 @@ func (store *WakuStore) Start() { store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx}) } + log.Info("Store protocol started") + go store.storeIncomingMessages() }