refactor: rendezvous nodes are added to peer store

- No need to specify peerIDs in command line flag
- Rendezvous nodes are selected automatically instead of passing them via waku option
This commit is contained in:
Richard Ramos 2021-09-30 12:01:53 -04:00
parent 034656b2c0
commit a58db1656d
5 changed files with 74 additions and 101 deletions

2
go.mod
View File

@ -30,7 +30,7 @@ require (
github.com/spf13/cobra v1.1.3
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.7.1
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929200249-1b6d7e4055e6
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930154620-020ef08b264a
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954

6
go.sum
View File

@ -948,6 +948,12 @@ github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929193940-d227fbccda4f h1:
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929193940-d227fbccda4f/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929200249-1b6d7e4055e6 h1:nC2lWJGdmLyGqGeIm97fFt3M/F80eLQbrDPH7BpLZVM=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210929200249-1b6d7e4055e6/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930143904-ba3a59f2257f h1:S2XA9ZlgSLBioLjTuCiZEg/txJ+C9tv51Q+BSC0dkiA=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930143904-ba3a59f2257f/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930145448-58fe0c6241ed h1:SGStabA6aT9+dhTTeJQKbuZ5jFZYmaglU0DkL0nxazE=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930145448-58fe0c6241ed/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930154620-020ef08b264a h1:/37EOjcoN5Lr8/ZQawuhKfNKF723myJUdlOTDVTciu4=
github.com/status-im/go-libp2p-rendezvous v0.0.0-20210930154620-020ef08b264a/go.mod h1:Q/GA4TCj4QLJCO02GN4O1CgkuTkn8oKbQSn1+yratvE=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a h1:eCna/q/PuZVqtmOMBqytw9yzZwMNKpao4au0OJDvesI=
github.com/status-im/go-wakurelay-pubsub v0.4.3-0.20210729162817-adc68830282a/go.mod h1:LSCVYR7mnBBsxVJghrGpQ3yJAAATEe6XeQQqGCZhwrE=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=

View File

@ -18,7 +18,7 @@ import (
dssql "github.com/ipfs/go-ds-sql"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
"github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
@ -32,7 +32,10 @@ import (
"github.com/status-im/go-waku/waku/v2/discovery"
"github.com/status-im/go-waku/waku/v2/node"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
libp2pdisc "github.com/libp2p/go-libp2p-core/discovery"
rendezvous "github.com/status-im/go-libp2p-rendezvous"
@ -68,15 +71,15 @@ var rootCmd = &cobra.Command{
enableWs, _ := cmd.Flags().GetBool("ws")
wsPort, _ := cmd.Flags().GetInt("ws-port")
wakuRelay, _ := cmd.Flags().GetBool("relay")
wakuFilter, _ := cmd.Flags().GetBool("filter")
enableFilter, _ := cmd.Flags().GetBool("filter")
key, _ := cmd.Flags().GetString("nodekey")
store, _ := cmd.Flags().GetBool("store")
enableStore, _ := cmd.Flags().GetBool("store")
useDB, _ := cmd.Flags().GetBool("use-db")
dbPath, _ := cmd.Flags().GetString("dbpath")
storenode, _ := cmd.Flags().GetString("storenode")
staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes")
filternodes, _ := cmd.Flags().GetStringSlice("filternodes")
lightpush, _ := cmd.Flags().GetBool("lightpush")
enableLightpush, _ := cmd.Flags().GetBool("lightpush")
lightpushnodes, _ := cmd.Flags().GetStringSlice("lightpushnodes")
topics, _ := cmd.Flags().GetStringSlice("topics")
keepAlive, _ := cmd.Flags().GetInt("keep-alive")
@ -88,7 +91,7 @@ var rootCmd = &cobra.Command{
dnsDiscoveryNameServer, _ := cmd.Flags().GetString("dns-discovery-nameserver")
peerExchange, _ := cmd.Flags().GetBool("peer-exchange")
enableRendezvous, _ := cmd.Flags().GetBool("rendezvous")
rendezvousPeerIds, _ := cmd.Flags().GetStringSlice("rendezvous-nodes")
rendezvousnodes, _ := cmd.Flags().GetStringSlice("rendezvous-nodes")
enableRendezvousServer, _ := cmd.Flags().GetBool("rendezvous-server")
rendezvousData, _ := cmd.Flags().GetString("rendezvous-data")
@ -161,18 +164,6 @@ var rootCmd = &cobra.Command{
nodeOpts = append(nodeOpts, node.WithWakuRelay(wakurelayopts...))
}
if enableRendezvous && len(rendezvousPeerIds) > 0 {
var peers []peer.ID
for _, r := range rendezvousPeerIds {
peerId, err := peer.Decode(r)
if err != nil {
checkError(err, "Rendezvous")
}
peers = append(peers, peerId)
}
nodeOpts = append(nodeOpts, node.WithRendezvous(peers, pubsub.WithDiscoveryOpts(libp2pdisc.TTL(time.Duration(20)*time.Second))))
}
if enableRendezvousServer {
db, err := leveldb.OpenFile(rendezvousData, &opt.Options{OpenFilesCacheCapacity: 3})
checkError(err, "RendezvousDB")
@ -180,11 +171,11 @@ var rootCmd = &cobra.Command{
nodeOpts = append(nodeOpts, node.WithRendezvousServer(storage))
}
if wakuFilter {
if enableFilter {
nodeOpts = append(nodeOpts, node.WithWakuFilter())
}
if store {
if enableStore {
nodeOpts = append(nodeOpts, node.WithWakuStore(true, true))
if useDB {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
@ -195,10 +186,14 @@ var rootCmd = &cobra.Command{
}
}
if lightpush {
if enableLightpush {
nodeOpts = append(nodeOpts, node.WithLightPush())
}
if enableRendezvous {
nodeOpts = append(nodeOpts, node.WithRendezvous(pubsub.WithDiscoveryOpts(libp2pdisc.Limit(45), libp2pdisc.TTL(time.Duration(20)*time.Second))))
}
wakuNode, err := node.New(ctx, nodeOpts...)
checkError(err, "Wakunode")
@ -209,15 +204,28 @@ var rootCmd = &cobra.Command{
checkError(err, "Error subscring to topic")
}
if storenode != "" && !store {
checkError(errors.New("Store protocol was not started"), "")
if !enableRendezvous && len(rendezvousnodes) > 0 {
checkError(errors.New("rendezvous protocol was not started"), "")
} else {
if storenode != "" {
_, err = wakuNode.AddStorePeer(storenode)
if err != nil {
log.Error("error adding store peer ", err)
}
}
addPeers(wakuNode, rendezvousnodes, rendezvous.RendezvousID_v001)
}
if storenode != "" && !enableStore {
checkError(errors.New("store protocol was not started"), "")
} else {
addPeers(wakuNode, []string{storenode}, store.StoreID_v20beta3)
}
if len(lightpushnodes) > 0 && !enableLightpush {
checkError(errors.New("lightpush protocol was not started"), "")
} else {
addPeers(wakuNode, lightpushnodes, lightpush.LightPushID_v20beta1)
}
if len(filternodes) > 0 && !enableFilter {
checkError(errors.New("filter protocol was not started"), "")
} else {
addPeers(wakuNode, filternodes, filter.FilterID_v20beta1)
}
if len(staticnodes) > 0 {
@ -263,32 +271,6 @@ var rootCmd = &cobra.Command{
}
}
if len(lightpushnodes) > 0 && !lightpush {
checkError(errors.New("LightPush protocol was not started"), "")
} else {
if len(lightpushnodes) > 0 {
for _, n := range lightpushnodes {
go func(node string) {
_, err = wakuNode.AddLightPushPeer(node)
checkError(err, "Error adding lightpush peer")
}(n)
}
}
}
if len(filternodes) > 0 && !wakuFilter {
checkError(errors.New("WakuFilter protocol was not started"), "")
} else {
if len(filternodes) > 0 {
for _, n := range filternodes {
go func(node string) {
_, err = wakuNode.AddFilterPeer(node)
checkError(err, "Error adding filter peer")
}(n)
}
}
}
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
@ -315,6 +297,20 @@ func Execute() {
cobra.CheckErr(rootCmd.Execute())
}
func addPeers(wakuNode *node.WakuNode, addresses []string, protocol protocol.ID) {
for _, addrString := range addresses {
if addrString == "" {
continue
}
addr, err := multiaddr.NewMultiaddr(addrString)
checkError(err, "invalid multiaddress")
_, err = wakuNode.AddPeer(addr, protocol)
checkError(err, "error adding peer")
}
}
func init() {
cobra.OnInitialize(initConfig)
@ -343,7 +339,7 @@ func init() {
rootCmd.Flags().Bool("peer-exchange", true, "Enable GossipSub Peer Exchange")
rootCmd.Flags().Bool("rendezvous", false, "Enable rendezvous for peer discovery")
rootCmd.Flags().String("rendezvous-data", "/tmp/rendevouz", "path where peer info will be stored.")
rootCmd.Flags().StringSlice("rendezvous-nodes", []string{}, "Peer IDs of waku2 rendezvous nodes. Argument may be repeated")
rootCmd.Flags().StringSlice("rendezvous-nodes", []string{}, "Multiaddrs of waku2 rendezvous nodes. Argument may be repeated")
rootCmd.Flags().Bool("rendezvous-server", false, "Node will act as rendezvous server")
}

View File

@ -265,7 +265,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
if params.enableRendezvous {
rendezvous := rendezvous.NewRendezvousDiscovery(w.host, params.rendezvousPeers)
rendezvous := rendezvous.NewRendezvousDiscovery(w.host)
params.wOpts = append(params.wOpts, wakurelay.WithDiscovery(rendezvous, params.rendezvousOpts...))
}
@ -343,13 +343,13 @@ func (w *WakuNode) IsOnline() bool {
if !hasRelay && protocol == string(wakurelay.WakuRelayID_v200) {
hasRelay = true
}
if !hasLightPush && protocol == string(lightpush.WakuLightPushProtocolId) {
if !hasLightPush && protocol == string(lightpush.LightPushID_v20beta1) {
hasLightPush = true
}
if !hasStore && protocol == string(store.WakuStoreProtocolId) {
if !hasStore && protocol == string(store.StoreID_v20beta3) {
hasStore = true
}
if !hasFilter && protocol == string(filter.WakuFilterProtocolId) {
if !hasFilter && protocol == string(filter.FilterID_v20beta1) {
hasFilter = true
}
if hasRelay || hasLightPush && (hasStore || hasFilter) {
@ -367,7 +367,7 @@ func (w *WakuNode) HasHistory() bool {
for _, v := range w.peers {
for _, protocol := range v {
if protocol == string(store.WakuStoreProtocolId) {
if protocol == string(store.StoreID_v20beta3) {
return true
}
}
@ -436,14 +436,6 @@ func (w *WakuNode) mountRendezvous() error {
return nil
}
func (w *WakuNode) AddPeer(info *peer.AddrInfo, protocolId string) error {
log.Info(fmt.Sprintf("adding peer %s with protocol %s", info.ID.Pretty(), protocolId))
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
return w.host.Peerstore().AddProtocols(info.ID, protocolId)
}
func (w *WakuNode) startStore() {
peerChan := make(chan *event.EvtPeerConnectednessChanged)
w.opts.store.Start(w.ctx, w.host, peerChan)
@ -456,29 +448,20 @@ func (w *WakuNode) startStore() {
}
}
func (w *WakuNode) addPeerWithProtocol(address string, proto p2pproto.ID) (*peer.ID, error) {
info, err := addrInfoFromMultiaddrString(address)
func (w *WakuNode) addPeer(info *peer.AddrInfo, protocolID p2pproto.ID) error {
log.Info(fmt.Sprintf("adding peer %s", info.ID.Pretty()))
w.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
return w.host.Peerstore().AddProtocols(info.ID, string(protocolID))
}
func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.ID, error) {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return nil, err
}
return &info.ID, w.AddPeer(info, string(proto))
}
func (w *WakuNode) AddStorePeer(address string) (*peer.ID, error) {
return w.addPeerWithProtocol(address, store.WakuStoreProtocolId)
}
func (w *WakuNode) AddRelayPeer(address string) (*peer.ID, error) {
return w.addPeerWithProtocol(address, wakurelay.WakuRelayID_v200)
}
func (w *WakuNode) AddFilterPeer(address string) (*peer.ID, error) {
return w.addPeerWithProtocol(address, filter.WakuFilterProtocolId)
}
func (w *WakuNode) AddLightPushPeer(address string) (*peer.ID, error) {
return w.addPeerWithProtocol(address, lightpush.WakuLightPushProtocolId)
return &info.ID, w.addPeer(info, protocolID)
}
func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) {
@ -908,12 +891,3 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}
}()
}
func addrInfoFromMultiaddrString(address string) (*peer.AddrInfo, error) {
ma, err := ma.NewMultiaddr(address)
if err != nil {
return nil, err
}
return peer.AddrInfoFromP2pAddr(ma)
}

View File

@ -8,7 +8,6 @@ import (
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
rendezvous "github.com/status-im/go-libp2p-rendezvous"
@ -37,7 +36,6 @@ type WakuNodeParameters struct {
enableRendezvous bool
enableRendezvousServer bool
rendevousStorage rendezvous.Storage
rendezvousPeers []peer.ID
rendezvousOpts []wakurelay.DiscoverOpt
keepAliveInterval time.Duration
@ -104,10 +102,9 @@ func WithWakuRelay(opts ...wakurelay.Option) WakuNodeOption {
}
}
func WithRendezvous(peers []peer.ID, discoverOpts ...wakurelay.DiscoverOpt) WakuNodeOption {
func WithRendezvous(discoverOpts ...wakurelay.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableRendezvous = true
params.rendezvousPeers = peers
params.rendezvousOpts = discoverOpts
return nil
}