diff --git a/go.mod b/go.mod index eb07f318..3245266f 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/spf13/cobra v1.1.3 github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.7.1 - github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929200249-1b6d7e4055e6 + github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930154620-020ef08b264a github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a github.com/stretchr/testify v1.7.0 github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 diff --git a/go.sum b/go.sum index 1dc97563..0ebf9372 100644 --- a/go.sum +++ b/go.sum @@ -948,6 +948,12 @@ github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929193940-d227fbccda4f h1: github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929193940-d227fbccda4f/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE= github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929200249-1b6d7e4055e6 h1:nC2lWJGdmLyGqGeIm97fFt3M/F80eLQbrDPH7BpLZVM= github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929200249-1b6d7e4055e6/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE= +github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930143904-ba3a59f2257f h1:S2XA9ZlgSLBioLjTuCiZEg/txJ+C9tv51Q+BSC0dkiA= +github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930143904-ba3a59f2257f/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE= +github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930145448-58fe0c6241ed h1:SGStabA6aT9+dhTTeJQKbuZ5jFZYmaglU0DkL0nxazE= +github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930145448-58fe0c6241ed/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE= +github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930154620-020ef08b264a h1:/37EOjcoN5Lr8/ZQawuhKfNKF723myJUdlOTDVTciu4= +github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930154620-020ef08b264a/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI= github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= diff --git a/waku/node.go b/waku/node.go index 3476acc9..1ad83393 100644 --- a/waku/node.go +++ b/waku/node.go @@ -18,7 +18,7 @@ import ( dssql "github.com/ipfs/go-ds-sql" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-peerstore/pstoreds" "github.com/multiformats/go-multiaddr" "github.com/spf13/cobra" @@ -32,7 +32,10 @@ import ( "github.com/status-im/go-waku/waku/v2/discovery" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/filter" + "github.com/status-im/go-waku/waku/v2/protocol/lightpush" "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/status-im/go-waku/waku/v2/protocol/store" libp2pdisc "github.com/libp2p/go-libp2p-core/discovery" rendezvous "github.com/status-im/go-libp2p-rendezvous" @@ -68,15 +71,15 @@ var rootCmd = &cobra.Command{ enableWs, _ := cmd.Flags().GetBool("ws") wsPort, _ := cmd.Flags().GetInt("ws-port") wakuRelay, _ := cmd.Flags().GetBool("relay") - wakuFilter, _ := cmd.Flags().GetBool("filter") + enableFilter, _ := cmd.Flags().GetBool("filter") key, _ := cmd.Flags().GetString("nodekey") - store, _ := cmd.Flags().GetBool("store") + enableStore, _ := cmd.Flags().GetBool("store") useDB, _ := cmd.Flags().GetBool("use-db") dbPath, _ := cmd.Flags().GetString("dbpath") storenode, _ := cmd.Flags().GetString("storenode") staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes") filternodes, _ := cmd.Flags().GetStringSlice("filternodes") - lightpush, _ := cmd.Flags().GetBool("lightpush") + enableLightpush, _ := cmd.Flags().GetBool("lightpush") lightpushnodes, _ := cmd.Flags().GetStringSlice("lightpushnodes") topics, _ := cmd.Flags().GetStringSlice("topics") keepAlive, _ := cmd.Flags().GetInt("keep-alive") @@ -88,7 +91,7 @@ var rootCmd = &cobra.Command{ dnsDiscoveryNameServer, _ := cmd.Flags().GetString("dns-discovery-nameserver") peerExchange, _ := cmd.Flags().GetBool("peer-exchange") enableRendezvous, _ := cmd.Flags().GetBool("rendezvous") - rendezvousPeerIds, _ := cmd.Flags().GetStringSlice("rendezvous-nodes") + rendezvousnodes, _ := cmd.Flags().GetStringSlice("rendezvous-nodes") enableRendezvousServer, _ := cmd.Flags().GetBool("rendezvous-server") rendezvousData, _ := cmd.Flags().GetString("rendezvous-data") @@ -161,18 +164,6 @@ var rootCmd = &cobra.Command{ nodeOpts = append(nodeOpts, node.WithWakuRelay(wakurelayopts...)) } - if enableRendezvous && len(rendezvousPeerIds) > 0 { - var peers []peer.ID - for _, r := range rendezvousPeerIds { - peerId, err := peer.Decode(r) - if err != nil { - checkError(err, "Rendezvous") - } - peers = append(peers, peerId) - } - nodeOpts = append(nodeOpts, node.WithRendezvous(peers, pubsub.WithDiscoveryOpts(libp2pdisc.TTL(time.Duration(20)*time.Second)))) - } - if enableRendezvousServer { db, err := leveldb.OpenFile(rendezvousData, &opt.Options{OpenFilesCacheCapacity: 3}) checkError(err, "RendezvousDB") @@ -180,11 +171,11 @@ var rootCmd = &cobra.Command{ nodeOpts = append(nodeOpts, node.WithRendezvousServer(storage)) } - if wakuFilter { + if enableFilter { nodeOpts = append(nodeOpts, node.WithWakuFilter()) } - if store { + if enableStore { nodeOpts = append(nodeOpts, node.WithWakuStore(true, true)) if useDB { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) @@ -195,10 +186,14 @@ var rootCmd = &cobra.Command{ } } - if lightpush { + if enableLightpush { nodeOpts = append(nodeOpts, node.WithLightPush()) } + if enableRendezvous { + nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(45), libp2pdisc.TTL(time.Duration(20)*time.Second)))) + } + wakuNode, err := node.New(ctx, nodeOpts...) checkError(err, "Wakunode") @@ -209,15 +204,28 @@ var rootCmd = &cobra.Command{ checkError(err, "Error subscring to topic") } - if storenode != "" && !store { - checkError(errors.New("Store protocol was not started"), "") + if !enableRendezvous && len(rendezvousnodes) > 0 { + checkError(errors.New("rendezvous protocol was not started"), "") } else { - if storenode != "" { - _, err = wakuNode.AddStorePeer(storenode) - if err != nil { - log.Error("error adding store peer ", err) - } - } + addPeers(wakuNode, rendezvousnodes, rendezvous.RendezvousID_v001) + } + + if storenode != "" && !enableStore { + checkError(errors.New("store protocol was not started"), "") + } else { + addPeers(wakuNode, []string{storenode}, store.StoreID_v20beta3) + } + + if len(lightpushnodes) > 0 && !enableLightpush { + checkError(errors.New("lightpush protocol was not started"), "") + } else { + addPeers(wakuNode, lightpushnodes, lightpush.LightPushID_v20beta1) + } + + if len(filternodes) > 0 && !enableFilter { + checkError(errors.New("filter protocol was not started"), "") + } else { + addPeers(wakuNode, filternodes, filter.FilterID_v20beta1) } if len(staticnodes) > 0 { @@ -263,32 +271,6 @@ var rootCmd = &cobra.Command{ } } - if len(lightpushnodes) > 0 && !lightpush { - checkError(errors.New("LightPush protocol was not started"), "") - } else { - if len(lightpushnodes) > 0 { - for _, n := range lightpushnodes { - go func(node string) { - _, err = wakuNode.AddLightPushPeer(node) - checkError(err, "Error adding lightpush peer") - }(n) - } - } - } - - if len(filternodes) > 0 && !wakuFilter { - checkError(errors.New("WakuFilter protocol was not started"), "") - } else { - if len(filternodes) > 0 { - for _, n := range filternodes { - go func(node string) { - _, err = wakuNode.AddFilterPeer(node) - checkError(err, "Error adding filter peer") - }(n) - } - } - } - // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) @@ -315,6 +297,20 @@ func Execute() { cobra.CheckErr(rootCmd.Execute()) } +func addPeers(wakuNode *node.WakuNode, addresses []string, protocol protocol.ID) { + for _, addrString := range addresses { + if addrString == "" { + continue + } + + addr, err := multiaddr.NewMultiaddr(addrString) + checkError(err, "invalid multiaddress") + + _, err = wakuNode.AddPeer(addr, protocol) + checkError(err, "error adding peer") + } +} + func init() { cobra.OnInitialize(initConfig) @@ -343,7 +339,7 @@ func init() { rootCmd.Flags().Bool("peer-exchange", true, "Enable GossipSub Peer Exchange") rootCmd.Flags().Bool("rendezvous", false, "Enable rendezvous for peer discovery") rootCmd.Flags().String("rendezvous-data", "/tmp/rendevouz", "path where peer info will be stored.") - rootCmd.Flags().StringSlice("rendezvous-nodes", []string{}, "Peer IDs of waku2 rendezvous nodes. Argument may be repeated") + rootCmd.Flags().StringSlice("rendezvous-nodes", []string{}, "Multiaddrs of waku2 rendezvous nodes. Argument may be repeated") rootCmd.Flags().Bool("rendezvous-server", false, "Node will act as rendezvous server") } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0dcb1a8e..ee9969a0 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -265,7 +265,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } if params.enableRendezvous { - rendezvous := rendezvous.NewRendezvousDiscovery(w.host, params.rendezvousPeers) + rendezvous := rendezvous.NewRendezvousDiscovery(w.host) params.wOpts = append(params.wOpts, wakurelay.WithDiscovery(rendezvous, params.rendezvousOpts...)) } @@ -343,13 +343,13 @@ func (w *WakuNode) IsOnline() bool { if !hasRelay && protocol == string(wakurelay.WakuRelayID_v200) { hasRelay = true } - if !hasLightPush && protocol == string(lightpush.WakuLightPushProtocolId) { + if !hasLightPush && protocol == string(lightpush.LightPushID_v20beta1) { hasLightPush = true } - if !hasStore && protocol == string(store.WakuStoreProtocolId) { + if !hasStore && protocol == string(store.StoreID_v20beta3) { hasStore = true } - if !hasFilter && protocol == string(filter.WakuFilterProtocolId) { + if !hasFilter && protocol == string(filter.FilterID_v20beta1) { hasFilter = true } if hasRelay || hasLightPush && (hasStore || hasFilter) { @@ -367,7 +367,7 @@ func (w *WakuNode) HasHistory() bool { for _, v := range w.peers { for _, protocol := range v { - if protocol == string(store.WakuStoreProtocolId) { + if protocol == string(store.StoreID_v20beta3) { return true } } @@ -436,14 +436,6 @@ func (w *WakuNode) mountRendezvous() error { return nil } -func (w *WakuNode) AddPeer(info *peer.AddrInfo, protocolId string) error { - log.Info(fmt.Sprintf("adding peer %s with protocol %s", info.ID.Pretty(), protocolId)) - - w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) - - return w.host.Peerstore().AddProtocols(info.ID, protocolId) -} - func (w *WakuNode) startStore() { peerChan := make(chan *event.EvtPeerConnectednessChanged) w.opts.store.Start(w.ctx, w.host, peerChan) @@ -456,29 +448,20 @@ func (w *WakuNode) startStore() { } } -func (w *WakuNode) addPeerWithProtocol(address string, proto p2pproto.ID) (*peer.ID, error) { - info, err := addrInfoFromMultiaddrString(address) +func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error { + log.Info(fmt.Sprintf("adding peer %s", info.ID.Pretty())) + w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL) + return w.host.Peerstore().AddProtocols(info.ID, string(protocolID)) + +} + +func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.ID, error) { + info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { return nil, err } - return &info.ID, w.AddPeer(info, string(proto)) -} - -func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) { - return w.addPeerWithProtocol(address, store.WakuStoreProtocolId) -} - -func (w *WakuNode) AddRelayPeer(address string) (*peer.ID, error) { - return w.addPeerWithProtocol(address, wakurelay.WakuRelayID_v200) -} - -func (w *WakuNode) AddFilterPeer(address string) (*peer.ID, error) { - return w.addPeerWithProtocol(address, filter.WakuFilterProtocolId) -} - -func (w *WakuNode) AddLightPushPeer(address string) (*peer.ID, error) { - return w.addPeerWithProtocol(address, lightpush.WakuLightPushProtocolId) + return &info.ID, w.addPeer(info, protocolID) } func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) { @@ -908,12 +891,3 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { } }() } - -func addrInfoFromMultiaddrString(address string) (*peer.AddrInfo, error) { - ma, err := ma.NewMultiaddr(address) - if err != nil { - return nil, err - } - - return peer.AddrInfoFromP2pAddr(ma) -} diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 67554b9a..58968dbb 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -8,7 +8,6 @@ import ( "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" - "github.com/libp2p/go-libp2p-core/peer" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr-net" rendezvous "github.com/status-im/go-libp2p-rendezvous" @@ -37,7 +36,6 @@ type WakuNodeParameters struct { enableRendezvous bool enableRendezvousServer bool rendevousStorage rendezvous.Storage - rendezvousPeers []peer.ID rendezvousOpts []wakurelay.DiscoverOpt keepAliveInterval time.Duration @@ -104,10 +102,9 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption { } } -func WithRendezvous(peers []peer.ID, discoverOpts ...wakurelay.DiscoverOpt) WakuNodeOption { +func WithRendezvous(discoverOpts ...wakurelay.DiscoverOpt) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableRendezvous = true - params.rendezvousPeers = peers params.rendezvousOpts = discoverOpts return nil }