From 2c3566377a817303238312e1cd56ebf3f4c42aea Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Sun, 16 Apr 2023 20:04:12 -0400 Subject: [PATCH] refactor: inject host on node start --- waku/v2/discovery_connector.go | 8 +- waku/v2/discv5/discover.go | 8 +- waku/v2/discv5/discover_test.go | 9 ++- waku/v2/node/service.go | 2 + waku/v2/node/wakunode2.go | 74 +++++++++++-------- waku/v2/node/wakuoptions.go | 8 +- waku/v2/node/wakuoptions_test.go | 2 +- waku/v2/protocol/filter/client.go | 8 +- waku/v2/protocol/filter/filter_test.go | 15 ++-- waku/v2/protocol/filter/server.go | 8 +- waku/v2/protocol/legacy_filter/waku_filter.go | 8 +- .../legacy_filter/waku_filter_test.go | 12 ++- waku/v2/protocol/lightpush/waku_lightpush.go | 8 +- .../protocol/lightpush/waku_lightpush_test.go | 16 ++-- waku/v2/protocol/noise/pairing_test.go | 3 +- waku/v2/protocol/peer_exchange/protocol.go | 8 +- .../peer_exchange/waku_peer_exchange_test.go | 12 ++- waku/v2/protocol/relay/waku_relay.go | 8 +- waku/v2/protocol/relay/waku_relay_test.go | 3 +- waku/v2/protocol/rln/rln_relay_test.go | 3 +- waku/v2/protocol/store/waku_resume_test.go | 20 +++-- waku/v2/protocol/store/waku_store_common.go | 3 +- .../store/waku_store_persistence_test.go | 2 +- waku/v2/protocol/store/waku_store_protocol.go | 7 ++ .../store/waku_store_protocol_test.go | 27 ++++--- .../protocol/store/waku_store_query_test.go | 16 ++-- waku/v2/rendezvous/rendezvous.go | 8 +- waku/v2/rendezvous/rendezvous_test.go | 9 ++- waku/v2/rpc/admin_test.go | 3 +- waku/v2/rpc/filter_test.go | 6 +- 30 files changed, 212 insertions(+), 112 deletions(-) diff --git a/waku/v2/discovery_connector.go b/waku/v2/discovery_connector.go index 6213ce46..e22232e2 100644 --- a/waku/v2/discovery_connector.go +++ b/waku/v2/discovery_connector.go @@ -46,7 +46,7 @@ type PeerConnectionStrategy struct { // dialTimeout is how long we attempt to connect to a peer before giving up // minPeers is the minimum number of peers that the node should have // backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer -func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) { +func NewPeerConnectionStrategy(cacheSize int, minPeers int, dialTimeout time.Duration, backoff backoff.BackoffFactory, logger *zap.Logger) (*PeerConnectionStrategy, error) { cache, err := lru.New2Q(cacheSize) if err != nil { return nil, err @@ -54,7 +54,6 @@ func NewPeerConnectionStrategy(h host.Host, cacheSize int, minPeers int, dialTim return &PeerConnectionStrategy{ cache: cache, - host: h, wg: sync.WaitGroup{}, minPeers: minPeers, dialTimeout: dialTimeout, @@ -73,6 +72,11 @@ func (c *PeerConnectionStrategy) PeerChannel() chan<- peer.AddrInfo { return c.peerCh } +// Sets the host to be able to mount or consume a protocol +func (c *PeerConnectionStrategy) SetHost(h host.Host) { + c.host = h +} + // Start attempts to connect to the peers passed in by peerCh. Will not connect to peers if they are within the backoff period. func (c *PeerConnectionStrategy) Start(ctx context.Context) error { if c.cancel != nil { diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 21178596..788dbedb 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -87,7 +87,7 @@ type PeerConnector interface { PeerChannel() chan<- peer.AddrInfo } -func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { +func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConnector PeerConnector, log *zap.Logger, opts ...DiscoveryV5Option) (*DiscoveryV5, error) { params := new(discV5Parameters) optList := DefaultOptions() optList = append(optList, opts...) @@ -103,7 +103,6 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc } return &DiscoveryV5{ - host: host, peerConnector: peerConnector, params: params, NAT: NAT, @@ -161,6 +160,11 @@ func (d *DiscoveryV5) listen(ctx context.Context) error { return nil } +// Sets the host to be able to mount or consume a protocol +func (d *DiscoveryV5) SetHost(h host.Host) { + d.host = h +} + func (d *DiscoveryV5) Start(ctx context.Context) error { d.Lock() defer d.Unlock() diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index 92f7f004..a06792ff 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -106,8 +106,9 @@ func TestDiscV5(t *testing.T) { l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) peerconn1 := tests.NewTestPeerDiscoverer() - d1, err := NewDiscoveryV5(host1, prvKey1, l1, peerconn1, utils.Logger(), WithUDPPort(uint(udpPort1))) + d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, utils.Logger(), WithUDPPort(uint(udpPort1))) require.NoError(t, err) + d1.SetHost(host1) // H2 host2, _, prvKey2 := createHost(t) @@ -117,8 +118,9 @@ func TestDiscV5(t *testing.T) { l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) peerconn2 := tests.NewTestPeerDiscoverer() - d2, err := NewDiscoveryV5(host2, prvKey2, l2, peerconn2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) + d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) + d2.SetHost(host2) // H3 host3, _, prvKey3 := createHost(t) @@ -128,8 +130,9 @@ func TestDiscV5(t *testing.T) { l3, err := newLocalnode(prvKey3, ip3, udpPort3, utils.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) peerconn3 := tests.NewTestPeerDiscoverer() - d3, err := NewDiscoveryV5(host3, prvKey3, l3, peerconn3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) + d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) + d3.SetHost(host3) defer d1.Stop() defer d2.Stop() diff --git a/waku/v2/node/service.go b/waku/v2/node/service.go index ef0e95e5..dd66e860 100644 --- a/waku/v2/node/service.go +++ b/waku/v2/node/service.go @@ -3,11 +3,13 @@ package node import ( "context" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" ) type Service interface { + SetHost(h host.Host) Start(ctx context.Context) error Stop() } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 44af2116..deb251c6 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -114,7 +114,7 @@ type WakuNode struct { } func defaultStoreFactory(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.opts.messageProvider, w.timesource, w.log) + return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log) } // New is used to instantiate a WakuNode using a set of WakuNodeOptions @@ -167,19 +167,15 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { params.libP2POpts = append(params.libP2POpts, libp2p.AddrsFactory(params.addressFactory)) } - host, err := libp2p.New(params.libP2POpts...) - if err != nil { - return nil, err - } + var err error w := new(WakuNode) w.bcaster = v2.NewBroadcaster(1024) - w.host = host w.opts = params w.log = params.logger.Named("node2") w.wg = &sync.WaitGroup{} w.keepAliveFails = make(map[peer.ID]int) - w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLefacyFilter, w.opts.enableStore, w.opts.enableRelay) + w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay) if params.enableNTP { w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log) @@ -197,7 +193,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { rngSrc := rand.NewSource(rand.Int63()) minBackoff, maxBackoff := time.Second*30, time.Hour bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) - w.peerConnector, err = v2.NewPeerConnectionStrategy(host, cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log) + w.peerConnector, err = v2.NewPeerConnectionStrategy(cacheSize, w.opts.discoveryMinPeers, network.DialPeerTimeout, bkf, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } @@ -209,7 +205,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } - w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.peerConnector, w.log) + w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.peerConnector, w.log) if err != nil { return nil, err } @@ -223,12 +219,12 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { rendezvousPoints = append(rendezvousPoints, peerID) } - w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log) - w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) - w.legacyFilter = legacy_filter.NewWakuFilter(w.host, w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) - w.filterFullnode = filter.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) - w.filterLightnode = filter.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log) - w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log) + w.rendezvous = rendezvous.NewRendezvous(w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log) + w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) + w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...) + w.filterFullnode = filter.NewWakuFilterFullnode(w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.timesource, w.log) + w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.log) if params.storeFactory != nil { w.storeFactory = params.storeFactory @@ -236,18 +232,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.storeFactory = defaultStoreFactory } - if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { - return nil, err - } - - if w.identificationEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)); err != nil { - return nil, err - } - - if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil { - return nil, err - } - if params.connStatusC != nil { w.connStatusChan = params.connStatusC } @@ -296,6 +280,25 @@ func (w *WakuNode) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) w.cancel = cancel + host, err := libp2p.New(w.opts.libP2POpts...) + if err != nil { + return err + } + + w.host = host + + if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { + return err + } + + if w.identificationEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)); err != nil { + return err + } + + if w.addressChangesSub, err = host.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated)); err != nil { + return err + } + w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log) w.host.Network().Notify(w.connectionNotif) @@ -306,7 +309,7 @@ func (w *WakuNode) Start(ctx context.Context) error { go w.watchMultiaddressChanges(ctx) go w.watchENRChanges(ctx) - err := w.bcaster.Start(ctx) + err = w.bcaster.Start(ctx) if err != nil { return err } @@ -316,6 +319,7 @@ func (w *WakuNode) Start(ctx context.Context) error { go w.startKeepAlive(ctx, w.opts.keepAliveInterval) } + w.peerConnector.SetHost(host) err = w.peerConnector.Start(ctx) if err != nil { return err @@ -328,6 +332,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } } + w.relay.SetHost(host) if w.opts.enableRelay { err := w.relay.Start(ctx) if err != nil { @@ -345,6 +350,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } w.store = w.storeFactory(w) + w.store.SetHost(host) if w.opts.enableStore { err := w.startStore(ctx) if err != nil { @@ -355,13 +361,15 @@ func (w *WakuNode) Start(ctx context.Context) error { w.bcaster.Register(nil, w.store.MessageChannel()) } + w.lightPush.SetHost(host) if w.opts.enableLightPush { if err := w.lightPush.Start(ctx); err != nil { return err } } - if w.opts.enableLefacyFilter { + w.legacyFilter.SetHost(host) + if w.opts.enableLegacyFilter { err := w.legacyFilter.Start(ctx) if err != nil { return err @@ -371,7 +379,8 @@ func (w *WakuNode) Start(ctx context.Context) error { w.bcaster.Register(nil, w.legacyFilter.MessageChannel()) } - if w.opts.enableFilterFullnode { + w.filterFullnode.SetHost(host) + if w.opts.enableFilterFullNode { err := w.filterFullnode.Start(ctx) if err != nil { return err @@ -381,6 +390,7 @@ func (w *WakuNode) Start(ctx context.Context) error { w.bcaster.Register(nil, w.filterFullnode.MessageChannel()) } + w.filterLightnode.SetHost(host) if w.opts.enableFilterLightNode { err := w.filterLightnode.Start(ctx) if err != nil { @@ -393,6 +403,7 @@ func (w *WakuNode) Start(ctx context.Context) error { return err } + w.peerExchange.SetHost(host) if w.opts.enablePeerExchange { err := w.peerExchange.Start(ctx) if err != nil { @@ -400,6 +411,7 @@ func (w *WakuNode) Start(ctx context.Context) error { } } + w.rendezvous.SetHost(host) if w.opts.enableRendezvousServer || w.opts.enableRendezvous { err := w.rendezvous.Start(ctx) if err != nil { @@ -624,7 +636,7 @@ func (w *WakuNode) mountDiscV5() error { } var err error - w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...) + w.discoveryV5, err = discv5.NewDiscoveryV5(w.opts.privKey, w.localNode, w.peerConnector, w.log, discV5Options...) return err } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 25d3bedb..b762ebbe 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -66,10 +66,10 @@ type WakuNodeParameters struct { noDefaultWakuTopic bool enableRelay bool - enableLefacyFilter bool + enableLegacyFilter bool isLegacyFilterFullnode bool enableFilterLightNode bool - enableFilterFullnode bool + enableFilterFullNode bool legacyFilterOpts []legacy_filter.Option filterOpts []filter.Option wOpts []pubsub.Option @@ -323,7 +323,7 @@ func WithPeerExchange() WakuNodeOption { // accepts a list of WakuFilter gossipsub options to setup the protocol func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.enableLefacyFilter = true + params.enableLegacyFilter = true params.isLegacyFilterFullnode = fullnode params.legacyFilterOpts = filterOpts return nil @@ -342,7 +342,7 @@ func WithWakuFilterLightNode() WakuNodeOption { // This WakuNodeOption accepts a list of WakuFilter options to setup the protocol func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { - params.enableFilterFullnode = true + params.enableFilterFullNode = true params.filterOpts = filterOpts return nil } diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index aa08e3a0..ab7b228c 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -28,7 +28,7 @@ func TestWakuOptions(t *testing.T) { require.NoError(t, err) storeFactory := func(w *WakuNode) store.Store { - return store.NewWakuStore(w.host, w.opts.messageProvider, w.timesource, w.log) + return store.NewWakuStore(w.opts.messageProvider, w.timesource, w.log) } options := []WakuNodeOption{ diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index e30bc4c6..9baacfaa 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -55,17 +55,21 @@ type WakuFilterPushResult struct { } // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterLightnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { +func NewWakuFilterLightnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode { wf := new(WakuFilterLightnode) wf.log = log.Named("filterv2-lightnode") wf.broadcaster = broadcaster wf.timesource = timesource wf.wg = &sync.WaitGroup{} - wf.h = host return wf } +// Sets the host to be able to mount or consume a protocol +func (wf *WakuFilterLightnode) SetHost(h host.Host) { + wf.h = h +} + func (wf *WakuFilterLightnode) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index adc95c0b..a1245102 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -25,7 +25,8 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) @@ -44,7 +45,8 @@ func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) { b := v2.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) - filterPush := NewWakuFilterLightnode(host, b, timesource.NewDefaultClock(), utils.Logger()) + filterPush := NewWakuFilterLightnode(b, timesource.NewDefaultClock(), utils.Logger()) + filterPush.SetHost(host) err = filterPush.Start(context.Background()) require.NoError(t, err) @@ -77,7 +79,8 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger()) + node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger()) + node2Filter.SetHost(host2) err := node2Filter.Start(ctx) require.NoError(t, err) @@ -166,7 +169,8 @@ func TestSubscriptionPing(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger()) + node2Filter := NewWakuFilterFullnode(broadcaster, timesource.NewDefaultClock(), utils.Logger()) + node2Filter.SetHost(host2) err := node2Filter.Start(ctx) require.NoError(t, err) @@ -208,7 +212,8 @@ func TestWakuFilterPeerFailure(t *testing.T) { broadcaster2 := v2.NewBroadcaster(10) require.NoError(t, broadcaster2.Start(context.Background())) - node2Filter := NewWakuFilterFullnode(host2, broadcaster2, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) + node2Filter := NewWakuFilterFullnode(broadcaster2, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second)) + node2Filter.SetHost(host2) err := node2Filter.Start(ctx) require.NoError(t, err) diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 60c5ab2a..0a1a0766 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -44,7 +44,7 @@ type ( ) // NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { +func NewWakuFilterFullnode(broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode { wf := new(WakuFilterFullNode) wf.log = log.Named("filterv2-fullnode") @@ -56,13 +56,17 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc } wf.wg = &sync.WaitGroup{} - wf.h = host wf.subscriptions = NewSubscribersMap(params.Timeout) wf.maxSubscriptions = params.MaxSubscribers return wf } +// Sets the host to be able to mount or consume a protocol +func (wf *WakuFilterFullNode) SetHost(h host.Host) { + wf.h = h +} + func (wf *WakuFilterFullNode) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 8d10008b..820c96bc 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -66,7 +66,7 @@ type ( const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter { +func NewWakuFilter(broadcaster v2.Broadcaster, isFullNode bool, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilter { wf := new(WakuFilter) wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode)) @@ -78,7 +78,6 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool, } wf.wg = &sync.WaitGroup{} - wf.h = host wf.isFullNode = isFullNode wf.filters = NewFilterMap(broadcaster, timesource) wf.subscribers = NewSubscribers(params.Timeout) @@ -86,6 +85,11 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool, return wf } +// Sets the host to be able to mount or consume a protocol +func (wf *WakuFilter) SetHost(h host.Host) { + wf.h = h +} + func (wf *WakuFilter) Start(ctx context.Context) error { wf.wg.Wait() // Wait for any goroutines to stop diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go index dc1cd9df..002d75b3 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_test.go @@ -24,7 +24,8 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) @@ -43,7 +44,8 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { b := v2.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) - filter := NewWakuFilter(host, b, false, timesource.NewDefaultClock(), utils.Logger()) + filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), utils.Logger()) + filter.SetHost(host) err = filter.Start(context.Background()) require.NoError(t, err) @@ -76,7 +78,8 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilter(host2, broadcaster, true, timesource.NewDefaultClock(), utils.Logger()) + node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), utils.Logger()) + node2Filter.SetHost(host2) err := node2Filter.Start(ctx) require.NoError(t, err) @@ -167,7 +170,8 @@ func TestWakuFilterPeerFailure(t *testing.T) { broadcaster2 := v2.NewBroadcaster(10) require.NoError(t, broadcaster2.Start(context.Background())) - node2Filter := NewWakuFilter(host2, broadcaster2, true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second)) + node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), utils.Logger(), WithTimeout(3*time.Second)) + node2Filter.SetHost(host2) err := node2Filter.Start(ctx) require.NoError(t, err) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 7ecd7a3e..179f4191 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -36,15 +36,19 @@ type WakuLightPush struct { } // NewWakuRelay returns a new instance of Waku Lightpush struct -func NewWakuLightPush(h host.Host, relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush { +func NewWakuLightPush(relay *relay.WakuRelay, log *zap.Logger) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay - wakuLP.h = h wakuLP.log = log.Named("lightpush") return wakuLP } +// Sets the host to be able to mount or consume a protocol +func (wakuLP *WakuLightPush) SetHost(h host.Host) { + wakuLP.h = h +} + // Start inits the lighpush protocol func (wakuLP *WakuLightPush) Start(ctx context.Context) error { if wakuLP.relayIsNotAvailable() { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index ce7adece..dc84da79 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -28,7 +28,8 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri b := v2.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) - relay := relay.NewWakuRelay(host, b, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) require.NoError(t, err) err = relay.Start(context.Background()) require.NoError(t, err) @@ -61,7 +62,8 @@ func TestWakuLightPush(t *testing.T) { defer sub2.Unsubscribe() ctx := context.Background() - lightPushNode2 := NewWakuLightPush(host2, node2, utils.Logger()) + lightPushNode2 := NewWakuLightPush(node2, utils.Logger()) + lightPushNode2.SetHost(host2) err := lightPushNode2.Start(ctx) require.NoError(t, err) defer lightPushNode2.Stop() @@ -71,7 +73,8 @@ func TestWakuLightPush(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(clientHost, nil, utils.Logger()) + client := NewWakuLightPush(nil, utils.Logger()) + client.SetHost(clientHost) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200) @@ -127,7 +130,8 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(clientHost, nil, utils.Logger()) + client := NewWakuLightPush(nil, utils.Logger()) + client.SetHost(clientHost) err = client.Start(ctx) require.Errorf(t, err, "relay is required") @@ -141,8 +145,8 @@ func TestWakuLightPushNoPeers(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(clientHost, nil, utils.Logger()) - + client := NewWakuLightPush(nil, utils.Logger()) + client.SetHost(clientHost) _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", 0), testTopic) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/noise/pairing_test.go b/waku/v2/protocol/noise/pairing_test.go index 406f2687..506f9f46 100644 --- a/waku/v2/protocol/noise/pairing_test.go +++ b/waku/v2/protocol/noise/pairing_test.go @@ -28,7 +28,8 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) { b := v2.NewBroadcaster(1024) require.NoError(t, b.Start(context.Background())) - relay := relay.NewWakuRelay(host, b, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 23a7ea40..24c9293a 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -62,9 +62,8 @@ type PeerConnector interface { } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct -func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) { +func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, log *zap.Logger) (*WakuPeerExchange, error) { wakuPX := new(WakuPeerExchange) - wakuPX.h = h wakuPX.disc = disc wakuPX.log = log.Named("wakupx") wakuPX.enrCache = make(map[enode.ID]peerRecord) @@ -73,6 +72,11 @@ func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, peerConnector Pe return wakuPX, nil } +// Sets the host to be able to mount or consume a protocol +func (wakuPX *WakuPeerExchange) SetHost(h host.Host) { + wakuPX.h = h +} + // Start inits the peer exchange protocol func (wakuPX *WakuPeerExchange) Start(ctx context.Context) error { if wakuPX.cancel != nil { diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index d306a879..b4c53014 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -106,8 +106,9 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { l1, err := newLocalnode(prvKey1, ip1, udpPort1, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) discv5PeerConn1 := tests.NewTestPeerDiscoverer() - d1, err := discv5.NewDiscoveryV5(host1, prvKey1, l1, discv5PeerConn1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) + d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) + d1.SetHost(host1) // H2 host2, _, prvKey2 := createHost(t) @@ -117,8 +118,9 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { l2, err := newLocalnode(prvKey2, ip2, udpPort2, utils.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) discv5PeerConn2 := tests.NewTestPeerDiscoverer() - d2, err := discv5.NewDiscoveryV5(host2, prvKey2, l2, discv5PeerConn2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) + d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err) + d2.SetHost(host2) // H3 host3, _, _ := createHost(t) @@ -139,12 +141,14 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { // mount peer exchange pxPeerConn1 := tests.NewTestPeerDiscoverer() - px1, err := NewWakuPeerExchange(host1, d1, pxPeerConn1, utils.Logger()) + px1, err := NewWakuPeerExchange(d1, pxPeerConn1, utils.Logger()) require.NoError(t, err) + px1.SetHost(host1) pxPeerConn3 := tests.NewTestPeerDiscoverer() - px3, err := NewWakuPeerExchange(host3, nil, pxPeerConn3, utils.Logger()) + px3, err := NewWakuPeerExchange(nil, pxPeerConn3, utils.Logger()) require.NoError(t, err) + px3.SetHost(host3) err = px1.Start(context.Background()) require.NoError(t, err) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 452300df..9233840c 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -61,9 +61,8 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { } // NewWakuRelay returns a new instance of a WakuRelay struct -func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { +func NewWakuRelay(bcaster v2.Broadcaster, minPeersToPublish int, timesource timesource.Timesource, log *zap.Logger, opts ...pubsub.Option) *WakuRelay { w := new(WakuRelay) - w.host = h w.timesource = timesource w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.relaySubs = make(map[string]*pubsub.Subscription) @@ -96,6 +95,11 @@ func NewWakuRelay(h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, ti return w } +// Sets the host to be able to mount or consume a protocol +func (w *WakuRelay) SetHost(h host.Host) { + w.host = h +} + func (w *WakuRelay) Start(ctx context.Context) error { w.wg.Wait() ctx, cancel := context.WithCancel(ctx) diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 8568e32f..06c2b19e 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -22,7 +22,8 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := NewWakuRelay(nil, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/protocol/rln/rln_relay_test.go b/waku/v2/protocol/rln/rln_relay_test.go index bf7c47cc..f92970ba 100644 --- a/waku/v2/protocol/rln/rln_relay_test.go +++ b/waku/v2/protocol/rln/rln_relay_test.go @@ -35,7 +35,8 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() { host, err := tests.MakeHost(context.Background(), port, rand.Reader) s.Require().NoError(err) - relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) err = relay.Start(context.Background()) s.Require().NoError(err) defer relay.Stop() diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 706355fa..f0fa19e0 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -22,7 +22,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), utils.GetUnixEpoch(), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), utils.GetUnixEpoch(), "test") - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(msg1) _ = s.storeMessage(msg3) _ = s.storeMessage(msg5) @@ -42,7 +42,8 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -62,7 +63,8 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) defer s2.Stop() @@ -99,7 +101,8 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -112,7 +115,8 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) defer s2.Stop() @@ -138,7 +142,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -151,7 +156,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) diff --git a/waku/v2/protocol/store/waku_store_common.go b/waku/v2/protocol/store/waku_store_common.go index 9ea77e57..728e69c1 100644 --- a/waku/v2/protocol/store/waku_store_common.go +++ b/waku/v2/protocol/store/waku_store_common.go @@ -63,10 +63,9 @@ type WakuStore struct { } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +func NewWakuStore(p MessageProvider, timesource timesource.Timesource, log *zap.Logger) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p - wakuStore.h = host wakuStore.wg = &sync.WaitGroup{} wakuStore.log = log.Named("store") wakuStore.timesource = timesource diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index d49cae3d..b1ee845a 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -14,7 +14,7 @@ import ( func TestStorePersistence(t *testing.T) { db := MemoryDB(t) - s1 := NewWakuStore(nil, db, timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) defaultPubSubTopic := "test" defaultContentTopic := "1" diff --git a/waku/v2/protocol/store/waku_store_protocol.go b/waku/v2/protocol/store/waku_store_protocol.go index a52fa094..4ec2e4ed 100644 --- a/waku/v2/protocol/store/waku_store_protocol.go +++ b/waku/v2/protocol/store/waku_store_protocol.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-msgio/pbio" @@ -85,6 +86,7 @@ type MessageProvider interface { } type Store interface { + SetHost(h host.Host) Start(ctx context.Context) error Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) Find(ctx context.Context, query Query, cb criteriaFN, opts ...HistoryRequestOption) (*wpb.WakuMessage, error) @@ -99,6 +101,11 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) { store.msgProvider = p } +// Sets the host to be able to mount or consume a protocol +func (store *WakuStore) SetHost(h host.Host) { + store.h = h +} + // Start initializes the WakuStore by enabling the protocol and fetching records from a message provider func (store *WakuStore) Start(ctx context.Context) error { if store.started { diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index c242e94c..92412f6d 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -22,7 +22,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -43,7 +44,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) defer s2.Stop() @@ -71,7 +73,8 @@ func TestWakuStoreProtocolLocalQuery(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -111,7 +114,8 @@ func TestWakuStoreProtocolNext(t *testing.T) { require.NoError(t, err) db := MemoryDB(t) - s1 := NewWakuStore(host1, db, timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -137,7 +141,8 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) require.NoError(t, err) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) defer s2.Stop() @@ -181,7 +186,8 @@ func TestWakuStoreResult(t *testing.T) { require.NoError(t, err) db := MemoryDB(t) - s1 := NewWakuStore(host1, db, timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(db, timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) @@ -207,7 +213,8 @@ func TestWakuStoreResult(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) require.NoError(t, err) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) defer s2.Stop() @@ -266,7 +273,8 @@ func TestWakuStoreProtocolFind(t *testing.T) { host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s1.SetHost(host1) err = s1.Start(ctx) require.NoError(t, err) defer s1.Stop() @@ -301,7 +309,8 @@ func TestWakuStoreProtocolFind(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), StoreID_v20beta4) require.NoError(t, err) - s2 := NewWakuStore(host2, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2 := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s2.SetHost(host2) err = s2.Start(ctx) require.NoError(t, err) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index f95a4ffa..b90db2bc 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -21,7 +21,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -47,7 +47,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), defaultPubSubTopic)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), defaultPubSubTopic)) @@ -80,7 +80,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -112,7 +112,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic2)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic2)) @@ -134,7 +134,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) _ = s.storeMessage(protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)) _ = s.storeMessage(protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)) @@ -153,7 +153,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -177,7 +177,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) for i := 0; i < 10; i++ { msg := &wpb.WakuMessage{ Payload: []byte{byte(i)}, @@ -203,7 +203,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + s := NewWakuStore(MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) var messages []*wpb.WakuMessage for i := 0; i < 10; i++ { diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 319507c5..72bae0d8 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -43,7 +43,7 @@ type PeerConnector interface { PeerChannel() chan<- peer.AddrInfo } -func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { +func NewRendezvous(enableServer bool, db *DB, discoverPeers bool, rendezvousPoints []peer.ID, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") var rendevousPoints []*rendezvousPoint @@ -54,7 +54,6 @@ func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool } return &Rendezvous{ - host: host, enableServer: enableServer, db: db, discoverPeers: discoverPeers, @@ -64,6 +63,11 @@ func NewRendezvous(host host.Host, enableServer bool, db *DB, discoverPeers bool } } +// Sets the host to be able to mount or consume a protocol +func (r *Rendezvous) SetHost(h host.Host) { + r.host = h +} + func (r *Rendezvous) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) r.cancel = cancel diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 0f25a781..0e7b9416 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -45,7 +45,8 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) rdb := NewDB(context.Background(), db, utils.Logger()) - rendezvousPoint := NewRendezvous(host1, true, rdb, false, nil, nil, utils.Logger()) + rendezvousPoint := NewRendezvous(true, rdb, false, nil, nil, utils.Logger()) + rendezvousPoint.SetHost(host1) err = rendezvousPoint.Start(context.Background()) require.NoError(t, err) defer rendezvousPoint.Stop() @@ -65,7 +66,8 @@ func TestRendezvous(t *testing.T) { err = host2.Peerstore().AddProtocols(info.ID, RendezvousID) require.NoError(t, err) - rendezvousClient1 := NewRendezvous(host2, false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger()) + rendezvousClient1 := NewRendezvous(false, nil, false, []peer.ID{host1.ID()}, nil, utils.Logger()) + rendezvousClient1.SetHost(host2) err = rendezvousClient1.Start(context.Background()) require.NoError(t, err) defer rendezvousClient1.Stop() @@ -81,7 +83,8 @@ func TestRendezvous(t *testing.T) { myPeerConnector := NewPeerConn() - rendezvousClient2 := NewRendezvous(host3, false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) + rendezvousClient2 := NewRendezvous(false, nil, true, []peer.ID{host1.ID()}, myPeerConnector, utils.Logger()) + rendezvousClient2.SetHost(host3) err = rendezvousClient2.Start(context.Background()) require.NoError(t, err) defer rendezvousClient2.Stop() diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index d735c52e..513d1bbf 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -34,7 +34,8 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay := relay.NewWakuRelay(host, nil, 0, timesource.NewDefaultClock(), utils.Logger()) + relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), utils.Logger()) + relay.SetHost(host) err = relay.Start(context.Background()) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 3469b538..5fbbc897 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -52,7 +52,8 @@ func TestFilterSubscription(t *testing.T) { b := v2.NewBroadcaster(10) require.NoError(t, b.Start(context.Background())) - node := relay.NewWakuRelay(host, b, 0, timesource.NewDefaultClock(), utils.Logger()) + node := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), utils.Logger()) + node.SetHost(host) err = node.Start(context.Background()) require.NoError(t, err) @@ -61,7 +62,8 @@ func TestFilterSubscription(t *testing.T) { b2 := v2.NewBroadcaster(10) require.NoError(t, b2.Start(context.Background())) - f := legacy_filter.NewWakuFilter(host, b2, false, timesource.NewDefaultClock(), utils.Logger()) + f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), utils.Logger()) + f.SetHost(host) err = f.Start(context.Background()) require.NoError(t, err)